Commit cbc8b72f by xmy

feat:配置转移

主题发送日志修改
parent 6068f312
...@@ -2,245 +2,248 @@ ...@@ -2,245 +2,248 @@
namespace Hdll\Services\Cmq; namespace Hdll\Services\Cmq;
use Hdll\Services\Common\Config\CfgCenter;
use Swoft\App; use Swoft\App;
use Hdll\Services\Common\Lib\Xcrypt; use Hdll\Services\Common\Lib\Xcrypt;
class Topic class Topic
{ {
private $topic_name; private $topic_name;
private $cmq_client; private $cmq_client;
private $encoding; private $encoding;
public function __construct($topic_name, $cmq_client, $encoding = false) public function __construct($topic_name, $cmq_client, $encoding = false)
{ {
$this->topic_name = $topic_name; $this->topic_name = $topic_name;
$this->cmq_client = $cmq_client; $this->cmq_client = $cmq_client;
$this->encoding = $encoding; $this->encoding = $encoding;
} }
public function set_encoding($encoding) public function set_encoding($encoding)
{ {
$this->encoding = $encoding; $this->encoding = $encoding;
} }
/* /*
* create topic * create topic
* @type topic_meta : TopicMeta * @type topic_meta : TopicMeta
* @param topic_meta : * @param topic_meta :
*/ */
public function create($topic_meta) public function create($topic_meta)
{ {
$params = array( $params = array(
'topicName' => $this->topic_name, 'topicName' => $this->topic_name,
'filterType' => $topic_meta->filterType, 'filterType' => $topic_meta->filterType,
); );
if ($topic_meta->maxMsgSize > 0) { if ($topic_meta->maxMsgSize > 0) {
$params['maxMsgSize'] = $topic_meta->maxMsgSize; $params['maxMsgSize'] = $topic_meta->maxMsgSize;
} }
$this->cmq_client->create_topic($params); $this->cmq_client->create_topic($params);
} }
/*
* get attributes /*
* * get attributes
* @return topic_meta :TopicMeta *
* * @return topic_meta :TopicMeta
*/ *
public function get_attributes() */
{ public function get_attributes()
$params = array( {
'topicName' => $this->topic_name, $params = array(
); 'topicName' => $this->topic_name,
$resp = $this->cmq_client->get_topic_attributes($params); );
$resp = $this->cmq_client->get_topic_attributes($params);
$topic_meta = new TopicMeta();
$this->__resp2meta($topic_meta, $resp); $topic_meta = new TopicMeta();
$this->__resp2meta($topic_meta, $resp);
return $topic_meta;
} return $topic_meta;
}
/*
* set attributes /*
* * set attributes
* @type topic_meta :TopicMeta *
* @param topic_meta : * @type topic_meta :TopicMeta
*/ * @param topic_meta :
public function set_attributes($topic_meta) */
{ public function set_attributes($topic_meta)
{
$params = array(
'topicName' => $this->topic_name, $params = array(
'maxMsgSize' => strval($topic_meta->maxMsgSize), 'topicName' => $this->topic_name,
); 'maxMsgSize' => strval($topic_meta->maxMsgSize),
$this->cmq_client->set_topic_attributes($params); );
} $this->cmq_client->set_topic_attributes($params);
}
/*
* delete topic /*
*/ * delete topic
public function delete() */
{ public function delete()
$params = array( {
'topicName' => $this->topic_name, $params = array(
); 'topicName' => $this->topic_name,
$this->cmq_client->delete_topic($params); );
} $this->cmq_client->delete_topic($params);
}
/*
* 推送消息 非批量 /*
* @type message :string * 推送消息 非批量
* @param message * @type message :string
* * @param message
* @type vTagList :list *
* @param vTagList 标签 * @type vTagList :list
* * @param vTagList 标签
* @return message handle *
*/ * @return message handle
*/
public function publish_message($message, $vTagList = null, $routingKey = null)
{ public function publish_message($message, $vTagList = null, $routingKey = null)
$params = array( {
'topicName' => $this->topic_name, $params = array(
'msgBody' => $message, 'topicName' => $this->topic_name,
); 'msgBody' => $message,
if ($routingKey != null) { );
$params['routingKey'] = $routingKey; if ($routingKey != null) {
} $params['routingKey'] = $routingKey;
if ($vTagList != null && is_array($vTagList) && !empty($vTagList)) { }
$n = 1; if ($vTagList != null && is_array($vTagList) && !empty($vTagList)) {
foreach ($vTagList as $tag) { $n = 1;
$key = 'msgTag.' . $n; foreach ($vTagList as $tag) {
$params[$key] = $tag; $key = 'msgTag.' . $n;
$n += 1; $params[$key] = $tag;
} $n += 1;
} }
$msgId = $this->cmq_client->publish_message($params); }
$msgId = $this->cmq_client->publish_message($params);
return $msgId;
} return $msgId;
}
/*
* 批量推送消息 /*
* @type vmessageList :list * 批量推送消息
* @param vmessageList: * @type vmessageList :list
* * @param vmessageList:
* @type vtagList :list *
* @param vtagList * @type vtagList :list
* * @param vtagList
* @return : return message handle list *
*/ * @return : return message handle list
*/
public function batch_publish_message($vmessageList, $vtagList = null, $routingKey = null)
{ public function batch_publish_message($vmessageList, $vtagList = null, $routingKey = null)
$params = array( {
'topicName' => $this->topic_name, $params = array(
); 'topicName' => $this->topic_name,
);
if ($routingKey != null) {
$params['routingKey'] = $routingKey; if ($routingKey != null) {
} $params['routingKey'] = $routingKey;
$n = 1; }
if (is_array($vmessageList) && !empty($vmessageList)) { $n = 1;
foreach ($vmessageList as $msg) { if (is_array($vmessageList) && !empty($vmessageList)) {
$key = 'msgBody.' . $n; foreach ($vmessageList as $msg) {
if ($this->encoding) { $key = 'msgBody.' . $n;
$params[$key] = base64_encode($msg); if ($this->encoding) {
} else { $params[$key] = base64_encode($msg);
$params[$key] = $msg; } else {
} $params[$key] = $msg;
$n += 1; }
} $n += 1;
} }
if ($vtagList != null && is_array($vtagList) && !empty($vtagList)) { }
$n = 1; if ($vtagList != null && is_array($vtagList) && !empty($vtagList)) {
foreach ($vtagList as $tag) { $n = 1;
$key = 'msgTag.' . $n; foreach ($vtagList as $tag) {
$params[$key] = $tag; $key = 'msgTag.' . $n;
$n += 1; $params[$key] = $tag;
} $n += 1;
} }
}
$msgList = $this->cmq_client->batch_publish_message($params);
$msgList = $this->cmq_client->batch_publish_message($params);
$retMessageList = array();
foreach ($msgList as $msg) { $retMessageList = array();
if (isset($msg['msgId'])) { foreach ($msgList as $msg) {
$retmsgId = $msg['msgId']; if (isset($msg['msgId'])) {
$retMessageList[] = $retmsgId; $retmsgId = $msg['msgId'];
} $retMessageList[] = $retmsgId;
} }
return $retMessageList; }
return $retMessageList;
}
/* 列出Topic的Subscriptoin }
@type topic_name :string /* 列出Topic的Subscriptoin
@param topic_name:
@type topic_name :string
@type searchWord: string @param topic_name:
@param searchWord: 订阅关键字
@type searchWord: string
@type limit: int @param searchWord: 订阅关键字
@param limit: 最多返回的订阅数目
@type limit: int
@type offset: string @param limit: 最多返回的订阅数目
@param offset: list_subscription的起始位置,上次list_subscription返回的next_offset
@type offset: string
@rtype: tuple @param offset: list_subscription的起始位置,上次list_subscription返回的next_offset
@return: subscriptionURL的列表和下次list subscription的起始位置; 如果所有subscription都list出来,next_offset为"".
*/ @rtype: tuple
public function list_subscription($searchWord = "", $limit = -1, $offset = "") @return: subscriptionURL的列表和下次list subscription的起始位置; 如果所有subscription都list出来,next_offset为"".
{ */
$params = array('topicName' => $this->topic_name); public function list_subscription($searchWord = "", $limit = -1, $offset = "")
{
if ($searchWord != "") { $params = array('topicName' => $this->topic_name);
$params['searchWord'] = $searchWord;
} if ($searchWord != "") {
$params['searchWord'] = $searchWord;
if ($limit != -1) { }
$params['limit'] = $limit;
} if ($limit != -1) {
$params['limit'] = $limit;
if ($offset != "") { }
$params['offset'] = $offset;
} if ($offset != "") {
$params['offset'] = $offset;
$resp = $this->cmq_client->list_subscription($params); }
if ($offset == "") { $resp = $this->cmq_client->list_subscription($params);
$next_offset = count($resp['subscriptionList']);
} else { if ($offset == "") {
$next_offset = $offset + count($resp['subscriptionList']); $next_offset = count($resp['subscriptionList']);
} } else {
$next_offset = $offset + count($resp['subscriptionList']);
if ($next_offset >= $resp['totalCount']) { }
$next_offset = "";
} if ($next_offset >= $resp['totalCount']) {
$next_offset = "";
return array("totalCoult" => $resp['totalCount'], }
"subscriptionList" => $resp['subscriptionList'],
"next_offset" => $next_offset); return array("totalCoult" => $resp['totalCount'],
} "subscriptionList" => $resp['subscriptionList'],
"next_offset" => $next_offset);
protected function __resp2meta($topic_meta, $resp) }
{
if (isset($resp['maxMsgSize'])) { protected function __resp2meta($topic_meta, $resp)
$topic_meta->maxMsgSize = $resp['maxMsgSize']; {
} if (isset($resp['maxMsgSize'])) {
if (isset($resp['msgRetentionSeconds'])) { $topic_meta->maxMsgSize = $resp['maxMsgSize'];
$topic_meta->msgRetentionSeconds = $resp['msgRetentionSeconds']; }
} if (isset($resp['msgRetentionSeconds'])) {
if (isset($resp['createTime'])) { $topic_meta->msgRetentionSeconds = $resp['msgRetentionSeconds'];
$topic_meta->createTime = $resp['createTime']; }
} if (isset($resp['createTime'])) {
if (isset($resp['lastModifyTime'])) { $topic_meta->createTime = $resp['createTime'];
$topic_meta->lastModifyTime = $resp['lastModifyTime']; }
} if (isset($resp['lastModifyTime'])) {
if (isset($resp['filterType'])) { $topic_meta->lastModifyTime = $resp['lastModifyTime'];
$topic_meta->filterType = $resp['filterType']; }
} if (isset($resp['filterType'])) {
$topic_meta->filterType = $resp['filterType'];
} }
}
/** /**
* 加密 发送主题消息 * 加密 发送主题消息
...@@ -250,18 +253,39 @@ class Topic ...@@ -250,18 +253,39 @@ class Topic
* @return mixed * @return mixed
* @author work * @author work
*/ */
public function cryptPushMessage(string $message, $vTagList = null, $routingKey = null){ public function cryptPushMessage(string $message, $vTagList = null, $routingKey = null)
$cryptMessage = Xcrypt::encrypt($message); {
$tryTimes = 0; $cryptMessage = Xcrypt::encrypt($message);
$tryTimes = 0;
do{
$res = $this->publish_message($cryptMessage,$vTagList,$routingKey); do {
$tryTimes++; $res = $this->publish_message($cryptMessage, $vTagList, $routingKey);
}while($res['code']!=0 && $tryTimes < 3); $tryTimes++;
} while ($res['code'] != 0 && $tryTimes < 3);
if ($tryTimes >= 3){
App::error("[消息队列失败]:$message"); if ($tryTimes >= 3) {
} App::error("[消息队列失败]:$message");
return $res; }
} $this->TopicLog($message, $cryptMessage, $vTagList, $res);
return $res;
}
protected function TopicLog($message, $cryptMessage, $tagName, $response)
{
$data = [
'tagName' => $tagName,
'topName' => $this->topic_name,
'message' => $message,
'cryptMessage' => $cryptMessage,
'createTime' => time(),
'response' => json_encode($response),
];
try {
$db = CfgCenter::dbConnect();
$db->insert('topic_log', $data);
}catch (\Exception $e){
App::error("消息主题日志记录失败:".$e->getMessage().'---'.json_encode($data));
}
}
} }
...@@ -68,7 +68,7 @@ class CfgCenter ...@@ -68,7 +68,7 @@ class CfgCenter
return [trim($rkey,':'), $valObj]; return [trim($rkey,':'), $valObj];
} }
protected static function dbConnect() public static function dbConnect()
{ {
if(\env('ENVIRONMENT', '') == '') { if(\env('ENVIRONMENT', '') == '') {
// 返回线上数据库连接 // 返回线上数据库连接
......
<?php <?php
return [ namespace Hdll\Services\Common\Config;
'qCloud' => [
'Bucket' => 'hdll-1257143824', use Swoft\Redis\Redis;
'APPID' => '1257143824', use Swoft\App;
'SecretId' => 'AKIDseHj18kua0KTSJ4g9SadbVEnEUZVjvPj',
'SecretKey' => 'IPL5g5PaaSAzd6NSO8gEmLxcN4pTzJSQ', const CACHE_PREFIX = 'CONFIG_CACHE';
'Region' => 'ap-shanghai' $redis = App::getBen(Redis::class);
], $data = $redis->get(CACHE_PREFIX . 'all');
'alisms' => [
'accessKeyId' => 'EjBn9zQxyEkKHyAA', if (!empty($data)) {
'accessKeySecret' => 'AN276rwCcqCkFUVt1GLCbAy8jnj52t', $db = CfgCenter::dbConnect();
], $data = $db->select('config', ['name', 'value']);
'cls' => [ $redis->set(CACHE_PREFIX . 'all', $data);
'appid' => '1257143824 ', }
'secretId' => 'AKIDseHj18kua0KTSJ4g9SadbVEnEUZVjvPj', return $data;
'secretKey' => 'IPL5g5PaaSAzd6NSO8gEmLxcN4pTzJSQ', \ No newline at end of file
],
'cmq' => [
'intranet_host' => 'http://cmq-topic-bj.api.tencentyun.com',//内网
'internet_host' => 'https://cmq-topic-bj.api.qcloud.com',//外网
'secretId' => 'AKIDYRW1cG2iVIg8dAoCe86vhNuA7A5oNknk',
'secretKey' => 'z0ymS7xfLrP6Sk2EKHWaXN6d0EIxX0IQ',
],
'cryptKey' => 'ebf032f01aa2093be3ee2ee2c137hdll',
];
\ No newline at end of file
<?php <?php
namespace Hdll\Services\Common\Lib; namespace Hdll\Services\Common\Lib;
use Hdll\Services\Common\Config\CfgCenter;
use Swoft\App; use Swoft\App;
use Swoft\Redis\Redis; use Swoft\Redis\Redis;
...@@ -41,8 +42,9 @@ class Xcrypt ...@@ -41,8 +42,9 @@ class Xcrypt
} }
private static function getKey(){ private static function getKey(){
$redis = App::getBean(Redis::class); // $redis = App::getBean(Redis::class);
$key = $redis->get(self::CRYPT); // $key = $redis->get(self::CRYPT);
$key = CfgCenter::get(self::CRYPT);
if (empty($key)){ if (empty($key)){
throw new \Exception('加密密钥获取失败!'); throw new \Exception('加密密钥获取失败!');
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment