3 Commits

| Author | SHA1 | Message | Date |
|---|---|---|---|
| Bijit Mondal | 637d57fb41 | feat: add serve-client and start-test-environment scripts, enhance voice-client with debugging info | 2026-02-14 14:20:07 +05:30 |
| Bijit Mondal | 8e8dd9d9f6 | WIP | 2026-02-14 12:26:47 +05:30 |
| Bijit Mondal | 7725f66e39 | feat: enhance README with VoiceAgent usage example and configuration options | 2026-02-13 17:36:18 +05:30 |
11 changed files with 917 additions and 118 deletions

CHANGELOG.md (new file, 81 lines added)

@@ -0,0 +1,81 @@
# Changelog
All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/),
and this project adheres to [Semantic Versioning](https://semver.org/).
## [0.1.0] - 2025-07-15
### Added
- **Conversation history limits** — new `history` option with `maxMessages` (default 100)
and `maxTotalChars` (default unlimited) to prevent unbounded memory growth.
Oldest messages are trimmed in pairs to preserve user/assistant turn structure.
Emits `history_trimmed` event when messages are evicted.
- **Audio input size validation** — new `maxAudioInputSize` option (default 10 MB).
Oversized or empty audio payloads are rejected early with an `error` / `warning` event
instead of being forwarded to the transcription model.
- **Serial input queue** — `sendText()`, WebSocket `transcript` messages, and
transcribed audio are now queued and processed one at a time. This prevents
race conditions where concurrent calls could corrupt `conversationHistory` or
interleave streaming output.
- **LLM stream cancellation** — an `AbortController` is now threaded into
`streamText()` via `abortSignal`. Barge-in, disconnect, and explicit
interrupts abort the LLM stream immediately (saving tokens) instead of only
cancelling TTS.
- **`interruptCurrentResponse(reason)`** — new public method that aborts both
the LLM stream *and* ongoing speech in a single call. WebSocket barge-in
(`transcript` / `audio` / `interrupt` messages) now uses this instead of
`interruptSpeech()` alone.
- **`destroy()`** — permanently tears down the agent, releasing the socket,
clearing history and tools, and removing all event listeners.
A `destroyed` getter is also exposed. Any subsequent method call throws.
- **`history_trimmed` event** — emitted with `{ removedCount, reason }` when
the sliding-window trims old messages.
- **Input validation** — `sendText("")` now throws, and incoming WebSocket
`transcript` / `audio` messages are validated before processing.
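As a rough illustration of the pair-wise trimming described above, here is a standalone sketch; `trimToMaxMessages` and the `Message` shape are hypothetical names for this example, not the library's actual internals:

```typescript
type Message = { role: "user" | "assistant"; content: string };

// Trim oldest messages two at a time so a user turn and the assistant
// reply that answered it are always evicted together.
function trimToMaxMessages(
  history: Message[],
  maxMessages: number
): { kept: Message[]; removedCount: number } {
  if (maxMessages <= 0 || history.length <= maxMessages) {
    return { kept: history, removedCount: 0 };
  }
  let excess = history.length - maxMessages;
  if (excess % 2 !== 0) excess += 1; // round up to a whole user/assistant pair
  excess = Math.min(excess, history.length); // never remove more than we have
  return { kept: history.slice(excess), removedCount: excess };
}
```

Note that rounding up to a whole pair means the result can end up one message under the limit, which is the price of never splitting a turn.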
### Changed
- **`disconnect()` is now a full cleanup** — aborts in-flight LLM and TTS
streams, clears the speech queue, rejects pending queued inputs, and removes
socket listeners before closing. Previously it only called `socket.close()`.
- **`connect()` and `handleSocket()` are idempotent** — calling either when a
socket is already attached will cleanly tear down the old connection first
instead of leaking it.
- **`sendWebSocketMessage()` is resilient** — checks `socket.readyState` and
wraps `send()` in a try/catch so a socket that closes mid-send does not throw
an unhandled exception.
- **Speech queue completion uses a promise** — `processUserInput` now awaits a
`speechQueueDonePromise` instead of busy-wait polling
(`while (queue.length) { await sleep(100) }`), reducing CPU waste and
eliminating a race window.
- **`interruptSpeech()` resolves the speech-done promise** — so
`processUserInput` can proceed immediately after a barge-in instead of
potentially hanging.
- **WebSocket message handler uses `if/else if`** — prevents a single message
from accidentally matching multiple type branches.
- **Chunk ID wraps at `Number.MAX_SAFE_INTEGER`** — avoids unbounded counter
growth in very long-running sessions.
- **`processUserInput` catch block cleans up speech state** — on stream error
the pending text buffer is cleared and any in-progress speech is interrupted,
so the agent does not get stuck in a broken state.
- **WebSocket close handler calls `cleanupOnDisconnect()`** — aborts LLM + TTS,
clears queues, and rejects pending input promises.
### Fixed
- Typo in JSDoc: `"Process text deltra"` → `"Process text delta"`.
## [0.0.1] - 2025-07-14
### Added
- Initial release.
- Streaming text generation via AI SDK `streamText`.
- Multi-step tool calling with `stopWhen`.
- Chunked streaming TTS with parallel generation and barge-in support.
- Audio transcription via AI SDK `experimental_transcribe`.
- WebSocket transport with full stream/tool/speech lifecycle events.
- Browser voice client example (`example/voice-client.html`).

README.md (183 lines changed)

@@ -1,14 +1,18 @@
# voice-agent-ai-sdk
Streaming voice/text agent SDK built on [AI SDK](https://sdk.vercel.ai/) with optional WebSocket transport.
## Features
- **Streaming text generation** via AI SDK `streamText` with multi-step tool calling.
- **Chunked streaming TTS** — text is split at sentence boundaries and converted to speech in parallel as the LLM streams, giving low time-to-first-audio.
- **Audio transcription** via AI SDK `experimental_transcribe` (e.g. Whisper).
- **Barge-in / interruption** — user speech cancels both the in-flight LLM stream and pending TTS, saving tokens and latency.
- **Memory management** — configurable sliding-window on conversation history (`maxMessages`, `maxTotalChars`) and audio input size limits.
- **Serial request queue** — concurrent `sendText` / audio inputs are queued and processed one at a time, preventing race conditions.
- **Graceful lifecycle** — `disconnect()` aborts all in-flight work; `destroy()` permanently releases every resource.
- **WebSocket transport** with a full protocol of stream, tool, and speech lifecycle events.
- **Works without WebSocket** — call `sendText()` directly for text-only or server-side use.
## Prerequisites
@@ -29,25 +33,150 @@ Streaming voice/text agent SDK built on AI SDK with optional WebSocket transport
`VOICE_WS_ENDPOINT` is optional for text-only usage.
## VoiceAgent usage (as in the demo)
Minimal end-to-end example using AI SDK tools, streaming text, and streaming TTS:
```ts
import "dotenv/config";
import { VoiceAgent } from "./src";
import { tool } from "ai";
import { z } from "zod";
import { openai } from "@ai-sdk/openai";

const weatherTool = tool({
  description: "Get the weather in a location",
  inputSchema: z.object({ location: z.string() }),
  execute: async ({ location }) => ({ location, temperature: 72, conditions: "sunny" }),
});

const agent = new VoiceAgent({
  model: openai("gpt-4o"),
  transcriptionModel: openai.transcription("whisper-1"),
  speechModel: openai.speech("gpt-4o-mini-tts"),
  instructions: "You are a helpful voice assistant.",
  voice: "alloy",
  speechInstructions: "Speak in a friendly, natural conversational tone.",
  outputFormat: "mp3",
  streamingSpeech: {
    minChunkSize: 40,
    maxChunkSize: 180,
    parallelGeneration: true,
    maxParallelRequests: 2,
  },
  // Memory management (new in 0.1.0)
  history: {
    maxMessages: 50,        // keep last 50 messages
    maxTotalChars: 100_000, // or trim when total chars exceed 100k
  },
  maxAudioInputSize: 5 * 1024 * 1024, // 5 MB limit
  endpoint: process.env.VOICE_WS_ENDPOINT,
  tools: { getWeather: weatherTool },
});

agent.on("text", ({ role, text }) => {
  const prefix = role === "user" ? "👤" : "🤖";
  console.log(prefix, text);
});
agent.on("chunk:text_delta", ({ text }) => process.stdout.write(text));
agent.on("speech_start", ({ streaming }) => console.log("speech_start", streaming));
agent.on("audio_chunk", ({ chunkId, format, uint8Array }) => {
  console.log("audio_chunk", chunkId, format, uint8Array.length);
});

await agent.sendText("What's the weather in San Francisco?");

if (process.env.VOICE_WS_ENDPOINT) {
  await agent.connect(process.env.VOICE_WS_ENDPOINT);
}
```
### Configuration options
The agent accepts:
| Option | Required | Default | Description |
|---|---|---|---|
| `model` | **yes** | — | AI SDK chat model (e.g. `openai("gpt-4o")`) |
| `transcriptionModel` | no | — | AI SDK transcription model (e.g. `openai.transcription("whisper-1")`) |
| `speechModel` | no | — | AI SDK speech model (e.g. `openai.speech("gpt-4o-mini-tts")`) |
| `instructions` | no | `"You are a helpful voice assistant."` | System prompt |
| `stopWhen` | no | `stepCountIs(5)` | Stopping condition for multi-step tool loops |
| `tools` | no | `{}` | AI SDK tools map |
| `endpoint` | no | — | Default WebSocket URL for `connect()` |
| `voice` | no | `"alloy"` | TTS voice |
| `speechInstructions` | no | — | Style instructions passed to the speech model |
| `outputFormat` | no | `"mp3"` | Audio output format (`mp3`, `opus`, `wav`, …) |
| `streamingSpeech` | no | see below | Streaming TTS chunk tuning |
| `history` | no | see below | Conversation memory limits |
| `maxAudioInputSize` | no | `10485760` (10 MB) | Maximum accepted audio input in bytes |
#### `streamingSpeech`
| Key | Default | Description |
|---|---|---|
| `minChunkSize` | `50` | Min characters before a sentence is sent to TTS |
| `maxChunkSize` | `200` | Max characters per chunk (force-split at clause boundary) |
| `parallelGeneration` | `true` | Start TTS for upcoming chunks while the current one plays |
| `maxParallelRequests` | `3` | Cap on concurrent TTS requests |
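A minimal sketch of how such a chunker might behave, assuming sentence splitting on `.` / `!` / `?`; the function name and exact splitting rules here are illustrative, not the SDK's implementation:

```typescript
// Split streaming text into TTS-sized chunks: emit at sentence boundaries
// once minChunkSize is reached, and force a split past maxChunkSize.
function chunkForTTS(text: string, minChunkSize = 50, maxChunkSize = 200): string[] {
  const chunks: string[] = [];
  let buf = "";
  for (const sentence of text.split(/(?<=[.!?])\s+/)) {
    buf = buf ? `${buf} ${sentence}` : sentence;
    if (buf.length >= minChunkSize) {
      // Force-split overly long buffers, preferring a clause boundary (comma)
      while (buf.length > maxChunkSize) {
        const cut = buf.lastIndexOf(",", maxChunkSize);
        const at = cut > 0 ? cut + 1 : maxChunkSize;
        chunks.push(buf.slice(0, at).trim());
        buf = buf.slice(at).trim();
      }
      if (buf.length >= minChunkSize) {
        chunks.push(buf);
        buf = "";
      }
    }
  }
  if (buf) chunks.push(buf); // flush whatever remains at stream end
  return chunks;
}
```

The trade-off the two knobs control: a lower `minChunkSize` starts audio sooner but produces choppier prosody; a higher `maxChunkSize` sounds smoother but delays the first chunk.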
#### `history`
| Key | Default | Description |
|---|---|---|
| `maxMessages` | `100` | Max messages kept in history (0 = unlimited). Oldest are trimmed in pairs. |
| `maxTotalChars` | `0` (unlimited) | Max total characters across all messages. Oldest are trimmed when exceeded. |
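One way these two limits could be applied together, as a standalone sketch; the real trimming also preserves user/assistant pairs and emits `history_trimmed`, which this simplified version omits:

```typescript
interface HistoryLimits {
  maxMessages: number;   // 0 = unlimited
  maxTotalChars: number; // 0 = unlimited
}

type Msg = { role: string; content: string };

// Drop oldest messages until both limits are satisfied.
function applyHistoryLimits(history: Msg[], cfg: HistoryLimits): Msg[] {
  const out = [...history];
  const totalChars = () => out.reduce((n, m) => n + m.content.length, 0);
  while (
    (cfg.maxMessages > 0 && out.length > cfg.maxMessages) ||
    (cfg.maxTotalChars > 0 && totalChars() > cfg.maxTotalChars)
  ) {
    out.shift(); // evict the oldest message first
  }
  return out;
}
```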
### Methods
| Method | Description |
|---|---|
| `sendText(text)` | Process text input. Returns a promise with the full assistant response. Requests are queued serially. |
| `sendAudio(base64Audio)` | Transcribe base64 audio and process the result. |
| `sendAudioBuffer(buffer)` | Same as above, accepts a raw `Buffer` / `Uint8Array`. |
| `transcribeAudio(buffer)` | Transcribe audio to text without generating a response. |
| `generateAndSendSpeechFull(text)` | Non-streaming TTS fallback (entire text at once). |
| `interruptSpeech(reason?)` | Cancel in-flight TTS only (LLM stream keeps running). |
| `interruptCurrentResponse(reason?)` | Cancel **both** the LLM stream and TTS. Used for barge-in. |
| `connect(url?)` / `handleSocket(ws)` | Establish or attach a WebSocket. Safe to call multiple times. |
| `disconnect()` | Close the socket and abort all in-flight work. |
| `destroy()` | Permanently release all resources. The agent cannot be reused. |
| `clearHistory()` | Clear conversation history. |
| `getHistory()` / `setHistory(msgs)` | Read or restore conversation history. |
| `registerTools(tools)` | Merge additional tools into the agent. |
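The serial queuing behind `sendText()` can be illustrated with a generic promise chain; this is a sketch of the pattern, not the SDK's internal code:

```typescript
// Each call chains onto the previous one, so inputs are processed
// strictly one at a time regardless of caller concurrency.
class SerialQueue {
  private tail: Promise<unknown> = Promise.resolve();

  run<T>(task: () => Promise<T>): Promise<T> {
    const result = this.tail.then(task, task); // run even if a prior task failed
    this.tail = result.catch(() => undefined); // keep the chain alive on errors
    return result;
  }
}
```

With this shape, a failed request rejects its own caller's promise but never stalls the queue for the next input.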
### Read-only properties
| Property | Type | Description |
|---|---|---|
| `connected` | `boolean` | Whether a WebSocket is connected |
| `processing` | `boolean` | Whether a request is currently being processed |
| `speaking` | `boolean` | Whether audio is currently being generated / sent |
| `pendingSpeechChunks` | `number` | Number of queued TTS chunks |
| `destroyed` | `boolean` | Whether `destroy()` has been called |
### Events
| Event | Payload | When |
|---|---|---|
| `text` | `{ role, text }` | User input received or full assistant response ready |
| `chunk:text_delta` | `{ id, text }` | Each streaming text token from the LLM |
| `chunk:reasoning_delta` | `{ id, text }` | Each reasoning token (models that support it) |
| `chunk:tool_call` | `{ toolName, toolCallId, input }` | Tool invocation detected |
| `tool_result` | `{ name, toolCallId, result }` | Tool execution finished |
| `speech_start` | `{ streaming }` | TTS generation begins |
| `speech_complete` | `{ streaming }` | All TTS chunks sent |
| `speech_interrupted` | `{ reason }` | Speech was cancelled (barge-in, disconnect, error) |
| `speech_chunk_queued` | `{ id, text }` | A text chunk entered the TTS queue |
| `audio_chunk` | `{ chunkId, data, format, text, uint8Array }` | One TTS chunk is ready |
| `audio` | `{ data, format, uint8Array }` | Full non-streaming TTS audio |
| `transcription` | `{ text, language }` | Audio transcription result |
| `audio_received` | `{ size }` | Raw audio input received (before transcription) |
| `history_trimmed` | `{ removedCount, reason }` | Oldest messages evicted from history |
| `connected` / `disconnected` | — | WebSocket lifecycle |
| `warning` | `string` | Non-fatal issues (empty input, etc.) |
| `error` | `Error` | Errors from LLM, TTS, transcription, or WebSocket |
## Run (text-only check)
@@ -59,13 +188,13 @@ Expected logs include `text`, `chunk:text_delta`, tool events, and speech chunk
## Run (WebSocket check)
1. Start the local WS server:
   pnpm ws:server
2. In another terminal, run the demo:
   pnpm demo
The demo will:
- run `sendText()` first (text-only sanity check), then

example/serve-client.js (new file, 29 lines added)

@@ -0,0 +1,29 @@
const http = require('http');
const fs = require('fs');
const path = require('path');
const PORT = 3000;
// Create a simple HTTP server to serve the voice client HTML
const server = http.createServer((req, res) => {
if (req.url === '/' || req.url === '/index.html') {
const htmlPath = path.join(__dirname, 'voice-client.html');
fs.readFile(htmlPath, (err, data) => {
if (err) {
res.writeHead(500);
res.end('Error loading voice-client.html');
return;
}
res.writeHead(200, {'Content-Type': 'text/html'});
res.end(data);
});
} else {
res.writeHead(404);
res.end('Not found');
}
});
server.listen(PORT, () => {
console.log(`Voice client available at: http://localhost:${PORT}`);
console.log(`Make sure to also start the WebSocket server with: npm run ws:server`);
});

View File

@@ -0,0 +1,26 @@
#!/bin/bash
# Kill any previously running servers
echo "Cleaning up any existing processes..."
pkill -f "node.*example/serve-client.js" || true
pkill -f "tsx.*example/ws-server.ts" || true
echo "Starting WebSocket server..."
npm run ws:server &
WS_SERVER_PID=$!
# Sleep to ensure WebSocket server has time to start
sleep 2
echo "Starting web client server..."
npm run client &
CLIENT_SERVER_PID=$!
echo "✅ Test environment started!"
echo "📱 Open http://localhost:3000 in your browser"
echo ""
echo "Press Ctrl+C to shut down both servers"
# Wait for user to Ctrl+C
trap "kill $WS_SERVER_PID $CLIENT_SERVER_PID; echo 'Servers stopped'; exit" INT
wait

View File

@@ -263,6 +263,8 @@
      <button id="disconnectBtn" disabled>Disconnect</button>
    </div>
    <div id="status"><span class="status-dot disconnected"></span>Disconnected</div>
    <div style="font-size: 13px; color: #666; margin-top: 4px;">Debug: Check browser console (F12) for detailed logs</div>
  </div>
  <!-- Input Controls -->
@@ -440,6 +442,7 @@
  setStatus("Playing audio...", "speaking");
  const { bytes, format } = audioQueue.shift();
  console.log(`Playing audio chunk: ${bytes.length} bytes, format: ${format}`);

  try {
    const ctx = getAudioContext();
@@ -449,7 +452,10 @@
    );
    try {
      console.log(`Attempting to decode audio with WebAudio API...`);
      const audioBuffer = await ctx.decodeAudioData(arrayBuffer.slice(0));
      console.log(`Decoded audio successfully: ${audioBuffer.duration.toFixed(2)}s, ${audioBuffer.numberOfChannels} channels, ${audioBuffer.sampleRate}Hz`);
      await new Promise((resolve) => {
        const source = ctx.createBufferSource();
        source.buffer = audioBuffer;
@@ -457,29 +463,49 @@
        currentAudioSource = source;
        source.onended = resolve;
        source.start(0);
        console.log(`Audio playback started`);
      });
      console.log(`Audio playback completed`);
      currentAudioSource = null;
    } catch (decodeErr) {
      console.warn(`WebAudio decode failed, falling back to Audio element:`, decodeErr);
      const mime = getMimeTypeForFormat(format);
      console.log(`Using MIME type: ${mime}`);
      const blob = new Blob([bytes], { type: mime });
      const url = URL.createObjectURL(blob);
      const audio = new Audio(url);
      audio.onerror = (e) => console.error(`Audio element error:`, e);
      audio.oncanplaythrough = () => console.log(`Audio ready to play: ${audio.duration.toFixed(2)}s`);
      currentAudioElement = audio;
      await audio.play();
      console.log(`Audio element playback started`);
      await new Promise((resolve) => {
        audio.onended = () => {
          console.log(`Audio element playback completed`);
          resolve();
        };
        audio.onerror = (e) => {
          console.error(`Audio element playback failed:`, e);
          resolve();
        };
      });
      currentAudioElement = null;
      URL.revokeObjectURL(url);
    }
  } catch (err) {
    console.error(`Audio playback error:`, err);
    log(`Audio play error: ${err?.message || err}`);
  } finally {
    isPlaying = false;
    if (audioQueue.length > 0) {
      console.log(`${audioQueue.length} more audio chunks in queue, continuing playback`);
      playNextAudioChunk();
    } else if (connected) {
      console.log(`Audio queue empty, returning to ${whisperListening || micShouldRun ? 'listening' : 'connected'} state`);
      setStatus(whisperListening || micShouldRun ? "Listening..." : "Connected", whisperListening || micShouldRun ? "listening" : "connected");
    }
  }
@@ -521,6 +547,7 @@
    case "webm":
      return "audio/webm";
    default:
      console.log(`Unknown audio format: ${format}, defaulting to mpeg`);
      return `audio/${format || "mpeg"}`;
  }
}
@@ -543,7 +570,10 @@
    analyserNode = ctx.createAnalyser();
    analyserNode.fftSize = 256;
    analyserSource.connect(analyserNode);
    console.log('Audio analyser setup complete');
  } catch (err) {
    console.error('Audio analyser setup failed:', err);
  }
}

function teardownAnalyser() {
@@ -671,15 +701,26 @@
 */
async function startWhisperListening() {
  try {
    console.log("Starting Whisper VAD listening");
    mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        channelCount: 1,
        sampleRate: 16000,
        echoCancellation: true,
        noiseSuppression: true,
        autoGainControl: true
      }
    });
    // Log the actual constraints we got
    const tracks = mediaStream.getAudioTracks();
    if (tracks.length > 0) {
      const settings = tracks[0].getSettings();
      console.log('Audio track settings:', settings);
      log(`🎵 Audio: ${settings.sampleRate}Hz, ${settings.channelCount}ch`);
    }
  } catch (err) {
    console.error("Mic permission error:", err);
    log(`Mic permission failed: ${err?.message || err}`);
    setStatus("Mic permission denied", "disconnected");
    return;
@@ -808,13 +849,45 @@
  whisperSegmentActive = true;
  segmentStartTime = Date.now();

  // Try to choose the best format for Whisper compatibility
  let mimeType = '';
  const supportedTypes = [
    "audio/webm;codecs=opus", // Best compatibility with Whisper
    "audio/webm",
    "audio/ogg;codecs=opus",
    "audio/mp4",
    "audio/wav"
  ];
  for (const type of supportedTypes) {
    if (MediaRecorder.isTypeSupported(type)) {
      mimeType = type;
      break;
    }
  }
  if (!mimeType) {
    console.warn("No preferred MIME types supported, using default");
    mimeType = ''; // Let the browser choose
  }
  console.log(`Using MediaRecorder with MIME type: ${mimeType || 'browser default'}`);

  // Create recorder with a bitrate suitable for speech
  const recorderOptions = {
    mimeType: mimeType,
    audioBitsPerSecond: 128000 // 128 kbps is plenty for speech
  };
  try {
    mediaRecorder = new MediaRecorder(mediaStream, recorderOptions);
    console.log(`MediaRecorder created with mimeType: ${mediaRecorder.mimeType}`);
  } catch (err) {
    console.error("MediaRecorder creation failed:", err);
    // Fall back to default options
    mediaRecorder = new MediaRecorder(mediaStream);
    console.log(`Fallback MediaRecorder created with mimeType: ${mediaRecorder.mimeType}`);
  }

  mediaRecorder.ondataavailable = (event) => {
    if (event.data.size > 0) audioChunks.push(event.data);
@@ -843,27 +916,45 @@
      return;
    }

    try {
      const blob = new Blob(audioChunks, { type: mediaRecorder.mimeType });
      audioChunks = [];
      const arrayBuffer = await blob.arrayBuffer();
      const uint8 = new Uint8Array(arrayBuffer);

      // Base64 encode in chunks to avoid stack overflow
      let binary = "";
      const chunkSize = 8192;
      for (let i = 0; i < uint8.length; i += chunkSize) {
        const slice = uint8.subarray(i, Math.min(i + chunkSize, uint8.length));
        binary += String.fromCharCode.apply(null, slice);
      }
      const base64 = btoa(binary);

      // Log audio size for debugging
      console.log(`Prepared audio segment: ${(uint8.length / 1024).toFixed(1)}KB, duration: ${duration}ms, mime: ${mediaRecorder.mimeType}`);

      resetOutputPanels();

      if (ws && connected) {
        // Add format information to help the server decode the audio
        const message = {
          type: "audio",
          data: base64,
          format: mediaRecorder.mimeType,
          sampleRate: 16000, // Match the constraint we requested
          duration: duration
        };
        console.log(`Sending audio to server: ${(base64.length / 1000).toFixed(1)}KB, format: ${mediaRecorder.mimeType}`);
        ws.send(JSON.stringify(message));
        log(`→ Sent audio segment (${(uint8.length / 1024).toFixed(1)} KB, ${duration}ms) for Whisper`);
        transcriptEl.textContent = "🎙️ Transcribing audio...";
      }
    } catch (err) {
      console.error("Error processing audio segment:", err);
      log(`❌ Error processing audio: ${err.message || err}`);
    }
  };
@@ -921,6 +1012,18 @@
// ── Server Message Handler ──────────────────────────────────────────
function handleServerMessage(msg) {
  switch (msg.type) {
    // ── Transcription feedback ────────────────
    case "transcription_result":
      console.log(`Received transcription: "${msg.text}", language: ${msg.language || 'unknown'}`);
      transcriptEl.textContent = msg.text;
      log(`🎙️ Transcription: ${msg.text}`);
      break;

    case "transcription_error":
      console.error(`Transcription error: ${msg.error}`);
      transcriptEl.textContent = `⚠️ ${msg.error}`;
      log(`❌ Transcription error: ${msg.error}`);
      break;

    // ── Stream lifecycle ────────────────────
    case "stream_start":
      assistantEl.textContent = "";
@@ -1022,8 +1125,9 @@
    case "audio_chunk": {
      const bytes = decodeBase64ToBytes(msg.data);
      audioQueue.push({ bytes, format: msg.format || "mp3" });
      log(`🔊 Audio chunk #${msg.chunkId ?? "?"} (${bytes.length} bytes, ${msg.format || "mp3"})`);
      console.log(`Received audio chunk #${msg.chunkId ?? "?"}: ${bytes.length} bytes, format: ${msg.format || "mp3"}`);
      playNextAudioChunk();
      break;
    }
@@ -1096,9 +1200,12 @@
ws.onmessage = (event) => {
  try {
    console.log(`← Received WebSocket message: ${event.data.length} bytes`);
    const msg = JSON.parse(event.data);
    console.log('Message parsed:', msg.type);
    handleServerMessage(msg);
  } catch (err) {
    console.error('Parse error:', err);
    log("Received non-JSON message");
  }
};

View File

@@ -96,10 +96,16 @@ Use tools when needed to provide accurate information.`,
  console.log(`[ws-server] 🔊 Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`);
});

agent.on("history_trimmed", ({ removedCount, reason }: { removedCount: number; reason: string }) => {
  console.log(`[ws-server] 🧹 History trimmed: removed ${removedCount} messages (${reason})`);
});

agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message));
agent.on("disconnected", () => {
  // Permanently release all agent resources for this connection
  agent.destroy();
  console.log("[ws-server] ✗ client disconnected (agent destroyed)\n");
});

// Hand the accepted socket to the agent; this is the key line.

View File

@@ -40,6 +40,7 @@ const wss = new WebSocketServer({ port, host });
wss.on("listening", () => {
  console.log(`[ws-server] listening on ${endpoint}`);
  console.log("[ws-server] Waiting for connections...\n");
  console.log(`[ws-server] 🌐 Open voice-client.html in your browser and connect to ${endpoint}`);
});
wss.on("connection", (socket) => {
@@ -55,12 +56,12 @@ Keep responses concise and conversational since they will be spoken aloud.
Use tools when needed to provide accurate information.`,
    voice: "alloy",
    speechInstructions: "Speak in a friendly, natural conversational tone.",
    outputFormat: "mp3", // Using mp3 for better browser compatibility
    streamingSpeech: {
      minChunkSize: 20, // Smaller chunks for faster streaming
      maxChunkSize: 150, // Not too large, to ensure timely audio delivery
      parallelGeneration: true, // Generate audio chunks in parallel
      maxParallelRequests: 3, // Allow up to 3 concurrent TTS requests
    },
    tools: {
      getWeather: weatherTool,
@@ -96,6 +97,50 @@ Use tools when needed to provide accurate information.`,
    console.log(`[ws-server] 🔊 Audio chunk #${chunkId}: ${uint8Array.length} bytes (${format})`);
  });
// Log raw WebSocket messages for debugging
const originalSend = socket.send.bind(socket);
// Define a wrapper function to log messages before sending
function loggedSend(data: any): void {
let dataSize = 'unknown size';
if (typeof data === 'string') {
dataSize = `${data.length} chars`;
} else if (data instanceof Buffer || data instanceof ArrayBuffer) {
dataSize = `${Buffer.byteLength(data)} bytes`;
} else if (data instanceof Uint8Array) {
dataSize = `${data.byteLength} bytes`;
}
console.log(`[ws-server] → Sending WebSocket data (${dataSize})`);
}
// Create a proxy for the send method that logs but preserves original signatures
socket.send = function (data: any, optionsOrCallback?: any, callback?: (err?: Error) => void): void {
loggedSend(data);
if (typeof optionsOrCallback === 'function') {
// Handle the (data, callback) signature
return originalSend(data, optionsOrCallback);
} else if (optionsOrCallback) {
// Handle the (data, options, callback) signature
return originalSend(data, optionsOrCallback, callback);
} else {
// Handle the (data) signature
return originalSend(data);
}
};
socket.on('message', (data: any) => {
let dataSize = 'unknown size';
if (data instanceof Buffer) {
dataSize = `${data.length} bytes`;
} else if (data instanceof ArrayBuffer) {
dataSize = `${data.byteLength} bytes`;
} else if (typeof data === 'string') {
dataSize = `${data.length} chars`;
}
console.log(`[ws-server] ← Received WebSocket data (${dataSize})`);
});
  agent.on("transcription", ({ text, language }: { text: string; language?: string }) => {
    console.log(`[ws-server] 📝 Transcription (${language || "unknown"}): ${text}`);
  });
@@ -116,10 +161,16 @@ Use tools when needed to provide accurate information.`,
    console.log(`[ws-server] 🔊 Queued speech chunk #${id}: ${text.substring(0, 50)}...`);
  });

  agent.on("history_trimmed", ({ removedCount, reason }: { removedCount: number; reason: string }) => {
    console.log(`[ws-server] 🧹 History trimmed: removed ${removedCount} messages (${reason})`);
  });

  agent.on("error", (err: Error) => console.error("[ws-server] ❌ Error:", err.message));
  agent.on("disconnected", () => {
    // Permanently release all agent resources for this connection
    agent.destroy();
    console.log("[ws-server] ✗ client disconnected (agent destroyed)\n");
  });

  // Hand the accepted socket to the agent; this is the key line.



@@ -1,6 +1,6 @@
{
"name": "voice-agent-ai-sdk",
"version": "0.1.0",
"description": "Voice AI Agent with ai-sdk",
"main": "src/index.ts",
"scripts": {
@@ -8,6 +8,7 @@
"dev": "tsc -w",
"demo": "tsx example/demo.ts",
"ws:server": "tsx example/ws-server.ts",
"client": "node example/serve-client.js",
"prepublishOnly": "pnpm build"
},
"keywords": [


@@ -35,6 +35,19 @@ interface StreamingSpeechConfig {
maxParallelRequests: number;
}
/**
* Configuration for conversation history memory management
*/
interface HistoryConfig {
/** Maximum number of messages to keep in history. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */
maxMessages: number;
/** Maximum total character count across all messages. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */
maxTotalChars: number;
}
/** Default maximum audio input size (10 MB) */
const DEFAULT_MAX_AUDIO_SIZE = 10 * 1024 * 1024;
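The constructor later merges these history defaults with user overrides via spread. A minimal, self-contained sketch of that merge (the `resolveHistoryConfig` helper is illustrative, not part of the library):

```typescript
interface HistoryConfig {
  maxMessages: number;
  maxTotalChars: number;
}

// Mirrors the constructor's spread-merge of defaults with user overrides
function resolveHistoryConfig(overrides?: Partial<HistoryConfig>): HistoryConfig {
  return {
    maxMessages: 100,  // default documented in the changelog
    maxTotalChars: 0,  // 0 means unlimited
    ...overrides,
  };
}

const resolved = resolveHistoryConfig({ maxMessages: 20 });
```

Note that an explicitly passed `undefined` field would also override a default with this pattern, which is usually acceptable for `Partial<>` option bags.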
export interface VoiceAgentOptions {
model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o'))
transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1'))
@@ -48,6 +61,10 @@ export interface VoiceAgentOptions {
outputFormat?: string; // Audio output format (e.g., 'mp3', 'opus', 'wav')
/** Configuration for streaming speech generation */
streamingSpeech?: Partial<StreamingSpeechConfig>;
/** Configuration for conversation history memory limits */
history?: Partial<HistoryConfig>;
/** Maximum audio input size in bytes (default: 10 MB) */
maxAudioInputSize?: number;
}

export class VoiceAgent extends EventEmitter {
@@ -65,6 +82,18 @@ export class VoiceAgent extends EventEmitter {
private speechInstructions?: string;
private outputFormat: string;
private isProcessing = false;
private isDestroyed = false;
// Concurrency: queue incoming requests so they run serially
private inputQueue: Array<{ text: string; resolve: (v: string) => void; reject: (e: unknown) => void }> = [];
private processingQueue = false;
// Abort controller for the current LLM stream so we can cancel it on interrupt/disconnect
private currentStreamAbortController?: AbortController;
// Memory management
private historyConfig: HistoryConfig;
private maxAudioInputSize: number;
// Streaming speech state
private streamingSpeechConfig: StreamingSpeechConfig;
@@ -74,6 +103,10 @@ export class VoiceAgent extends EventEmitter {
private isSpeaking = false;
private pendingTextBuffer = "";
// Promise-based signal for speech queue completion (replaces busy-wait polling)
private speechQueueDonePromise?: Promise<void>;
private speechQueueDoneResolve?: () => void;
constructor(options: VoiceAgentOptions) {
super();
this.model = options.model;
@@ -86,6 +119,7 @@
this.voice = options.voice || "alloy";
this.speechInstructions = options.speechInstructions;
this.outputFormat = options.outputFormat || "mp3";
this.maxAudioInputSize = options.maxAudioInputSize ?? DEFAULT_MAX_AUDIO_SIZE;
if (options.tools) {
this.tools = { ...options.tools };
}
@@ -98,6 +132,22 @@
maxParallelRequests: 3,
...options.streamingSpeech,
};
// Initialize history config with defaults
this.historyConfig = {
maxMessages: 100,
maxTotalChars: 0, // unlimited by default
...options.history,
};
}
/**
* Ensure the agent has not been destroyed. Throws if it has.
*/
private ensureNotDestroyed(): void {
if (this.isDestroyed) {
throw new Error("VoiceAgent has been destroyed and cannot be used");
}
}

private setupListeners() {
@@ -106,26 +156,34 @@ export class VoiceAgent extends EventEmitter {
this.socket.on("message", async (data) => {
try {
const message = JSON.parse(data.toString());
console.log(`Received WebSocket message of type: ${message.type}`);

// Handle transcribed text from the client/STT
if (message.type === "transcript") {
if (typeof message.text !== "string" || !message.text.trim()) {
this.emit("warning", "Received empty or invalid transcript message");
return;
}
// Interrupt ongoing speech when user starts speaking (barge-in)
this.interruptCurrentResponse("user_speaking");
console.log(`Processing transcript: "${message.text}"`);
await this.enqueueInput(message.text);
}
// Handle raw audio data that needs transcription
else if (message.type === "audio") {
if (typeof message.data !== "string" || !message.data) {
this.emit("warning", "Received empty or invalid audio message");
return;
}
// Interrupt ongoing speech when user starts speaking (barge-in)
this.interruptCurrentResponse("user_speaking");
console.log(`Received audio data (${message.data.length / 1000}KB) for processing, format: ${message.format || 'unknown'}`);
await this.processAudioInput(message.data, message.format);
}
// Handle explicit interrupt request from client
else if (message.type === "interrupt") {
console.log(`Received interrupt request: ${message.reason || "client_request"}`);
this.interruptCurrentResponse(message.reason || "client_request");
}
} catch (err) {
console.error("Failed to process message:", err);
@@ -136,6 +194,8 @@ export class VoiceAgent extends EventEmitter {
this.socket.on("close", () => {
console.log("Disconnected");
this.isConnected = false;
// Clean up all in-flight work when the socket closes
this.cleanupOnDisconnect();
this.emit("disconnected");
});
@@ -145,6 +205,39 @@
});
}
/**
* Clean up all in-flight state when the connection drops.
*/
private cleanupOnDisconnect(): void {
// Abort ongoing LLM stream
if (this.currentStreamAbortController) {
this.currentStreamAbortController.abort();
this.currentStreamAbortController = undefined;
}
// Abort ongoing speech generation
if (this.currentSpeechAbortController) {
this.currentSpeechAbortController.abort();
this.currentSpeechAbortController = undefined;
}
// Clear speech state
this.speechChunkQueue = [];
this.pendingTextBuffer = "";
this.isSpeaking = false;
this.isProcessing = false;
// Resolve any pending speech-done waiters
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
// Reject any queued inputs
for (const item of this.inputQueue) {
item.reject(new Error("Connection closed"));
}
this.inputQueue = [];
this.processingQueue = false;
}
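One rule `cleanupOnDisconnect()` enforces is that every queued input promise must settle when the connection dies, so callers awaiting `sendText()` are not left hanging. A minimal standalone sketch of that teardown rule (`PendingQueue` is a hypothetical stand-in, not the agent's actual queue type):

```typescript
// Sketch: reject-all-on-teardown for a queue of pending promises.
class PendingQueue<T> {
  private items: Array<{ value: T; reject: (e: Error) => void }> = [];

  enqueue(value: T): Promise<T> {
    return new Promise<T>((_resolve, reject) => {
      this.items.push({ value, reject });
    });
  }

  // Mirrors cleanupOnDisconnect(): every queued promise settles with an error
  failAll(reason: string): number {
    const count = this.items.length;
    for (const item of this.items) item.reject(new Error(reason));
    this.items = [];
    return count;
  }
}

const q = new PendingQueue<string>();
const outcomes: string[] = [];
q.enqueue("a").catch((e: Error) => outcomes.push(e.message));
q.enqueue("b").catch((e: Error) => outcomes.push(e.message));
const rejected = q.failAll("Connection closed");
```

Attaching the `.catch` handlers before teardown is what keeps these rejections from surfacing as unhandled-rejection warnings.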
public registerTools(tools: Record<string, Tool>) {
this.tools = { ...this.tools, ...tools };
}
@@ -157,17 +250,38 @@
throw new Error("Transcription model not configured");
}
console.log(`Sending ${audioData.byteLength} bytes to Whisper for transcription`);

try {
// Note: The AI SDK transcribe function only accepts these parameters
// We can't directly pass language or temperature to it
const result = await transcribe({
model: this.transcriptionModel,
audio: audioData,
// If we need to pass additional options to OpenAI Whisper,
// we would need to do it via providerOptions if supported
});

console.log(`Whisper transcription result: "${result.text}", language: ${result.language || 'unknown'}`);

this.emit("transcription", {
text: result.text,
language: result.language,
});

// Also send transcription to client for immediate feedback
this.sendWebSocketMessage({
type: "transcription_result",
text: result.text,
language: result.language,
});

return result.text;
} catch (error) {
console.error("Whisper transcription failed:", error);
// Re-throw to be handled by the caller
throw error;
}
/**
@@ -195,7 +309,8 @@
}

/**
* Interrupt ongoing speech generation and playback (barge-in support).
* This only interrupts TTS — the LLM stream is left running.
*/
public interruptSpeech(reason: string = "interrupted"): void {
if (!this.isSpeaking && this.speechChunkQueue.length === 0) {
@@ -213,6 +328,13 @@
this.pendingTextBuffer = "";
this.isSpeaking = false;
// Resolve any pending speech-done waiters so processUserInput can finish
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
// Notify clients to stop audio playback
this.sendWebSocketMessage({
type: "speech_interrupted",
@@ -222,6 +344,20 @@
this.emit("speech_interrupted", { reason });
}
/**
* Interrupt both the current LLM stream and ongoing speech.
* Use this for barge-in scenarios where the entire response should be cancelled.
*/
public interruptCurrentResponse(reason: string = "interrupted"): void {
// Abort the LLM stream first
if (this.currentStreamAbortController) {
this.currentStreamAbortController.abort();
this.currentStreamAbortController = undefined;
}
// Then interrupt speech
this.interruptSpeech(reason);
}
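The barge-in order matters: the LLM stream is aborted first so no further text reaches the TTS queue, then speech is cancelled. A standalone sketch of that two-controller pattern, with `ResponsePipeline` as an assumed stand-in for the agent's internal state:

```typescript
// Sketch of the two-controller barge-in cancellation shown above.
class ResponsePipeline {
  private llmAbort?: AbortController;
  private ttsAbort?: AbortController;

  start(): void {
    this.llmAbort = new AbortController();
    this.ttsAbort = new AbortController();
  }

  // Mirrors interruptCurrentResponse(): cancel the LLM stream first, then speech
  interrupt(): { llmAborted: boolean; ttsAborted: boolean } {
    this.llmAbort?.abort();
    this.ttsAbort?.abort();
    return {
      llmAborted: this.llmAbort?.signal.aborted ?? false,
      ttsAborted: this.ttsAbort?.signal.aborted ?? false,
    };
  }
}

const pipeline = new ResponsePipeline();
pipeline.start();
const status = pipeline.interrupt();
```

Anything awaiting either signal (e.g. a `fetch` or `streamText` call given the `abortSignal`) will reject with an `AbortError` once `abort()` fires.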
/**
* Extract complete sentences from text buffer
* Returns [extractedSentences, remainingBuffer]
@@ -272,17 +408,68 @@
return [sentences, remaining];
}
/**
* Trim conversation history to stay within configured limits.
* Removes oldest messages (always in pairs to preserve user/assistant turns).
*/
private trimHistory(): void {
const { maxMessages, maxTotalChars } = this.historyConfig;
// Trim by message count
if (maxMessages > 0 && this.conversationHistory.length > maxMessages) {
const excess = this.conversationHistory.length - maxMessages;
// Remove from the front, ensuring we remove at least `excess` messages
// Round up to even number to preserve turn pairs
const toRemove = excess % 2 === 0 ? excess : excess + 1;
this.conversationHistory.splice(0, toRemove);
this.emit("history_trimmed", { removedCount: toRemove, reason: "max_messages" });
}
// Trim by total character count
if (maxTotalChars > 0) {
let totalChars = this.conversationHistory.reduce((sum, msg) => {
const content = typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content);
return sum + content.length;
}, 0);
let removedCount = 0;
while (totalChars > maxTotalChars && this.conversationHistory.length > 2) {
const removed = this.conversationHistory.shift();
if (removed) {
const content = typeof removed.content === "string" ? removed.content : JSON.stringify(removed.content);
totalChars -= content.length;
removedCount++;
}
}
if (removedCount > 0) {
this.emit("history_trimmed", { removedCount, reason: "max_total_chars" });
}
}
}
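The count-based branch of `trimHistory()` rounds removals up to an even number so a user message is never left without its assistant reply. A self-contained sketch of just that rule (`Msg` and `trimToMaxMessages` are illustrative stand-ins):

```typescript
// Sketch of pair-preserving history trimming by message count.
interface Msg {
  role: "user" | "assistant";
  content: string;
}

function trimToMaxMessages(history: Msg[], maxMessages: number): { kept: Msg[]; removed: number } {
  if (maxMessages <= 0 || history.length <= maxMessages) {
    return { kept: history, removed: 0 };
  }
  const excess = history.length - maxMessages;
  // Round removals up to an even number so user/assistant turns stay paired
  const toRemove = excess % 2 === 0 ? excess : excess + 1;
  return { kept: history.slice(toRemove), removed: toRemove };
}

const history: Msg[] = [
  { role: "user", content: "q1" }, { role: "assistant", content: "a1" },
  { role: "user", content: "q2" }, { role: "assistant", content: "a2" },
  { role: "user", content: "q3" },
];
const { kept, removed } = trimToMaxMessages(history, 4);
```

With five messages and a limit of four, the excess of one is rounded up to two, so the oldest full turn (q1/a1) is dropped and the history still starts on a user message.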
/**
* Queue a text chunk for speech generation
*/
private queueSpeechChunk(text: string): void {
if (!this.speechModel || !text.trim()) return;
// Wrap chunk ID to prevent unbounded growth in very long sessions
if (this.nextChunkId >= Number.MAX_SAFE_INTEGER) {
this.nextChunkId = 0;
}
const chunk: SpeechChunk = {
id: this.nextChunkId++,
text: text.trim(),
};
// Create the speech-done promise if not already present
if (!this.speechQueueDonePromise) {
this.speechQueueDonePromise = new Promise<void>((resolve) => {
this.speechQueueDoneResolve = resolve;
});
}
// Start generating audio immediately (parallel generation)
if (this.streamingSpeechConfig.parallelGeneration) {
const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length;
@@ -310,13 +497,16 @@
}

try {
console.log(`Generating audio for chunk ${chunk.id}: "${chunk.text.substring(0, 50)}${chunk.text.length > 50 ? '...' : ''}"`);
const audioData = await this.generateSpeechFromText(
chunk.text,
this.currentSpeechAbortController.signal
);
console.log(`Generated audio for chunk ${chunk.id}: ${audioData.length} bytes`);
return audioData;
} catch (error) {
if ((error as Error).name === "AbortError") {
console.log(`Audio generation aborted for chunk ${chunk.id}`);
return null; // Cancelled, don't report as error
}
console.error(`Failed to generate audio for chunk ${chunk.id}:`, error);
@@ -332,6 +522,7 @@ export class VoiceAgent extends EventEmitter {
if (this.isSpeaking) return;
this.isSpeaking = true;
console.log(`Starting speech queue processing with ${this.speechChunkQueue.length} chunks`);
this.emit("speech_start", { streaming: true });
this.sendWebSocketMessage({ type: "speech_stream_start" });
@@ -339,6 +530,8 @@
while (this.speechChunkQueue.length > 0) {
const chunk = this.speechChunkQueue[0];
console.log(`Processing speech chunk #${chunk.id} (${this.speechChunkQueue.length - 1} remaining)`);
// Ensure audio generation has started
if (!chunk.audioPromise) {
chunk.audioPromise = this.generateChunkAudio(chunk);
@@ -348,13 +541,17 @@
const audioData = await chunk.audioPromise;

// Check if we were interrupted while waiting
if (!this.isSpeaking) {
console.log(`Speech interrupted during chunk #${chunk.id}`);
break;
}

// Remove from queue after processing
this.speechChunkQueue.shift();

if (audioData) {
const base64Audio = Buffer.from(audioData).toString("base64");
console.log(`Sending audio chunk #${chunk.id} (${audioData.length} bytes, ${this.outputFormat})`);

// Send audio chunk via WebSocket
this.sendWebSocketMessage({
@@ -373,6 +570,8 @@ export class VoiceAgent extends EventEmitter {
text: chunk.text,
uint8Array: audioData,
});
} else {
console.log(`No audio data generated for chunk #${chunk.id}`);
}

// Start generating next chunks in parallel
@@ -383,26 +582,40 @@
this.speechChunkQueue.length
);
if (toStart > 0) {
console.log(`Starting parallel generation for ${toStart} more chunks`);
for (let i = 0; i < toStart; i++) {
const nextChunk = this.speechChunkQueue.find(c => !c.audioPromise);
if (nextChunk) {
nextChunk.audioPromise = this.generateChunkAudio(nextChunk);
}
}
}
}
}
}
} catch (error) {
console.error("Error in speech queue processing:", error);
this.emit("error", error);
} finally {
this.isSpeaking = false;
this.currentSpeechAbortController = undefined;
// Signal that the speech queue is fully drained
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
console.log(`Speech queue processing complete`);
this.sendWebSocketMessage({ type: "speech_stream_end" });
this.emit("speech_complete", { streaming: true });
}
}

/**
* Process text delta for streaming speech.
* Call this as text chunks arrive from LLM.
*/
private processTextForStreamingSpeech(textDelta: string): void {
if (!this.speechModel) return;
@@ -431,7 +644,7 @@ export class VoiceAgent extends EventEmitter {
/**
* Process incoming audio data: transcribe and generate response
*/
private async processAudioInput(base64Audio: string, format?: string): Promise<void> {
if (!this.transcriptionModel) {
this.emit("error", new Error("Transcription model not configured for audio input"));
return;
@@ -439,20 +652,55 @@ export class VoiceAgent extends EventEmitter {
try {
const audioBuffer = Buffer.from(base64Audio, "base64");
// Validate audio size to prevent memory issues
if (audioBuffer.length > this.maxAudioInputSize) {
const sizeMB = (audioBuffer.length / (1024 * 1024)).toFixed(1);
const maxMB = (this.maxAudioInputSize / (1024 * 1024)).toFixed(1);
this.emit("error", new Error(
`Audio input too large (${sizeMB} MB). Maximum allowed: ${maxMB} MB`
));
return;
}
if (audioBuffer.length === 0) {
this.emit("warning", "Received empty audio data");
return;
}
this.emit("audio_received", { size: audioBuffer.length, format });
console.log(`Processing audio input: ${audioBuffer.length} bytes, format: ${format || 'unknown'}`);
const transcribedText = await this.transcribeAudio(audioBuffer);
console.log(`Transcribed text: "${transcribedText}"`);

if (transcribedText.trim()) {
await this.enqueueInput(transcribedText);
} else {
this.emit("warning", "Transcription returned empty text");
this.sendWebSocketMessage({
type: "transcription_error",
error: "Whisper returned empty text"
});
}
} catch (error) {
console.error("Failed to process audio input:", error);
this.emit("error", error);
this.sendWebSocketMessage({
type: "transcription_error",
error: `Transcription failed: ${(error as Error).message || String(error)}`
});
}
}

public async connect(url?: string): Promise<void> {
this.ensureNotDestroyed();
// Clean up any existing connection first
if (this.socket) {
this.disconnectSocket();
}
return new Promise((resolve, reject) => {
try {
// Use provided URL, configured endpoint, or default URL
@@ -481,6 +729,13 @@ export class VoiceAgent extends EventEmitter {
* agent to handle messages on that socket.
*/
public handleSocket(socket: WebSocket): void {
this.ensureNotDestroyed();
// Clean up any existing connection first
if (this.socket) {
this.disconnectSocket();
}
this.socket = socket;
this.isConnected = true;
this.setupListeners();
@@ -488,10 +743,15 @@ export class VoiceAgent extends EventEmitter {
}

/**
* Send text input for processing (bypasses transcription).
* Requests are queued and processed serially to prevent race conditions.
*/
public async sendText(text: string): Promise<string> {
this.ensureNotDestroyed();
if (!text || !text.trim()) {
throw new Error("Text input cannot be empty");
}
return this.enqueueInput(text);
}

/**
@@ -499,6 +759,7 @@
* @param audioData Base64 encoded audio data
*/
public async sendAudio(audioData: string): Promise<void> {
this.ensureNotDestroyed();
await this.processAudioInput(audioData);
}
@@ -506,26 +767,65 @@ export class VoiceAgent extends EventEmitter {
* Send raw audio buffer to be transcribed and processed
*/
public async sendAudioBuffer(audioBuffer: Buffer | Uint8Array): Promise<void> {
this.ensureNotDestroyed();
const base64Audio = Buffer.from(audioBuffer).toString("base64");
await this.processAudioInput(base64Audio);
}

/**
* Enqueue a text input for serial processing.
* This ensures only one processUserInput runs at a time, preventing
* race conditions on conversationHistory, fullText accumulation, etc.
*/
private enqueueInput(text: string): Promise<string> {
return new Promise<string>((resolve, reject) => {
this.inputQueue.push({ text, resolve, reject });
this.drainInputQueue();
});
}
/**
* Drain the input queue, processing one request at a time.
*/
private async drainInputQueue(): Promise<void> {
if (this.processingQueue) return;
this.processingQueue = true;
try {
while (this.inputQueue.length > 0) {
const item = this.inputQueue.shift()!;
try {
const result = await this.processUserInput(item.text);
item.resolve(result);
} catch (error) {
item.reject(error);
}
}
} finally {
this.processingQueue = false;
}
}
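The enqueue/drain pair above is a generic serial-execution pattern: the drain loop is started on every enqueue but the `processingQueue` flag makes sure only one loop runs, so jobs complete strictly in submission order. A minimal, runnable sketch of the same pattern (the `SerialQueue` class here is illustrative, not the agent's actual implementation):

```typescript
// Sketch: jobs run one at a time even when submitted concurrently.
class SerialQueue {
  private queue: Array<() => Promise<void>> = [];
  private draining = false;

  run<T>(job: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      this.queue.push(async () => {
        try {
          resolve(await job());
        } catch (e) {
          reject(e);
        }
      });
      void this.drain();
    });
  }

  private async drain(): Promise<void> {
    if (this.draining) return; // another drain loop is already running
    this.draining = true;
    try {
      while (this.queue.length > 0) {
        await this.queue.shift()!();
      }
    } finally {
      this.draining = false;
    }
  }
}

const order: number[] = [];
const queue = new SerialQueue();
const done = Promise.all([
  queue.run(async () => { await new Promise((r) => setTimeout(r, 20)); order.push(1); }),
  queue.run(async () => { order.push(2); }),
  queue.run(async () => { order.push(3); }),
]);
```

Even though the second and third jobs could finish instantly, they wait for the slow first job, so `order` ends up `[1, 2, 3]`.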
/**
* Process user input with streaming text generation.
* Handles the full pipeline: text -> LLM (streaming) -> TTS -> WebSocket.
*
* This method is designed to be called serially via drainInputQueue().
*/
private async processUserInput(text: string): Promise<string> {
if (this.isProcessing) {
this.emit("warning", "Already processing a request, queuing...");
}
this.isProcessing = true;
// Create an abort controller for this LLM stream so it can be cancelled
this.currentStreamAbortController = new AbortController();
const streamAbortSignal = this.currentStreamAbortController.signal;
try {
// Emit text event for incoming user input
this.emit("text", { role: "user", text });

// Add user message to conversation history and trim if needed
this.conversationHistory.push({ role: "user", content: text });
this.trimHistory();

// Use streamText for streaming responses with tool support
const result = streamText({
@@ -534,6 +834,7 @@ export class VoiceAgent extends EventEmitter {
messages: this.conversationHistory,
tools: this.tools,
stopWhen: this.stopWhen,
abortSignal: streamAbortSignal,
onChunk: ({ chunk }) => {
// Emit streaming chunks for real-time updates
// Note: onChunk only receives a subset of stream events
@@ -782,18 +1083,19 @@ export class VoiceAgent extends EventEmitter {
}
}

// Add assistant response to conversation history and trim
if (fullText) {
this.conversationHistory.push({ role: "assistant", content: fullText });
this.trimHistory();
}

// Ensure any remaining speech is flushed (in case text-end wasn't triggered)
this.flushStreamingSpeech();

// Wait for all speech chunks to complete using promise-based signaling
// (replaces the previous busy-wait polling loop)
if (this.speechQueueDonePromise) {
await this.speechQueueDonePromise;
}

// Send the complete response
@@ -808,8 +1110,16 @@
});

return fullText;
} catch (error) {
// Clean up speech state on error so the agent isn't stuck in a broken state
this.pendingTextBuffer = "";
if (this.speechChunkQueue.length > 0 || this.isSpeaking) {
this.interruptSpeech("stream_error");
}
throw error;
} finally {
this.isProcessing = false;
this.currentStreamAbortController = undefined;
}
}
@@ -848,11 +1158,33 @@ export class VoiceAgent extends EventEmitter {
}

/**
* Send a message via WebSocket if connected.
* Gracefully handles send failures (e.g., socket closing mid-send).
*/
private sendWebSocketMessage(message: Record<string, unknown>): void {
if (!this.socket || !this.isConnected) return;
try {
if (this.socket.readyState === WebSocket.OPEN) {
// Skip logging huge audio data for better readability
if (message.type === "audio_chunk" || message.type === "audio") {
const { data, ...rest } = message as any;
console.log(`Sending WebSocket message: ${message.type}`,
data ? `(${(data.length / 1000).toFixed(1)}KB audio data)` : "",
rest
);
} else {
console.log(`Sending WebSocket message: ${message.type}`);
}
this.socket.send(JSON.stringify(message));
} else {
console.warn(`Cannot send message, socket state: ${this.socket.readyState}`);
}
} catch (error) {
// Socket may have closed between the readyState check and send()
console.error("Failed to send WebSocket message:", error);
this.emit("error", error);
}
}
@@ -895,14 +1227,44 @@ export class VoiceAgent extends EventEmitter {
}

/**
* Internal helper to close and clean up the current socket.
*/
private disconnectSocket(): void {
if (!this.socket) return;
// Stop all in-flight work tied to this connection
this.cleanupOnDisconnect();
try {
this.socket.removeAllListeners();
if (this.socket.readyState === WebSocket.OPEN ||
this.socket.readyState === WebSocket.CONNECTING) {
this.socket.close();
}
} catch {
// Ignore close errors — socket may already be dead
}
this.socket = undefined;
this.isConnected = false;
}
/**
* Disconnect from WebSocket and stop all in-flight work.
*/
disconnect() {
this.disconnectSocket();
}

/**
* Permanently destroy the agent, releasing all resources.
* After calling this, the agent cannot be reused.
*/
destroy() {
this.isDestroyed = true;
this.disconnectSocket();
this.conversationHistory = [];
this.tools = {};
this.removeAllListeners();
}

/**
@@ -932,4 +1294,11 @@ export class VoiceAgent extends EventEmitter {
get pendingSpeechChunks(): number {
return this.speechChunkQueue.length;
}
/**
* Check if agent has been permanently destroyed
*/
get destroyed(): boolean {
return this.isDestroyed;
}
}
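The `isDestroyed` flag plus `ensureNotDestroyed()` guard form a simple terminal-state lifecycle: once destroyed, every public entry point throws. A self-contained sketch of that guard pattern (`DestroyableAgent` is a hypothetical stand-in, not the real VoiceAgent):

```typescript
// Sketch of the destroy-guard lifecycle: terminal state, then hard failure on reuse.
class DestroyableAgent {
  private isDestroyed = false;

  get destroyed(): boolean {
    return this.isDestroyed;
  }

  private ensureNotDestroyed(): void {
    if (this.isDestroyed) {
      throw new Error("agent has been destroyed and cannot be used");
    }
  }

  sendText(text: string): string {
    this.ensureNotDestroyed();
    return `echo: ${text}`;
  }

  destroy(): void {
    this.isDestroyed = true;
  }
}

const agent = new DestroyableAgent();
const reply = agent.sendText("hi");
agent.destroy();
let threwAfterDestroy = false;
try {
  agent.sendText("again");
} catch {
  threwAfterDestroy = true;
}
```

Failing loudly on use-after-destroy is usually preferable to silently ignoring calls, since it surfaces lifecycle bugs (e.g. a server reusing a per-connection agent) immediately.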