Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move away from ClientMiddleware and ClientAuthHandler in pydeephaven #5489

Merged
merged 41 commits into from
May 16, 2024

Conversation

jcferretti
Copy link
Member

@jcferretti jcferretti commented May 14, 2024

Using a pydeephaven.Session from multiple threads and doing concurrent server operations generates an abort inside gRPC with a message similar to

E0415 01:35:08.933973681   11735 call_op_set.h:978]                    API misuse of type GRPC_CALL_ERROR_TOO_MANY_OPERATIONS observed
E0415 01:35:08.934022350   11735 call_op_set.h:980]                    assertion failed: false
Aborted (core dumped)

The program py/client/examples/mt_session.py added in this PR reproduces the issue very quickly, if py/client/session.py is modified to refresh the authentication token every 3 seconds; a crash is typically observed before 3 minutes.

A second failure mode is a segmentation fault insde a cython pyx call to get_token (gdb stack traces attached).

A third failure mode looks like a server disconnection that makes all the threads raise exceptions.

In investigating the issue, the crashes always happen around upcalls from C++ gRPC or C++ pyarrow code to python callbacks in the form of ClientMiddleware (interceptors we use to read and write headers for auth tokens) and ClientAuthHandler.

This PR moves the python client away from ClientMiddleware and ClientAuthHandler to instead manage authentication the way the current C++ client does it: by explicitly setting headers in RPC stub calls, and explicitly reading headers from stub call returns. This prevents any upcall from C++ to python, which prevents the issues and should also be more efficient.

This PR also adds more defensive coding around concurrent use (read/modify/write) of session state, in particular around auth token, and adds retries and warn logging around auth refresh failures instead of raising exceptions right away. Of note is during the investigation was established that raising an exception from ClientMiddleware or ClientAuthHandler code (which is being called from C++) will abort the program right away (as in failure mode 1, GRPC_CALL_ERROR_TOO_MANY_OPERATIONS).

With the changes in this PR, the was run for 3 hours without failures.

pydh_test_mt_failure_1.gdb.txt

pydh_test_mt_failure_2.gdb.txt

pydh_test_mt_failure_3.output.txt

@jcferretti jcferretti self-assigned this May 14, 2024
@jcferretti jcferretti requested a review from kosak May 15, 2024 00:10
@jcferretti jcferretti marked this pull request as ready for review May 15, 2024 00:39
@jcferretti jcferretti marked this pull request as draft May 15, 2024 00:40
@jcferretti jcferretti marked this pull request as ready for review May 15, 2024 04:07
@jcferretti
Copy link
Member Author

jcferretti commented May 15, 2024

I believe this explains the SEGFAULT we see with _get_token at the top of the stack. Thanks to Jianfeng for pointing out that the thread safety of pyarrow FlightClient.authenticate was suspect.

This is the cython code (pyx) implementation of flight_client.authenticate: https://github.com/apache/arrow/blob/6a28035c2b49b432dc63f5ee7524d76b4ed2d762/python/pyarrow/_flight.pyx#L1440

   1440     def authenticate(self, auth_handler, options: FlightCallOptions = None):
   1441         """Authenticate to the server.
   1442
   1443         Parameters
   1444         ----------
   1445         auth_handler : ClientAuthHandler
   1446             The authentication mechanism to use.
   1447         options : FlightCallOptions
   1448             Options for this call.
   1449         """
   1450         cdef:
   1451             unique_ptr[CClientAuthHandler] handler
   1452             CFlightCallOptions* c_options = FlightCallOptions.unwrap(options)
   1453
   1454         if not isinstance(auth_handler, ClientAuthHandler):
   1455             raise TypeError(
   1456                 "FlightClient.authenticate takes a ClientAuthHandler, "
   1457                 "not '{}'".format(type(auth_handler)))
   1458         handler.reset((<ClientAuthHandler> auth_handler).to_handler())
   1459         with nogil:
   1460             check_flight_status(
   1461                 self.client.get().Authenticate(deref(c_options),
   1462                                                move(handler)))

The call to Authenticate on line 1461 is to grpc_client.cc line 860:

    860   Status Authenticate(const FlightCallOptions& options,
    861                       std::unique_ptr<ClientAuthHandler> auth_handler) override {
    862     auth_handler_ = std::move(auth_handler);
    863     ClientRpc rpc(options);
    864     return AuthenticateInternal(rpc);
    865   }

Many calls in the same file use the auth_handler_ member that is being assigned on line 862 above, eg, DoPut:

    997   Status DoPut(const FlightCallOptions& options,
    998                std::unique_ptr<internal::ClientDataStream>* out) override {
    999     using GrpcStream = ::grpc::ClientReaderWriter<pb::FlightData, pb::PutResult>;
   1000
   1001     auto rpc = std::make_shared<ClientRpc>(options);
   1002     RETURN_NOT_OK(rpc->SetToken(auth_handler_.get()));
   1003     std::shared_ptr<GrpcStream> stream = stub_->DoPut(&rpc->context);
   1004     *out = std::make_unique<GrpcClientPutStream>(std::move(rpc), std::move(stream));
   1005     return Status::OK();
   1006   }

SetToken on the same file, line 86:

     86   /// \brief Add an auth token via an auth handler
     87   Status SetToken(ClientAuthHandler* auth_handler) {
     88     if (auth_handler) {
     89       std::string token;
     90       RETURN_NOT_OK(auth_handler->GetToken(&token));
     91       context.AddMetadata(kGrpcAuthHeader, token);
     92     }
     93     return Status::OK();
     94   }

