mirror of
https://github.com/Bijit-Mondal/VoiceAgent.git
synced 2026-03-02 10:36:37 +00:00
- Added ConversationManager for managing conversation history with configurable limits. - Implemented InputQueue for serial processing of input items. - Created SpeechManager for handling text-to-speech generation and streaming. - Developed StreamProcessor for processing LLM streams and forwarding events. - Added TranscriptionManager for audio transcription using AI SDK. - Introduced WebSocketManager for managing WebSocket connections and messaging. - Updated VoiceAgent to support new architecture and improved socket handling. - Refactored index files to export new core components.
379 lines
14 KiB
JavaScript
"use strict";
// CommonJS module preamble (tsc output): mark as an ES-module interop target
// and pre-declare the export binding before the class is defined.
Object.defineProperty(exports, "__esModule", { value: true });
exports.VoiceAgent = void 0;
const events_1 = require("events");
const ai_1 = require("ai");
const core_1 = require("./core");
|
/**
 * A single-session voice agent that manages one WebSocket connection at a time.
 *
 * **Important:** Each `VoiceAgent` instance holds its own conversation history,
 * input queue, speech state, and WebSocket. It is designed for **one user per
 * instance**. To support multiple concurrent users, create a separate
 * `VoiceAgent` for each connection:
 *
 * ```ts
 * wss.on("connection", (socket) => {
 *   const agent = new VoiceAgent({ model, ... });
 *   agent.handleSocket(socket);
 *   agent.on("disconnected", () => agent.destroy());
 * });
 * ```
 *
 * Sharing a single instance across multiple users will cause conversation
 * history cross-contamination, interleaved audio, and unpredictable behavior.
 */
