Skip to content

Commit

Permalink
[MLOB-1804] feat(langchain): add langchain instrumentation (#4860)
Browse files Browse the repository at this point in the history
* wip

* wip

* first pass at chain invoke and chat,llm generate

* add langchain openai embeddings

* add batch call

* change api key logic

* testing

* ts def changes

* codeowners changes

* add clarifying issue as reason for skipping esm tests

* fix langchain patching for possible esm files vs commonjs files, namespace

* configurable truncation and prompt completion sampling

* remove unneeded util file

* remove some unneeded code

* fix patching esm vs cjs issues

* json stringify non-string chain outputs

* apikey, model, provider should no-op by default

* add some token handling logic

* review comments

* check lc_ for ignored properties
  • Loading branch information
sabrenner authored and rochdev committed Nov 21, 2024
1 parent 599697b commit 32d524c
Show file tree
Hide file tree
Showing 20 changed files with 1,565 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .github/workflows/plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,14 @@ jobs:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

langchain:
runs-on: ubuntu-latest
env:
PLUGINS: langchain
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test

limitd-client:
runs-on: ubuntu-latest
services:
Expand Down
1 change: 1 addition & 0 deletions docs/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ tracer.use('kafkajs');
tracer.use('knex');
tracer.use('koa');
tracer.use('koa', httpServerOptions);
tracer.use('langchain');
tracer.use('mariadb', { service: () => `my-custom-mariadb` })
tracer.use('memcached');
tracer.use('microgateway-core');
Expand Down
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ interface Plugins {
"kafkajs": tracer.plugins.kafkajs
"knex": tracer.plugins.knex;
"koa": tracer.plugins.koa;
"langchain": tracer.plugins.langchain;
"mariadb": tracer.plugins.mariadb;
"memcached": tracer.plugins.memcached;
"microgateway-core": tracer.plugins.microgateway_core;
Expand Down Expand Up @@ -1592,6 +1593,12 @@ declare namespace tracer {
*/
interface kafkajs extends Instrumentation {}

/**
* This plugin automatically instruments the
* [langchain](https://js.langchain.com/) module.
*/
interface langchain extends Instrumentation {}

/**
* This plugin automatically instruments the
* [ldapjs](https://github.com/ldapjs/node-ldapjs/) module.
Expand Down
3 changes: 3 additions & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ module.exports = {
'@jest/test-sequencer': () => require('../jest'),
'@jest/transform': () => require('../jest'),
'@koa/router': () => require('../koa'),
'@langchain/core': () => require('../langchain'),
'@langchain/openai': () => require('../langchain'),
'@node-redis/client': () => require('../redis'),
'@opensearch-project/opensearch': () => require('../opensearch'),
'@opentelemetry/sdk-trace-node': () => require('../otel-sdk-trace'),
Expand Down Expand Up @@ -67,6 +69,7 @@ module.exports = {
koa: () => require('../koa'),
'koa-router': () => require('../koa'),
kafkajs: () => require('../kafkajs'),
langchain: () => require('../langchain'),
ldapjs: () => require('../ldapjs'),
'limitd-client': () => require('../limitd-client'),
lodash: () => require('../lodash'),
Expand Down
77 changes: 77 additions & 0 deletions packages/datadog-instrumentations/src/langchain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
'use strict'

const { addHook } = require('./helpers/instrument')
const shimmer = require('../../datadog-shimmer')

const tracingChannel = require('dc-polyfill').tracingChannel

const invokeTracingChannel = tracingChannel('apm:langchain:invoke')

/**
 * Builds a replacement for a LangChain method that routes each call through the
 * `apm:langchain:invoke` tracing channel whenever that channel has subscribers.
 *
 * @param {Function} fn - the original method being wrapped
 * @param {string} type - stored as `type` on the context handed to the channel
 * @param {string[]} [namespace=[]] - resource prefix used when the instance has no `lc_namespace`
 * @returns {Function} wrapper that must be invoked with the LangChain instance as `this`
 */
function wrapLangChainPromise (fn, type, namespace = []) {
  return function () {
    if (invokeTracingChannel.start.hasSubscribers) {
      // Runnable interfaces carry an `lc_namespace`; other instances use the supplied fallback
      const segments = [...(this.lc_namespace || namespace), this.constructor.name]

      const ctx = {
        args: arguments,
        instance: this,
        type,
        resource: segments.join('.')
      }

      return invokeTracingChannel.tracePromise(fn, ctx, this, ...arguments)
    }

    // nobody is listening: call straight through to the original method
    return fn.apply(this, arguments)
  }
}

// langchain is compiled to both ESM and CommonJS: ESM is the default and lands in the `.js` files,
// while the CommonJS build lands in `.cjs` files, which are what gets required under the hood.
// each file is hooked separately and explicitly so its exports are matched exactly once,
// rather than relying on file regex matching
const extensions = ['js', 'cjs']

extensions.forEach(extension => {
  // hook descriptor for one compiled file of the given package
  const hookFor = (name, path) => ({ name, file: `dist/${path}.${extension}`, versions: ['>=0.1'] })

  // chains: single and batched execution of a RunnableSequence
  addHook(hookFor('@langchain/core', 'runnables/base'), exports => {
    const { prototype } = exports.RunnableSequence
    shimmer.wrap(prototype, 'invoke', original => wrapLangChainPromise(original, 'chain'))
    shimmer.wrap(prototype, 'batch', original => wrapLangChainPromise(original, 'chain'))
    return exports
  })

  // chat models
  addHook(hookFor('@langchain/core', 'language_models/chat_models'), exports => {
    const { prototype } = exports.BaseChatModel
    shimmer.wrap(prototype, 'generate', original => wrapLangChainPromise(original, 'chat_model'))
    return exports
  })

  // text-completion LLMs
  addHook(hookFor('@langchain/core', 'language_models/llms'), exports => {
    const { prototype } = exports.BaseLLM
    shimmer.wrap(prototype, 'generate', original => wrapLangChainPromise(original, 'llm'))
    return exports
  })

  // OpenAI embeddings
  addHook(hookFor('@langchain/openai', 'embeddings'), exports => {
    const { prototype } = exports.OpenAIEmbeddings

    // OpenAI (and Embeddings in general) do not define an lc_namespace
    const namespace = ['langchain', 'embeddings', 'openai']

    shimmer.wrap(prototype, 'embedDocuments', original => wrapLangChainPromise(original, 'embedding', namespace))
    shimmer.wrap(prototype, 'embedQuery', original => wrapLangChainPromise(original, 'embedding', namespace))
    return exports
  })
})
50 changes: 50 additions & 0 deletions packages/datadog-plugin-langchain/src/handlers/chain.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict'

const LangChainHandler = require('./default')

/**
 * Produces the `langchain.request.inputs.*` and `langchain.response.outputs.*` span tags
 * for chain calls. Both methods return an empty object when the call is not
 * prompt/completion sampled. Values are passed through the inherited `normalize`.
 */
class LangChainChainHandler extends LangChainHandler {
  /**
   * Tags the chain inputs taken from the first call argument.
   *
   * @param {{ args?: ArrayLike<any> }} ctx - tracing context; `ctx.args[0]` is either a single
   *   input or an array of inputs
   * @returns {Object<string, string|undefined>} tags keyed by input index (and by property name
   *   for object inputs)
   */
  getSpanStartTags (ctx) {
    const tags = {}

    if (!this.isPromptCompletionSampled()) return tags

    const firstArg = ctx.args?.[0]
    const inputs = Array.isArray(firstArg) ? firstArg : [firstArg]

    // forEach instead of for...in: only real indices are visited, never enumerable
    // properties that something may have added to Array.prototype
    inputs.forEach((input, idx) => {
      // `typeof null` is 'object', so null must be excluded before Object.entries (which throws on it)
      if (input === null || typeof input !== 'object') {
        tags[`langchain.request.inputs.${idx}`] = this.normalize(input)
        return
      }

      for (const [key, value] of Object.entries(input)) {
        // these are mappings to the python client names, ie lc_kwargs
        // only present on BaseMessage types.
        // match on the prefix only, so ordinary keys that merely contain `lc_` are still tagged
        if (key.startsWith('lc_')) continue
        tags[`langchain.request.inputs.${idx}.${key}`] = this.normalize(value)
      }
    })

    return tags
  }

  /**
   * Tags the chain outputs taken from `ctx.result`.
   *
   * @param {{ result?: any }} ctx - tracing context; `ctx.result` is either a single output
   *   or an array of outputs
   * @returns {Object<string, string|undefined>} tags keyed by output index
   */
  getSpanEndTags (ctx) {
    const tags = {}

    if (!this.isPromptCompletionSampled()) return tags

    const result = ctx.result
    const outputs = Array.isArray(result) ? result : [result]

    outputs.forEach((output, idx) => {
      // non-string outputs are serialized to JSON before being normalized
      tags[`langchain.response.outputs.${idx}`] = this.normalize(
        typeof output === 'string' ? output : JSON.stringify(output)
      )
    })

    return tags
  }
}

module.exports = LangChainChainHandler
53 changes: 53 additions & 0 deletions packages/datadog-plugin-langchain/src/handlers/default.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
'use strict'

const Sampler = require('../../../dd-trace/src/sampler')

const RE_NEWLINE = /\n/g
const RE_TAB = /\t/g

// TODO: should probably refactor the OpenAI integration to use a shared LLMTracingPlugin base class
// This logic isn't particular to LangChain
/**
 * Base handler: holds the configuration and the prompt/completion sampler, provides text
 * normalization, and supplies no-op tag/metadata extractors for subclasses to override.
 */
class LangChainHandler {
  /**
   * @param {Object} config - reads `spanPromptCompletionSampleRate` here and `spanCharLimit`
   *   in `normalize`
   */
  constructor (config) {
    this.config = config
    this.sampler = new Sampler(config.spanPromptCompletionSampleRate)
  }

  // default handler: no start tags
  getSpanStartTags (ctx) {}

  // default handler: no end tags
  getSpanEndTags (ctx) {}

  // default handler: no api key
  extractApiKey (instance) {}

  // default handler: no provider
  extractProvider (instance) {}

  // default handler: no model
  extractModel (instance) {}

  /**
   * Escapes newlines and tabs in a string and truncates it to `config.spanCharLimit`
   * characters, appending `...` when truncated.
   *
   * @param {*} text - value to normalize
   * @returns {string|undefined} the normalized text, or undefined for anything that is
   *   not a non-empty string
   */
  normalize (text) {
    if (typeof text !== 'string' || text.length === 0) return

    const { spanCharLimit } = this.config
    const escaped = text.replace(RE_NEWLINE, '\\n').replace(RE_TAB, '\\t')

    return escaped.length > spanCharLimit
      ? `${escaped.substring(0, spanCharLimit)}...`
      : escaped
  }

  /**
   * @returns {*} whatever the sampler's `isSampled()` reports
   */
  isPromptCompletionSampled () {
    const { sampler } = this
    return sampler.isSampled()
  }
}

module.exports = LangChainHandler
63 changes: 63 additions & 0 deletions packages/datadog-plugin-langchain/src/handlers/embedding.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
'use strict'

const LangChainHandler = require('./default')

/**
 * Produces span tags and metadata (api key suffix, provider, model) for embedding calls.
 * The first call argument is treated as a single query when it is a string, and as a
 * collection of documents otherwise.
 */
class LangChainEmbeddingHandler extends LangChainHandler {
  /**
   * Tags the embedding inputs and how many there are. Input text is only tagged when
   * the call is prompt/completion sampled; the count is tagged regardless.
   *
   * @param {{ args?: ArrayLike<any> }} ctx - tracing context; `ctx.args[0]` is the query
   *   string or the collection of documents
   * @returns {Object<string, string|number|undefined>} the request tags; empty when no
   *   input was passed
   */
  getSpanStartTags (ctx) {
    const tags = {}

    const inputTexts = ctx.args?.[0]

    // no input at all: there is nothing to tag or count, and reading `.length` below would throw
    if (inputTexts == null) return tags

    const sampled = this.isPromptCompletionSampled()

    if (typeof inputTexts === 'string') {
      // embed query
      if (sampled) {
        tags['langchain.request.inputs.0.text'] = this.normalize(inputTexts)
      }
      tags['langchain.request.input_counts'] = 1
      return tags
    }

    // embed documents
    if (sampled) {
      // Object.entries instead of for...in: own entries only, so enumerable properties
      // added to Array.prototype never end up as tags
      for (const [idx, inputText] of Object.entries(inputTexts)) {
        tags[`langchain.request.inputs.${idx}.text`] = this.normalize(inputText)
      }
    }
    tags['langchain.request.input_counts'] = inputTexts.length

    return tags
  }

  /**
   * Tags the length of the returned embedding: the length of the first element when the
   * result is an array of arrays, otherwise the length of the result itself.
   *
   * @param {{ result?: any }} ctx - tracing context carrying the call result
   * @returns {Object<string, number>|undefined} the response tags, or undefined when the
   *   result is not an array
   */
  getSpanEndTags (ctx) {
    const tags = {}

    const { result } = ctx
    if (!Array.isArray(result)) return

    tags['langchain.response.outputs.embedding_length'] = (
      Array.isArray(result[0]) ? result[0] : result
    ).length

    return tags
  }

  /**
   * @param {Object} instance - embeddings instance; reads `instance.clientConfig.apiKey`
   * @returns {string} `...` followed by the last four characters of the key, or an empty
   *   string when the key is missing or shorter than four characters
   */
  extractApiKey (instance) {
    const apiKey = instance.clientConfig?.apiKey
    if (!apiKey || apiKey.length < 4) return ''
    return `...${apiKey.slice(-4)}`
  }

  /**
   * @param {Object} instance - embeddings instance
   * @returns {string} the lower-cased part of the constructor name that precedes `Embeddings`
   */
  extractProvider (instance) {
    return instance.constructor.name.split('Embeddings')[0].toLowerCase()
  }

  /**
   * @param {Object} instance - embeddings instance
   * @returns {*} the first truthy value among `model`, `modelName`, `modelId`, `modelKey`
   *   and `repoId`, or undefined when none is set
   */
  extractModel (instance) {
    for (const attr of ['model', 'modelName', 'modelId', 'modelKey', 'repoId']) {
      const modelName = instance[attr]
      if (modelName) return modelName
    }
  }
}

module.exports = LangChainEmbeddingHandler
Loading

0 comments on commit 32d524c

Please sign in to comment.