This commit is contained in:
Bijit Mondal
2026-02-14 12:26:47 +05:30
parent 7725f66e39
commit 8e8dd9d9f6
8 changed files with 531 additions and 81 deletions

View File

@@ -35,6 +35,19 @@ interface StreamingSpeechConfig {
maxParallelRequests: number;
}
/**
* Configuration for conversation history memory management
*/
interface HistoryConfig {
/** Maximum number of messages to keep in history. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */
maxMessages: number;
/** Maximum total character count across all messages. When exceeded, oldest messages are trimmed. Set to 0 for unlimited. */
maxTotalChars: number;
}
/** Default maximum audio input size (10 MB) */
const DEFAULT_MAX_AUDIO_SIZE = 10 * 1024 * 1024;
export interface VoiceAgentOptions {
model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o'))
transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1'))
@@ -48,6 +61,10 @@ export interface VoiceAgentOptions {
outputFormat?: string; // Audio output format (e.g., 'mp3', 'opus', 'wav')
/** Configuration for streaming speech generation */
streamingSpeech?: Partial<StreamingSpeechConfig>;
/** Configuration for conversation history memory limits */
history?: Partial<HistoryConfig>;
/** Maximum audio input size in bytes (default: 10 MB) */
maxAudioInputSize?: number;
}
export class VoiceAgent extends EventEmitter {
@@ -65,6 +82,18 @@ export class VoiceAgent extends EventEmitter {
private speechInstructions?: string;
private outputFormat: string;
private isProcessing = false;
private isDestroyed = false;
// Concurrency: queue incoming requests so they run serially
private inputQueue: Array<{ text: string; resolve: (v: string) => void; reject: (e: unknown) => void }> = [];
private processingQueue = false;
// Abort controller for the current LLM stream so we can cancel it on interrupt/disconnect
private currentStreamAbortController?: AbortController;
// Memory management
private historyConfig: HistoryConfig;
private maxAudioInputSize: number;
// Streaming speech state
private streamingSpeechConfig: StreamingSpeechConfig;
@@ -74,6 +103,10 @@ export class VoiceAgent extends EventEmitter {
private isSpeaking = false;
private pendingTextBuffer = "";
// Promise-based signal for speech queue completion (replaces busy-wait polling)
private speechQueueDonePromise?: Promise<void>;
private speechQueueDoneResolve?: () => void;
constructor(options: VoiceAgentOptions) {
super();
this.model = options.model;
@@ -86,6 +119,7 @@ export class VoiceAgent extends EventEmitter {
this.voice = options.voice || "alloy";
this.speechInstructions = options.speechInstructions;
this.outputFormat = options.outputFormat || "mp3";
this.maxAudioInputSize = options.maxAudioInputSize ?? DEFAULT_MAX_AUDIO_SIZE;
if (options.tools) {
this.tools = { ...options.tools };
}
@@ -98,6 +132,22 @@ export class VoiceAgent extends EventEmitter {
maxParallelRequests: 3,
...options.streamingSpeech,
};
// Initialize history config with defaults
this.historyConfig = {
maxMessages: 100,
maxTotalChars: 0, // unlimited by default
...options.history,
};
}
/**
* Ensure the agent has not been destroyed. Throws if it has.
*/
private ensureNotDestroyed(): void {
if (this.isDestroyed) {
throw new Error("VoiceAgent has been destroyed and cannot be used");
}
}
private setupListeners() {
@@ -109,23 +159,27 @@ export class VoiceAgent extends EventEmitter {
// Handle transcribed text from the client/STT
if (message.type === "transcript") {
// Interrupt ongoing speech when user starts speaking (barge-in)
if (this.isSpeaking) {
this.interruptSpeech("user_speaking");
if (typeof message.text !== "string" || !message.text.trim()) {
this.emit("warning", "Received empty or invalid transcript message");
return;
}
await this.processUserInput(message.text);
// Interrupt ongoing speech when user starts speaking (barge-in)
this.interruptCurrentResponse("user_speaking");
await this.enqueueInput(message.text);
}
// Handle raw audio data that needs transcription
if (message.type === "audio") {
// Interrupt ongoing speech when user starts speaking (barge-in)
if (this.isSpeaking) {
this.interruptSpeech("user_speaking");
else if (message.type === "audio") {
if (typeof message.data !== "string" || !message.data) {
this.emit("warning", "Received empty or invalid audio message");
return;
}
// Interrupt ongoing speech when user starts speaking (barge-in)
this.interruptCurrentResponse("user_speaking");
await this.processAudioInput(message.data);
}
// Handle explicit interrupt request from client
if (message.type === "interrupt") {
this.interruptSpeech(message.reason || "client_request");
else if (message.type === "interrupt") {
this.interruptCurrentResponse(message.reason || "client_request");
}
} catch (err) {
console.error("Failed to process message:", err);
@@ -136,6 +190,8 @@ export class VoiceAgent extends EventEmitter {
this.socket.on("close", () => {
console.log("Disconnected");
this.isConnected = false;
// Clean up all in-flight work when the socket closes
this.cleanupOnDisconnect();
this.emit("disconnected");
});
@@ -145,6 +201,39 @@ export class VoiceAgent extends EventEmitter {
});
}
/**
* Clean up all in-flight state when the connection drops.
*/
private cleanupOnDisconnect(): void {
// Abort ongoing LLM stream
if (this.currentStreamAbortController) {
this.currentStreamAbortController.abort();
this.currentStreamAbortController = undefined;
}
// Abort ongoing speech generation
if (this.currentSpeechAbortController) {
this.currentSpeechAbortController.abort();
this.currentSpeechAbortController = undefined;
}
// Clear speech state
this.speechChunkQueue = [];
this.pendingTextBuffer = "";
this.isSpeaking = false;
this.isProcessing = false;
// Resolve any pending speech-done waiters
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
// Reject any queued inputs
for (const item of this.inputQueue) {
item.reject(new Error("Connection closed"));
}
this.inputQueue = [];
this.processingQueue = false;
}
public registerTools(tools: Record<string, Tool>) {
this.tools = { ...this.tools, ...tools };
}
@@ -195,7 +284,8 @@ export class VoiceAgent extends EventEmitter {
}
/**
* Interrupt ongoing speech generation and playback (barge-in support)
* Interrupt ongoing speech generation and playback (barge-in support).
* This only interrupts TTS — the LLM stream is left running.
*/
public interruptSpeech(reason: string = "interrupted"): void {
if (!this.isSpeaking && this.speechChunkQueue.length === 0) {
@@ -213,6 +303,13 @@ export class VoiceAgent extends EventEmitter {
this.pendingTextBuffer = "";
this.isSpeaking = false;
// Resolve any pending speech-done waiters so processUserInput can finish
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
// Notify clients to stop audio playback
this.sendWebSocketMessage({
type: "speech_interrupted",
@@ -222,6 +319,20 @@ export class VoiceAgent extends EventEmitter {
this.emit("speech_interrupted", { reason });
}
/**
* Interrupt both the current LLM stream and ongoing speech.
* Use this for barge-in scenarios where the entire response should be cancelled.
*/
public interruptCurrentResponse(reason: string = "interrupted"): void {
// Abort the LLM stream first
if (this.currentStreamAbortController) {
this.currentStreamAbortController.abort();
this.currentStreamAbortController = undefined;
}
// Then interrupt speech
this.interruptSpeech(reason);
}
/**
* Extract complete sentences from text buffer
* Returns [extractedSentences, remainingBuffer]
@@ -272,17 +383,68 @@ export class VoiceAgent extends EventEmitter {
return [sentences, remaining];
}
/**
* Trim conversation history to stay within configured limits.
* Removes oldest messages (always in pairs to preserve user/assistant turns).
*/
private trimHistory(): void {
const { maxMessages, maxTotalChars } = this.historyConfig;
// Trim by message count
if (maxMessages > 0 && this.conversationHistory.length > maxMessages) {
const excess = this.conversationHistory.length - maxMessages;
// Remove from the front, ensuring we remove at least `excess` messages
// Round up to even number to preserve turn pairs
const toRemove = excess % 2 === 0 ? excess : excess + 1;
this.conversationHistory.splice(0, toRemove);
this.emit("history_trimmed", { removedCount: toRemove, reason: "max_messages" });
}
// Trim by total character count
if (maxTotalChars > 0) {
let totalChars = this.conversationHistory.reduce((sum, msg) => {
const content = typeof msg.content === "string" ? msg.content : JSON.stringify(msg.content);
return sum + content.length;
}, 0);
let removedCount = 0;
while (totalChars > maxTotalChars && this.conversationHistory.length > 2) {
const removed = this.conversationHistory.shift();
if (removed) {
const content = typeof removed.content === "string" ? removed.content : JSON.stringify(removed.content);
totalChars -= content.length;
removedCount++;
}
}
if (removedCount > 0) {
this.emit("history_trimmed", { removedCount, reason: "max_total_chars" });
}
}
}
/**
* Queue a text chunk for speech generation
*/
private queueSpeechChunk(text: string): void {
if (!this.speechModel || !text.trim()) return;
// Wrap chunk ID to prevent unbounded growth in very long sessions
if (this.nextChunkId >= Number.MAX_SAFE_INTEGER) {
this.nextChunkId = 0;
}
const chunk: SpeechChunk = {
id: this.nextChunkId++,
text: text.trim(),
};
// Create the speech-done promise if not already present
if (!this.speechQueueDonePromise) {
this.speechQueueDonePromise = new Promise<void>((resolve) => {
this.speechQueueDoneResolve = resolve;
});
}
// Start generating audio immediately (parallel generation)
if (this.streamingSpeechConfig.parallelGeneration) {
const activeRequests = this.speechChunkQueue.filter(c => c.audioPromise).length;
@@ -395,14 +557,21 @@ export class VoiceAgent extends EventEmitter {
this.isSpeaking = false;
this.currentSpeechAbortController = undefined;
// Signal that the speech queue is fully drained
if (this.speechQueueDoneResolve) {
this.speechQueueDoneResolve();
this.speechQueueDoneResolve = undefined;
this.speechQueueDonePromise = undefined;
}
this.sendWebSocketMessage({ type: "speech_stream_end" });
this.emit("speech_complete", { streaming: true });
}
}
/**
* Process text deltra for streaming speech
* Call this as text chunks arrive from LLM
* Process text delta for streaming speech.
* Call this as text chunks arrive from LLM.
*/
private processTextForStreamingSpeech(textDelta: string): void {
if (!this.speechModel) return;
@@ -439,12 +608,28 @@ export class VoiceAgent extends EventEmitter {
try {
const audioBuffer = Buffer.from(base64Audio, "base64");
// Validate audio size to prevent memory issues
if (audioBuffer.length > this.maxAudioInputSize) {
const sizeMB = (audioBuffer.length / (1024 * 1024)).toFixed(1);
const maxMB = (this.maxAudioInputSize / (1024 * 1024)).toFixed(1);
this.emit("error", new Error(
`Audio input too large (${sizeMB} MB). Maximum allowed: ${maxMB} MB`
));
return;
}
if (audioBuffer.length === 0) {
this.emit("warning", "Received empty audio data");
return;
}
this.emit("audio_received", { size: audioBuffer.length });
const transcribedText = await this.transcribeAudio(audioBuffer);
if (transcribedText.trim()) {
await this.processUserInput(transcribedText);
await this.enqueueInput(transcribedText);
}
} catch (error) {
console.error("Failed to process audio input:", error);
@@ -453,6 +638,13 @@ export class VoiceAgent extends EventEmitter {
}
public async connect(url?: string): Promise<void> {
this.ensureNotDestroyed();
// Clean up any existing connection first
if (this.socket) {
this.disconnectSocket();
}
return new Promise((resolve, reject) => {
try {
// Use provided URL, configured endpoint, or default URL
@@ -481,6 +673,13 @@ export class VoiceAgent extends EventEmitter {
* agent to handle messages on that socket.
*/
public handleSocket(socket: WebSocket): void {
this.ensureNotDestroyed();
// Clean up any existing connection first
if (this.socket) {
this.disconnectSocket();
}
this.socket = socket;
this.isConnected = true;
this.setupListeners();
@@ -488,10 +687,15 @@ export class VoiceAgent extends EventEmitter {
}
/**
* Send text input for processing (bypasses transcription)
* Send text input for processing (bypasses transcription).
* Requests are queued and processed serially to prevent race conditions.
*/
public async sendText(text: string): Promise<string> {
return this.processUserInput(text);
this.ensureNotDestroyed();
if (!text || !text.trim()) {
throw new Error("Text input cannot be empty");
}
return this.enqueueInput(text);
}
/**
@@ -499,6 +703,7 @@ export class VoiceAgent extends EventEmitter {
* @param audioData Base64 encoded audio data
*/
public async sendAudio(audioData: string): Promise<void> {
this.ensureNotDestroyed();
await this.processAudioInput(audioData);
}
@@ -506,26 +711,65 @@ export class VoiceAgent extends EventEmitter {
* Send raw audio buffer to be transcribed and processed
*/
public async sendAudioBuffer(audioBuffer: Buffer | Uint8Array): Promise<void> {
this.ensureNotDestroyed();
const base64Audio = Buffer.from(audioBuffer).toString("base64");
await this.processAudioInput(base64Audio);
}
/**
* Process user input with streaming text generation
* Handles the full pipeline: text -> LLM (streaming) -> TTS -> WebSocket
* Enqueue a text input for serial processing.
* This ensures only one processUserInput runs at a time, preventing
* race conditions on conversationHistory, fullText accumulation, etc.
*/
private enqueueInput(text: string): Promise<string> {
return new Promise<string>((resolve, reject) => {
this.inputQueue.push({ text, resolve, reject });
this.drainInputQueue();
});
}
/**
* Drain the input queue, processing one request at a time.
*/
private async drainInputQueue(): Promise<void> {
if (this.processingQueue) return;
this.processingQueue = true;
try {
while (this.inputQueue.length > 0) {
const item = this.inputQueue.shift()!;
try {
const result = await this.processUserInput(item.text);
item.resolve(result);
} catch (error) {
item.reject(error);
}
}
} finally {
this.processingQueue = false;
}
}
/**
* Process user input with streaming text generation.
* Handles the full pipeline: text -> LLM (streaming) -> TTS -> WebSocket.
*
* This method is designed to be called serially via drainInputQueue().
*/
private async processUserInput(text: string): Promise<string> {
if (this.isProcessing) {
this.emit("warning", "Already processing a request, queuing...");
}
this.isProcessing = true;
// Create an abort controller for this LLM stream so it can be cancelled
this.currentStreamAbortController = new AbortController();
const streamAbortSignal = this.currentStreamAbortController.signal;
try {
// Emit text event for incoming user input
this.emit("text", { role: "user", text });
// Add user message to conversation history
// Add user message to conversation history and trim if needed
this.conversationHistory.push({ role: "user", content: text });
this.trimHistory();
// Use streamText for streaming responses with tool support
const result = streamText({
@@ -534,6 +778,7 @@ export class VoiceAgent extends EventEmitter {
messages: this.conversationHistory,
tools: this.tools,
stopWhen: this.stopWhen,
abortSignal: streamAbortSignal,
onChunk: ({ chunk }) => {
// Emit streaming chunks for real-time updates
// Note: onChunk only receives a subset of stream events
@@ -782,18 +1027,19 @@ export class VoiceAgent extends EventEmitter {
}
}
// Add assistant response to conversation history
// Add assistant response to conversation history and trim
if (fullText) {
this.conversationHistory.push({ role: "assistant", content: fullText });
this.trimHistory();
}
// Ensure any remaining speech is flushed (in case text-end wasn't triggered)
this.flushStreamingSpeech();
// Wait for all speech chunks to complete before signaling response complete
// This ensures audio playback can finish
while (this.speechChunkQueue.length > 0 || this.isSpeaking) {
await new Promise(resolve => setTimeout(resolve, 100));
// Wait for all speech chunks to complete using promise-based signaling
// (replaces the previous busy-wait polling loop)
if (this.speechQueueDonePromise) {
await this.speechQueueDonePromise;
}
// Send the complete response
@@ -808,8 +1054,16 @@ export class VoiceAgent extends EventEmitter {
});
return fullText;
} catch (error) {
// Clean up speech state on error so the agent isn't stuck in a broken state
this.pendingTextBuffer = "";
if (this.speechChunkQueue.length > 0 || this.isSpeaking) {
this.interruptSpeech("stream_error");
}
throw error;
} finally {
this.isProcessing = false;
this.currentStreamAbortController = undefined;
}
}
@@ -848,11 +1102,20 @@ export class VoiceAgent extends EventEmitter {
}
/**
* Send a message via WebSocket if connected
* Send a message via WebSocket if connected.
* Gracefully handles send failures (e.g., socket closing mid-send).
*/
private sendWebSocketMessage(message: Record<string, unknown>): void {
if (this.socket && this.isConnected) {
this.socket.send(JSON.stringify(message));
if (!this.socket || !this.isConnected) return;
try {
if (this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify(message));
}
} catch (error) {
// Socket may have closed between the readyState check and send()
console.error("Failed to send WebSocket message:", error);
this.emit("error", error);
}
}
@@ -895,14 +1158,44 @@ export class VoiceAgent extends EventEmitter {
}
/**
* Disconnect from WebSocket
* Internal helper to close and clean up the current socket.
*/
private disconnectSocket(): void {
if (!this.socket) return;
// Stop all in-flight work tied to this connection
this.cleanupOnDisconnect();
try {
this.socket.removeAllListeners();
if (this.socket.readyState === WebSocket.OPEN ||
this.socket.readyState === WebSocket.CONNECTING) {
this.socket.close();
}
} catch {
// Ignore close errors — socket may already be dead
}
this.socket = undefined;
this.isConnected = false;
}
/**
* Disconnect from WebSocket and stop all in-flight work.
*/
disconnect() {
if (this.socket) {
this.socket.close();
this.socket = undefined;
this.isConnected = false;
}
this.disconnectSocket();
}
/**
* Permanently destroy the agent, releasing all resources.
* After calling this, the agent cannot be reused.
*/
destroy() {
this.isDestroyed = true;
this.disconnectSocket();
this.conversationHistory = [];
this.tools = {};
this.removeAllListeners();
}
/**
@@ -932,4 +1225,11 @@ export class VoiceAgent extends EventEmitter {
get pendingSpeechChunks(): number {
return this.speechChunkQueue.length;
}
/**
* Check if agent has been permanently destroyed
*/
get destroyed(): boolean {
return this.isDestroyed;
}
}