
rpc error: code = Internal desc = transport: SendHeader called multiple times #51

Open
sczyh30 opened this issue Jan 17, 2023 · 3 comments
Labels
good first issue Good for newcomers kind/bug Something isn't working

Comments


sczyh30 commented Jan 17, 2023

Issue Description

Type: bug report

Describe what happened

An error occurred when CRD changes were received:

{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:171","logLevel":"INFO","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy OpenSergo CRD received","crd":{"kind":"ConcurrencyLimitStrategy","apiVersion":"fault-tolerance.opensergo.io/v1alpha1","metadata":{"name":"concurrency-limit-foo","namespace":"default","uid":"0b79c7c4-7f6a-4677-a636-2210a9608a13","resourceVersion":"7910317","generation":1,"creationTimestamp":"2023-01-17T11:29:55Z","labels":{"app":"foo-app"},"annotations":{"kubectl.kubernetes.io/last-applied-configuration":"{\"apiVersion\":\"fault-tolerance.opensergo.io/v1alpha1\",\"kind\":\"ConcurrencyLimitStrategy\",\"metadata\":{\"annotations\":{},\"labels\":{\"app\":\"foo-app\"},\"name\":\"concurrency-limit-foo\",\"namespace\":\"default\"},\"spec\":{\"limitMode\":\"Local\",\"maxConcurrency\":8}}\n"},"managedFields":[{"manager":"kubectl-client-side-apply","operation":"Update","apiVersion":"fault-tolerance.opensergo.io/v1alpha1","time":"2023-01-17T11:29:55Z","fieldsType":"FieldsV1","fieldsV1":{"f:metadata":{"f:annotations":{".":{},"f:kubectl.kubernetes.io/last-applied-configuration":{}},"f:labels":{".":{},"f:app":{}}},"f:spec":{".":{},"f:limitMode":{},"f:maxConcurrency":{}}}}]},"spec":{"maxConcurrency":8,"limitMode":"Local"},"status":{}},"crdNamespace":"default","crdName":"concurrency-limit-foo","kind":"fault-tolerance.opensergo.io/v1alpha1/ConcurrencyLimitStrategy"}
{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:197","logLevel":"ERROR","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy Failed to send rules","kind":"fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy","crdNamespace":"default","crdName":"rate-limit-foo","kind":"fault-tolerance.opensergo.io/v1alpha1/RateLimitStrategy"}
rpc error: code = Internal desc = transport: SendHeader called multiple times
{"timestamp":"2023-01-17 19:44:31.31170","caller":"crd_watcher.go:197","logLevel":"ERROR","msg":"controller.fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule Failed to send rules","kind":"fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule","crdNamespace":"default","crdName":"my-opensergo-rule-2","kind":"fault-tolerance.opensergo.io/v1alpha1/FaultToleranceRule"}
rpc error: code = Internal desc = transport: SendHeader called multiple times

Tell us your environment

OpenSergo control plane v0.1.0

Anything else we need to know?

NONE


sczyh30 (Member, Author) commented Jan 18, 2023

Client-side error (Java SDK):

2023-01-18 09:52:07.674 ERROR 13375 --- [ault-executor-1] openSergoLogger                          : Fatal error occurred on OpenSergo gRPC ClientObserver

io.grpc.StatusRuntimeException: INTERNAL: transport: SendHeader called multiple times
	at io.grpc.Status.asRuntimeException(Status.java:535) ~[grpc-api-1.42.1.jar:1.42.1]
	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:479) [grpc-stub-1.42.1.jar:1.42.1]
	at io.grpc.internal.DelayedClientCall$DelayedListener$3.run(DelayedClientCall.java:463) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.DelayedClientCall$DelayedListener.delayOrExecute(DelayedClientCall.java:427) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.DelayedClientCall$DelayedListener.onClose(DelayedClientCall.java:460) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:562) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:70) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:743) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:722) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) [grpc-core-1.42.1.jar:1.42.1]
	at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) [grpc-core-1.42.1.jar:1.42.1]

@sczyh30 sczyh30 added kind/bug Something isn't working good first issue Good for newcomers labels Jan 18, 2023
@anushkabishnoi

Hello, this is my first experience with open-source contributions. I would really appreciate it if you could explain exactly what needs to be done.

I would love to work on this issue.

Thanks in advance! :)


mayooot commented Apr 1, 2024

@sczyh30 This seems to be caused by sending messages on the stream too quickly, likely from multiple goroutines at once.
I think it can be fixed by adding a send interval or a sync.Mutex:

control_plane.go:96

const sendInterval = 100 * time.Millisecond

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
	if stream == nil {
		return nil
	}
	err := stream.SendMsg(&trpb.SubscribeResponse{
		Status:          status,
		Ack:             "",
		Namespace:       namespace,
		App:             app,
		Kind:            kind,
		DataWithVersion: dataWithVersion,
		ControlPlane:    c.protoDesc,
		ResponseId:      respId,
	})
	if err != nil {
		return err
	}
	time.Sleep(sendInterval)
	return nil
}

or

func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream, namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error {
	if stream == nil {
		return nil
	}
	
	c.mux.Lock()
	defer c.mux.Unlock()
	
	return stream.SendMsg(&trpb.SubscribeResponse{
		Status:          status,
		Ack:             "",
		Namespace:       namespace,
		App:             app,
		Kind:            kind,
		DataWithVersion: dataWithVersion,
		ControlPlane:    c.protoDesc,
		ResponseId:      respId,
	})
}

