From 6510232655eb00523fee83c4d2c7bd33d0c411d0 Mon Sep 17 00:00:00 2001 From: Bijit Mondal Date: Fri, 13 Feb 2026 17:16:12 +0530 Subject: [PATCH] feat: implement WebSocket server with VoiceAgent for real-time voice interaction - Added a new WebSocket server implementation in `ws-server-2.ts` that utilizes the `VoiceAgent` for handling voice interactions. - Integrated weather and time tools using the `ai` library for enhanced responses. - Refactored existing `ws-server.ts` to streamline the connection handling and event logging. - Enhanced `VoiceAgent` to support streaming speech generation with improved chunk handling and interruption capabilities. - Introduced new event listeners for better logging and handling of speech-related events. - Added graceful shutdown handling for the WebSocket server. --- README.md | 54 +- example/demo.ts | 74 ++- example/voice-client.html | 1088 +++++++++++++++++++++++++++++++++++++ example/ws-server-2.ts | 120 ++++ example/ws-server.ts | 243 ++++----- src/VoiceAgent.ts | 344 +++++++++++- 6 files changed, 1749 insertions(+), 174 deletions(-) create mode 100644 example/voice-client.html create mode 100644 example/ws-server-2.ts diff --git a/README.md b/README.md index 526fc8f..736762d 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,14 @@ # voice-agent-ai-sdk -Minimal voice/text agent SDK built on AI SDK with optional WebSocket transport. +Streaming voice/text agent SDK built on AI SDK with optional WebSocket transport. ## Current status -- Text flow works via `sendText()` (no WebSocket required). -- WebSocket flow works when `connect()` is used with a running WS endpoint. -- Voice streaming is not implemented yet. +- Streaming text generation is implemented via `streamText`. +- Tool calling is supported in-stream. +- Speech synthesis is implemented with chunked streaming TTS. +- Audio transcription is supported (when `transcriptionModel` is configured). +- WebSocket protocol events are emitted for stream, tool, and speech lifecycle. ## Prerequisites @@ -25,13 +27,35 @@ Minimal voice/text agent SDK built on AI SDK with optional WebSocket transport. OPENAI_API_KEY=your_openai_api_key VOICE_WS_ENDPOINT=ws://localhost:8080 +`VOICE_WS_ENDPOINT` is optional for text-only usage. + +## VoiceAgent configuration + +The agent accepts: + +- `model` (required): chat model +- `transcriptionModel` (optional): STT model +- `speechModel` (optional): TTS model +- `instructions` (optional): system prompt +- `stopWhen` (optional): stopping condition +- `tools` (optional): AI SDK tools map +- `endpoint` (optional): WebSocket endpoint +- `voice` (optional): TTS voice, default `alloy` +- `speechInstructions` (optional): style instructions for TTS +- `outputFormat` (optional): audio format, default `mp3` +- `streamingSpeech` (optional): + - `minChunkSize` + - `maxChunkSize` + - `parallelGeneration` + - `maxParallelRequests` + ## Run (text-only check) -This validates model + tool calls without requiring WebSocket: +This validates LLM + tool + streaming speech without requiring WebSocket: pnpm demo -Expected logs include `text` events and optional `tool_start`. +Expected logs include `text`, `chunk:text_delta`, tool events, and speech chunk events. ## Run (WebSocket check) @@ -45,7 +69,22 @@ Expected logs include `text` events and optional `tool_start`. The demo will: - run `sendText()` first (text-only sanity check), then -- connect to `VOICE_WS_ENDPOINT` if provided. +- connect to `VOICE_WS_ENDPOINT` if provided, +- emit streaming protocol messages (`text_delta`, `tool_call`, `audio_chunk`, `response_complete`, etc.). + +## Browser voice client (HTML) + +A simple browser client is available at [example/voice-client.html](example/voice-client.html). + +What it does: +- captures microphone speech using Web Speech API (speech-to-text) +- sends transcript to the agent via WebSocket (`type: "transcript"`) +- receives streaming `audio_chunk` messages and plays them in order + +How to use: +1. Start your agent server/WebSocket endpoint. +2. Open [example/voice-client.html](example/voice-client.html) in a browser (Chrome/Edge recommended). +3. Connect to `ws://localhost:8080` (or your endpoint), then click **Start Mic**. ## Scripts @@ -58,3 +97,4 @@ The demo will: - If `VOICE_WS_ENDPOINT` is empty, WebSocket connect is skipped. - The sample WS server sends a mock `transcript` message for end-to-end testing. +- Streaming TTS uses chunk queueing and supports interruption (`interrupt`). diff --git a/example/demo.ts b/example/demo.ts index 0b36731..87ada97 100644 --- a/example/demo.ts +++ b/example/demo.ts @@ -43,6 +43,13 @@ Use tools when needed to provide accurate information.`, voice: "alloy", // Options: alloy, echo, fable, onyx, nova, shimmer speechInstructions: "Speak in a friendly, natural conversational tone.", outputFormat: "mp3", + // Streaming speech tuning + streamingSpeech: { + minChunkSize: 40, + maxChunkSize: 180, + parallelGeneration: true, + maxParallelRequests: 2, + }, // WebSocket endpoint endpoint: process.env.VOICE_WS_ENDPOINT, // Tools @@ -70,13 +77,13 @@ agent.on("text", (msg: { role: string; text: string }) => { }); // Streaming text delta events (real-time text chunks) -agent.on("text_delta", ({ text }: { text: string }) => { +agent.on("chunk:text_delta", ({ text }: { text: string }) => { process.stdout.write(text); }); -// Tool events -agent.on("tool_start", ({ name, input }: { name: string; input?: unknown }) => { - console.log(`\n[Tool] Calling ${name}...`, input ? JSON.stringify(input) : ""); +// Tool events (stream-level) +agent.on("chunk:tool_call", ({ toolName, input }: { toolName: string; input: unknown }) => { + console.log(`\n[Tool] Calling ${toolName}...`, input ? JSON.stringify(input) : ""); }); agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => { @@ -84,19 +91,51 @@ agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => }); // Speech events -agent.on("speech_start", ({ text }: { text: string }) => { - console.log(`[TTS] Generating speech for: "${text.substring(0, 50)}..."`); +agent.on("speech_start", ({ streaming }: { streaming: boolean }) => { + console.log(`[TTS] Speech started (streaming=${streaming})`); }); agent.on("speech_complete", () => { console.log("[TTS] Speech generation complete"); }); -// Audio events (when TTS audio is generated) +agent.on("speech_chunk_queued", ({ id, text }: { id: number; text: string }) => { + console.log(`[TTS] Queued chunk #${id}: ${text.substring(0, 40)}...`); +}); + +// Streaming audio chunk events +agent.on( + "audio_chunk", + async ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => { + console.log(`[Audio] Chunk #${chunkId} (${uint8Array.length} bytes, ${format})`); + await writeFile(`output_chunk_${chunkId}.${format}`, Buffer.from(uint8Array)); + }, +); + +// Full audio event (non-streaming fallback via generateAndSendSpeechFull) agent.on("audio", async (audio: { data: string; format: string; uint8Array: Uint8Array }) => { - console.log(`[Audio] Received ${audio.format} audio (${audio.uint8Array.length} bytes)`); - // Optionally save to file for testing - await writeFile(`output.${audio.format}`, Buffer.from(audio.uint8Array)); + console.log(`[Audio] Full response audio (${audio.uint8Array.length} bytes, ${audio.format})`); + await writeFile(`output_full.${audio.format}`, Buffer.from(audio.uint8Array)); +}); + +// Speech interruption (barge-in) +agent.on("speech_interrupted", ({ reason }: { reason: string }) => { + console.log(`[TTS] Speech interrupted: ${reason}`); +}); + +// Transcription event (when server-side Whisper is used) +agent.on("transcription", ({ text, language }: { text: string; language?: string }) => { + console.log(`[STT] Transcription (${language || "unknown"}): ${text}`); +}); + +// Audio received event +agent.on("audio_received", ({ size }: { size: number }) => { + console.log(`[Audio] Received ${(size / 1024).toFixed(1)} KB of audio input`); +}); + +// Warning events +agent.on("warning", (msg: string) => { + console.warn(`[Warning] ${msg}`); }); // Error handling @@ -111,20 +150,9 @@ agent.on("error", (error: Error) => { try { // Test 1: Simple text query with streaming - console.log("--- Test 1: Weather Query ---"); - const response1 = await agent.sendText("What is the weather in Berlin?"); + console.log("--- Test 1: Text Query ---"); + await agent.sendText("What's the weather in San Francisco?"); console.log("\n"); - - // Test 2: Multi-turn conversation - console.log("--- Test 2: Follow-up Question ---"); - const response2 = await agent.sendText("What about Tokyo?"); - console.log("\n"); - - // Test 3: Time query - console.log("--- Test 3: Time Query ---"); - const response3 = await agent.sendText("What time is it?"); - console.log("\n"); - // Show conversation history console.log("--- Conversation History ---"); const history = agent.getHistory(); diff --git a/example/voice-client.html b/example/voice-client.html new file mode 100644 index 0000000..27b67d4 --- /dev/null +++ b/example/voice-client.html @@ -0,0 +1,1088 @@ + + + + + + + Voice Agent Web Client + + + + +

