diff --git a/README.md b/README.md
index 526fc8f..736762d 100644
--- a/README.md
+++ b/README.md
@@ -1,12 +1,14 @@
# voice-agent-ai-sdk
-Minimal voice/text agent SDK built on AI SDK with optional WebSocket transport.
+Streaming voice/text agent SDK built on AI SDK with optional WebSocket transport.
## Current status
-- Text flow works via `sendText()` (no WebSocket required).
-- WebSocket flow works when `connect()` is used with a running WS endpoint.
-- Voice streaming is not implemented yet.
+- Streaming text generation is implemented via `streamText`.
+- Tool calling is supported in-stream.
+- Speech synthesis is implemented with chunked streaming TTS.
+- Audio transcription is supported (when `transcriptionModel` is configured).
+- WebSocket protocol events are emitted for stream, tool, and speech lifecycle.
## Prerequisites
@@ -25,13 +27,35 @@ Minimal voice/text agent SDK built on AI SDK with optional WebSocket transport.
OPENAI_API_KEY=your_openai_api_key
VOICE_WS_ENDPOINT=ws://localhost:8080
+`VOICE_WS_ENDPOINT` is optional for text-only usage.
+
+## VoiceAgent configuration
+
+The agent accepts:
+
+- `model` (required): chat model
+- `transcriptionModel` (optional): STT model
+- `speechModel` (optional): TTS model
+- `instructions` (optional): system prompt
+- `stopWhen` (optional): stopping condition
+- `tools` (optional): AI SDK tools map
+- `endpoint` (optional): WebSocket endpoint
+- `voice` (optional): TTS voice, default `alloy`
+- `speechInstructions` (optional): style instructions for TTS
+- `outputFormat` (optional): audio format, default `mp3`
+- `streamingSpeech` (optional):
+ - `minChunkSize`
+ - `maxChunkSize`
+ - `parallelGeneration`
+ - `maxParallelRequests`
+
## Run (text-only check)
-This validates model + tool calls without requiring WebSocket:
+This validates LLM + tool + streaming speech without requiring WebSocket:
pnpm demo
-Expected logs include `text` events and optional `tool_start`.
+Expected logs include `text`, `chunk:text_delta`, tool events, and speech chunk events.
## Run (WebSocket check)
@@ -45,7 +69,22 @@ Expected logs include `text` events and optional `tool_start`.
The demo will:
- run `sendText()` first (text-only sanity check), then
-- connect to `VOICE_WS_ENDPOINT` if provided.
+- connect to `VOICE_WS_ENDPOINT` if provided,
+- emit streaming protocol messages (`text_delta`, `tool_call`, `audio_chunk`, `response_complete`, etc.).
+
+## Browser voice client (HTML)
+
+A simple browser client is available at [example/voice-client.html](example/voice-client.html).
+
+What it does:
+- captures microphone speech using Web Speech API (speech-to-text)
+- sends transcript to the agent via WebSocket (`type: "transcript"`)
+- receives streaming `audio_chunk` messages and plays them in order
+
+How to use:
+1. Start your agent server/WebSocket endpoint.
+2. Open [example/voice-client.html](example/voice-client.html) in a browser (Chrome/Edge recommended).
+3. Connect to `ws://localhost:8080` (or your endpoint), then click **Start Mic**.
## Scripts
@@ -58,3 +97,4 @@ The demo will:
- If `VOICE_WS_ENDPOINT` is empty, WebSocket connect is skipped.
- The sample WS server sends a mock `transcript` message for end-to-end testing.
+- Streaming TTS uses chunk queueing and supports interruption (`interrupt`).
diff --git a/example/demo.ts b/example/demo.ts
index 0b36731..87ada97 100644
--- a/example/demo.ts
+++ b/example/demo.ts
@@ -43,6 +43,13 @@ Use tools when needed to provide accurate information.`,
voice: "alloy", // Options: alloy, echo, fable, onyx, nova, shimmer
speechInstructions: "Speak in a friendly, natural conversational tone.",
outputFormat: "mp3",
+ // Streaming speech tuning
+ streamingSpeech: {
+ minChunkSize: 40,
+ maxChunkSize: 180,
+ parallelGeneration: true,
+ maxParallelRequests: 2,
+ },
// WebSocket endpoint
endpoint: process.env.VOICE_WS_ENDPOINT,
// Tools
@@ -70,13 +77,13 @@ agent.on("text", (msg: { role: string; text: string }) => {
});
// Streaming text delta events (real-time text chunks)
-agent.on("text_delta", ({ text }: { text: string }) => {
+agent.on("chunk:text_delta", ({ text }: { text: string }) => {
process.stdout.write(text);
});
-// Tool events
-agent.on("tool_start", ({ name, input }: { name: string; input?: unknown }) => {
- console.log(`\n[Tool] Calling ${name}...`, input ? JSON.stringify(input) : "");
+// Tool events (stream-level)
+agent.on("chunk:tool_call", ({ toolName, input }: { toolName: string; input: unknown }) => {
+ console.log(`\n[Tool] Calling ${toolName}...`, input ? JSON.stringify(input) : "");
});
agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => {
@@ -84,19 +91,51 @@ agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) =>
});
// Speech events
-agent.on("speech_start", ({ text }: { text: string }) => {
- console.log(`[TTS] Generating speech for: "${text.substring(0, 50)}..."`);
+agent.on("speech_start", ({ streaming }: { streaming: boolean }) => {
+ console.log(`[TTS] Speech started (streaming=${streaming})`);
});
agent.on("speech_complete", () => {
console.log("[TTS] Speech generation complete");
});
-// Audio events (when TTS audio is generated)
+agent.on("speech_chunk_queued", ({ id, text }: { id: number; text: string }) => {
+ console.log(`[TTS] Queued chunk #${id}: ${text.substring(0, 40)}...`);
+});
+
+// Streaming audio chunk events
+agent.on(
+ "audio_chunk",
+ async ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => {
+ console.log(`[Audio] Chunk #${chunkId} (${uint8Array.length} bytes, ${format})`);
+ await writeFile(`output_chunk_${chunkId}.${format}`, Buffer.from(uint8Array));
+ },
+);
+
+// Full audio event (non-streaming fallback via generateAndSendSpeechFull)
agent.on("audio", async (audio: { data: string; format: string; uint8Array: Uint8Array }) => {
- console.log(`[Audio] Received ${audio.format} audio (${audio.uint8Array.length} bytes)`);
- // Optionally save to file for testing
- await writeFile(`output.${audio.format}`, Buffer.from(audio.uint8Array));
+ console.log(`[Audio] Full response audio (${audio.uint8Array.length} bytes, ${audio.format})`);
+ await writeFile(`output_full.${audio.format}`, Buffer.from(audio.uint8Array));
+});
+
+// Speech interruption (barge-in)
+agent.on("speech_interrupted", ({ reason }: { reason: string }) => {
+ console.log(`[TTS] Speech interrupted: ${reason}`);
+});
+
+// Transcription event (when server-side Whisper is used)
+agent.on("transcription", ({ text, language }: { text: string; language?: string }) => {
+ console.log(`[STT] Transcription (${language || "unknown"}): ${text}`);
+});
+
+// Audio received event
+agent.on("audio_received", ({ size }: { size: number }) => {
+ console.log(`[Audio] Received ${(size / 1024).toFixed(1)} KB of audio input`);
+});
+
+// Warning events
+agent.on("warning", (msg: string) => {
+ console.warn(`[Warning] ${msg}`);
});
// Error handling
@@ -111,20 +150,9 @@ agent.on("error", (error: Error) => {
try {
// Test 1: Simple text query with streaming
- console.log("--- Test 1: Weather Query ---");
- const response1 = await agent.sendText("What is the weather in Berlin?");
+ console.log("--- Test 1: Text Query ---");
+ await agent.sendText("What's the weather in San Francisco?");
console.log("\n");
-
- // Test 2: Multi-turn conversation
- console.log("--- Test 2: Follow-up Question ---");
- const response2 = await agent.sendText("What about Tokyo?");
- console.log("\n");
-
- // Test 3: Time query
- console.log("--- Test 3: Time Query ---");
- const response3 = await agent.sendText("What time is it?");
- console.log("\n");
-
// Show conversation history
console.log("--- Conversation History ---");
const history = agent.getHistory();
diff --git a/example/voice-client.html b/example/voice-client.html
new file mode 100644
index 0000000..27b67d4
--- /dev/null
+++ b/example/voice-client.html
@@ -0,0 +1,1088 @@
+
+
+
+
+
+
+ Voice Agent Web Client
+
+
+
+
+ ποΈ Voice Agent Web Client
+ Real-time voice I/O with streaming speech generation. Supports browser STT or server-side
+ Whisper transcription.
+
+
+
+
+
+
+
+
+
Disconnected
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ π€ You said
+ —
+
+ π€ Assistant idle
+
+
+
+
+
+
+ π Logs
+
+
+
+
+
+
\ No newline at end of file
diff --git a/example/ws-server-2.ts b/example/ws-server-2.ts
new file mode 100644
index 0000000..9948a85
--- /dev/null
+++ b/example/ws-server-2.ts
@@ -0,0 +1,120 @@
+import "dotenv/config";
+import { WebSocketServer } from "ws";
+import { VoiceAgent } from "../src";
+import { tool } from "ai";
+import { z } from "zod";
+import { openai } from "@ai-sdk/openai";
+
+const endpoint = process.env.VOICE_WS_ENDPOINT || "ws://localhost:8080";
+const url = new URL(endpoint);
+const port = Number(url.port || 8080);
+const host = url.hostname || "localhost";
+
+// ββ Tools (same as demo.ts) ββββββββββββββββββββββββββββββββββββββββββββ
+const weatherTool = tool({
+ description: "Get the weather in a location",
+ inputSchema: z.object({
+ location: z.string().describe("The location to get the weather for"),
+ }),
+ execute: async ({ location }) => ({
+ location,
+ temperature: 72 + Math.floor(Math.random() * 21) - 10,
+ conditions: ["sunny", "cloudy", "rainy", "partly cloudy"][
+ Math.floor(Math.random() * 4)
+ ],
+ }),
+});
+
+const timeTool = tool({
+ description: "Get the current time",
+ inputSchema: z.object({}),
+ execute: async () => ({
+ time: new Date().toLocaleTimeString(),
+ timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
+ }),
+});
+
+// ββ WebSocket server βββββββββββββββββββββββββββββββββββββββββββββββββββ
+const wss = new WebSocketServer({ port, host });
+
+wss.on("listening", () => {
+ console.log(`[ws-server] listening on ${endpoint}`);
+ console.log("[ws-server] Waiting for connections...\n");
+});
+
+wss.on("connection", (socket) => {
+ console.log("[ws-server] β client connected");
+
+ // Create a fresh VoiceAgent per connection
+ const agent = new VoiceAgent({
+ model: openai("gpt-4o"),
+ transcriptionModel: openai.transcription("whisper-1"),
+ speechModel: openai.speech("gpt-4o-mini-tts"),
+ instructions: `You are a helpful voice assistant.
+Keep responses concise and conversational since they will be spoken aloud.
+Use tools when needed to provide accurate information.`,
+ voice: "alloy",
+ speechInstructions: "Speak in a friendly, natural conversational tone.",
+ outputFormat: "mp3",
+ streamingSpeech: {
+ minChunkSize: 40,
+ maxChunkSize: 180,
+ parallelGeneration: true,
+ maxParallelRequests: 2,
+ },
+ tools: {
+ getWeather: weatherTool,
+ getTime: timeTool,
+ },
+ });
+
+ // Wire agent events to server logs
+ agent.on("text", (msg: { role: string; text: string }) => {
+ const prefix = msg.role === "user" ? "π€ User" : "π€ Assistant";
+ console.log(`[ws-server] ${prefix}: ${msg.text}`);
+ });
+
+ agent.on("chunk:text_delta", ({ text }: { text: string }) => {
+ process.stdout.write(text);
+ });
+
+ agent.on("chunk:tool_call", ({ toolName }: { toolName: string }) => {
+ console.log(`\n[ws-server] π οΈ Tool call: ${toolName}`);
+ });
+
+ agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => {
+ console.log(`[ws-server] π οΈ Tool result (${name}):`, JSON.stringify(result));
+ });
+
+ agent.on("speech_start", () => console.log("[ws-server] π Speech started"));
+ agent.on("speech_complete", () => console.log("[ws-server] π Speech complete"));
+ agent.on("speech_interrupted", ({ reason }: { reason: string }) =>
+ console.log(`[ws-server] βΈοΈ Speech interrupted: ${reason}`),
+ );
+
+ agent.on("audio_chunk", ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => {
+ console.log(`[ws-server] π Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`);
+ });
+
+ agent.on("error", (err: Error) => console.error("[ws-server] β Error:", err.message));
+
+ agent.on("disconnected", () => {
+ console.log("[ws-server] β client disconnected\n");
+ });
+
+ // Hand the accepted socket to the agent β this is the key line.
+ // The agent will listen for "transcript", "audio", "interrupt" messages
+ // and send back "text_delta", "audio_chunk", "response_complete", etc.
+ agent.handleSocket(socket);
+});
+
+// Graceful shutdown
+process.on("SIGINT", () => {
+ console.log("\n[ws-server] Shutting down...");
+ wss.close(() => {
+ console.log("[ws-server] Server closed");
+ process.exit(0);
+ });
+});
+
+export { wss };
diff --git a/example/ws-server.ts b/example/ws-server.ts
index aea5bd6..f648c61 100644
--- a/example/ws-server.ts
+++ b/example/ws-server.ts
@@ -1,140 +1,131 @@
import "dotenv/config";
-import { WebSocketServer, WebSocket } from "ws";
-import { readFile } from "fs/promises";
-import { existsSync } from "fs";
+import { WebSocketServer } from "ws";
+import { VoiceAgent } from "../src";
+import { tool } from "ai";
+import { z } from "zod";
+import { openai } from "@ai-sdk/openai";
const endpoint = process.env.VOICE_WS_ENDPOINT || "ws://localhost:8080";
const url = new URL(endpoint);
const port = Number(url.port || 8080);
const host = url.hostname || "localhost";
-// Message types for type safety
-interface BaseMessage {
- type: string;
-}
+// ββ Tools (same as demo.ts) ββββββββββββββββββββββββββββββββββββββββββββ
+const weatherTool = tool({
+ description: "Get the weather in a location",
+ inputSchema: z.object({
+ location: z.string().describe("The location to get the weather for"),
+ }),
+ execute: async ({ location }) => ({
+ location,
+ temperature: 72 + Math.floor(Math.random() * 21) - 10,
+ conditions: ["sunny", "cloudy", "rainy", "partly cloudy"][
+ Math.floor(Math.random() * 4)
+ ],
+ }),
+});
-interface TextDeltaMessage extends BaseMessage {
- type: "text_delta";
- text: string;
-}
-
-interface ToolCallMessage extends BaseMessage {
- type: "tool_call";
- toolName: string;
- toolCallId: string;
- input: unknown;
-}
-
-interface ToolResultMessage extends BaseMessage {
- type: "tool_result";
- toolName: string;
- toolCallId: string;
- result: unknown;
-}
-
-interface AudioMessage extends BaseMessage {
- type: "audio";
- data: string; // base64 encoded
- format: string;
-}
-
-interface ResponseCompleteMessage extends BaseMessage {
- type: "response_complete";
- text: string;
- toolCalls: Array<{ toolName: string; toolCallId: string; input: unknown }>;
- toolResults: Array<{ toolName: string; toolCallId: string; output: unknown }>;
-}
-
-type AgentMessage =
- | TextDeltaMessage
- | ToolCallMessage
- | ToolResultMessage
- | AudioMessage
- | ResponseCompleteMessage;
+const timeTool = tool({
+ description: "Get the current time",
+ inputSchema: z.object({}),
+ execute: async () => ({
+ time: new Date().toLocaleTimeString(),
+ timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
+ }),
+});
+// ββ WebSocket server βββββββββββββββββββββββββββββββββββββββββββββββββββ
const wss = new WebSocketServer({ port, host });
wss.on("listening", () => {
- console.log(`[ws-server] π listening on ${endpoint}`);
+ console.log(`[ws-server] listening on ${endpoint}`);
console.log("[ws-server] Waiting for connections...\n");
});
-wss.on("connection", (socket: WebSocket) => {
+wss.on("connection", (socket) => {
console.log("[ws-server] β client connected");
- let streamingText = "";
- let audioChunks: Buffer[] = [];
-
- // Send a sample transcript to test text pipeline end-to-end.
- setTimeout(() => {
- console.log("[ws-server] -> Sending test transcript...");
- socket.send(
- JSON.stringify({
- type: "transcript",
- text: "What is the weather in Berlin?",
- }),
- );
- }, 500);
-
- socket.on("message", async (data) => {
- try {
- const msg = JSON.parse(data.toString()) as AgentMessage;
-
- switch (msg.type) {
- case "text_delta":
- // Real-time streaming text from the agent
- streamingText += msg.text;
- process.stdout.write(msg.text);
- break;
-
- case "tool_call":
- console.log(`\n[ws-server] π οΈ Tool call: ${msg.toolName}`);
- console.log(` Input: ${JSON.stringify(msg.input)}`);
- break;
-
- case "tool_result":
- console.log(`[ws-server] π οΈ Tool result: ${msg.toolName}`);
- console.log(` Result: ${JSON.stringify(msg.result)}`);
- break;
-
- case "audio":
- // Handle audio response from TTS
- const audioBuffer = Buffer.from(msg.data, "base64");
- audioChunks.push(audioBuffer);
- console.log(
- `[ws-server] π Received audio: ${audioBuffer.length} bytes (${msg.format})`,
- );
-
- // Optionally save audio to file for testing
- // await writeFile(`output_${Date.now()}.${msg.format}`, audioBuffer);
- break;
-
- case "response_complete":
- console.log("\n[ws-server] β
Response complete");
- console.log(` Text length: ${msg.text.length}`);
- console.log(` Tool calls: ${msg.toolCalls.length}`);
- console.log(` Tool results: ${msg.toolResults.length}`);
-
- // Reset for next response
- streamingText = "";
- audioChunks = [];
- break;
-
- default:
- console.log("[ws-server] <- Unknown message:", msg);
- }
- } catch {
- console.log("[ws-server] <- raw", data.toString().substring(0, 100));
- }
+ // Create a fresh VoiceAgent per connection
+ const agent = new VoiceAgent({
+ model: openai("gpt-4o"),
+ transcriptionModel: openai.transcription("whisper-1"),
+ speechModel: openai.speech("gpt-4o-mini-tts"),
+ instructions: `You are a helpful voice assistant.
+Keep responses concise and conversational since they will be spoken aloud.
+Use tools when needed to provide accurate information.`,
+ voice: "alloy",
+ speechInstructions: "Speak in a friendly, natural conversational tone.",
+ outputFormat: "mp3",
+ streamingSpeech: {
+ minChunkSize: 40,
+ maxChunkSize: 180,
+ parallelGeneration: true,
+ maxParallelRequests: 2,
+ },
+ tools: {
+ getWeather: weatherTool,
+ getTime: timeTool,
+ },
});
- socket.on("close", () => {
+ // Wire agent events to server logs
+ agent.on("text", (msg: { role: string; text: string }) => {
+ const prefix = msg.role === "user" ? "π€ User" : "π€ Assistant";
+ console.log(`[ws-server] ${prefix}: ${msg.text}`);
+ });
+
+ agent.on("chunk:text_delta", ({ text }: { text: string }) => {
+ process.stdout.write(text);
+ });
+
+ agent.on("chunk:tool_call", ({ toolName }: { toolName: string }) => {
+ console.log(`\n[ws-server] π οΈ Tool call: ${toolName}`);
+ });
+
+ agent.on("tool_result", ({ name, result }: { name: string; result: unknown }) => {
+ console.log(`[ws-server] π οΈ Tool result (${name}):`, JSON.stringify(result));
+ });
+
+ agent.on("speech_start", () => console.log("[ws-server] π Speech started"));
+ agent.on("speech_complete", () => console.log("[ws-server] π Speech complete"));
+ agent.on("speech_interrupted", ({ reason }: { reason: string }) =>
+ console.log(`[ws-server] βΈοΈ Speech interrupted: ${reason}`),
+ );
+
+ agent.on("audio_chunk", ({ chunkId, format, uint8Array }: { chunkId: number; format: string; uint8Array: Uint8Array }) => {
+ console.log(`[ws-server] π Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`);
+ });
+
+ agent.on("transcription", ({ text, language }: { text: string; language?: string }) => {
+ console.log(`[ws-server] π Transcription (${language || "unknown"}): ${text}`);
+ });
+
+ agent.on("audio_received", ({ size }: { size: number }) => {
+ console.log(`[ws-server] π€ Audio received: ${(size / 1024).toFixed(1)} KB`);
+ });
+
+ agent.on("chunk:reasoning_delta", ({ text }: { text: string }) => {
+ process.stdout.write(text);
+ });
+
+ agent.on("warning", (msg: string) => {
+ console.log(`[ws-server] β οΈ Warning: ${msg}`);
+ });
+
+ agent.on("speech_chunk_queued", ({ id, text }: { id: number; text: string }) => {
+ console.log(`[ws-server] π Queued speech chunk #${id}: ${text.substring(0, 50)}...`);
+ });
+
+ agent.on("error", (err: Error) => console.error("[ws-server] β Error:", err.message));
+
+ agent.on("disconnected", () => {
console.log("[ws-server] β client disconnected\n");
});
- socket.on("error", (error) => {
- console.error("[ws-server] Error:", error.message);
- });
+ // Hand the accepted socket to the agent β this is the key line.
+ // The agent will listen for "transcript", "audio", "interrupt" messages
+ // and send back "text_delta", "audio_chunk", "response_complete", etc.
+ agent.handleSocket(socket);
});
// Graceful shutdown
@@ -146,24 +137,4 @@ process.on("SIGINT", () => {
});
});
-// Helper function to simulate sending audio to the agent
-async function simulateAudioInput(socket: WebSocket, audioPath: string) {
- if (!existsSync(audioPath)) {
- console.log(`[ws-server] Audio file not found: ${audioPath}`);
- return;
- }
-
- const audioBuffer = await readFile(audioPath);
- const base64Audio = audioBuffer.toString("base64");
-
- console.log(`[ws-server] -> Sending audio: ${audioPath} (${audioBuffer.length} bytes)`);
- socket.send(
- JSON.stringify({
- type: "audio",
- data: base64Audio,
- }),
- );
-}
-
-// Export for use as a module
-export { wss, simulateAudioInput };
+export { wss };
diff --git a/src/VoiceAgent.ts b/src/VoiceAgent.ts
index 2147969..b447e66 100644
--- a/src/VoiceAgent.ts
+++ b/src/VoiceAgent.ts
@@ -12,6 +12,29 @@ import {
type SpeechModel,
} from "ai";
+/**
+ * Represents a chunk of text to be converted to speech
+ */
+interface SpeechChunk {
+ id: number;
+ text: string;
+ audioPromise?: Promise;
+}
+
+/**
+ * Configuration for streaming speech behavior
+ */
+interface StreamingSpeechConfig {
+ /** Minimum characters before generating speech for a chunk */
+ minChunkSize: number;
+ /** Maximum characters per chunk (will split at sentence boundary before this) */
+ maxChunkSize: number;
+ /** Whether to enable parallel TTS generation */
+ parallelGeneration: boolean;
+ /** Maximum number of parallel TTS requests */
+ maxParallelRequests: number;
+}
+
export interface VoiceAgentOptions {
model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o'))
transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1'))
@@ -23,6 +46,8 @@ export interface VoiceAgentOptions {
voice?: string; // Voice for TTS (e.g., 'alloy', 'echo', 'shimmer')
speechInstructions?: string; // Instructions for TTS voice style
outputFormat?: string; // Audio output format (e.g., 'mp3', 'opus', 'wav')
+ /** Configuration for streaming speech generation */
+ streamingSpeech?: Partial;
}
export class VoiceAgent extends EventEmitter {
@@ -41,6 +66,14 @@ export class VoiceAgent extends EventEmitter {
private outputFormat: string;
private isProcessing = false;
+ // Streaming speech state
+ private streamingSpeechConfig: StreamingSpeechConfig;
+ private currentSpeechAbortController?: AbortController;
+ private speechChunkQueue: SpeechChunk[] = [];
+ private nextChunkId = 0;
+ private isSpeaking = false;
+ private pendingTextBuffer = "";
+
constructor(options: VoiceAgentOptions) {
super();
this.model = options.model;
@@ -56,6 +89,15 @@ export class VoiceAgent extends EventEmitter {
if (options.tools) {
this.tools = { ...options.tools };
}
+
+ // Initialize streaming speech config with defaults
+ this.streamingSpeechConfig = {
+ minChunkSize: 50,
+ maxChunkSize: 200,
+ parallelGeneration: true,
+ maxParallelRequests: 3,
+ ...options.streamingSpeech,
+ };
}
private setupListeners() {
@@ -67,12 +109,24 @@ export class VoiceAgent extends EventEmitter {
// Handle transcribed text from the client/STT
if (message.type === "transcript") {
+ // Interrupt ongoing speech when user starts speaking (barge-in)
+ if (this.isSpeaking) {
+ this.interruptSpeech("user_speaking");
+ }
await this.processUserInput(message.text);
}
// Handle raw audio data that needs transcription
if (message.type === "audio") {
+ // Interrupt ongoing speech when user starts speaking (barge-in)
+ if (this.isSpeaking) {
+ this.interruptSpeech("user_speaking");
+ }
await this.processAudioInput(message.data);
}
+ // Handle explicit interrupt request from client
+ if (message.type === "interrupt") {
+ this.interruptSpeech(message.reason || "client_request");
+ }
} catch (err) {
console.error("Failed to process message:", err);
this.emit("error", err);
@@ -118,8 +172,12 @@ export class VoiceAgent extends EventEmitter {
/**
* Generate speech from text using the configured speech model
+ * @param abortSignal Optional signal to cancel the speech generation
*/
- public async generateSpeechFromText(text: string): Promise {
+ public async generateSpeechFromText(
+ text: string,
+ abortSignal?: AbortSignal
+ ): Promise {
if (!this.speechModel) {
throw new Error("Speech model not configured");
}
@@ -130,11 +188,246 @@ export class VoiceAgent extends EventEmitter {
voice: this.voice,
instructions: this.speechInstructions,
outputFormat: this.outputFormat,
+ abortSignal,
});
return result.audio.uint8Array;
}
+ /**
+ * Interrupt ongoing speech generation and playback (barge-in support)
+ */
+ public interruptSpeech(reason: string = "interrupted"): void {
+ if (!this.isSpeaking && this.speechChunkQueue.length === 0) {
+ return;
+ }
+
+ // Abort any pending speech generation requests
+ if (this.currentSpeechAbortController) {
+ this.currentSpeechAbortController.abort();
+ this.currentSpeechAbortController = undefined;
+ }
+
+ // Clear the speech queue
+ this.speechChunkQueue = [];
+ this.pendingTextBuffer = "";
+ this.isSpeaking = false;
+
+ // Notify clients to stop audio playback
+ this.sendWebSocketMessage({
+ type: "speech_interrupted",
+ reason,
+ });
+
+ this.emit("speech_interrupted", { reason });
+ }
+
+ /**
+ * Extract complete sentences from text buffer
+ * Returns [extractedSentences, remainingBuffer]
+ */
+ private extractSentences(text: string): [string[], string] {
+ const sentences: string[] = [];
+ let remaining = text;
+
+ // Match sentences ending with . ! ? followed by space or end of string
+ // Also handles common abbreviations and edge cases
+ const sentenceEndPattern = /[.!?]+(?:\s+|$)/g;
+ let lastIndex = 0;
+ let match;
+
+ while ((match = sentenceEndPattern.exec(text)) !== null) {
+ const sentence = text.slice(lastIndex, match.index + match[0].length).trim();
+ if (sentence.length >= this.streamingSpeechConfig.minChunkSize) {
+ sentences.push(sentence);
+ lastIndex = match.index + match[0].length;
+ } else if (sentences.length > 0) {
+ // Append short sentence to previous one
+ sentences[sentences.length - 1] += " " + sentence;
+ lastIndex = match.index + match[0].length;
+ }
+ }
+
+ remaining = text.slice(lastIndex);
+
+ // If remaining text is too long, force split at clause boundaries
+ if (remaining.length > this.streamingSpeechConfig.maxChunkSize) {
+ const clausePattern = /[,;:]\s+/g;
+ let clauseMatch;
+ let splitIndex = 0;
+
+ while ((clauseMatch = clausePattern.exec(remaining)) !== null) {
+ if (clauseMatch.index >= this.streamingSpeechConfig.minChunkSize) {
+ splitIndex = clauseMatch.index + clauseMatch[0].length;
+ break;
+ }
+ }
+
+ if (splitIndex > 0) {
+ sentences.push(remaining.slice(0, splitIndex).trim());
+ remaining = remaining.slice(splitIndex);
+ }
+ }
+
+ return [sentences, remaining];
+ }
+
+ /**
+ * Queue a text chunk for speech generation
+ */
+ private queueSpeechChunk(text: string): void {
+ if (!this.speechModel || !text.trim()) return;
+
+ const chunk: SpeechChunk = {
+ id: this.nextChunkId++,
+ text: text.trim(),
+ };
+
+ // Start generating audio immediately (parallel generation)
+ if (this.streamingSpeechConfig.parallelGeneration) {
+ const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length;
+
+ if (activeRequests < this.streamingSpeechConfig.maxParallelRequests) {
+ chunk.audioPromise = this.generateChunkAudio(chunk);
+ }
+ }
+
+ this.speechChunkQueue.push(chunk);
+ this.emit("speech_chunk_queued", { id: chunk.id, text: chunk.text });
+
+ // Start processing queue if not already
+ if (!this.isSpeaking) {
+ this.processSpeechQueue();
+ }
+ }
+
+ /**
+ * Generate audio for a single chunk
+ */
+ private async generateChunkAudio(chunk: SpeechChunk): Promise {
+ if (!this.currentSpeechAbortController) {
+ this.currentSpeechAbortController = new AbortController();
+ }
+
+ try {
+ const audioData = await this.generateSpeechFromText(
+ chunk.text,
+ this.currentSpeechAbortController.signal
+ );
+ return audioData;
+ } catch (error) {
+ if ((error as Error).name === "AbortError") {
+ return null; // Cancelled, don't report as error
+ }
+ console.error(`Failed to generate audio for chunk ${chunk.id}:`, error);
+ this.emit("error", error);
+ return null;
+ }
+ }
+
+ /**
+ * Process the speech queue and send audio chunks in order
+ */
+ private async processSpeechQueue(): Promise {
+ if (this.isSpeaking) return;
+ this.isSpeaking = true;
+
+ this.emit("speech_start", { streaming: true });
+ this.sendWebSocketMessage({ type: "speech_stream_start" });
+
+ try {
+ while (this.speechChunkQueue.length > 0) {
+ const chunk = this.speechChunkQueue[0];
+
+ // Ensure audio generation has started
+ if (!chunk.audioPromise) {
+ chunk.audioPromise = this.generateChunkAudio(chunk);
+ }
+
+ // Wait for this chunk's audio
+ const audioData = await chunk.audioPromise;
+
+ // Check if we were interrupted while waiting
+ if (!this.isSpeaking) break;
+
+ // Remove from queue after processing
+ this.speechChunkQueue.shift();
+
+ if (audioData) {
+ const base64Audio = Buffer.from(audioData).toString("base64");
+
+ // Send audio chunk via WebSocket
+ this.sendWebSocketMessage({
+ type: "audio_chunk",
+ chunkId: chunk.id,
+ data: base64Audio,
+ format: this.outputFormat,
+ text: chunk.text,
+ });
+
+ // Emit for local handling
+ this.emit("audio_chunk", {
+ chunkId: chunk.id,
+ data: base64Audio,
+ format: this.outputFormat,
+ text: chunk.text,
+ uint8Array: audioData,
+ });
+ }
+
+ // Start generating next chunks in parallel
+ if (this.streamingSpeechConfig.parallelGeneration) {
+ const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length;
+ const toStart = Math.min(
+ this.streamingSpeechConfig.maxParallelRequests - activeRequests,
+ this.speechChunkQueue.length
+ );
+
+ for (let i = 0; i < toStart; i++) {
+ const nextChunk = this.speechChunkQueue.find(c => !c.audioPromise);
+ if (nextChunk) {
+ nextChunk.audioPromise = this.generateChunkAudio(nextChunk);
+ }
+ }
+ }
+ }
+ } finally {
+ this.isSpeaking = false;
+ this.currentSpeechAbortController = undefined;
+
+ this.sendWebSocketMessage({ type: "speech_stream_end" });
+ this.emit("speech_complete", { streaming: true });
+ }
+ }
+
+ /**
+ * Process text deltra for streaming speech
+ * Call this as text chunks arrive from LLM
+ */
+ private processTextForStreamingSpeech(textDelta: string): void {
+ if (!this.speechModel) return;
+
+ this.pendingTextBuffer += textDelta;
+
+ const [sentences, remaining] = this.extractSentences(this.pendingTextBuffer);
+ this.pendingTextBuffer = remaining;
+
+ for (const sentence of sentences) {
+ this.queueSpeechChunk(sentence);
+ }
+ }
+
+ /**
+ * Flush any remaining text in the buffer to speech
+ * Call this when stream ends
+ */
+ private flushStreamingSpeech(): void {
+ if (!this.speechModel || !this.pendingTextBuffer.trim()) return;
+
+ this.queueSpeechChunk(this.pendingTextBuffer);
+ this.pendingTextBuffer = "";
+ }
+
/**
* Process incoming audio data: transcribe and generate response
*/
@@ -182,6 +475,18 @@ export class VoiceAgent extends EventEmitter {
});
}
+ /**
+ * Attach an existing WebSocket (server-side usage).
+ * Use this when a WS server accepts a connection and you want the
+ * agent to handle messages on that socket.
+ */
+ public handleSocket(socket: WebSocket): void {
+ this.socket = socket;
+ this.isConnected = true;
+ this.setupListeners();
+ this.emit("connected");
+ }
+
/**
* Send text input for processing (bypasses transcription)
*/
@@ -366,6 +671,8 @@ export class VoiceAgent extends EventEmitter {
case "text-delta":
fullText += part.text;
+ // Process text for streaming speech as it arrives
+ this.processTextForStreamingSpeech(part.text);
this.sendWebSocketMessage({
type: "text_delta",
id: part.id,
@@ -374,6 +681,8 @@ export class VoiceAgent extends EventEmitter {
break;
case "text-end":
+ // Flush any remaining text to speech when text stream ends
+ this.flushStreamingSpeech();
this.sendWebSocketMessage({ type: "text_end", id: part.id });
break;
@@ -478,9 +787,13 @@ export class VoiceAgent extends EventEmitter {
this.conversationHistory.push({ role: "assistant", content: fullText });
}
- // Generate speech from the response if speech model is configured
- if (this.speechModel && fullText) {
- await this.generateAndSendSpeech(fullText);
+ // Ensure any remaining speech is flushed (in case text-end wasn't triggered)
+ this.flushStreamingSpeech();
+
+ // Wait for all speech chunks to complete before signaling response complete
+ // This ensures audio playback can finish
+ while (this.speechChunkQueue.length > 0 || this.isSpeaking) {
+ await new Promise(resolve => setTimeout(resolve, 100));
}
// Send the complete response
@@ -501,13 +814,14 @@ export class VoiceAgent extends EventEmitter {
}
/**
- * Generate speech and send audio via WebSocket
+ * Generate speech for full text at once (non-streaming fallback)
+ * Useful when you want to bypass streaming speech for short responses
*/
- private async generateAndSendSpeech(text: string): Promise {
+ public async generateAndSendSpeechFull(text: string): Promise {
if (!this.speechModel) return;
try {
- this.emit("speech_start", { text });
+ this.emit("speech_start", { text, streaming: false });
const audioData = await this.generateSpeechFromText(text);
const base64Audio = Buffer.from(audioData).toString("base64");
@@ -526,7 +840,7 @@ export class VoiceAgent extends EventEmitter {
uint8Array: audioData,
});
- this.emit("speech_complete", { text });
+ this.emit("speech_complete", { text, streaming: false });
} catch (error) {
console.error("Failed to generate speech:", error);
this.emit("error", error);
@@ -604,4 +918,18 @@ export class VoiceAgent extends EventEmitter {
get processing(): boolean {
return this.isProcessing;
}
+
+ /**
+ * Check if agent is currently speaking (generating/playing audio)
+ */
+ get speaking(): boolean {
+ return this.isSpeaking;
+ }
+
+ /**
+ * Get the number of pending speech chunks in the queue
+ */
+ get pendingSpeechChunks(): number {
+ return this.speechChunkQueue.length;
+ }
}