diff --git a/melos.yaml b/melos.yaml index b1835a9d..04ed4fb6 100644 --- a/melos.yaml +++ b/melos.yaml @@ -49,6 +49,7 @@ command: json_annotation: ^4.9.0 json_path: ^0.7.4 langchain_tiktoken: ^1.0.1 + logging: ^1.2.0 math_expressions: ^2.6.0 meta: ^1.11.0 objectbox: ^4.0.1 @@ -59,6 +60,7 @@ command: shelf_router: ^1.1.4 supabase: ^2.2.7 uuid: ^4.4.2 + web_socket_channel: ^3.0.1 dev_dependencies: build_runner: ^2.4.11 freezed: ^2.5.7 diff --git a/packages/langchain/README.md b/packages/langchain/README.md index d01e9ccd..bb169643 100644 --- a/packages/langchain/README.md +++ b/packages/langchain/README.md @@ -87,16 +87,17 @@ The following packages are maintained (and used internally) by LangChain.dart, a > Depend on an API client package if you just want to consume the API of a specific provider directly without using LangChain.dart abstractions. -| Package | Version | Description | -|-------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------| -| [anthropic_sdk_dart](https://pub.dev/packages/anthropic_sdk_dart) | [![anthropic_sdk_dart](https://img.shields.io/pub/v/anthropic_sdk_dart.svg)](https://pub.dev/packages/anthropic_sdk_dart) | [Anthropic](https://docs.anthropic.com/en/api) API client | -| [chromadb](https://pub.dev/packages/chromadb) | [![chromadb](https://img.shields.io/pub/v/chromadb.svg)](https://pub.dev/packages/chromadb) | [Chroma DB](https://trychroma.com/) API client | -| [googleai_dart](https://pub.dev/packages/googleai_dart) | [![googleai_dart](https://img.shields.io/pub/v/googleai_dart.svg)](https://pub.dev/packages/googleai_dart) | [Google AI for Developers](https://ai.google.dev/) API client | -| [mistralai_dart](https://pub.dev/packages/mistralai_dart) | [![mistralai_dart](https://img.shields.io/pub/v/mistralai_dart.svg)](https://pub.dev/packages/mistralai_dart) | [Mistral AI](https://docs.mistral.ai/api) API client | -| [ollama_dart](https://pub.dev/packages/ollama_dart) | [![ollama_dart](https://img.shields.io/pub/v/ollama_dart.svg)](https://pub.dev/packages/ollama_dart) | [Ollama](https://ollama.ai/) API client | -| [openai_dart](https://pub.dev/packages/openai_dart) | [![openai_dart](https://img.shields.io/pub/v/openai_dart.svg)](https://pub.dev/packages/openai_dart) | [OpenAI](https://platform.openai.com/docs/api-reference) API client | -| [tavily_dart](https://pub.dev/packages/tavily_dart) | [![tavily_dart](https://img.shields.io/pub/v/tavily_dart.svg)](https://pub.dev/packages/tavily_dart) | [Tavily](https://tavily.com) API client | -| [vertex_ai](https://pub.dev/packages/vertex_ai) | [![vertex_ai](https://img.shields.io/pub/v/vertex_ai.svg)](https://pub.dev/packages/vertex_ai) | [GCP Vertex AI](https://cloud.google.com/vertex-ai) API client | +| Package | Version | Description | +|-----------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------| +| [anthropic_sdk_dart](https://pub.dev/packages/anthropic_sdk_dart) | [![anthropic_sdk_dart](https://img.shields.io/pub/v/anthropic_sdk_dart.svg)](https://pub.dev/packages/anthropic_sdk_dart) | [Anthropic](https://docs.anthropic.com/en/api) API client | +| [chromadb](https://pub.dev/packages/chromadb) | 
[![chromadb](https://img.shields.io/pub/v/chromadb.svg)](https://pub.dev/packages/chromadb) | [Chroma DB](https://trychroma.com/) API client | +| [googleai_dart](https://pub.dev/packages/googleai_dart) | [![googleai_dart](https://img.shields.io/pub/v/googleai_dart.svg)](https://pub.dev/packages/googleai_dart) | [Google AI for Developers](https://ai.google.dev/) API client | +| [mistralai_dart](https://pub.dev/packages/mistralai_dart) | [![mistralai_dart](https://img.shields.io/pub/v/mistralai_dart.svg)](https://pub.dev/packages/mistralai_dart) | [Mistral AI](https://docs.mistral.ai/api) API client | +| [ollama_dart](https://pub.dev/packages/ollama_dart) | [![ollama_dart](https://img.shields.io/pub/v/ollama_dart.svg)](https://pub.dev/packages/ollama_dart) | [Ollama](https://ollama.ai/) API client | +| [openai_dart](https://pub.dev/packages/openai_dart) | [![openai_dart](https://img.shields.io/pub/v/openai_dart.svg)](https://pub.dev/packages/openai_dart) | [OpenAI](https://platform.openai.com/docs/api-reference) API client | +| [openai_realtime_dart](https://pub.dev/packages/openai_realtime_dart) | [![openai_realtime_dart](https://img.shields.io/pub/v/openai_realtime_dart.svg)](https://pub.dev/packages/openai_realtime_dart) | [OpenAI Realtime](https://platform.openai.com/docs/guides/realtime) API client | +| [tavily_dart](https://pub.dev/packages/tavily_dart) | [![tavily_dart](https://img.shields.io/pub/v/tavily_dart.svg)](https://pub.dev/packages/tavily_dart) | [Tavily](https://tavily.com) API client | +| [vertex_ai](https://pub.dev/packages/vertex_ai) | [![vertex_ai](https://img.shields.io/pub/v/vertex_ai.svg)](https://pub.dev/packages/vertex_ai) | [GCP Vertex AI](https://cloud.google.com/vertex-ai) API client | ## Integrations diff --git a/packages/openai_realtime_dart/.gitignore b/packages/openai_realtime_dart/.gitignore new file mode 100644 index 00000000..3cceda55 --- /dev/null +++ b/packages/openai_realtime_dart/.gitignore @@ -0,0 +1,7 @@ +# https://dart.dev/guides/libraries/private-files +# Created by `dart pub` +.dart_tool/ + +# Avoid committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock diff --git a/packages/openai_realtime_dart/CHANGELOG.md b/packages/openai_realtime_dart/CHANGELOG.md new file mode 100644 index 00000000..35882371 --- /dev/null +++ b/packages/openai_realtime_dart/CHANGELOG.md @@ -0,0 +1,7 @@ +📣 Check out the [releases page](https://github.com/davidmigloz/langchain_dart/releases) or the [#announcements](https://discord.com/channels/1123158322812555295/1123250594644242534) channel on the [LangChain.dart Discord](https://discord.gg/x4qbhqecVR) server for more details. 
+ +--- + +## 0.0.1-dev.1 + +- Bootstrap project diff --git a/packages/openai_realtime_dart/LICENSE b/packages/openai_realtime_dart/LICENSE new file mode 100644 index 00000000..f407ffdd --- /dev/null +++ b/packages/openai_realtime_dart/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 David Miguel Lozano + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/packages/openai_realtime_dart/README.md b/packages/openai_realtime_dart/README.md new file mode 100644 index 00000000..1150d8cd --- /dev/null +++ b/packages/openai_realtime_dart/README.md @@ -0,0 +1,358 @@ +# OpenAI Realtime API Dart Client + +[![tests](https://img.shields.io/github/actions/workflow/status/davidmigloz/langchain_dart/test.yaml?logo=github&label=tests)](https://github.com/davidmigloz/langchain_dart/actions/workflows/test.yaml) +[![openai_realtime_dart](https://img.shields.io/pub/v/openai_realtime_dart.svg)](https://pub.dev/packages/openai_realtime_dart) +[![](https://dcbadge.vercel.app/api/server/x4qbhqecVR?style=flat)](https://discord.gg/x4qbhqecVR) +[![MIT](https://img.shields.io/badge/license-MIT-purple.svg)](https://github.com/davidmigloz/langchain_dart/blob/main/LICENSE) + +Unofficial Dart client for the [OpenAI Realtime API](https://platform.openai.com/docs/guides/realtime). + +## Table of contents + +- [Quickstart](#quickstart) + * [Browser quickstart](#browser-quickstart) +- [Project structure](#project-structure) +- [Using the client](#using-the-client) + * [Sending messages](#sending-messages) + * [Sending streaming audio](#sending-streaming-audio) + * [Adding and using tools](#adding-and-using-tools) + + [Manually using tools](#manually-using-tools) + * [Interrupting the model](#interrupting-the-model) +- [Client events](#client-events) + * [Utility events](#utility-events) + * [Server events](#server-events) +- [Acknowledgements](#acknowledgements) +- [License](#license) + +## Quickstart + +This library is built to be used both server-side and client-side (e.g. Flutter apps). 
+
+```dart
+final client = RealtimeClient(
+  apiKey: Platform.environment['OPENAI_API_KEY'],
+);
+
+// Can set parameters ahead of connecting, either separately or all at once
+client.updateSession(instructions: 'You are a great, upbeat friend.');
+client.updateSession(voice: 'alloy');
+client.updateSession(
+  turnDetection: {'type': 'none'},
+  inputAudioTranscription: {'model': 'whisper-1'},
+);
+
+// Set up event handling
+client.on('conversation.updated', (event) {
+  // item is the current item being updated
+  final item = event?['item'];
+  // delta can be null or populated
+  final delta = event?['delta'];
+  // you can fetch a full list of items at any time
+});
+
+// Connect to Realtime API
+await client.connect();
+
+// Send an item and trigger a generation
+client.sendUserMessageContent([
+  {'type': 'input_text', 'text': 'How are you?'},
+]);
+```
+
+### Browser quickstart
+
+You can use this client directly from the browser (e.g. in Flutter web apps).
+**We do not recommend this; your API keys are at risk if you connect to OpenAI directly from the browser.**
+In order to instantiate the client in a browser environment, use:
+
+```dart
+final client = RealtimeClient(
+  apiKey: Platform.environment['OPENAI_API_KEY'],
+  dangerouslyAllowAPIKeyInBrowser: true,
+);
+```
+
+If you are running your own relay server, you can instead connect to the relay server URL like so:
+
+```dart
+final client = RealtimeClient(url: RELAY_SERVER_URL);
+```
+
+## Project structure
+
+In this package, there are three primitives for interfacing with the Realtime API. We recommend starting with the `RealtimeClient`, but more advanced users may be more comfortable working closer to the metal.
+
+1. [`RealtimeClient`](./lib/src/client.dart)
+   - Primary abstraction for interfacing with the Realtime API
+   - Enables rapid application development with a simplified control flow
+   - Has custom `conversation.updated`, `conversation.item.appended`, `conversation.item.completed`, `conversation.interrupted` and `realtime.event` events
+   - These events send item deltas and conversation history
+2. [`RealtimeAPI`](./lib/src/api.dart)
+   - Exists on client instance as `client.realtime`
+   - Thin wrapper over [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket)
+   - Use this for connecting to the API, authenticating, and sending items
+   - There is **no item validation**; you will have to rely on the API specification directly
+   - Dispatches events as `server.{event_name}` and `client.{event_name}`, respectively
+3. [`RealtimeConversation`](./lib/src/conversation.dart)
+   - Exists on client instance as `client.conversation`
+   - Stores a client-side cache of the current conversation
+   - Has **event validation**; it validates incoming events to make sure it can cache them properly
+
+## Using the client
+
+The client comes packaged with some basic utilities that make it easy to build realtime apps quickly.
+
+### Sending messages
+
+Sending messages from the user to the server is easy.
+
+```dart
+client.sendUserMessageContent([
+  {'type': 'input_text', 'text': 'How are you?'},
+]);
+// or (empty audio)
+client.sendUserMessageContent([
+  {'type': 'input_audio', 'audio': Uint8List(0)},
+]);
+```
+
+### Sending streaming audio
+
+To send streaming audio, use the `.appendInputAudio()` method. If you're in `turn_detection: 'disabled'` mode, then you need to use `.createResponse()` to tell the model to respond.
+ +```dart +// Send user audio, must be Uint8List +// Default audio format is pcm16 with sample rate of 24,000 Hz +// This populates 1s of noise in 0.1s chunks +for (var i = 0; i < 10; i++) { + final data = Uint8List(2400); + for (var n = 0; n < 2400; n++) { + final value = (Random().nextDouble() * 2 - 1) * 0x8000; + data[n] = value.toInt(); + } + client.appendInputAudio(data); +} +// Pending audio is committed and model is asked to generate +client.createResponse(); +``` + +### Adding and using tools + +Working with tools is easy. Just call `.addTool()` and set a callback as the second parameter. The callback will be executed with the parameters for the tool, and the result will be automatically sent back to the model. + +```dart +// We can add tools as well, with callbacks specified +client.addTool( + { + 'name': 'get_weather', + 'description': 'Retrieves the weather for a given lat, lng coordinate pair. Specify a label for the location.', + 'parameters': { + 'type': 'object', + 'properties': { + 'lat': { + 'type': 'number', + 'description': 'Latitude', + }, + 'lng': { + 'type': 'number', + 'description': 'Longitude', + }, + 'location': { + 'type': 'string', + 'description': 'Name of the location', + }, + }, + 'required': ['lat', 'lng', 'location'], + }, + }, + (Map params) async { + final result = await HttpClient() + .getUrl( + Uri.parse( + 'https://api.open-meteo.com/v1/forecast?' + 'latitude=${params['lat']}&' + 'longitude=${params['lng']}&' + 'current=temperature_2m,wind_speed_10m', + ), + ) + .then((request) => request.close()) + .then((response) => response.transform(Utf8Decoder()).join()) + .then(jsonDecode); + return result; + }, +); +``` + +#### Manually using tools + +The `.addTool()` method automatically runs a tool handler and triggers a response on handler completion. Sometimes you may not want that, for example: using tools to generate a schema that you use for other purposes. + +In this case, we can use the `tools` item with `updateSession`. In this case you **must** specify `type: 'function'`, which is not required for `.addTool()`. + +**Note:** Tools added with `.addTool()` will **not** be overridden when updating sessions manually like this, but every `updateSession()` change will override previous `updateSession()` changes. Tools added via `.addTool()` are persisted and appended to anything set manually here. + +```dart +client.updateSession( + tools: [ + { + 'type': 'function', + 'name': 'get_weather', + 'description': + 'Retrieves the weather for a given lat, lng coordinate pair. Specify a label for the location.', + 'parameters': { + 'type': 'object', + 'properties': { + 'lat': { + 'type': 'number', + 'description': 'Latitude', + }, + 'lng': { + 'type': 'number', + 'description': 'Longitude', + }, + 'location': { + 'type': 'string', + 'description': 'Name of the location', + }, + }, + 'required': ['lat', 'lng', 'location'], + }, + }, + ], +); +``` + +Then, to handle function calls... + +```dart +client.on('conversation.item.completed', (event) { + final item = event?['item'] as Map?; + if (item?['type'] == 'function_call') { + // your function call is complete, execute some custom code + } +}); +``` + +### Interrupting the model + +You may want to manually interrupt the model, especially in `turn_detection: 'disabled'` mode. 
To do this, we can use: + +```dart +// id is the id of the item currently being generated +// sampleCount is the number of audio samples that have been heard by the listener +client.cancelResponse(id, sampleCount); +``` + +This method will cause the model to immediately cease generation, but also truncate the item being played by removing all audio after `sampleCount` and clearing the text response. By using this method you can interrupt the model and prevent it from "remembering" anything it has generated that is ahead of where the user's state is. + +## Client events + +If you need more manual control and want to send custom client events according to the [Realtime Client Events API Reference](https://platform.openai.com/docs/api-reference/realtime-client-events), you can use `client.realtime.send()` like so: + +```dart +client.realtime.send('conversation.item.create', { + 'item': { + 'type': 'function_call_output', + 'call_id': 'my-call-id', + 'output': '{function_succeeded:true}', + }, +}); +``` + +### Utility events + +With `RealtimeClient` we have reduced the event overhead from server events to **five** main events that are most critical for your application control flow. These events **are not** part of the API specification itself, but wrap logic to make application development easier. + +```dart +// Errors like connection failures +client.on('error', (event) { + // do something +}); + +// In VAD mode, the user starts speaking +// we can use this to stop audio playback of a previous response if necessary +client.on('conversation.interrupted', (event) { + // do something +}); + +// Includes all changes to conversations +// delta may be populated +client.on('conversation.updated', (event) { + final item = event?['item'] as Map?; + final delta = event?['delta'] as Map?; + + // get all items, e.g. if you need to update a chat window + final items = client.conversation.getItems(); + + final type = item?['type'] as String?; + switch (type) { + case 'message': + // system, user, or assistant message (item.role) + case 'function_call': + // always a function call from the model + case 'function_call_output': + // always a response from the user / application + } + if (delta != null) { + // Only one of the following will be populated for any given event + // delta['audio'] -> Uint8List, audio added + // delta['transcript'] -> string, transcript added + // delta['arguments'] -> string, function arguments added + } +}); + +// Only triggered after item added to conversation +client.on('conversation.item.appended', (event) { + final item = event?['item'] as Map?; + // item?['status'] -> can be 'in_progress' or 'completed' +}); + +// Only triggered after item completed in conversation +// will always be triggered after conversation.item.appended +client.on('conversation.item.completed', (event) { + final item = event?['item'] as Map?; + // item?['status'] -> will always be 'completed' +}); +``` + +### Server events + +If you want more control over your application development, you can use the `realtime.event` event and choose only to respond to **server** events. The full documentation for these events are available on the [Realtime Server Events API Reference](https://platform.openai.com/docs/api-reference/realtime-server-events). 
+ +```dart +// all events, can use for logging, debugging, or manual event handling +client.on('realtime.event', (event ) { + final time = event?['time'] as String?; + final source = event?['source'] as String?; + final eventPayload = event?['event'] as Map?; + if (source == 'server') { + // do something + } +}); +``` + +## Logging + +This package uses the [logging](https://pub.dev/packages/logging) package to log messages. + +In debug mode, records with `Level.INFO` and above are printed to the console. + +### Listen to all logs + +```dart +import 'package:logging/logging.dart'; + +final logger = Logger('openai_realtime_dart'); +logger.level = Level.ALL; // custom log level filtering, default is Level.INFO +logger.onRecord.listen((record) { + print('${record.level.name}: ${record.time}: ${record.message}'); +}); +``` + +## Acknowledgements + +This package is based on the [OpenAI Realtime API Reference Client](https://github.com/openai/openai-realtime-api-beta). + +## License + +OpenAI Realtime API Dart Client is licensed under the [MIT License](https://github.com/davidmigloz/langchain_dart/blob/main/LICENSE). diff --git a/packages/openai_realtime_dart/analysis_options.yaml b/packages/openai_realtime_dart/analysis_options.yaml new file mode 100644 index 00000000..f04c6cf0 --- /dev/null +++ b/packages/openai_realtime_dart/analysis_options.yaml @@ -0,0 +1 @@ +include: ../../analysis_options.yaml diff --git a/packages/openai_realtime_dart/example/openai_realtime_dart_example.dart b/packages/openai_realtime_dart/example/openai_realtime_dart_example.dart new file mode 100644 index 00000000..072f5046 --- /dev/null +++ b/packages/openai_realtime_dart/example/openai_realtime_dart_example.dart @@ -0,0 +1,35 @@ +// ignore_for_file: cascade_invocations, unused_local_variable +import 'dart:io'; + +import 'package:openai_realtime_dart/openai_realtime_dart.dart'; + +Future main() async { + final client = RealtimeClient( + apiKey: Platform.environment['OPENAI_API_KEY'], + ); + + // Can set parameters ahead of connecting, either separately or all at once + client.updateSession(instructions: 'You are a great, upbeat friend.'); + client.updateSession(voice: 'alloy'); + client.updateSession( + turnDetection: {'type': 'none'}, + inputAudioTranscription: {'model': 'whisper-1'}, + ); + + // Set up event handling + client.on('conversation.updated', (event) { + // item is the current item being updated + final item = event?['item']; + // delta can be null or populated + final delta = event?['delta']; + // you can fetch a full list of items at any time + }); + + // Connect to Realtime API + await client.connect(); + + // Send a item and triggers a generation + client.sendUserMessageContent([ + {'type': 'input_text', 'text': 'How are you?'}, + ]); +} diff --git a/packages/openai_realtime_dart/lib/openai_realtime_dart.dart b/packages/openai_realtime_dart/lib/openai_realtime_dart.dart new file mode 100644 index 00000000..898d6515 --- /dev/null +++ b/packages/openai_realtime_dart/lib/openai_realtime_dart.dart @@ -0,0 +1,7 @@ +/// Dart client for the OpenAI Realtime API. 
+library; + +export 'src/api.dart'; +export 'src/client.dart'; +export 'src/conversation.dart'; +export 'src/event_handler.dart'; diff --git a/packages/openai_realtime_dart/lib/src/api.dart b/packages/openai_realtime_dart/lib/src/api.dart new file mode 100644 index 00000000..7d62bacf --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/api.dart @@ -0,0 +1,148 @@ +// ignore_for_file: avoid_print +import 'dart:async'; +import 'dart:convert'; + +import 'package:logging/logging.dart'; +import 'package:web_socket_channel/status.dart' as status; +import 'package:web_socket_channel/web_socket_channel.dart'; + +import 'event_handler.dart'; +import 'utils.dart'; +import 'web_socket/web_socket.dart'; + +final _log = Logger('openai_realtime_dart.api'); + +/// Thin wrapper over [WebSocket](https://developer.mozilla.org/en-US/docs/Web/API/WebSocket) +/// to handle the communication with OpenAI Realtime API. +/// +/// Dispatches events as `server.{event_name}` and `client.{event_name}`, +/// respectively. +class RealtimeAPI extends RealtimeEventHandler { + /// Create a new [RealtimeAPI] instance. + RealtimeAPI({ + this.url = 'wss://api.openai.com/v1/realtime', + this.apiKey, + this.debug = false, + this.dangerouslyAllowAPIKeyInBrowser = true, + }) { + if (apiKey != null && !dangerouslyAllowAPIKeyInBrowser) { + throw Exception( + 'Cannot provide API key in the browser without ' + '"dangerouslyAllowAPIKeyInBrowser" set to true', + ); + } + } + + /// The base URL of the Realtime API. + final String url; + + /// The API key to authenticate with the Realtime API. + final String? apiKey; + + /// Whether to log debug messages. + final bool debug; + + /// Whether to allow the API key to be used in the browser. + final bool dangerouslyAllowAPIKeyInBrowser; + + WebSocketChannel? _ws; + String _model = ''; + StreamSubscription? _logSubscription; + + /// Tells us whether or not the WebSocket is connected. + bool isConnected() => _ws != null; + + /// Connects to Realtime API Websocket Server. + Future connect({ + final String model = 'gpt-4o-realtime-preview-2024-10-01', + }) async { + if (isConnected()) { + throw Exception('Already connected'); + } + + _configLogger(); + + _model = model; + final uri = Uri.parse('$url?model=$_model'); + + try { + _ws = connectWebSocket(uri, apiKey); + + // Wait for the connection to be established + await _ws!.ready; + + _log.info('Connected to "$url"'); + + _ws!.stream.listen( + (data) { + final message = json.decode(data) as Map; + receive(message['type'], message); + }, + onError: (dynamic error) { + _log.severe('Error', error); + dispatch('close', {'error': true}); + }, + onDone: () { + _log.info('Disconnected from "$url"'); + dispatch('close', {'error': false}); + }, + ); + + return true; + } catch (e) { + _log.severe('Could not connect to "$url"', e); + return false; + } + } + + void _configLogger() { + if (debug) { + final logger = Logger('openai_realtime_dart'); + _logSubscription = logger.onRecord.listen((record) { + if (record.level >= Level.INFO) { + print( + '[${record.loggerName}/${record.time.toIso8601String()}]: ' + '${record.message} ${record.error ?? ""}', + ); + } + }); + } + } + + /// Disconnects from Realtime API server. + Future disconnect() async { + if (_ws != null) { + await _ws!.sink.close(status.normalClosure); + _ws = null; + } + await _logSubscription?.cancel(); + } + + /// Receives an event from WebSocket and dispatches as + /// "server.{eventName}" and "server.*" events. 
+ void receive(String eventName, Map event) { + _log.info('received: $eventName $event'); + dispatch('server.$eventName', event); + dispatch('server.*', event); + } + + /// Sends an event to WebSocket and dispatches as "client.{eventName}" + /// and "client.*" events. + void send(String eventName, [Map? data]) { + if (!isConnected()) { + throw Exception('RealtimeAPI is not connected'); + } + + final event = { + 'event_id': RealtimeUtils.generateId('evt_'), + 'type': eventName, + ...?data, + }; + + dispatch('client.$eventName', event); + dispatch('client.*', event); + _log.info('sent: $eventName $event'); + + _ws!.sink.add(json.encode(event)); + } +} diff --git a/packages/openai_realtime_dart/lib/src/client.dart b/packages/openai_realtime_dart/lib/src/client.dart new file mode 100644 index 00000000..64b72827 --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/client.dart @@ -0,0 +1,451 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; +import 'api.dart'; +import 'conversation.dart'; +import 'event_handler.dart'; +import 'utils.dart'; + +/// Dart client for the OpenAI Realtime API. +/// Enables rapid application development with a simplified control flow. +/// +/// It reduces the event overhead from server events to five main events +/// that are most critical for your application control flow: +/// - `conversation.updated`: When the conversation state is updated. +/// - `conversation.interrupted`: When the conversation is interrupted. +/// - `conversation.item.appended`: When a new item is appended to the conversation. +/// - `conversation.item.completed`: When an item is completed. +/// - `realtime.event`: All events from the Realtime API. +/// +/// These events are not part of the API specification itself, but wrap logic +/// to make application development easier. +class RealtimeClient extends RealtimeEventHandler { + /// Create a new [RealtimeClient] instance. + RealtimeClient({ + final String url = 'wss://api.openai.com/v1/realtime', + final String? apiKey, + final bool debug = false, + final bool dangerouslyAllowAPIKeyInBrowser = true, + }) : realtime = RealtimeAPI( + url: url, + apiKey: apiKey, + debug: debug, + dangerouslyAllowAPIKeyInBrowser: dangerouslyAllowAPIKeyInBrowser, + ), + conversation = RealtimeConversation() { + _resetConfig(); + _addApiEventHandlers(); + } + + /// The Realtime API instance. + final RealtimeAPI realtime; + + /// The Realtime conversation instance. + final RealtimeConversation conversation; + + /// The session configuration. + Map sessionConfig = {}; + + /// The tools added to the client. + final Map tools = {}; + + /// Whether the session has been created. + bool sessionCreated = false; + + /// The input audio buffer. 
+ Uint8List inputAudioBuffer = Uint8List(0); + + void _resetConfig() { + sessionCreated = false; + tools.clear(); + sessionConfig = { + 'modalities': ['text', 'audio'], + 'instructions': '', + 'voice': 'alloy', + 'input_audio_format': 'pcm16', + 'output_audio_format': 'pcm16', + 'input_audio_transcription': null, + 'turn_detection': null, + 'tools': [], + 'tool_choice': 'auto', + 'temperature': 0.8, + 'max_response_output_tokens': 4096, + }; + inputAudioBuffer = Uint8List(0); + } + + void _addApiEventHandlers() { + realtime + ..on('client.*', (event) { + dispatch('realtime.event', { + 'time': DateTime.now().toIso8601String(), + 'source': 'client', + 'event': event, + }); + }) + ..on('server.*', (event) { + dispatch('realtime.event', { + 'time': DateTime.now().toIso8601String(), + 'source': 'server', + 'event': event, + }); + }) + ..on('server.session.created', (_) => sessionCreated = true); + + Future> handler( + Map? event, [ + dynamic args, + ]) async { + return conversation.processEvent(event, args); + } + + Future> handlerWithDispatch( + Map? event, [ + dynamic args, + ]) async { + final result = await handler(event, args); + if (result['item'] != null) { + dispatch('conversation.updated', result); + } + return result; + } + + realtime + ..on('server.response.created', handler) + ..on('server.response.output_item.added', handler) + ..on('server.response.content_part.added', handler) + ..on('server.input_audio_buffer.speech_started', (event) async { + await handler(event); + dispatch('conversation.interrupted', null); + }) + ..on( + 'server.input_audio_buffer.speech_stopped', + (event) async => handler(event, inputAudioBuffer), + ) + ..on('server.conversation.item.created', (event) async { + final result = await handlerWithDispatch(event); + final item = result['item'] as Map?; + dispatch('conversation.item.appended', {'item': item}); + if (item?['status'] == 'completed') { + dispatch('conversation.item.completed', {'item': item}); + } + }) + ..on('server.conversation.item.truncated', handlerWithDispatch) + ..on('server.conversation.item.deleted', handlerWithDispatch) + ..on( + 'server.conversation.item.input_audio_transcription.completed', + handlerWithDispatch, + ) + ..on('server.response.audio_transcript.delta', handlerWithDispatch) + ..on('server.response.audio.delta', handlerWithDispatch) + ..on('server.response.text.delta', handlerWithDispatch) + ..on( + 'server.response.function_call_arguments.delta', + handlerWithDispatch, + ) + ..on('server.response.output_item.done', (event) async { + final result = await handlerWithDispatch(event); + final item = result['item'] as Map?; + if (item?['status'] == 'completed') { + dispatch('conversation.item.completed', {'item': item}); + } + final formatted = item?['formatted'] as Map?; + if (formatted?['tool'] != null) { + await _callTool(formatted!['tool']); + } + }); + } + + Future _callTool(Map tool) async { + try { + final jsonArguments = tool['arguments'] as Map; + final toolConfig = tools[tool['name']] as Map?; + if (toolConfig == null) { + throw Exception('Tool "${tool['name']}" has not been added'); + } + final handler = toolConfig['handler'] as FutureOr> + Function(Map); + final result = await handler(jsonArguments); + realtime.send('conversation.item.create', { + 'item': { + 'type': 'function_call_output', + 'call_id': tool['call_id'], + 'output': result, + }, + }); + } catch (e) { + realtime.send('conversation.item.create', { + 'item': { + 'type': 'function_call_output', + 'call_id': tool['call_id'], + 'output': {'error': 
e.toString()}, + }, + }); + } + createResponse(); + } + + /// Tells us whether the realtime socket is connected and the session has + /// started. + bool isConnected() => realtime.isConnected(); + + /// Resets the client instance entirely: disconnects and clears active config. + Future reset() async { + await disconnect(); + clearEventHandlers(); + realtime.clearEventHandlers(); + _resetConfig(); + _addApiEventHandlers(); + } + + /// Connects to the Realtime WebSocket API. + /// Updates session config and conversation config. + Future connect() async { + if (isConnected()) { + throw Exception('Already connected, use .disconnect() first'); + } + final connected = await realtime.connect(); + if (connected) { + updateSession(); + } + return connected; + } + + /// Waits for a `session.created` event to be executed before proceeding. + Future waitForSessionCreated() async { + if (!isConnected()) { + throw Exception('Not connected, use .connect() first'); + } + while (!sessionCreated) { + await Future.delayed(const Duration(milliseconds: 50)); + } + return true; + } + + /// Disconnects from the Realtime API and clears the conversation history. + Future disconnect() async { + sessionCreated = false; + conversation.clear(); + if (realtime.isConnected()) { + await realtime.disconnect(); + } + } + + /// Gets the active turn detection mode. + String? getTurnDetectionType() { + final turnDetection = + sessionConfig['turn_detection'] as Map?; + return turnDetection?['type']; + } + + /// Add a tool and handler. + Map addTool( + Map definition, + FutureOr> Function(Map) handler, + ) { + if (definition['name'] == null) { + throw Exception('Missing tool name in definition'); + } + final name = definition['name']; + if (tools.containsKey(name)) { + throw Exception( + 'Tool "$name" already added. ' + 'Please use .removeTool("$name") before trying to add again.', + ); + } + tools[name] = {'definition': definition, 'handler': handler}; + updateSession(); + return tools[name]; + } + + /// Removes a tool. + bool removeTool(String name) { + if (!tools.containsKey(name)) { + throw Exception('Tool "$name" does not exist, can not be removed.'); + } + tools.remove(name); + return true; + } + + /// Deletes an item. + bool deleteItem(String id) { + realtime.send('conversation.item.delete', {'item_id': id}); + return true; + } + + /// Updates session configuration. + /// If the client is not yet connected, will save details and instantiate + /// upon connection. + void updateSession({ + List? modalities, + String? instructions, + String? voice, + String? inputAudioFormat, + String? outputAudioFormat, + Map? inputAudioTranscription, + Map? turnDetection, + List>? tools, + dynamic toolChoice, + double? 
temperature, + dynamic maxResponseOutputTokens, + }) { + if (modalities != null) sessionConfig['modalities'] = modalities; + if (instructions != null) sessionConfig['instructions'] = instructions; + if (voice != null) sessionConfig['voice'] = voice; + if (inputAudioFormat != null) { + sessionConfig['input_audio_format'] = inputAudioFormat; + } + if (outputAudioFormat != null) { + sessionConfig['output_audio_format'] = outputAudioFormat; + } + if (inputAudioTranscription != null) { + sessionConfig['input_audio_transcription'] = inputAudioTranscription; + } + if (turnDetection != null) sessionConfig['turn_detection'] = turnDetection; + if (tools != null) sessionConfig['tools'] = tools; + if (toolChoice != null) sessionConfig['tool_choice'] = toolChoice; + if (temperature != null) sessionConfig['temperature'] = temperature; + if (maxResponseOutputTokens != null) { + sessionConfig['max_response_output_tokens'] = maxResponseOutputTokens; + } + + // Load tools from tool definitions + already loaded tools + final useTools = [ + ...(tools ?? const []).map((toolDefinition) { + final definition = { + 'type': 'function', + ...toolDefinition, + }; + if (this.tools.containsKey(definition['name'])) { + throw Exception( + 'Tool "${definition['name']}" has already been defined', + ); + } + return definition; + }), + ...this.tools.values.map( + (tool) => { + 'type': 'function', + ...(tool as Map)['definition'], + }, + ), + ]; + + final session = Map.from(sessionConfig); + session['tools'] = useTools; + + if (realtime.isConnected()) { + realtime.send('session.update', {'session': session}); + } + } + + /// Sends user message content and generates a response. + bool sendUserMessageContent(List> content) { + if (content.isNotEmpty) { + for (final c in content) { + if (c['type'] == 'input_audio') { + if (c['audio'] is Uint8List) { + c['audio'] = base64.encode(c['audio'] as Uint8List); + } + } + } + realtime.send('conversation.item.create', { + 'item': { + 'type': 'message', + 'role': 'user', + 'content': content, + }, + }); + } + createResponse(); + return true; + } + + /// Appends user audio to the existing audio buffer. + bool appendInputAudio(Uint8List audioData) { + if (audioData.isNotEmpty) { + realtime.send('input_audio_buffer.append', { + 'audio': base64.encode(audioData), + }); + inputAudioBuffer = RealtimeUtils.mergeUint8Lists( + inputAudioBuffer, + audioData, + ); + } + return true; + } + + /// Forces a model response generation. + bool createResponse() { + if (getTurnDetectionType() == null && inputAudioBuffer.isNotEmpty) { + realtime.send('input_audio_buffer.commit'); + conversation.queueInputAudio(inputAudioBuffer); + inputAudioBuffer = Uint8List(0); + } + realtime.send('response.create'); + return true; + } + + /// Cancels the ongoing server generation and truncates ongoing generation, + /// if applicable. + /// If no id provided, will simply call `cancel_generation` command. + Map cancelResponse(String? 
id, [int sampleCount = 0]) { + if (id == null) { + realtime.send('response.cancel'); + return {'item': null}; + } else { + final item = conversation.getItem(id) as Map?; + if (item == null) { + throw Exception('Could not find item "$id"'); + } + final type = item['type'] as String; + final role = item['role'] as String; + + if (type != 'message') { + throw Exception('Can only cancelResponse messages with type "message"'); + } else if (role != 'assistant') { + throw Exception( + 'Can only cancelResponse messages with role "assistant"', + ); + } + realtime.send('response.cancel'); + + final content = item['content'] as List>; + final audioIndex = content.indexWhere((c) => c['type'] == 'audio'); + if (audioIndex == -1) { + throw Exception('Could not find audio on item to cancel'); + } + realtime.send('conversation.item.truncate', { + 'item_id': id, + 'content_index': audioIndex, + 'audio_end_ms': + (sampleCount / conversation.defaultFrequency * 1000).floor(), + }); + return {'item': item}; + } + } + + /// Utility for waiting for the next `conversation.item.appended` event to + /// be triggered by the server. + Future> waitForNextItem({ + Duration? timeout, + }) async { + final event = await waitForNext( + 'conversation.item.appended', + timeout: timeout, + ); + return {'item': event?['item']}; + } + + /// Utility for waiting for the next `conversation.item.completed` event to + /// be triggered by the server. + Future> waitForNextCompletedItem({ + Duration? timeout, + }) async { + final event = await waitForNext( + 'conversation.item.completed', + timeout: timeout, + ); + return {'item': event?['item']}; + } +} diff --git a/packages/openai_realtime_dart/lib/src/conversation.dart b/packages/openai_realtime_dart/lib/src/conversation.dart new file mode 100644 index 00000000..fb472a46 --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/conversation.dart @@ -0,0 +1,414 @@ +import 'dart:async'; +import 'dart:convert'; +import 'dart:typed_data'; +import 'utils.dart'; + +/// Stores a client-side cache of the current conversation and performs event +/// validation to make sure it can cache them properly. +class RealtimeConversation { + /// Create a new [RealtimeConversation] instance. + RealtimeConversation() { + _initializeEventProcessors(); + } + + /// Map of item ids to items. + final Map itemLookup = {}; + + /// List of items in the conversation. + final List items = []; + + /// Map of response ids to responses. + final Map responseLookup = {}; + + /// List of responses in the conversation. + final List responses = []; + + /// Map of queued speech items. + final Map queuedSpeechItems = {}; + + /// Map of queued transcript items. + final Map queuedTranscriptItems = {}; + + /// Queued input audio. + Uint8List? queuedInputAudio; + + /// Event processors for conversation events. + final Map< + String, + FutureOr> Function( + Map? event, [ + dynamic args, + ])> _eventProcessors = {}; + + /// Default frequency for audio. + final int defaultFrequency = 24000; // 24,000 Hz + + void _initializeEventProcessors() { + _eventProcessors['conversation.item.created'] = ( + Map? 
event, [ + dynamic args, + ]) { + final item = event?['item']; + final newItem = Map.from(item); + if (!itemLookup.containsKey(newItem['id'])) { + itemLookup[newItem['id']] = newItem; + items.add(newItem); + } + newItem['formatted'] = { + 'audio': Uint8List(0), + 'text': '', + 'transcript': '', + }; + // If we have a speech item, can populate audio + final newItemId = newItem['id'] as String; + final formatted = newItem['formatted'] as Map; + if (queuedSpeechItems.containsKey(newItemId)) { + final queuedSpeechItem = + queuedSpeechItems[newItemId] as Map; + formatted['audio'] = queuedSpeechItem['audio']; + queuedSpeechItems.remove(newItemId); + } + // Populate formatted text if it comes out on creation + final newItemContent = + (newItem['content'] as List?)?.cast>(); + if (newItemContent != null) { + final textContent = newItemContent + .where((c) => ['text', 'input_text'].contains(c['type'] as String)); + for (final content in textContent) { + formatted['text'] = '${formatted['text']}${content['text']}'; + } + } + // If we have a transcript item, can pre-populate transcript + if (queuedTranscriptItems.containsKey(newItem['id'])) { + final queuedTranscriptItem = + queuedTranscriptItems[newItemId] as Map; + formatted['transcript'] = queuedTranscriptItem['transcript']; + queuedTranscriptItems.remove(newItem['id']); + } + if (newItem['type'] == 'message') { + if (newItem['role'] == 'user') { + newItem['status'] = 'completed'; + if (queuedInputAudio != null) { + formatted['audio'] = queuedInputAudio; + queuedInputAudio = null; + } + } else { + newItem['status'] = 'in_progress'; + } + } else if (newItem['type'] == 'function_call') { + formatted['tool'] = { + 'type': 'function', + 'name': newItem['name'], + 'call_id': newItem['call_id'], + 'arguments': '', + }; + newItem['status'] = 'in_progress'; + } else if (newItem['type'] == 'function_call_output') { + newItem['status'] = 'completed'; + formatted['output'] = newItem['output']; + } + return {'item': newItem, 'delta': null}; + }; + + _eventProcessors['conversation.item.truncated'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final audioEndMs = event?['audio_end_ms'] as int; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception('item.truncated: Item "$itemId" not found'); + } + final formatted = item['formatted'] as Map; + final endIndex = audioEndMs * defaultFrequency ~/ 1000; + formatted['transcript'] = ''; + formatted['audio'] = + (formatted['audio'] as Uint8List).sublist(0, endIndex); + return {'item': item, 'delta': null}; + }; + + _eventProcessors['conversation.item.deleted'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final item = itemLookup[itemId]; + if (item == null) { + throw Exception('item.deleted: Item "$itemId" not found'); + } + itemLookup.remove(itemId); + items.remove(item); + return {'item': item, 'delta': null}; + }; + + _eventProcessors['conversation.item.input_audio_transcription.completed'] = + ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final contentIndex = event?['content_index']; + final transcript = event?['transcript']; + final item = itemLookup[itemId] as Map?; + // We use a single space to represent an empty transcript for .formatted values + // Otherwise it looks like no transcript provided + final formattedTranscript = transcript ?? 
' '; + if (item == null) { + // We can receive transcripts in VAD mode before item.created + // This happens specifically when audio is empty + queuedTranscriptItems[itemId] = {'transcript': formattedTranscript}; + return {'item': null, 'delta': null}; + } else { + final itemContent = item['content'] as List>; + final formatted = item['formatted'] as Map; + itemContent[contentIndex]['transcript'] = transcript; + formatted['transcript'] = formattedTranscript; + return { + 'item': item, + 'delta': {'transcript': transcript}, + }; + } + }; + + _eventProcessors['input_audio_buffer.speech_started'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final audioStartMs = event?['audio_start_ms']; + queuedSpeechItems[itemId] = {'audio_start_ms': audioStartMs}; + return {'item': null, 'delta': null}; + }; + + _eventProcessors['input_audio_buffer.speech_stopped'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final audioEndMs = event?['audio_end_ms']; + final speech = queuedSpeechItems[itemId] as Map; + speech['audio_end_ms'] = audioEndMs; + if (args != null) { + final inputAudioBuffer = args as Uint8List; + final speechAudioStartMs = speech['audio_start_ms'] as int; + final speechAudioEndMs = speech['audio_end_ms'] as int; + final startIndex = speechAudioStartMs * defaultFrequency ~/ 1000; + final endIndex = speechAudioEndMs * defaultFrequency ~/ 1000; + speech['audio'] = inputAudioBuffer.sublist(startIndex, endIndex); + } + return {'item': null, 'delta': null}; + }; + + _eventProcessors['response.created'] = ( + Map? event, [ + dynamic args, + ]) { + final response = event?['response'] as Map?; + final responseId = response?['id'] as String? ?? ''; + if (!responseLookup.containsKey(responseId)) { + responseLookup[responseId] = response; + responses.add(response); + } + return {'item': null, 'delta': null}; + }; + + _eventProcessors['response.output_item.added'] = ( + Map? event, [ + dynamic args, + ]) { + final responseId = event?['response_id']; + final item = event?['item'] as Map?; + final itemId = item?['id'] as String? ?? ''; + final response = responseLookup[responseId] as Map?; + if (response == null) { + throw Exception( + 'response.output_item.added: Response "$responseId" not found', + ); + } + (response['output'] as List).add(itemId); + return {'item': null, 'delta': null}; + }; + + _eventProcessors['response.output_item.done'] = ( + Map? event, [ + dynamic args, + ]) { + final item = event?['item'] as Map?; + if (item == null) { + throw Exception('response.output_item.done: Missing "item"'); + } + final itemId = item['id'] as String; + final foundItem = itemLookup[itemId] as Map?; + if (foundItem == null) { + throw Exception( + 'response.output_item.done: Item "${item['id']}" not found', + ); + } + foundItem['status'] = item['status']; + return {'item': foundItem, 'delta': null}; + }; + + _eventProcessors['response.content_part.added'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final part = event?['part']; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception( + 'response.content_part.added: Item "$itemId" not found', + ); + } + (item['content'] as List).add(part); + return {'item': item, 'delta': null}; + }; + + _eventProcessors['response.audio_transcript.delta'] = ( + Map? 
event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final contentIndex = event?['content_index']; + final delta = event?['delta']; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception( + 'response.audio_transcript.delta: Item "$itemId" not found', + ); + } + final itemContent = + (item['content'] as List).cast>(); + final itemTranscript = itemContent[contentIndex]['transcript'] as String; + final formatted = item['formatted'] as Map; + final formattedTranscript = formatted['transcript'] as String; + itemContent[contentIndex]['transcript'] = itemTranscript + delta; + formatted['transcript'] = formattedTranscript + delta; + return { + 'item': item, + 'delta': {'transcript': delta}, + }; + }; + + _eventProcessors['response.audio.delta'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final delta = event?['delta']; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception('response.audio.delta: Item "$itemId" not found'); + } + // This never gets rendered, we care about the file data instead + // item.content[content_index].audio += delta; + final arrayBuffer = base64.decode(delta); + final formatted = item['formatted'] as Map; + formatted['audio'] = RealtimeUtils.mergeUint8Lists( + formatted['audio'], + arrayBuffer, + ); + return { + 'item': item, + 'delta': {'audio': arrayBuffer}, + }; + }; + + _eventProcessors['response.text.delta'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final contentIndex = event?['content_index']; + final delta = event?['delta']; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception('response.text.delta: Item "$itemId" not found'); + } + final itemContent = item['content'] as List>; + final itemText = itemContent[contentIndex]['text'] as String; + final formatted = item['formatted'] as Map; + final formattedText = formatted['text'] as String; + itemContent[contentIndex]['text'] = itemText + delta; + formatted['text'] = formattedText + delta; + return { + 'item': item, + 'delta': {'text': delta}, + }; + }; + + _eventProcessors['response.function_call_arguments.delta'] = ( + Map? event, [ + dynamic args, + ]) { + final itemId = event?['item_id']; + final delta = event?['delta']; + final item = itemLookup[itemId] as Map?; + if (item == null) { + throw Exception( + 'response.function_call_arguments.delta: Item "$itemId" not found', + ); + } + final arguments = item['arguments'] as String; + final formatted = item['formatted'] as Map; + final formattedTool = formatted['tool'] as Map; + final formattedToolArguments = formattedTool['arguments'] as String; + item['arguments'] = arguments + delta; + formattedTool['arguments'] = formattedToolArguments + delta; + return { + 'item': item, + 'delta': {'arguments': delta}, + }; + }; + } + + /// * Clears the conversation history and resets to default. + void clear() { + itemLookup.clear(); + items.clear(); + responseLookup.clear(); + responses.clear(); + queuedSpeechItems.clear(); + queuedTranscriptItems.clear(); + queuedInputAudio = null; + } + + /// Queue input audio for manual speech event. + Uint8List queueInputAudio(Uint8List inputAudio) { + queuedInputAudio = inputAudio; + return inputAudio; + } + + /// Process an event from the WebSocket server and compose items. + FutureOr> processEvent( + Map? 
event, [ + dynamic args, + ]) { + if (event?['event_id'] == null) { + throw Exception('Missing "event_id" on event'); + } + if (event?['type'] == null) { + throw Exception('Missing "type" on event'); + } + + final eventProcessor = _eventProcessors[event?['type']]; + if (eventProcessor == null) { + throw Exception( + 'Missing conversation event processor for "${event?['type']}"', + ); + } + + return eventProcessor(event, args); + } + + /// Retrieves a item by id. + dynamic getItem(String id) { + return itemLookup[id]; + } + + /// Retrieves all items in the conversation. + List getItems() { + return List.from(items); + } +} diff --git a/packages/openai_realtime_dart/lib/src/event_handler.dart b/packages/openai_realtime_dart/lib/src/event_handler.dart new file mode 100644 index 00000000..458eabb6 --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/event_handler.dart @@ -0,0 +1,85 @@ +import 'dart:async'; + +/// EventHandler callback. +typedef EventHandlerCallback = FutureOr Function( + Map? event, +); + +/// Inherited class for RealtimeAPI and RealtimeClient. +/// Adds basic event handling. +class RealtimeEventHandler { + final Map> _eventHandlers = {}; + final Map> _nextEventHandlers = {}; + + /// Listen to specific events. + /// - [eventName] The name of the event to listen to. + /// - [callback] The callback function to call when the event is received. + void on(String eventName, EventHandlerCallback callback) { + _eventHandlers.putIfAbsent(eventName, () => []).add(callback); + } + + /// Listen for the next event of a specified type. + /// - [eventName] The name of the event to listen to. + /// - [callback] The callback function to call when the event is received. + void onNext(String eventName, EventHandlerCallback callback) { + _nextEventHandlers.putIfAbsent(eventName, () => []).add(callback); + } + + /// Turns off event listening for specific events. + /// Calling without a callback will remove all listeners for the event. + /// - [eventName] The name of the event to stop listening to. + /// - [callback] The callback function to remove. + void off(String eventName, [EventHandlerCallback? callback]) { + if (callback == null) { + _eventHandlers.remove(eventName); + } else { + _eventHandlers[eventName]?.remove(callback); + } + } + + /// Turns off event listening for the next event of a specific type. + /// Calling without a callback will remove all listeners for the next event. + /// - [eventName] The name of the event to stop listening to. + /// - [callback] The callback function to remove. + void offNext(String eventName, [EventHandlerCallback? callback]) { + if (callback == null) { + _nextEventHandlers.remove(eventName); + } else { + _nextEventHandlers[eventName]?.remove(callback); + } + } + + /// Waits for next event of a specific type and returns the payload. + /// - [eventName] The name of the event to wait for. + /// - [timeout] The maximum time to wait for the event. + Future?> waitForNext( + String eventName, { + Duration? timeout, + }) async { + final completer = Completer?>(); + + onNext(eventName, completer.complete); + + if (timeout != null) { + return completer.future.timeout(timeout, onTimeout: () => null); + } + + return completer.future; + } + + /// Executes all events in the order they were added, with [on] event + /// handlers executing before[onNext] handlers. + /// - [eventName] The name of the event to dispatch. + /// - [event] The event payload to pass to the handlers. + void dispatch(String eventName, Map? 
event) { + _eventHandlers[eventName]?.forEach((handler) async => handler(event)); + _nextEventHandlers[eventName]?.forEach((handler) async => handler(event)); + _nextEventHandlers.remove(eventName); + } + + /// Clears all event handlers. + void clearEventHandlers() { + _eventHandlers.clear(); + _nextEventHandlers.clear(); + } +} diff --git a/packages/openai_realtime_dart/lib/src/utils.dart b/packages/openai_realtime_dart/lib/src/utils.dart new file mode 100644 index 00000000..6a2bac9a --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/utils.dart @@ -0,0 +1,22 @@ +// ignore_for_file: public_member_api_docs, cascade_invocations +import 'dart:math'; +import 'dart:typed_data'; + +class RealtimeUtils { + static Uint8List mergeUint8Lists(Uint8List left, Uint8List right) { + final result = Uint8List(left.length + right.length); + result.setRange(0, left.length, left); + result.setRange(left.length, result.length, right); + return result; + } + + static String generateId(String prefix, {int length = 21}) { + const chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz'; + final random = Random(); + final str = List.generate( + length - prefix.length, + (_) => chars[random.nextInt(chars.length)], + ).join(''); + return '$prefix$str'; + } +} diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart new file mode 100644 index 00000000..63e1c56f --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart @@ -0,0 +1,3 @@ +export 'web_socket_stub.dart' + if (dart.library.io) 'web_socket_io.dart' + if (dart.library.js_interop) 'web_socket_html.dart'; diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart new file mode 100644 index 00000000..f776ba3f --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart @@ -0,0 +1,14 @@ +import 'package:web_socket_channel/html.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +/// Creates a new WebSocket connection using the `dart:html` implementation. +WebSocketChannel connectWebSocket(Uri uri, String? apiKey) { + return HtmlWebSocketChannel.connect( + uri, + protocols: [ + 'realtime', + if (apiKey != null) 'openai-insecure-api-key.$apiKey', + 'openai-beta.realtime-v1', + ], + ); +} diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart new file mode 100644 index 00000000..fcd8eea8 --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart @@ -0,0 +1,13 @@ +import 'package:web_socket_channel/io.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; + +/// Creates a new WebSocket connection using the `dart:io` implementation. +WebSocketChannel connectWebSocket(Uri uri, String? apiKey) { + return IOWebSocketChannel.connect( + uri, + headers: { + if (apiKey != null) 'Authorization': 'Bearer $apiKey', + 'OpenAI-Beta': 'realtime=v1', + }, + ); +} diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart new file mode 100644 index 00000000..64573fff --- /dev/null +++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart @@ -0,0 +1,7 @@ +import 'package:web_socket_channel/web_socket_channel.dart'; + +/// Creates a new WebSocket connection. 
diff --git a/packages/openai_realtime_dart/lib/src/utils.dart b/packages/openai_realtime_dart/lib/src/utils.dart
new file mode 100644
index 00000000..6a2bac9a
--- /dev/null
+++ b/packages/openai_realtime_dart/lib/src/utils.dart
@@ -0,0 +1,22 @@
+// ignore_for_file: public_member_api_docs, cascade_invocations
+import 'dart:math';
+import 'dart:typed_data';
+
+class RealtimeUtils {
+  static Uint8List mergeUint8Lists(Uint8List left, Uint8List right) {
+    final result = Uint8List(left.length + right.length);
+    result.setRange(0, left.length, left);
+    result.setRange(left.length, result.length, right);
+    return result;
+  }
+
+  static String generateId(String prefix, {int length = 21}) {
+    const chars = '123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz';
+    final random = Random();
+    final str = List.generate(
+      length - prefix.length,
+      (_) => chars[random.nextInt(chars.length)],
+    ).join('');
+    return '$prefix$str';
+  }
+}
diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart
new file mode 100644
index 00000000..63e1c56f
--- /dev/null
+++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket.dart
@@ -0,0 +1,3 @@
+export 'web_socket_stub.dart'
+    if (dart.library.io) 'web_socket_io.dart'
+    if (dart.library.js_interop) 'web_socket_html.dart';
diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart
new file mode 100644
index 00000000..f776ba3f
--- /dev/null
+++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_html.dart
@@ -0,0 +1,14 @@
+import 'package:web_socket_channel/html.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+/// Creates a new WebSocket connection using the `dart:html` implementation.
+WebSocketChannel connectWebSocket(Uri uri, String? apiKey) {
+  return HtmlWebSocketChannel.connect(
+    uri,
+    protocols: [
+      'realtime',
+      if (apiKey != null) 'openai-insecure-api-key.$apiKey',
+      'openai-beta.realtime-v1',
+    ],
+  );
+}
diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart
new file mode 100644
index 00000000..fcd8eea8
--- /dev/null
+++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_io.dart
@@ -0,0 +1,13 @@
+import 'package:web_socket_channel/io.dart';
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+/// Creates a new WebSocket connection using the `dart:io` implementation.
+WebSocketChannel connectWebSocket(Uri uri, String? apiKey) {
+  return IOWebSocketChannel.connect(
+    uri,
+    headers: {
+      if (apiKey != null) 'Authorization': 'Bearer $apiKey',
+      'OpenAI-Beta': 'realtime=v1',
+    },
+  );
+}
diff --git a/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart
new file mode 100644
index 00000000..64573fff
--- /dev/null
+++ b/packages/openai_realtime_dart/lib/src/web_socket/web_socket_stub.dart
@@ -0,0 +1,7 @@
+import 'package:web_socket_channel/web_socket_channel.dart';
+
+/// Creates a new WebSocket connection.
+WebSocketChannel connectWebSocket(Uri uri, String? apiKey) =>
+    throw UnsupportedError(
+      'Cannot create a WebSocket connection without dart:html or dart:io.',
+    );
diff --git a/packages/openai_realtime_dart/pubspec.yaml b/packages/openai_realtime_dart/pubspec.yaml
new file mode 100644
index 00000000..d5c711ca
--- /dev/null
+++ b/packages/openai_realtime_dart/pubspec.yaml
@@ -0,0 +1,26 @@
+name: openai_realtime_dart
+description: Dart client for the OpenAI Realtime API (beta).
+version: 0.0.1-dev.1
+repository: https://github.com/davidmigloz/langchain_dart/tree/main/packages/openai_realtime_dart
+issue_tracker: https://github.com/davidmigloz/langchain_dart/issues?q=label:p:openai_realtime_dart
+homepage: https://github.com/davidmigloz/langchain_dart
+documentation: https://langchaindart.dev
+
+topics:
+  - ai
+  - llms
+  - openai
+
+environment:
+  sdk: ">=3.4.0 <4.0.0"
+
+dependencies:
+  http: ^1.2.2
+  json_annotation: ^4.9.0
+  logging: ^1.2.0
+  web_socket_channel: ^3.0.1
+
+dev_dependencies:
+  test: ^1.25.8
+  json_serializable: ^6.8.0
+  build_runner: ^2.4.11
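Note: the test files that follow exercise the new package end to end. As a condensed, illustrative sketch of the low-level `RealtimeAPI` flow (not part of this diff; constructor parameters and event names are taken from `api_test.dart` below):

```dart
import 'dart:io';

import 'package:openai_realtime_dart/openai_realtime_dart.dart';

Future<void> main() async {
  final realtime = RealtimeAPI(
    apiKey: Platform.environment['OPENAI_API_KEY'],
  );

  await realtime.connect();

  // Wait for the server to acknowledge the session.
  final created = await realtime.waitForNext('server.session.created');
  final session = created?['session'] as Map<String, dynamic>?;
  print('session id: ${session?['id']}');

  // Sending an unknown event type is answered with a 'server.error' event,
  // which is what the error-handling test below relies on.
  realtime.send('test_event', {'message': 'Hello, World!'});

  await realtime.disconnect();
}
```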
diff --git a/packages/openai_realtime_dart/test/api_test.dart b/packages/openai_realtime_dart/test/api_test.dart
new file mode 100644
index 00000000..29a0fad9
--- /dev/null
+++ b/packages/openai_realtime_dart/test/api_test.dart
@@ -0,0 +1,105 @@
+import 'dart:io';
+
+import 'package:openai_realtime_dart/openai_realtime_dart.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('RealtimeAPI Tests', () {
+    late RealtimeAPI realtime;
+
+    setUp(() {
+      realtime = RealtimeAPI(
+        apiKey: Platform.environment['OPENAI_API_KEY'],
+        debug: false,
+      );
+    });
+
+    tearDown(() async {
+      if (realtime.isConnected()) {
+        await realtime.disconnect();
+      }
+    });
+
+    test('Should instantiate the RealtimeAPI with no apiKey', () {
+      final apiWithoutKey = RealtimeAPI(debug: false);
+      expect(apiWithoutKey, isNotNull);
+      expect(apiWithoutKey.apiKey, isNull);
+    });
+
+    test('Should fail to connect to the RealtimeAPI with no apiKey', () async {
+      final apiWithoutKey = RealtimeAPI(debug: false);
+      await apiWithoutKey.connect();
+
+      final event = await apiWithoutKey.waitForNext(
+        'server.error',
+        timeout: const Duration(seconds: 1),
+      );
+
+      expect(event, isNotNull);
+      expect(event?['error'], isNotNull);
+      final error = event?['error'] as Map<String, dynamic>?;
+      expect(error?['message'], contains('Missing bearer'));
+    });
+
+    test('Should instantiate the RealtimeAPI', () {
+      expect(realtime, isNotNull);
+      expect(realtime.apiKey, equals(Platform.environment['OPENAI_API_KEY']));
+    });
+
+    test('Should connect to the RealtimeAPI', () async {
+      final isConnected = await realtime.connect();
+
+      expect(isConnected, isTrue);
+      expect(realtime.isConnected(), isTrue);
+    });
+
+    test('Should close the RealtimeAPI connection', () async {
+      await realtime.connect();
+      await realtime.disconnect();
+
+      expect(realtime.isConnected(), isFalse);
+    });
+
+    test('Should handle multiple connections and disconnections', () async {
+      for (int i = 0; i < 3; i++) {
+        final isConnected = await realtime.connect();
+        expect(isConnected, isTrue, reason: 'Connection $i failed');
+        expect(realtime.isConnected(), isTrue, reason: 'Connection $i failed');
+
+        await realtime.disconnect();
+        expect(
+          realtime.isConnected(),
+          isFalse,
+          reason: 'Disconnection $i failed',
+        );
+      }
+    });
+
+    test('Should receive server events', () async {
+      await realtime.connect();
+
+      final event = await realtime.waitForNext('server.session.created');
+      expect(event, isNotNull);
+      expect(event?['session'], isNotNull);
+      final session = event?['session'] as Map<String, dynamic>;
+      expect(session['id'], isNotNull);
+    });
+
+    test('Should receive server error events', () async {
+      await realtime.connect();
+
+      final testEvent = {
+        'type': 'test_event',
+        'data': {'message': 'Hello, World!'},
+      };
+
+      // Set up the listener before sending the event
+      final eventFuture = realtime.waitForNext('server.error');
+
+      realtime.send('test_event', testEvent['data'] as Map<String, dynamic>?);
+
+      final receivedEvent = await eventFuture;
+      expect(receivedEvent, isNotNull);
+    });
+  });
+}
diff --git a/packages/openai_realtime_dart/test/audio_test.dart b/packages/openai_realtime_dart/test/audio_test.dart
new file mode 100644
index 00000000..f1163f66
--- /dev/null
+++ b/packages/openai_realtime_dart/test/audio_test.dart
@@ -0,0 +1,113 @@
+import 'dart:convert';
+import 'dart:io';
+import 'dart:typed_data';
+import 'package:openai_realtime_dart/openai_realtime_dart.dart';
+import 'package:test/test.dart';
+import 'utils.dart';
+
+void main() {
+  group('Audio samples tests', timeout: const Timeout(Duration(minutes: 5)),
+      () {
+    test('Toronto Audio sample test', () async {
+      // Should connect to the RealtimeClient
+      final realtimeEvents = <Map<String, dynamic>?>[];
+      final client = RealtimeClient(
+        apiKey: Platform.environment['OPENAI_API_KEY'],
+        debug: false,
+      )
+        ..updateSession(
+          instructions:
+              'Please follow the instructions of any query you receive.\n'
+              'Be concise in your responses. Speak quickly and answer shortly.',
+        )
+        ..on('realtime.event', realtimeEvents.add);
+
+      final isConnected = await client.connect();
+      expect(isConnected, isTrue);
+      expect(client.isConnected(), isTrue);
+
+      // Should receive "session.created" and send "session.update"
+      await client.waitForSessionCreated();
+
+      expect(realtimeEvents.length, equals(2));
+
+      final clientEvent1 = realtimeEvents[0];
+      expect(clientEvent1?['source'], equals('client'));
+      final clientEventData1 = clientEvent1?['event'] as Map<String, dynamic>?;
+      expect(clientEventData1?['type'], equals('session.update'));
+
+      final serverEvent1 = realtimeEvents[1];
+      expect(serverEvent1?['source'], equals('server'));
+      final serverEventData1 = serverEvent1?['event'] as Map<String, dynamic>?;
+      expect(serverEventData1?['type'], equals('session.created'));
+
+      final session = serverEventData1?['session'] as Map<String, dynamic>?;
+      expect(session?['id'], isNotNull);
+
+      // Should load audio samples
+      final audioData = await readSampleAudioFile('toronto.pcm');
+      expect(audioData, isNotNull);
+      expect(audioData, isA<Uint8List>());
+      expect(audioData.isNotEmpty, isTrue);
+
+      // Should send an audio file about toronto
+      final audioDataBase64 = base64.encode(audioData);
+      final content = [
+        {'type': 'input_audio', 'audio': audioDataBase64},
+      ];
+
+      client.sendUserMessageContent(content);
+      expect(realtimeEvents.length, equals(4));
+
+      final itemEvent = realtimeEvents[2];
+      expect(itemEvent?['source'], equals('client'));
+      final itemEventData = itemEvent?['event'] as Map<String, dynamic>?;
+      expect(itemEventData?['type'], equals('conversation.item.create'));
+
+      final responseEvent = realtimeEvents[3];
+      expect(responseEvent, isNotNull);
+      expect(responseEvent?['source'], equals('client'));
+      final responseEventData =
+          responseEvent?['event'] as Map<String, dynamic>?;
+      expect(responseEventData?['type'], equals('response.create'));
+
+      // Should waitForNextItem to receive "conversation.item.created" from user
+      final result1 = await client.waitForNextItem();
+      final item1 = result1['item'] as Map<String, dynamic>?;
+
+      expect(item1, isNotNull);
+      expect(item1!['type'], equals('message'));
+      expect(item1['role'], equals('user'));
+      expect(item1['status'], equals('completed'));
+      final item1Formatted = item1['formatted'] as Map<String, dynamic>?;
+      expect(item1Formatted?['text'], equals(''));
+
+      // Should waitForNextItem to receive "conversation.item.created" from assistant
+      final result2 = await client.waitForNextItem();
+      final item2 = result2['item'] as Map<String, dynamic>?;
+
+      expect(item2, isNotNull);
+      expect(item2!['type'], equals('message'));
+      expect(item2['role'], equals('assistant'));
+      expect(item2['status'], equals('in_progress'));
+      final item2Formatted = item2['formatted'] as Map<String, dynamic>?;
+      expect(item2Formatted?['text'], equals(''));
+
+      // Should waitForNextCompletedItem to receive completed item from assistant
+      final result3 = await client.waitForNextCompletedItem();
+      final item3 = result3['item'] as Map<String, dynamic>?;
+
+      expect(item3, isNotNull);
+      expect(item3!['type'], equals('message'));
+      expect(item3['role'], equals('assistant'));
+      expect(item3['status'], equals('completed'));
+      final item3Formatted = item3['formatted'] as Map<String, dynamic>?;
+      final item3Transcript = item3Formatted?['transcript'] as String?;
+      expect(item3Transcript?.toLowerCase(), contains('toronto'));
+
+      // Should disconnect from the RealtimeClient
+      await client.disconnect();
+      expect(client.isConnected(), isFalse);
+    });
+  });
+}
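Note: `audio_test.dart` above shows the audio round-trip in full: base64-encode the raw PCM bytes, send them as an `input_audio` content part, then read the assistant's transcript from the completed item. A condensed, illustrative sketch of the same flow (not part of this diff; the file path and API calls mirror the test):

```dart
import 'dart:convert';
import 'dart:io';

import 'package:openai_realtime_dart/openai_realtime_dart.dart';

Future<void> main() async {
  final client = RealtimeClient(
    apiKey: Platform.environment['OPENAI_API_KEY'],
  );
  await client.connect();
  await client.waitForSessionCreated();

  // Raw PCM sample shipped with the tests above.
  final audio = await File('test/samples/toronto.pcm').readAsBytes();
  client.sendUserMessageContent([
    {'type': 'input_audio', 'audio': base64.encode(audio)},
  ]);

  // The user item and the assistant's in-progress item arrive first...
  await client.waitForNextItem();
  await client.waitForNextItem();

  // ...then the assistant's completed item carries the transcript.
  final completed = await client.waitForNextCompletedItem();
  final item = completed['item'] as Map<String, dynamic>?;
  final formatted = item?['formatted'] as Map<String, dynamic>?;
  print(formatted?['transcript']);

  await client.disconnect();
}
```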
diff --git a/packages/openai_realtime_dart/test/client_test.dart b/packages/openai_realtime_dart/test/client_test.dart
new file mode 100644
index 00000000..fde01031
--- /dev/null
+++ b/packages/openai_realtime_dart/test/client_test.dart
@@ -0,0 +1,111 @@
+// ignore_for_file: avoid_print
+
+import 'dart:io';
+
+import 'package:openai_realtime_dart/openai_realtime_dart.dart';
+import 'package:test/test.dart';
+
+void main() {
+  group('RealtimeClient Tests', () {
+    test('RealtimeClient test', () async {
+      // Should instantiate the RealtimeClient
+      final realtimeEvents = <Map<String, dynamic>?>[];
+      final client = RealtimeClient(
+        apiKey: Platform.environment['OPENAI_API_KEY'],
+        debug: false,
+      )
+        ..updateSession(
+          instructions: 'You always, ALWAYS reference San Francisco '
+              'by name in every response. Always include the phrase "San Francisco". '
+              'This is for testing so stick to it!',
+        )
+        ..on('realtime.event', realtimeEvents.add);
+      expect(client, isNotNull);
+      expect(client.realtime, isNotNull);
+      expect(client.conversation, isNotNull);
+      expect(
+        client.realtime.apiKey,
+        equals(Platform.environment['OPENAI_API_KEY']),
+      );
+
+      // Should connect to the RealtimeClient
+      final isConnected = await client.connect();
+
+      expect(isConnected, isTrue);
+      expect(client.isConnected(), isTrue);
+
+      // Should receive "session.created" and send "session.update"
+      await client.waitForSessionCreated();
+      expect(realtimeEvents.length, equals(2));
+
+      final clientEvent1 = realtimeEvents[0];
+      expect(clientEvent1?['source'], equals('client'));
+      final clientEvent1Event = clientEvent1?['event'] as Map<String, dynamic>?;
+      expect(clientEvent1Event?['type'], equals('session.update'));
+
+      final serverEvent1 = realtimeEvents[1];
+      expect(serverEvent1?['source'], equals('server'));
+      final serverEvent1Event = serverEvent1?['event'] as Map<String, dynamic>?;
+      expect(serverEvent1Event?['type'], equals('session.created'));
+
+      final session = serverEvent1Event?['session'] as Map<String, dynamic>?;
+      expect(session?['id'], isNotNull);
+
+      // Should send a simple hello message (text)
+      final content1 = [
+        {'type': 'input_text', 'text': 'How are you?'},
+      ];
+      client.sendUserMessageContent(content1);
+
+      expect(realtimeEvents.length, equals(4));
+
+      final itemEvent = realtimeEvents[2];
+      expect(itemEvent?['source'], equals('client'));
+      final event = itemEvent?['event'] as Map<String, dynamic>?;
+      expect(event?['type'], equals('conversation.item.create'));
+
+      final responseEvent = realtimeEvents[3];
+      expect(responseEvent?['source'], equals('client'));
+      final response = responseEvent?['event'] as Map<String, dynamic>?;
+      expect(response?['type'], equals('response.create'));
+
+      // Should receive "conversation.item.created" from user
+      final userItem = await client.waitForNextItem();
+
+      expect(userItem['item'], isNotNull);
+      final userItemItem = userItem['item'] as Map<String, dynamic>;
+      expect(userItemItem['type'], equals('message'));
+      expect(userItemItem['role'], equals('user'));
+      expect(userItemItem['status'], equals('completed'));
+      final formatted1 = userItemItem['formatted'] as Map<String, dynamic>;
+      expect(formatted1['text'], equals('How are you?'));
+
+      // Should receive "conversation.item.created" from assistant
+      final assistantItem = await client.waitForNextItem();
+
+      expect(assistantItem['item'], isNotNull);
+      final assistantItemItem = assistantItem['item'] as Map<String, dynamic>;
+      expect(assistantItemItem['type'], equals('message'));
+      expect(assistantItemItem['role'], equals('assistant'));
+      expect(assistantItemItem['status'], equals('in_progress'));
+      final formatted2 = assistantItemItem['formatted'] as Map<String, dynamic>;
+      expect(formatted2['text'], isEmpty);
+
+      // Should receive completed item from assistant
+      final completedItem = await client.waitForNextCompletedItem();
+
+      expect(completedItem['item'], isNotNull);
+      final completedItemItem = completedItem['item'] as Map<String, dynamic>;
+      expect(completedItemItem['type'], equals('message'));
+      expect(completedItemItem['role'], equals('assistant'));
+      expect(completedItemItem['status'], equals('completed'));
+      final formatted = completedItemItem['formatted'] as Map<String, dynamic>;
+      final transcript = formatted['transcript'] as String;
+      expect(transcript.toLowerCase(), contains('san francisco'));
+
+      // Should close the RealtimeClient connection
+      await client.disconnect();
+      expect(client.isConnected(), isFalse);
+    });
+  });
+}
diff --git a/packages/openai_realtime_dart/test/samples/toronto.pcm b/packages/openai_realtime_dart/test/samples/toronto.pcm
new file mode 100644
index 00000000..251d93d9
Binary files /dev/null and b/packages/openai_realtime_dart/test/samples/toronto.pcm differ
diff --git a/packages/openai_realtime_dart/test/utils.dart b/packages/openai_realtime_dart/test/utils.dart
new file mode 100644
index 00000000..b1ff8fe1
--- /dev/null
+++ b/packages/openai_realtime_dart/test/utils.dart
@@ -0,0 +1,9 @@
+import 'dart:async';
+import 'dart:io';
+import 'dart:typed_data';
+
+/// Reads an audio file and returns its content as a Uint8List.
+Future<Uint8List> readSampleAudioFile(String fileName) async {
+  final file = File('./test/samples/$fileName');
+  return file.readAsBytes();
+}