πŸŽ™οΈ Voice Agent Web Client

+

Real-time voice I/O with streaming speech generation. Supports browser STT or server-side + Whisper transcription.

+ + +
+
+ + + +
+
Disconnected
+
+ + +
+
+ + +
+
+ + + +
+
+
+
+
+ + +
+
+ + +

πŸ‘€ You said

+
+ +

πŸ€– Assistant idle

+
+ + + + + +

πŸ“‹ Logs

+
+ + + + + \ No newline at end of file diff --git a/example/ws-server-2.ts b/example/ws-server-2.ts new file mode 100644 index 0000000..9948a85 --- /dev/null +++ b/example/ws-server-2.ts @@ -0,0 +1,120 @@ +import "dotenv/config"; +import { WebSocketServer } from "ws"; +import { VoiceAgent } from "../src"; +import { tool } from "ai"; +import { z } from "zod"; +import { openai } from "@ai-sdk/openai"; + +const endpoint = process.env.VOICE_WS_ENDPOINT || "ws://localhost:8080"; +const url = new URL(endpoint); +const port = Number(url.port || 8080); +const host = url.hostname || "localhost"; + +// ── Tools (same as demo.ts) ──────────────────────────────────────────── +const weatherTool = tool({ + description: "Get the weather in a location", + inputSchema: z.object({ + location: z.string().describe("The location to get the weather for"), + }), + execute: async ({ location }) => ({ + location, + temperature: 72 + Math.floor(Math.random() * 21) - 10, + conditions: ["sunny", "cloudy", "rainy", "partly cloudy"][ + Math.floor(Math.random() * 4) + ], + }), +}); + +const timeTool = tool({ + description: "Get the current time", + inputSchema: z.object({}), + execute: async () => ({ + time: new Date().toLocaleTimeString(), + timezone: Intl.DateTimeFormat().resolvedOptions().timeZone, + }), +}); + +// ── WebSocket server ─────────────────────────────────────────────────── +const wss = new WebSocketServer({ port, host }); + +wss.on("listening", () => { + console.log(`[ws-server] listening on ${endpoint}`); + console.log("[ws-server] Waiting for connections...\n"); +}); + +wss.on("connection", (socket) => { + console.log("[ws-server] βœ“ client connected"); + + // Create a fresh VoiceAgent per connection + const agent = new VoiceAgent({ + model: openai("gpt-4o"), + transcriptionModel: openai.transcription("whisper-1"), + speechModel: openai.speech("gpt-4o-mini-tts"), + instructions: `You are a helpful voice assistant. +Keep responses concise and conversational since they will be spoken aloud. +Use tools when needed to provide accurate information.`, + voice: "alloy", + speechInstructions: "Speak in a friendly, natural conversational tone.", + outputFormat: "mp3", + streamingSpeech: { + minChunkSize: 40, + maxChunkSize: 180, + parallelGeneration: true, + maxParallelRequests: 2, + }, + tools: { + getWeather: weatherTool, + getTime: timeTool, + }, + }); + + // Wire agent events to server logs + agent.on("text", (msg: { role: string; text: string }) => { + const prefix = msg.role === "user" ? "πŸ‘€ User" : "πŸ€– Assistant"; + console.log(`[ws-server] ${prefix}: ${msg.text}`); + }); + + agent.on("chunk:text_delta", ({ text }: { text: string }) => { + process.stdout.write(text); + }); + + agent.on("chunk:tool_call", ({ toolName }: { toolName: string }) => { + console.log(`\n[ws-server] πŸ› οΈ Tool call: ${toolName}`); + }); + + agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => { + console.log(`[ws-server] πŸ› οΈ Tool result (${name}):`, JSON.stringify(result)); + }); + + agent.on("speech_start", () => console.log("[ws-server] πŸ”Š Speech started")); + agent.on("speech_complete", () => console.log("[ws-server] πŸ”Š Speech complete")); + agent.on("speech_interrupted", ({ reason }: { reason: string }) => + console.log(`[ws-server] ⏸️ Speech interrupted: ${reason}`), + ); + + agent.on("audio_chunk", ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => { + console.log(`[ws-server] πŸ”Š Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`); + }); + + agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message)); + + agent.on("disconnected", () => { + console.log("[ws-server] βœ— client disconnected\n"); + }); + + // Hand the accepted socket to the agent – this is the key line. + // The agent will listen for "transcript", "audio", "interrupt" messages + // and send back "text_delta", "audio_chunk", "response_complete", etc. + agent.handleSocket(socket); +}); + +// Graceful shutdown +process.on("SIGINT", () => { + console.log("\n[ws-server] Shutting down..."); + wss.close(() => { + console.log("[ws-server] Server closed"); + process.exit(0); + }); +}); + +export { wss }; diff --git a/example/ws-server.ts b/example/ws-server.ts index aea5bd6..f648c61 100644 --- a/example/ws-server.ts +++ b/example/ws-server.ts @@ -1,140 +1,131 @@ import "dotenv/config"; -import { WebSocketServer, WebSocket } from "ws"; -import { readFile } from "fs/promises"; -import { existsSync } from "fs"; +import { WebSocketServer } from "ws"; +import { VoiceAgent } from "../src"; +import { tool } from "ai"; +import { z } from "zod"; +import { openai } from "@ai-sdk/openai"; const endpoint = process.env.VOICE_WS_ENDPOINT || "ws://localhost:8080"; const url = new URL(endpoint); const port = Number(url.port || 8080); const host = url.hostname || "localhost"; -// Message types for type safety -interface BaseMessage { - type: string; -} +// ── Tools (same as demo.ts) ──────────────────────────────────────────── +const weatherTool = tool({ + description: "Get the weather in a location", + inputSchema: z.object({ + location: z.string().describe("The location to get the weather for"), + }), + execute: async ({ location }) => ({ + location, + temperature: 72 + Math.floor(Math.random() * 21) - 10, + conditions: ["sunny", "cloudy", "rainy", "partly cloudy"][ + Math.floor(Math.random() * 4) + ], + }), +}); -interface TextDeltaMessage extends BaseMessage { - type: "text_delta"; - text: string; -} - -interface ToolCallMessage extends BaseMessage { - type: "tool_call"; - toolName: string; - toolCallId: string; - input: unknown; -} - -interface ToolResultMessage extends BaseMessage { - type: "tool_result"; - toolName: string; - toolCallId: string; - result: unknown; -} - -interface AudioMessage extends BaseMessage { - type: "audio"; - data: string; // base64 encoded - format: string; -} - -interface ResponseCompleteMessage extends BaseMessage { - type: "response_complete"; - text: string; - toolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>; - toolResults: Array<{ toolName: string; toolCallId: string; output: unknown }>; -} - -type AgentMessage = - | TextDeltaMessage - | ToolCallMessage - | ToolResultMessage - | AudioMessage - | ResponseCompleteMessage; +const timeTool = tool({ + description: "Get the current time", + inputSchema: z.object({}), + execute: async () => ({ + time: new Date().toLocaleTimeString(), + timezone: Intl.DateTimeFormat().resolvedOptions().timeZone, + }), +}); +// ── WebSocket server ─────────────────────────────────────────────────── const wss = new WebSocketServer({ port, host }); wss.on("listening", () => { - console.log(`[ws-server] πŸš€ listening on ${endpoint}`); + console.log(`[ws-server] listening on ${endpoint}`); console.log("[ws-server] Waiting for connections...\n"); }); -wss.on("connection", (socket: WebSocket) => { +wss.on("connection", (socket) => { console.log("[ws-server] βœ“ client connected"); - let streamingText = ""; - let audioChunks: Buffer[] = []; - - // Send a sample transcript to test text pipeline end-to-end. - setTimeout(() => { - console.log("[ws-server] -> Sending test transcript..."); - socket.send( - JSON.stringify({ - type: "transcript", - text: "What is the weather in Berlin?", - }), - ); - }, 500); - - socket.on("message", async (data) => { - try { - const msg = JSON.parse(data.toString()) as AgentMessage; - - switch (msg.type) { - case "text_delta": - // Real-time streaming text from the agent - streamingText += msg.text; - process.stdout.write(msg.text); - break; - - case "tool_call": - console.log(`\n[ws-server] πŸ› οΈ Tool call: ${msg.toolName}`); - console.log(` Input: ${JSON.stringify(msg.input)}`); - break; - - case "tool_result": - console.log(`[ws-server] πŸ› οΈ Tool result: ${msg.toolName}`); - console.log(` Result: ${JSON.stringify(msg.result)}`); - break; - - case "audio": - // Handle audio response from TTS - const audioBuffer = Buffer.from(msg.data, "base64"); - audioChunks.push(audioBuffer); - console.log( - `[ws-server] πŸ”Š Received audio: ${audioBuffer.length} bytes (${msg.format})`, - ); - - // Optionally save audio to file for testing - // await writeFile(`output_${Date.now()}.${msg.format}`, audioBuffer); - break; - - case "response_complete": - console.log("\n[ws-server] βœ… Response complete"); - console.log(` Text length: ${msg.text.length}`); - console.log(` Tool calls: ${msg.toolCalls.length}`); - console.log(` Tool results: ${msg.toolResults.length}`); - - // Reset for next response - streamingText = ""; - audioChunks = []; - break; - - default: - console.log("[ws-server] <- Unknown message:", msg); - } - } catch { - console.log("[ws-server] <- raw", data.toString().substring(0, 100)); - } + // Create a fresh VoiceAgent per connection + const agent = new VoiceAgent({ + model: openai("gpt-4o"), + transcriptionModel: openai.transcription("whisper-1"), + speechModel: openai.speech("gpt-4o-mini-tts"), + instructions: `You are a helpful voice assistant. +Keep responses concise and conversational since they will be spoken aloud. +Use tools when needed to provide accurate information.`, + voice: "alloy", + speechInstructions: "Speak in a friendly, natural conversational tone.", + outputFormat: "mp3", + streamingSpeech: { + minChunkSize: 40, + maxChunkSize: 180, + parallelGeneration: true, + maxParallelRequests: 2, + }, + tools: { + getWeather: weatherTool, + getTime: timeTool, + }, }); - socket.on("close", () => { + // Wire agent events to server logs + agent.on("text", (msg: { role: string; text: string }) => { + const prefix = msg.role === "user" ? "πŸ‘€ User" : "πŸ€– Assistant"; + console.log(`[ws-server] ${prefix}: ${msg.text}`); + }); + + agent.on("chunk:text_delta", ({ text }: { text: string }) => { + process.stdout.write(text); + }); + + agent.on("chunk:tool_call", ({ toolName }: { toolName: string }) => { + console.log(`\n[ws-server] πŸ› οΈ Tool call: ${toolName}`); + }); + + agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => { + console.log(`[ws-server] πŸ› οΈ Tool result (${name}):`, JSON.stringify(result)); + }); + + agent.on("speech_start", () => console.log("[ws-server] πŸ”Š Speech started")); + agent.on("speech_complete", () => console.log("[ws-server] πŸ”Š Speech complete")); + agent.on("speech_interrupted", ({ reason }: { reason: string }) => + console.log(`[ws-server] ⏸️ Speech interrupted: ${reason}`), + ); + + agent.on("audio_chunk", ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => { + console.log(`[ws-server] πŸ”Š Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`); + }); + + agent.on("transcription", ({ text, language }: { text: string; language?: string }) => { + console.log(`[ws-server] πŸ“ Transcription (${language || "unknown"}): ${text}`); + }); + + agent.on("audio_received", ({ size }: { size: number }) => { + console.log(`[ws-server] 🎀 Audio received: ${(size / 1024).toFixed(1)} KB`); + }); + + agent.on("chunk:reasoning_delta", ({ text }: { text: string }) => { + process.stdout.write(text); + }); + + agent.on("warning", (msg: string) => { + console.log(`[ws-server] ⚠️ Warning: ${msg}`); + }); + + agent.on("speech_chunk_queued", ({ id, text }: { id: number; text: string }) => { + console.log(`[ws-server] πŸ”Š Queued speech chunk #${id}: ${text.substring(0, 50)}...`); + }); + + agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message)); + + agent.on("disconnected", () => { console.log("[ws-server] βœ— client disconnected\n"); }); - socket.on("error", (error) => { - console.error("[ws-server] Error:", error.message); - }); + // Hand the accepted socket to the agent – this is the key line. + // The agent will listen for "transcript", "audio", "interrupt" messages + // and send back "text_delta", "audio_chunk", "response_complete", etc. + agent.handleSocket(socket); }); // Graceful shutdown @@ -146,24 +137,4 @@ process.on("SIGINT", () => { }); }); -// Helper function to simulate sending audio to the agent -async function simulateAudioInput(socket: WebSocket, audioPath: string) { - if (!existsSync(audioPath)) { - console.log(`[ws-server] Audio file not found: ${audioPath}`); - return; - } - - const audioBuffer = await readFile(audioPath); - const base64Audio = audioBuffer.toString("base64"); - - console.log(`[ws-server] -> Sending audio: ${audioPath} (${audioBuffer.length} bytes)`); - socket.send( - JSON.stringify({ - type: "audio", - data: base64Audio, - }), - ); -} - -// Export for use as a module -export { wss, simulateAudioInput }; +export { wss }; diff --git a/src/VoiceAgent.ts b/src/VoiceAgent.ts index 2147969..b447e66 100644 --- a/src/VoiceAgent.ts +++ b/src/VoiceAgent.ts @@ -12,6 +12,29 @@ import { type SpeechModel, } from "ai"; +/** + * Represents a chunk of text to be converted to speech + */ +interface SpeechChunk { + id: number; + text: string; + audioPromise?: Promise; +} + +/** + * Configuration for streaming speech behavior + */ +interface StreamingSpeechConfig { + /** Minimum characters before generating speech for a chunk */ + minChunkSize: number; + /** Maximum characters per chunk (will split at sentence boundary before this) */ + maxChunkSize: number; + /** Whether to enable parallel TTS generation */ + parallelGeneration: boolean; + /** Maximum number of parallel TTS requests */ + maxParallelRequests: number; +} + export interface VoiceAgentOptions { model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o')) transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1')) @@ -23,6 +46,8 @@ export interface VoiceAgentOptions { voice?: string; // Voice for TTS (e.g., 'alloy', 'echo', 'shimmer') speechInstructions?: string; // Instructions for TTS voice style outputFormat?: string; // Audio output format (e.g., 'mp3', 'opus', 'wav') + /** Configuration for streaming speech generation */ + streamingSpeech?: Partial; } export class VoiceAgent extends EventEmitter { @@ -41,6 +66,14 @@ export class VoiceAgent extends EventEmitter { private outputFormat: string; private isProcessing = false; + // Streaming speech state + private streamingSpeechConfig: StreamingSpeechConfig; + private currentSpeechAbortController?: AbortController; + private speechChunkQueue: SpeechChunk[] = []; + private nextChunkId = 0; + private isSpeaking = false; + private pendingTextBuffer = ""; + constructor(options: VoiceAgentOptions) { super(); this.model = options.model; @@ -56,6 +89,15 @@ export class VoiceAgent extends EventEmitter { if (options.tools) { this.tools = { ...options.tools }; } + + // Initialize streaming speech config with defaults + this.streamingSpeechConfig = { + minChunkSize: 50, + maxChunkSize: 200, + parallelGeneration: true, + maxParallelRequests: 3, + ...options.streamingSpeech, + }; } private setupListeners() { @@ -67,12 +109,24 @@ export class VoiceAgent extends EventEmitter { // Handle transcribed text from the client/STT if (message.type === "transcript") { + // Interrupt ongoing speech when user starts speaking (barge-in) + if (this.isSpeaking) { + this.interruptSpeech("user_speaking"); + } await this.processUserInput(message.text); } // Handle raw audio data that needs transcription if (message.type === "audio") { + // Interrupt ongoing speech when user starts speaking (barge-in) + if (this.isSpeaking) { + this.interruptSpeech("user_speaking"); + } await this.processAudioInput(message.data); } + // Handle explicit interrupt request from client + if (message.type === "interrupt") { + this.interruptSpeech(message.reason || "client_request"); + } } catch (err) { console.error("Failed to process message:", err); this.emit("error", err); @@ -118,8 +172,12 @@ export class VoiceAgent extends EventEmitter { /** * Generate speech from text using the configured speech model + * @param abortSignal Optional signal to cancel the speech generation */ - public async generateSpeechFromText(text: string): Promise { + public async generateSpeechFromText( + text: string, + abortSignal?: AbortSignal + ): Promise { if (!this.speechModel) { throw new Error("Speech model not configured"); } @@ -130,11 +188,246 @@ export class VoiceAgent extends EventEmitter { voice: this.voice, instructions: this.speechInstructions, outputFormat: this.outputFormat, + abortSignal, }); return result.audio.uint8Array; } + /** + * Interrupt ongoing speech generation and playback (barge-in support) + */ + public interruptSpeech(reason: string = "interrupted"): void { + if (!this.isSpeaking && this.speechChunkQueue.length === 0) { + return; + } + + // Abort any pending speech generation requests + if (this.currentSpeechAbortController) { + this.currentSpeechAbortController.abort(); + this.currentSpeechAbortController = undefined; + } + + // Clear the speech queue + this.speechChunkQueue = []; + this.pendingTextBuffer = ""; + this.isSpeaking = false; + + // Notify clients to stop audio playback + this.sendWebSocketMessage({ + type: "speech_interrupted", + reason, + }); + + this.emit("speech_interrupted", { reason }); + } + + /** + * Extract complete sentences from text buffer + * Returns [extractedSentences, remainingBuffer] + */ + private extractSentences(text: string): [string[], string] { + const sentences: string[] = []; + let remaining = text; + + // Match sentences ending with . ! ? followed by space or end of string + // Also handles common abbreviations and edge cases + const sentenceEndPattern = /[.!?]+(?:\s+|$)/g; + let lastIndex = 0; + let match; + + while ((match = sentenceEndPattern.exec(text)) !== null) { + const sentence = text.slice(lastIndex, match.index + match[0].length).trim(); + if (sentence.length >= this.streamingSpeechConfig.minChunkSize) { + sentences.push(sentence); + lastIndex = match.index + match[0].length; + } else if (sentences.length > 0) { + // Append short sentence to previous one + sentences[sentences.length - 1] += " " + sentence; + lastIndex = match.index + match[0].length; + } + } + + remaining = text.slice(lastIndex); + + // If remaining text is too long, force split at clause boundaries + if (remaining.length > this.streamingSpeechConfig.maxChunkSize) { + const clausePattern = /[,;:]\s+/g; + let clauseMatch; + let splitIndex = 0; + + while ((clauseMatch = clausePattern.exec(remaining)) !== null) { + if (clauseMatch.index >= this.streamingSpeechConfig.minChunkSize) { + splitIndex = clauseMatch.index + clauseMatch[0].length; + break; + } + } + + if (splitIndex > 0) { + sentences.push(remaining.slice(0, splitIndex).trim()); + remaining = remaining.slice(splitIndex); + } + } + + return [sentences, remaining]; + } + + /** + * Queue a text chunk for speech generation + */ + private queueSpeechChunk(text: string): void { + if (!this.speechModel || !text.trim()) return; + + const chunk: SpeechChunk = { + id: this.nextChunkId++, + text: text.trim(), + }; + + // Start generating audio immediately (parallel generation) + if (this.streamingSpeechConfig.parallelGeneration) { + const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length; + + if (activeRequests < this.streamingSpeechConfig.maxParallelRequests) { + chunk.audioPromise = this.generateChunkAudio(chunk); + } + } + + this.speechChunkQueue.push(chunk); + this.emit("speech_chunk_queued", { id: chunk.id, text: chunk.text }); + + // Start processing queue if not already + if (!this.isSpeaking) { + this.processSpeechQueue(); + } + } + + /** + * Generate audio for a single chunk + */ + private async generateChunkAudio(chunk: SpeechChunk): Promise { + if (!this.currentSpeechAbortController) { + this.currentSpeechAbortController = new AbortController(); + } + + try { + const audioData = await this.generateSpeechFromText( + chunk.text, + this.currentSpeechAbortController.signal + ); + return audioData; + } catch (error) { + if ((error as Error).name === "AbortError") { + return null; // Cancelled, don't report as error + } + console.error(`Failed to generate audio for chunk ${chunk.id}:`, error); + this.emit("error", error); + return null; + } + } + + /** + * Process the speech queue and send audio chunks in order + */ + private async processSpeechQueue(): Promise { + if (this.isSpeaking) return; + this.isSpeaking = true; + + this.emit("speech_start", { streaming: true }); + this.sendWebSocketMessage({ type: "speech_stream_start" }); + + try { + while (this.speechChunkQueue.length > 0) { + const chunk = this.speechChunkQueue[0]; + + // Ensure audio generation has started + if (!chunk.audioPromise) { + chunk.audioPromise = this.generateChunkAudio(chunk); + } + + // Wait for this chunk's audio + const audioData = await chunk.audioPromise; + + // Check if we were interrupted while waiting + if (!this.isSpeaking) break; + + // Remove from queue after processing + this.speechChunkQueue.shift(); + + if (audioData) { + const base64Audio = Buffer.from(audioData).toString("base64"); + + // Send audio chunk via WebSocket + this.sendWebSocketMessage({ + type: "audio_chunk", + chunkId: chunk.id, + data: base64Audio, + format: this.outputFormat, + text: chunk.text, + }); + + // Emit for local handling + this.emit("audio_chunk", { + chunkId: chunk.id, + data: base64Audio, + format: this.outputFormat, + text: chunk.text, + uint8Array: audioData, + }); + } + + // Start generating next chunks in parallel + if (this.streamingSpeechConfig.parallelGeneration) { + const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length; + const toStart = Math.min( + this.streamingSpeechConfig.maxParallelRequests - activeRequests, + this.speechChunkQueue.length + ); + + for (let i = 0; i < toStart; i++) { + const nextChunk = this.speechChunkQueue.find(c => !c.audioPromise); + if (nextChunk) { + nextChunk.audioPromise = this.generateChunkAudio(nextChunk); + } + } + } + } + } finally { + this.isSpeaking = false; + this.currentSpeechAbortController = undefined; + + this.sendWebSocketMessage({ type: "speech_stream_end" }); + this.emit("speech_complete", { streaming: true }); + } + } + + /** + * Process text deltra for streaming speech + * Call this as text chunks arrive from LLM + */ + private processTextForStreamingSpeech(textDelta: string): void { + if (!this.speechModel) return; + + this.pendingTextBuffer += textDelta; + + const [sentences, remaining] = this.extractSentences(this.pendingTextBuffer); + this.pendingTextBuffer = remaining; + + for (const sentence of sentences) { + this.queueSpeechChunk(sentence); + } + } + + /** + * Flush any remaining text in the buffer to speech + * Call this when stream ends + */ + private flushStreamingSpeech(): void { + if (!this.speechModel || !this.pendingTextBuffer.trim()) return; + + this.queueSpeechChunk(this.pendingTextBuffer); + this.pendingTextBuffer = ""; + } + /** * Process incoming audio data: transcribe and generate response */ @@ -182,6 +475,18 @@ export class VoiceAgent extends EventEmitter { }); } + /** + * Attach an existing WebSocket (server-side usage). + * Use this when a WS server accepts a connection and you want the + * agent to handle messages on that socket. + */ + public handleSocket(socket: WebSocket): void { + this.socket = socket; + this.isConnected = true; + this.setupListeners(); + this.emit("connected"); + } + /** * Send text input for processing (bypasses transcription) */ @@ -366,6 +671,8 @@ export class VoiceAgent extends EventEmitter { case "text-delta": fullText += part.text; + // Process text for streaming speech as it arrives + this.processTextForStreamingSpeech(part.text); this.sendWebSocketMessage({ type: "text_delta", id: part.id, @@ -374,6 +681,8 @@ export class VoiceAgent extends EventEmitter { break; case "text-end": + // Flush any remaining text to speech when text stream ends + this.flushStreamingSpeech(); this.sendWebSocketMessage({ type: "text_end", id: part.id }); break; @@ -478,9 +787,13 @@ export class VoiceAgent extends EventEmitter { this.conversationHistory.push({ role: "assistant", content: fullText }); } - // Generate speech from the response if speech model is configured - if (this.speechModel && fullText) { - await this.generateAndSendSpeech(fullText); + // Ensure any remaining speech is flushed (in case text-end wasn't triggered) + this.flushStreamingSpeech(); + + // Wait for all speech chunks to complete before signaling response complete + // This ensures audio playback can finish + while (this.speechChunkQueue.length > 0 || this.isSpeaking) { + await new Promise(resolve => setTimeout(resolve, 100)); } // Send the complete response @@ -501,13 +814,14 @@ export class VoiceAgent extends EventEmitter { } /** - * Generate speech and send audio via WebSocket + * Generate speech for full text at once (non-streaming fallback) + * Useful when you want to bypass streaming speech for short responses */ - private async generateAndSendSpeech(text: string): Promise { + public async generateAndSendSpeechFull(text: string): Promise { if (!this.speechModel) return; try { - this.emit("speech_start", { text }); + this.emit("speech_start", { text, streaming: false }); const audioData = await this.generateSpeechFromText(text); const base64Audio = Buffer.from(audioData).toString("base64"); @@ -526,7 +840,7 @@ export class VoiceAgent extends EventEmitter { uint8Array: audioData, }); - this.emit("speech_complete", { text }); + this.emit("speech_complete", { text, streaming: false }); } catch (error) { console.error("Failed to generate speech:", error); this.emit("error", error); @@ -604,4 +918,18 @@ export class VoiceAgent extends EventEmitter { get processing(): boolean { return this.isProcessing; } + + /** + * Check if agent is currently speaking (generating/playing audio) + */ + get speaking(): boolean { + return this.isSpeaking; + } + + /** + * Get the number of pending speech chunks in the queue + */ + get pendingSpeechChunks(): number { + return this.speechChunkQueue.length; + } }