Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Found some issue , when i using kafka consume, with kafka-coroutine context #10407

Open
gipyeong-lee opened this issue Oct 13, 2023 · 2 comments · May be fixed by #10405
Open

Found some issue , when i using kafka consume, with kafka-coroutine context #10407

gipyeong-lee opened this issue Oct 13, 2023 · 2 comments · May be fixed by #10405

Comments

@gipyeong-lee
Copy link

Prerequisites

What version of pinpoint are you using?

2.6.0-SNAPSHOT

Describe your problem**

Kafka Consuming - | trace failed | -> Kotlin Coroutine -> Webclient Request

What have you done?

  1. Service A : Begin request to Service A
  2. Service B : Producing Kafka
  3. Service C : Consume from kafka
  4. Service C : using kotlin-coroutine context, request webclient api to Service C
  5. Service D : (something to do)

Actually Result:

  1. User -> Service A -> Kafka -> Service B
  2. User -> Service C

Expected Result:

  1. User -> Service A -> Kafka -> Service B -> Service C

Screenshots

Actually Result
스크린샷 2023-10-13 오후 2 54 05

Expected Result
스크린샷 2023-10-13 오후 2 55 59

@emeroad
Copy link
Member

emeroad commented Oct 16, 2023

This PR is not thread safe.
TraceContext must be thread safe, so it must not have state.

https://github.com/pinpoint-apm/pinpoint/tree/master/agent-sdk
When delegating request processing using thread-pool, you must use Async SDK below.

@gipyeong-lee
Copy link
Author

gipyeong-lee commented Oct 16, 2023

@emeroad thank you. but, is that agent doesn't support about coroutine scope . when i applied that sdk. it doesn't work what i want.
In DispatchInterceptor, traceContext.currentTraceObject(); is always null.

@Override
    public void before(Object target, Object[] args) {
        if (isDebug) {
            logger.beforeInterceptor(target, args);
        }

        final Continuation continuation = getContinuation(args);
        if (continuation == null) {
            return;
        }

        if (isCompletedContinuation(continuation)) {
            return;
        }

        if (!checkSupportCoroutinesName(continuation)) {
            return;
        }

        final Trace trace = traceContext.currentTraceObject();
        if (trace == null) {
            return;
        }

        final SpanEventRecorder recorder = trace.traceBlockBegin();

        final AsyncContextAccessor asyncContextAccessor = getAsyncContextAccessor(args);
        if (asyncContextAccessor != null) {
            // make asynchronous trace-id
            final AsyncContext asyncContext = recorder.recordNextAsyncContext();
            asyncContextAccessor._$PINPOINT$_setAsyncContext(asyncContext);
        }
    }

and it make cut off tracing between consumer thread with kotlin-coroutine context.
btw, i implemented simple thread safety code.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants