Commit 0bb14237 by xmy

cmq

parent c410e2b7
<?php
namespace Hdll\Services\Cmq;
/*
Account 对象非线程安全,如果多线程使用,需要每个线程单独初始化Account对象类
*/
class Account
{
private $secretId;
private $secretKey;
private $cmq_client;
/*
@type host: string
@param host: 访问的url,例如:https://cmq-queue-gz.api.qcloud.com
@type secretId: string
@param secretId: 用户的secretId, 腾讯云官网获取
@type secretKey: string
@param secretKey: 用户的secretKey,腾讯云官网获取
@note: Exception
:: CMQClientParameterException host格式错误
*/
public function __construct($host, $secretId, $secretKey)
{
$this->host = $host;
$this->secretId = $secretId;
$this->secretKey = $secretKey;
$this->cmq_client = new CMQClient($host, $secretId, $secretKey);
}
/*
* @type sign_method:string
* @param sign_method : only support sha1 and sha256
*/
public function set_sign_method($sign_method = 'sha1')
{
$this->cmq_client->set_sign_method($sign_method);
}
/* 设置访问的url
@type host: string
@param host: 访问的url,例如:http://cmq-queue-gz.api.tencentyun.com
@type secretId: string
@param secretId: 用户的secretId,腾讯云官网获取
@type secretKey: string
@param secretKey: 用户的secretKey,腾讯云官网获取
@note: Exception
:: CMQClientParameterException host格式错误
*/
public function set_client($host, $secretId = null, $secretKey = null)
{
if ($secretId == null) {
$secretId = $this->secretId;
}
if ($secretKey == null) {
$secretKey = $this->secretKey;
}
$this->cmq_client = new CMQClient($host, $secretId, $secretKey);
}
/* 获取queue client
@rtype: CMQClient object
@return: 返回使用的CMQClient object
*/
public function get_client()
{
return $this->cmq_client;
}
/* 获取Account的一个Queue对象
@type queue_name: string
@param queue_name: 队列名
@rtype: Queue object
@return: 返回该Account的一个Queue对象
*/
public function get_queue($queue_name)
{
return new Queue($queue_name, $this->cmq_client);
}
/* 列出Account的队列
@type searchWord: string
@param searchWord: 队列名的前缀
@type limit: int
@param limit: list_queue最多返回的队列数
@type offset: string
@param offset: list_queue的起始位置,上次list_queue返回的next_offset
@rtype: tuple
@return: QueueURL的列表和下次list queue的起始位置; 如果所有queue都list出来,next_offset为"".
*/
public function list_queue($searchWord = "", $limit = -1, $offset = "")
{
$params = array();
if ($searchWord != "") {
$params['searchWord'] = $searchWord;
}
if ($limit != -1) {
$params['limit'] = $limit;
}
if ($offset != "") {
$params['offset'] = $offset;
}
$ret_pkg = $this->cmq_client->list_queue($params);
if ($offset == "") {
$next_offset = count($ret_pkg['queueList']);
} else {
$next_offset = $offset + count($ret_pkg['queueList']);
}
if ($next_offset >= $ret_pkg['totalCount']) {
$next_offset = "";
}
return array("totalCount" => $ret_pkg['totalCount'],
"queueList" => $ret_pkg['queueList'], "next_offset" => $next_offset);
}
/* 列出Account的主题
@type searchWord: string
@param searchWord: 主题关键字
@type limit: int
@param limit: 最多返回的主题数目
@type offset: string
@param offset: list_topic的起始位置,上次list_topic返回的next_offset
@rtype: tuple
@return: TopicURL的列表和下次list topic的起始位置; 如果所有topic都list出来,next_offset为"".
*/
public function list_topic($searchWord = "", $limit = -1, $offset = "")
{
$params = array();
if ($searchWord != "") {
$params['searchWord'] = $searchWord;
}
if ($limit != -1) {
$params['limit'] = $limit;
}
if ($offset != "") {
$params['offset'] = $offset;
}
$resp = $this->cmq_client->list_topic($params);
if ($offset == "") {
$next_offset = count($resp['topicList']);
} else {
$next_offset = $offset + count($resp['topicList']);
}
if ($next_offset >= $resp['totalCount']) {
$next_offset = "";
}
return array("totalCoult" => $resp['totalCount'],
"topicList" => $resp['topicList'],
"next_offset" => $next_offset);
}
/* 获取Account的一个Topic对象
@type topic_name: string
@param queue_name:
@rtype: Topic object
@return: 返回该Account的一个Topic对象
*/
public function get_topic($topic_name)
{
return new Topic($topic_name, $this->cmq_client);
}
/* 获取Account的一个Subscription对象
@type topic_name: string
@param queue_name:
@type subscription_name :string
@param subscription_name:
@rtype: Subscription object
@return: 返回该Account的一个Subscription对象
*/
public function get_subscription($topic_name, $subscription_name)
{
return new \Subscription($topic_name, $subscription_name, $this->cmq_client);
}
}
<?php
namespace Hdll\Services\Cmq;
class CMQClient
{
private $host;
private $secretId;
private $secretKey;
private $version;
private $http;
private $method;
private $URISEC = '/v2/index.php';
public function __construct($host, $secretId, $secretKey, $version = "SDK_PHP_1.3", $method = "POST")
{
$this->process_host($host);
$this->secretId = $secretId;
$this->secretKey = $secretKey;
$this->version = $version;
$this->method = $method;
$this->sign_method = 'HmacSHA1';
$this->http = new CMQHttp($this->host);
}
protected function process_host($host)
{
if (strpos($host, "http://") === 0) {
$_host = substr($host, 7, strlen($host) - 7);
} elseif (strpos($host, "https://") === 0) {
$_host = substr($host, 8, strlen($host) - 8);
} else {
throw new CMQClientParameterException("Only support http(s) prototol. Invalid endpoint:" . $host);
}
if ($_host[strlen($_host) - 1] == "/") {
$this->host = substr($_host, 0, strlen($_host) - 1);
} else {
$this->host = $_host;
}
}
public function set_sign_method($sign_method = 'sha1')
{
if ($sign_method == 'sha1' || $sign_method == 'HmacSHA256') {
$this->sign_method = 'HmacSHA1';
} elseif ($sign_method == 'sha256') {
$this->sign_method = 'HmacSHA256';
} else {
throw new CMQClientParameterException('Only support sign method HmasSHA256 or HmacSHA1 . Invalid sign method:' . $sign_method);
}
}
public function set_method($method = 'POST')
{
$this->method = $method;
}
public function set_connection_timeout($connection_timeout)
{
$this->http->set_connection_timeout($connection_timeout);
}
public function set_keep_alive($keep_alive)
{
$this->http->set_keep_alive($keep_alive);
}
protected function build_req_inter($action, $params, &$req_inter)
{
$_params = $params;
$_params['Action'] = ucfirst($action);
$_params['RequestClient'] = $this->version;
if (!isset($_params['SecretId'])) {
$_params['SecretId'] = $this->secretId;
}
if (!isset($_params['Nonce'])) {
$_params['Nonce'] = rand(1, 65535);
}
if (!isset($_params['Timestamp'])) {
$_params['Timestamp'] = time();
}
if (!isset($_params['SignatureMethod'])) {
$_params['SignatureMethod'] = $this->sign_method;
}
$plainText = Signature::makeSignPlainText($_params,
$this->method, $this->host, $req_inter->uri);
$_params['Signature'] = Signature::sign($plainText, $this->secretKey, $this->sign_method);
$req_inter->data = http_build_query($_params);
$this->build_header($req_inter);
}
protected function build_header(&$req_inter)
{
if ($this->http->is_keep_alive()) {
$req_inter->header["Connection"] = "Keep-Alive";
}
}
protected function check_status($resp_inter)
{
if ($resp_inter->status != 200) {
throw new CMQServerNetworkException($resp_inter->status, $resp_inter->header, $resp_inter->data);
}
$resp = json_decode($resp_inter->data, true);
$code = $resp['code'];
$message = $resp['message'];
$requestId = $resp['requestId'];
if ($code != 0) {
throw new CMQServerException($message = $message, $request_id = $requestId, $code = $code, $data = $resp);
}
}
protected function request($action, $params)
{
// make request internal
$req_inter = new RequestInternal($this->method, $this->URISEC);
$this->build_req_inter($action, $params, $req_inter);
$iTimeout = 0;
if (array_key_exists("UserpollingWaitSeconds", $params)) {
$iTimeout = (int) $params['UserpollingWaitSeconds'];
}
// send request
$resp_inter = $this->http->send_request($req_inter, $iTimeout);
return $resp_inter;
}
//===============================================queue operation===============================================
public function create_queue($params)
{
$resp_inter = $this->request('CreateQueue', $params);
$this->check_status($resp_inter);
}
public function delete_queue($params)
{
$resp_inter = $this->request('DeleteQueue', $params);
$this->check_status($resp_inter);
}
public function rewindQueue($params)
{
$resp_inter = $this->request('RewindQueue', $params);
$this->check_status($resp_inter);
}
public function list_queue($params)
{
$resp_inter = $this->request('ListQueue', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function set_queue_attributes($params)
{
$resp_inter = $this->request('SetQueueAttributes', $params);
$this->check_status($resp_inter);
}
public function get_queue_attributes($params)
{
$resp_inter = $this->request('GetQueueAttributes', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function send_message($params)
{
$resp_inter = $this->request('SendMessage', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret['msgId'];
}
public function batch_send_message($params)
{
$resp_inter = $this->request('BatchSendMessage', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret['msgList'];
}
public function receive_message($params)
{
$resp_inter = $this->request('ReceiveMessage', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function batch_receive_message($params)
{
$resp_inter = $this->request('BatchReceiveMessage', $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret['msgInfoList'];
}
public function delete_message($params)
{
$resp_inter = $this->request('DeleteMessage', $params);
$this->check_status($resp_inter);
}
public function batch_delete_message($params)
{
$resp_inter = $this->request('BatchDeleteMessage', $params);
$this->check_status($resp_inter);
}
//=============================================topic operation================================================
public function create_topic($params)
{
$resp_inter = $this->request("CreateTopic", $params);
$this->check_status($resp_inter);
}
public function delete_topic($params)
{
$resp_inter = $this->request("DeleteTopic", $params);
$this->check_status($resp_inter);
}
public function list_topic($params)
{
$resp_inter = $this->request("ListTopic", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function set_topic_attributes($params)
{
$resp_inter = $this->request("SetTopicAttributes", $params);
$this->check_status($resp_inter);
}
public function get_topic_attributes($params)
{
$resp_inter = $this->request("GetTopicAttributes", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function publish_message($params)
{
$resp_inter = $this->request("PublishMessage", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function batch_publish_message($params)
{
$resp_inter = $this->request("BatchPublishMessage", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
//============================================subscription operation=============================================
public function create_subscription($params)
{
$resp_inter = $this->request("Subscribe", $params);
$this->check_status($resp_inter);
}
public function clear_filterTags($params)
{
$resp_inter = $this->request("ClearSubscriptionFilterTags", $params);
$this->check_status($resp_inter);
}
public function delete_subscription($params)
{
$resp_inter = $this->request("Unsubscribe", $params);
$this->check_status($resp_inter);
}
public function get_subscription_attributes($params)
{
$resp_inter = $this->request("GetSubscriptionAttributes", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
public function set_subscription_attributes($params)
{
$resp_inter = $this->request("SetSubscriptionAttributes", $params);
$this->check_status($resp_inter);
}
public function list_subscription($params)
{
$resp_inter = $this->request("ListSubscriptionByTopic", $params);
$this->check_status($resp_inter);
$ret = json_decode($resp_inter->data, true);
return $ret;
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:29
* Description: CMQClientException.php
*/
namespace Hdll\Services\Cmq;
class CMQClientException extends CMQExceptionBase
{
public function __construct($message, $code = -1, $data = array())
{
parent::__construct($message, $code, $data);
}
public function __toString()
{
return "CMQClientException " . $this->get_info();
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:28
* Description: CMQClientNetworkException.php
*/
namespace Hdll\Services\Cmq;
class CMQClientNetworkException extends CMQClientException
{
/* 网络异常
@note: 检查endpoint是否正确、本机网络是否正常等;
*/
public function __construct($message, $code = -1, $data = array())
{
parent::__construct($message, $code, $data);
}
public function __toString()
{
return "CMQClientNetworkException " . $this->get_info();
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:27
* Description: CMQClientParameterException.php
*/
namespace Hdll\Services\Cmq;
class CMQClientParameterException extends CMQClientException
{
/* 参数格式错误
@note: 请根据提示修改对应参数;
*/
public function __construct($message, $code = -1, $data = array())
{
parent::__construct($message, $code, $data);
}
public function __toString()
{
return "CMQClientParameterException " . $this->get_info();
}
}
<?php
namespace Hdll\Services\Cmq;
class CMQExceptionBase extends \RuntimeException
{
/*
@type code: int
@param code: 错误类型
@type message: string
@param message: 错误描述
@type data: array
@param data: 错误数据
*/
public $code;
public $message;
public $data;
public function __construct($message, $code = -1, $data = array())
{
$this->code = $code;
$this->message = $message;
$this->data = $data;
}
public function __toString()
{
return "CMQExceptionBase " . $this->get_info();
}
public function get_info()
{
$info = array("code" => $this->code,
"data" => json_encode($this->data),
"message" => $this->message);
return json_encode($info);
}
}
<?php
namespace Hdll\Services\Cmq;
class CMQHttp
{
private $connection_timeout;
private $keep_alive;
private $host;
public function __construct($host, $connection_timeout = 10, $keep_alive = true)
{
$this->connection_timeout = $connection_timeout;
$this->keep_alive = $keep_alive;
$this->host = $host . "" . "/v2/index.php";
$this->curl = null;
}
public function set_method($method = 'POST')
{
$this->method = $method;
}
public function set_connection_timeout($connection_timeout)
{
$this->connection_timeout = $connection_timeout;
}
public function set_keep_alive($keep_alive)
{
$this->keep_alive = $keep_alive;
}
public function is_keep_alive()
{
return $this->keep_alive;
}
public function send_request($req_inter, $userTimeout)
{
if (!$this->keep_alive) {
$this->curl = curl_init();
} else {
if ($this->curl == null) {
$this->curl = curl_init();
}
}
if ($this->curl == null) {
throw new CMQClientException("Curl init failed");
return;
}
$url = $this->host;
if ($req_inter->method == 'POST') {
curl_setopt($this->curl, CURLOPT_POST, 1);
curl_setopt($this->curl, CURLOPT_POSTFIELDS, $req_inter->data);
} else {
$url .= $req_inter->uri . '?' . $req_inter->data;
}
if (isset($req_inter->header)) {
curl_setopt($this->curl, CURLOPT_HTTPHEADER, $req_inter->header);
}
curl_setopt($this->curl, CURLOPT_URL, $url);
curl_setopt($this->curl, CURLOPT_TIMEOUT, $this->connection_timeout + $userTimeout);
curl_setopt($this->curl, CURLOPT_RETURNTRANSFER, true);
if (false !== strpos($url, "https")) {
// 证书
// curl_setopt($ch,CURLOPT_CAINFO,"ca.crt");
curl_setopt($this->curl, CURLOPT_SSL_VERIFYPEER, false);
curl_setopt($this->curl, CURLOPT_SSL_VERIFYHOST, false);
}
$resultStr = curl_exec($this->curl);
if (curl_errno($this->curl)) {
throw new CMQClientNetworkException(curl_error($this->curl));
}
$info = curl_getinfo($this->curl);
$resp_inter = new ResponseInternal($info['http_code'], null, $resultStr);
return $resp_inter;
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:26
* Description: CMQServerException.php
*/
namespace Hdll\Services\Cmq;
class CMQServerException extends CMQExceptionBase
{
/* cmq处理异常
@note: 根据code进行分类处理,常见错误类型:
: 4000 参数不合法
: 4100 鉴权失败:密钥不存在/失效
: 4300 账户欠费了
: 4400 消息大小超过队列属性设置的最大值
: 4410 已达到队列最大的消息堆积数
: 4420 qps限流
: 4430 删除消息的句柄不合法或者过期了
: 4440 队列不存在
: 4450 队列个数超过限制
: 4460 队列已经存在
: 6000 服务器内部错误
: 6010 批量删除消息失败(具体原因还要看每个消息删除失败的错误码)
: 7000 空消息,即队列当前没有可用消息
: 更多错误类型请登录腾讯云消息服务官网进行了解;
*/
public $request_id;
public function __construct($message, $request_id, $code = -1, $data = array())
{
parent::__construct($message, $code, $data);
$this->request_id = $request_id;
}
public function __toString()
{
return "CMQServerException " . $this->get_info() . ", RequestID:" . $this->request_id;
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:27
* Description: CMQServerNetworkException.php
*/
namespace Hdll\Services\Cmq;
class CMQServerNetworkException extends CMQExceptionBase
{
//服务器网络异常
public $status;
public $header;
public $data;
public function __construct($status = 200, $header = null, $data = "")
{
if ($header == null) {
$header = array();
}
$this->status = $status;
$this->header = $header;
$this->data = $data;
}
public function __toString()
{
$info = array("status" => $this->status,
"header" => json_encode($this->header),
"data" => $this->data);
return "CMQServerNetworkException " . json_encode($info);
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:21
* Description: Message.php
*/
namespace Hdll\Services\Cmq;
class Message
{
public $msgBody;
public $msgId;
public $enqueueTime;
public $receiptHandle;
/* 消息属性
@note: send_message 指定属性
:: msgBody 消息体
@note: send_message 返回属性
:: msgId 消息编号
@note: receive_message 返回属性,除基本属性外
:: receiptHandle 下次删除或修改消息的临时句柄
:: enqueueTime 消息入队时间
:: nextVisibleTime 下次可被再次消费的时间
:: dequeueCount 总共被消费的次数
:: firstDequeueTime 第一次被消费的时间
*/
public function __construct($message_body = "")
{
$this->msgBody = $message_body;
$this->msgId = "";
$this->enqueueTime = -1;
$this->receiptHandle = "";
$this->nextVisibleTime = -1;
$this->dequeueCount = -1;
$this->firstDequeueTime = -1;
}
public function __toString()
{
$info = array("msgBody" => $this->msgBody,
"msgId" => $this->msgId,
"enqueueTime" => date("Y-m-d H:i:s", $this->enqueueTime),
"nextVisibleTime" => date("Y-m-d H:i:s", $this->nextVisibleTime),
"firstDequeueTime" => date("Y-m-d H:i:s", $this->firstDequeueTime),
"dequeueCount" => $this->dequeueCount,
"receiptHandle" => $this->receiptHandle);
return json_encode($info);
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:22
* Description: QueueMeta.php
*/
namespace Hdll\Services\Cmq;
class QueueMeta
{
public $queueName;
public $maxMsgHeapNum;
public $pollingWaitSeconds;
public $visibilityTimeout;
public $maxMsgSize;
public $msgRetentionSeconds;
public $createTime;
public $lastModifyTime;
public $activeMsgNum;
public $inactiveMsgNum;
public $rewindSeconds;
public $rewindmsgNum;
public $minMsgTime;
public $delayMsgNum;
/* 队列属性
@note: 设置属性
:: maxMsgHeapNum: 最大堆积消息数
:: pollingWaitSeconds: receive message时,长轮询时间,单位:秒
:: visibilityTimeout: 消息可见性超时, 单位:秒
:: maxMsgSize: 消息最大长度, 单位:Byte
:: msgRetentionSeconds: 消息保留周期,单位:秒
:: rewindSeconds : 最大回溯时间, 单位:秒
@note: 非设置属性
:: activeMsgNum: 可消费消息数,近似值
:: inactiveMsgNum: 正在被消费的消息数,近似值
:: createTime: queue创建时间,单位:秒
:: lastModifyTime: 修改queue属性的最近时间,单位:秒
:: queue_name: 队列名称
:: rewindmsgNum:已删除,但是任然在回溯保留时间内的消息数量
:: minMsgTime: 消息最小未消费时间,单位为秒
:: delayMsgNum:延时消息数量
*/
public function __construct()
{
$this->queueName = "";
$this->maxMsgHeapNum = -1;
$this->pollingWaitSeconds = 0;
$this->visibilityTimeout = 30;
$this->maxMsgSize = 65536;
$this->msgRetentionSeconds = 345600;
$this->createTime = -1;
$this->lastModifyTime = -1;
$this->activeMsgNum = -1;
$this->inactiveMsgNum = -1;
$this->rewindSeconds = 0;
$this->rewindmsgNum = 0;
$this->minMsgTime = 0;
$this->delayMsgNum = 0;
}
public function __toString()
{
$info = array("visibilityTimeout" => $this->visibilityTimeout,
"maxMsgHeapNum" => $this->maxMsgHeapNum,
"maxMsgSize" => $this->maxMsgSize,
"msgRetentionSeconds" => $this->msgRetentionSeconds,
"pollingWaitSeconds" => $this->pollingWaitSeconds,
"activeMsgNum" => $this->activeMsgNum,
"inactiveMsgNum" => $this->inactiveMsgNum,
"createTime" => date("Y-m-d H:i:s", $this->createTime),
"lastModifyTime" => date("Y-m-d H:i:s", $this->lastModifyTime),
"QueueName" => $this->queueName,
"rewindSeconds" => $this->rewindSeconds,
"rewindmsgNum" => $this->rewindmsgNum,
"minMsgTime" => $this->minMsgTime,
"delayMsgNum" => $this->delayMsgNum);
return json_encode($info);
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:20
* Description: RequestInternal.php
*/
namespace Hdll\Services\Cmq;
class RequestInternal
{
public $header;
public $method;
public $uri;
public $data;
public function __construct($method = "", $uri = "", $header = null, $data = "")
{
if ($header == null) {
$header = array();
}
$this->method = $method;
$this->uri = $uri;
$this->header = $header;
$this->data = $data;
}
public function __toString()
{
$info = array("method" => $this->method,
"uri" => $this->uri,
"header" => json_encode($this->header),
"data" => $this->data);
return json_encode($info);
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:20
* Description: ResponseInternal.php
*/
namespace Hdll\Services\Cmq;
class ResponseInternal
{
public $header;
public $status;
public $data;
public function __construct($status = 0, $header = null, $data = "")
{
if ($header == null) {
$header = array();
}
$this->status = $status;
$this->header = $header;
$this->data = $data;
}
public function __toString()
{
$info = array("status" => $this->status,
"header" => json_encode($this->header),
"data" => $this->data);
return json_encode($info);
}
}
<?php
namespace Hdll\Services\Cmq;
/**
* Sign
* 签名类
*/
class Signature
{
/**
* sign
* 生成签名
* @param string $srcStr 拼接签名源文字符串
* @param string $secretKey secretKey
* @param string $method 请求方法
* @return
*/
public static function sign($srcStr, $secretKey, $method = 'HmacSHA1')
{
switch ($method) {
case 'HmacSHA1':
$retStr = base64_encode(hash_hmac('sha1', $srcStr, $secretKey, true));
break;
case 'HmacSHA256':
$retStr = base64_encode(hash_hmac('sha256', $srcStr, $secretKey, true));
break;
default:
throw new Exception($method . ' is not a supported encrypt method');
return false;
break;
}
return $retStr;
}
/**
* makeSignPlainText
* 生成拼接签名源文字符串
* @param array $requestParams 请求参数
* @param string $requestMethod 请求方法
* @param string $requestHost 接口域名
* @param string $requestPath url路径
* @return
*/
public static function makeSignPlainText($requestParams,
$requestMethod = 'POST', $requestHost = YUNAPI_URL,
$requestPath = '/v2/index.php') {
$url = $requestHost . $requestPath;
// 取出所有的参数
$paramStr = self::_buildParamStr($requestParams, $requestMethod);
$plainText = $requestMethod . $url . $paramStr;
return $plainText;
}
/**
* _buildParamStr
* 拼接参数
* @param array $requestParams 请求参数
* @param string $requestMethod 请求方法
* @return
*/
protected static function _buildParamStr($requestParams, $requestMethod = 'POST')
{
$paramStr = '';
ksort($requestParams);
$i = 0;
foreach ($requestParams as $key => $value) {
if ($key == 'Signature') {
continue;
}
// 排除上传文件的参数
if ($requestMethod == 'POST' && substr($value, 0, 1) == '@') {
continue;
}
// 把 参数中的 _ 替换成 .
if (strpos($key, '_')) {
$key = str_replace('_', '.', $key);
}
if ($i == 0) {
$paramStr .= '?';
} else {
$paramStr .= '&';
}
$paramStr .= $key . '=' . $value;
++$i;
}
return $paramStr;
}
}
<?php
class Subscription
{
private $topic_name;
private $subscription_name;
private $cmq_client;
private $encoding;
public function __construct($topic_name, $subscription_name, $cmq_client, $encoding = false)
{
$this->topic_name = $topic_name;
$this->subscription_name = $subscription_name;
$this->cmq_client = $cmq_client;
$this->encoding = $encoding;
}
public function set_encoding($encoding)
{
$this->encoding = $encoding;
}
/*
* create subscription
* @type subscription_meta :SubscriptionMeta
*/
public function create($subscription_meta)
{
$params = array(
'topicName' => $this->topic_name,
'subscriptionName' => $this->subscription_name,
'notifyStrategy' => $subscription_meta->NotifyStrategy,
'notifyContentFormat' => $subscription_meta->NotifyContentFormat,
);
if ($subscription_meta->Endpoint != "") {
$params['endpoint'] = $subscription_meta->Endpoint;
}
if ($subscription_meta->Protocol != "") {
$params['protocol'] = $subscription_meta->Protocol;
}
if (!$subscription_meta->bindindKey != null && is_array($subscription_meta->bindindKey) && !empty($subscription_meta->bindindKey)) {
$n = 1;
foreach ($subscription_meta->bindindKey as $tag) {
$key = 'bindindKey.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
if (!$subscription_meta->FilterTag != null && is_array($subscription_meta->FilterTag) && !empty($subscription_meta->FilterTag)) {
$n = 1;
foreach ($subscription_meta->FilterTag as $tag) {
$key = 'filterTag.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
$this->cmq_client->create_subscription($params);
}
/*
* delete subscription
*/
public function delete()
{
$params = array(
'topicName' => $this->topic_name,
'subscriptionName' => $this->subscription_name,
);
$this->cmq_client->delete_subscription($params);
}
/*
* clear subscription tags
*/
public function clearFilterTags()
{
$params = array(
'topicName' => $this->topic_name,
'subscriptionName' => $this->subscription_name,
);
$this->cmq_client->clear_filterTags($params);
}
/*
* get attributes
*
* @return subscription_meta :SubscriptionMeta
*/
public function get_attributes()
{
$params = array(
'topicName' => $this->topic_name,
'subscriptionName' => $this->subscription_name,
);
$resp = $this->cmq_client->get_subscription_attributes($params);
$subscription_meta = new SubscriptionMeta();
$this->__resp2meta($subscription_meta, $resp);
return $subscription_meta;
}
/*
* set attributes
* @type subscription_meta : SubscriptionMeta
*
*/
public function set_attributes($subscription_meta)
{
$params = array(
'topicName' => $this->topic_name,
'subscriptionName' => $this->subscription_name,
);
if ($subscription_meta->NotifyStrategy != "") {
$params['notifyStrategy'] = $subscription_meta->NotifyStrategy;
}
if ($subscription_meta->NotifyContentFormat != "") {
$params['notifyContentFormat'] = $subscription_meta->NotifyContentFormat;
}
if ($subscription_meta->Endpoint != "") {
$params['endpoint'] = $subscription_meta->Endpoint;
}
if ($subscription_meta->Protocol != "") {
$params['protocol'] = $subscription_meta->Protocol;
}
if (!$subscription_meta->bindindKey != null && is_array($subscription_meta->bindindKey) && !empty($subscription_meta->bindindKey)) {
$n = 1;
foreach ($subscription_meta->bindindKey as $tag) {
$key = 'bindindKey.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
if (!$subscription_meta->FilterTag != null && is_array($subscription_meta->FilterTag) && !empty($subscription_meta->FilterTag)) {
$n = 1;
foreach ($subscription_meta->FilterTag as $tag) {
$key = 'filterTag.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
$this->cmq_client->set_subscription_attributes($params);
}
protected function __resp2meta($subscription_meta, $resp)
{
if (isset($resp['endpoint'])) {
$subscription_meta->Endpoint = $resp['endpoint'];
}
if (isset($resp['protocol'])) {
$subscription_meta->Protocol = $resp['protocol'];
}
if (isset($resp['notifyStrategy'])) {
$subscription_meta->NotifyStrategy = $resp['notifyStrategy'];
}
if (isset($resp['notifyContentFormat'])) {
$subscription_meta->NotifyContentFormat = $resp['notifyContentFormat'];
}
if (isset($resp['bindindKey'])) {
foreach ($resp['bindindKey'] as $tag) {
array_push($subscription_meta->bindindKey, $tag);
}
}
if (isset($resp['filterTags'])) {
foreach ($resp['filterTags'] as $tag) {
array_push($subscription_meta->FilterTag, $tag);
}
}
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:24
* Description: SubscriptionMeta.php
*/
namespace Hdll\Services\Cmq;
class SubscriptionMeta
{
// default NotifyStrategy BACKOFF_RETRY
// default NotifyContentFormat JSON
/* 订阅属性
@note: 可修改
:: Endpoint 推送消息地址
:: Protocal 协议
:: FilterTag 订阅 标签
:: NotifyStrategy 重试策略
:: NotifyContentFormat 推送消息格式
*/
public $Endpoint;
public $Protocol;
public $FilterTag;
public $NotifyStrategy;
public $NotifyContentFormat;
public function __construct()
{
$this->Endpoint = "";
$this->Protocol = "";
$this->FilterTag = array();
$this->NotifyStrategy = "BACKOFF_RETRY";
$this->NotifyContentFormat = "JSON";
$this->bindindKey = array();
}
public function __toString()
{
$info = array(
"endpoint" => $this->Endpoint,
"protocol" => $this->Protocol,
"filterTag" => $this->FilterTag,
"notifyStrategy" => $this->NotifyStrategy,
"notifyContentFormat" => $this->NotifyContentFormat,
"bindingKey" => $this->bindindKey,
);
return json_encode($info);
}
}
<?php
namespace Hdll\Services\Cmq;
class Topic
{
private $topic_name;
private $cmq_client;
private $encoding;
public function __construct($topic_name, $cmq_client, $encoding = false)
{
$this->topic_name = $topic_name;
$this->cmq_client = $cmq_client;
$this->encoding = $encoding;
}
public function set_encoding($encoding)
{
$this->encoding = $encoding;
}
/*
* create topic
* @type topic_meta : TopicMeta
* @param topic_meta :
*/
public function create($topic_meta)
{
$params = array(
'topicName' => $this->topic_name,
'filterType' => $topic_meta->filterType,
);
if ($topic_meta->maxMsgSize > 0) {
$params['maxMsgSize'] = $topic_meta->maxMsgSize;
}
$this->cmq_client->create_topic($params);
}
/*
* get attributes
*
* @return topic_meta :TopicMeta
*
*/
public function get_attributes()
{
$params = array(
'topicName' => $this->topic_name,
);
$resp = $this->cmq_client->get_topic_attributes($params);
$topic_meta = new TopicMeta();
$this->__resp2meta($topic_meta, $resp);
return $topic_meta;
}
/*
* set attributes
*
* @type topic_meta :TopicMeta
* @param topic_meta :
*/
public function set_attributes($topic_meta)
{
$params = array(
'topicName' => $this->topic_name,
'maxMsgSize' => strval($topic_meta->maxMsgSize),
);
$this->cmq_client->set_topic_attributes($params);
}
/*
* delete topic
*/
public function delete()
{
$params = array(
'topicName' => $this->topic_name,
);
$this->cmq_client->delete_topic($params);
}
/*
* 推送消息 非批量
* @type message :string
* @param message
*
* @type vTagList :list
* @param vTagList 标签
*
* @return message handle
*/
public function publish_message($message, $vTagList = null, $routingKey = null)
{
$params = array(
'topicName' => $this->topic_name,
'msgBody' => $message,
);
if ($routingKey != null) {
$params['routingKey'] = $routingKey;
}
if ($vTagList != null && is_array($vTagList) && !empty($vTagList)) {
$n = 1;
foreach ($vTagList as $tag) {
$key = 'msgTag.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
$msgId = $this->cmq_client->publish_message($params);
return $msgId;
}
/*
* 批量推送消息
* @type vmessageList :list
* @param vmessageList:
*
* @type vtagList :list
* @param vtagList
*
* @return : return message handle list
*/
public function batch_publish_message($vmessageList, $vtagList = null, $routingKey = null)
{
$params = array(
'topicName' => $this->topic_name,
);
if ($routingKey != null) {
$params['routingKey'] = $routingKey;
}
$n = 1;
if (is_array($vmessageList) && !empty($vmessageList)) {
foreach ($vmessageList as $msg) {
$key = 'msgBody.' . $n;
if ($this->encoding) {
$params[$key] = base64_encode($msg);
} else {
$params[$key] = $msg;
}
$n += 1;
}
}
if ($vtagList != null && is_array($vtagList) && !empty($vtagList)) {
$n = 1;
foreach ($vtagList as $tag) {
$key = 'msgTag.' . $n;
$params[$key] = $tag;
$n += 1;
}
}
$msgList = $this->cmq_client->batch_publish_message($params);
$retMessageList = array();
foreach ($msgList as $msg) {
if (isset($msg['msgId'])) {
$retmsgId = $msg['msgId'];
$retMessageList[] = $retmsgId;
}
}
return $retMessageList;
}
/* 列出Topic的Subscriptoin
@type topic_name :string
@param topic_name:
@type searchWord: string
@param searchWord: 订阅关键字
@type limit: int
@param limit: 最多返回的订阅数目
@type offset: string
@param offset: list_subscription的起始位置,上次list_subscription返回的next_offset
@rtype: tuple
@return: subscriptionURL的列表和下次list subscription的起始位置; 如果所有subscription都list出来,next_offset为"".
*/
public function list_subscription($searchWord = "", $limit = -1, $offset = "")
{
$params = array('topicName' => $this->topic_name);
if ($searchWord != "") {
$params['searchWord'] = $searchWord;
}
if ($limit != -1) {
$params['limit'] = $limit;
}
if ($offset != "") {
$params['offset'] = $offset;
}
$resp = $this->cmq_client->list_subscription($params);
if ($offset == "") {
$next_offset = count($resp['subscriptionList']);
} else {
$next_offset = $offset + count($resp['subscriptionList']);
}
if ($next_offset >= $resp['totalCount']) {
$next_offset = "";
}
return array("totalCoult" => $resp['totalCount'],
"subscriptionList" => $resp['subscriptionList'],
"next_offset" => $next_offset);
}
protected function __resp2meta($topic_meta, $resp)
{
if (isset($resp['maxMsgSize'])) {
$topic_meta->maxMsgSize = $resp['maxMsgSize'];
}
if (isset($resp['msgRetentionSeconds'])) {
$topic_meta->msgRetentionSeconds = $resp['msgRetentionSeconds'];
}
if (isset($resp['createTime'])) {
$topic_meta->createTime = $resp['createTime'];
}
if (isset($resp['lastModifyTime'])) {
$topic_meta->lastModifyTime = $resp['lastModifyTime'];
}
if (isset($resp['filterType'])) {
$topic_meta->filterType = $resp['filterType'];
}
}
}
<?php
/**
* Created by Same.Inc
*
* Author: flog@same.com
* CreateAt: 2018/8/21 00:25
* Description: TopicMeta.php
*/
namespace Hdll\Services\Cmq;
class TopicMeta
{
// default maxMsgSize 65536
// default msgRetentionSeconds 86400, one day
/* 主题属性
@note: 可修改
:: maxMsgSize 消息最大值
@note: 不可修改
:: msgRetentionSeconds 消息最长保存时间,默认为 一天
:: createTime 创建时间
:: lastModifyTime 上次修改时间
*/
public $maxMsgSize;
public $msgRetentionSeconds;
public $createTime;
public $lastModifyTime;
public function __construct()
{
$this->maxMsgSize = 65536;
$this->msgRetentionSeconds = 86400;
$this->createTime = 0;
$this->lastModifyTime = 0;
$this->filterType = 1;
}
public function __toString()
{
$info = array(
"maxMsgSize" => $this->maxMsgSize,
"msgRetentionSeconds" => $this->msgRetentionSeconds,
"createTime" => $this->createTime,
"lastModifyTime" => $this->lastModifyTime,
"filterType" => $this->filterType,
);
return json_encode($info);
}
}
......@@ -17,5 +17,10 @@ return [
'secretId' => 'AKIDseHj18kua0KTSJ4g9SadbVEnEUZVjvPj',
'secretKey' => 'IPL5g5PaaSAzd6NSO8gEmLxcN4pTzJSQ',
],
'cryptKey'=>'ebf032f01aa2093be3ee2ee2c137hdll',
'cmq' => [
'host' => '',
'secretId' => '',
'secretKey' => '',
],
'cryptKey' => 'ebf032f01aa2093be3ee2ee2c137hdll',
];
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment