Skip to content

Commit

Permalink
change swoole to 4.4.20
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Sep 3, 2020
1 parent 5844f54 commit 0aa3bc6
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 45 deletions.
6 changes: 3 additions & 3 deletions src/Application/Swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ public function onTask(SwooleHttpServer $server, int $task_id, int $source, stri
// 'data' => $data,
// ]);

$task = SwooleSerialize::unpack($data);
$task = unserialize($data);
$unit = Application::getContainer()->get($task->getClass());
$result = $unit->run($task->getPayload());
$server->finish(SwooleSerialize::pack($result, 1));
$result = call_user_func([$unit, $task->getMethod()], $task->getParameter());
$server->finish(serialize($result));

return $result;
}
Expand Down
4 changes: 2 additions & 2 deletions src/Database/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ public function reconnect()
return $this->connect();
}

public function listen($callback)
public function listen(callable $callback): void
{
$this->listen = $callback;
}
Expand All @@ -440,7 +440,7 @@ protected function connection()
return $this->pdo instanceof PDO ? $this->pdo : $this->connect();
}

public function action($actions)
public function action(callable $actions)
{
if (is_callable($actions)) {
$this->connection()->beginTransaction();
Expand Down
25 changes: 12 additions & 13 deletions src/Swoole/CronManager.php
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
<?php

declare(strict_types=1);
declare (strict_types = 1);

namespace Ypf\Swoole;

use Cron\CronExpression;
use Exception;
use ReflectionClass;
use Ypf\Application;
use Ypf\Controller\CronWorker;
use Cron\CronExpression;

class CronManager
{
Expand All @@ -29,14 +29,18 @@ public function process()
$className = $worker[0];
$classReflection = new ReflectionClass($className);
if (!$classReflection->isSubclassOf(CronWorker::class)) {
throw new Exception('cron worker mustbe extends \Ypf\Controller\\'.CronWorker::class);
throw new Exception('cron worker mustbe extends \Ypf\Controller\\' . CronWorker::class);
}
if (!isset($worker[1])) {
go(function () use ($classReflection) {
$classReflection->newInstance()->run();
});
} else {
$this->queue[] = [$classReflection->newInstance(), $worker[1]];
if(is_int($worker[1])) {
\swoole_timer_tick(1000 * $worker[1], [$classReflection->newInstance(), 'run']);
}else{
$this->queue[] = [$classReflection->newInstance(), $worker[1]];
}
}
}
\swoole_timer_tick(1000, [$this, 'tick']);
Expand All @@ -46,22 +50,17 @@ public function tick()
{
$queue = $this->queue;
foreach ($queue as $key => $val) {
$crontab = CronExpression::isValidExpression($val[1]);
if (!$crontab) {
$timeSecond = intval($val[1]);
} else {
$cron = CronExpression::factory($val[1]);
$nextRunTime = $cron->getNextRunDate()->getTimestamp();
$timeSecond = intval($nextRunTime - time());
}
$cron = CronExpression::factory($val[1]);
$nextRunTime = $cron->getNextRunDate()->getTimestamp();
$timeSecond = intval($nextRunTime - time());
if ($timeSecond < 1) {
continue;
}

\swoole_timer_after(1000 * $timeSecond, function () use ($key, $val) {
$this->queue[$key] = $val;
unset($this->job[$key]);
go(function () use ($val) {$val[0]->run(); });
go(function () use ($val) {$val[0]->run();});
});
unset($this->queue[$key]);
$this->job[$key] = $val;
Expand Down
24 changes: 11 additions & 13 deletions src/Swoole/Tasks/Server.php
Original file line number Diff line number Diff line change
@@ -1,30 +1,28 @@
<?php

declare(strict_types=1);
declare (strict_types = 1);

namespace Ypf\Swoole\Tasks;

use Swoole\Server as SwooleServer;
use Ypf\Application\Swoole as YAS;
use Swoole\Serialize;

class Server
{
//task worker中不能推送
/*
$ts = new \Ypf\Swoole\Tasks\Server;
$task = new \Ypf\Swoole\Tasks\Task(\App\Worker\Test::class, 'task1');
$ts->push($task);
*/
public function push(Task $task)
{
$payload = Serialize::pack($task, 1);
YAS::getServer()->task(
$payload,
-1,
function (SwooleServer $server, $source, $data) use ($task) {
call_user_func($task->getCallback(), Serialize::unpack($data));
}
);
$payload = serialize($task);
YAS::getServer()->task($payload, -1);
}

public function await(Task $task, float $timeout = 1)
{
$payload = Serialize::pack($task, 1);
$payload = serialize($task);

return YAS::getServer()->taskwait($payload, (float) $timeout);
}
Expand All @@ -35,7 +33,7 @@ public function parallel(array $tasks, float $timeout = 10): array
$results = [];
foreach ($tasks as $idx => $task) {
/* @var Task $task */
$normalized[] = Serialize::pack($task, 1);
$normalized[] = serialize($task);
$results[$idx] = false;
}
$result = YAS::getServer()->taskWaitMulti($normalized, (float) $timeout);
Expand Down
26 changes: 12 additions & 14 deletions src/Swoole/Tasks/Task.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,37 @@
class Task
{
private $class;
private $payload = null;
private $callback;
private $parameter = null;
private $method;

public function __construct(string $class, callable $callback = null)
public function __construct(string $class, string $method = null)
{
$this->class = $class;
$this->callback = $callback;
$this->method = $method;
}

public function getClass(): string
{
return $this->class;
}

public function withPayload($payload): void
public function withParameter($parameter): void
{
$this->payload = $payload;
$this->parameter = $parameter;
}

public function getPayload()
public function getParameter()
{
return $this->payload;
return $this->parameter;
}

public function withCallback(callable $callback): void
public function withMethod(string $method): void
{
$this->callback = $callback;
$this->method = $method;
}

public function getCallback(): callable
public function getMethod(): string
{
return $this->callback ?? function () {
// Nothing to do
};
return $this->method;
}
}

0 comments on commit 0aa3bc6

Please sign in to comment.