Commit b1265d4e by 王召彬

重传

parent 78528d36
.buildpath
.settings/
.project
*.patch
.idea/
.git/
runtime/
vendor/
temp/
*.lock
.phpintel/
.DS_Store
\ No newline at end of file
language: php
php:
- 7.0
- 7.1
services:
- mysql
before_install:
- mysql -e 'CREATE DATABASE IF NOT EXISTS test;'
install:
- wget https://github.com/redis/hiredis/archive/v0.13.3.tar.gz -O hiredis.tar.gz && mkdir -p hiredis && tar -xf hiredis.tar.gz -C hiredis --strip-components=1 && cd hiredis && sudo make -j$(nproc) && sudo make install && sudo ldconfig && cd ..
- pecl install -f swoole-2.0.12
before_script:
- composer update
script: composer test
This diff is collapsed. Click to expand it.
# Swoft Rpc-server
Swoft Rpc-server Component
# Install
# Document
# LICENSE
Swoft Rpc-server Component is open-sourced software licensed under the [Apache license](LICENSE).
{
"name": "swoft/rpc-server",
"type": "library",
"keywords": [
"php",
"swoole",
"swoft",
"rpc"
],
"description": "microservice framework base on swoole",
"license": "Apache-2.0",
"require": {
"swoft/framework": "^1.0",
"swoft/console": "^1.0",
"swoft/http-message": "^1.0",
"psr/http-server-middleware": "^1.0"
},
"require-dev": {
"eaglewu/swoole-ide-helper": "dev-master",
"phpunit/phpunit": "^5.7"
},
"autoload": {
"psr-4": {
"Swoft\\Rpc\\Server\\": "src/"
}
},
"autoload-dev": {
"psr-4": {
"SwoftTest\\RpcServer\\": "test/Cases"
}
},
"repositories": [
{
"type": "composer",
"url": "https://packagist.phpcomposer.com"
}
],
"scripts": {
"test": "./vendor/bin/phpunit -c phpunit.xml"
}
}
<?xml version="1.0" encoding="UTF-8"?>
<phpunit backupGlobals="false"
backupStaticAttributes="false"
bootstrap="./test/bootstrap.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false">
<testsuites>
<testsuite name="Tests">
<directory suffix="Test.php">./test/Cases</directory>
</testsuite>
</testsuites>
<filter>
<whitelist processUncoveredFilesFromWhitelist="true">
<directory suffix=".php">./app</directory>
</whitelist>
</filter>
</phpunit>
<?php
namespace Swoft\Rpc\Server\Bean\Annotation;
/**
* Service annotation
*
* @Annotation
* @Target("CLASS")
*/
class Service
{
/**
* @var string
*/
private $version = "0";
/**
* Service constructor.
*
* @param array $values
*/
public function __construct(array $values)
{
if (isset($values['value'])) {
$this->version = $values['value'];
}
if (isset($values['version'])) {
$this->version = $values['version'];
}
}
/**
* @return string
*/
public function getVersion(): string
{
return $this->version;
}
}
<?php
namespace Swoft\Rpc\Server\Bean\Collector;
use Swoft\Bean\CollectorInterface;
use Swoft\Rpc\Server\Bean\Annotation\Service;
/**
* Service colletor
*/
class ServiceCollector implements CollectorInterface
{
/**
* @var array
*/
private static $serviceMapping = [];
/**
* @param string $className
* @param null $objectAnnotation
* @param string $propertyName
* @param string $methodName
* @param null $propertyValue
*
* @return void
*/
public static function collect(string $className, $objectAnnotation = null, string $propertyName = '', string $methodName = '', $propertyValue = null)
{
// collect service
if ($objectAnnotation instanceof Service) {
$rc = new \ReflectionClass($className);
$interfaces = $rc->getInterfaceNames();
$methods = $rc->getMethods(\ReflectionMethod::IS_PUBLIC);
$version = $objectAnnotation->getVersion();
foreach ($interfaces as $interfaceClass){
foreach ($methods as $method){
$methodName = $method->getName();
self::$serviceMapping[$interfaceClass][$version][$methodName] = [$className, $methodName];
}
}
return;
}
}
/**
* @return array
*/
public static function getCollector(): array
{
return self::$serviceMapping;
}
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Bean\Parser;
use Swoft\Bean\Annotation\Scope;
use Swoft\Bean\Parser\AbstractParser;
use Swoft\Rpc\Server\Bean\Collector\ServiceCollector;
use Swoft\Rpc\Server\Bean\Annotation\Service;
/**
* Service annotation parser
*/
class ServiceParser extends AbstractParser
{
/**
* @param string $className
* @param Service $objectAnnotation
* @param string $propertyName
* @param string $methodName
* @param null $propertyValue
* @return mixed
*/
public function parser(
string $className,
$objectAnnotation = null,
string $propertyName = '',
string $methodName = '',
$propertyValue = null
) {
$beanName = $className;
$scope = Scope::SINGLETON;
ServiceCollector::collect($className, $objectAnnotation, $propertyName, $methodName, $propertyValue);
return [$beanName, $scope, ''];
}
}
<?php
namespace Swoft\Rpc\Server\Bean\Wrapper;
use Swoft\Bean\Annotation\Enum;
use Swoft\Bean\Annotation\Floats;
use Swoft\Bean\Annotation\Inject;
use Swoft\Bean\Annotation\Integer;
use Swoft\Http\Message\Bean\Annotation\Middleware;
use Swoft\Http\Message\Bean\Annotation\Middlewares;
use Swoft\Bean\Annotation\Number;
use Swoft\Bean\Annotation\Strings;
use Swoft\Bean\Annotation\Value;
use Swoft\Bean\Wrapper\AbstractWrapper;
use Swoft\Rpc\Server\Bean\Annotation\Service;
/**
* Servic eWrapper
*/
class ServiceWrapper extends AbstractWrapper
{
/**
* 类注解
*
* @var array
*/
protected $classAnnotations
= [
Service::class,
Middleware::class,
Middlewares::class,
];
/**
* 属性注解
*
* @var array
*/
protected $propertyAnnotations
= [
Inject::class,
Value::class,
];
/**
* 方法注解
*
* @var array
*/
protected $methodAnnotations
= [
Middleware::class,
Middlewares::class,
Strings::class,
Floats::class,
Number::class,
Integer::class,
Enum::class
];
/**
* 是否解析类注解
*
* @param array $annotations
*
* @return bool
*/
public function isParseClassAnnotations(array $annotations): bool
{
return isset($annotations[Service::class]);
}
/**
* 是否解析属性注解
*
* @param array $annotations
*
* @return bool
*/
public function isParsePropertyAnnotations(array $annotations): bool
{
return isset($annotations[Inject::class]) || isset($annotations[Value::class]);
}
/**
* 是否解析方法注解
*
* @param array $annotations
*
* @return bool
*/
public function isParseMethodAnnotations(array $annotations): bool
{
return true;
}
}
<?php
namespace Swoft\Rpc\Server\Bootstrap;
use Swoft\Bean\Annotation\BootBean;
use Swoft\Core\BootBeanInterface;
use Swoft\Rpc\Server\Router\HandlerMapping;
use Swoft\Rpc\Server\ServiceDispatcher;
/**
* The core bean of service
*
* @BootBean()
*/
class CoreBean implements BootBeanInterface
{
/**
* @return array
*/
public function beans()
{
return [
'ServiceDispatcher' => [
'class' => ServiceDispatcher::class,
],
'serviceRouter' => [
'class' => HandlerMapping::class,
],
];
}
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Bootstrap\Listeners;
use Swoft\App;
use Swoft\Bean\Annotation\SwooleListener;
use Swoft\Bootstrap\Listeners\Interfaces\CloseInterface;
use Swoft\Bootstrap\Listeners\Interfaces\ConnectInterface;
use Swoft\Bootstrap\Listeners\Interfaces\ReceiveInterface;
use Swoole\Server;
use Swoft\Bootstrap\SwooleEvent;
/**
*
* @SwooleListener({
* SwooleEvent::ON_RECEIVE,
* SwooleEvent::ON_CONNECT,
* SwooleEvent::ON_CLOSE
* },
* type=SwooleEvent::TYPE_PORT
* )
*/
class RpcEventListener implements ReceiveInterface,ConnectInterface,CloseInterface
{
/**
* RPC 请求每次启动一个协程来处理
*
* @param Server $server
* @param int $fd
* @param int $fromId
* @param string $data
*/
public function onReceive(Server $server, int $fd, int $fromId, string $data)
{
/** @var \Swoft\Rpc\Server\ServiceDispatcher $dispatcher */
$dispatcher = App::getBean('ServiceDispatcher');
$dispatcher->dispatch($server, $fd, $fromId, $data);
}
/**
* 连接成功后回调函数
*
* @param Server $server
* @param int $fd
* @param int $from_id
*
*/
public function onConnect(Server $server, int $fd, int $from_id)
{
var_dump('connnect------');
}
/**
* 连接断开成功后回调函数
*
* @param Server $server
* @param int $fd
* @param int $reactorId
*
*/
public function onClose(Server $server, int $fd, int $reactorId)
{
var_dump('close------');
}
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Command;
use Swoft\Console\Bean\Annotation\Command;
use Swoft\Rpc\Server\Rpc\RpcServer;
/**
* The group command list of rpc server
* @Command(coroutine=false,server=true)
*/
class RpcCommand
{
/**
* start rpc server
*
* @Usage {fullCommand} [-d|--daemon]
* @Options
* -d, --daemon Run server on the background
* @Example
* {fullCommand}
* {fullCommand} -d
*/
public function start()
{
$rpcServer = $this->getRpcServer();
// 是否正在运行
if ($rpcServer->isRunning()) {
$serverStatus = $rpcServer->getServerSetting();
\output()->writeln("<error>The server have been running!(PID: {$serverStatus['masterPid']})</error>", true, true);
}
// 选项参数解析
$this->setStartArgs($rpcServer);
$tcpStatus = $rpcServer->getTcpSetting();
// tcp启动参数
$tcpHost = $tcpStatus['host'];
$tcpPort = $tcpStatus['port'];
$tcpType = $tcpStatus['type'];
$tcpMode = $tcpStatus['mode'];
// 信息面板
$lines = [
' Information Panel ',
'*************************************************************',
"* tcp | Host: <note>$tcpHost</note>, port: <note>$tcpPort</note>, mode: <note>$tcpMode</note>, type: <note>$tcpType</note>",
'*************************************************************',
];
\output()->writeln(implode("\n", $lines));
// 启动
$rpcServer->start();
}
/**
* reload worker process
*
* @Usage
* {fullCommand} [arguments] [options]
* @Options
* -t Only to reload task processes, default to reload worker and task
* @Example
* php swoft.php rpc:reload
*/
public function reload()
{
$rpcServer = $this->getRpcServer();
// 是否已启动
if (! $rpcServer->isRunning()) {
output()->writeln('<error>The server is not running! cannot reload</error>', true, true);
}
// 打印信息
output()->writeln(sprintf('<info>Server %s is reloading ...</info>', input()->getFullScript()));
// 重载
$reloadTask = input()->hasOpt('t');
$rpcServer->reload($reloadTask);
output()->writeln(sprintf('<success>Server %s is reload success</success>', input()->getFullScript()));
}
/**
* stop rpc server
*
* @Usage {fullCommand}
* @Example {fullCommand}
*/
public function stop()
{
$rpcServer = $this->getRpcServer();
// 是否已启动
if (! $rpcServer->isRunning()) {
\output()->writeln('<error>The server is not running! cannot stop</error>', true, true);
}
// pid文件
$serverStatus = $rpcServer->getServerSetting();
$pidFile = $serverStatus['pfile'];
@unlink($pidFile);
\output()->writeln(sprintf('<info>Swoft %s is stopping ...</info>', input()->getFullScript()));
$result = $rpcServer->stop();
// 停止失败
if (! $result) {
\output()->writeln(sprintf('<error>Swoft %s stop fail</error>', input()->getFullScript()));
}
\output()->writeln(sprintf('<success>Swoft %s stop success</success>', input()->getFullScript()));
}
/**
* restart rpc server
*
* @Usage {fullCommand}
* @Options
* -d, --daemon Run server on the background
* @Example
* {fullCommand}
* {fullCommand} -d
*/
public function restart()
{
$rpcServer = $this->getRpcServer();
// 是否已启动
if ($rpcServer->isRunning()) {
$this->stop();
}
// 重启默认是守护进程
$rpcServer->setDaemonize();
$this->start();
}
/**
* @return RpcServer
*/
private function getRpcServer(): RpcServer
{
$script = \input()->getScript();
$rpcServer = new RpcServer();
$rpcServer->setScriptFile($script);
return $rpcServer;
}
/**
* @param RpcServer $rpcServer
*/
private function setStartArgs(RpcServer $rpcServer)
{
if (\input()->getSameOpt(['d', 'daemon'], false)) {
$rpcServer->setDaemonize();
}
}
}
<?php
namespace Swoft\Rpc\Server\Event\Listeners;
use Swoft\App;
use Swoft\Core\RequestContext;
use Swoft\Bean\Annotation\Listener;
use Swoft\Event\EventInterface;
use Swoft\Event\EventHandlerInterface;
use Swoft\Rpc\Server\Event\RpcServerEvent;
/**
* Event after RPC request
* @Listener(RpcServerEvent::AFTER_RECEIVE)
*/
class AfterReceiveListener implements EventHandlerInterface
{
/**
* @param EventInterface $event
*/
public function handle(EventInterface $event)
{
App::getLogger()->appendNoticeLog();
RequestContext::destroy();
}
}
<?php
namespace Swoft\Rpc\Server\Event\Listeners;
use Swoft\App;
use Swoft\Bean\Annotation\Listener;
use Swoft\Event\AppEvent;
use Swoft\Event\EventHandlerInterface;
use Swoft\Event\EventInterface;
use Swoft\Rpc\Server\Bean\Collector\ServiceCollector;
/**
* The listener of applicatioin loader
* @Listener(AppEvent::APPLICATION_LOADER)
*/
class ApplicationLoaderListener implements EventHandlerInterface
{
/**
* @param \Swoft\Event\EventInterface $event
*/
public function handle(EventInterface $event)
{
/* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */
$serviceRouter = App::getBean('serviceRouter');
$serviceMapping = ServiceCollector::getCollector();
$serviceRouter->register($serviceMapping);
}
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Event\Listeners;
use Swoft\Core\RequestContext;
use Swoft\Bean\Annotation\Listener;
use Swoft\Event\EventInterface;
use Swoft\Event\EventHandlerInterface;
use Swoft\Rpc\Server\Event\RpcServerEvent;
/**
* Event before RPC request
* @Listener(RpcServerEvent::BEFORE_RECEIVE)
*/
class BeforeReceiveListener implements EventHandlerInterface
{
/**
* @param EventInterface $event
*/
public function handle(EventInterface $event)
{
$params = $event->getParams();
if (! isset($params[0])) {
return;
}
$data = $params[0];
$logid = $data['logid'] ?? uniqid('', true);
$spanid = $data['spanid'] ?? 0;
$uri = $data['func'] ?? 'null';
$contextData = [
'logid' => $logid,
'spanid' => $spanid,
'uri' => $uri,
'requestTime' => microtime(true),
];
RequestContext::setContextData($contextData);
}
}
<?php
namespace Swoft\Rpc\Server\Event;
/**
* RPC Server event defines
*/
class RpcServerEvent
{
/**
* Before rpc request
*/
const BEFORE_RECEIVE = 'beforeReceive';
/**
* After rpc request
*/
const AFTER_RECEIVE = 'afterReceive';
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Exception;
use Swoft\Exception\Exception;
/**
* Base exception of rpc server
*/
class RpcServerException extends Exception
{
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Middleware;
use Psr\Http\Server\RequestHandlerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\App;
use Swoft\Bean\Annotation\Bean;
use Swoft\Http\Message\Middleware\MiddlewareInterface;
/**
* service handler adapter
*
* @Bean()
*/
class HandlerAdapterMiddleware implements MiddlewareInterface
{
/**
* execute service with handler
*
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param \Psr\Http\Server\RequestHandlerInterface $handler
*
* @return \Psr\Http\Message\ResponseInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
$serviceHandler = $request->getAttribute(RouterMiddleware::ATTRIBUTE);
/* @var \Swoft\Rpc\Server\Router\HandlerAdapter $handlerAdapter */
$handlerAdapter = App::getBean('serviceHandlerAdapter');
$response = $handlerAdapter->doHandler($request, $serviceHandler);
return $response;
}
}
<?php
namespace Swoft\Rpc\Server\Middleware;
use Psr\Http\Server\RequestHandlerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\App;
use Swoft\Bean\Annotation\Bean;
use Swoft\Http\Message\Middleware\MiddlewareInterface;
use Swoft\Rpc\Server\Event\RpcServerEvent;
use Swoft\Rpc\Server\Router\HandlerAdapter;
/**
* service packer
*
* @Bean()
* @uses PackerMiddleware
* @version 2017年11月26日
* @author stelin <phpcrazy@126.com>
* @copyright Copyright 2010-2016 swoft software
* @license PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
*/
class PackerMiddleware implements MiddlewareInterface
{
/**
* the server param of service
*/
const ATTRIBUTE_SERVER = 'serviceRequestServer';
/**
* the fd param of service
*/
const ATTRIBUTE_FD = 'serviceRequestFd';
/**
* the fromid param of service
*/
const ATTRIBUTE_FROMID = 'serviceRequestFromid';
/**
* the data param of service
*/
const ATTRIBUTE_DATA = 'serviceRequestData';
/**
* packer middleware
*
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param \Psr\Http\Server\RequestHandlerInterface $handler
*
* @return \Psr\Http\Message\ResponseInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
$packer = service_packer();
$data = $request->getAttribute(self::ATTRIBUTE_DATA);
$data = $packer->unpack($data);
// init data and trigger event
App::trigger(RpcServerEvent::BEFORE_RECEIVE, null, $data);
$request = $request->withAttribute(self::ATTRIBUTE_DATA, $data);
/* @var \Swoft\Rpc\Server\Rpc\Response $response */
$response = $handler->handle($request);
$serviceResult = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
$serviceResult = $packer->pack($serviceResult);
return $response->withAttribute(HandlerAdapter::ATTRIBUTE, $serviceResult);
}
}
<?php
namespace Swoft\Rpc\Server\Middleware;
use Psr\Http\Server\RequestHandlerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\App;
use Swoft\Bean\Annotation\Bean;
use Swoft\Http\Message\Middleware\MiddlewareInterface;
/**
* service router
*
* @Bean()
* @uses RouterMiddleware
* @version 2017年11月26日
* @author stelin <phpcrazy@126.com>
* @copyright Copyright 2010-2016 swoft software
* @license PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
*/
class RouterMiddleware implements MiddlewareInterface
{
/**
* the attributed key of service
*/
const ATTRIBUTE = "serviceHandler";
/**
* get handler from router
*
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param \Psr\Http\Server\RequestHandlerInterface $handler
*
* @return \Psr\Http\Message\ResponseInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
// service data
$data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
$method = $data['method']??"";
$version = $data['version']??"";
$interface = $data['interface']??"";
/* @var \Swoft\Rpc\Server\Router\HandlerMapping $serviceRouter */
$serviceRouter = App::getBean('serviceRouter');
$serviceHandler = $serviceRouter->getHandler($interface, $version, $method);
// deliver service data
$request = $request->withAttribute(self::ATTRIBUTE, $serviceHandler);
return $handler->handle($request);
}
}
<?php
namespace Swoft\Rpc\Server\Middleware;
use Psr\Http\Server\RequestHandlerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\Core\RequestHandler;
use Swoft\Bean\Annotation\Bean;
use Swoft\Http\Message\Middleware\MiddlewareInterface;
use Swoft\Http\Message\Bean\Collector\MiddlewareCollector;
/**
* the annotation middlewares of action
*
* @Bean()
* @uses UserMiddleware
* @version 2017年12月10日
* @author stelin <phpcrazy@126.com>
* @copyright Copyright 2010-2016 swoft software
* @license PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
*/
class UserMiddleware implements MiddlewareInterface
{
/**
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param \Psr\Http\Server\RequestHandlerInterface $handler
*
* @return \Psr\Http\Message\ResponseInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
$serviceHandler = $request->getAttribute(RouterMiddleware::ATTRIBUTE);
list($className, $funcName) = $serviceHandler;
$middlewares = [];
$collector = MiddlewareCollector::getCollector();
$middlewareCollector = $collector[$className]['middlewares']??[];
$groupMiddlewares = $middlewareCollector['group'] ?? [];
$funcMiddlewares = $middlewareCollector['actions'][$funcName]??[];
$middlewares = array_merge($middlewares, $groupMiddlewares, $funcMiddlewares);
if (!empty($middlewares) && $handler instanceof RequestHandler) {
$handler->insertMiddlewares($middlewares);
}
return $handler->handle($request);
}
}
<?php
namespace Swoft\Rpc\Server\Middleware;
use Psr\Http\Server\RequestHandlerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\App;
use Swoft\Bean\Annotation\Bean;
use Swoft\Bean\Collector\ValidatorCollector;
use Swoft\Http\Message\Middleware\MiddlewareInterface;
use Swoft\Rpc\Server\Validator\ServiceValidator;
use Swoft\Validator\ValidatorInterface;
/**
* the middleware of service middleware
*
* @Bean()
* @uses ValidatorMiddleware
* @version 2017年12月10日
* @author stelin <phpcrazy@126.com>
* @copyright Copyright 2010-2016 swoft software
* @license PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
*/
class ValidatorMiddleware implements MiddlewareInterface
{
/**
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param \Psr\Http\Server\RequestHandlerInterface $handler
*
* @return \Psr\Http\Message\ResponseInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
/* @var ValidatorInterface $validator */
$serviceHandler = $request->getAttribute(RouterMiddleware::ATTRIBUTE);
$serviceData = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
$validator = App::getBean(ServiceValidator::class);
list($className, $validatorKey) = $serviceHandler;
$collector = ValidatorCollector::getCollector();
if (isset($collector[$className][$validatorKey]['validator'])) {
$validators = $collector[$className][$validatorKey]['validator'];
$validator->validate($validators, $serviceHandler, $serviceData);
}
return $handler->handle($request);
}
}
<?php
namespace Swoft\Rpc\Server\Router;
use Psr\Http\Message\ServerRequestInterface;
use Swoft\App;
use Swoft\Bean\Annotation\Bean;
use Swoft\Helper\PhpHelper;
use Swoft\Helper\ResponseHelper;
use Swoft\Http\Message\Router\HandlerAdapterInterface;
use Swoft\Rpc\Server\Middleware\PackerMiddleware;
use Swoft\Rpc\Server\Rpc\Response;
/**
* Service handler adapter
* @Bean("serviceHandlerAdapter")
*/
class HandlerAdapter implements HandlerAdapterInterface
{
/**
* The result of service handler
*/
const ATTRIBUTE = 'serviceResult';
/**
* Execute service handler
*
* @param \Psr\Http\Message\ServerRequestInterface $request
* @param array $handler
* @return Response
*/
public function doHandler(ServerRequestInterface $request, array $handler): Response
{
// the function params of service
$data = $request->getAttribute(PackerMiddleware::ATTRIBUTE_DATA);
$params = $data['params'] ?? [];
list($serviceClass, $method) = $handler;
$service = App::getBean($serviceClass);
// execute handler with params
$response = PhpHelper::call([$service, $method], $params);
$response = ResponseHelper::formatData($response);
// response
if (! $response instanceof Response) {
$response = (new Response())->withAttribute(self::ATTRIBUTE, $response);
}
return $response;
}
}
<?php
namespace Swoft\Rpc\Server\Router;
use Swoft\Http\Message\Router\HandlerMappingInterface;
use Swoft\Rpc\Server\Exception\RpcServerException;
/**
* Handler of service
*/
class HandlerMapping implements HandlerMappingInterface
{
/**
* Service routes
*
* @var array
*/
private $routes = [];
/**
* Get handler from router
*
* @param array ...$params
*
* @return array
* @throws \InvalidArgumentException
*/
public function getHandler(...$params): array
{
list($interfaceClass, $version, $method) = $params;
return $this->match($interfaceClass, $version, $method);
}
/**
* Auto register routes
*
* @param array $serviceMapping
*/
public function register(array $serviceMapping)
{
foreach ($serviceMapping as $interfaceName => $versions) {
foreach ($versions as $version => $methods) {
$this->registerRoute($interfaceName, $version, $methods);
}
}
}
/**
* Match route
*
* @param $func
*
* @return array
* @throws \InvalidArgumentException
*/
/**
* Match route
*
* @param string $interfaceClass
* @param string $version
* @param string $method
*
* @return array
* @throws RpcServerException
*/
public function match(string $interfaceClass, string $version, string $method): array
{
$serviceKey = $this->getServiceKey($interfaceClass, $version, $method);
if (!isset($this->routes[$serviceKey])) {
throw new RpcServerException(sprintf('The %s of %s %s is not exist! ', $method, $interfaceClass, $version));
}
return $this->routes[$serviceKey];
}
/**
* Register one route
*
* @param string $interfaceName
* @param string $version
* @param array $methods
*/
private function registerRoute(string $interfaceName, string $version, array $methods)
{
foreach ($methods as $method => $handler) {
$serviceKey = $this->getServiceKey($interfaceName, $version, $method);
$this->routes[$serviceKey] = $handler;
}
}
/**
* Get service key
*
* @param string $interfaceClass
* @param string $version
* @param string $method
*
* @return string
*/
private function getServiceKey(string $interfaceClass, string $version, string $method): string
{
return \sprintf('%s_%s_%s', $interfaceClass, $version, $method);
}
/**
* @return array
*/
public function getRoutes(): array
{
return $this->routes;
}
}
<?php
namespace Swoft\Rpc\Server\Rpc;
/**
* The request of rpc server
*/
class Request extends \Swoft\Http\Message\Server\Request
{
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Rpc;
/**
* The response of rpc server
*/
class Response extends \Swoft\Http\Message\Base\Response
{
}
\ No newline at end of file
<?php
namespace Swoft\Rpc\Server\Rpc;
use Swoft\Bean\Collector\SwooleListenerCollector;
use Swoft\Bootstrap\SwooleEvent;
use Swoole\Server;
use Swoft\Bootstrap\Server\AbstractServer;
/**
* RPC Server
*/
class RpcServer extends AbstractServer
{
/**
* Start server
*/
public function start()
{
$this->server = new Server($this->tcpSetting['host'], $this->tcpSetting['port'], $this->tcpSetting['mode'], $this->tcpSetting['type']);
// Bind event callback
$listenSetting = $this->getListenTcpSetting();
$setting = array_merge($this->setting, $listenSetting);
$this->server->set($setting);
$this->server->on(SwooleEvent::ON_START, [$this, 'onStart']);
$this->server->on(SwooleEvent::ON_WORKER_START, [$this, 'onWorkerStart']);
$this->server->on(SwooleEvent::ON_MANAGER_START, [$this, 'onManagerStart']);
$this->server->on(SwooleEvent::ON_PIPE_MESSAGE, [$this, 'onPipeMessage']);
$swooleEvents = $this->getSwooleEvents();
$this->registerSwooleEvents($this->server, $swooleEvents);
// before start
$this->beforeServerStart();
$this->server->start();
}
/**
* @return array
*/
private function getSwooleEvents(): array
{
$swooleListeners = SwooleListenerCollector::getCollector();
$portEvents = $swooleListeners[SwooleEvent::TYPE_PORT][0] ?? [];
$serverEvents = $swooleListeners[SwooleEvent::TYPE_SERVER] ?? [];
return array_merge($portEvents, $serverEvents);
}
}
<?php
namespace Swoft\Rpc\Server;
use Swoft\App;
use Swoft\Contract\DispatcherInterface;
use Swoft\Core\RequestHandler;
use Swoft\Event\AppEvent;
use Swoft\Helper\ResponseHelper;
use Swoft\Rpc\Server\Event\RpcServerEvent;
use Swoft\Rpc\Server\Middleware\HandlerAdapterMiddleware;
use Swoft\Rpc\Server\Middleware\PackerMiddleware;
use Swoft\Rpc\Server\Middleware\RouterMiddleware;
use Swoft\Rpc\Server\Middleware\UserMiddleware;
use Swoft\Rpc\Server\Middleware\ValidatorMiddleware;
use Swoft\Rpc\Server\Router\HandlerAdapter;
use Swoft\Rpc\Server\Rpc\Request;
use Swoole\Server;
/**
* Service dispatcher
*/
class ServiceDispatcher implements DispatcherInterface
{
/**
* Service middlewares
*
* @var array
*/
private $middlewares = [];
/**
* The default of handler adapter
*
* @var string
*/
private $handlerAdapter = HandlerAdapterMiddleware::class;
/**
* @param array ...$params
* @throws \Swoft\Rpc\Exception\RpcException
* @throws \InvalidArgumentException
*/
public function dispatch(...$params)
{
/**
* @var Server $server
* @var int $fd
* @var int $fromid
* @var string $data
*/
list($server, $fd, $fromid, $data) = $params;
try {
// request middlewares
$serviceRequest = $this->getRequest($server, $fd, $fromid, $data);
$middlewares = $this->requestMiddleware();
$requestHandler = new RequestHandler($middlewares, $this->handlerAdapter);
/* @var \Swoft\Rpc\Server\Rpc\Response $response */
$response = $requestHandler->handle($serviceRequest);
$data = $response->getAttribute(HandlerAdapter::ATTRIBUTE);
} catch (\Throwable $t) {
$message = sprintf('%s %s %s', $t->getMessage(), $t->getFile(), $t->getLine());
$data = ResponseHelper::formatData('', $message, $t->getCode());
$data = service_packer()->pack($data);
} finally {
// Release system resources
App::trigger(AppEvent::RESOURCE_RELEASE);
$server->send($fd, $data);
}
App::trigger(RpcServerEvent::AFTER_RECEIVE);
}
/**
* Request middleware
*
* @return array
*/
public function requestMiddleware(): array
{
return array_merge($this->preMiddleware(), $this->middlewares, $this->afterMiddleware());
}
/**
* Pre middleware
*
* @return array
*/
public function preMiddleware(): array
{
return [
PackerMiddleware::class,
RouterMiddleware::class,
];
}
/**
* After middleware
*
* @return array
*/
public function afterMiddleware(): array
{
return [
ValidatorMiddleware::class,
UserMiddleware::class,
];
}
/**
* @param \Swoole\Server $server
* @param int $fd
* @param int $fromid
* @param string $data
* @return Request
*/
private function getRequest(Server $server, int $fd, int $fromid, string $data): Request
{
$serviceRequest = new Request('get', '/');
return $serviceRequest->withAttribute(PackerMiddleware::ATTRIBUTE_SERVER, $server)
->withAttribute(PackerMiddleware::ATTRIBUTE_FD, $fd)
->withAttribute(PackerMiddleware::ATTRIBUTE_FROMID, $fromid)
->withAttribute(PackerMiddleware::ATTRIBUTE_DATA, $data);
}
/**
* @return array
*/
public function getMiddlewares(): array
{
return $this->middlewares;
}
}
<?php
namespace Swoft\Rpc\Server\Validator;
use Swoft\Bean\Annotation\Bean;
use Swoft\Validator\AbstractValidator;
/**
* Service Validator
* @Bean()
*/
class ServiceValidator extends AbstractValidator
{
/**
* @param array ...$params
* @return void
* @throws \ReflectionException
* @throws \Swoft\Exception\ValidatorException
*/
public function validate(...$params)
{
list($validators, $serviceHandler, $serviceData) = $params;
$args = $this->getServiceArgs($serviceHandler, $serviceData);
foreach ($validators ?? [] as $type => $validator) {
$this->validateArg($args, $validator);
}
}
/**
* validate arg
*
* @param array $args
* @param array $validator
* @throws \Swoft\Exception\ValidatorException
*/
public function validateArg(array $args, array $validator)
{
foreach ($validator as $name => $info) {
if (! isset($args[$name])) {
continue;
}
$this->doValidation($name, $args[$name], $info);
}
}
/**
* get args of called function
*
* @param array $serviceHandler
* @param array $serviceData
* @return array
* @throws \ReflectionException
*/
private function getServiceArgs(array $serviceHandler, array $serviceData): array
{
list($className, $method) = $serviceHandler;
$rc = new \ReflectionClass($className);
$rm = $rc->getMethod($method);
$mps = $rm->getParameters();
$params = $serviceData['params'] ?? [];
if (empty($params)) {
return [];
}
$index = 0;
$args = [];
foreach ($mps as $mp) {
$name = $mp->getName();
if (! isset($params[$index])) {
break;
}
$args[$name] = $params[$index];
$index++;
}
return $args;
}
}
# test config
TEST_NAME=test
TEST_URI=127.0.0.1:6378
TEST_MAX_IDEL=2
TEST_MAX_ACTIVE=2
TEST_MAX_WAIT=2
TEST_TIMEOUT=2
TEST_BALANCER=r1
TEST_USE_PROVIDER=true
TEST_PROVIDER=c1
# the pool of master nodes pool
DB_NAME=master2
DB_URI=127.0.0.1:3302,127.0.0.1:3302
DB_MAX_IDEL=2
DB_MAX_ACTIVE=2
DB_MAX_WAIT=2
DB_TIMEOUT=2
DB_USE_PROVIDER=true
DB_BALANCER=random2
DB_PROVIDER=consul2
# the pool of slave nodes pool
DB_SLAVE_NAME=slave2
DB_SLAVE_URI=127.0.0.1:3306/test?user=root&password=&charset=utf8,127.0.0.1:3306/test?user=root&password=&charset=utf8
DB_SLAVE_MAX_IDEL=2
DB_SLAVE_MAX_ACTIVE=2
DB_SLAVE_MAX_WAIT=2
DB_SLAVE_TIMEOUT=2
DB_SLAVE_USE_PROVIDER=false
DB_SLAVE_BALANCER=random
DB_SLAVE_PROVIDER=consul2
# the pool of redis
REDIS_NAME=redis2
REDIS_URI=127.0.0.1:2222,127.0.0.1:2222
REDIS_MAX_IDEL=2
REDIS_MAX_ACTIVE=2
REDIS_MAX_WAIT=2
REDIS_TIMEOUT=2
REDIS_USE_PROVIDER=true
REDIS_BALANCER=random2
REDIS_PROVIDER=consul2
# consul provider
PROVIDER_CONSUL_ADDRESS=http://127.0.0.1:82
PROVIDER_CONSUL_TAGS=1,2
PROVIDER_CONSUL_TIMEOUT=2
PROVIDER_CONSUL_INTERVAL=2
<?php
namespace SwoftTest\RpcServer;
use PHPUnit\Framework\TestCase;
/**
* @uses AbstractTestCase
* @version 2017年11月03日
* @author huangzhhui <huangzhwork@gmail.com>
* @copyright Copyright 2010-2017 Swoft software
* @license PHP Version 7.x {@link http://www.php.net/license/3_0.txt}
*/
abstract class AbstractTestCase extends TestCase
{
}
\ No newline at end of file
<?php
namespace SwoftTest\RpcServer;
class DemoTest extends AbstractTestCase
{
public function testDemo()
{
$this->assertTrue(true);
}
}
\ No newline at end of file
<?php
require_once dirname(__FILE__, 2) . '/vendor/autoload.php';
require_once dirname(__FILE__, 2) . '/test/config/define.php';
// init
\Swoft\App::$isInTest = true;
\Swoft\Bean\BeanFactory::init();
/* @var \Swoft\Bootstrap\Boots\Bootable $bootstrap*/
$bootstrap = \Swoft\App::getBean(\Swoft\Bootstrap\Bootstrap::class);
$bootstrap->bootstrap();
\Swoft\Bean\BeanFactory::reload([
'application' => [
'class' => \Swoft\Testing\Application::class,
'inTest' => true
],
]);
$initApplicationContext = new \Swoft\Core\InitApplicationContext();
$initApplicationContext->init();
<?php
return [];
\ No newline at end of file
<?php
return [
"noticeHandler" => [
"class" => \Swoft\Log\FileHandler::class,
"logFile" => "@runtime/logs/notice.log",
'formatter' => '${lineFormatter}',
"levels" => [
\Swoft\Log\Logger::NOTICE,
\Swoft\Log\Logger::INFO,
\Swoft\Log\Logger::DEBUG,
\Swoft\Log\Logger::TRACE,
]
],
"applicationHandler" => [
"class" => \Swoft\Log\FileHandler::class,
"logFile" => "@runtime/logs/error.log",
'formatter' => '${lineFormatter}',
"levels" => [
\Swoft\Log\Logger::ERROR,
\Swoft\Log\Logger::WARNING
]
],
"logger" => [
"class" => \Swoft\Log\Logger::class,
"name" => APP_NAME,
"flushInterval" => 100,
"flushRequest" => true,
"handlers" => [
'${noticeHandler}',
'${applicationHandler}'
]
],
];
<?php
use \Swoft\App;
// Constants
!defined('DS') && define('DS', DIRECTORY_SEPARATOR);
// 系统名称
!defined('APP_NAME') && define('APP_NAME', 'swoft');
// 基础根目录
!defined('BASE_PATH') && define('BASE_PATH', dirname(__DIR__, 1));
// cli命名空间
!defined('COMMAND_NS') && define('COMMAND_NS', "App\Commands");
// 注册别名
$aliases = [
'@root' => BASE_PATH,
'@app' => '@root/app',
'@res' => '@root/resources',
'@runtime' => '@root/runtime',
'@configs' => '@root/config',
'@resources' => '@root/resources',
'@beans' => '@configs/beans',
'@properties' => '@configs/properties',
'@console' => '@beans/console.php',
];
App::setAliases($aliases);
<?php
return [
"version" => '1.0',
'autoInitBean' => true,
'beanScan' => [
'Swoft\\Http\\Server\\Test\\Testing' => BASE_PATH."/Testing"
],
'I18n' => [
'sourceLanguage' => '@root/resources/messages/',
],
'env' => 'Base',
'user.stelin.steln' => 'fafafa',
'Service' => [
'user' => [
'timeout' => 3000
]
],
'cache' => require dirname(__FILE__) . DS . "cache.php",
];
<?php
return [
'redis' => [
'name' => 'redis1',
"uri" => [
'127.0.0.1:1111',
'127.0.0.1:1111',
],
"maxIdel" => 1,
"maxActive" => 1,
"maxWait" => 1,
"timeout" => 1,
"balancer" => 'random1',
"useProvider" => true,
'provider' => 'consul1',
],
];
<?php
return [
'server' => [
'pfile' => env('PFILE', '/tmp/swoft.pid'),
'pname' => env('PNAME', 'php-swoft'),
'tcpable' => env('TCPABLE', true),
'cronable' => env('CRONABLE', false),
'autoReload' => env('AUTO_RELOAD', true),
],
'tcp' => [
'host' => env('TCP_HOST', '0.0.0.0'),
'port' => env('TCP_PORT', 8099),
'mode' => env('TCP_MODE', SWOOLE_PROCESS),
'type' => env('TCP_TYPE', SWOOLE_SOCK_TCP),
'package_max_length' => env('TCP_PACKAGE_MAX_LENGTH', 2048),
'open_eof_check' => env('TCP_OPEN_EOF_CHECK', false),
],
'http' => [
'host' => env('HTTP_HOST', '0.0.0.0'),
'port' => env('HTTP_PORT', 80),
'mode' => env('HTTP_MODE', SWOOLE_PROCESS),
'type' => env('HTTP_TYPE', SWOOLE_SOCK_TCP),
],
'crontab' => [
'task_count' => env('CRONTAB_TASK_COUNT', 1024),
'task_queue' => env('CRONTAB_TASK_QUEUE', 2048),
],
'setting' => [
'worker_num' => env('WORKER_NUM', 1),
'max_request' => env('MAX_REQUEST', 10000),
'daemonize' => env('DAEMONIZE', 0),
'dispatch_mode' => env('DISPATCH_MODE', 2),
'log_file' => env('LOG_FILE', '@runtime/logs/swoole.log'),
'task_worker_num' => env('TASK_WORKER_NUM', 1),
'upload_tmp_dir' => env('UPLOAD_TMP_DIR', '@runtime/uploadfiles'),
],
];
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment