Skip to content

Commit

Permalink
支持优雅的 SSE 服务端推送功能 (#506)
Browse files Browse the repository at this point in the history
* 优化 MemoryStream 性能

* 优化代码

* 支持 SSE 服务端推送

* 增加测试用例和文档

* 修复测试

* 更新文档

* 更新文档

* 补充响应头

* 为 IEmitHandler::send() 增加 bool 返回值

* 更新文档
  • Loading branch information
Yurunsoft authored Apr 29, 2023
1 parent d2dc13f commit 693ea1c
Show file tree
Hide file tree
Showing 35 changed files with 723 additions and 166 deletions.
1 change: 1 addition & 0 deletions doc/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
* [响应(Response)](components/httpserver/view.md)
* [响应类](components/httpserver/response.md)
* [视图](components/httpserver/view.md)
* [SSE](components/httpserver/sse.md)
* [RESTful](components/httpserver/restful.md)
* [Session](components/httpserver/session.md)
* [JWT](components/httpserver/jwt.md)
Expand Down
4 changes: 3 additions & 1 deletion doc/base/version/2.0-2.1.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ v2.0 是一个非常成功的 LTS 版本,进行了底层重构,增加了强

### v2.1.45

**发布日期:** `2023-04-28`
**发布日期:** `2023-04-29`

* 异步执行新增 `@Defer``@DeferAsync` 注解 ([文档](https://doc.imiphp.com/v2.1/components/async/index.html))

* 支持优雅的 SSE 服务端推送功能 ([文档](https://doc.imiphp.com/v2.1/components/httpserver/sse.html))

### v2.1.43

**发布日期:** `2023-04-07`
Expand Down
77 changes: 77 additions & 0 deletions doc/components/httpserver/sse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# SSE

[TOC]

## SSE 介绍

SSE 是一种服务端主动向客户端(浏览器)推送数据的技术。

大名鼎鼎的 ChatGPT 的 API 接口就用了这项技术,实现逐字返回的打字机效果。

服务端向客户端发送一个响应头:`Content-Type: text/event-stream`

然后服务端按如下格式发送数据:

```text
: 注释
data: 数据\n
event: 事件\n
id: id值\n
retry: 重试时间间隔,单位:秒\n\n
```

> 其中每一行都是非必传项,每一行必须以 `\n` 结尾
> `\n\n` 代表一次推送的结束
## 环境支持

| 名称 | 是否支持 | 备注
| -|-|-
| Swoole || |
| Workerman || |
| php-fpm || `php -S` 暂时有 BUG,`php-fpm` 可用。 |
| RoadRunner || 暂时无法实现 |

## 使用示例

```php
use Imi\Server\Http\Message\Emitter\SseEmitter;
use Imi\Server\Http\Message\Emitter\SseMessageEvent;

/**
* SSE.
*
* @Action
*/
public function sse(): void
{
$this->response->setResponseBodyEmitter(new class() extends SseEmitter {
protected function task(): void
{
$handler = $this->getHandler();
// 模拟推送数据
foreach (range(1, 100) as $i)
{
// 推送数据
$handler->send((string) new SseMessageEvent((string) $i));
usleep(10000);
}
}
});
}
```

### SseMessageEvent

`Imi\Server\Http\Message\Emitter\SseMessageEvent` 类是 SSE 推送事件类,构造方法参数如下:

```php
public function __construct(
?string $data = null,
?string $event = null,
?string $id = null,
?int $retry = null,
?string $comment = null
)
```
17 changes: 17 additions & 0 deletions src/Components/fpm/src/Http/Message/FpmEmitHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Imi\Fpm\Http\Message;

use Imi\Server\Http\Message\Emitter\Handler\IEmitHandler;

class FpmEmitHandler implements IEmitHandler
{
public function send(string $data): bool
{
echo $data;

return true;
}
}
22 changes: 19 additions & 3 deletions src/Components/fpm/src/Http/Message/FpmResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

class FpmResponse extends Response
{
protected bool $emitterWritting = false;

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -66,10 +68,24 @@ private function sendHeaders(): void
*/
public function send(): self
{
$this->sendHeaders();
if ($this->isBodyWritable())
if ($this->responseBodyEmitter)
{
echo (string) $this->getBody();
if ($this->emitterWritting)
{
return $this;
}
$this->emitterWritting = true;
$this->responseBodyEmitter->init($this, new FpmEmitHandler());
$this->sendHeaders();
$this->responseBodyEmitter->send();
}
else
{
$this->sendHeaders();
if ($this->isBodyWritable())
{
echo (string) $this->getBody();
}
}

return $this;
Expand Down
14 changes: 14 additions & 0 deletions src/Components/fpm/tests/HttpServer/Tests/ResponseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,18 @@ public function testDownload(): void
$this->assertEquals(MediaType::IMAGE_PNG, $response->getHeaderLine('content-type'));
$this->assertEquals('attachment; filename*=UTF-8\'\'' . rawurlencode('测试.jpg'), $response->getHeaderLine('content-disposition'));
}

public function testSSE(): void
{
$http = new HttpRequest();
$response = $http->get($this->host . 'sse');
// php-src BUG: https://github.com/php/php-src/issues/11146
$this->assertStringStartsWith(MediaType::TEXT_EVENT_STREAM, $response->getHeaderLine('content-type'));
$result = '';
foreach (range(1, 100) as $i)
{
$result .= 'data: ' . $i . "\n\n";
}
$this->assertEquals($result, $response->body());
}
}
25 changes: 25 additions & 0 deletions src/Components/fpm/tests/Web/Controller/IndexController.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use Imi\RequestContext;
use Imi\Server\Http\Controller\HttpController;
use Imi\Server\Http\Message\Emitter\SseEmitter;
use Imi\Server\Http\Message\Emitter\SseMessageEvent;
use Imi\Server\Http\Message\Proxy\ResponseProxy;
use Imi\Server\Http\Route\Annotation\Action;
use Imi\Server\Http\Route\Annotation\Controller;
Expand Down Expand Up @@ -363,4 +365,27 @@ public function duplicated1(): void
public function duplicated2(): void
{
}

/**
* SSE.
*
* @Action
*/
public function sse(): void
{
$this->response->setResponseBodyEmitter(new class() extends SseEmitter {
protected function task(): void
{
$handler = $this->getHandler();
foreach (range(1, 100) as $i)
{
if (!$handler->send((string) new SseMessageEvent((string) $i)))
{
throw new \RuntimeException('Send failed');
}
usleep(10000);
}
}
});
}
}
3 changes: 2 additions & 1 deletion src/Components/fpm/tests/Web/Middleware/PoweredBy.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
23 changes: 23 additions & 0 deletions src/Components/swoole/src/Http/Message/SwooleEmitHandler.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Imi\Swoole\Http\Message;

use Imi\Server\Http\Message\Emitter\Handler\IEmitHandler;
use Swoole\Http\Response;

class SwooleEmitHandler implements IEmitHandler
{
private Response $response;

public function __construct(Response $response)
{
$this->response = $response;
}

public function send(string $data): bool
{
return (bool) $this->response->write($data);
}
}
23 changes: 20 additions & 3 deletions src/Components/swoole/src/Http/Message/SwooleResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class SwooleResponse extends Response
*/
protected ?ISwooleServer $serverInstance = null;

protected bool $emitterWritting = false;

public function __construct(ISwooleServer $server, \Swoole\Http\Response $response)
{
$this->swooleResponse = $response;
Expand Down Expand Up @@ -90,10 +92,25 @@ private function sendHeaders(): void
*/
public function send(): self
{
$this->sendHeaders();
if ($this->isBodyWritable())
if ($this->responseBodyEmitter)
{
$this->swooleResponse->end($this->getBody());
if ($this->emitterWritting)
{
return $this;
}
$this->emitterWritting = true;
$this->responseBodyEmitter->init($this, new SwooleEmitHandler($this->swooleResponse));
$this->sendHeaders();
$this->responseBodyEmitter->send();
$this->swooleResponse->end();
}
else
{
$this->sendHeaders();
if ($this->isBodyWritable())
{
$this->swooleResponse->end($this->getBody());
}
}

return $this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
use Imi\Server\Http\Annotation\ExtractData;
use Imi\Server\Http\Annotation\RequestParam;
use Imi\Server\Http\Controller\HttpController;
use Imi\Server\Http\Message\Emitter\SseEmitter;
use Imi\Server\Http\Message\Emitter\SseMessageEvent;
use Imi\Server\Http\Message\Proxy\ResponseProxy;
use Imi\Server\Http\Route\Annotation\Action;
use Imi\Server\Http\Route\Annotation\Controller;
Expand Down Expand Up @@ -496,4 +498,27 @@ public function domain2(string $value): array
'value' => $value,
];
}

/**
* SSE.
*
* @Action
*/
public function sse(): void
{
$this->response->setResponseBodyEmitter(new class() extends SseEmitter {
protected function task(): void
{
$handler = $this->getHandler();
foreach (range(1, 100) as $i)
{
if (!$handler->send((string) new SseMessageEvent((string) $i)))
{
throw new \RuntimeException('Send failed');
}
usleep(10000);
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
13 changes: 13 additions & 0 deletions src/Components/swoole/tests/unit/HttpServer/Tests/ResponseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,17 @@ public function testDownload(): void
$this->assertEquals(MediaType::IMAGE_PNG, $response->getHeaderLine('content-type'));
$this->assertEquals('attachment; filename*=UTF-8\'\'' . rawurlencode('测试.jpg'), $response->getHeaderLine('content-disposition'));
}

public function testSSE(): void
{
$http = new HttpRequest();
$response = $http->get($this->host . 'sse');
$this->assertEquals(MediaType::TEXT_EVENT_STREAM, $response->getHeaderLine('content-type'));
$result = '';
foreach (range(1, 100) as $i)
{
$result .= 'data: ' . $i . "\n\n";
}
$this->assertEquals($result, $response->body());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PoweredBy implements MiddlewareInterface
*/
public function process(ServerRequestInterface $request, RequestHandlerInterface $handler): ResponseInterface
{
return $handler->handle($request)->withAddedHeader('X-Powered-By', 'imiphp.com');
// @phpstan-ignore-next-line
return $handler->handle($request)->setHeader('X-Powered-By', 'imiphp.com');
}
}
Loading

0 comments on commit 693ea1c

Please sign in to comment.