Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Firehose to gRPC sink job failure with error Uncaught exception in the SynchronizationContext. Panic! java.lang.IllegalStateException: Could not find policy 'pick_first'. #202

Closed
Shreyansh228 opened this issue Nov 16, 2022 · 1 comment

Comments

@Shreyansh228
Copy link
Contributor

Firehose to gRPC sink job failure with error Uncaught exception in the SynchronizationContext. Panic! java.lang.IllegalStateException: Could not find policy 'pick_first'.

Nov 14, 2022 5:57:37 PM io.grpc.internal.ManagedChannelImpl$2 uncaughtException
SEVERE: [Channel<1>: (127.0.0.1:6565)] Uncaught exception in the SynchronizationContext. Panic!
java.lang.IllegalStateException: Could not find policy 'pick_first'. Make sure its implementation is either registered to LoadBalancerRegistry 
        or included in META-INF/services/io.grpc.LoadBalancerProvider from your jar files.
        at io.grpc.internal.AutoConfiguredLoadBalancerFactory$AutoConfiguredLoadBalancer.<init>(AutoConfiguredLoadBalancerFactory.java:92)
        at io.grpc.internal.AutoConfiguredLoadBalancerFactory.newLoadBalancer(AutoConfiguredLoadBalancerFactory.java:63)
        at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:406)
        at io.grpc.internal.ManagedChannelImpl$RealChannel$2.run(ManagedChannelImpl.java:972)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.ManagedChannelImpl$RealChannel.newCall(ManagedChannelImpl.java:969)
        at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:911)
        at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
        at io.grpc.stub.MetadataUtils$HeaderAttachingClientInterceptor.interceptCall(MetadataUtils.java:74)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142)
        at io.odpf.firehose.sink.grpc.client.GrpcClient.execute(GrpcClient.java:59)
        at io.odpf.firehose.sink.grpc.GrpcSink.execute(GrpcSink.java:38)
        at io.odpf.firehose.sink.AbstractSink.pushMessage(AbstractSink.java:46)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkWithFailHandler.pushMessage(SinkWithFailHandler.java:34)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkWithRetry.pushMessage(SinkWithRetry.java:54)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkFinal.pushMessage(SinkFinal.java:28)
        at io.odpf.firehose.consumer.FirehoseSyncConsumer.process(FirehoseSyncConsumer.java:43)
        at io.odpf.firehose.launch.Main.lambda$multiThreadedConsumers$0(Main.java:65)
        at io.odpf.firehose.launch.Task.lambda$run$0(Task.java:49)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)

Expected Behavior

A firehose job need to interact with gRPC API for the response.

Steps to Reproduce

  1. Proto used to reproduce the scenario
syntax = "proto3";
package io.odpf.dagger.consumer;
option java_multiple_files = true;
option java_package = "io.odpf.dagger.consumer";
option java_outer_classname = "SampleGrpcServerProto";

service TestServer {
  rpc TestRpcMethod (TestGrpcRequest) returns (TestGrpcResponse) {}
}
message TestGrpcRequest {
  string field1 = 1;
  string field2 = 2;
}
message TestGrpcResponse {
  bool success = 1;
  repeated Error error = 2;
  string field3 = 3;
  string field4 = 4;
}
message Error {
  string code = 1;
  string entity = 2;
}
  1. Write a simple gRPC API which expects two fields and gives back response as is.

  2. Run a firehose job in local with below local properties which consumes data from local kafka and uses gRPC as sink.

java -jar build/libs/firehose-0.4.2.jar

KAFKA_RECORD_PARSER_MODE=message
SINK_TYPE=grpc
INPUT_SCHEMA_PROTO_CLASS=io.odpf.dagger.consumer.TestGrpcRequest
SCHEMA_REGISTRY_STENCIL_ENABLE=false
SOURCE_KAFKA_BROKERS=127.0.0.1:9092
SOURCE_KAFKA_TOPIC=test-grpc-request
SOURCE_KAFKA_CONSUMER_GROUP_ID=sample-grpc-group-id2
SINK_GRPC_SERVICE_HOST=127.0.0.1
SINK_GRPC_SERVICE_PORT=6565
SINK_GRPC_METHOD_URL=io.odpf.dagger.consumer.TestServer/TestRpcMethod
SINK_GRPC_RESPONSE_SCHEMA_PROTO_CLASS=io.odpf.dagger.consumer.TestGrpcResponse

The job is failing with the above mentioned error.

##Analysis:
In the current implementation, the gRPC client chooses default LoadBalancerProvider ('pick_first') and default NameResolverProvider(DNS). The implementation classes PickFirstLoadBalancerProvider and DnsNameResolverProvider respectively are missing.

We could able to solve the issue with the including implementation classes through service provider like creating META-INF/services folder and creating a file named io.grpc.LoadBalancerProvider with value as io.grpc.internal.PickFirstLoadBalancerProvider and create another file io.grpc.NameResolverProvider with value io.grpc.internal.DnsNameResolverProvider under it.

Also if we provide only one service provider io.grpc.LoadBalancerProvider and miss other, we are getting below error.

Failed to resolve name. status=Status{code=UNAVAILABLE, description=Failed to initialize xDS, 
cause=io.grpc.xds.XdsInitializationException: Cannot find bootstrap configuration
Environment variables searched:
- GRPC_XDS_BOOTSTRAP
- GRPC_XDS_BOOTSTRAP_CONFIG

Java System Properties searched:
- io.grpc.xds.bootstrap
- io.grpc.xds.bootstrapConfig
        at io.grpc.xds.BootstrapperImpl.bootstrap(BootstrapperImpl.java:101)
        at io.grpc.xds.SharedXdsClientPoolProvider.getOrCreate(SharedXdsClientPoolProvider.java:90)
        at io.grpc.xds.XdsNameResolver.start(XdsNameResolver.java:155)
        at io.grpc.internal.ManagedChannelImpl.exitIdleMode(ManagedChannelImpl.java:412)
        at io.grpc.internal.ManagedChannelImpl$RealChannel$2.run(ManagedChannelImpl.java:972)
        at io.grpc.SynchronizationContext.drain(SynchronizationContext.java:95)
        at io.grpc.SynchronizationContext.execute(SynchronizationContext.java:127)
        at io.grpc.internal.ManagedChannelImpl$RealChannel.newCall(ManagedChannelImpl.java:969)
        at io.grpc.internal.ManagedChannelImpl.newCall(ManagedChannelImpl.java:911)
        at io.grpc.internal.ForwardingManagedChannel.newCall(ForwardingManagedChannel.java:63)
        at io.grpc.stub.MetadataUtils$HeaderAttachingClientInterceptor.interceptCall(MetadataUtils.java:74)
        at io.grpc.ClientInterceptors$InterceptorChannel.newCall(ClientInterceptors.java:156)
        at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:142)
        at io.odpf.firehose.sink.grpc.client.GrpcClient.execute(GrpcClient.java:59)
        at io.odpf.firehose.sink.grpc.GrpcSink.execute(GrpcSink.java:38)
        at io.odpf.firehose.sink.AbstractSink.pushMessage(AbstractSink.java:46)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkWithFailHandler.pushMessage(SinkWithFailHandler.java:34)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkWithRetry.pushMessage(SinkWithRetry.java:54)
        at io.odpf.firehose.sinkdecorator.SinkDecorator.pushMessage(SinkDecorator.java:28)
        at io.odpf.firehose.sinkdecorator.SinkFinal.pushMessage(SinkFinal.java:28)
        at io.odpf.firehose.consumer.FirehoseSyncConsumer.process(FirehoseSyncConsumer.java:43)
        at io.odpf.firehose.launch.Main.lambda$multiThreadedConsumers$0(Main.java:65)
        at io.odpf.firehose.launch.Task.lambda$run$0(Task.java:49)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
}
@sumitaich1998
Copy link
Collaborator

@Shreyansh228 please use firehose 0.3.3 for the grpc sink ,

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants