-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
ClientCall.java
290 lines (275 loc) · 13 KB
/
ClientCall.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
/*
* Copyright 2014 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc;
import javax.annotation.Nullable;
/**
* An instance of a call to a remote method. A call will send zero or more
* request messages to the server and receive zero or more response messages back.
*
* <p>Instances are created
* by a {@link Channel} and used by stubs to invoke their remote behavior.
*
* <p>More advanced usages may consume this interface directly as opposed to using a stub. Common
* reasons for doing so would be the need to interact with flow-control or when acting as a generic
* proxy for arbitrary operations.
*
* <p>{@link #start} must be called prior to calling any other methods, with the exception of
* {@link #cancel}. Whereas {@link #cancel} must not be followed by any other methods,
* but can be called more than once, while only the first one has effect.
*
* <p>No generic method for determining message receipt or providing acknowledgement is provided.
* Applications are expected to utilize normal payload messages for such signals, as a response
* naturally acknowledges its request.
*
* <p>Methods are guaranteed to be non-blocking. Not thread-safe except for {@link #request}, which
* may be called from any thread.
*
* <p>There is no interaction between the states on the {@link Listener Listener} and {@link
* ClientCall}, i.e., if {@link Listener#onClose Listener.onClose()} is called, it has no bearing on
* the permitted operations on {@code ClientCall} (but it may impact whether they do anything).
*
* <p>There is a race between {@link #cancel} and the completion/failure of the RPC in other ways.
* If {@link #cancel} won the race, {@link Listener#onClose Listener.onClose()} is called with
* {@link Status#CANCELLED CANCELLED}. Otherwise, {@link Listener#onClose Listener.onClose()} is
* called with whatever status the RPC was finished. We ensure that at most one is called.
*
* <h3>Usage examples</h3>
* <h4>Simple Unary (1 request, 1 response) RPC</h4>
* <pre>
* call = channel.newCall(unaryMethod, callOptions);
* call.start(listener, headers);
* call.sendMessage(message);
* call.halfClose();
* call.request(1);
* // wait for listener.onMessage()
* </pre>
*
* <h4>Flow-control in Streaming RPC</h4>
*
* <p>The following snippet demonstrates a bi-directional streaming case, where the client sends
* requests produced by a fictional <code>makeNextRequest()</code> in a flow-control-compliant
* manner, and notifies gRPC library to receive additional response after one is consumed by
* a fictional <code>processResponse()</code>.
*
* <pre>
* call = channel.newCall(bidiStreamingMethod, callOptions);
* listener = new ClientCall.Listener<FooResponse>() {
* @Override
* public void onMessage(FooResponse response) {
* processResponse(response);
* // Notify gRPC to receive one additional response.
* call.request(1);
* }
*
* @Override
* public void onReady() {
* while (call.isReady()) {
* FooRequest nextRequest = makeNextRequest();
* if (nextRequest == null) { // No more requests to send
* call.halfClose();
* return;
* }
* call.sendMessage(nextRequest);
* }
* }
* }
* call.start(listener, headers);
* // Notify gRPC to receive one response. Without this line, onMessage() would never be called.
* call.request(1);
* </pre>
*
* <p>DO NOT MOCK: Use InProcessServerBuilder and make a test server instead.
*
* @param <ReqT> type of message sent one or more times to the server.
* @param <RespT> type of message received one or more times from the server.
*/
public abstract class ClientCall<ReqT, RespT> {
/**
* Callbacks for receiving metadata, response messages and completion status from the server.
*
* <p>Implementations are free to block for extended periods of time. Implementations are not
* required to be thread-safe, but they must not be thread-hostile. The caller is free to call
* an instance from multiple threads, but only one call simultaneously. A single thread may
* interleave calls to multiple instances, so implementations using ThreadLocals must be careful
* to avoid leaking inappropriate state (e.g., clearing the ThreadLocal before returning).
*
* @param <T> type of message received.
*/
public abstract static class Listener<T> {
/**
* The response headers have been received. Headers always precede messages.
*
* <p>Since {@link Metadata} is not thread-safe, the caller must not access (read or write)
* {@code headers} after this point.
*
* @param headers containing metadata sent by the server at the start of the response.
*/
public void onHeaders(Metadata headers) {}
/**
* A response message has been received. May be called zero or more times depending on whether
* the call response is empty, a single message or a stream of messages.
*
* @param message returned by the server
*/
public void onMessage(T message) {}
/**
* The ClientCall has been closed. Any additional calls to the {@code ClientCall} will not be
* processed by the server. No further receiving will occur and no further notifications will be
* made.
*
* <p>Since {@link Metadata} is not thread-safe, the caller must not access (read or write)
* {@code trailers} after this point.
*
* <p>If {@code status} returns false for {@link Status#isOk()}, then the call failed.
* An additional block of trailer metadata may be received at the end of the call from the
* server. An empty {@link Metadata} object is passed if no trailers are received.
*
* <p>This method should not throw. If this method throws, there is no way to be notified of the
* exception. Implementations should therefore be careful of exceptions which can accidentally
* leak resources.
*
* @param status the result of the remote call.
* @param trailers metadata provided at call completion.
*/
public void onClose(Status status, Metadata trailers) {}
/**
* This indicates that the ClientCall may now be capable of sending additional messages (via
* {@link #sendMessage}) without requiring excessive buffering internally. This event is
* just a suggestion and the application is free to ignore it, however doing so may
* result in excessive buffering within the ClientCall.
*
* <p>Because there is a processing delay to deliver this notification, it is possible for
* concurrent writes to cause {@code isReady() == false} within this callback. Handle "spurious"
* notifications by checking {@code isReady()}'s current value instead of assuming it is now
* {@code true}. If {@code isReady() == false} the normal expectations apply, so there would be
* <em>another</em> {@code onReady()} callback.
*
* <p>If the type of a call is either {@link MethodDescriptor.MethodType#UNARY} or
* {@link MethodDescriptor.MethodType#SERVER_STREAMING}, this callback may not be fired. Calls
* that send exactly one message should not await this callback.
*/
public void onReady() {}
}
/**
* Start a call, using {@code responseListener} for processing response messages.
*
* <p>It must be called prior to any other method on this class, except for {@link #cancel} which
* may be called at any time.
*
* <p>Since {@link Metadata} is not thread-safe, the caller must not access (read or write) {@code
* headers} after this point.
*
* @param responseListener receives response messages
* @param headers which can contain extra call metadata, e.g. authentication credentials.
* @throws IllegalStateException if a method (including {@code start()}) on this class has been
* called.
*/
public abstract void start(Listener<RespT> responseListener, Metadata headers);
/**
* Requests up to the given number of messages from the call to be delivered to
* {@link Listener#onMessage(Object)}. No additional messages will be delivered.
*
* <p>Message delivery is guaranteed to be sequential in the order received. In addition, the
* listener methods will not be accessed concurrently. While it is not guaranteed that the same
* thread will always be used, it is guaranteed that only a single thread will access the listener
* at a time.
*
* <p>If it is desired to bypass inbound flow control, a very large number of messages can be
* specified (e.g. {@link Integer#MAX_VALUE}).
*
* <p>If called multiple times, the number of messages able to delivered will be the sum of the
* calls.
*
* <p>This method is safe to call from multiple threads without external synchronization.
*
* @param numMessages the requested number of messages to be delivered to the listener. Must be
* non-negative.
* @throws IllegalStateException if call is not {@code start()}ed
* @throws IllegalArgumentException if numMessages is negative
*/
public abstract void request(int numMessages);
/**
* Prevent any further processing for this {@code ClientCall}. No further messages may be sent or
* will be received. The server is informed of cancellations, but may not stop processing the
* call. Cancellation is permitted even if previously {@link #halfClose}d. Cancelling an already
* {@code cancel()}ed {@code ClientCall} has no effect.
*
* <p>No other methods on this class can be called after this method has been called.
*
* <p>It is recommended that at least one of the arguments to be non-{@code null}, to provide
* useful debug information. Both argument being null may log warnings and result in suboptimal
* performance. Also note that the provided information will not be sent to the server.
*
* @param message if not {@code null}, will appear as the description of the CANCELLED status
* @param cause if not {@code null}, will appear as the cause of the CANCELLED status
*/
public abstract void cancel(@Nullable String message, @Nullable Throwable cause);
/**
* Close the call for request message sending. Incoming response messages are unaffected. This
* should be called when no more messages will be sent from the client.
*
* @throws IllegalStateException if call is already {@code halfClose()}d or {@link #cancel}ed
*/
public abstract void halfClose();
/**
* Send a request message to the server. May be called zero or more times depending on how many
* messages the server is willing to accept for the operation.
*
* @param message message to be sent to the server.
* @throws IllegalStateException if call is {@link #halfClose}d or explicitly {@link #cancel}ed
*/
public abstract void sendMessage(ReqT message);
/**
* If {@code true}, indicates that the call is capable of sending additional messages
* without requiring excessive buffering internally. This event is
* just a suggestion and the application is free to ignore it, however doing so may
* result in excessive buffering within the call.
*
* <p>If {@code false}, {@link Listener#onReady()} will be called after {@code isReady()}
* transitions to {@code true}.
*
* <p>If the type of the call is either {@link MethodDescriptor.MethodType#UNARY} or
* {@link MethodDescriptor.MethodType#SERVER_STREAMING}, this method may persistently return
* false. Calls that send exactly one message should not check this method.
*
* <p>This abstract class's implementation always returns {@code true}. Implementations generally
* override the method.
*/
public boolean isReady() {
return true;
}
/**
* Enables per-message compression, if an encoding type has been negotiated. If no message
* encoding has been negotiated, this is a no-op. By default per-message compression is enabled,
* but may not have any effect if compression is not enabled on the call.
*/
public void setMessageCompression(boolean enabled) {
// noop
}
/**
* Returns additional properties of the call. May only be called after {@link Listener#onHeaders}
* or {@link Listener#onClose}. If called prematurely, the implementation may throw {@code
* IllegalStateException} or return arbitrary {@code Attributes}.
*
* @return non-{@code null} attributes
* @throws IllegalStateException (optional) if called before permitted
*/
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2607")
@Grpc.TransportAttr
public Attributes getAttributes() {
return Attributes.EMPTY;
}
}