Workflow Orchestration with Temporal
Meta description: Workflow orchestration for complex business logic. Temporal.io, durable workflows, sagas, and error recovery.
Keywords: Workflow Orchestration, Temporal, Durable Workflows, Saga Pattern, Business Process, Error Recovery
Introduction
Complex business processes need more than plain function calls. Workflow orchestration with Temporal provides durable execution, automatic retries, and state management for long-running processes.
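To make "automatic retries" concrete, the following sketch computes the wait times an exponential backoff policy produces. It is an illustration only: Temporal calculates retry delays itself, and `retryDelays` is a hypothetical helper whose field names merely mirror the retry options `initialInterval`, `backoffCoefficient`, `maximumInterval`, and `maximumAttempts`.

```typescript
// Hypothetical helper for illustration; Temporal computes these delays itself.
interface BackoffPolicy {
  initialIntervalMs: number;  // wait before the first retry
  backoffCoefficient: number; // multiplier applied after each failed attempt
  maximumIntervalMs: number;  // upper bound for a single wait
  maximumAttempts: number;    // total attempts, including the first one
}

// Returns the wait (in ms) before each retry; N attempts allow N - 1 retries.
function retryDelays(policy: BackoffPolicy): number[] {
  const delays: number[] = [];
  let next = policy.initialIntervalMs;
  for (let attempt = 1; attempt < policy.maximumAttempts; attempt++) {
    delays.push(Math.min(next, policy.maximumIntervalMs));
    next *= policy.backoffCoefficient;
  }
  return delays;
}

const delays = retryDelays({
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumIntervalMs: 100000,
  maximumAttempts: 5
});
console.log(delays); // [ 1000, 2000, 4000, 8000 ]
```

With these values, an operation that keeps failing is attempted five times over roughly 15 seconds before the failure is reported.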
Why Workflow Orchestration?
┌─────────────────────────────────────────────────────────────┐
│                   WORKFLOW ORCHESTRATION                    │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Problems without orchestration:                            │
│  ├── Distributed transactions                               │
│  ├── Timeout and failure handling                           │
│  ├── State management over time                             │
│  ├── Retries with exponential backoff                       │
│  └── Compensating transactions (rollbacks)                  │
│                                                             │
│  Temporal's solution:                                       │
│  ├── Durable execution (survives crashes)                   │
│  ├── Automatic retries                                      │
│  ├── Timeouts at every level                                │
│  ├── Saga pattern support                                   │
│  └── Workflows as code (TypeScript)                         │
│                                                             │
│  Use cases:                                                 │
│  ├── E-commerce: order processing                           │
│  ├── Finance: payment processing                            │
│  ├── HR: onboarding workflows                               │
│  ├── DevOps: CI/CD pipelines                                │
│  └── Data: ETL pipelines                                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Temporal Setup
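# Temporal Server (Docker)
# Note: the auto-setup image expects a reachable database. For purely local
# development, the Temporal CLI's `temporal server start-dev` is an alternative.
docker run --rm -p 7233:7233 temporalio/auto-setup
# Install the SDK
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

// src/activities.ts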
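// Assumes the `stripe` and `resend` packages are installed
import Stripe from 'stripe';
import { Resend } from 'resend';
// Hypothetical in-house clients, assumed for this example; replace with your own
import { inventoryService, shippingService } from './services';

export interface Address {
  street: string;
  city: string;
  zip: string;
}

const stripe = new Stripe(process.env.STRIPE_SECRET_KEY ?? '');
const resend = new Resend(process.env.RESEND_API_KEY);

// Activities are ordinary async functions. They run in the worker's regular
// Node.js environment, so they may perform I/O.
export const activities = {
  // `to` must be an email address. The workflows in this article pass a user ID,
  // so a real implementation would look up the address first.
  async sendEmail(to: string, subject: string, body: string): Promise<boolean> {
    console.log(`Sending email to ${to}`);
    // Resend, SendGrid, etc. The sender address is a placeholder.
    const { error } = await resend.emails.send({
      from: 'orders@example.com',
      to,
      subject,
      html: body
    });
    // Resend reports failures in the return value; throw so Temporal can retry
    if (error) throw new Error(error.message);
    return true;
  },

  async chargePayment(userId: string, amount: number): Promise<string> {
    console.log(`Charging ${amount} for user ${userId}`);
    // Stripe, PayPal, etc. Assumes userId is also the Stripe customer ID.
    const charge = await stripe.charges.create({
      amount: Math.round(amount * 100), // Stripe expects the amount in cents
      currency: 'eur',
      customer: userId
    });
    return charge.id;
  },

  async reserveInventory(productId: string, quantity: number): Promise<string> {
    console.log(`Reserving ${quantity} of ${productId}`);
    // Inventory service
    const reservation = await inventoryService.reserve(productId, quantity);
    return reservation.id;
  },

  async releaseInventory(reservationId: string): Promise<void> {
    console.log(`Releasing reservation ${reservationId}`);
    await inventoryService.release(reservationId);
  },

  async refundPayment(chargeId: string): Promise<void> {
    console.log(`Refunding charge ${chargeId}`);
    await stripe.refunds.create({ charge: chargeId });
  },

  async createShipment(orderId: string, address: Address): Promise<string> {
    console.log(`Creating shipment for order ${orderId}`);
    const shipment = await shippingService.create({ orderId, address });
    return shipment.trackingNumber;
  }
};

export type Activities = typeof activities;

Workflow Definition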
// src/workflows/order-processing.ts
import { proxyActivities, defineSignal, defineQuery, setHandler } from '@temporalio/workflow';
// Type-only import keeps activity implementations out of the workflow bundle.
// Address is assumed to be exported from activities.ts.
import type { Activities, Address } from '../activities';

// Activity proxies with retry configuration
const {
  sendEmail,
  chargePayment,
  reserveInventory,
  releaseInventory,
  refundPayment,
  createShipment
} = proxyActivities<Activities>({
  startToCloseTimeout: '1 minute',
  retry: {
    initialInterval: '1 second',
    backoffCoefficient: 2,
    maximumAttempts: 5,
    nonRetryableErrorTypes: ['InvalidPaymentError']
  }
});

// Signals for external events
export const cancelOrderSignal = defineSignal('cancelOrder');
// Declared for illustration; this workflow does not register a handler for it
export const orderShippedSignal = defineSignal<[string]>('orderShipped');

// Queries for status lookups
export const getStatusQuery = defineQuery<OrderStatus>('getStatus');

export interface OrderInput {
  orderId: string;
  userId: string;
  products: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
  shippingAddress: Address;
}

export interface OrderStatus {
  state: 'pending' | 'paid' | 'reserved' | 'shipped' | 'cancelled' | 'failed';
  chargeId?: string;
  reservationIds?: string[];
  trackingNumber?: string;
  error?: string;
}

export async function orderProcessingWorkflow(input: OrderInput): Promise<OrderStatus> {
  const status: OrderStatus = { state: 'pending' };
  let cancelled = false;

  // Signal handler: only sets a flag, which is checked between steps
  setHandler(cancelOrderSignal, () => {
    cancelled = true;
  });
  // Query handler
  setHandler(getStatusQuery, () => status);

  try {
    // 1. Payment
    status.chargeId = await chargePayment(input.userId, input.totalAmount);
    status.state = 'paid';
    if (cancelled) {
      await compensatePayment(status.chargeId);
      status.state = 'cancelled';
      return status;
    }

    // 2. Inventory reservation
    status.reservationIds = [];
    for (const product of input.products) {
      const reservationId = await reserveInventory(product.productId, product.quantity);
      status.reservationIds.push(reservationId);
    }
    status.state = 'reserved';
    if (cancelled) {
      await compensate(status);
      status.state = 'cancelled';
      return status;
    }

    // 3. Shipment
    status.trackingNumber = await createShipment(input.orderId, input.shippingAddress);
    status.state = 'shipped';
  } catch (error) {
    // Compensating transactions
    await compensate(status);
    status.state = 'failed';
    status.error = error instanceof Error ? error.message : String(error);
    // Notify the customer
    await sendEmail(
      input.userId,
      'Problem with your order',
      'Please contact our support team.'
    );
    throw error;
  }

  // 4. Confirmation email. Sent outside the try block so that a failed email
  //    does not refund an order that has already shipped.
  await sendEmail(
    input.userId,
    'Your order has shipped',
    `Tracking: ${status.trackingNumber}`
  );
  return status;
}

// Saga compensation
async function compensate(status: OrderStatus) {
  // Undo in reverse order of execution: inventory first, then payment.
  // No shipment exists yet whenever this runs.
  if (status.reservationIds) {
    for (const reservationId of status.reservationIds) {
      await releaseInventory(reservationId);
    }
  }
  // Refund the payment
  if (status.chargeId) {
    await compensatePayment(status.chargeId);
  }
}

async function compensatePayment(chargeId: string) {
  await refundPayment(chargeId);
}

Worker Setup
// src/worker.ts
import { Worker } from '@temporalio/worker';
import { activities } from './activities';

async function run() {
  const worker = await Worker.create({
    // Expects src/workflows/index.ts to re-export every workflow function
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'order-processing',
    // Worker options
    maxConcurrentActivityTaskExecutions: 10,
    maxConcurrentWorkflowTaskExecutions: 5
  });
  console.log('Worker started');
  // Polls the task queue until the worker is shut down
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

Client Usage
// src/client.ts
import { Client, Connection } from '@temporalio/client';
import { orderProcessingWorkflow } from './workflows/order-processing';

async function main() {
  const connection = await Connection.connect({
    address: 'localhost:7233'
  });
  const client = new Client({ connection });
  const orderId = '12345';

  // Start the workflow
  const handle = await client.workflow.start(orderProcessingWorkflow, {
    taskQueue: 'order-processing',
    workflowId: `order-${orderId}`,
    args: [{
      orderId,
      userId: 'user-789',
      products: [
        { productId: 'prod-1', quantity: 2 },
        { productId: 'prod-2', quantity: 1 }
      ],
      totalAmount: 99.99,
      shippingAddress: {
        street: 'Hauptstraße 1',
        city: 'Berlin',
        zip: '10115'
      }
    }]
  });
  console.log(`Started workflow: ${handle.workflowId}`);

  // Query the current status
  const status = await handle.query('getStatus');
  console.log('Current status:', status);

  // Wait for completion
  const result = await handle.result();
  console.log('Workflow completed:', result);
}

// Cancel an order by signalling the running workflow
export async function cancelOrder(client: Client, workflowId: string) {
  const handle = client.workflow.getHandle(workflowId);
  await handle.signal('cancelOrder');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});

API Route Integration
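An API route like the one in this section hands the parsed request body to the workflow as its input, so a runtime check on that body is worthwhile. The following is a minimal sketch, assuming the `OrderInput` shape from the workflow definition; `isOrderInput`, `isString`, and `isPositive` are hypothetical helpers, not part of Temporal or Next.js.

```typescript
// Mirrors the OrderInput shape used by the order workflow (repeated here so the
// sketch is self-contained).
interface Address { street: string; city: string; zip: string }
interface OrderInput {
  orderId: string;
  userId: string;
  products: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
  shippingAddress: Address;
}

const isString = (v: unknown): v is string => typeof v === 'string' && v.length > 0;
const isPositive = (v: unknown): v is number =>
  typeof v === 'number' && Number.isFinite(v) && v > 0;

// Hypothetical type guard: true only if `body` is a complete, plausible OrderInput.
function isOrderInput(body: unknown): body is OrderInput {
  if (typeof body !== 'object' || body === null) return false;
  const b = body as Record<string, unknown>;
  const products = b.products;
  const addr = b.shippingAddress as Record<string, unknown> | null | undefined;
  return (
    isString(b.orderId) &&
    isString(b.userId) &&
    isPositive(b.totalAmount) &&
    Array.isArray(products) &&
    products.length > 0 &&
    products.every(
      (p) =>
        typeof p === 'object' &&
        p !== null &&
        isString(p.productId) &&
        Number.isInteger(p.quantity) &&
        isPositive(p.quantity)
    ) &&
    typeof addr === 'object' &&
    addr !== null &&
    isString(addr.street) &&
    isString(addr.city) &&
    isString(addr.zip)
  );
}
```

A POST handler could call `isOrderInput(body)` right after parsing the JSON and answer with status 400 when it returns false, so malformed orders never start a workflow.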
// app/api/orders/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { Client, Connection, WorkflowNotFoundError } from '@temporalio/client';
import { orderProcessingWorkflow } from '@/workflows/order-processing';

// Reuse one client per server process instead of reconnecting on every request
let cachedClient: Client | undefined;

async function getClient(): Promise<Client> {
  if (!cachedClient) {
    const connection = await Connection.connect({
      address: process.env.TEMPORAL_ADDRESS || 'localhost:7233'
    });
    cachedClient = new Client({ connection });
  }
  return cachedClient;
}

export async function POST(request: NextRequest) {
  // The body is passed through as-is; it should be validated against OrderInput first
  const body = await request.json();
  const client = await getClient();
  const handle = await client.workflow.start(orderProcessingWorkflow, {
    taskQueue: 'order-processing',
    workflowId: `order-${body.orderId}`,
    args: [body]
  });
  return NextResponse.json({
    workflowId: handle.workflowId,
    status: 'started'
  });
}

export async function GET(request: NextRequest) {
  const workflowId = request.nextUrl.searchParams.get('workflowId');
  if (!workflowId) {
    return NextResponse.json({ error: 'workflowId required' }, { status: 400 });
  }
  const client = await getClient();
  const handle = client.workflow.getHandle(workflowId);
  try {
    const status = await handle.query('getStatus');
    return NextResponse.json({ workflowId, status });
  } catch (error) {
    // Only report 404 when the workflow really does not exist
    if (error instanceof WorkflowNotFoundError) {
      return NextResponse.json({ error: 'Workflow not found' }, { status: 404 });
    }
    return NextResponse.json({ error: 'Failed to query workflow' }, { status: 500 });
  }
}

Timer & Scheduling
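// src/workflows/subscription-renewal.ts
import { proxyActivities, sleep, continueAsNew } from '@temporalio/workflow';

// Assumed activity signatures; these are not part of the activities.ts shown earlier
interface SubscriptionActivities {
  chargeSubscription(userId: string, planId: string): Promise<void>;
  sendRenewalReminder(userId: string): Promise<void>;
  sendRenewalConfirmation(userId: string): Promise<void>;
}

const { chargeSubscription, sendRenewalReminder, sendRenewalConfirmation } =
  proxyActivities<SubscriptionActivities>({
    startToCloseTimeout: '1 minute'
  });

export async function subscriptionRenewalWorkflow(
  userId: string,
  planId: string
): Promise<void> {
  // Reminder 7 days before expiry (assumes a 30-day billing period).
  // Timers are durable: the sleep survives worker restarts.
  await sleep('23 days');
  await sendRenewalReminder(userId);

  // Wait until expiry
  await sleep('7 days');

  // Charge
  await chargeSubscription(userId, planId);
  await sendRenewalConfirmation(userId);

  // Continue as a fresh run for the next period, which keeps the event history short
  await continueAsNew<typeof subscriptionRenewalWorkflow>(userId, planId);
}

Error Handling & Compensation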
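The core of the saga pattern is independent of Temporal: each completed step registers an undo action, and on failure the undo actions run in reverse order. The sketch below shows this with plain async functions and an in-memory log so the ordering is visible. `runWithCompensation` and the step names are hypothetical and exist only for this illustration; in a workflow, the logged steps would be activity calls.

```typescript
type Compensation = () => Promise<void>;

// Runs `work`, which registers an undo action after each successful step.
// If `work` throws, the registered undo actions run last-in, first-out.
async function runWithCompensation<T>(
  work: (onUndo: (c: Compensation) => void) => Promise<T>
): Promise<T> {
  const undoStack: Compensation[] = [];
  try {
    return await work((c) => undoStack.push(c));
  } catch (error) {
    for (const undo of undoStack.reverse()) {
      await undo();
    }
    throw error;
  }
}

// Demo with an in-memory log instead of real services.
async function demo(): Promise<string[]> {
  const log: string[] = [];
  try {
    await runWithCompensation(async (onUndo) => {
      log.push('reserve hotel');
      onUndo(async () => { log.push('cancel hotel'); });

      log.push('reserve flight');
      onUndo(async () => { log.push('cancel flight'); });

      throw new Error('payment declined'); // the third step fails
    });
  } catch {
    log.push('saga failed');
  }
  return log;
}

demo().then((entries) => console.log(entries));
// [ 'reserve hotel', 'reserve flight', 'cancel flight', 'cancel hotel', 'saga failed' ]
```

Registering the undo action right after each step means that steps may return different types and that only steps that actually completed are ever compensated.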
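// Saga pattern with explicit compensation
import { proxyActivities } from '@temporalio/workflow';

// Hypothetical booking activities and input type, assumed for this example
interface BookingActivities {
  reserveHotel(hotelId: string, dates: { from: string; to: string }): Promise<string>;
  cancelHotelReservation(reservationId: string): Promise<void>;
  reserveFlight(flightId: string): Promise<string>;
  cancelFlightReservation(reservationId: string): Promise<void>;
  chargeCustomer(customerId: string, total: number): Promise<string>;
  refundCustomer(chargeId: string): Promise<void>;
}

interface BookingInput {
  hotelId: string;
  dates: { from: string; to: string };
  flightId: string;
  customerId: string;
  total: number;
}

const {
  reserveHotel,
  cancelHotelReservation,
  reserveFlight,
  cancelFlightReservation,
  chargeCustomer,
  refundCustomer
} = proxyActivities<BookingActivities>({ startToCloseTimeout: '1 minute' });

interface SagaStep<T> {
  execute: () => Promise<T>;
  compensate: (result: T) => Promise<void>;
}

async function runSaga<T>(steps: SagaStep<T>[]): Promise<T[]> {
  const results: T[] = [];
  const completedSteps: SagaStep<T>[] = [];
  try {
    for (const step of steps) {
      const result = await step.execute();
      results.push(result);
      completedSteps.push(step);
    }
    return results;
  } catch (error) {
    // Compensate completed steps in reverse order
    for (let i = completedSteps.length - 1; i >= 0; i--) {
      const step = completedSteps[i];
      const result = results[i];
      try {
        await step.compensate(result);
      } catch (compensationError) {
        // Log but do not re-throw, so the remaining compensations still run
        console.error('Compensation failed:', compensationError);
      }
    }
    throw error;
  }
}

// Usage
export async function bookingWorkflow(input: BookingInput) {
  const results = await runSaga([
    {
      execute: () => reserveHotel(input.hotelId, input.dates),
      compensate: (reservationId) => cancelHotelReservation(reservationId)
    },
    {
      execute: () => reserveFlight(input.flightId),
      compensate: (reservationId) => cancelFlightReservation(reservationId)
    },
    {
      execute: () => chargeCustomer(input.customerId, input.total),
      compensate: (chargeId) => refundCustomer(chargeId)
    }
  ]);
  return {
    hotelReservation: results[0],
    flightReservation: results[1],
    chargeId: results[2]
  };
}

Conclusion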
Temporal workflow orchestration provides:
- Durability: workflows survive server crashes
- Visibility: complete event history and queryable state
- Reliability: automatic retries and timeouts
- Scalability: workers scale horizontally
For complex, long-running business processes, these guarantees are difficult to build by hand.