-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathDnodeSyncClient.php
206 lines (182 loc) · 5.96 KB
/
DnodeSyncClient.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
<?php
/**
* Dnode Synchronous Client for PHP
*
* @copyright 2012 erasys GmbH - see ./LICENSE.txt for more info
*/
namespace DnodeSyncClient;
/**
 * Root of this library's exception hierarchy.
 *
 * The other exception classes declared in this file all extend this type, so
 * catching it catches every error the library raises on purpose.
 */
class Exception extends \RuntimeException
{
}
/**
 * Signals a network-level failure, such as a socket that cannot be created
 * or a read from the remote that fails.
 */
class IOException extends Exception
{
}
/**
 * Signals that a message received from the remote could not be parsed or
 * does not have the expected structure.
 */
class ProtocolException extends Exception
{
}
/**
 * Raised when a user of this library calls a method that the remote did not
 * declare.
 */
class MethodNotExistsException extends Exception
{
}
/**
 * Raised when a method is called on a connection that has already been
 * closed.
 */
class ConnectionClosedException extends Exception
{
}
/**
 * Main Dnode client class
 *
 * This is the only class you should instantiate directly from your code.
 */
class Dnode
{
    /**
     * Creates new dnode connection to given host and port
     *
     * Opens a TCP socket and hands it to Connection, whose constructor
     * performs the initial methods exchange. If that exchange fails, the
     * socket opened here is closed again before the exception is rethrown.
     *
     * @param string     $host           Host name or IP address. IPv6 literals may be
     *                                   given with or without surrounding brackets.
     * @param int|string $port
     * @param float      $connectTimeout Number of seconds until `connect()` should timeout.
     *                                   Default: `ini_get("default_socket_timeout")`
     *
     * @return \DnodeSyncClient\Connection
     *
     * @throws \DnodeSyncClient\IOException
     * @throws \DnodeSyncClient\ProtocolException
     */
    public function connect($host, $port, $connectTimeout = false)
    {
        // Stream transport addresses require IPv6 literals to be bracketed,
        // otherwise the colons are ambiguous with the port separator.
        if (strpos($host, ':') !== false && $host[0] !== '[') {
            $host = "[$host]";
        }
        $address = "tcp://$host:$port";

        // Only pass the timeout when one was requested, so PHP's own default
        // applies otherwise.
        $stream = $connectTimeout ?
            @\stream_socket_client($address, $error, $errorMessage, $connectTimeout) :
            @\stream_socket_client($address, $error, $errorMessage);
        if (!$stream) {
            throw new IOException("Can't create socket to $address. Error: $error $errorMessage");
        }

        try {
            return new Connection($stream);
        } catch (\Exception $e) {
            // The handshake failed and nobody else holds a reference to the
            // socket, so release it here instead of leaking it.
            if (is_resource($stream)) {
                fclose($stream);
            }
            throw $e;
        }
    }
}
/**
 * Connection to dnode service
 *
 * Messages are exchanged as single-line JSON documents terminated by "\n".
 * Every call is synchronous: it writes one request line and then blocks until
 * one response line has been read from the stream.
 */
class Connection
{
    /** @var resource Stream used for both reading and writing */
    private $stream;

    /** @var array Method names declared by the remote during the handshake */
    private $methods;

    // lets start from some higher number to make
    // sure that remote is using our callback numbers
    private $callbackNumber = 41;

    /** @var bool Set once close() has run */
    private $closed = false;

    /**
     * Initializes connect on given stream
     *
     * Do not use directly if you know host and port of dnode service, rather use
     * \DnodeSyncClient\Dnode::connect
     *
     * The stream is not closed when the handshake fails; it stays owned by
     * the caller that supplied it.
     *
     * @param resource $stream
     *
     * @throws \DnodeSyncClient\IOException
     * @throws \DnodeSyncClient\ProtocolException
     */
    public function __construct($stream)
    {
        $this->stream = $stream;

        // write our (empty) methods description
        if (!$this->writeLine(json_encode(array("method" => "methods")))) {
            throw new IOException("Can't write method description to remote");
        }

        // read remote methods
        $line = fgets($this->stream);
        if ($line === false) {
            throw new IOException("Can't read method description from remote");
        }
        $line = trim($line);
        $methods = json_decode($line, true);
        if ($methods === null) {
            throw new ProtocolException("First line is not valid json: $line");
        }
        if (!isset($methods['method'])) {
            throw new ProtocolException("First line does not have method field: $line");
        }
        if ($methods['method'] !== 'methods') {
            throw new ProtocolException("First line method must be \"methods\": $line");
        }
        if (!isset($methods['arguments'])) {
            throw new ProtocolException("Methods arguments missing: $line");
        }
        // Validate the shape before count()/array_keys(): both raise a
        // TypeError (PHP 8) or warning on non-array input, which would escape
        // the documented exception types.
        if (!is_array($methods['arguments']) || count($methods['arguments']) != 1) {
            throw new ProtocolException("Methods must have single argument: $line");
        }
        if (!isset($methods['arguments'][0]) || !is_array($methods['arguments'][0])) {
            throw new ProtocolException("Methods argument must be an object: $line");
        }
        $this->methods = array_keys($methods['arguments'][0]);
        if (count($this->methods) == 0) {
            throw new ProtocolException("Remote is expected to have some methods: $line");
        }
    }

    /**
     * Calls method on this dnode connection
     *
     * @param string $method    Method name
     * @param array  $arguments Arguments
     *
     * @return array Response arguments as array.
     *
     * @throws \DnodeSyncClient\ConnectionClosedException Thrown if this connection was already closed.
     * @throws \DnodeSyncClient\MethodNotExistsException Thrown if remote does not declare called method.
     * @throws \DnodeSyncClient\IOException Thrown in case of network error
     * @throws \DnodeSyncClient\ProtocolException Thrown if the request can't be encoded as json or
     *                                            remote answer does not have supported format.
     */
    public function call($method, array $arguments = array())
    {
        if ($this->closed) {
            throw new ConnectionClosedException();
        }
        if (!in_array($method, $this->methods)) {
            throw new MethodNotExistsException("Method $method does not exists on remote.");
        }

        // Register a single callback, positioned right after the last
        // argument, under a fresh callback number.
        $callbacks = new \stdClass();
        $callbacks->{++$this->callbackNumber} = array(count($arguments));

        $request = json_encode(array(
            'method' => $method,
            'arguments' => $arguments,
            'callbacks' => $callbacks,
        ));
        if ($request === false) {
            // Sending the bare newline that `false . "\n"` produces could
            // leave us blocked forever waiting for an answer.
            throw new ProtocolException("Can't encode request for method $method as json");
        }
        if (!$this->writeLine($request)) {
            $this->close();
            throw new IOException("Can't write request to remote");
        }

        // this will block the stream until response is read
        $line = fgets($this->stream);
        if ($line === false) {
            $this->close();
            throw new IOException("Can't read response from remote");
        }
        $line = trim($line);
        $message = json_decode($line, true);
        if ($message === null) {
            throw new ProtocolException("Response is not valid json: $line");
        }
        if (!isset($message['method'])) {
            throw new ProtocolException("Response does not have method field: $line");
        }
        if ($message['method'] !== $this->callbackNumber) {
            throw new ProtocolException("Response does not call expected callback, expected "
                . $this->callbackNumber . ", got $line");
        }
        if (isset($message['links']) && $message['links']) {
            throw new ProtocolException("Response contains links, we do not support that: $line");
        }
        if (isset($message['callbacks']) && $message['callbacks']) {
            throw new ProtocolException("Response contains callbacks, we do not support that: $line");
        }
        if (!array_key_exists('arguments', $message)) {
            return array();
        }
        if (!is_array($message['arguments'])) {
            throw new ProtocolException("Response arguments must be array: $line");
        }
        return $message['arguments'];
    }

    /**
     * Lists methods available by remote dnode service
     *
     * @return array
     */
    public function getAvailableMethods()
    {
        return $this->methods;
    }

    /**
     * Closes this connection
     *
     * Safe to call more than once: call() closes the connection itself when
     * a read or write fails, and a later explicit close() must not run
     * fclose() on an already closed stream.
     */
    public function close()
    {
        if ($this->closed) {
            return;
        }
        $this->closed = true;
        if (is_resource($this->stream)) {
            fclose($this->stream);
        }
    }

    /**
     * Writes one newline-terminated message, retrying after partial writes.
     *
     * @param string $json Already encoded message, without trailing newline
     *
     * @return bool False if the stream stopped accepting data before the
     *              whole message was written.
     */
    private function writeLine($json)
    {
        $data = $json . "\n";
        $remaining = strlen($data);
        while ($remaining > 0) {
            $written = fwrite($this->stream, $data);
            if ($written === false || $written === 0) {
                return false;
            }
            $remaining -= $written;
            // substr() returns false at end of string on older PHP versions
            $data = (string) substr($data, $written);
        }
        return true;
    }
}