Observability with concurrent threads #4438
Hello, when running the pipeline sequentially, everything works as expected: the execution of each stage appears as a SPAN within the TRACE of the main method. When parallelizing the 3 Bx stages, I observed the following behaviour:
To obtain a single trace, following https://langfuse.com/docs/sdk/python/decorators#set-custom-ids, I had to manually pass to each B* method an extra
Is this the proper way, or is there a simpler approach to obtain a single trace with threads? Thanks a lot!
Hello, @DavideSanvito! I found a similar discussion that might be helpful. In a previous thread, it was confirmed that when using
Here's an example of how you can implement this:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    from langfuse.decorators import langfuse_context, observe


    @observe()
    def execute_task(*args):
        return args


    @observe()
    def execute_groups(task_args):
        # Capture the IDs of the current trace and observation in the parent thread.
        trace_id = langfuse_context.get_current_trace_id()
        observation_id = langfuse_context.get_current_observation_id()

        with ThreadPoolExecutor(3) as executor:
            futures = [
                executor.submit(
                    execute_task,
                    *task_arg,
                    # Attach each worker's span to the parent trace and observation.
                    langfuse_parent_trace_id=trace_id,
                    langfuse_parent_observation_id=observation_id,
                )
                for task_arg in task_args
            ]

            # Wait for every task; result() re-raises any worker exception.
            for future in as_completed(futures):
                future.result()

        # Return the results in submission order.
        return [f.result() for f in futures]


    @observe()
    def main():
        task_args = [["a", "b"], ["c", "d"]]
        execute_groups(task_args)


    main()

    # Send any buffered events before the script exits.
    langfuse_context.flush()

This method is recommended over manually copying the context, as it directly addresses the issue of maintaining the trace hierarchy across threads [1].
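As background for why the IDs are passed explicitly in the example above, here is a minimal standard-library sketch. It assumes the decorator keeps its "current trace" state in a `contextvars.ContextVar`, which is an assumption about the SDK's internals rather than something stated in this thread. The names `current_trace_id`, `read_trace_id`, `read_with_explicit_id`, and `run_demo` are hypothetical and exist only for illustration. On a default CPython build, a pool worker thread does not see values set in the submitting thread's context, so the implicit lookup typically falls back to the default, while an ID passed as an argument always arrives.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for the "current trace" state a tracing decorator might keep.
current_trace_id = contextvars.ContextVar("current_trace_id", default=None)


def read_trace_id():
    # Runs in a worker thread and relies on implicit context propagation.
    return current_trace_id.get()


def read_with_explicit_id(parent_trace_id):
    # Runs in a worker thread but receives the parent ID as a plain argument.
    return parent_trace_id


def run_demo():
    # Set the value in the submitting (parent) thread.
    current_trace_id.set("trace-123")
    with ThreadPoolExecutor(max_workers=2) as executor:
        implicit = executor.submit(read_trace_id).result()
        explicit = executor.submit(
            read_with_explicit_id, current_trace_id.get()
        ).result()
    return implicit, explicit


if __name__ == "__main__":
    implicit, explicit = run_demo()
    print("implicit lookup in worker:", implicit)
    print("explicitly passed ID:", explicit)
```

The explicit argument plays the same role as `langfuse_parent_trace_id` in the reply: the parent reads the ID once in its own thread and hands it to each task, so the result does not depend on how the worker thread's context was initialised.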
@DavideSanvito we have collected some notes on this here: https://langfuse.com/docs/sdk/python/decorators#using-threadpoolexecutors-or-processpoolexecutors
Hope this helps. Let me know in case you have any additional questions!