利用协程特性以同步方式来编写异步代码,增强可读性。
将swoole的异步特性与传统框架的MVC相结合。
可以用作api也可以用作http server,rpc server.
目前实现了以redis为注册中心的服务化治理.
框架基本使用与传统框架基本一致,路由,控制器,服务层,数据层。
在异步调用的地方需要以yield关键词来触发协程切换
全异步协程调度,支持高并发
异步TCP,HTTP客户端
异步日志
异步文件读写
异步Mysql
异步Mysql事务处理
异步Redis
支持Mysql连接池,Redis连接池
SOA服务化调用,内部封装完整的RPC通信,服务端采用异步Task处理后合并数据并返回。
异步TCP客户端支持并行、串行调用
支持EOF结束符协议、自定义网络通信协议,支持json化、php序列化包体,支持gzip。
Twig、Doctrine支持视图、服务数据层
单元测试覆盖
use AsyncTcp ;
$ tcp = new AsyncTcp ('127.0.0.1 ' , 9501 );
$ tcp ->setTimeout (2 );
//串行发送
$ res = (yield $ tcp ->call ('hello server! ' ));
$ res = (yield $ tcp ->call ('hello server! ' ));
//并行发送数据包
$ tcp ->addCall ('hello server1! ' );
$ tcp ->addCall ('hello server2! ' );
$ res = (yield $ tcp ->multiCall ());
Tips(如果使用tcp异步客户端和其他服务端通信)
tcp客户端的数据包格式可在config/app.php中配置.
protocol为buf时,是按包头+包体封装数据包的,包头为4个字节,存放包体的长度,解包时同样也是按包头+包体解包,所以服务端send数据时也要按同样规则封包。
protocol为eof时,是按'\r\n'结束符封装数据包的,解包时同样也是按'\r\n'解包,所以服务端send数据时也要按'\r\n'结束符封装数据包。
protocol为空的话,不封装数据包。在应答式响应中可以使用,否则会出现粘包现象。(框架内部封装的service为该模式)
use AsyncHttp ;
//直接使用域名, get方式
$ http = new AsyncHttp ('http://groupco.com ' );
//设置2s超时
$ http ->setTimeout (2 );
//$http->setCookies(['token' => 'xxxx']);
$ res = (yield $ http ->get ('/ ' ));
//也可以通过ip:port方式
$ http = new AsyncHttp ('http://127.0.0.1:80 ' );
$ http ->setHost ('groupco.com ' );
$ res = (yield $ http ->get ('/user ' , ['id ' => 1 ]));
//使用https, post方式
$ http = new AsyncHttp ('https://groupco.com ' );
$ res = (yield $ http ->post ('/test ' , ['postId ' => 52 ]));
use AsyncRedis ;
//关闭连接池
AsyncRedis ::enablePool (false );
//开启连接池
AsyncRedis ::enablePool (true );
//设置超时时间
AsyncRedis ::setTimeout (2 );
yield AsyncRedis ::set ('foo ' , 'bar ' );
dump (yield AsyncRedis ::get ('foo ' ));
$ user = json_encode (['foo ' => 'bar ' ]);
yield AsyncRedis ::hSet ('user ' , 1 , $ user );
dump (yield AsyncRedis ::hGet ('user ' , 1 ));
use AsyncMysql ;
//第二个参数设为false将不会使用连接池中的资源,默认都会从连接池中取,配置连接池数量 => config/database.php
AsyncMysql ::query ($ sql , $ usePool = true );
//设置超时时间
AsyncMysql ::setTimeout (2 );
$ res = (yield AsyncMysql ::query ("INSERT INTO `user` (`id`, `mobile`, `password`) VALUES (NULL, '18768122222', '11111') " ));
//失败返回false
if ($ res ) {
$ result = $ res ->getResult ();
$ affectedRows = $ res ->getAffectedRows ();
$ id = $ res ->getInsertId ();
}
use AsyncMysql ;
public function test ()
{
try {
yield AsyncMysql ::begin ();
$ res = (yield $ this ->doTrans ());
if ($ res === false ) {
throw new \Exception ("need roll back " );
}
yield AsyncMysql ::commit ();
} catch (\Exception $ e ) {
yield AsyncMysql ::rollback ();
}
}
public function doTrans ()
{
$ res = (yield AsyncMysql ::query ("INSERT INTO `user` (`id`, `mobile`, `password`) VALUES (NULL, '187681343332', '11111') " ));
if ($ res ) {
$ result = $ res ->getResult ();
$ affectedRows = $ res ->getAffectedRows ();
$ id = $ res ->getInsertId ();
$ res = (yield AsyncMysql ::query ("SELECT * FROM `user` WHERE id = {$ id }" ));
$ res = (yield AsyncMysql ::query ("SELECT * FROM `user` " ));
$ res = (yield AsyncMysql ::query ("DELETE FROM `user` WHERE id = {$ id }" , false ));
}
yield true ;
}
use AsyncLog ;
yield AsyncLog ::info ('hello world ' );
yield AsyncLog ::debug ('test debug ' , ['foo ' => 'bar ' ]);
yield AsyncLog ::notice ('hello world ' ,[], 'group.com ' );
yield AsyncLog ::warning ('hello world ' );
yield AsyncLog ::error ('hello world ' );
yield AsyncLog ::critical ('hello world ' );
yield AsyncLog ::alert ('hello world ' );
yield AsyncLog ::emergency ('hello world ' );
use AsyncFile ;
$ content = (yield AsyncFile ::read (__ROOT__ ."runtime/test.txt " ));
$ res = (yield AsyncFile ::write (__ROOT__ ."runtime/test.txt " , "hello wordls! " ));
$ res = (yield AsyncFile ::write (__ROOT__ ."runtime/test.txt " , "hello wordls! " , FILE_APPEND ));
//如果在业务层不catch,框架层会捕捉,并返回一个500的server error响应。如果在开发环境会返回一个500的具体错误的trace响应。
try {
throw new \Exception ("Error Processing Request " , 1 );
//yield throwException(new \Exception("Error Processing Request", 1));
} catch (\Exception $ e ) {
echo $ e ->getMessage ();
}
$ start = microtime (true );
//设置2秒超时
service ("user " )->setTimeout (2 );
$ users = (yield service ("user " )->call ("User\User::getUsersCache " , ['ids ' => [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]]));
dump ($ users );
$ start = microtime (true );
//设置2秒超时
$ service = (yield service_center ("User " ));
$ service ->setTimeout (2 );
$ users = (yield $ service ->call ("User::getUsersCache " , ['ids ' => [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]]));
dump ($ users );
$ start = microtime (true );
//设置2秒超时
service ("user " )->setTimeout (2 );
$ callId1 = service ("user " )->addCall ("User\User::getUsersCache " , ['ids ' => [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]]);
$ callId2 = service ("user " )->addCall ("User\User::getUser " , ['id ' => 1 ]);
$ res = (yield service ("user " )->multiCall ());
dump ($ res [$ callId1 ]);
dump ($ res [$ callId2 ]);
dump (microtime (true ) - $ start );
SOA客户端,并行调用(使用服务中心,只能针对同一服务模块)
$ start = microtime (true );
//设置2秒超时
$ service = (yield service_center ("User " ));
$ service ->setTimeout (2 );
$ callId1 = $ service ->addCall ("User::getUsersCache " , ['ids ' => [1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 , 10 ]]);
$ callId2 = $ service ->addCall ("User::getUser " , ['id ' => 1 ]);
$ res = (yield $ service ->multiCall ());
dump ($ res [$ callId1 ]);
dump ($ res [$ callId2 ]);
dump (microtime (true ) - $ start );
//在框架层,调用servcie时,会抛出KernalEvent::SERVICE_CALL事件,你可以监听该事件,做数据上报处理,请以异步方式上报,例如:
<?php
namespace src\Web \Listeners ;
use Listener ;
use Event ;
class ServiceCallListener extends Listener
{
public function setMethod ()
{
return 'onServiceCall ' ;
}
public function onServiceCall (Event $ event )
{
$ data = $ event ->getProperty ();
$ cmd = $ data ['cmd ' ];
$ calltime = $ data ['calltime ' ];
//上报监控平台
//do something
}
}
自定义protocol。
异步大文件读取,目前只能读取<4M的文件