None of this has any locking or protection against races to write/read. In particular a second call to python's flight_client.authenticate is going to invoke operator= on auth_handler_, which will reduce the shared count for the previous wrapped object, taking it to zero and triggering delete on the pointed to object. It is perfectly possible, and likely it is exactly what is happening in the crash, that while a call to Authenticate has assigned to the shared_ptr resulting in deleting the old value, a call to DoPut is trying to use the pointer to do SetToken (which calls GetToken on auth_handler_.get(), which in turn is going to invoke our python side get_token in the ClientAuthHandler, which is where we have been seeing a SEGFAULT).
The issue is doing the .get() call on the shared_ptr, which will take the naked pointer out of the shared_ptr disregarding any safeties. After .get() and before the value is used, another thread may have deleted the pointed to object.

@jcferretti jcferretti requested a review from niloc132 May 15, 2024 14:19
@jcferretti jcferretti added this to the 3. May 2024 milestone May 15, 2024
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Show resolved Hide resolved
py/client/pydeephaven/_console_service.py Outdated Show resolved Hide resolved
py/client/pydeephaven/_table_service.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
py/client/pydeephaven/session.py Outdated Show resolved Hide resolved
@@ -26,6 +26,8 @@ def tearDownClass(cls) -> None:
os.remove(BaseTestCase.csv_file)

def setUp(self) -> None:
# For netty server and psk, change auth_token to what the server printed.
# self.session = Session(port = 8080, auth_type = 'io.deephaven.authentication.psk.PskAuthenticationHandler', auth_token = 'safw7c4nzegp')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know what this is, but is this a secret and/or stale token that is being checked into the codebase? Just double-checking

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

psk is a different method to authenticate, and netty is a different gRPC backend engine that the server can be run with. I had a need to confirm that the bug would also manifest in a netty server with psk authentication, so I had to figure out what the F*** to change to get that to work. That comment is me trying to ensure if I need to do that again I don't have to research it again.

console_pb2.BindTableToVariableRequest(console_id=self.console_id,
table_id=table.ticket,
variable_name=variable_name),
metadata=self.session.grpc_metadata)
self.session.update_metadata(call.initial_metadata())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Style comment: maybe no action for now, but.....

it's a little bit unfortunate that we have to remember to call self.session.update_metadata(call.initial_metadata()) after every one of our grpc calls. It might be nice to wrap that in a helper method... or a method that forwards arguments to grpc and then does the metadata update for us.... I think that's what I did in C++

Anyway, the suggestion for now is to a least think about how we could make it nicer and a little more foolproof the next time we refactor this code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it; added wrappers.

@jcferretti jcferretti requested a review from kosak May 15, 2024 22:56
skew = random()
# Backoff schedule for retries after consecutive failures to refresh auth token
self._refresh_backoff = [ skew + 0.1, skew + 1, skew + 10 ]

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

removing trailing spaces on a newline

return self._input_table_service

@property
def plugin_object_service(self) -> PluginObjService:
def plugin_object_service(self) -> PluginObjService:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def plugin_object_service(self) -> PluginObjService:
def plugin_object_service(self) -> PluginObjService:

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

niloc132
niloc132 previously approved these changes May 16, 2024
jmao-denver
jmao-denver previously approved these changes May 16, 2024
@jcferretti jcferretti requested a review from kosak May 16, 2024 21:31
kosak
kosak previously approved these changes May 16, 2024
Comment on lines 240 to 263
def update_metadata(self, metadata: Iterable[Tuple[str, Union[str, bytes]]]):
for header_tuple in metadata:
if header_tuple[0] == "authorization":
v = header_tuple[1]
self._auth_header_value = v if isinstance(v, bytes) else v.encode('ascii')
break

def wrap_rpc(self, stub_call, *args, **kwargs):
if 'metadata' in kwargs:
raise DHError('Internal error: "metadata" in kwargs not supported in wrap_rpc.')
kwargs["metadata"] = self.grpc_metadata
# We use a future to get a chance to process initial metadata before the call
# is completed
future = stub_call.future(*args, **kwargs)
self.update_metadata(future.initial_metadata())
# Now block until we get the result (or an exception)
return future.result()

def wrap_bidi_rpc(self, stub_call, *args, **kwargs):
if 'metadata' in kwargs:
raise DHError('Internal error: "metadata" in kwargs not supported in wrap_bidi_rpc.')
kwargs["metadata"] = self.grpc_metadata
response = stub_call(*args, **kwargs)
self.update_metadata(response.initial_metadata())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing type hints and pydocs

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


def get_token(self):
return self._token
def trace(who):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. If this is not intended to be public, it should start with _.
  2. If this is intended to be public, it needs pydocs.
  3. Missing type hints.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@jcferretti jcferretti dismissed stale reviews from kosak, jmao-denver, and niloc132 via 82f1049 May 16, 2024 22:10
@jcferretti jcferretti enabled auto-merge (squash) May 16, 2024 22:24
@jcferretti jcferretti merged commit af861b7 into deephaven:main May 16, 2024
15 checks passed
@jcferretti jcferretti deleted the cfs-pyclient-auth branch May 16, 2024 22:52
@github-actions github-actions bot locked and limited conversation to collaborators May 16, 2024
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants