Menu
Nazad na Blog
2 min read
Voice AI

WebSockets für Voice AI: Real-Time Streaming Architekturen

WebSocket-basierte Architekturen für Voice AI Systeme. Audio-Streaming, Bidirektionale Kommunikation und Low-Latency Patterns für Sprachassistenten.

WebSocketVoice AIReal-Time StreamingAudio WebSocketVoice Agent ArchitectureLow Latency
WebSockets für Voice AI: Real-Time Streaming Architekturen

WebSockets für Voice AI: Real-Time Streaming Architekturen

Meta-Description: WebSocket-basierte Architekturen für Voice AI Systeme. Audio-Streaming, Bidirektionale Kommunikation und Low-Latency Patterns für Sprachassistenten.

Keywords: WebSocket, Voice AI, Real-Time Streaming, Audio WebSocket, Voice Agent Architecture, Low Latency, Bidirectional Communication


Einführung

WebSockets sind das Rückgrat moderner Voice AI. Anders als HTTP ermöglichen sie persistente, bidirektionale Verbindungen – essentiell für Echtzeit-Audio-Streaming mit Sub-500ms Latenz.

"WebSockets bieten JSON-basierte Protokolle statt komplexem WebRTC-Signaling – einfacher zu debuggen, universell unterstützt."

Warum WebSockets für Voice AI?

HTTP vs. WebSocket vs. WebRTC

AspektHTTPWebSocketWebRTC
**Verbindung**Request/ResponsePersistentPersistent
**Latenz**Hoch (neue Verbindung)NiedrigSehr niedrig
**Bidirektional**NeinJaJa
**Komplexität**EinfachMittelHoch
**Audio-Streaming**SchlechtGutExzellent
**Debugging**EinfachEinfachSchwer

Fazit: WebSockets sind der Sweet Spot zwischen Einfachheit und Performance.


Voice AI Pipeline über WebSocket

┌─────────────────────────────────────────────────────────────┐
│                VOICE AI WEBSOCKET ARCHITECTURE              │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Client (Browser/App)                                       │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Microphone → AudioWorklet → WebSocket Send          │   │
│  │                                                      │   │
│  │  WebSocket Receive → AudioContext → Speaker          │   │
│  └─────────────────────────────────────────────────────┘   │
│                           │                                 │
│                           ▼                                 │
│  Server                                                     │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  WebSocket Handler                                   │   │
│  │       │                                              │   │
│  │       ▼                                              │   │
│  │  Audio Buffer → STT → LLM → TTS → Audio Stream      │   │
│  │       │           │      │      │                    │   │
│  │       └───────────┴──────┴──────┘                    │   │
│  │              Event-Driven Pipeline                   │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Server-Side Implementation

// src/server/voice-websocket.ts
import { WebSocketServer, WebSocket } from 'ws';
import { createClient } from '@deepgram/sdk';
import { ElevenLabsClient } from 'elevenlabs';
import Anthropic from '@anthropic-ai/sdk';

interface VoiceSession {
  ws: WebSocket;
  deepgramConnection: any;
  conversationHistory: Message[];
  isProcessing: boolean;
}

const sessions = new Map<string, VoiceSession>();

