diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..7d2478a --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,81 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/), +and this project adheres to [Semantic Versioning](https://semver.org/). + +## [0.1.0] - 2025-07-15 + +### Added + +- **Conversation history limits** — new `history` option with `maxMessages` (default 100) + and `maxTotalChars` (default unlimited) to prevent unbounded memory growth. + Oldest messages are trimmed in pairs to preserve user/assistant turn structure. + Emits `history_trimmed` event when messages are evicted. +- **Audio input size validation** — new `maxAudioInputSize` option (default 10 MB). + Oversized or empty audio payloads are rejected early with an `error` / `warning` event + instead of being forwarded to the transcription model. +- **Serial input queue** — `sendText()`, WebSocket `transcript` messages, and + transcribed audio are now queued and processed one at a time. This prevents + race conditions where concurrent calls could corrupt `conversationHistory` or + interleave streaming output. +- **LLM stream cancellation** — an `AbortController` is now threaded into + `streamText()` via `abortSignal`. Barge-in, disconnect, and explicit + interrupts abort the LLM stream immediately (saving tokens) instead of only + cancelling TTS. +- **`interruptCurrentResponse(reason)`** — new public method that aborts both + the LLM stream *and* ongoing speech in a single call. WebSocket barge-in + (`transcript` / `audio` / `interrupt` messages) now uses this instead of + `interruptSpeech()` alone. +- **`destroy()`** — permanently tears down the agent, releasing the socket, + clearing history and tools, and removing all event listeners. + A `destroyed` getter is also exposed. Subsequent calls to `sendText()`, `sendAudio()`, + `sendAudioBuffer()`, `connect()`, or `handleSocket()` throw. 
+- **`history_trimmed` event** — emitted with `{ removedCount, reason }` when + the sliding-window trims old messages. +- **Input validation** — `sendText("")` now throws, and incoming WebSocket + `transcript` / `audio` messages are validated before processing. + +### Changed + +- **`disconnect()` is now a full cleanup** — aborts in-flight LLM and TTS + streams, clears the speech queue, rejects pending queued inputs, and removes + socket listeners before closing. Previously it only called `socket.close()`. +- **`connect()` and `handleSocket()` are idempotent** — calling either when a + socket is already attached will cleanly tear down the old connection first + instead of leaking it. +- **`sendWebSocketMessage()` is resilient** — checks `socket.readyState` and + wraps `send()` in a try/catch so a socket that closes mid-send does not throw + an unhandled exception. +- **Speech queue completion uses a promise** — `processUserInput` now awaits a + `speechQueueDonePromise` instead of busy-wait polling + (`while (queue.length) { await sleep(100) }`), reducing CPU waste and + eliminating a race window. +- **`interruptSpeech()` resolves the speech-done promise** — so + `processUserInput` can proceed immediately after a barge-in instead of + potentially hanging. +- **WebSocket message handler uses `if/else if`** — prevents a single message + from accidentally matching multiple type branches. +- **Chunk ID wraps at `Number.MAX_SAFE_INTEGER`** — avoids unbounded counter + growth in very long-running sessions. +- **`processUserInput` catch block cleans up speech state** — on stream error + the pending text buffer is cleared and any in-progress speech is interrupted, + so the agent does not get stuck in a broken state. +- **WebSocket close handler calls `cleanupOnDisconnect()`** — aborts LLM + TTS, + clears queues, and rejects pending input promises. + +### Fixed + +- Typo in JSDoc: `"Process text deltra"` → `"Process text delta"`. 
+ +## [0.0.1] - 2025-07-14 + +### Added + +- Initial release. +- Streaming text generation via AI SDK `streamText`. +- Multi-step tool calling with `stopWhen`. +- Chunked streaming TTS with parallel generation and barge-in support. +- Audio transcription via AI SDK `experimental_transcribe`. +- WebSocket transport with full stream/tool/speech lifecycle events. +- Browser voice client example (`example/voice-client.html`). diff --git a/README.md b/README.md index 11e6d6c..0bb7659 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,18 @@ # voice-agent-ai-sdk -Streaming voice/text agent SDK built on AI SDK with optional WebSocket transport. +Streaming voice/text agent SDK built on [AI SDK](https://sdk.vercel.ai/) with optional WebSocket transport. -## Current status +## Features -- Streaming text generation is implemented via `streamText`. -- Tool calling is supported in-stream. -- Speech synthesis is implemented with chunked streaming TTS. -- Audio transcription is supported (when `transcriptionModel` is configured). -- WebSocket protocol events are emitted for stream, tool, and speech lifecycle. +- **Streaming text generation** via AI SDK `streamText` with multi-step tool calling. +- **Chunked streaming TTS** — text is split at sentence boundaries and converted to speech in parallel as the LLM streams, giving low time-to-first-audio. +- **Audio transcription** via AI SDK `experimental_transcribe` (e.g. Whisper). +- **Barge-in / interruption** — user speech cancels both the in-flight LLM stream and pending TTS, saving tokens and latency. +- **Memory management** — configurable sliding-window on conversation history (`maxMessages`, `maxTotalChars`) and audio input size limits. +- **Serial request queue** — concurrent `sendText` / audio inputs are queued and processed one at a time, preventing race conditions. +- **Graceful lifecycle** — `disconnect()` aborts all in-flight work; `destroy()` permanently releases every resource. 
+- **WebSocket transport** with a full protocol of stream, tool, and speech lifecycle events. +- **Works without WebSocket** — call `sendText()` directly for text-only or server-side use. ## Prerequisites @@ -60,6 +64,12 @@ const agent = new VoiceAgent({ parallelGeneration: true, maxParallelRequests: 2, }, + // Memory management (new in 0.1.0) + history: { + maxMessages: 50, // keep last 50 messages + maxTotalChars: 100_000, // or trim when total chars exceed 100k + }, + maxAudioInputSize: 5 * 1024 * 1024, // 5 MB limit endpoint: process.env.VOICE_WS_ENDPOINT, tools: { getWeather: weatherTool }, }); @@ -86,40 +96,87 @@ if (process.env.VOICE_WS_ENDPOINT) { The agent accepts: -- `model` (required): chat model -- `transcriptionModel` (optional): STT model -- `speechModel` (optional): TTS model -- `instructions` (optional): system prompt -- `stopWhen` (optional): stopping condition -- `tools` (optional): AI SDK tools map -- `endpoint` (optional): WebSocket endpoint -- `voice` (optional): TTS voice, default `alloy` -- `speechInstructions` (optional): style instructions for TTS -- `outputFormat` (optional): audio format, default `mp3` -- `streamingSpeech` (optional): - - `minChunkSize` - - `maxChunkSize` - - `parallelGeneration` - - `maxParallelRequests` +| Option | Required | Default | Description | +|---|---|---|---| +| `model` | **yes** | — | AI SDK chat model (e.g. `openai("gpt-4o")`) | +| `transcriptionModel` | no | — | AI SDK transcription model (e.g. `openai.transcription("whisper-1")`) | +| `speechModel` | no | — | AI SDK speech model (e.g. 
`openai.speech("gpt-4o-mini-tts")`) | +| `instructions` | no | `"You are a helpful voice assistant."` | System prompt | +| `stopWhen` | no | `stepCountIs(5)` | Stopping condition for multi-step tool loops | +| `tools` | no | `{}` | AI SDK tools map | +| `endpoint` | no | — | Default WebSocket URL for `connect()` | +| `voice` | no | `"alloy"` | TTS voice | +| `speechInstructions` | no | — | Style instructions passed to the speech model | +| `outputFormat` | no | `"mp3"` | Audio output format (`mp3`, `opus`, `wav`, …) | +| `streamingSpeech` | no | see below | Streaming TTS chunk tuning | +| `history` | no | see below | Conversation memory limits | +| `maxAudioInputSize` | no | `10485760` (10 MB) | Maximum accepted audio input in bytes | -### Common methods +#### `streamingSpeech` -- `sendText(text)` – process text input (streamed response) -- `sendAudio(base64Audio)` – process base64 audio input -- `sendAudioBuffer(buffer)` – process raw audio buffer input -- `transcribeAudio(buffer)` – transcribe audio directly -- `generateAndSendSpeechFull(text)` – non-streaming TTS fallback -- `interruptSpeech(reason)` – interrupt streaming speech (barge‑in) -- `connect(url?)` / `handleSocket(ws)` – WebSocket usage +| Key | Default | Description | +|---|---|---| +| `minChunkSize` | `50` | Min characters before a sentence is sent to TTS | +| `maxChunkSize` | `200` | Max characters per chunk (force-split at clause boundary) | +| `parallelGeneration` | `true` | Start TTS for upcoming chunks while the current one plays | +| `maxParallelRequests` | `3` | Cap on concurrent TTS requests | -### Key events (from demo) +#### `history` -- `text` – user/assistant messages -- `chunk:text_delta` – streaming text deltas -- `chunk:tool_call` / `tool_result` – tool lifecycle -- `speech_start` / `speech_complete` / `speech_interrupted` -- `speech_chunk_queued` / `audio_chunk` / `audio` -- `connected` / `disconnected` +| Key | Default | Description | +|---|---|---| +| `maxMessages` | `100` | Max 
messages kept in history (0 = unlimited). Oldest are trimmed in pairs. | +| `maxTotalChars` | `0` (unlimited) | Max total characters across all messages. Oldest are trimmed when exceeded. | + +### Methods + +| Method | Description | +|---|---| +| `sendText(text)` | Process text input. Returns a promise with the full assistant response. Requests are queued serially. | +| `sendAudio(base64Audio)` | Transcribe base64 audio and process the result. | +| `sendAudioBuffer(buffer)` | Same as above, accepts a raw `Buffer` / `Uint8Array`. | +| `transcribeAudio(buffer)` | Transcribe audio to text without generating a response. | +| `generateAndSendSpeechFull(text)` | Non-streaming TTS fallback (entire text at once). | +| `interruptSpeech(reason?)` | Cancel in-flight TTS only (LLM stream keeps running). | +| `interruptCurrentResponse(reason?)` | Cancel **both** the LLM stream and TTS. Used for barge-in. | +| `connect(url?)` / `handleSocket(ws)` | Establish or attach a WebSocket. Safe to call multiple times. | +| `disconnect()` | Close the socket and abort all in-flight work. | +| `destroy()` | Permanently release all resources. The agent cannot be reused. | +| `clearHistory()` | Clear conversation history. | +| `getHistory()` / `setHistory(msgs)` | Read or restore conversation history. | +| `registerTools(tools)` | Merge additional tools into the agent. 
| + +### Read-only properties + +| Property | Type | Description | +|---|---|---| +| `connected` | `boolean` | Whether a WebSocket is connected | +| `processing` | `boolean` | Whether a request is currently being processed | +| `speaking` | `boolean` | Whether audio is currently being generated / sent | +| `pendingSpeechChunks` | `number` | Number of queued TTS chunks | +| `destroyed` | `boolean` | Whether `destroy()` has been called | + +### Events + +| Event | Payload | When | +|---|---|---| +| `text` | `{ role, text }` | User input received or full assistant response ready | +| `chunk:text_delta` | `{ id, text }` | Each streaming text token from the LLM | +| `chunk:reasoning_delta` | `{ id, text }` | Each reasoning token (models that support it) | +| `chunk:tool_call` | `{ toolName, toolCallId, input }` | Tool invocation detected | +| `tool_result` | `{ name, toolCallId, result }` | Tool execution finished | +| `speech_start` | `{ streaming }` | TTS generation begins | +| `speech_complete` | `{ streaming }` | All TTS chunks sent | +| `speech_interrupted` | `{ reason }` | Speech was cancelled (barge-in, disconnect, error) | +| `speech_chunk_queued` | `{ id, text }` | A text chunk entered the TTS queue | +| `audio_chunk` | `{ chunkId, data, format, text, uint8Array }` | One TTS chunk is ready | +| `audio` | `{ data, format, uint8Array }` | Full non-streaming TTS audio | +| `transcription` | `{ text, language }` | Audio transcription result | +| `audio_received` | `{ size }` | Raw audio input received (before transcription) | +| `history_trimmed` | `{ removedCount, reason }` | Oldest messages evicted from history | +| `connected` / `disconnected` | — | WebSocket lifecycle | +| `warning` | `string` | Non-fatal issues (empty input, etc.) | +| `error` | `Error` | Errors from LLM, TTS, transcription, or WebSocket | ## Run (text-only check) @@ -131,13 +188,13 @@ Expected logs include `text`, `chunk:text_delta`, tool events, and speech chunk ## Run (WebSocket check) -1. 
Start local WS server: +1. Start the local WS server: - pnpm ws:server + pnpm ws:server -2. In another terminal, run demo: +2. In another terminal, run the demo: - pnpm demo + pnpm demo The demo will: - run `sendText()` first (text-only sanity check), then diff --git a/example/ws-server-2.ts b/example/ws-server-2.ts index 9948a85..112ee0a 100644 --- a/example/ws-server-2.ts +++ b/example/ws-server-2.ts @@ -96,10 +96,16 @@ Use tools when needed to provide accurate information.`, console.log(`[ws-server] 🔊 Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`); }); + agent.on("history_trimmed", ({ removedCount, reason }: { removedCount: number; reason: string }) => { + console.log(`[ws-server] 🧹 History trimmed: removed ${removedCount} messages (${reason})`); + }); + agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message)); agent.on("disconnected", () => { - console.log("[ws-server] ✗ client disconnected\n"); + // Permanently release all agent resources for this connection + agent.destroy(); + console.log("[ws-server] ✗ client disconnected (agent destroyed)\n"); }); // Hand the accepted socket to the agent – this is the key line. 
diff --git a/example/ws-server.ts b/example/ws-server.ts index a8537f8..a362eaf 100644 --- a/example/ws-server.ts +++ b/example/ws-server.ts @@ -116,10 +116,16 @@ Use tools when needed to provide accurate information.`, console.log(`[ws-server] 🔊 Queued speech chunk #${id}: ${text.substring(0, 50)}...`); }); + agent.on("history_trimmed", ({ removedCount, reason }: { removedCount: number; reason: string }) => { + console.log(`[ws-server] 🧹 History trimmed: removed ${removedCount} messages (${reason})`); + }); + agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message)); agent.on("disconnected", () => { - console.log("[ws-server] ✗ client disconnected\n"); + // Permanently release all agent resources for this connection + agent.destroy(); + console.log("[ws-server] ✗ client disconnected (agent destroyed)\n"); }); // Hand the accepted socket to the agent – this is the key line. diff --git a/output.mp3 b/output.mp3 deleted file mode 100644 index f1fe13f..0000000 Binary files a/output.mp3 and /dev/null differ diff --git a/output_chunk_0.mp3 b/output_chunk_0.mp3 index 79dc113..50aaad1 100644 Binary files a/output_chunk_0.mp3 and b/output_chunk_0.mp3 differ diff --git a/package.json b/package.json index 7ea31fc..2bfe99e 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,13 @@ { "name": "voice-agent-ai-sdk", - "version": "0.0.1", + "version": "0.1.0", "description": "Voice AI Agent with ai-sdk", "main": "src/index.ts", "scripts": { "build": "tsc", "dev": "tsc -w", "demo": "tsx example/demo.ts", - "ws:server": "tsx example/ws-server.ts", + "ws:server": "tsx example/ws-server-2.ts", "prepublishOnly": "pnpm build" }, "keywords": [ diff --git a/src/VoiceAgent.ts b/src/VoiceAgent.ts index b447e66..003a8e3 100644 --- a/src/VoiceAgent.ts +++ b/src/VoiceAgent.ts @@ -35,6 +35,19 @@ interface StreamingSpeechConfig { maxParallelRequests: number; } +/** + * Configuration for conversation history memory management + */ +interface HistoryConfig { + /** 
Maximum number of messages to keep in history. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */ + maxMessages: number; + /** Maximum total character count across all messages. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */ + maxTotalChars: number; +} + +/** Default maximum audio input size (10 MB) */ +const DEFAULT_MAX_AUDIO_SIZE = 10 * 1024 * 1024; + export interface VoiceAgentOptions { model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o')) transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1')) @@ -48,6 +61,10 @@ export interface VoiceAgentOptions { outputFormat?: string; // Audio output format (e.g., 'mp3', 'opus', 'wav') /** Configuration for streaming speech generation */ streamingSpeech?: Partial; + /** Configuration for conversation history memory limits */ + history?: Partial; + /** Maximum audio input size in bytes (default: 10 MB) */ + maxAudioInputSize?: number; } export class VoiceAgent extends EventEmitter { @@ -65,6 +82,18 @@ export class VoiceAgent extends EventEmitter { private speechInstructions?: string; private outputFormat: string; private isProcessing = false; + private isDestroyed = false; + + // Concurrency: queue incoming requests so they run serially + private inputQueue: Array<{ text: string; resolve: (v: string) => void; reject: (e: unknown) => void }> = []; + private processingQueue = false; + + // Abort controller for the current LLM stream so we can cancel it on interrupt/disconnect + private currentStreamAbortController?: AbortController; + + // Memory management + private historyConfig: HistoryConfig; + private maxAudioInputSize: number; // Streaming speech state private streamingSpeechConfig: StreamingSpeechConfig; @@ -74,6 +103,10 @@ export class VoiceAgent extends EventEmitter { private isSpeaking = false; private pendingTextBuffer = ""; + // Promise-based signal for speech queue completion (replaces busy-wait 
polling) + private speechQueueDonePromise?: Promise; + private speechQueueDoneResolve?: () => void; + constructor(options: VoiceAgentOptions) { super(); this.model = options.model; @@ -86,6 +119,7 @@ export class VoiceAgent extends EventEmitter { this.voice = options.voice || "alloy"; this.speechInstructions = options.speechInstructions; this.outputFormat = options.outputFormat || "mp3"; + this.maxAudioInputSize = options.maxAudioInputSize ?? DEFAULT_MAX_AUDIO_SIZE; if (options.tools) { this.tools = { ...options.tools }; } @@ -98,6 +132,22 @@ export class VoiceAgent extends EventEmitter { maxParallelRequests: 3, ...options.streamingSpeech, }; + + // Initialize history config with defaults + this.historyConfig = { + maxMessages: 100, + maxTotalChars: 0, // unlimited by default + ...options.history, + }; + } + + /** + * Ensure the agent has not been destroyed. Throws if it has. + */ + private ensureNotDestroyed(): void { + if (this.isDestroyed) { + throw new Error("VoiceAgent has been destroyed and cannot be used"); + } } private setupListeners() { @@ -109,23 +159,27 @@ export class VoiceAgent extends EventEmitter { // Handle transcribed text from the client/STT if (message.type === "transcript") { - // Interrupt ongoing speech when user starts speaking (barge-in) - if (this.isSpeaking) { - this.interruptSpeech("user_speaking"); + if (typeof message.text !== "string" || !message.text.trim()) { + this.emit("warning", "Received empty or invalid transcript message"); + return; } - await this.processUserInput(message.text); + // Interrupt ongoing speech when user starts speaking (barge-in) + this.interruptCurrentResponse("user_speaking"); + await this.enqueueInput(message.text); } // Handle raw audio data that needs transcription - if (message.type === "audio") { - // Interrupt ongoing speech when user starts speaking (barge-in) - if (this.isSpeaking) { - this.interruptSpeech("user_speaking"); + else if (message.type === "audio") { + if (typeof message.data !== "string" 
|| !message.data) { + this.emit("warning", "Received empty or invalid audio message"); + return; } + // Interrupt ongoing speech when user starts speaking (barge-in) + this.interruptCurrentResponse("user_speaking"); await this.processAudioInput(message.data); } // Handle explicit interrupt request from client - if (message.type === "interrupt") { - this.interruptSpeech(message.reason || "client_request"); + else if (message.type === "interrupt") { + this.interruptCurrentResponse(message.reason || "client_request"); } } catch (err) { console.error("Failed to process message:", err); @@ -136,6 +190,8 @@ export class VoiceAgent extends EventEmitter { this.socket.on("close", () => { console.log("Disconnected"); this.isConnected = false; + // Clean up all in-flight work when the socket closes + this.cleanupOnDisconnect(); this.emit("disconnected"); }); @@ -145,6 +201,39 @@ export class VoiceAgent extends EventEmitter { }); } + /** + * Clean up all in-flight state when the connection drops. + */ + private cleanupOnDisconnect(): void { + // Abort ongoing LLM stream + if (this.currentStreamAbortController) { + this.currentStreamAbortController.abort(); + this.currentStreamAbortController = undefined; + } + // Abort ongoing speech generation + if (this.currentSpeechAbortController) { + this.currentSpeechAbortController.abort(); + this.currentSpeechAbortController = undefined; + } + // Clear speech state + this.speechChunkQueue = []; + this.pendingTextBuffer = ""; + this.isSpeaking = false; + this.isProcessing = false; + // Resolve any pending speech-done waiters + if (this.speechQueueDoneResolve) { + this.speechQueueDoneResolve(); + this.speechQueueDoneResolve = undefined; + this.speechQueueDonePromise = undefined; + } + // Reject any queued inputs + for (const item of this.inputQueue) { + item.reject(new Error("Connection closed")); + } + this.inputQueue = []; + this.processingQueue = false; + } + public registerTools(tools: Record) { this.tools = { ...this.tools, ...tools 
}; } @@ -195,7 +284,8 @@ export class VoiceAgent extends EventEmitter { } /** - * Interrupt ongoing speech generation and playback (barge-in support) + * Interrupt ongoing speech generation and playback (barge-in support). + * This only interrupts TTS — the LLM stream is left running. */ public interruptSpeech(reason: string = "interrupted"): void { if (!this.isSpeaking && this.speechChunkQueue.length === 0) { @@ -213,6 +303,13 @@ export class VoiceAgent extends EventEmitter { this.pendingTextBuffer = ""; this.isSpeaking = false; + // Resolve any pending speech-done waiters so processUserInput can finish + if (this.speechQueueDoneResolve) { + this.speechQueueDoneResolve(); + this.speechQueueDoneResolve = undefined; + this.speechQueueDonePromise = undefined; + } + // Notify clients to stop audio playback this.sendWebSocketMessage({ type: "speech_interrupted", @@ -222,6 +319,20 @@ export class VoiceAgent extends EventEmitter { this.emit("speech_interrupted", { reason }); } + /** + * Interrupt both the current LLM stream and ongoing speech. + * Use this for barge-in scenarios where the entire response should be cancelled. + */ + public interruptCurrentResponse(reason: string = "interrupted"): void { + // Abort the LLM stream first + if (this.currentStreamAbortController) { + this.currentStreamAbortController.abort(); + this.currentStreamAbortController = undefined; + } + // Then interrupt speech + this.interruptSpeech(reason); + } + /** * Extract complete sentences from text buffer * Returns [extractedSentences, remainingBuffer] @@ -272,17 +383,68 @@ export class VoiceAgent extends EventEmitter { return [sentences, remaining]; } + /** + * Trim conversation history to stay within configured limits. + * Removes oldest messages (always in pairs to preserve user/assistant turns). 
+ */ + private trimHistory(): void { + const { maxMessages, maxTotalChars } = this.historyConfig; + + // Trim by message count + if (maxMessages > 0 && this.conversationHistory.length > maxMessages) { + const excess = this.conversationHistory.length - maxMessages; + // Remove from the front, ensuring we remove at least `excess` messages + // Round up to even number to preserve turn pairs + const toRemove = excess % 2 === 0 ? excess : excess + 1; + this.conversationHistory.splice(0, toRemove); + this.emit("history_trimmed", { removedCount: toRemove, reason: "max_messages" }); + } + + // Trim by total character count + if (maxTotalChars > 0) { + let totalChars = this.conversationHistory.reduce((sum, msg) => { + const content = typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content); + return sum + content.length; + }, 0); + + let removedCount = 0; + while (totalChars > maxTotalChars && this.conversationHistory.length > 2) { + const removed = this.conversationHistory.shift(); + if (removed) { + const content = typeof removed.content === "string" ? 
removed.content : JSON.stringify(removed.content); + totalChars -= content.length; + removedCount++; + } + } + if (removedCount > 0) { + this.emit("history_trimmed", { removedCount, reason: "max_total_chars" }); + } + } + } + /** * Queue a text chunk for speech generation */ private queueSpeechChunk(text: string): void { if (!this.speechModel || !text.trim()) return; + // Wrap chunk ID to prevent unbounded growth in very long sessions + if (this.nextChunkId >= Number.MAX_SAFE_INTEGER) { + this.nextChunkId = 0; + } + const chunk: SpeechChunk = { id: this.nextChunkId++, text: text.trim(), }; + // Create the speech-done promise if not already present + if (!this.speechQueueDonePromise) { + this.speechQueueDonePromise = new Promise((resolve) => { + this.speechQueueDoneResolve = resolve; + }); + } + // Start generating audio immediately (parallel generation) if (this.streamingSpeechConfig.parallelGeneration) { const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length; @@ -395,14 +557,21 @@ export class VoiceAgent extends EventEmitter { this.isSpeaking = false; this.currentSpeechAbortController = undefined; + // Signal that the speech queue is fully drained + if (this.speechQueueDoneResolve) { + this.speechQueueDoneResolve(); + this.speechQueueDoneResolve = undefined; + this.speechQueueDonePromise = undefined; + } + this.sendWebSocketMessage({ type: "speech_stream_end" }); this.emit("speech_complete", { streaming: true }); } } /** - * Process text deltra for streaming speech - * Call this as text chunks arrive from LLM + * Process text delta for streaming speech. + * Call this as text chunks arrive from LLM. 
*/ private processTextForStreamingSpeech(textDelta: string): void { if (!this.speechModel) return; @@ -439,12 +608,28 @@ export class VoiceAgent extends EventEmitter { try { const audioBuffer = Buffer.from(base64Audio, "base64"); + + // Validate audio size to prevent memory issues + if (audioBuffer.length > this.maxAudioInputSize) { + const sizeMB = (audioBuffer.length / (1024 * 1024)).toFixed(1); + const maxMB = (this.maxAudioInputSize / (1024 * 1024)).toFixed(1); + this.emit("error", new Error( + `Audio input too large (${sizeMB} MB). Maximum allowed: ${maxMB} MB` + )); + return; + } + + if (audioBuffer.length === 0) { + this.emit("warning", "Received empty audio data"); + return; + } + this.emit("audio_received", { size: audioBuffer.length }); const transcribedText = await this.transcribeAudio(audioBuffer); if (transcribedText.trim()) { - await this.processUserInput(transcribedText); + await this.enqueueInput(transcribedText); } } catch (error) { console.error("Failed to process audio input:", error); @@ -453,6 +638,13 @@ export class VoiceAgent extends EventEmitter { } public async connect(url?: string): Promise { + this.ensureNotDestroyed(); + + // Clean up any existing connection first + if (this.socket) { + this.disconnectSocket(); + } + return new Promise((resolve, reject) => { try { // Use provided URL, configured endpoint, or default URL @@ -481,6 +673,13 @@ export class VoiceAgent extends EventEmitter { * agent to handle messages on that socket. */ public handleSocket(socket: WebSocket): void { + this.ensureNotDestroyed(); + + // Clean up any existing connection first + if (this.socket) { + this.disconnectSocket(); + } + this.socket = socket; this.isConnected = true; this.setupListeners(); @@ -488,10 +687,15 @@ export class VoiceAgent extends EventEmitter { } /** - * Send text input for processing (bypasses transcription) + * Send text input for processing (bypasses transcription). 
+ * Requests are queued and processed serially to prevent race conditions. */ public async sendText(text: string): Promise { - return this.processUserInput(text); + this.ensureNotDestroyed(); + if (!text || !text.trim()) { + throw new Error("Text input cannot be empty"); + } + return this.enqueueInput(text); } /** @@ -499,6 +703,7 @@ export class VoiceAgent extends EventEmitter { * @param audioData Base64 encoded audio data */ public async sendAudio(audioData: string): Promise { + this.ensureNotDestroyed(); await this.processAudioInput(audioData); } @@ -506,26 +711,65 @@ export class VoiceAgent extends EventEmitter { * Send raw audio buffer to be transcribed and processed */ public async sendAudioBuffer(audioBuffer: Buffer | Uint8Array): Promise { + this.ensureNotDestroyed(); const base64Audio = Buffer.from(audioBuffer).toString("base64"); await this.processAudioInput(base64Audio); } /** - * Process user input with streaming text generation - * Handles the full pipeline: text -> LLM (streaming) -> TTS -> WebSocket + * Enqueue a text input for serial processing. + * This ensures only one processUserInput runs at a time, preventing + * race conditions on conversationHistory, fullText accumulation, etc. + */ + private enqueueInput(text: string): Promise { + return new Promise((resolve, reject) => { + this.inputQueue.push({ text, resolve, reject }); + this.drainInputQueue(); + }); + } + + /** + * Drain the input queue, processing one request at a time. + */ + private async drainInputQueue(): Promise { + if (this.processingQueue) return; + this.processingQueue = true; + + try { + while (this.inputQueue.length > 0) { + const item = this.inputQueue.shift()!; + try { + const result = await this.processUserInput(item.text); + item.resolve(result); + } catch (error) { + item.reject(error); + } + } + } finally { + this.processingQueue = false; + } + } + + /** + * Process user input with streaming text generation. 
+ * Handles the full pipeline: text -> LLM (streaming) -> TTS -> WebSocket. + * + * This method is designed to be called serially via drainInputQueue(). */ private async processUserInput(text: string): Promise { - if (this.isProcessing) { - this.emit("warning", "Already processing a request, queuing..."); - } this.isProcessing = true; + // Create an abort controller for this LLM stream so it can be cancelled + this.currentStreamAbortController = new AbortController(); + const streamAbortSignal = this.currentStreamAbortController.signal; + try { // Emit text event for incoming user input this.emit("text", { role: "user", text }); - // Add user message to conversation history + // Add user message to conversation history and trim if needed this.conversationHistory.push({ role: "user", content: text }); + this.trimHistory(); // Use streamText for streaming responses with tool support const result = streamText({ @@ -534,6 +778,7 @@ export class VoiceAgent extends EventEmitter { messages: this.conversationHistory, tools: this.tools, stopWhen: this.stopWhen, + abortSignal: streamAbortSignal, onChunk: ({ chunk }) => { // Emit streaming chunks for real-time updates // Note: onChunk only receives a subset of stream events @@ -782,18 +1027,19 @@ export class VoiceAgent extends EventEmitter { } } - // Add assistant response to conversation history + // Add assistant response to conversation history and trim if (fullText) { this.conversationHistory.push({ role: "assistant", content: fullText }); + this.trimHistory(); } // Ensure any remaining speech is flushed (in case text-end wasn't triggered) this.flushStreamingSpeech(); - // Wait for all speech chunks to complete before signaling response complete - // This ensures audio playback can finish - while (this.speechChunkQueue.length > 0 || this.isSpeaking) { - await new Promise(resolve => setTimeout(resolve, 100)); + // Wait for all speech chunks to complete using promise-based signaling + // (replaces the previous busy-wait 
polling loop) + if (this.speechQueueDonePromise) { + await this.speechQueueDonePromise; } // Send the complete response @@ -808,8 +1054,16 @@ export class VoiceAgent extends EventEmitter { }); return fullText; + } catch (error) { + // Clean up speech state on error so the agent isn't stuck in a broken state + this.pendingTextBuffer = ""; + if (this.speechChunkQueue.length > 0 || this.isSpeaking) { + this.interruptSpeech("stream_error"); + } + throw error; } finally { this.isProcessing = false; + this.currentStreamAbortController = undefined; } } @@ -848,11 +1102,20 @@ export class VoiceAgent extends EventEmitter { } /** - * Send a message via WebSocket if connected + * Send a message via WebSocket if connected. + * Gracefully handles send failures (e.g., socket closing mid-send). */ private sendWebSocketMessage(message: Record): void { - if (this.socket && this.isConnected) { - this.socket.send(JSON.stringify(message)); + if (!this.socket || !this.isConnected) return; + + try { + if (this.socket.readyState === WebSocket.OPEN) { + this.socket.send(JSON.stringify(message)); + } + } catch (error) { + // Socket may have closed between the readyState check and send() + console.error("Failed to send WebSocket message:", error); + this.emit("error", error); } } @@ -895,14 +1158,44 @@ export class VoiceAgent extends EventEmitter { } /** - * Disconnect from WebSocket + * Internal helper to close and clean up the current socket. + */ + private disconnectSocket(): void { + if (!this.socket) return; + + // Stop all in-flight work tied to this connection + this.cleanupOnDisconnect(); + + try { + this.socket.removeAllListeners(); + if (this.socket.readyState === WebSocket.OPEN || + this.socket.readyState === WebSocket.CONNECTING) { + this.socket.close(); + } + } catch { + // Ignore close errors — socket may already be dead + } + this.socket = undefined; + this.isConnected = false; + } + + /** + * Disconnect from WebSocket and stop all in-flight work. 
*/ disconnect() { - if (this.socket) { - this.socket.close(); - this.socket = undefined; - this.isConnected = false; - } + this.disconnectSocket(); + } + + /** + * Permanently destroy the agent, releasing all resources. + * After calling this, the agent cannot be reused. + */ + destroy() { + this.isDestroyed = true; + this.disconnectSocket(); + this.conversationHistory = []; + this.tools = {}; + this.removeAllListeners(); } /** @@ -932,4 +1225,11 @@ export class VoiceAgent extends EventEmitter { get pendingSpeechChunks(): number { return this.speechChunkQueue.length; } + + /** + * Check if agent has been permanently destroyed + */ + get destroyed(): boolean { + return this.isDestroyed; + } }