Workflow Orchestration with Temporal

Workflow orchestration for complex business logic: Temporal.io, durable workflows, sagas, and error recovery.


Introduction

Complex business processes need more than plain function calls. Workflow orchestration with Temporal provides durable execution, automatic retries, and state management for long-running processes.
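To make "durable execution" more concrete, here is a deliberately simplified toy model of history replay. It is not Temporal's implementation, and every name in it (`EventLog`, `StepContext`, `runWithEventLog`) is invented for this illustration. The idea it sketches: each step's result is recorded, and when a workflow is re-run after a crash, recorded steps return their stored result instead of executing again.

```typescript
// Toy model of durable execution via history replay.
// NOT Temporal's implementation; all names are hypothetical.

type EventLog = Map<string, unknown>;

interface StepContext {
  // Runs `fn` once and records its result; on replay, returns the recorded result.
  step<T>(stepName: string, fn: () => T): T;
}

function runWithEventLog<R>(eventLog: EventLog, workflow: (ctx: StepContext) => R): R {
  const ctx: StepContext = {
    step<T>(stepName: string, fn: () => T): T {
      if (eventLog.has(stepName)) {
        return eventLog.get(stepName) as T; // replay: skip the side effect
      }
      const result = fn();
      eventLog.set(stepName, result); // record before moving on
      return result;
    },
  };
  return workflow(ctx);
}

// Demo: the process "crashes" once, right after the payment step.
const executedSteps: string[] = [];
let crashOnce = true;

function toyOrderWorkflow(ctx: StepContext): string {
  const chargeId = ctx.step('charge', () => {
    executedSteps.push('charge');
    return 'ch_1';
  });
  if (crashOnce) {
    crashOnce = false;
    throw new Error('simulated worker crash');
  }
  const tracking = ctx.step('ship', () => {
    executedSteps.push('ship');
    return 'TRACK-1';
  });
  return `${chargeId}/${tracking}`;
}

const eventLog: EventLog = new Map(); // stands in for the persisted event history

let outcome = '';
try {
  outcome = runWithEventLog(eventLog, toyOrderWorkflow);
} catch {
  // A new worker picks the workflow up and replays it from the same log.
  outcome = runWithEventLog(eventLog, toyOrderWorkflow);
}

console.log(executedSteps); // ['charge', 'ship']
console.log(outcome); // 'ch_1/TRACK-1'
```

The payment step runs exactly once even though the workflow function runs twice. This is also why real workflow code has to be deterministic: replay only works if the function takes the same path through its steps each time.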


Why Workflow Orchestration?

┌─────────────────────────────────────────────────────────────┐
│              WORKFLOW ORCHESTRATION                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Problems without orchestration:                            │
│  ├── Distributed transactions                              │
│  ├── Timeout/Failure Handling                              │
│  ├── State management over time                            │
│  ├── Retries with exponential backoff                      │
│  └── Compensating Transactions (Rollbacks)                 │
│                                                             │
│  Temporal's solution:                                       │
│  ├── Durable execution (survives crashes)                  │
│  ├── Automatic retries                                     │
│  ├── Timeouts at every level                               │
│  ├── Saga Pattern Support                                  │
│  └── Workflow as Code (TypeScript)                         │
│                                                             │
│  Use Cases:                                                 │
│  ├── E-Commerce: Order Processing                          │
│  ├── Finance: Payment Processing                           │
│  ├── HR: Onboarding Workflows                              │
│  ├── DevOps: CI/CD Pipelines                               │
│  └── Data: ETL Pipelines                                   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
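As a worked example of "retries with exponential backoff", the sketch below computes the wait before each retry for a policy like the one used in the order workflow later in this article (initial interval 1 second, coefficient 2, at most 5 attempts). The helper `retryDelaysMs` and the cap value are assumptions made for illustration; they are not part of the Temporal SDK.

```typescript
// Retry delays under an exponential backoff policy.
// Illustrative helper only; field names are modeled on the retry options
// used in the workflow below, expressed in milliseconds.

interface BackoffPolicy {
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumAttempts: number; // total attempts, including the first one
  maximumIntervalMs: number; // cap on a single delay (assumed value below)
}

// Returns the wait before each retry, i.e. before attempts 2..maximumAttempts.
function retryDelaysMs(policy: BackoffPolicy): number[] {
  const delays: number[] = [];
  for (let retry = 0; retry < policy.maximumAttempts - 1; retry++) {
    const uncapped =
      policy.initialIntervalMs * Math.pow(policy.backoffCoefficient, retry);
    delays.push(Math.min(uncapped, policy.maximumIntervalMs));
  }
  return delays;
}

const orderRetryDelays = retryDelaysMs({
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumAttempts: 5,
  maximumIntervalMs: 100000,
});

console.log(orderRetryDelays); // [1000, 2000, 4000, 8000]
```

With five attempts there are four retries, so a failing activity gives up after roughly 15 seconds of waiting plus the time the attempts themselves take.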

Temporal Setup

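# Temporal dev server via the Temporal CLI (listens on localhost:7233)
temporal server start-dev
# Note: the temporalio/auto-setup Docker image additionally needs a
# database container, so it does not run standalone.

# Install the SDK
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

// src/activities.ts
// Activities run in the normal Node.js environment, so nothing is imported
// from '@temporalio/workflow' here. The service clients and the Address type
// are assumed to live in a hypothetical './clients' module.
import { resend, stripe, inventoryService, shippingService } from './clients';
import type { Address } from './clients';

// Activities are ordinary async functions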
export const activities = {
  async sendEmail(to: string, subject: string, body: string): Promise<boolean> {
    console.log(`Sending email to ${to}`);
    // Resend, SendGrid, etc.
    await resend.emails.send({ to, subject, html: body });
    return true;
  },

  async chargePayment(userId: string, amount: number): Promise<string> {
    console.log(`Charging ${amount} for user ${userId}`);
    // Stripe, PayPal, etc.
    const charge = await stripe.charges.create({
      amount: Math.round(amount * 100),
      currency: 'eur',
      customer: userId
    });
    return charge.id;
  },

  async reserveInventory(productId: string, quantity: number): Promise<string> {
    console.log(`Reserving ${quantity} of ${productId}`);
    // Inventory Service
    const reservation = await inventoryService.reserve(productId, quantity);
    return reservation.id;
  },

  async releaseInventory(reservationId: string): Promise<void> {
    console.log(`Releasing reservation ${reservationId}`);
    await inventoryService.release(reservationId);
  },

  async refundPayment(chargeId: string): Promise<void> {
    console.log(`Refunding charge ${chargeId}`);
    await stripe.refunds.create({ charge: chargeId });
  },

  async createShipment(orderId: string, address: Address): Promise<string> {
    console.log(`Creating shipment for order ${orderId}`);
    const shipment = await shippingService.create({ orderId, address });
    return shipment.trackingNumber;
  }
};

export type Activities = typeof activities;

Workflow Definition

// src/workflows/order-processing.ts
import {
  proxyActivities,
  sleep,
  defineSignal,
  defineQuery,
  setHandler,
  condition,
  ApplicationFailure
} from '@temporalio/workflow';
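import type { Activities } from '../activities';
// Address is assumed to be exported by a hypothetical shared module
import type { Address } from '../clients';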

// Activity proxies with retry configuration
const {
  sendEmail,
  chargePayment,
  reserveInventory,
  releaseInventory,
  refundPayment,
  createShipment
} = proxyActivities<Activities>({
  startToCloseTimeout: '1 minute',
  retry: {
    initialInterval: '1 second',
    backoffCoefficient: 2,
    maximumAttempts: 5,
    nonRetryableErrorTypes: ['InvalidPaymentError']
  }
});

// Signals for external events
const cancelOrderSignal = defineSignal('cancelOrder');
const orderShippedSignal = defineSignal<[string]>('orderShipped');

// Queries for status checks
const getStatusQuery = defineQuery<OrderStatus>('getStatus');

interface OrderInput {
  orderId: string;
  userId: string;
  products: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
  shippingAddress: Address;
}

interface OrderStatus {
  state: 'pending' | 'paid' | 'reserved' | 'shipped' | 'cancelled' | 'failed';
  chargeId?: string;
  reservationIds?: string[];
  trackingNumber?: string;
  error?: string;
}

export async function orderProcessingWorkflow(input: OrderInput): Promise<OrderStatus> {
  let status: OrderStatus = { state: 'pending' };
  let cancelled = false;

  // Signal Handler
  setHandler(cancelOrderSignal, () => {
    cancelled = true;
  });

  // Query Handler
  setHandler(getStatusQuery, () => status);

  try {
    // 1. Payment
    status.chargeId = await chargePayment(input.userId, input.totalAmount);
    status.state = 'paid';

    if (cancelled) {
      await compensatePayment(status.chargeId);
      status.state = 'cancelled';
      return status;
    }

    // 2. Inventory Reservation
    status.reservationIds = [];
    for (const product of input.products) {
      const reservationId = await reserveInventory(product.productId, product.quantity);
      status.reservationIds.push(reservationId);
    }
    status.state = 'reserved';

    if (cancelled) {
      await compensate(status);
      status.state = 'cancelled';
      return status;
    }

    // 3. Shipment
    status.trackingNumber = await createShipment(input.orderId, input.shippingAddress);
    status.state = 'shipped';

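    // 4. Confirmation email: best effort, so that a failed email does not
    //    trigger a refund for an order that has already shipped
    try {
      await sendEmail(
        input.userId,
        'Your order has shipped',
        `Tracking: ${status.trackingNumber}`
      );
    } catch {
      // Retries exhausted; the order itself is complete
    }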

    return status;
  } catch (error) {
    // Compensating Transactions
    await compensate(status);
    status.state = 'failed';
    status.error = (error as Error).message;

    // Notify Customer
    await sendEmail(
      input.userId,
      'Problem with your order',
      'Please contact our support team.'
    );

    throw error;
  }
}

// Saga Compensation
async function compensate(status: OrderStatus) {
  // Undo in reverse order of execution: inventory first, then payment.
  // A shipment that was already created is not cancelled here.

  // Release Inventory
  if (status.reservationIds) {
    for (const reservationId of status.reservationIds) {
      await releaseInventory(reservationId);
    }
  }

  // Refund Payment
  if (status.chargeId) {
    await compensatePayment(status.chargeId);
  }
}

async function compensatePayment(chargeId: string) {
  await refundPayment(chargeId);
}

Worker Setup

// src/worker.ts
import { Worker } from '@temporalio/worker';
import { activities } from './activities';

async function run() {
  const worker = await Worker.create({
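    // Resolves src/workflows/index.ts, which is assumed to re-export
    // orderProcessingWorkflow from './order-processing'
    workflowsPath: require.resolve('./workflows'),
    activities,
    taskQueue: 'order-processing',
    // Worker options
    maxConcurrentActivityTaskExecutions: 10,
    maxConcurrentWorkflowTaskExecutions: 5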
  });

  console.log('Worker started');
  await worker.run();
}

run().catch((err) => {
  console.error(err);
  process.exit(1);
});

Client Usage

// src/client.ts
import { Client, Connection } from '@temporalio/client';
import { orderProcessingWorkflow } from './workflows/order-processing';

async function main() {
  const connection = await Connection.connect({
    address: 'localhost:7233'
  });

  const client = new Client({ connection });

  // Start the workflow; the workflow ID is derived from the order ID
  const orderId = '12345';
  const handle = await client.workflow.start(orderProcessingWorkflow, {
    taskQueue: 'order-processing',
    workflowId: `order-${orderId}`,
    args: [{
      orderId,
      userId: 'user-789',
      products: [
        { productId: 'prod-1', quantity: 2 },
        { productId: 'prod-2', quantity: 1 }
      ],
      totalAmount: 99.99,
      shippingAddress: {
        street: 'Hauptstraße 1',
        city: 'Berlin',
        zip: '10115'
      }
    }]
  });

  console.log(`Started workflow: ${handle.workflowId}`);

  // Query the current status
  const status = await handle.query('getStatus');
  console.log('Current status:', status);

  // Wait for completion
  const result = await handle.result();
  console.log('Workflow completed:', result);
}

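// Cancel an order; the caller passes in a connected client
async function cancelOrder(client: Client, workflowId: string) {
  const handle = client.workflow.getHandle(workflowId);
  await handle.signal('cancelOrder');
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});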

API Route Integration

// app/api/orders/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { Client, Connection } from '@temporalio/client';
import { orderProcessingWorkflow } from '@/workflows/order-processing';

let client: Client;

async function getClient() {
  if (!client) {
    const connection = await Connection.connect({
      address: process.env.TEMPORAL_ADDRESS || 'localhost:7233'
    });
    client = new Client({ connection });
  }
  return client;
}

export async function POST(request: NextRequest) {
  const body = await request.json();

  const client = await getClient();

  const handle = await client.workflow.start(orderProcessingWorkflow, {
    taskQueue: 'order-processing',
    workflowId: `order-${body.orderId}`,
    args: [body]
  });

  return NextResponse.json({
    workflowId: handle.workflowId,
    status: 'started'
  });
}

export async function GET(request: NextRequest) {
  const workflowId = request.nextUrl.searchParams.get('workflowId');

  if (!workflowId) {
    return NextResponse.json({ error: 'workflowId required' }, { status: 400 });
  }

  const client = await getClient();
  const handle = client.workflow.getHandle(workflowId);

  try {
    const status = await handle.query('getStatus');
    return NextResponse.json({ workflowId, status });
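  } catch (error) {
    // Simplification: every query failure is reported as 404. Production code
    // should distinguish a missing workflow from, say, a connection error.
    return NextResponse.json({ error: 'Workflow not found' }, { status: 404 });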
  }
}
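
The POST handler above passes the parsed request body straight into the workflow as `args: [body]`, so a malformed request only fails later, inside an activity. A minimal validation sketch is shown below. `parseOrderInput` is a hypothetical helper, the field names mirror `OrderInput` from the workflow, and the `Address` shape (street, city, zip) is inferred from the client example, so treat it as an assumption.

```typescript
// Minimal request validation before starting a workflow.
// Hypothetical helper; the Address shape is inferred from the client example.

interface Address {
  street: string;
  city: string;
  zip: string;
}

interface OrderInput {
  orderId: string;
  userId: string;
  products: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
  shippingAddress: Address;
}

function isNonEmptyString(v: unknown): v is string {
  return typeof v === 'string' && v.length > 0;
}

function isRecord(v: unknown): v is Record<string, unknown> {
  return typeof v === 'object' && v !== null && !Array.isArray(v);
}

// Returns a typed OrderInput, or throws an Error naming the first problem found.
function parseOrderInput(body: unknown): OrderInput {
  if (!isRecord(body)) throw new Error('body must be a JSON object');
  const { orderId, userId, products, totalAmount, shippingAddress } = body;

  if (!isNonEmptyString(orderId)) throw new Error('orderId is required');
  if (!isNonEmptyString(userId)) throw new Error('userId is required');
  if (typeof totalAmount !== 'number' || !Number.isFinite(totalAmount) || totalAmount <= 0) {
    throw new Error('totalAmount must be a positive number');
  }
  if (!Array.isArray(products) || products.length === 0) {
    throw new Error('products must be a non-empty array');
  }

  const parsedProducts = products.map((p: unknown, i: number) => {
    if (!isRecord(p)) throw new Error(`products[${i}] must be an object`);
    const { productId, quantity } = p;
    if (!isNonEmptyString(productId)) throw new Error(`products[${i}].productId is required`);
    if (typeof quantity !== 'number' || !Number.isInteger(quantity) || quantity < 1) {
      throw new Error(`products[${i}].quantity must be a positive integer`);
    }
    return { productId, quantity };
  });

  if (!isRecord(shippingAddress)) throw new Error('shippingAddress is required');
  const { street, city, zip } = shippingAddress;
  if (!isNonEmptyString(street) || !isNonEmptyString(city) || !isNonEmptyString(zip)) {
    throw new Error('shippingAddress is incomplete');
  }

  return {
    orderId,
    userId,
    products: parsedProducts,
    totalAmount,
    shippingAddress: { street, city, zip },
  };
}
```

In the route, wrapping `parseOrderInput(body)` in a try/catch that answers with status 400 would keep invalid orders from ever starting a workflow.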

Timer & Scheduling

// src/workflows/subscription-renewal.ts
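import { proxyActivities, sleep, continueAsNew } from '@temporalio/workflow';
// The three subscription activities used below are assumed to be added to
// Activities; they are not part of the activities file shown earlier
import type { Activities } from '../activities';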

export async function subscriptionRenewalWorkflow(
  userId: string,
  planId: string
): Promise<void> {
  const { chargeSubscription, sendRenewalReminder, sendRenewalConfirmation } =
    proxyActivities<Activities>({
      startToCloseTimeout: '1 minute'
    });

  // Reminder 7 days before expiry (30-day cycle)
  await sleep('23 days');
  await sendRenewalReminder(userId);

  // Wait until expiry
  await sleep('7 days');

  // Charge
  await chargeSubscription(userId, planId);
  await sendRenewalConfirmation(userId);

  // Continue as a new run for the next cycle (keeps the event history short)
  await continueAsNew<typeof subscriptionRenewalWorkflow>(userId, planId);
}

Error Handling & Compensation

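// Saga pattern with explicit compensation
// The booking activities used below (reserveHotel, cancelHotelReservation, ...)
// and the BookingInput type are assumed to be defined and proxied elsewhere
import { proxyActivities, ApplicationFailure } from '@temporalio/workflow';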

interface SagaStep<T> {
  execute: () => Promise<T>;
  compensate: (result: T) => Promise<void>;
}

async function runSaga<T>(steps: SagaStep<T>[]): Promise<T[]> {
  const results: T[] = [];
  const completedSteps: SagaStep<T>[] = [];

  try {
    for (const step of steps) {
      const result = await step.execute();
      results.push(result);
      completedSteps.push(step);
    }
    return results;
  } catch (error) {
    // Compensate in reverse order
    for (let i = completedSteps.length - 1; i >= 0; i--) {
      const step = completedSteps[i];
      const result = results[i];
      try {
        await step.compensate(result);
      } catch (compensationError) {
        console.error('Compensation failed:', compensationError);
        // Log, but do not re-throw, so the remaining compensations still run
      }
    }
    throw error;
  }
}

// Usage
export async function bookingWorkflow(input: BookingInput) {
  const results = await runSaga([
    {
      execute: () => reserveHotel(input.hotelId, input.dates),
      compensate: (reservationId) => cancelHotelReservation(reservationId)
    },
    {
      execute: () => reserveFlight(input.flightId),
      compensate: (reservationId) => cancelFlightReservation(reservationId)
    },
    {
      execute: () => chargeCustomer(input.customerId, input.total),
      compensate: (chargeId) => refundCustomer(chargeId)
    }
  ]);

  return {
    hotelReservation: results[0],
    flightReservation: results[1],
    chargeId: results[2]
  };
}
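
To see the compensation order without a Temporal server, the same saga logic can be exercised with stub steps. The sketch below repeats a compact version of `runSaga` so that it is self-contained; `stubStep`, `sagaTrace`, and `sagaDemo` are hypothetical names used only for this demo, and the third step is made to fail on purpose.

```typescript
// Stand-alone demo of reverse-order compensation, runnable without Temporal.
// The stub steps and the trace array are hypothetical.

interface SagaStep<T> {
  execute: () => Promise<T>;
  compensate: (result: T) => Promise<void>;
}

async function runSaga<T>(steps: SagaStep<T>[]): Promise<T[]> {
  const results: T[] = [];
  try {
    for (const step of steps) {
      results.push(await step.execute());
    }
    return results;
  } catch (error) {
    // Only completed steps have an entry in `results`; undo them last to first.
    for (let i = results.length - 1; i >= 0; i--) {
      try {
        await steps[i].compensate(results[i]);
      } catch (compensationError) {
        console.error('Compensation failed:', compensationError);
      }
    }
    throw error;
  }
}

const sagaTrace: string[] = [];

function stubStep(label: string, fail = false): SagaStep<string> {
  return {
    execute: async () => {
      if (fail) throw new Error(`${label} failed`);
      sagaTrace.push(`do:${label}`);
      return `${label}-id`;
    },
    compensate: async (id) => {
      sagaTrace.push(`undo:${id}`);
    },
  };
}

const sagaDemo: Promise<string[]> = runSaga([
  stubStep('hotel'),
  stubStep('flight'),
  stubStep('charge', true), // the payment step fails
]).then(
  () => sagaTrace,
  () => sagaTrace, // the saga rethrows after compensating
);

sagaDemo.then((t) => console.log(t));
// ['do:hotel', 'do:flight', 'undo:flight-id', 'undo:hotel-id']
```

The failed step itself is never compensated, since it produced no result. Only the hotel and flight reservations are rolled back, newest first.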

Conclusion

Temporal workflow orchestration provides:

  1. Durability: workflows survive server crashes
  2. Visibility: complete history and state
  3. Reliability: automatic retries and timeouts
  4. Scalability: horizontal worker scaling

For complex, long-running business processes, these properties are hard to do without.

