From 66ef96c7bb5883048fa7d662590275cafb6564fc Mon Sep 17 00:00:00 2001 From: jacoblee93 Date: Mon, 12 Aug 2024 05:42:21 -0700 Subject: [PATCH] Remove signal event listener when completed --- .../src/runnables/tests/signal.test.ts | 48 +++++++++++++++++++ langchain-core/src/utils/signal.ts | 8 ++-- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/langchain-core/src/runnables/tests/signal.test.ts b/langchain-core/src/runnables/tests/signal.test.ts index 7413ea3794cf..f3a6c20779e3 100644 --- a/langchain-core/src/runnables/tests/signal.test.ts +++ b/langchain-core/src/runnables/tests/signal.test.ts @@ -14,6 +14,9 @@ import { FakeChatMessageHistory, FakeListChatModel, } from "../../utils/testing/index.js"; +import { StringOutputParser } from "../../output_parsers/string.js"; +import { Document } from "../../documents/document.js"; +import { ChatPromptTemplate } from "../../prompts/chat.js"; const chatModel = new FakeListChatModel({ responses: ["hey"], sleep: 500 }); @@ -152,3 +155,48 @@ describe.each(Object.keys(TEST_CASES))("Test runnable %s", (name) => { }).rejects.toThrowError(); }); }); + +test("Should not raise node warning", async () => { + const formatDocumentsAsString = (documents: Document[]) => { + return documents.map((doc) => doc.pageContent).join("\n\n"); + }; + const retriever = RunnableLambda.from(() => { + return [ + new Document({ pageContent: "test1" }), + new Document({ pageContent: "test2" }), + new Document({ pageContent: "test4" }), + new Document({ pageContent: "test5" }), + ]; + }); + const ragChainWithSources = RunnableMap.from({ + // Return raw documents here for now since we want to return them at + // the end - we'll format in the next step of the chain + context: retriever, + question: new RunnablePassthrough(), + }).assign({ + answer: RunnableSequence.from([ + (input) => { + return { + // Now we format the documents as strings for the prompt + context: formatDocumentsAsString(input.context as Document[]), + question: input.question, + }; + }, + ChatPromptTemplate.fromTemplate("Hello"), + new FakeListChatModel({ responses: ["test"] }), + new StringOutputParser(), + ]), + }); + + const stream = await ragChainWithSources.stream( + { + question: "What is the capital of France?", + }, + { + signal: new AbortController().signal, + } + ); + for await (const _ of stream) { + // console.log(_); + } +}); diff --git a/langchain-core/src/utils/signal.ts b/langchain-core/src/utils/signal.ts index 7ccb554429cf..b3fa4aea714c 100644 --- a/langchain-core/src/utils/signal.ts +++ b/langchain-core/src/utils/signal.ts @@ -5,6 +5,7 @@ export async function raceWithSignal( if (signal === undefined) { return promise; } + let listener: () => void; return Promise.race([ promise.catch((err) => { if (!signal?.aborted) { @@ -14,13 +15,14 @@ export async function raceWithSignal( } }), new Promise((_, reject) => { - signal.addEventListener("abort", () => { + listener = () => { reject(new Error("Aborted")); - }); + }; + signal.addEventListener("abort", listener); // Must be here inside the promise to avoid a race condition if (signal.aborted) { reject(new Error("Aborted")); } }), - ]); + ]).finally(() => signal.removeEventListener("abort", listener)); }