Skip to content

Commit

Permalink
Merge branch 'master' into ADO-2659-context-to-support-chat
Browse files Browse the repository at this point in the history
* master:
  feat(editor): Overhaul document title management (#10999)
  refactor(core): Organize all event maps (#10997)
  fix(AI Agent Node): Fix output parsing and empty tool input handling in AI Agent node (#10970)
  • Loading branch information
MiloradFilipovic committed Sep 30, 2024
2 parents c135cfe + bb28956 commit 958fc56
Show file tree
Hide file tree
Showing 51 changed files with 381 additions and 264 deletions.
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
import { BINARY_ENCODING, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';

import type { AgentAction, AgentFinish } from 'langchain/agents';
import { AgentExecutor, createToolCallingAgent } from 'langchain/agents';
import type { BaseChatMemory } from '@langchain/community/memory/chat_memory';
import { HumanMessage } from '@langchain/core/messages';
import type { BaseMessage } from '@langchain/core/messages';
import type { BaseOutputParser, StructuredOutputParser } from '@langchain/core/output_parsers';
import type { BaseMessagePromptTemplateLike } from '@langchain/core/prompts';
import { ChatPromptTemplate } from '@langchain/core/prompts';
import { omit } from 'lodash';
import { RunnableSequence } from '@langchain/core/runnables';
import type { Tool } from '@langchain/core/tools';
import { DynamicStructuredTool } from '@langchain/core/tools';
import type { AgentAction, AgentFinish } from 'langchain/agents';
import { AgentExecutor, createToolCallingAgent } from 'langchain/agents';
import { OutputFixingParser } from 'langchain/output_parsers';
import { omit } from 'lodash';
import { BINARY_ENCODING, jsonParse, NodeConnectionType, NodeOperationError } from 'n8n-workflow';
import type { IExecuteFunctions, INodeExecutionData } from 'n8n-workflow';
import type { ZodObject } from 'zod';
import { z } from 'zod';
import type { BaseOutputParser, StructuredOutputParser } from '@langchain/core/output_parsers';
import { OutputFixingParser } from 'langchain/output_parsers';
import { HumanMessage } from '@langchain/core/messages';
import { RunnableSequence } from '@langchain/core/runnables';

import { SYSTEM_MESSAGE } from './prompt';
import {
isChatInstance,
getPromptInputByType,
getOptionalOutputParsers,
getConnectedTools,
} from '../../../../../utils/helpers';
import { SYSTEM_MESSAGE } from './prompt';

function getOutputParserSchema(outputParser: BaseOutputParser): ZodObject<any, any, any, any> {
const parserType = outputParser.lc_namespace[outputParser.lc_namespace.length - 1];
Expand Down Expand Up @@ -74,6 +75,39 @@ async function extractBinaryMessages(ctx: IExecuteFunctions) {
content: [...binaryMessages],
});
}
/**
* Fixes empty content messages in agent steps.
*
* This function is necessary when using RunnableSequence.from in LangChain.
* If a tool doesn't have any arguments, LangChain returns input: '' (empty string).
* This can throw an error for some providers (like Anthropic) which expect the input to always be an object.
* This function replaces empty string inputs with empty objects to prevent such errors.
*
* @param steps - The agent steps to fix
* @returns The fixed agent steps
*/
function fixEmptyContentMessage(steps: AgentFinish | AgentAction[]) {
if (!Array.isArray(steps)) return steps;

steps.forEach((step) => {
if ('messageLog' in step && step.messageLog !== undefined) {
if (Array.isArray(step.messageLog)) {
step.messageLog.forEach((message: BaseMessage) => {
if ('content' in message && Array.isArray(message.content)) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
(message.content as Array<{ input?: string | object }>).forEach((content) => {
if (content.input === '') {
content.input = {};
}
});
}
});
}
}
});

return steps;
}

export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeExecutionData[][]> {
this.logger.debug('Executing Tools Agent');
Expand Down Expand Up @@ -156,6 +190,14 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
// If the steps do not contain multiple outputs, return them as is
return agentFinishSteps;
}

// If memory is connected we need to stringify the returnValues so that it can be saved in the memory as a string
function handleParsedStepOutput(output: Record<string, unknown>) {
return {
returnValues: memory ? { output: JSON.stringify(output) } : output,
log: 'Final response formatted',
};
}
async function agentStepsParser(
steps: AgentFinish | AgentAction[],
): Promise<AgentFinish | AgentAction[]> {
Expand All @@ -168,24 +210,18 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
unknown
>;

return {
returnValues,
log: 'Final response formatted',
};
return handleParsedStepOutput(returnValues);
}
}
// If the steps are an AgentFinish and the outputParser is defined it must mean that the LLM didn't use `format_final_response` tool so we will parse the output manually

// If the steps are an AgentFinish and the outputParser is defined it must mean that the LLM didn't use `format_final_response` tool so we will try to parse the output manually
if (outputParser && typeof steps === 'object' && (steps as AgentFinish).returnValues) {
const finalResponse = (steps as AgentFinish).returnValues;
const returnValues = (await outputParser.parse(finalResponse as unknown as string)) as Record<
string,
unknown
>;

return {
returnValues,
log: 'Final response formatted',
};
return handleParsedStepOutput(returnValues);
}
return handleAgentFinishOutput(steps);
}
Expand Down Expand Up @@ -233,7 +269,7 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
});
agent.streamRunnable = false;

