
Event-Driven Architecture für AI-Systeme

Design patterns for event-driven AI architectures: message queues, event sourcing, and reactive pipelines for scalable AI applications.



Introduction

AI systems are inherently asynchronous: LLM calls take seconds, not milliseconds. Event-driven architecture (EDA) is the natural fit for scalable, resilient AI pipelines.


Why EDA for AI?

┌─────────────────────────────────────────────────────────────┐
│              WHY EVENT-DRIVEN FOR AI?                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Problem: request/response for AI                           │
│  ─────────────────────────────────────────                  │
│  Client ──Request──→ Server ──LLM call (5s)──→ Response     │
│            └── Timeout! Connection lost!                    │
│                                                             │
│  Solution: event-driven                                     │
│  ─────────────────────────────────────────                  │
│  Client ──Event──→ Queue ──Worker──→ LLM ──Event──→ Client  │
│            └── Acknowledged immediately, processed async    │
│                                                             │
│  Benefits:                                                  │
│  ✓ No timeouts                                              │
│  ✓ Retries on failure                                       │
│  ✓ Horizontally scalable                                    │
│  ✓ Load balancing                                           │
│  ✓ Decoupling                                               │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Architecture Pattern: AI Processing Pipeline

// src/events/types.ts
interface AIEvent {
  id: string;
  type: string;
  timestamp: Date;
  payload: any;
  metadata: {
    userId?: string;
    correlationId: string;
    source: string;
  };
}

// Event Types
type AIEventType =
  | 'analysis.requested'
  | 'analysis.started'
  | 'analysis.completed'
  | 'analysis.failed'
  | 'llm.call.started'
  | 'llm.call.completed'
  | 'notification.send';
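The string-literal union can be taken one step further: a payload map keyed by event type lets the compiler verify that every event carries the right payload. A minimal sketch (the payload shapes here are illustrative assumptions, not part of the system above):

```typescript
// Illustrative payload shapes per event type (assumed for the example)
interface AIEventPayloads {
  'analysis.requested': { input: string };
  'analysis.completed': { jobId: string; result: unknown };
  'analysis.failed': { jobId: string; error: string };
}

type TypedAIEvent<T extends keyof AIEventPayloads> = {
  id: string;
  type: T;
  timestamp: Date;
  payload: AIEventPayloads[T];
};

// Factory: the compiler rejects a payload that does not match the type
function makeEvent<T extends keyof AIEventPayloads>(
  type: T,
  payload: AIEventPayloads[T]
): TypedAIEvent<T> {
  return {
    id: Math.random().toString(36).slice(2),
    type,
    timestamp: new Date(),
    payload
  };
}

const evt = makeEvent('analysis.failed', { jobId: 'job-1', error: 'rate limited' });
```

Passing, say, `{ input: '…' }` to `'analysis.failed'` now fails at compile time instead of at runtime.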

BullMQ Implementation

Queue Setup

// src/queues/ai-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';

const redis = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null // required by BullMQ's blocking connections
});

// AI Processing Queue
export const aiQueue = new Queue('ai-processing', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000
    },
    removeOnComplete: {
      count: 1000,
      age: 24 * 3600 // 24h
    },
    removeOnFail: {
      count: 5000
    }
  }
});

// Enqueue a job
export async function queueAITask(
  type: string,
  data: any,
  priority: number = 0
): Promise<string> {
  const job = await aiQueue.add(type, data, {
    priority,
    jobId: `${type}-${Date.now()}-${Math.random().toString(36).slice(2, 11)}`
  });

  return job.id!;
}
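With `type: 'exponential'` and `delay: 1000`, BullMQ spaces retries as `delay * 2^(attemptsMade - 1)`. A quick sanity check of the schedule this configuration produces:

```typescript
// BullMQ's built-in exponential backoff: delay * 2^(attemptsMade - 1)
function retryDelay(baseDelayMs: number, attemptsMade: number): number {
  return baseDelayMs * Math.pow(2, attemptsMade - 1);
}

// attempts: 3 with delay: 1000 yields this retry schedule (in ms):
const schedule = [1, 2, 3].map((attempt) => retryDelay(1000, attempt));
// → [1000, 2000, 4000]
```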

Worker Implementation

// src/workers/ai-worker.ts
import { Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import Anthropic from '@anthropic-ai/sdk';
import { eventBus } from '../events/event-bus';

const redis = new Redis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null
});

const anthropic = new Anthropic();

export const aiWorker = new Worker(
  'ai-processing',
  async (job: Job) => {
    // BullMQ exposes the job type as `name` and the payload as `data`
    const { name, data } = job;

    // Event: processing started
    await eventBus.emit('analysis.started', {
      jobId: job.id,
      type: name
    });

    try {
      switch (name) {
        case 'analyze-product':
          return await analyzeProduct(job, data);

        case 'generate-content':
          return await generateContent(job, data);

        case 'summarize-document':
          return await summarizeDocument(job, data);

        default:
          throw new Error(`Unknown job type: ${name}`);
      }
    } catch (error) {
      // Event: processing failed
      await eventBus.emit('analysis.failed', {
        jobId: job.id,
        error: (error as Error).message
      });
      throw error;
    }
  },
  {
    connection: redis,
    concurrency: 5, // 5 jobs in parallel
    limiter: {
      max: 10,       // at most 10 jobs
      duration: 1000 // per second (rate limiting)
    }
  }
);

async function analyzeProduct(job: Job, data: ProductData) {
  // Progress tracking
  await job.updateProgress(10);

  const response = await anthropic.messages.create({
    model: 'claude-3-haiku-20240307',
    max_tokens: 500,
    messages: [{
      role: 'user',
      content: `Analyze this product: ${JSON.stringify(data)}`
    }]
  });

  await job.updateProgress(90);

  // Content blocks are a union type; narrow to the text block
  const block = response.content[0];
  const result = {
    analysis: block.type === 'text' ? block.text : '',
    confidence: 0.85,
    timestamp: new Date()
  };

  // Event: processing completed
  await eventBus.emit('analysis.completed', {
    jobId: job.id,
    result
  });

  await job.updateProgress(100);

  return result;
}
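The switch in the worker grows with every new job type. A handler registry keeps dispatch data-driven instead; a sketch with hypothetical stub handlers (the real handlers would be the `analyzeProduct`-style functions):

```typescript
type JobHandler = (data: any) => Promise<unknown>;

// Registry instead of a switch: adding a job type is one entry, not a new case
const handlers: Record<string, JobHandler> = {
  'analyze-product': async (data) => ({ kind: 'analysis', input: data }),
  'generate-content': async (data) => ({ kind: 'content', input: data })
};

// Throws synchronously for unknown types, otherwise returns the handler's promise
function dispatch(name: string, data: any): Promise<unknown> {
  const handler = handlers[name];
  if (!handler) throw new Error(`Unknown job type: ${name}`);
  return handler(data);
}
```

The worker body then shrinks to `return dispatch(job.name, job.data)`.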

Event Bus Implementation

// src/events/event-bus.ts
import { EventEmitter } from 'events';
import { randomUUID } from 'crypto';
import { Redis } from 'ioredis';

class EventBus {
  // Composition instead of inheritance: EventEmitter.emit is synchronous
  // and returns a boolean, so an async override would break its contract.
  private local = new EventEmitter();
  private publisher: Redis;
  private subscriber: Redis;

  constructor() {
    this.publisher = new Redis(process.env.REDIS_URL!);
    this.subscriber = new Redis(process.env.REDIS_URL!);

    this.setupSubscriber();
  }

  private setupSubscriber() {
    // psubscribe deliveries arrive via 'pmessage', not 'message'.
    // Note: events this instance publishes come back here too once
    // subscribed; filter on metadata.source if double delivery matters.
    this.subscriber.on('pmessage', (_pattern, _channel, message) => {
      const event: AIEvent = JSON.parse(message);
      this.local.emit(event.type, event);
    });
  }

  async subscribe(pattern: string) {
    await this.subscriber.psubscribe(pattern);
  }

  on(type: string, handler: (event: AIEvent) => void) {
    this.local.on(type, handler);
  }

  off(type: string, handler: (event: AIEvent) => void) {
    this.local.off(type, handler);
  }

  async emit(type: string, payload: any): Promise<void> {
    const event: AIEvent = {
      id: randomUUID(),
      type,
      timestamp: new Date(),
      payload,
      metadata: {
        userId: payload.userId,
        correlationId: payload.correlationId || randomUUID(),
        source: 'ai-service'
      }
    };

    // Emit locally
    this.local.emit(type, event);

    // Publish to other services
    await this.publisher.publish('ai-events', JSON.stringify(event));

    // Persist to the event log (Redis Stream)
    await this.logEvent(event);
  }

  private async logEvent(event: AIEvent) {
    await this.publisher.xadd(
      'events-log',
      '*',
      'event', JSON.stringify(event)
    );
  }
}

export const eventBus = new EventBus();
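The envelope the bus wraps around each payload is easy to factor into a pure function, which makes correlation-ID propagation unit-testable without Redis. A sketch mirroring the fields used above:

```typescript
import { randomUUID } from 'crypto';

interface Envelope {
  id: string;
  type: string;
  timestamp: Date;
  payload: any;
  metadata: { correlationId: string; source: string };
}

// Pure envelope construction: a correlationId already present on the
// payload is propagated; otherwise a fresh one is generated.
function buildEnvelope(type: string, payload: any, source = 'ai-service'): Envelope {
  return {
    id: randomUUID(),
    type,
    timestamp: new Date(),
    payload,
    metadata: {
      correlationId: payload.correlationId ?? randomUUID(),
      source
    }
  };
}

const first = buildEnvelope('analysis.requested', { input: 'foo' });
// Follow-up events reuse the first event's correlationId
const second = buildEnvelope('analysis.completed', {
  correlationId: first.metadata.correlationId,
  result: 'bar'
});
```

With this, a whole request chain can be traced across services by a single correlation ID.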

Real-Time Updates via WebSocket

// src/websocket/event-stream.ts
import { WebSocketServer } from 'ws';
import type { Server, IncomingMessage } from 'http';
import { eventBus } from '../events/event-bus';

// Assumption: the client identifies itself via a query parameter,
// e.g. /events?userId=123. Use real session auth in production.
function extractUserId(req: IncomingMessage): string | null {
  const url = new URL(req.url ?? '', 'http://localhost');
  return url.searchParams.get('userId');
}

export function setupEventStream(server: Server) {
  const wss = new WebSocketServer({ server, path: '/events' });

  wss.on('connection', (ws, req) => {
    const userId = extractUserId(req);
    console.log(`Event stream connected: ${userId}`);

    // Forward only this user's events
    const forward = (event: AIEvent) => {
      if (event.metadata.userId === userId) {
        ws.send(JSON.stringify(event));
      }
    };

    const types = ['analysis.started', 'analysis.completed', 'analysis.failed'];
    types.forEach((type) => eventBus.on(type, forward));

    ws.on('close', () => {
      // Remove the listeners, otherwise they leak with every connection
      types.forEach((type) => eventBus.off(type, forward));
    });
  });
}

Client-Side Integration

// src/client/event-client.ts
class AIEventClient {
  private ws: WebSocket | null = null;
  private pendingJobs = new Map<string, {
    resolve: (result: any) => void;
    reject: (error: any) => void;
  }>();

  connect(url: string) {
    this.ws = new WebSocket(url);

    this.ws.onmessage = (event) => {
      const aiEvent: AIEvent = JSON.parse(event.data);
      this.handleEvent(aiEvent);
    };
  }

  private handleEvent(event: AIEvent) {
    const jobId = event.payload.jobId;
    const pending = this.pendingJobs.get(jobId);

    switch (event.type) {
      case 'analysis.started':
        this.onProgress?.(jobId, 'started');
        break;

      case 'analysis.completed':
        if (pending) {
          pending.resolve(event.payload.result);
          this.pendingJobs.delete(jobId);
        }
        break;

      case 'analysis.failed':
        if (pending) {
          pending.reject(new Error(event.payload.error));
          this.pendingJobs.delete(jobId);
        }
        break;
    }
  }

  // Promise-based API on top of events
  async analyzeProduct(product: ProductData): Promise<AnalysisResult> {
    // Submit the job
    const response = await fetch('/api/analyze', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(product)
    });

    const { jobId } = await response.json();

    // Wait for the completion event
    return new Promise((resolve, reject) => {
      this.pendingJobs.set(jobId, { resolve, reject });

      // Safety net: reject if no event arrives in time
      setTimeout(() => {
        if (this.pendingJobs.has(jobId)) {
          this.pendingJobs.delete(jobId);
          reject(new Error('Analysis timeout'));
        }
      }, 60000); // 60s timeout
    });
  }

  onProgress?: (jobId: string, status: string) => void;
}
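The pending-jobs map with its timeout is the core of this client. Extracted into a small standalone class (a sketch, not part of the original client), it can be tested without a WebSocket:

```typescript
class PendingJobs<T> {
  private pending = new Map<string, {
    resolve: (value: T) => void;
    reject: (error: Error) => void;
    timer: ReturnType<typeof setTimeout>;
  }>();

  // Resolves when complete() is called, rejects on fail() or after timeoutMs
  wait(jobId: string, timeoutMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(jobId);
        reject(new Error(`Job ${jobId} timed out`));
      }, timeoutMs);
      this.pending.set(jobId, { resolve, reject, timer });
    });
  }

  complete(jobId: string, result: T): boolean {
    const entry = this.pending.get(jobId);
    if (!entry) return false; // unknown or already settled
    clearTimeout(entry.timer);
    this.pending.delete(jobId);
    entry.resolve(result);
    return true;
  }

  fail(jobId: string, error: Error): boolean {
    const entry = this.pending.get(jobId);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(jobId);
    entry.reject(error);
    return true;
  }
}

const jobs = new PendingJobs<string>();
const result = jobs.wait('job-1', 5000);
jobs.complete('job-1', 'done'); // resolves `result`
```

Clearing the timer on settlement matters: otherwise every completed job leaves a live timeout behind.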

Event Sourcing für AI Decisions

// src/events/event-store.ts
import { Redis } from 'ioredis';

interface AIDecisionEvent {
  eventId: string;
  aggregateId: string; // z.B. analysisId
  type: string;
  data: any;
  timestamp: Date;
  version: number;
}

class AIEventStore {
  private redis: Redis;

  constructor() {
    this.redis = new Redis(process.env.REDIS_URL!);
  }

  async append(aggregateId: string, events: AIDecisionEvent[]) {
    const key = `aggregate:${aggregateId}`;

    for (const event of events) {
      await this.redis.xadd(
        key,
        '*',
        'data', JSON.stringify(event)
      );
    }
  }

  async getEvents(aggregateId: string): Promise<AIDecisionEvent[]> {
    const key = `aggregate:${aggregateId}`;
    const entries = await this.redis.xrange(key, '-', '+');

    return entries.map(([id, fields]) => {
      // fields is a flat array: ['data', '<json>']
      const data = JSON.parse(fields[1]);
      return { ...data, eventId: id };
    });
  }

  // Reconstruct State from Events
  async reconstruct<T>(
    aggregateId: string,
    reducer: (state: T, event: AIDecisionEvent) => T,
    initialState: T
  ): Promise<T> {
    const events = await this.getEvents(aggregateId);
    return events.reduce(reducer, initialState);
  }
}

// Example: an AI analysis with full history
const eventStore = new AIEventStore();

// Persist events (remaining AIDecisionEvent fields elided)
await eventStore.append('analysis-123', [
  { type: 'analysis.created', data: { input: '...' }, ... },
  { type: 'llm.called', data: { model: 'claude-3-haiku', tokens: 500 }, ... },
  { type: 'analysis.completed', data: { result: '...' }, ... }
]);

// Reconstruct state
const analysisState = await eventStore.reconstruct(
  'analysis-123',
  (state, event) => {
    switch (event.type) {
      case 'analysis.created':
        return { ...state, input: event.data.input, status: 'pending' };
      case 'analysis.completed':
        return { ...state, result: event.data.result, status: 'completed' };
      default:
        return state;
    }
  },
  { input: null, result: null, status: 'unknown' }
);
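The same reducer can be exercised without Redis by folding an in-memory event list; replay is deterministic by construction, which makes this a convenient unit test for the logic:

```typescript
interface DecisionEvent { type: string; data: any }

type AnalysisState = { input: string | null; result: string | null; status: string };

// Same shape as the reducer passed to eventStore.reconstruct above
const reducer = (state: AnalysisState, event: DecisionEvent): AnalysisState => {
  switch (event.type) {
    case 'analysis.created':
      return { ...state, input: event.data.input, status: 'pending' };
    case 'analysis.completed':
      return { ...state, result: event.data.result, status: 'completed' };
    default:
      return state;
  }
};

const history: DecisionEvent[] = [
  { type: 'analysis.created', data: { input: 'product specs' } },
  { type: 'llm.called', data: { model: 'claude-3-haiku' } }, // ignored by this reducer
  { type: 'analysis.completed', data: { result: 'positive' } }
];

const state = history.reduce(reducer, { input: null, result: null, status: 'unknown' });
// state.status === 'completed'
```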

Scaling Pattern

# docker-compose.yml for scalable AI workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  api:
    build: .
    environment:
      - REDIS_URL=redis://redis:6379
    ports:
      - "3000:3000"

  ai-worker:
    build: .
    command: npm run worker
    environment:
      - REDIS_URL=redis://redis:6379
      - CONCURRENCY=5
    deploy:
      replicas: 3  # 3 worker instances

  scheduler:
    build: .
    command: npm run scheduler
    environment:
      - REDIS_URL=redis://redis:6379

Monitoring

// src/monitoring/queue-metrics.ts
import { aiQueue } from '../queues/ai-queue';

async function getQueueMetrics() {
  const [waiting, active, completed, failed] = await Promise.all([
    aiQueue.getWaitingCount(),
    aiQueue.getActiveCount(),
    aiQueue.getCompletedCount(),
    aiQueue.getFailedCount()
  ]);

  return {
    waiting,
    active,
    completed,
    failed,
    // Rough approximation: completed jobs per minute of process uptime.
    // For accurate throughput, sample counts over a sliding window.
    throughput: completed / (process.uptime() / 60)
  };
}
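A single cumulative counter only yields a rough rate; sampling the completed count at two points in time gives real throughput. The delta calculation as a pure, testable function:

```typescript
interface Sample { completed: number; timestampMs: number }

// Jobs per minute between two samples of the completed counter
function throughputPerMinute(prev: Sample, curr: Sample): number {
  const deltaJobs = curr.completed - prev.completed;
  const deltaMinutes = (curr.timestampMs - prev.timestampMs) / 60000;
  return deltaMinutes > 0 ? deltaJobs / deltaMinutes : 0;
}

const t0 = { completed: 100, timestampMs: 0 };
const t1 = { completed: 130, timestampMs: 120_000 }; // two minutes later
const rate = throughputPerMinute(t0, t1); // → 15 jobs/min
```

In practice, call this on an interval against `aiQueue.getCompletedCount()` and feed the result into the gauge below.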

// Prometheus Metrics
import { Counter, Gauge, Histogram } from 'prom-client';

const jobsProcessed = new Counter({
  name: 'ai_jobs_processed_total',
  help: 'Total AI jobs processed',
  labelNames: ['type', 'status']
});

const jobDuration = new Histogram({
  name: 'ai_job_duration_seconds',
  help: 'AI job processing duration',
  labelNames: ['type'],
  buckets: [0.5, 1, 2, 5, 10, 30, 60]
});

const queueDepth = new Gauge({
  name: 'ai_queue_depth',
  help: 'Current queue depth',
  labelNames: ['queue']
});

Conclusion

Event-driven architecture for AI systems delivers:

  1. Resilience: automatic retries, no timeouts
  2. Scalability: horizontal scaling of workers
  3. Transparency: event sourcing as an audit trail
  4. Real-time updates: WebSocket events pushed to clients

For production AI applications, EDA is not optional; it is the foundation of reliable systems.

