Skip to content

Commit

Permalink
Add RunnableSequence.astream
Browse files Browse the repository at this point in the history
  • Loading branch information
tibor-reiss committed May 28, 2024
1 parent ebd8556 commit 3638ab6
Show file tree
Hide file tree
Showing 6 changed files with 289 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from opentelemetry.instrumentation.langchain.workflow_wrapper import (
workflow_wrapper,
aworkflow_wrapper,
agworkflow_wrapper,
)
from opentelemetry.instrumentation.langchain.custom_llm_wrapper import (
llm_wrapper,
Expand Down Expand Up @@ -148,6 +149,13 @@
"span_name": "langchain.workflow",
"wrapper": workflow_wrapper,
},
{
"package": "langchain.schema.runnable",
"object": "RunnableSequence",
"method": "astream",
"span_name": "langchain.workflow",
"wrapper": agworkflow_wrapper,
},
{
"package": "langchain_core.language_models.llms",
"object": "LLM",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,30 @@ async def aworkflow_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
return return_value


@_with_tracer_wrapper
async def agworkflow_wrapper(tracer, to_wrap, wrapped, instance, args, kwargs):
"""Instruments and calls every function defined in TO_WRAP."""
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
async for i in wrapped(*args, **kwargs):
yield i
else:
name, kind = _handle_request(instance, args, to_wrap)

attach(set_value("workflow_name", name))

with tracer.start_as_current_span(name) as span:
span.set_attribute(
SpanAttributes.TRACELOOP_SPAN_KIND,
kind,
)
span.set_attribute(SpanAttributes.TRACELOOP_ENTITY_NAME, name)

process_request(span, args, kwargs)
async for i in wrapped(*args, *kwargs):
span.add_event(name="langchain.content.completion.chunk")
yield i


def _handle_request(instance, args, to_wrap):
config = args[1] if len(args) > 1 else {}
run_name = config.get("run_name") or instance.get_name()
Expand Down
6 changes: 3 additions & 3 deletions packages/opentelemetry-instrumentation-langchain/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pytest-asyncio = "^0.23.5"
opentelemetry-sdk = "^1.23.0"
opentelemetry-instrumentation-openai = {path="../opentelemetry-instrumentation-openai", develop=true}
opentelemetry-instrumentation-bedrock = {path="../opentelemetry-instrumentation-bedrock", develop=true}
opentelemetry-instrumentation-cohere = {path = "../opentelemetry-instrumentation-cohere", develop=true}
opentelemetry-instrumentation-cohere = {path="../opentelemetry-instrumentation-cohere", develop=true}
text-generation = "^0.7.0"
anthropic = "^0.23.0"
boto3 = "1.34.88"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
interactions:
- request:
body: '{"message": "tell me a short joke", "stream": true, "model": "command-r",
"chat_history": [{"role": "System", "message": "You are a helpful assistant"}]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '153'
content-type:
- application/json
host:
- api.cohere.com
user-agent:
- python-httpx/0.27.0
x-client-name:
- langchain:partner
x-fern-language:
- Python
x-fern-sdk-name:
- cohere
x-fern-sdk-version:
- 5.5.3
method: POST
uri: https://api.cohere.com/v1/chat
response:
body:
string: '{"is_finished":false,"event_type":"stream-start","generation_id":"557b9052-655e-45fb-84ca-3a68f239743c"}
{"is_finished":false,"event_type":"text-generation","text":"What"}
{"is_finished":false,"event_type":"text-generation","text":" do"}
{"is_finished":false,"event_type":"text-generation","text":" you"}
{"is_finished":false,"event_type":"text-generation","text":" call"}
{"is_finished":false,"event_type":"text-generation","text":" a"}
{"is_finished":false,"event_type":"text-generation","text":" factory"}
{"is_finished":false,"event_type":"text-generation","text":" that"}
{"is_finished":false,"event_type":"text-generation","text":" makes"}
{"is_finished":false,"event_type":"text-generation","text":" distinctly"}
{"is_finished":false,"event_type":"text-generation","text":" average"}
{"is_finished":false,"event_type":"text-generation","text":" products"}
{"is_finished":false,"event_type":"text-generation","text":"?"}
{"is_finished":false,"event_type":"text-generation","text":"\n\n*"}
{"is_finished":false,"event_type":"text-generation","text":"An"}
{"is_finished":false,"event_type":"text-generation","text":" adequate"}
{"is_finished":false,"event_type":"text-generation","text":"-"}
{"is_finished":false,"event_type":"text-generation","text":"ory"}
{"is_finished":false,"event_type":"text-generation","text":".*"}
{"is_finished":true,"event_type":"stream-end","response":{"response_id":"ceb726f9-d4f0-47d9-9da6-3ed43bea7626","text":"What
do you call a factory that makes distinctly average products?\n\n*An adequate-ory.*","generation_id":"557b9052-655e-45fb-84ca-3a68f239743c","chat_history":[{"role":"SYSTEM","message":"You
are a helpful assistant"},{"role":"USER","message":"tell me a short joke"},{"role":"CHATBOT","message":"What
do you call a factory that makes distinctly average products?\n\n*An adequate-ory.*"}],"finish_reason":"COMPLETE","meta":{"api_version":{"version":"1"},"billed_units":{"input_tokens":10,"output_tokens":20},"tokens":{"input_tokens":79,"output_tokens":20}}},"finish_reason":"COMPLETE"}
'
headers:
Alt-Svc:
- h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
Via:
- 1.1 google
access-control-expose-headers:
- X-Debug-Trace-ID
cache-control:
- no-cache, no-store, no-transform, must-revalidate, private, max-age=0
content-type:
- application/stream+json
date:
- Tue, 28 May 2024 10:39:50 GMT
expires:
- Thu, 01 Jan 1970 00:00:00 UTC
pragma:
- no-cache
server:
- envoy
transfer-encoding:
- chunked
vary:
- Origin
x-accel-expires:
- '0'
x-debug-trace-id:
- c95431b8259da9152fffc219652c20a7
x-endpoint-monthly-call-limit:
- '1000'
x-envoy-upstream-service-time:
- '114'
x-trial-endpoint-call-limit:
- '10'
x-trial-endpoint-call-remaining:
- '9'
status:
code: 200
message: OK
- request:
body: '{"message": "tell me a short joke", "stream": true, "model": "command-r",
"chat_history": [{"role": "System", "message": "You are a helpful assistant"}]}'
headers:
accept:
- '*/*'
accept-encoding:
- gzip, deflate
connection:
- keep-alive
content-length:
- '153'
content-type:
- application/json
host:
- api.cohere.com
user-agent:
- python-httpx/0.27.0
x-client-name:
- langchain:partner
x-fern-language:
- Python
x-fern-sdk-name:
- cohere
x-fern-sdk-version:
- 5.5.3
method: POST
uri: https://api.cohere.com/v1/chat
response:
body:
string: '{"is_finished":false,"event_type":"stream-start","generation_id":"df614ed1-b758-43a7-b130-a204c2e090e2"}
{"is_finished":false,"event_type":"text-generation","text":"What"}
{"is_finished":false,"event_type":"text-generation","text":" do"}
{"is_finished":false,"event_type":"text-generation","text":" you"}
{"is_finished":false,"event_type":"text-generation","text":" call"}
{"is_finished":false,"event_type":"text-generation","text":" a"}
{"is_finished":false,"event_type":"text-generation","text":" factory"}
{"is_finished":false,"event_type":"text-generation","text":" that"}
{"is_finished":false,"event_type":"text-generation","text":" makes"}
{"is_finished":false,"event_type":"text-generation","text":" distinctly"}
{"is_finished":false,"event_type":"text-generation","text":" average"}
{"is_finished":false,"event_type":"text-generation","text":" products"}
{"is_finished":false,"event_type":"text-generation","text":"?"}
{"is_finished":false,"event_type":"text-generation","text":"\n\n*"}
{"is_finished":false,"event_type":"text-generation","text":"An"}
{"is_finished":false,"event_type":"text-generation","text":" adequate"}
{"is_finished":false,"event_type":"text-generation","text":"-"}
{"is_finished":false,"event_type":"text-generation","text":"ory"}
{"is_finished":false,"event_type":"text-generation","text":".*"}
{"is_finished":true,"event_type":"stream-end","response":{"response_id":"e15d0b71-6679-49a0-9e81-bacdbfac77b3","text":"What
do you call a factory that makes distinctly average products?\n\n*An adequate-ory.*","generation_id":"df614ed1-b758-43a7-b130-a204c2e090e2","chat_history":[{"role":"SYSTEM","message":"You
are a helpful assistant"},{"role":"USER","message":"tell me a short joke"},{"role":"CHATBOT","message":"What
do you call a factory that makes distinctly average products?\n\n*An adequate-ory.*"}],"finish_reason":"COMPLETE","meta":{"api_version":{"version":"1"},"billed_units":{"input_tokens":10,"output_tokens":20},"tokens":{"input_tokens":79,"output_tokens":20}}},"finish_reason":"COMPLETE"}
'
headers:
Alt-Svc:
- h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
Via:
- 1.1 google
access-control-expose-headers:
- X-Debug-Trace-ID
cache-control:
- no-cache, no-store, no-transform, must-revalidate, private, max-age=0
content-type:
- application/stream+json
date:
- Tue, 28 May 2024 10:39:51 GMT
expires:
- Thu, 01 Jan 1970 00:00:00 UTC
pragma:
- no-cache
server:
- envoy
transfer-encoding:
- chunked
vary:
- Origin
x-accel-expires:
- '0'
x-debug-trace-id:
- 21dd35baf56070c149ad90741a0ad935
x-endpoint-monthly-call-limit:
- '1000'
x-envoy-upstream-service-time:
- '92'
x-trial-endpoint-call-limit:
- '10'
x-trial-endpoint-call-remaining:
- '8'
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
Expand Up @@ -392,3 +392,30 @@ def test_streaming_cohere(exporter):
"kwargs": {'input': 'tell me a short joke'},
}
assert result == "What do you call a factory that makes distinctly average products?\n\n*An adequate-torium*."


@pytest.mark.vcr
@pytest.mark.asyncio
async def test_astreaming_cohere(exporter):
prompt = ChatPromptTemplate.from_messages(
[("system", "You are a helpful assistant"), ("user", "{input}")]
)
chat = ChatCohere(model="command-r")
parser = StrOutputParser()
runnable = prompt | chat | parser

result = ''.join([i async for i in runnable.astream({"input": "tell me a short joke"})])

spans = exporter.get_finished_spans()
workflow_span = next(
span for span in spans if span.name == "RunnableSequence.langchain.workflow"
)
assert {
"ChatPromptTemplate.langchain.task",
"RunnableSequence.langchain.workflow",
} == {span.name for span in spans}
assert json.loads(workflow_span.attributes.get("traceloop.entity.input")) == {
"args": [],
"kwargs": {'input': 'tell me a short joke'},
}
assert result == "What do you call a factory that makes distinctly average products?\n\n*An adequate-ory.*"

0 comments on commit 3638ab6

Please sign in to comment.