export function createVoiceWebSocketServer(server: any) {
  const wss = new WebSocketServer({ server, path: '/voice' });

  wss.on('connection', async (ws, req) => {
    const sessionId = crypto.randomUUID();
    console.log(`New voice session: ${sessionId}`);

    // Session initialisieren
    const session: VoiceSession = {
      ws,
      deepgramConnection: null,
      conversationHistory: [],
      isProcessing: false
    };

    sessions.set(sessionId, session);

    // Deepgram Streaming Connection
    const deepgram = createClient(process.env.DEEPGRAM_API_KEY!);
    session.deepgramConnection = deepgram.listen.live({
      model: 'nova-2',
      language: 'de',
      smart_format: true,
      interim_results: true,
      endpointing: 500,
      vad_events: true
    });

    // STT Events
    session.deepgramConnection.on('Results', async (data: any) => {
      const transcript = data.channel.alternatives[0].transcript;

      if (data.is_final && transcript.trim()) {
        await processUserInput(session, transcript);
      } else if (transcript) {
        // Interim results für UI
        ws.send(JSON.stringify({
          type: 'transcript_interim',
          text: transcript
        }));
      }
    });

    session.deepgramConnection.on('UtteranceEnd', async () => {
      // User hat aufgehört zu sprechen
      ws.send(JSON.stringify({ type: 'user_speech_end' }));
    });

    // Client Events
    ws.on('message', async (data: Buffer) => {
      const message = parseMessage(data);

      switch (message.type) {
        case 'audio':
          // Audio an Deepgram weiterleiten
          if (session.deepgramConnection) {
            session.deepgramConnection.send(message.audio);
          }
          break;

        case 'interrupt':
          // User unterbricht
          session.isProcessing = false;
          ws.send(JSON.stringify({ type: 'interrupted' }));
          break;

        case 'config':
          // Session-Konfiguration
          break;
      }
    });

    ws.on('close', () => {
      console.log(`Session closed: ${sessionId}`);
      if (session.deepgramConnection) {
        session.deepgramConnection.finish();
      }
      sessions.delete(sessionId);
    });

    // Ready-Signal senden
    ws.send(JSON.stringify({ type: 'ready', sessionId }));
  });

  return wss;
}

async function processUserInput(session: VoiceSession, text: string) {
  if (session.isProcessing) return;
  session.isProcessing = true;

  const { ws } = session;

  // User-Nachricht an Client
  ws.send(JSON.stringify({
    type: 'transcript_final',
    text
  }));

  // Conversation History updaten
  session.conversationHistory.push({
    role: 'user',
    content: text
  });

  // LLM Response generieren
  const anthropic = new Anthropic();
  const stream = await anthropic.messages.stream({
    model: 'claude-3-haiku-20240307',
    max_tokens: 500,
    system: 'Du bist ein hilfreicher Sprachassistent. Antworte kurz und prägnant.',
    messages: session.conversationHistory
  });

  let fullResponse = '';
  let sentenceBuffer = '';

  // Sentence-by-sentence TTS
  const elevenlabs = new ElevenLabsClient();

  for await (const event of stream) {
    if (!session.isProcessing) break; // Interruption Check

    if (event.type === 'content_block_delta') {
      const text = event.delta.text;
      fullResponse += text;
      sentenceBuffer += text;

      // Satz-Ende erkennen
      const sentenceEnd = sentenceBuffer.match(/[.!?]\s/);
      if (sentenceEnd) {
        const sentence = sentenceBuffer.substring(0, sentenceEnd.index! + 1);
        sentenceBuffer = sentenceBuffer.substring(sentenceEnd.index! + 2);

        // TTS für Satz
        await streamTTSToClient(ws, sentence, elevenlabs);
      }
    }
  }

  // Restlichen Buffer sprechen
  if (sentenceBuffer.trim() && session.isProcessing) {
    await streamTTSToClient(ws, sentenceBuffer, elevenlabs);
  }

  // History updaten
  session.conversationHistory.push({
    role: 'assistant',
    content: fullResponse
  });

  session.isProcessing = false;
  ws.send(JSON.stringify({ type: 'response_complete' }));
}

async function streamTTSToClient(
  ws: WebSocket,
  text: string,
  elevenlabs: ElevenLabsClient
) {
  const audioStream = await elevenlabs.textToSpeech.convertAsStream(
    'onwK4e9ZLuTAKqWW03F9',
    {
      text,
      model_id: 'eleven_turbo_v2_5',
      voice_settings: {
        stability: 0.5,
        similarity_boost: 0.75
      },
      optimize_streaming_latency: 2
    }
  );

  for await (const chunk of audioStream) {
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({
        type: 'audio',
        data: Buffer.from(chunk).toString('base64')
      }));
    }
  }
}

function parseMessage(data: Buffer): any {
  try {
    return JSON.parse(data.toString());
  } catch {
    // Binary audio data
    return { type: 'audio', audio: data };
  }
}

Client-Side Implementation

// src/client/voice-client.ts
class VoiceWebSocketClient {
  private ws: WebSocket | null = null;
  private audioContext: AudioContext | null = null;
  private mediaStream: MediaStream | null = null;
  private audioWorklet: AudioWorkletNode | null = null;
  private audioQueue: AudioBuffer[] = [];
  private isPlaying = false;

  async connect(url: string) {
    this.ws = new WebSocket(url);

    this.ws.onopen = () => {
      console.log('Connected to voice server');
      this.startAudioCapture();
    };

    this.ws.onmessage = async (event) => {
      const message = JSON.parse(event.data);
      await this.handleMessage(message);
    };

    this.ws.onclose = () => {
      console.log('Disconnected');
      this.stopAudioCapture();
    };
  }

  private async handleMessage(message: any) {
    switch (message.type) {
      case 'ready':
        console.log(`Session: ${message.sessionId}`);
        break;

      case 'transcript_interim':
        this.onInterimTranscript?.(message.text);
        break;

      case 'transcript_final':
        this.onFinalTranscript?.(message.text);
        break;

      case 'audio':
        await this.playAudio(message.data);
        break;

      case 'response_complete':
        this.onResponseComplete?.();
        break;
    }
  }

  private async startAudioCapture() {
    this.audioContext = new AudioContext({ sampleRate: 16000 });
    this.mediaStream = await navigator.mediaDevices.getUserMedia({
      audio: {
        channelCount: 1,
        sampleRate: 16000,
        echoCancellation: true,
        noiseSuppression: true
      }
    });

    // AudioWorklet für effizientes Audio-Processing
    await this.audioContext.audioWorklet.addModule('/audio-processor.js');

    const source = this.audioContext.createMediaStreamSource(this.mediaStream);
    this.audioWorklet = new AudioWorkletNode(
      this.audioContext,
      'audio-processor'
    );

    this.audioWorklet.port.onmessage = (event) => {
      if (this.ws?.readyState === WebSocket.OPEN) {
        // Audio als Base64 senden
        const audioData = event.data;
        this.ws.send(JSON.stringify({
          type: 'audio',
          audio: this.float32ToInt16Base64(audioData)
        }));
      }
    };

    source.connect(this.audioWorklet);
  }

  private async playAudio(base64Data: string) {
    if (!this.audioContext) return;

    const audioData = Uint8Array.from(atob(base64Data), c => c.charCodeAt(0));
    const audioBuffer = await this.audioContext.decodeAudioData(
      audioData.buffer
    );

    this.audioQueue.push(audioBuffer);

    if (!this.isPlaying) {
      this.playNextInQueue();
    }
  }

  private playNextInQueue() {
    if (this.audioQueue.length === 0) {
      this.isPlaying = false;
      return;
    }

    this.isPlaying = true;
    const buffer = this.audioQueue.shift()!;

    const source = this.audioContext!.createBufferSource();
    source.buffer = buffer;
    source.connect(this.audioContext!.destination);

    source.onended = () => {
      this.playNextInQueue();
    };

    source.start();
  }

  interrupt() {
    // Audio-Queue leeren und Server informieren
    this.audioQueue = [];
    this.isPlaying = false;
    this.ws?.send(JSON.stringify({ type: 'interrupt' }));
  }

  private stopAudioCapture() {
    this.mediaStream?.getTracks().forEach(track => track.stop());
    this.audioWorklet?.disconnect();
    this.audioContext?.close();
  }

  private float32ToInt16Base64(float32Array: Float32Array): string {
    const int16Array = new Int16Array(float32Array.length);
    for (let i = 0; i < float32Array.length; i++) {
      int16Array[i] = Math.max(-32768, Math.min(32767,
        Math.round(float32Array[i] * 32767)
      ));
    }
    return btoa(String.fromCharCode(...new Uint8Array(int16Array.buffer)));
  }

  // Event Callbacks
  onInterimTranscript?: (text: string) => void;
  onFinalTranscript?: (text: string) => void;
  onResponseComplete?: () => void;
}

Audio Worklet für Browser

// public/audio-processor.js
class AudioProcessor extends AudioWorkletProcessor {
  constructor() {
    super();
    this.bufferSize = 4096;
    this.buffer = new Float32Array(this.bufferSize);
    this.bufferIndex = 0;
  }

  process(inputs, outputs, parameters) {
    const input = inputs[0];
    if (input.length > 0) {
      const channelData = input[0];

      for (let i = 0; i < channelData.length; i++) {
        this.buffer[this.bufferIndex++] = channelData[i];

        if (this.bufferIndex >= this.bufferSize) {
          // Buffer voll - an Main Thread senden
          this.port.postMessage(this.buffer.slice());
          this.bufferIndex = 0;
        }
      }
    }
    return true;
  }
}

registerProcessor('audio-processor', AudioProcessor);

Message Protocol

// src/types/voice-protocol.ts
type ClientMessage =
  | { type: 'audio'; audio: string } // Base64 Audio
  | { type: 'interrupt' }
  | { type: 'config'; language?: string; voice?: string };

type ServerMessage =
  | { type: 'ready'; sessionId: string }
  | { type: 'transcript_interim'; text: string }
  | { type: 'transcript_final'; text: string }
  | { type: 'audio'; data: string }  // Base64 Audio
  | { type: 'user_speech_end' }
  | { type: 'response_complete' }
  | { type: 'interrupted' }
  | { type: 'error'; message: string };

Latenz-Optimierungen

1. Audio Chunk Size

// Kleinere Chunks = niedrigere Latenz, mehr Overhead
const CHUNK_SIZES = {
  lowLatency: 1024,    // ~64ms bei 16kHz
  balanced: 4096,      // ~256ms bei 16kHz
  efficiency: 8192     // ~512ms bei 16kHz
};

2. Connection Keep-Alive

// Ping/Pong für Connection Health
setInterval(() => {
  if (ws.readyState === WebSocket.OPEN) {
    ws.ping();
  }
}, 30000);

ws.on('pong', () => {
  // Connection alive
});

3. Audio Codec Optimization

// Opus für niedrige Bandbreite
const mediaConstraints = {
  audio: {
    channelCount: 1,
    sampleRate: 16000,
    // Für Opus Encoding (wenn verfügbar)
    // echoCancellation: true,
    // noiseSuppression: true,
    // autoGainControl: true
  }
};

Error Handling & Reconnection

class ResilientVoiceClient extends VoiceWebSocketClient {
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;

  async connect(url: string) {
    try {
      await super.connect(url);
      this.reconnectAttempts = 0;
    } catch (error) {
      await this.handleConnectionError(error);
    }
  }

  private async handleConnectionError(error: Error) {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);

      console.log(`Reconnecting in ${delay}ms (attempt ${this.reconnectAttempts})`);

      await new Promise(resolve => setTimeout(resolve, delay));
      await this.connect(this.url);
    } else {
      console.error('Max reconnection attempts reached');
      this.onConnectionFailed?.();
    }
  }

  onConnectionFailed?: () => void;
}

Fazit

WebSockets für Voice AI erfordern:

  1. Bidirektionales Streaming: Audio rein, Audio raus
  2. Event-Driven Architecture: Asynchrone Pipeline-Verarbeitung
  3. Chunk-basiertes Audio: Balance zwischen Latenz und Effizienz
  4. Robuste Reconnection: Graceful Degradation bei Verbindungsproblemen

Die 500ms-Grenze ist mit WebSockets erreichbar – aber nur mit sorgfältiger Architektur.


Bildprompts

  1. "Two-way data stream between client and server, glowing WebSocket connection, technical visualization"
  2. "Audio waveform traveling through network tunnel, real-time streaming concept"
  3. "Voice AI architecture diagram with WebSocket at center, clean technical illustration"

Quellen