Event-Driven Architecture für AI-Systeme
Design patterns für ereignisgesteuerte AI-Architekturen. Message Queues, Event Sourcing und Reactive Pipelines für skalierbare AI-Anwendungen.

Event-Driven Architecture für AI-Systeme
Meta-Description: Design patterns für ereignisgesteuerte AI-Architekturen. Message Queues, Event Sourcing und Reactive Pipelines für skalierbare AI-Anwendungen.
Keywords: Event-Driven Architecture, AI Systems, Message Queue, Redis, RabbitMQ, Event Sourcing, Reactive AI, BullMQ
Einführung
AI-Systeme sind inhärent asynchron: LLM-Calls dauern Sekunden, nicht Millisekunden. Event-Driven Architecture (EDA) ist die natürliche Lösung für skalierbare, resiliente AI-Pipelines.
Warum EDA für AI?
┌─────────────────────────────────────────────────────────────┐
│ WHY EVENT-DRIVEN FOR AI? │
├─────────────────────────────────────────────────────────────┤
│ │
│ Problem: Request/Response für AI │
│ ───────────────────────────────────────── │
│ Client ──Request──→ Server ──LLM Call (5s)──→ Response │
│ └── Timeout! Connection lost! │
│ │
│ Lösung: Event-Driven │
│ ───────────────────────────────────────── │
│ Client ──Event──→ Queue ──Worker──→ LLM ──Event──→ Client │
│ └── Sofort bestätigt, async verarbeitet │
│ │
│ Vorteile: │
│ ✓ Keine Timeouts │
│ ✓ Retry bei Fehlern │
│ ✓ Horizontal skalierbar │
│ ✓ Lastverteilung │
│ ✓ Entkopplung │
│ │
└─────────────────────────────────────────────────────────────┘Architecture Pattern: AI Processing Pipeline
// src/events/types.ts
interface AIEvent {
id: string;
type: string;
timestamp: Date;
payload: any;
metadata: {
userId?: string;
correlationId: string;
source: string;
};
}
// Event Types
type AIEventType =
| 'analysis.requested'
| 'analysis.started'
| 'analysis.completed'
| 'analysis.failed'
| 'llm.call.started'
| 'llm.call.completed'
| 'notification.send';BullMQ Implementation
Queue Setup
// src/queues/ai-queue.ts
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
const redis = new Redis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null
});
// AI Processing Queue
export const aiQueue = new Queue('ai-processing', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000
},
removeOnComplete: {
count: 1000,
age: 24 * 3600 // 24h
},
removeOnFail: {
count: 5000
}
}
});
// Job hinzufügen
export async function queueAITask(
type: string,
data: any,
priority: number = 0
): Promise<string> {
const job = await aiQueue.add(type, data, {
priority,
jobId: `${type}-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`
});
return job.id!;
}Worker Implementation
// src/workers/ai-worker.ts
import { Worker, Job } from 'bullmq';
import Anthropic from '@anthropic-ai/sdk';
import { eventBus } from './event-bus';
const anthropic = new Anthropic();
export const aiWorker = new Worker(
'ai-processing',
async (job: Job) => {
const { type, data } = job;
// Event: Processing started
await eventBus.emit('analysis.started', {
jobId: job.id,
type
});
try {
switch (type) {
case 'analyze-product':
return await analyzeProduct(job, data);
case 'generate-content':
return await generateContent(job, data);
case 'summarize-document':
return await summarizeDocument(job, data);
default:
throw new Error(`Unknown job type: ${type}`);
}
} catch (error) {
// Event: Processing failed
await eventBus.emit('analysis.failed', {
jobId: job.id,
error: error.message
});
throw error;
}
},
{
connection: redis,
concurrency: 5, // 5 parallele Jobs
limiter: {
max: 10, // Max 10 Jobs
duration: 1000 // pro Sekunde (Rate Limiting)
}
}
);
async function analyzeProduct(job: Job, data: ProductData) {
// Progress tracking
await job.updateProgress(10);
const response = await anthropic.messages.create({
model: 'claude-3-haiku-20240307',
max_tokens: 500,
messages: [{
role: 'user',
content: `Analysiere dieses Produkt: ${JSON.stringify(data)}`
}]
});
await job.updateProgress(90);
const result = {
analysis: response.content[0].text,
confidence: 0.85,
timestamp: new Date()
};
// Event: Processing completed
await eventBus.emit('analysis.completed', {
jobId: job.id,
result
});
await job.updateProgress(100);
return result;
}Event Bus Implementation
// src/events/event-bus.ts
import { EventEmitter } from 'events';
import { Redis } from 'ioredis';
class EventBus extends EventEmitter {
private publisher: Redis;
private subscriber: Redis;
constructor() {
super();
this.publisher = new Redis(process.env.REDIS_URL!);
this.subscriber = new Redis(process.env.REDIS_URL!);
this.setupSubscriber();
}
private setupSubscriber() {
this.subscriber.on('message', (channel, message) => {
const event = JSON.parse(message);
this.emit(event.type, event);
});
}
async subscribe(pattern: string) {
await this.subscriber.psubscribe(pattern);
}
async emit(type: string, payload: any): Promise<void> {
const event: AIEvent = {
id: crypto.randomUUID(),
type,
timestamp: new Date(),
payload,
metadata: {
correlationId: payload.correlationId || crypto.randomUUID(),
source: 'ai-service'
}
};
// Lokal emittieren
super.emit(type, event);
// An andere Services publishen
await this.publisher.publish('ai-events', JSON.stringify(event));
// Event Log speichern
await this.logEvent(event);
}
private async logEvent(event: AIEvent) {
await this.publisher.xadd(
'events-log',
'*',
'event', JSON.stringify(event)
);
}
}
export const eventBus = new EventBus();Real-Time Updates via WebSocket
// src/websocket/event-stream.ts
import { WebSocketServer } from 'ws';
import { eventBus } from '../events/event-bus';
export function setupEventStream(server: any) {
const wss = new WebSocketServer({ server, path: '/events' });
wss.on('connection', (ws, req) => {
const userId = extractUserId(req);
console.log(`Event stream connected: ${userId}`);
// Event Listener für diesen User
const handlers = {
'analysis.started': (event: AIEvent) => {
if (event.metadata.userId === userId) {
ws.send(JSON.stringify(event));
}
},
'analysis.completed': (event: AIEvent) => {
if (event.metadata.userId === userId) {
ws.send(JSON.stringify(event));
}
},
'analysis.failed': (event: AIEvent) => {
if (event.metadata.userId === userId) {
ws.send(JSON.stringify(event));
}
}
};
// Event Listener registrieren
Object.entries(handlers).forEach(([type, handler]) => {
eventBus.on(type, handler);
});
ws.on('close', () => {
// Cleanup
Object.entries(handlers).forEach(([type, handler]) => {
eventBus.off(type, handler);
});
});
});
}Client-Side Integration
// src/client/event-client.ts
class AIEventClient {
private ws: WebSocket | null = null;
private pendingJobs = new Map<string, {
resolve: (result: any) => void;
reject: (error: any) => void;
}>();
connect(url: string) {
this.ws = new WebSocket(url);
this.ws.onmessage = (event) => {
const aiEvent: AIEvent = JSON.parse(event.data);
this.handleEvent(aiEvent);
};
}
private handleEvent(event: AIEvent) {
const jobId = event.payload.jobId;
const pending = this.pendingJobs.get(jobId);
switch (event.type) {
case 'analysis.started':
this.onProgress?.(jobId, 'started');
break;
case 'analysis.completed':
if (pending) {
pending.resolve(event.payload.result);
this.pendingJobs.delete(jobId);
}
break;
case 'analysis.failed':
if (pending) {
pending.reject(new Error(event.payload.error));
this.pendingJobs.delete(jobId);
}
break;
}
}
// Promise-basierte API über Events
async analyzeProduct(product: ProductData): Promise<AnalysisResult> {
// Job einreichen
const response = await fetch('/api/analyze', {
method: 'POST',
body: JSON.stringify(product)
});
const { jobId } = await response.json();
// Auf Event warten
return new Promise((resolve, reject) => {
this.pendingJobs.set(jobId, { resolve, reject });
// Timeout
setTimeout(() => {
if (this.pendingJobs.has(jobId)) {
this.pendingJobs.delete(jobId);
reject(new Error('Analysis timeout'));
}
}, 60000); // 60s Timeout
});
}
onProgress?: (jobId: string, status: string) => void;
}Event Sourcing für AI Decisions
// src/events/event-store.ts
interface AIDecisionEvent {
eventId: string;
aggregateId: string; // z.B. analysisId
type: string;
data: any;
timestamp: Date;
version: number;
}
class AIEventStore {
private redis: Redis;
constructor() {
this.redis = new Redis(process.env.REDIS_URL!);
}
async append(aggregateId: string, events: AIDecisionEvent[]) {
const key = `aggregate:${aggregateId}`;
for (const event of events) {
await this.redis.xadd(
key,
'*',
'data', JSON.stringify(event)
);
}
}
async getEvents(aggregateId: string): Promise<AIDecisionEvent[]> {
const key = `aggregate:${aggregateId}`;
const entries = await this.redis.xrange(key, '-', '+');
return entries.map(([id, fields]) => {
const data = JSON.parse(fields[1]);
return { ...data, eventId: id };
});
}
// Reconstruct State from Events
async reconstruct<T>(
aggregateId: string,
reducer: (state: T, event: AIDecisionEvent) => T,
initialState: T
): Promise<T> {
const events = await this.getEvents(aggregateId);
return events.reduce(reducer, initialState);
}
}
// Beispiel: AI-Analyse mit vollständiger History
const eventStore = new AIEventStore();
// Events speichern
await eventStore.append('analysis-123', [
{ type: 'analysis.created', data: { input: '...' }, ... },
{ type: 'llm.called', data: { model: 'claude-3-haiku', tokens: 500 }, ... },
{ type: 'analysis.completed', data: { result: '...' }, ... }
]);
// State rekonstruieren
const analysisState = await eventStore.reconstruct(
'analysis-123',
(state, event) => {
switch (event.type) {
case 'analysis.created':
return { ...state, input: event.data.input, status: 'pending' };
case 'analysis.completed':
return { ...state, result: event.data.result, status: 'completed' };
default:
return state;
}
},
{ input: null, result: null, status: 'unknown' }
);Scaling Pattern
# docker-compose.yml für skalierbare AI Worker
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
api:
build: .
environment:
- REDIS_URL=redis://redis:6379
ports:
- "3000:3000"
ai-worker:
build: .
command: npm run worker
environment:
- REDIS_URL=redis://redis:6379
- CONCURRENCY=5
deploy:
replicas: 3 # 3 Worker-Instanzen
scheduler:
build: .
command: npm run scheduler
environment:
- REDIS_URL=redis://redis:6379Monitoring
// src/monitoring/queue-metrics.ts
import { aiQueue } from '../queues/ai-queue';
async function getQueueMetrics() {
const [waiting, active, completed, failed] = await Promise.all([
aiQueue.getWaitingCount(),
aiQueue.getActiveCount(),
aiQueue.getCompletedCount(),
aiQueue.getFailedCount()
]);
return {
waiting,
active,
completed,
failed,
throughput: completed / (Date.now() / 1000 / 60) // per minute
};
}
// Prometheus Metrics
import { Counter, Gauge, Histogram } from 'prom-client';
const jobsProcessed = new Counter({
name: 'ai_jobs_processed_total',
help: 'Total AI jobs processed',
labelNames: ['type', 'status']
});
const jobDuration = new Histogram({
name: 'ai_job_duration_seconds',
help: 'AI job processing duration',
labelNames: ['type'],
buckets: [0.5, 1, 2, 5, 10, 30, 60]
});
const queueDepth = new Gauge({
name: 'ai_queue_depth',
help: 'Current queue depth',
labelNames: ['queue']
});Fazit
Event-Driven Architecture für AI-Systeme bietet:
- Resilienz: Automatische Retries, keine Timeouts
- Skalierbarkeit: Horizontale Worker-Skalierung
- Transparenz: Event Sourcing für Audit-Trail
- Real-Time Updates: WebSocket-Events an Clients
Für produktive AI-Anwendungen ist EDA nicht optional – es ist die Grundlage für zuverlässige Systeme.
Bildprompts
- "Event flow diagram with AI processing nodes, message queue visualization, technical architecture"
- "Multiple workers processing AI tasks from central queue, assembly line concept"
- "Event stream timeline with AI decision points highlighted, data visualization style"