class VoiceAgent extends events_1.EventEmitter {
    model;
    instructions;
    stopWhen;
    endpoint;
    tools = {};
    isDestroyed = false;
    _isProcessing = false;
    // Abort controller for the current LLM stream
    currentStreamAbortController;
    // ── Managers ──────────────────────────────────────────
    ws;
    speech;
    conversation;
    transcription;
    inputQueue;
    /**
     * @param {object} options - Agent configuration.
     * @param {object} options.model - LLM used by `streamText`.
     * @param {string} [options.instructions] - System prompt; defaults to a
     *   generic voice-assistant prompt.
     * @param {object} [options.stopWhen] - Stop condition for multi-step tool
     *   runs; defaults to `stepCountIs(5)`.
     * @param {string} [options.endpoint] - Default WebSocket URL for `connect()`.
     * @param {object} [options.tools] - Initial tool set (shallow-copied).
     * Remaining options are forwarded to the Speech/Conversation/Transcription
     * managers — see those constructors for their semantics.
     */
    constructor(options) {
        super();
        this.model = options.model;
        this.instructions =
            options.instructions || "You are a helpful voice assistant.";
        this.stopWhen = options.stopWhen || (0, ai_1.stepCountIs)(5);
        this.endpoint = options.endpoint;
        if (options.tools) {
            // Shallow copy so later registerTools() calls don't mutate the
            // caller's object.
            this.tools = { ...options.tools };
        }
        // ── Initialize managers ──────────────────────────────
        this.ws = new core_1.WebSocketManager();
        this.speech = new core_1.SpeechManager({
            speechModel: options.speechModel,
            voice: options.voice,
            speechInstructions: options.speechInstructions,
            outputFormat: options.outputFormat,
            streamingSpeech: options.streamingSpeech,
        });
        this.conversation = new core_1.ConversationManager({
            history: options.history,
        });
        this.transcription = new core_1.TranscriptionManager({
            transcriptionModel: options.transcriptionModel,
            maxAudioInputSize: options.maxAudioInputSize,
        });
        this.inputQueue = new core_1.InputQueue();
        // ── Wire managers to the WebSocket send function ─────
        const sendMsg = (msg) => this.ws.send(msg);
        this.speech.sendMessage = sendMsg;
        this.transcription.sendMessage = sendMsg;
        // ── Wire the input queue processor ───────────────────
        this.inputQueue.processor = (item) => this.processUserInput(item.text);
        // ── Bubble events from managers ──────────────────────
        this.bubbleEvents(this.ws, [
            "connected",
            "error",
        ]);
        this.bubbleEvents(this.speech, [
            "speech_start",
            "speech_complete",
            "speech_interrupted",
            "speech_chunk_queued",
            "audio_chunk",
            "audio",
            "error",
        ]);
        this.bubbleEvents(this.conversation, [
            "history_cleared",
            "history_trimmed",
        ]);
        this.bubbleEvents(this.transcription, [
            "transcription",
            "audio_received",
            "error",
            "warning",
        ]);
        // ── Handle WebSocket lifecycle events ────────────────
        // "disconnected" is handled here (not bubbled) so in-flight work is
        // torn down before listeners observe the event.
        this.ws.on("disconnected", () => {
            this.cleanupOnDisconnect();
            this.emit("disconnected");
        });
        this.ws.on("message", (message) => this.handleMessage(message));
    }
    // ── Public API ────────────────────────────────────────
    /** Merge additional tools into the agent's tool set (later keys win). */
    registerTools(tools) {
        this.tools = { ...this.tools, ...tools };
    }
    /**
     * Transcribe audio data to text using the configured transcription model.
     */
    async transcribeAudio(audioData) {
        return this.transcription.transcribeAudio(audioData);
    }
    /**
     * Generate speech from text using the configured speech model.
     */
    async generateSpeechFromText(text, abortSignal) {
        return this.speech.generateSpeechFromText(text, abortSignal);
    }
    /**
     * Interrupt ongoing speech generation and playback (barge-in support).
     */
    interruptSpeech(reason = "interrupted") {
        this.speech.interruptSpeech(reason);
    }
    /**
     * Interrupt both the current LLM stream and ongoing speech.
     */
    interruptCurrentResponse(reason = "interrupted") {
        if (this.currentStreamAbortController) {
            this.currentStreamAbortController.abort();
            this.currentStreamAbortController = undefined;
        }
        this.speech.interruptSpeech(reason);
    }
    /**
     * Connect to a WebSocket server by URL.
     * Falls back to the constructor `endpoint`, then to ws://localhost:8080.
     */
    async connect(url) {
        this.ensureNotDestroyed();
        const wsUrl = url || this.endpoint || "ws://localhost:8080";
        await this.ws.connect(wsUrl);
    }
    /**
     * Attach an existing WebSocket (server-side usage).
     */
    handleSocket(socket) {
        this.ensureNotDestroyed();
        this.ws.handleSocket(socket);
    }
    /**
     * Send text input for processing (bypasses transcription).
     * Resolves when the input has been fully processed by the queue.
     * @throws {Error} if the text is empty/whitespace or the agent is destroyed.
     */
    async sendText(text) {
        this.ensureNotDestroyed();
        if (!text || !text.trim()) {
            throw new Error("Text input cannot be empty");
        }
        return this.enqueueInput(text);
    }
    /**
     * Send base64 audio data to be transcribed and processed.
     */
    async sendAudio(audioData) {
        this.ensureNotDestroyed();
        await this.handleAudioInput(audioData);
    }
    /**
     * Send raw audio buffer to be transcribed and processed.
     */
    async sendAudioBuffer(audioBuffer) {
        this.ensureNotDestroyed();
        const base64Audio = Buffer.from(audioBuffer).toString("base64");
        await this.handleAudioInput(base64Audio);
    }
    /**
     * Generate speech for full text at once (non-streaming fallback).
     */
    async generateAndSendSpeechFull(text) {
        return this.speech.generateAndSendSpeechFull(text);
    }
    /** Start listening for voice input */
    startListening() {
        console.log("Starting voice agent...");
        this.emit("listening");
    }
    /** Stop listening for voice input */
    stopListening() {
        console.log("Stopping voice agent...");
        this.emit("stopped");
    }
    /** Clear conversation history */
    clearHistory() {
        this.conversation.clearHistory();
    }
    /** Get current conversation history */
    getHistory() {
        return this.conversation.getHistory();
    }
    /** Set conversation history (useful for restoring sessions) */
    setHistory(history) {
        this.conversation.setHistory(history);
    }
    /** Disconnect from WebSocket and stop all in-flight work */
    disconnect() {
        this.ws.disconnect();
    }
    /**
     * Permanently destroy the agent, releasing all resources.
     * After this call every public entry point guarded by
     * `ensureNotDestroyed()` throws.
     */
    destroy() {
        this.isDestroyed = true;
        this.cleanupOnDisconnect();
        this.ws.disconnect();
        this.conversation.clearHistory();
        this.tools = {};
        this.removeAllListeners();
    }
    // ── Getters ───────────────────────────────────────────
    /** Whether the underlying WebSocket is currently connected. */
    get connected() {
        return this.ws.isConnected;
    }
    /** Whether an LLM response is currently being processed. */
    get processing() {
        return this._isProcessing;
    }
    /** Whether speech output is currently being produced. */
    get speaking() {
        return this.speech.isSpeaking;
    }
    /** Number of speech chunks queued but not yet sent. */
    get pendingSpeechChunks() {
        return this.speech.pendingChunkCount;
    }
    /** Whether `destroy()` has been called. */
    get destroyed() {
        return this.isDestroyed;
    }
    // ── Private: message handling ─────────────────────────
    /**
     * Dispatch an incoming WebSocket message by `type`.
     * "transcript" and "audio" barge in (interrupt the current response)
     * before queueing the new input; "interrupt" only interrupts.
     * Unknown types are ignored.
     */
    async handleMessage(message) {
        try {
            console.log(`Received WebSocket message of type: ${message.type}`);
            if (message.type === "transcript") {
                if (typeof message.text !== "string" || !message.text.trim()) {
                    this.emit("warning", "Received empty or invalid transcript message");
                    return;
                }
                this.interruptCurrentResponse("user_speaking");
                console.log(`Processing transcript: "${message.text}"`);
                await this.enqueueInput(message.text);
            }
            else if (message.type === "audio") {
                if (typeof message.data !== "string" || !message.data) {
                    this.emit("warning", "Received empty or invalid audio message");
                    return;
                }
                this.interruptCurrentResponse("user_speaking");
                // NOTE: size is the base64 string length, an approximation of
                // the decoded payload size.
                console.log(`Received audio data (${message.data.length / 1000}KB) for processing, format: ${message.format || "unknown"}`);
                await this.handleAudioInput(message.data, message.format);
            }
            else if (message.type === "interrupt") {
                console.log(`Received interrupt request: ${message.reason || "client_request"}`);
                this.interruptCurrentResponse(message.reason || "client_request");
            }
        }
        catch (err) {
            console.error("Failed to process message:", err);
            this.emit("error", err);
        }
    }
    // ── Private: audio ────────────────────────────────────
    /**
     * Transcribe base64 audio and, if transcription yields text, enqueue it
     * as user input. Empty transcriptions are dropped silently.
     */
    async handleAudioInput(base64Audio, format) {
        const text = await this.transcription.processAudioInput(base64Audio, format);
        if (text) {
            await this.enqueueInput(text);
        }
    }
    // ── Private: input queue ──────────────────────────────
    /**
     * Queue text for serial processing; the returned promise settles when the
     * queue's processor finishes (or rejects) this item.
     */
    enqueueInput(text) {
        return new Promise((resolve, reject) => {
            this.inputQueue.enqueue({ text, resolve, reject });
        });
    }
    // ── Private: LLM processing ───────────────────────────
    /**
     * Process user input with streaming text generation.
     * Called serially by the input queue.
     * @returns {Promise<string>} the assistant's full response text.
     */
    async processUserInput(text) {
        this._isProcessing = true;
        this.currentStreamAbortController = new AbortController();
        const streamAbortSignal = this.currentStreamAbortController.signal;
        try {
            this.emit("text", { role: "user", text });
            this.conversation.addMessage({ role: "user", content: text });
            const result = (0, ai_1.streamText)({
                model: this.model,
                system: this.instructions,
                messages: this.conversation.getHistoryRef(),
                tools: this.tools,
                stopWhen: this.stopWhen,
                abortSignal: streamAbortSignal,
                onChunk: ({ chunk }) => {
                    (0, core_1.handleStreamChunk)(chunk, (event, data) => this.emit(event, data));
                },
                onFinish: async (event) => {
                    // Surface every tool result from every step once the run ends.
                    for (const step of event.steps) {
                        for (const toolResult of step.toolResults) {
                            this.emit("tool_result", {
                                name: toolResult.toolName,
                                toolCallId: toolResult.toolCallId,
                                result: toolResult.output,
                            });
                        }
                    }
                },
                onError: ({ error }) => {
                    console.error("Stream error:", error);
                    this.emit("error", error);
                },
            });
            const streamResult = await (0, core_1.processFullStream)(result, {
                onTextDelta: (delta) => this.speech.processTextDelta(delta),
                onTextEnd: () => this.speech.flushPendingText(),
                sendMessage: (msg) => this.ws.send(msg),
                emitEvent: (event, data) => this.emit(event, data),
            });
            // Add assistant response to history
            if (streamResult.fullText) {
                this.conversation.addMessage({
                    role: "assistant",
                    content: streamResult.fullText,
                });
            }
            // Flush any remaining speech
            this.speech.flushPendingText();
            // Wait for all speech chunks to complete
            if (this.speech.queueDonePromise) {
                await this.speech.queueDonePromise;
            }
            return streamResult.fullText;
        }
        catch (error) {
            // Clean up speech state on error
            this.speech.reset();
            throw error;
        }
        finally {
            this._isProcessing = false;
            this.currentStreamAbortController = undefined;
        }
    }
    // ── Private: helpers ──────────────────────────────────
    /** @throws {Error} if `destroy()` has been called on this agent. */
    ensureNotDestroyed() {
        if (this.isDestroyed) {
            throw new Error("VoiceAgent has been destroyed and cannot be used");
        }
    }
    /**
     * Clean up all in-flight state when the connection drops.
     * Aborts the LLM stream, resets speech, and rejects all queued inputs so
     * their `enqueueInput` promises don't hang forever.
     */
    cleanupOnDisconnect() {
        if (this.currentStreamAbortController) {
            this.currentStreamAbortController.abort();
            this.currentStreamAbortController = undefined;
        }
        this.speech.reset();
        this._isProcessing = false;
        this.inputQueue.rejectAll(new Error("Connection closed"));
    }
    /**
     * Forward select events from a child emitter to this agent.
     */
    bubbleEvents(source, events) {
        for (const event of events) {
            source.on(event, (...args) => this.emit(event, ...args));
        }
    }
}
|
|
// Publish the class on the CommonJS exports object (tsc emit pattern).
exports.VoiceAgent = VoiceAgent;
//# sourceMappingURL=VoiceAgent.new.js.map
|