Skip to content

Commit

Permalink
Force reconnect when the socket errors
Browse files Browse the repository at this point in the history
  • Loading branch information
davidwdan committed Dec 7, 2017
1 parent 24def09 commit 58c26f2
Showing 1 changed file with 12 additions and 8 deletions.
20 changes: 12 additions & 8 deletions src/Subject/WebSocketSubject.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Rx\Thruway\Subject;

use function EventLoop\getLoop;
use Rx\DisposableInterface;
use Rx\Observable;
use Rx\ObserverInterface;
Expand All @@ -17,7 +18,6 @@ final class WebSocketSubject extends Subject
{
private $ws;
private $sendSubject;
private $loop;
private $openObserver;
private $closeObserver;
private $serializer;
Expand All @@ -27,10 +27,9 @@ public function __construct(string $url, array $protocols = [], Subject $openObs
$this->openObserver = $openObserver ?? new Subject();
$this->closeObserver = $closeObserver ?? new Subject();
$this->serializer = new JsonSerializer();
$this->loop = \EventLoop\getLoop();
$this->sendSubject = new ReplaySubject();

$this->ws = new Client($url, false, $protocols, $this->loop);
$this->ws = new Client($url, false, $protocols, getLoop());
}

public function onNext($value)
Expand All @@ -47,17 +46,22 @@ protected function _subscribe(ObserverInterface $observer): DisposableInterface

// Now that the connection has been established, use the message subject directly.
$this->sendSubject = $ms;
$this->openObserver->onNext($ms);
})
->finally(function () {
// The connection has closed, so start buffering messages util it reconnects.
$this->sendSubject = new ReplaySubject();
$this->closeObserver->onNext(0);
})
->repeatWhen(function (Observable $a) {
return $a->do(function () {
echo "Reconnecting\n";
})->delay(1000);
})
->do([$this->openObserver, 'onNext'])
->finally(function () {
// The connection has closed, so start buffering messages util it reconnects.
$this->sendSubject = new ReplaySubject();
$this->closeObserver->onNext(0);
->retryWhen(function (Observable $a) {
return $a->do(function (\Throwable $e) {
echo "Error {$e->getMessage()}, Reconnecting\n";
})->delay(1000);
})
->mergeAll()
->map([$this->serializer, 'deserialize'])
Expand Down

0 comments on commit 58c26f2

Please sign in to comment.