Traceid and spanid wrong when using Kotlin Reactor Coroutines #174

Open
akoufa opened this issue Feb 21, 2023 · 67 comments
Labels
bug A general bug

Comments

@akoufa

akoufa commented Feb 21, 2023

Hello. When using Kotlin Reactor Coroutines like the following:

    return mono {
        val text = "Greetings from Spring Boot!"
        logger.error(text)
        text
    }

then the traceId and spanId that are being included in the log statement above look like the following:

ERROR [00000000000000000000000000000000,0000000000000000] 80834 --- [atcher-worker-1] com.example.demo.HelloController         : Greetings from Spring Boot!

The traceId and spanId in the logs are always zeros.

This happens only for log statements inside such mono {} blocks. Other log statements outside of these blocks work as expected.

I have created a sample reproducer project here:

https://github.com/akoufa/springbootissues

Just start the Spring Boot app and use the following curl request:

curl --location 'localhost:8080/'
@marcingrzejszczak
Contributor

I'm not a Kotlin expert, but we have a test here that proves that this is working (https://github.com/micrometer-metrics/micrometer/blob/main/micrometer-core/src/test/kotlin/io/micrometer/core/instrument/kotlin/AsContextElementKtTests.kt). How does what we do differ from what you do?

@akoufa
Author

akoufa commented Mar 1, 2023

@marcingrzejszczak To be honest, I don't know exactly. I don't use Observation directly; I am using the Reactor Coroutine library like this:

return mono {
    val text = "Greetings from Spring Boot!"
    logger.error(text)
    text
}

This mono {...} builder allows using coroutines and suspending functions inside the { } and evaluates to a Mono<T>. But the log statements have the issue I mentioned above. You can check out the reproducer code I have shared; it's pretty plain and simple.

@marcingrzejszczak marcingrzejszczak added bug A general bug and removed waiting for feedback labels Mar 2, 2023
@akoufa
Author

akoufa commented Mar 6, 2023

@marcingrzejszczak @shakuzen
The behaviour is very strange. I did some tests and I found the following:

    @GetMapping("/")
    suspend fun index(): String {
        logger.error("Greetings")
        mono {
            "test"
        }.awaitSingle()
        logger.error("Spring Boot!")
        return "Spring Boot"
    }

The first statement logs the trace id and span id as expected. The second statement mostly has zeroed values.


Of course, the difference is that there is a thread switch after the mono { ... } statement, from ctor-http-nio-4 to atcher-worker-1.
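The symptom matches how ThreadLocal-based state (such as the SLF4J MDC or a tracer's current span) behaves across a thread switch: a value set on one thread is simply not visible on another unless something captures it and re-installs it there. Below is a minimal, stdlib-only sketch of that idea; `currentTraceId`, `onOtherThread`, `readWithoutPropagation` and `readWithPropagation` are hypothetical names for illustration, not Micrometer or Reactor APIs.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical stand-in for per-thread logging/tracing state (e.g. the MDC).
val currentTraceId: ThreadLocal<String?> = ThreadLocal()

// Runs `task` on a fresh single-thread executor, i.e. on a different thread,
// similar to a coroutine being resumed on a Dispatchers worker.
fun <T> onOtherThread(task: () -> T): T {
    val executor = Executors.newSingleThreadExecutor()
    try {
        return executor.submit(Callable<T> { task() }).get()
    } finally {
        executor.shutdown()
    }
}

// Without propagation: the worker thread has its own, empty ThreadLocal slot.
fun readWithoutPropagation(): String? = onOtherThread { currentTraceId.get() }

// With propagation: capture on the caller thread, install on the worker,
// and restore the worker's previous value afterwards.
fun readWithPropagation(): String? {
    val captured = currentTraceId.get()
    return onOtherThread {
        val previous = currentTraceId.get()
        currentTraceId.set(captured)
        try {
            currentTraceId.get()
        } finally {
            currentTraceId.set(previous)
        }
    }
}
```

For coroutines, kotlinx.coroutines' `ThreadContextElement` (which `MDCContext` and the element returned by Micrometer's `asContextElement()` are built on) automates the capture-install-restore part of `readWithPropagation`.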

@vooft

vooft commented Mar 20, 2023

The problem is related to setting the span to null on scope reset when there is no other observation to fall back to, and to how OtelTracer handles this.

Basically, when a coroutine finishes, the following happens:

  1. Internally Kotlin invokes io.micrometer.core.instrument.kotlin.KotlinObservationContextElement#restoreThreadContext
  2. If there was no prior Observation set, it will invoke io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor#reset.
  3. This eventually gets into io.micrometer.tracing.handler.TracingObservationHandler#onScopeReset, where it ignores any passed context
  4. It calls the current tracer's method withSpan and passes null
  5. If the current tracer is OtelTracer and the span passed into io.micrometer.tracing.otel.bridge.OtelTracer#withSpan is null, then it initializes the current Otel context with the invalid span, which has traceId and spanId set to zeroes (as a side-effect in the OtelSpanInScope constructor)
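The sequence above can be modelled in a few lines of plain Kotlin to show why the ThreadLocal ends up polluted rather than empty. This is a deliberately simplified model, not Micrometer or OpenTelemetry code: `SpanIds`, `INVALID`, `otelCurrent`, `withSpan`, `restoreAfterCoroutine` and `logPrefix` are hypothetical names, and the only behaviour carried over from the description is that `withSpan(null)` stores the all-zero invalid span instead of clearing the slot.

```kotlin
// Hypothetical, simplified model of the reset path (not the real bridge code).
data class SpanIds(val traceId: String, val spanId: String)

// Mimics OTel's invalid span: trace and span ids are all zeros.
val INVALID = SpanIds("0".repeat(32), "0".repeat(16))

// Stand-in for the thread-local "current context"; null means nothing is set.
val otelCurrent: ThreadLocal<SpanIds?> = ThreadLocal()

// Steps 4-5: a null span is not cleared, it is replaced by INVALID.
fun withSpan(span: SpanIds?) {
    otelCurrent.set(span ?: INVALID)
}

// Steps 1-3: when the coroutine finishes and there was no prior observation,
// the reset ends up calling withSpan(null).
fun restoreAfterCoroutine(previousSpan: SpanIds?) {
    withSpan(previousSpan)
}

// What a log pattern would print as [traceId,spanId] on this thread.
fun logPrefix(): String {
    val ids = otelCurrent.get() ?: return "[]"
    return "[${ids.traceId},${ids.spanId}]"
}
```

After `restoreAfterCoroutine(null)`, `logPrefix()` yields the all-zero ids from the original report instead of an empty prefix, because the slot now holds a placeholder rather than nothing.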

In the end, the current OTel context is set to the invalid span, even if it was null before, which results in a polluted ThreadLocal variable.

Perhaps some special handling for an invalid span should be added somewhere in io.micrometer.tracing.otel.bridge.OtelSpanInScope#storedContext, so that it gets reset to null instead of to the invalid value.
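A sketch of the special handling suggested above, on a hypothetical model rather than the actual `OtelSpanInScope` code: when the context to restore is missing or is the invalid (all-zero) span, the thread-local slot is cleared instead of being set to the placeholder. `SpanIds`, `currentSpan` and `restoreStoredContext` are illustrative names, and whether this is compatible with OpenTelemetry's own rules is exactly the open question in this thread.

```kotlin
// Hypothetical model of span ids; all-zero ids mark the invalid span.
data class SpanIds(val traceId: String, val spanId: String) {
    val isValid: Boolean
        get() = traceId.any { it != '0' } && spanId.any { it != '0' }
}

// Stand-in for the thread-local current context; null means "nothing set".
val currentSpan: ThreadLocal<SpanIds?> = ThreadLocal()

// Suggested restore behaviour: never store the invalid placeholder,
// clear the slot instead so the thread is left clean.
fun restoreStoredContext(stored: SpanIds?) {
    if (stored == null || !stored.isValid) {
        currentSpan.remove()
    } else {
        currentSpan.set(stored)
    }
}
```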

I'm not entirely sure what the right approach is here, as I don't fully understand the difference between reset and close.

@marcingrzejszczak
Contributor

AFAIR, the problem in OTel is that you can't remove a span from the thread local; you can only set it to invalid. OpenTelemetry will never return a null span when there's no current span. Look at the Span interface from OpenTelemetry:

public interface Span extends ImplicitContextKeyed {

  /**
   * Returns the {@link Span} from the current {@link Context}, falling back to a default, no-op
   * {@link Span} if there is no span in the current context.
   */
  static Span current() {
    Span span = Context.current().get(SpanContextKey.KEY);
    return span == null ? getInvalid() : span;
  }

  /**
   * Returns an invalid {@link Span}. An invalid {@link Span} is used when tracing is disabled,
   * usually because there is no OpenTelemetry SDK installed.
   */
  static Span getInvalid() {
    return PropagatedSpan.INVALID;
  }

So if we null the current span by calling Micrometer Tracing's tracer.withSpan(null), the best we can do for OTel is to reset it to the default invalid span.

@akoufa
Author

akoufa commented Mar 21, 2023

@marcingrzejszczak @vooft Does this mean that the library cannot be used in conjunction with Kotlin Coroutines + Reactor?

@marcingrzejszczak
Contributor

I didn't say that. I'm just replying to @vooft's comment.

@msosa

msosa commented Apr 20, 2023

@marcingrzejszczak I am trying to use observationRegistry.asContextElement(), but it seems to lose my logging context. Here is a small snippet of how I am using it:

logger.info("has context")
CoroutineScope(Dispatchers.Default).launch(errorHandler) {
	withContext(observationRegistry.asContextElement()) {
		logger.info("has no context")
	}
}
logger.info("also has no context")

I am using Spring Boot and injecting observationRegistry; I'm not sure if that is proper usage.

Also, the "has no context" statement logs properly if I replace withContext(observationRegistry.asContextElement()) { with withContext(MDCContext(context)) {, where val context = MDC.getCopyOfContextMap() is captured before CoroutineScope. However, that last logger.info always loses the context.

@qavid

qavid commented Apr 20, 2023

Hi @marcingrzejszczak. We have the same problem as @akoufa, but with the coroutines router (coRouter); we are using Brave instead of OTel.
If we use the standard router and provide the coroutine context as suggested, both logs (before and after the delay) contain traceId and spanId.

@Configuration(proxyBeanMethods = false)
class RoutesConfiguration(@Autowired private val observationRegistry: ObservationRegistry) {
	private val logger = KotlinLogging.logger {}

	// traceId&spanId are logged only before delay
	@Bean
	fun notWorkingCoRouter() = coRouter {
		GET("/log1") {
			logger.info("before")
			delay(500)
			logger.info { "after" }
			ServerResponse.ok().buildAndAwait()
		}
	}

	// traceId&spanId are logged before and after delay
	@Bean
	fun workingRouter() = router {
		GET("/log2") {
			mono(observationRegistry.asContextElement()) {
				logger.info("before")
				delay(500)
				logger.info { "after" }
				ServerResponse.ok().buildAndAwait()
			}
		}
	}
}

Do you think this issue can be fixed, or should we use some kind of workaround?

@tomfi

tomfi commented May 16, 2023

Is there any workaround that does not include injecting the observationRegistry to all controllers?

And is there any plan to fix this issue?

@marcingrzejszczak
Contributor

I don't have enough Kotlin knowledge. If anyone can help us out with this, please be my guest and comment.

@tomfi

tomfi commented May 16, 2023

@marcingrzejszczak In general, the problem is that when a coroutine is started, the context might be lost in the child coroutines.

For example:

@RestController
class FooRestController(private val observationRegistry: ObservationRegistry) {

  @GetMapping("/suspend")
  suspend fun fooSuspend(): Foo {
    logger.info { "before" }
    delay(500) // the coroutine may resume on a different thread after this
    logger.info { "after" } // we lose the observation context
    return Foo()
  }

  @GetMapping("/mono")
  fun fooMono(): Mono<Foo> {
    return mono(observationRegistry.asContextElement()) {
        logger.info { "before" }
        delay(500) // the coroutine may resume on a different thread after this
        logger.info { "after" } // we do not lose the context: the coroutine was started with the observation context, and it is propagated
        Foo()
    }
  }
}

If it were possible to add observationRegistry.asContextElement() at the place that initializes the coroutine, it would be propagated downstream.
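The idea of adding the element where the coroutine is created can be illustrated with only the Kotlin standard library: a context element supplied when a coroutine is started is visible, via `coroutineContext`, to every suspend function it calls. In this sketch, `TraceContext` is a hypothetical element playing the role of `observationRegistry.asContextElement()`, and `runWithContext` is a hypothetical stand-in for an entry point such as `mono(context) { }`; it has no dispatcher, so the block runs on the calling thread.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical element standing in for observationRegistry.asContextElement().
class TraceContext(val traceId: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TraceContext>
}

// A nested suspend function only sees what its coroutine context carries.
suspend fun traceIdFromContext(): String? = coroutineContext[TraceContext]?.traceId

// Hypothetical entry point: starts `block` with `context` and returns its result.
// The blocks used here never suspend, so the result is ready on return.
fun <T> runWithContext(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(context) { result -> outcome = result })
    return checkNotNull(outcome) { "block suspended; this sketch does not support that" }.getOrThrow()
}
```

Started without the element, `traceIdFromContext()` sees nothing; started with it, every nested suspend call sees the same value. Spring Framework later added ways to supply such a context (for example `context { ... }` in `coRouter`, and `CoWebFilter`), which the later comments in this thread use.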

Another option is to somehow bridge the MDC context, Reactor context, and Coroutine context.
An example of how this has been done for MDC can be found in this official library: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-slf4j/

With this library, another example that propagates properly would be:

@RestController
class FooRestController {

  @GetMapping("/suspend")
  suspend fun fooSuspend(): Foo {
    logger.info { "before" }
    delay(500) // the coroutine may resume on a different thread after this
    logger.info { "after" } // we lose the observation context
    return Foo()
  }

  @GetMapping("/mdc")
  suspend fun fooMdc(): Foo {
    return withContext(MDCContext()) {
        logger.info { "before" }
        delay(500) // the coroutine may resume on a different thread after this
        logger.info { "after" } // we do not lose the context, because kotlinx-coroutines-slf4j binds the MDC to the coroutine context
        Foo()
    }
  }
}

I am not sure where exactly this should be fixed, but in theory, if we had an entry point to add the observation context as a coroutine context element at the place where Spring initializes the coroutine scope for the controller's suspend function, it should work as expected.

@calebdelnay

calebdelnay commented May 16, 2023

FYI, on the Spring side, for controllers, the coroutine entry point is provided by the InvocableHandlerMethod class.

For Web MVC:

https://github.com/spring-projects/spring-framework/blob/e5ee369e70e0a8ca04437f502649b11171a1c741/spring-web/src/main/java/org/springframework/web/method/support/InvocableHandlerMethod.java#L201

Its doInvoke(...) checks if the function is a Kotlin suspend function and, if so, ultimately calls CoroutinesUtils.invokeSuspendingFunction(...)

Notably, it does not give a CoroutineContext object, so the coroutine is started with no context.

For WebFlux:

https://github.com/spring-projects/spring-framework/blob/c3e28728ce5c11655831cbc20f828e956e1c42a1/spring-webflux/src/main/java/org/springframework/web/reactive/result/method/InvocableHandlerMethod.java#L133

Similar to Web MVC, it calls CoroutinesUtils.invokeSuspendingFunction(...) without giving a CoroutineContext.

Maybe Spring should take whatever is in Reactor Context at this point and copy it into Coroutine Context?

EDIT: I don't have a suggested workaround, a fix probably needs to happen in Spring itself.

EDIT 2: There are two InvocableHandlerMethod implementations, one for Web MVC and one for WebFlux.

@marcingrzejszczak
Contributor

marcingrzejszczak commented May 17, 2023

I think we need help from Sebastien. Let us summon our Kotlin Guru @sdeleuze!

@tomfi

tomfi commented May 22, 2023

@marcingrzejszczak @sdeleuze

If it helps, I played with Spring's code, and the only thing that worked for me was getting the ObservationRegistry and using it in org.springframework.core.CoroutinesUtils#invokeSuspendingFunction:

CoroutineContext combinedContext;
if (observationRegistry != null) {
    combinedContext = KCoroutineUtils.asCoroutineContext(observationRegistry)
        .plus(context);
} else {
    combinedContext = context;
}

Mono<Object> mono = MonoKt.mono(combinedContext, (scope, continuation) ->
                KCallables.callSuspend(function, getSuspendedFunctionArgs(method, target, args), continuation))
        .filter(result -> !Objects.equals(result, Unit.INSTANCE))
        .onErrorMap(InvocationTargetException.class, InvocationTargetException::getTargetException);

KCoroutineUtils

object KCoroutineUtils {
    @JvmStatic
    fun asCoroutineContext(observationRegistry: ObservationRegistry): CoroutineContext {
        return observationRegistry.asContextElement()
    }
}

For the POC, I just took the ObservationRegistry from the application context in org.springframework.web.reactive.result.method.InvocableHandlerMethod#invoke:

if (isSuspendingFunction) {
    ObservationRegistry observationRegistry = exchange.getApplicationContext().getBean(ObservationRegistry.class);
    value = CoroutinesUtils.invokeSuspendingFunction(method, getBean(), observationRegistry, args);
}

If we find a way to inject the observationRegistry at this point, everything will work as intended.
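One detail of the POC worth keeping in mind is the order of `plus`: for `CoroutineContext`, when both sides contain an element with the same key, the right-hand one wins. So `asCoroutineContext(observationRegistry).plus(context)` lets an element already present in `context` override the one derived from the registry. A small stdlib-only sketch of that rule, with hypothetical `TraceContext` and `RequestName` elements and a hypothetical `combine` helper mirroring the POC's shape:

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical elements; each class's companion object is its key.
class TraceContext(val traceId: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TraceContext>
}

class RequestName(val name: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<RequestName>
}

// Same shape as the POC: registry-derived element first, caller's context second.
// For elements with the same key, the right-hand side (callerContext) wins.
fun combine(fromRegistry: CoroutineContext, callerContext: CoroutineContext): CoroutineContext =
    fromRegistry + callerContext
```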

I also tried to use captureContext, and I saw there is an observation inside the Reactor context, but I could not make it propagate properly to the coroutines.

omprakashsridharan added a commit to omprakashsridharan/spring-boot-playground that referenced this issue May 29, 2023
@akoufa
Author

akoufa commented Jun 6, 2023

@marcingrzejszczak Do we have any update on this or a suggestion for a temporary workaround on our side? Thank you

@sdeleuze

Be aware there are a bunch of Coroutines context issues still open on Spring Framework, and this area needs refinements to support all the use cases. See for example spring-projects/spring-framework#26977, spring-projects/spring-framework#27522 and spring-projects/spring-framework#27010.

Seamless context propagation is expected when converting from Reactor to Coroutines (this is handled on the Coroutines Reactor integration side), but deciding when a new Coroutines context should be created and when an existing one should be reused likely requires some refinements.

We probably need to plan a discussion between me, @marcingrzejszczak and/or @bclozel about the design of such a refined integration.

@tomfi

tomfi commented Jul 19, 2023

Any updates regarding this issue?

@marcingrzejszczak
Contributor

We update the issue when there are updates. We need to discuss things with @sdeleuze, but there have been other priorities so far.

@sdeleuze

Hey, I am starting to work on this and may ask the Kotlin team for guidance on the best way to fix it. This kind of issue seems to impact both Micrometer and Spring as soon as kotlinx.coroutines.reactor.mono is involved.

@sdeleuze

So the challenge was to pass the CoroutineContext despite the fact that kotlinx.coroutines.reactor.mono is not a suspending function (it cannot capture the caller's context, so the context has to be passed explicitly, as documented).
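Why a non-suspending builder such as `mono { }` needs the context passed explicitly can be shown with the standard library alone: a suspend function can read its own `coroutineContext`, but a regular function that starts a new coroutine has no access to the caller's context and starts from whatever it is given. `TraceContext`, `startDetached`, `currentTraceId` and `caller` below are hypothetical names; `startDetached` only mimics the shape of `mono(context) { }` and has no dispatcher.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical element standing in for an observation/trace context element.
class TraceContext(val traceId: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TraceContext>
}

// Non-suspending builder: like mono(context) { }, it starts a new coroutine
// that only sees `context`. The blocks used here never suspend, so the
// result is ready when startCoroutine returns.
fun <T> startDetached(context: CoroutineContext = EmptyCoroutineContext, block: suspend () -> T): T {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(context) { result -> outcome = result })
    return checkNotNull(outcome).getOrThrow()
}

suspend fun currentTraceId(): String? = coroutineContext[TraceContext]?.traceId

// Meant to be called from inside a coroutine that carries a TraceContext.
suspend fun caller(): Pair<String?, String?> {
    val implicit = startDetached { currentTraceId() }                   // context not passed
    val explicit = startDetached(coroutineContext) { currentTraceId() } // context passed
    return implicit to explicit
}
```

Run inside a coroutine carrying `TraceContext("abc")`, `caller()` returns `(null, "abc")`: only the explicitly passed context reaches the nested builder.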

I made progress on the Spring Framework side, for now focusing on the functional router DSL:

I will try to refine the annotation-based WebFlux support as well.

I am not sure what needs to be done, if anything, on the Micrometer side, but hopefully this progress on the Spring Framework side will help. Feedback on those draft commits is welcome; otherwise, I will move forward on the Spring Framework side and drop a comment here when the various improvements are available in Spring Framework 6.1.0-SNAPSHOT.

@sdeleuze

With the 3 issues above fixed, 6.1.0-SNAPSHOT Spring Framework builds should now contain everything needed from a Spring Framework POV for CoroutineContext propagation in WebFlux functional DSL.

If somebody could build and share a reproducer with Spring coRouter and Micrometer, that would allow me and the Micrometer team to provide guidelines, identify potential refinements, and validate that those recent Spring changes are useful and work as expected.

@sdeleuze

sdeleuze commented Sep 7, 2023

I have just added the last part of Spring Framework 6.1 CoroutineContext refinement: it is now possible to propagate it via CoWebFilter. spring-projects/spring-framework#27522

@sdeleuze

Could it be related to the usage of ThreadLocal-based infrastructure here? With Coroutines, like with Reactive, you are not supposed to use ThreadLocal but rather Reactor or Coroutines contexts.

@grassehh

grassehh commented Dec 12, 2023

@meberhard Not sure if this is related, but have you tried calling Hooks.enableAutomaticContextPropagation() in a @PostConstruct method of your WebConfig rather than after the call to runApplication?

@boris-senapt

@grassehh is correct - enableAutomaticContextPropagation() needs to be the first thing you do @meberhard

@chemicL
Contributor

chemicL commented Dec 12, 2023

Hint: since Spring Boot 3.2.0, you can also use the spring.reactor.context-propagation=auto property instead of calling Hooks.enableAutomaticContextPropagation() by hand.

@meberhard

meberhard commented Dec 12, 2023

> Could it be related to the usage of ThreadLocal-based infrastructure here? With Coroutines, like with Reactive, you are not supposed to use ThreadLocal but rather Reactor or Coroutines contexts.

I tried removing these two lines:

ContextRegistry.getInstance().registerThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor(tracer))
ObservationThreadLocalAccessor.getInstance().observationRegistry = observationRegistry

-> same result, no context/traceId in the onError method.

> @meberhard Not sure if this is related, but have you tried calling Hooks.enableAutomaticContextPropagation() in a @PostConstruct method of your WebConfig rather than after the call to runApplication?

Also tried that -> moved this to the WebConfig, same result (no context/traceId in the onError method).

I tried it both ways: with the ThreadLocal setup that you have in your example, and without it:

// With the ThreadLocal setup:
Hooks.enableAutomaticContextPropagation()
ContextRegistry.getInstance().registerThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor(tracer))
ObservationThreadLocalAccessor.getInstance().observationRegistry = observationRegistry
Metrics.observationRegistry(observationRegistry)

// Without it:
Hooks.enableAutomaticContextPropagation()
Metrics.observationRegistry(observationRegistry)

Just FYI: with @RestController, only the Hooks.enableAutomaticContextPropagation() line is required, so we can probably drop the other lines as well.

> @grassehh is correct - enableAutomaticContextPropagation() needs to be the first thing you do @meberhard

Good point. I don't even know why it worked at all with the @RestController like that. Anyway, I also moved this line before the runApplication call, with the same result: no context/traceId in the onError call.

> Hint: since Spring Boot 3.2.0, you can also use the spring.reactor.context-propagation=auto property instead of calling Hooks.enableAutomaticContextPropagation() by hand.

Awesome! Thanks a lot for this 🙏

@sdeleuze

I have fixed the nested route + context issue via spring-projects/spring-framework#31831, so please test and report any remaining problem with Spring Framework 6.1.2-SNAPSHOT (release planned tomorrow).

@meberhard

> I have fixed the nested route + context issue via spring-projects/spring-framework#31831, so please test and report any remaining problem with Spring Framework 6.1.2-SNAPSHOT (release planned tomorrow).

I can confirm that the nesting now works as expected, but the context is still not propagated to the onError handler.

    @Bean
    fun demoRoutes() = coRouter {
        context { observationRegistry.asContextElement() }
        "/rest".nest {
            "v1".nest {
                "/demo-router".nest {
                    GET("", demoHandler::demoCall)
                }
            }
        }
        onError<RuntimeException> { throwable, serverRequest ->
            errorHandler.handleError(throwable, serverRequest)
        }
    }

For the demo-router route, the context and trace are now available!
But if there is an error, there is still no context or trace for the call to errorHandler.

@sdeleuze

@meberhard This works for me so please double check:

@Bean
fun router() = coRouter {
    context { CoroutineName("Custom context") }
    "/nested".nest {
        GET("/") {
            throw RuntimeException()
        }
    }
    onError<RuntimeException> { _, _ ->
        ok().bodyValueAndAwait(currentCoroutineContext().toString())
    }
}

If you can reproduce, please create a Spring Framework issue with a self contained repro as an attached archive or a link to a repository.

@msosa

msosa commented Dec 14, 2023

I am not sure if this is related, but in my original comment #174 (comment), when starting a new coroutine within a span that already had a context, it would lose all span/trace info.

Is this expected when launching a new coroutine?

@sdeleuze

sdeleuze commented Jan 3, 2024

We potentially have a repro in spring-projects/spring-framework#31893 (which uses spring.reactor.context-propagation=auto). If somebody could check whether the issue is on the Micrometer side, or let me know if something weird is seen on the Spring side, that could help move things forward.

@sdeleuze

sdeleuze commented Jan 4, 2024

See spring-projects/spring-framework#31893 (comment) for ideas on what to explore on the Micrometer side to provide proper Coroutines support.

@bclozel
Contributor

bclozel commented Jan 31, 2024

So, @sdeleuze and I were investigating a different issue and talked about this one in the process.
If you're subscribed to this issue and you have a sample application that shows this problem, could you try the following?

Upgrade to Spring Boot 3.2.2

Sebastien fixed a few things in recent Spring Framework versions, so please check this version first.

Contribute a custom CoWebFilter

If the above didn't work, please continue with this step.
Kotlin coroutines provides an SLF4J module that could help set and restore values in the MDC from the coroutines context.
Please try and configure this in your application and report back here:

First, add the following dependencies to your application:

    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-reactor'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-slf4j:1.7.3'

Add the following class to your application in a package that's scanned for components:

package com.example.kotlinschedule

import kotlinx.coroutines.slf4j.MDCContext
import kotlinx.coroutines.withContext
import org.springframework.stereotype.Component
import org.springframework.web.server.CoWebFilter
import org.springframework.web.server.CoWebFilterChain
import org.springframework.web.server.ServerWebExchange

@Component
class MDCCoWebFilter : CoWebFilter() {

    override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) {
        withContext(MDCContext()) {
            chain.filter(exchange)
        }
    }
}

Then check with one of your controllers whether the traceId and spanId information is available from log statements in controller handlers.
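Roughly speaking, `MDCContext()` takes a copy of the current MDC when it is created and re-installs that copy on whatever thread the coroutine resumes on, restoring the previous value afterwards. The stdlib-only sketch below models that mechanism with a `ContinuationInterceptor` that both dispatches onto a worker thread and installs a snapshot. It is a simplified model under stated assumptions: `fakeMdc`, `DispatchWithMdc` and `mdcSeenOnWorker` are hypothetical names, and the real implementation relies on kotlinx.coroutines' `ThreadContextElement` and dispatchers rather than a hand-written interceptor.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.startCoroutine

// Hypothetical stand-in for SLF4J's MDC: one map per thread.
val fakeMdc: ThreadLocal<Map<String, String>?> = ThreadLocal()

// Models "dispatcher + MDCContext": every resumption runs on `executor`; if a
// snapshot was captured, it is installed for the duration of the resumption
// and the worker's previous value is restored afterwards.
class DispatchWithMdc(
    private val executor: Executor,
    private val snapshot: Map<String, String>?,
) : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                executor.execute {
                    val previous = fakeMdc.get()
                    if (snapshot != null) fakeMdc.set(snapshot)
                    try {
                        continuation.resumeWith(result)
                    } finally {
                        fakeMdc.set(previous)
                    }
                }
            }
        }
}

// Starts a coroutine on a worker thread and reports what it saw in fakeMdc.
fun mdcSeenOnWorker(propagate: Boolean): Map<String, String>? {
    val worker = Executors.newSingleThreadExecutor()
    val done = CountDownLatch(1)
    var seen: Map<String, String>? = null
    // Like MDCContext(): the snapshot is taken on the caller's thread.
    val snapshot = if (propagate) fakeMdc.get() else null
    val block: suspend () -> Map<String, String>? = { fakeMdc.get() }
    block.startCoroutine(Continuation<Map<String, String>?>(DispatchWithMdc(worker, snapshot)) { result ->
        seen = result.getOrNull()
        done.countDown()
    })
    done.await()
    worker.shutdown()
    return seen
}
```

Without a snapshot the coroutine body sees an empty MDC on the worker thread; with one it sees the caller's values, which is the behaviour the filter above is meant to restore.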

@antechrestos

antechrestos commented Jan 31, 2024

@bclozel As I wrote in the Spring issue, I think this may be what you are looking for.

I can enrich the example and post the zip in this discussion if you want

@bclozel
Contributor

bclozel commented Jan 31, 2024

@antechrestos I'm not sure I understand. You're saying this filter solution works for your case then? I don't need a sample now just confirmation. For me in some cases it worked even without that filter, hence my question to the community. Thanks!

@antechrestos

antechrestos commented Jan 31, 2024

@bclozel Without the filter, when I get a result from a CoroutineCrudRepository, the traceId is lost.

With a filter that adds observationRegistry.asContextElement() to the coroutine context, it works.

That said, my code does not use delay and so on.
I need to push a little further.

@antechrestos

The code I used

    @Bean
    fun coroutineContextFilter(observationRegistry: ObservationRegistry): CoWebFilter = object : CoWebFilter() {
        override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) {
            withContext(observationRegistry.asContextElement()){
                chain.filter(exchange)
            }
        }
    }

@antechrestos

antechrestos commented Feb 1, 2024

@bclozel I enriched my example

A CoWebFilter

    @Bean
    fun coroutineContextFilter(observationRegistry: ObservationRegistry): CoWebFilter = object : CoWebFilter() {
        override suspend fun filter(exchange: ServerWebExchange, chain: CoWebFilterChain) {
            when(exchange.request.uri.path) {
                "/api/v1/users-without-context" -> chain.filter(exchange)
                "/api/v1/users-with-mdc" -> withContext(MDCContext()){
                    chain.filter(exchange)
                }
                else -> withContext(observationRegistry.asContextElement()){
                    chain.filter(exchange)
                }
            }
        }
    }

And a controller with different scenarios and the same processing:

    @PostMapping("/api/v1/users-without-context")
    @ResponseStatus(HttpStatus.CREATED)
    suspend fun createUserNoContext(@RequestBody user: CreateUserDto) : UserDto {
        return launchUserCreation(user)
    }

    @PostMapping("/api/v1/users")
    @ResponseStatus(HttpStatus.CREATED)
    suspend fun createUser(@RequestBody user: CreateUserDto) : UserDto {
        return launchUserCreation(user)
    }

    @PostMapping("/api/v1/users-with-mdc")
    @ResponseStatus(HttpStatus.CREATED)
    suspend fun createUserMdc(@RequestBody user: CreateUserDto) : UserDto {
        return launchUserCreation(user)
    }

    private suspend fun launchUserCreation(user: CreateUserDto): UserDto {
        logger.debug("Start save of user {}", user)
        return userRepository.create(
            User.newUser(
                firstname = user.firstname,
                name = user.name,
            )
        )
            .let { toWeb(it) }
            .also {
                logger.debug("Wait for it")
                longTreatment(it)
                logger.debug("Created user {}", it)
            }
    }

    private suspend fun longTreatment(userDto: UserDto) {
        Mono.just(userDto)
            .flatMap { mono(MDCContext()) {
                logger.debug("Start long treatment")
                delay(Duration.ofSeconds(2).toMillis())
                logger.debug("End long treatment")
            } }
            .tap(
                Micrometer.observation(observationRegistry) { registry ->
                    val observation = Observation.createNotStarted("long-treatment", registry)
                        .contextualName("long-treatment")
                    observation
                }
            )
            .awaitSingle()
    }

Without contexts

2024-02-01T15:05:50.277+01:00 DEBUG 7296 --- [or-http-epoll-2] [c1265f5a0fe3c50e318f7cec8253067b-f4afc96976097cde] o.a.s.t.api.web.UserController           : Start save of user CreateUserDto(firstname=string, name=string)
2024-02-01T15:05:50.277+01:00 DEBUG 7296 --- [or-http-epoll-2] [c1265f5a0fe3c50e318f7cec8253067b-f4afc96976097cde] o.a.s.t.infrastructure.MongoRepository   : about to create User(id=b132c76a-53cb-40d6-87df-54c04af8ca0d, firstname=string, name=string)
2024-02-01T15:05:50.280+01:00 DEBUG 7296 --- [ntLoopGroup-3-3] [c1265f5a0fe3c50e318f7cec8253067b-f4afc96976097cde] o.a.s.t.infrastructure.MongoRepository   : user created User(id=b132c76a-53cb-40d6-87df-54c04af8ca0d, firstname=string, name=string)
2024-02-01T15:05:50.281+01:00 DEBUG 7296 --- [ntLoopGroup-3-3] [                                                 ] o.a.s.t.api.web.UserController           : Wait for it
2024-02-01T15:05:50.281+01:00 DEBUG 7296 --- [atcher-worker-1] [c1265f5a0fe3c50e318f7cec8253067b-9c4b3066d9da0f6c] o.a.s.t.api.web.UserController           : Start long treatment
2024-02-01T15:05:52.282+01:00 DEBUG 7296 --- [atcher-worker-1] [c1265f5a0fe3c50e318f7cec8253067b-9c4b3066d9da0f6c] o.a.s.t.api.web.UserController           : End long treatment
2024-02-01T15:05:52.283+01:00 DEBUG 7296 --- [atcher-worker-1] [c1265f5a0fe3c50e318f7cec8253067b-f4afc96976097cde] o.a.s.t.api.web.UserController           : Created user UserDto(id=b132c76a-53cb-40d6-87df-54c04af8ca0d, firstname=string, name=string)

We can see that starting a new observation lets the Slf4JEventListener instantiated by Spring get notified that a scope was opened. I observed the same behaviour in the issue I opened in Spring.

However, we can see that the spans are correctly correlated.

With context

With either context (MDCContext or observationRegistry.asContextElement()), the result is the same: trace IDs are in the logs and spans are correctly correlated.

I prefer using observationRegistry.asContextElement() because it does not need another dependency (org.jetbrains.kotlinx:kotlinx-coroutines-slf4j).

@antechrestos

@sdeleuze Why not add a default observationRegistry.asContextElement() when no context is found, since Spring always starts an observation if an ObservationRegistry is present?
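A minimal sketch of what "a default when no context is found" could mean at the `CoroutineContext` level, using only the standard library: add the element only if the context does not already carry one with the same key, so an explicitly configured element is never overridden. `TraceContext` and `withDefault` are hypothetical; in the actual proposal the default would be `observationRegistry.asContextElement()`.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element standing in for the observation context element.
class TraceContext(val traceId: String) : AbstractCoroutineContextElement(Key) {
    companion object Key : CoroutineContext.Key<TraceContext>
}

// Adds `default` only when `context` has no element for `key`.
fun <E : CoroutineContext.Element> withDefault(
    context: CoroutineContext,
    key: CoroutineContext.Key<E>,
    default: E,
): CoroutineContext =
    if (context[key] == null) context + default else context
```

Because `plus` lets the right-hand side win for a shared key, `default + context` gives the same resulting elements; the explicit check above just makes the intent visible.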

@sdeleuze

sdeleuze commented Feb 2, 2024

@antechrestos Maybe we could use a variant of that combined with @bclozel findings, see spring-projects/spring-framework#32165 (comment) for more details.

@antechrestos

@sdeleuze nice

@ilya40umov

I have created a separate issue for extending Micrometer Observation API with some more convenient methods to help using it from Kotlin code: micrometer-metrics/micrometer#4754. Not sure if you folks would agree that it's necessary though.

@ilya40umov

@sdeleuze I have been trying to find a ticket in Spring Boot which would take care of adding the following 2 bits of configuration automatically (i.e. things that are needed to ensure context propagation), as otherwise pretty much every micro-service developed with Spring Boot & Coroutines needs to add these (or something similar):

Is this something on your radar?

@sdeleuze

@ilya40umov Related issue is likely going to be #174 (comment) (we need to update the title to expand its scope) so feel free to comment there after reading my latest proposal.

@maxxis95

@bclozel Hi, I'm trying to start a coroutine from a GatewayFilter. The tracer works fine until I enter this flatMap: return super.writeWith(fluxBody.buffer().flatMap { dataBuffers: List<DataBuffer> ->...
Until then, I'm able to get the current span and trace ID. Inside this flatMap, the tracer gives me null for the current context.
During my research, I found that I need these lines to activate automatic propagation, so I have this set up:

Hooks.enableAutomaticContextPropagation()
ContextRegistry.getInstance().registerThreadLocalAccessor(ObservationAwareSpanThreadLocalAccessor(tracer))
ObservationThreadLocalAccessor.getInstance().observationRegistry = observationRegistry
Metrics.observationRegistry(observationRegistry)

Can someone tell me how to propagate the trace ID into reactive processing like this flatMap?

@Component
class BodyModifyingFilter(
    private val linkExpansionAndTranslationService: LinkExpansionAndTranslationService,
    private val observationRegistry: ObservationRegistry,
    private val tracer: Tracer
) : GatewayFilter, Ordered {

    override fun filter(exchange: ServerWebExchange, chain: GatewayFilterChain): Mono<Void> {
        val decoratedResponse: ServerHttpResponseDecorator = object : ServerHttpResponseDecorator(exchange.response) {
            override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> {
                if (body is Flux<*>) {
                    val fluxBody: Flux<DataBuffer> = Flux.from(body as Publisher<DataBuffer>)
                    return super.writeWith(fluxBody.buffer().flatMap { dataBuffers: List<DataBuffer> ->
                        val bufferFactory = exchange.response.bufferFactory()
                        val joined = bufferFactory.join(dataBuffers)
                        val bytes = ByteArray(joined.readableByteCount())
                        joined.read(bytes)
                        DataBufferUtils.release(joined) // Release the memory for the data buffer
                        val securityContextMono = ReactiveSecurityContextHolder.getContext()

                        securityContextMono.flatMap { securityContext ->
                            mono(SecurityCoroutineContext(securityContext, tracer)) {
                                linkExpansionAndTranslationService.processBody(exchange, bytes)
                            }.flatMap { processedBytes ->
                                val buffer = bufferFactory.wrap(processedBytes)
                                exchange.response.headers.contentLength = processedBytes.size.toLong()
                                Mono.just(buffer)
                            }
                        }
                    })
                }
                return super.writeWith(body) // Fallback for other types of publishers
            }
        }

        return chain.filter(exchange.mutate().response(decoratedResponse).build())
    }
}