const runnableAgent = RunnableSequence.from([agent, agentStepsParser]);
const runnableAgent = RunnableSequence.from([agent, agentStepsParser, fixEmptyContentMessage]);

const executor = AgentExecutor.fromAgentAndTools({
agent: runnableAgent,
Expand Down Expand Up @@ -273,6 +309,13 @@ export async function toolsAgentExecute(this: IExecuteFunctions): Promise<INodeE
'IMPORTANT: Always call `format_final_response` to format your final response!',
});

if (memory && outputParser) {
const parsedOutput = jsonParse<{ output: Record<string, unknown> }>(
response.output as string,
);
response.output = parsedOutput?.output ?? parsedOutput;
}

returnData.push({
json: omit(
response,
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/base-command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { generateHostInstanceId } from '@/databases/utils/generators';
import * as Db from '@/db';
import { initErrorHandling } from '@/error-reporting';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import { initExpressionEvaluator } from '@/expression-evaluator';
import { ExternalHooks } from '@/external-hooks';
import { ExternalSecretsManager } from '@/external-secrets/external-secrets-manager.ee';
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import config from '@/config';
import { N8N_VERSION, inTest } from '@/constants';
import { EventMessageGeneric } from '@/eventbus/event-message-classes/event-message-generic';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import { JobProcessor } from '@/scaling/job-processor';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import type { ScalingService } from '@/scaling/scaling.service';
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/decorators/redactable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { RedactableError } from '@/errors/redactable.error';
import type { UserLike } from '@/events/relay-event-map';
import type { UserLike } from '@/events/maps/relay.event-map';

function toRedactable(userLike: UserLike) {
return {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type { INode, IRun, IWorkflowBase } from 'n8n-workflow';

import type { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventService } from '@/events/event.service';
import { LogStreamingEventRelay } from '@/events/log-streaming-event-relay';
import type { RelayEventMap } from '@/events/relay-event-map';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { LogStreamingEventRelay } from '@/events/relays/log-streaming.event-relay';
import type { IWorkflowDb } from '@/interfaces';

describe('LogStreamingEventRelay', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import type { ProjectRelationRepository } from '@/databases/repositories/project
import type { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import type { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/relay-event-map';
import { TelemetryEventRelay } from '@/events/telemetry-event-relay';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { TelemetryEventRelay } from '@/events/relays/telemetry.event-relay';
import type { IWorkflowDb } from '@/interfaces';
import type { License } from '@/license';
import type { NodeTypes } from '@/node-types';
Expand Down
9 changes: 5 additions & 4 deletions packages/cli/src/events/event.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ import { Service } from 'typedi';

import { TypedEmitter } from '@/typed-emitter';

import type { AiEventMap } from './ai-event-map';
import type { QueueMetricsEventMap } from './queue-metrics-event-map';
import type { RelayEventMap } from './relay-event-map';
import type { AiEventMap } from './maps/ai.event-map';
import type { PubSubEventMap } from './maps/pub-sub.event-map';
import type { QueueMetricsEventMap } from './maps/queue-metrics.event-map';
import type { RelayEventMap } from './maps/relay.event-map';

type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap;
type EventMap = RelayEventMap & QueueMetricsEventMap & AiEventMap & PubSubEventMap;

@Service()
export class EventService extends TypedEmitter<EventMap> {}
File renamed without changes.
104 changes: 104 additions & 0 deletions packages/cli/src/events/maps/pub-sub.event-map.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import type { WorkerStatus, PushType } from '@n8n/api-types';

import type { IWorkflowDb } from '@/interfaces';

export type PubSubEventMap = PubSubCommandMap & PubSubWorkerResponseMap;

export type PubSubCommandMap = {
// #region Lifecycle

'reload-license': never;

'restart-event-bus': never;

'reload-external-secrets-providers': never;

// #endregion

// #region Community packages

'community-package-install': {
packageName: string;
packageVersion: string;
};

'community-package-update': {
packageName: string;
packageVersion: string;
};

'community-package-uninstall': {
packageName: string;
};

// #endregion

// #region Worker view

'get-worker-id': never;

'get-worker-status': never;

// #endregion

// #region Multi-main setup

'add-webhooks-triggers-and-pollers': {
workflowId: string;
};

'remove-triggers-and-pollers': {
workflowId: string;
};

'display-workflow-activation': {
workflowId: string;
};

'display-workflow-deactivation': {
workflowId: string;
};

'display-workflow-activation-error': {
workflowId: string;
errorMessage: string;
};

'relay-execution-lifecycle-event': {
type: PushType;
args: Record<string, unknown>;
pushRef: string;
};

'clear-test-webhooks': {
webhookKey: string;
workflowEntity: IWorkflowDb;
pushRef: string;
};

// #endregion
};

export type PubSubWorkerResponseMap = {
// #region Lifecycle

'restart-event-bus': {
result: 'success' | 'error';
error?: string;
};

'reload-external-secrets-providers': {
result: 'success' | 'error';
error?: string;
};

// #endregion

// #region Worker view

'get-worker-id': never;

'get-worker-status': WorkerStatus;

// #endregion
};
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import type { ProjectRole } from '@/databases/entities/project-relation';
import type { GlobalRole } from '@/databases/entities/user';
import type { IWorkflowDb } from '@/interfaces';

import type { AiEventMap } from './ai-event-map';
import type { AiEventMap } from './ai.event-map';

export type UserLike = {
id: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { Service } from 'typedi';

import type { RelayEventMap } from '@/events/relay-event-map';

import { EventService } from './event.service';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/maps/relay.event-map';

@Service()
export class EventRelay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ import { Service } from 'typedi';

import { Redactable } from '@/decorators/redactable';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { EventRelay } from '@/events/event-relay';
import type { RelayEventMap } from '@/events/relay-event-map';

import { EventService } from './event.service';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { EventRelay } from '@/events/relays/event-relay';

@Service()
export class LogStreamingEventRelay extends EventRelay {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import { ProjectRelationRepository } from '@/databases/repositories/project-rela
import { SharedWorkflowRepository } from '@/databases/repositories/shared-workflow.repository';
import { WorkflowRepository } from '@/databases/repositories/workflow.repository';
import { EventService } from '@/events/event.service';
import type { RelayEventMap } from '@/events/relay-event-map';
import type { RelayEventMap } from '@/events/maps/relay.event-map';
import { determineFinalExecutionStatus } from '@/execution-lifecycle-hooks/shared/shared-hook-functions';
import type { IExecutionTrackProperties } from '@/interfaces';
import { License } from '@/license';
import { NodeTypes } from '@/node-types';

import { EventRelay } from './event-relay';
import { Telemetry } from '../telemetry';
import { Telemetry } from '../../telemetry';

@Service()
export class TelemetryEventRelay extends EventRelay {
Expand Down
Loading

0 comments on commit 958fc56

Please sign in to comment.