Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Agent: Interact over WebRTC if URL param is set #493

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/voice/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"classnames": "^2.3.2",
"eslint": "8.42.0",
"eslint-config-next": "^14.0.1",
"livekit-client": "^1.14.4",
"lodash": "^4.17.21",
"next": "^14.0.1",
"playht": "^0.9.0-beta.7",
Expand Down
209 changes: 207 additions & 2 deletions packages/voice/src/app/agent/chat.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ import {
Transcript,
} from 'ai-jsx/lib/asr/asr';
import { createTextToSpeech, BuildUrlOptions, TextToSpeechBase, TextToSpeechProtocol } from 'ai-jsx/lib/tts/tts';
import {
createLocalTracks,
DataPacket_Kind,
LocalAudioTrack,
RemoteAudioTrack,
RemoteTrack,
Room,
RoomEvent,
Track,
TrackEvent,
} from 'livekit-client';

const DEFAULT_ASR_FRAME_SIZE = 20;

Expand Down Expand Up @@ -225,7 +236,7 @@ export enum ChatManagerState {
SPEAKING = 'speaking',
}

interface ChatManagerInit {
export interface ChatManagerInit {
asrProvider: string;
ttsProvider: string;
model: string;
Expand All @@ -234,12 +245,34 @@ interface ChatManagerInit {
asrLanguage?: string;
ttsModel?: string;
ttsVoice?: string;
webrtc: boolean;
}

/**
* Abstract interface for a voice-based LLM chat session.
*/
export interface ChatManager {
onStateChange?: (state: ChatManagerState) => void;
onInputChange?: (text: string, final: boolean, latency?: number) => void;
onOutputChange?: (text: string, final: boolean, latency: number) => void;
onAudioGenerate?: (latency: number) => void;
onAudioStart?: (latency: number) => void;
onAudioEnd?: () => void;
onError?: () => void;

state: ChatManagerState;
inputAnalyzer?: AnalyserNode;
outputAnalyzer?: AnalyserNode;
start(initialMessage?: string): Promise<void>;
stop(): void;
interrupt(): void;
}

/**
* Manages a single chat with a LLM, including speculative execution.
* All RPCs are managed from within the browser context.
*/
export class ChatManager {
export class LocalChatManager implements ChatManager {
private _state = ChatManagerState.IDLE;
private history: ChatMessage[] = [];
private pendingRequests = new Map<string, ChatRequest>();
Expand Down Expand Up @@ -477,3 +510,175 @@ export class ChatManager {
this.changeState(ChatManagerState.LISTENING);
}
}

export class StreamAnalyzer {
source: MediaStreamAudioSourceNode;
analyzer: AnalyserNode;
constructor(context: AudioContext, stream: MediaStream) {
this.source = context.createMediaStreamSource(stream);
this.analyzer = context.createAnalyser();
this.source.connect(this.analyzer);
}
stop() {
this.source.disconnect();
}
}

/**
* Manages a single chat with a LLM, including speculative execution.
* All RPCs are performed remotely, and audio is streamed to/from the server via WebRTC.
*/
export class WebRtcChatManager implements ChatManager {
private params: ChatManagerInit;
private audioContext = new AudioContext();
private audioElement = new Audio();
private textEncoder = new TextEncoder();
private textDecoder = new TextDecoder();
private _state = ChatManagerState.IDLE;
private socket?: WebSocket;
private room?: Room;
private localAudioTrack?: LocalAudioTrack;
private inAnalyzer?: StreamAnalyzer;
private outAnalyzer?: StreamAnalyzer;
private pinger?: NodeJS.Timer;
onStateChange?: (state: ChatManagerState) => void;
onInputChange?: (text: string, final: boolean, latency?: number) => void;
onOutputChange?: (text: string, final: boolean, latency: number) => void;
onAudioGenerate?: (latency: number) => void;
onAudioStart?: (latency: number) => void;
onAudioEnd?: () => void;
onError?: () => void;

constructor(params: ChatManagerInit) {
this.params = params;
this.audioElement = new Audio();
this.warmup();
}
get state() {
return this._state;
}
get inputAnalyzer() {
return this.inAnalyzer?.analyzer;
}
get outputAnalyzer() {
return this.outAnalyzer?.analyzer;
}
warmup() {
const url = 'ws://localhost:8080/livekit'; //`${window.location.protocol === 'https:' ? 'wss:' : 'ws:'}//${window.location.host}/livekit')`;
juberti marked this conversation as resolved.
Show resolved Hide resolved
this.socket = new WebSocket(url);
this.socket.onopen = () => this.handleSocketOpen();
this.socket.onmessage = (event) => this.handleSocketMessage(event);
this.socket.onclose = (event) => this.handleSocketClose(event);
}
async start() {
console.log('[chat] starting');
const localTracks = await createLocalTracks({ audio: true, video: false });
this.localAudioTrack = localTracks[0] as LocalAudioTrack;
console.log('[chat] got mic stream');
this.inAnalyzer = new StreamAnalyzer(this.audioContext, this.localAudioTrack!.mediaStream!);
this.pinger = setInterval(() => {
var obj = { type: 'ping', timestamp: performance.now() };
this.room?.localParticipant.publishData(this.textEncoder.encode(JSON.stringify(obj)), DataPacket_Kind.RELIABLE);
}, 5000);
this.maybePublishLocalAudio();
this.audioElement.play();
this.changeState(ChatManagerState.LISTENING);
}
async stop() {
console.log('[chat] stopping');
clearInterval(this.pinger);
this.pinger = undefined;
await this.room?.disconnect();
this.room = undefined;
this.inAnalyzer?.stop();
this.outAnalyzer?.stop();
this.inAnalyzer = undefined;
this.outAnalyzer = undefined;
this.localAudioTrack?.stop();
this.localAudioTrack = undefined;
this.socket?.close();
this.socket = undefined;
this.changeState(ChatManagerState.IDLE);
}
interrupt() {
throw new Error('Method not implemented.');
}
private changeState(state: ChatManagerState) {
if (state != this._state) {
console.log(`[chat] ${this._state} -> ${state}`);
this._state = state;
this.onStateChange?.(state);
}
}
private maybePublishLocalAudio() {
if (this.room && this.room.state == 'connected' && this.localAudioTrack) {
console.log(`[chat] publishing local audio track`);
const opts = { name: 'audio', simulcast: false, source: Track.Source.Microphone };
this.room.localParticipant.publishTrack(this.localAudioTrack, opts);
}
}
private handleSocketOpen() {
console.log('[chat] socket opened');
const obj = {
type: 'init',
params: {
asr: {
provider: this.params.asrProvider,
language: this.params.asrLanguage,
},
tts: {
provider: this.params.ttsProvider,
model: this.params.ttsModel,
voice: this.params.ttsVoice,
},
agent: {
model: this.params.model,
agentId: this.params.agentId,
docs: this.params.docs,
},
},
};
this.socket?.send(JSON.stringify(obj));
}
private async handleSocketMessage(event: MessageEvent) {
const msg = JSON.parse(event.data);
switch (msg.type) {
case 'room_info':
this.room = new Room();
await this.room.connect(msg.roomUrl, msg.token);
console.log('[chat] connected to room', msg.roomUrl);
this.maybePublishLocalAudio();
this.room.on(RoomEvent.TrackSubscribed, (track) => this.handleTrackSubscribed(track));
this.room.on(RoomEvent.DataReceived, (payload, participant) => this.handleDataReceived(payload, participant));
break;
default:
console.warn('unknown message type', msg.type);
}
}
private handleSocketClose(event: CloseEvent) {
console.log(`[chat] socket closed, code=${event.code}, reason=${event.reason}`);
}
private handleTrackSubscribed(track: RemoteTrack) {
console.log(`[chat] subscribed to remote audio track ${track.sid}`);
const audioTrack = track as RemoteAudioTrack;
audioTrack.on(TrackEvent.AudioPlaybackStarted, () => console.log(`[chat] audio playback started`));
audioTrack.on(TrackEvent.AudioPlaybackFailed, (err) => console.error(`[chat] audio playback failed`, err));
audioTrack.attach(this.audioElement);
this.outAnalyzer = new StreamAnalyzer(this.audioContext, track.mediaStream!);
}
private handleDataReceived(payload: Uint8Array, participant: any) {
const data = JSON.parse(this.textDecoder.decode(payload));
if (data.type === 'pong') {
const elapsed_ms = performance.now() - data.timestamp;
console.debug(`[chat] worker RTT: ${elapsed_ms.toFixed(0)} ms`);
}
}
}

export function createChatManager(init: ChatManagerInit): ChatManager {
if (init.webrtc) {
return new WebRtcChatManager(init);
} else {
return new LocalChatManager(init);
}
}
6 changes: 4 additions & 2 deletions packages/voice/src/app/agent/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import React, { useCallback, useEffect, useRef, useState } from 'react';
import { useSearchParams } from 'next/navigation';
import { useSwipeable } from 'react-swipeable';
import { ChatManager, ChatManagerState } from './chat';
import { ChatManager, ChatManagerState, createChatManager } from './chat';
import { getAgent, getAgentImageUrl } from './agents';
import Image from 'next/image';
import '../globals.css';
Expand Down Expand Up @@ -221,6 +221,7 @@ const AgentPageComponent: React.FC = () => {
const ttsVoice = searchParams.get('ttsVoice') || agentVoice;
const model = getAgent(agentId) === undefined ? 'fixie' : searchParams.get('llm') || DEFAULT_LLM;
const docs = searchParams.get('docs') !== null;
const webrtc = searchParams.get('webrtc') !== null;
const [showChooser, setShowChooser] = useState(searchParams.get('chooser') !== null);
const showInput = searchParams.get('input') !== null;
const showOutput = searchParams.get('output') !== null;
Expand All @@ -237,7 +238,7 @@ const AgentPageComponent: React.FC = () => {
useEffect(() => init(), [asrProvider, asrLanguage, ttsProvider, ttsModel, ttsVoice, model, agentId, docs]);
const init = () => {
console.log(`[page] init asr=${asrProvider} tts=${ttsProvider} llm=${model} agent=${agentId} docs=${docs}`);
const manager = new ChatManager({
const manager = createChatManager({
asrProvider,
asrLanguage,
ttsProvider,
Expand All @@ -246,6 +247,7 @@ const AgentPageComponent: React.FC = () => {
model,
agentId,
docs,
webrtc,
});
setChatManager(manager);
manager.onStateChange = (state) => {
Expand Down
Loading