mirror of
https://github.com/Bijit-Mondal/VoiceAgent.git
synced 2026-03-02 18:36:39 +00:00
feat(example): video streaming
This commit is contained in:
@@ -84,6 +84,10 @@ const DEFAULT_VIDEO_AGENT_CONFIG: VideoAgentConfig = {
|
||||
};
|
||||
|
||||
export interface VideoAgentOptions {
|
||||
/**
|
||||
* AI SDK Model for chat. Must be a vision-enabled model (e.g., openai('gpt-4o'),
|
||||
* anthropic('claude-3.5-sonnet'), google('gemini-1.5-pro')) to process video frames.
|
||||
*/
|
||||
model: LanguageModel; // AI SDK Model for chat (e.g., openai('gpt-4o'))
|
||||
transcriptionModel?: TranscriptionModel; // AI SDK Transcription Model (e.g., openai.transcription('whisper-1'))
|
||||
speechModel?: SpeechModel; // AI SDK Speech Model (e.g., openai.speech('gpt-4o-mini-tts'))
|
||||
@@ -285,6 +289,7 @@ Use tools when needed to provide accurate information.`;
|
||||
// Handle raw audio data that needs transcription
|
||||
case "audio":
|
||||
if (typeof message.data !== "string" || !message.data) {
|
||||
console.warn("Received empty or invalid audio message");
|
||||
this.emit("warning", "Received empty or invalid audio message");
|
||||
return;
|
||||
}
|
||||
@@ -293,9 +298,15 @@ Use tools when needed to provide accurate information.`;
|
||||
// Force capture current frame when user speaks
|
||||
this.requestFrameCapture("user_request");
|
||||
console.log(
|
||||
`Received audio data (${message.data.length / 1000}KB) for processing, format: ${message.format || "unknown"}`
|
||||
`[audio handler] Received audio data (${(message.data.length / 1000).toFixed(1)}KB) for processing, format: ${message.format || "unknown"}`
|
||||
);
|
||||
await this.processAudioInput(message);
|
||||
try {
|
||||
await this.processAudioInput(message);
|
||||
console.log(`[audio handler] processAudioInput completed`);
|
||||
} catch (audioError) {
|
||||
console.error(`[audio handler] Error in processAudioInput:`, audioError);
|
||||
this.emit("error", audioError);
|
||||
}
|
||||
break;
|
||||
|
||||
// Handle video frame from client
|
||||
@@ -850,13 +861,20 @@ Use tools when needed to provide accurate information.`;
|
||||
/**
|
||||
* Process incoming audio data: transcribe and generate response
|
||||
*/
|
||||
private async processAudioInput(audioMessage: AudioData): Promise<void> {
|
||||
private async processAudioInput(audioMessage: AudioData | { type: string; data: string; format?: string; sessionId?: string }): Promise<void> {
|
||||
if (!this.transcriptionModel) {
|
||||
this.emit("error", new Error("Transcription model not configured for audio input"));
|
||||
const error = new Error("Transcription model not configured for audio input");
|
||||
console.error(error.message);
|
||||
this.emit("error", error);
|
||||
this.sendWebSocketMessage({
|
||||
type: "error",
|
||||
error: error.message,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
console.log(`[processAudioInput] Starting audio processing, data length: ${audioMessage.data?.length || 0}`);
|
||||
const audioBuffer = Buffer.from(audioMessage.data, "base64");
|
||||
|
||||
if (audioBuffer.length > this.maxAudioInputSize) {
|
||||
@@ -877,19 +895,23 @@ Use tools when needed to provide accurate information.`;
|
||||
this.emit("audio_received", {
|
||||
size: audioBuffer.length,
|
||||
format: audioMessage.format,
|
||||
sessionId: audioMessage.sessionId,
|
||||
sessionId: audioMessage.sessionId || this.sessionId,
|
||||
});
|
||||
|
||||
console.log(
|
||||
`Processing audio input: ${audioBuffer.length} bytes, format: ${audioMessage.format || "unknown"}`
|
||||
`[processAudioInput] Processing audio: ${audioBuffer.length} bytes, format: ${audioMessage.format || "unknown"}`
|
||||
);
|
||||
|
||||
console.log(`[processAudioInput] Calling transcribeAudio...`);
|
||||
const transcribedText = await this.transcribeAudio(audioBuffer);
|
||||
console.log(`Transcribed text: "${transcribedText}"`);
|
||||
console.log(`[processAudioInput] Transcribed text: "${transcribedText}"`);
|
||||
|
||||
if (transcribedText.trim()) {
|
||||
console.log(`[processAudioInput] Enqueueing text input: "${transcribedText}"`);
|
||||
await this.enqueueTextInput(transcribedText);
|
||||
console.log(`[processAudioInput] Text input processing complete`);
|
||||
} else {
|
||||
console.warn(`[processAudioInput] Transcription returned empty text`);
|
||||
this.emit("warning", "Transcription returned empty text");
|
||||
this.sendWebSocketMessage({
|
||||
type: "transcription_error",
|
||||
@@ -897,7 +919,7 @@ Use tools when needed to provide accurate information.`;
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error("Failed to process audio input:", error);
|
||||
console.error("[processAudioInput] Failed to process audio input:", error);
|
||||
this.emit("error", error);
|
||||
this.sendWebSocketMessage({
|
||||
type: "transcription_error",
|
||||
@@ -1049,28 +1071,38 @@ Use tools when needed to provide accurate information.`;
|
||||
* Drain the input queue, processing one request at a time
|
||||
*/
|
||||
private async drainInputQueue(): Promise<void> {
|
||||
if (this.processingQueue) return;
|
||||
if (this.processingQueue) {
|
||||
console.log(`[drainInputQueue] Already processing, skipping`);
|
||||
return;
|
||||
}
|
||||
this.processingQueue = true;
|
||||
console.log(`[drainInputQueue] Starting to drain queue, ${this.inputQueue.length} items`);
|
||||
|
||||
try {
|
||||
while (this.inputQueue.length > 0) {
|
||||
const item = this.inputQueue.shift()!;
|
||||
console.log(`[drainInputQueue] Processing item: text="${item.text?.substring(0, 50)}...", hasFrame=${!!item.frame}`);
|
||||
try {
|
||||
let result: string;
|
||||
if (item.frame && item.text) {
|
||||
console.log(`[drainInputQueue] Calling processMultimodalInput`);
|
||||
result = await this.processMultimodalInput(item.text, item.frame);
|
||||
} else if (item.text) {
|
||||
console.log(`[drainInputQueue] Calling processUserInput`);
|
||||
result = await this.processUserInput(item.text);
|
||||
} else {
|
||||
result = "";
|
||||
}
|
||||
console.log(`[drainInputQueue] Got result: "${result?.substring(0, 100)}..."`);
|
||||
item.resolve(result);
|
||||
} catch (error) {
|
||||
console.error(`[drainInputQueue] Error processing item:`, error);
|
||||
item.reject(error);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.processingQueue = false;
|
||||
console.log(`[drainInputQueue] Done draining queue`);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1173,6 +1205,7 @@ Use tools when needed to provide accurate information.`;
|
||||
* Process user input with streaming text generation
|
||||
*/
|
||||
private async processUserInput(text: string): Promise<string> {
|
||||
console.log(`[processUserInput] Starting with text: "${text}"`);
|
||||
this.isProcessing = true;
|
||||
this.currentStreamAbortController = new AbortController();
|
||||
const streamAbortSignal = this.currentStreamAbortController.signal;
|
||||
@@ -1182,6 +1215,7 @@ Use tools when needed to provide accurate information.`;
|
||||
|
||||
// Check if we have current frame data - if so, include it
|
||||
const hasVisualContext = !!this.currentFrameData;
|
||||
console.log(`[processUserInput] hasVisualContext: ${hasVisualContext}`);
|
||||
|
||||
let messages: ModelMessage[];
|
||||
|
||||
@@ -1207,6 +1241,10 @@ Use tools when needed to provide accurate information.`;
|
||||
|
||||
this.trimHistory();
|
||||
|
||||
console.log(`[processUserInput] Calling streamText with ${messages.length} messages`);
|
||||
console.log(`[processUserInput] Model:`, this.model);
|
||||
console.log(`[processUserInput] Tools:`, Object.keys(this.tools));
|
||||
|
||||
const result = streamText({
|
||||
model: this.model,
|
||||
system: this.instructions,
|
||||
@@ -1218,6 +1256,7 @@ Use tools when needed to provide accurate information.`;
|
||||
this.handleStreamChunk(chunk);
|
||||
},
|
||||
onFinish: async (event) => {
|
||||
console.log(`[processUserInput] onFinish called`);
|
||||
for (const step of event.steps) {
|
||||
for (const toolResult of step.toolResults) {
|
||||
this.emit("tool_result", {
|
||||
@@ -1229,11 +1268,12 @@ Use tools when needed to provide accurate information.`;
|
||||
}
|
||||
},
|
||||
onError: ({ error }) => {
|
||||
console.error("Stream error:", error);
|
||||
console.error("[processUserInput] Stream error:", error);
|
||||
this.emit("error", error);
|
||||
},
|
||||
});
|
||||
|
||||
console.log(`[processUserInput] Calling processStreamResult`);
|
||||
return await this.processStreamResult(result);
|
||||
} catch (error) {
|
||||
this.pendingTextBuffer = "";
|
||||
|
||||
Reference in New Issue
Block a user