Background Jobs & Queues with BullMQ

Background job processing with BullMQ and Redis: queues, workers, retries, and job scheduling for Node.js.


Introduction

Not every task belongs in the request-response cycle. Background jobs move work such as sending email, image processing, and report generation out of the request path and run it asynchronously, with retries, priorities, and scheduling.


Why Background Jobs?

┌─────────────────────────────────────────────────────────────┐
│                 SYNCHRONOUS vs ASYNCHRONOUS                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Synchronous (a poor fit for):                              │
│  ├── Sending email (3-5s)                                   │
│  ├── Image/video processing (10s+)                          │
│  ├── PDF generation (2-5s)                                  │
│  ├── External API calls (variable)                          │
│  └── Batch operations (minutes)                             │
│                                                             │
│  The user waits... ⏳                                       │
│                                                             │
│  Asynchronous (background):                                 │
│  ├── Request → Response (50ms)                              │
│  ├── Job enters queue → worker processes it                 │
│  ├── Retries on failure                                     │
│  ├── Rate limiting                                          │
│  └── Scheduling (cron)                                      │
│                                                             │
│  The user gets a response immediately! ✓                    │
│                                                             │
└─────────────────────────────────────────────────────────────┘

BullMQ Setup

npm install bullmq ioredis
// lib/queue.ts
import { Queue, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';

// BullMQ workers require maxRetriesPerRequest: null on their connection
export const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null
});

// Define the queues
export const emailQueue = new Queue('email', { connection });
export const imageQueue = new Queue('image-processing', { connection });
export const reportQueue = new Queue('reports', { connection });

// Queue events (for monitoring)
export const emailQueueEvents = new QueueEvents('email', { connection });
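
The setup above reads `process.env.REDIS_URL!` with a non-null assertion, so a missing variable only surfaces once the connection is attempted. As a minimal sketch, assuming you would rather fail fast at startup, the URL could be validated and turned into a plain options object first. `redisOptionsFromUrl` is a hypothetical helper, not part of BullMQ or ioredis; the field names (`host`, `port`, `username`, `password`, `db`, `tls`) follow the ioredis connection options.

```typescript
// Hypothetical helper: validates a Redis URL and converts it into
// an options object in the shape that ioredis accepts.
interface RedisConnectionOptions {
  host: string;
  port: number;
  username?: string;
  password?: string;
  db?: number;
  tls?: object;
  maxRetriesPerRequest: null;
}

function redisOptionsFromUrl(raw: string | undefined): RedisConnectionOptions {
  if (!raw) {
    throw new Error('REDIS_URL is not set');
  }
  const url = new URL(raw);
  if (url.protocol !== 'redis:' && url.protocol !== 'rediss:') {
    throw new Error(`Unsupported protocol in REDIS_URL: ${url.protocol}`);
  }
  // "/2" selects database 2; an empty path keeps the default database
  const dbPath = url.pathname.replace(/^\//, '');
  return {
    host: url.hostname,
    port: url.port ? Number(url.port) : 6379,
    ...(url.username ? { username: decodeURIComponent(url.username) } : {}),
    ...(url.password ? { password: decodeURIComponent(url.password) } : {}),
    ...(dbPath ? { db: Number(dbPath) } : {}),
    // rediss:// means TLS; an empty object enables it with default settings
    ...(url.protocol === 'rediss:' ? { tls: {} } : {}),
    maxRetriesPerRequest: null
  };
}
```

The result could then be passed to `new IORedis(options)` in place of the raw URL.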

Defining Job Types

// types/jobs.ts
export interface EmailJob {
  type: 'welcome' | 'reset-password' | 'notification';
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

export interface ImageProcessingJob {
  imageId: string;
  operations: Array<{
    type: 'resize' | 'crop' | 'watermark';
    params: Record<string, unknown>;
  }>;
  outputFormat: 'webp' | 'avif' | 'jpeg';
}

export interface ReportJob {
  reportType: 'daily' | 'weekly' | 'monthly';
  userId: string;
  // Job data is stored as JSON in Redis, so dates travel as ISO 8601 strings
  dateRange: {
    start: string;
    end: string;
  };
}
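
TypeScript interfaces only exist at compile time, while job data arrives in the worker as parsed JSON. A minimal sketch of a runtime check, assuming you want to reject malformed payloads before processing them, might look like this. `isEmailJob` is a hypothetical helper; the interface is repeated locally only to keep the example self-contained.

```typescript
// Local copy of the EmailJob shape from types/jobs.ts
interface EmailJob {
  type: 'welcome' | 'reset-password' | 'notification';
  to: string;
  subject: string;
  template: string;
  data: Record<string, unknown>;
}

const EMAIL_TYPES: readonly string[] = ['welcome', 'reset-password', 'notification'];

// Hypothetical type guard: checks the shape of a value that came out of JSON
function isEmailJob(value: unknown): value is EmailJob {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v.type === 'string' &&
    EMAIL_TYPES.indexOf(v.type) !== -1 &&
    typeof v.to === 'string' &&
    typeof v.subject === 'string' &&
    typeof v.template === 'string' &&
    typeof v.data === 'object' &&
    v.data !== null
  );
}

// What a job payload goes through on its way into and out of Redis
function jsonRoundTrip<T>(payload: T): unknown {
  return JSON.parse(JSON.stringify(payload));
}
```

The same round trip also shows why `Date` fields are risky in job payloads: `jsonRoundTrip({ start: new Date(0) })` yields a `start` that is a string, not a `Date`.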

Adding Jobs

// services/email.ts
import { emailQueue, reportQueue } from '@/lib/queue';
import { EmailJob } from '@/types/jobs';

export async function sendWelcomeEmail(userId: string, email: string) {
  const job = await emailQueue.add('send-email', {
    type: 'welcome',
    to: email,
    subject: 'Welcome to Our Platform!',
    template: 'welcome',
    data: { userId }
  } satisfies EmailJob, {
    // Job options
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000  // retries after 1s, then 2s (attempts includes the first try)
    },
    removeOnComplete: {
      count: 1000  // keep the last 1000 completed jobs
    },
    removeOnFail: {
      count: 5000  // keep the last 5000 failed jobs
    }
  });

  return job.id;
}

// With priority
export async function sendUrgentEmail(data: EmailJob) {
  await emailQueue.add('send-email', data, {
    priority: 1  // lower number = higher priority
  });
}

// Delayed job
export async function scheduleEmail(data: EmailJob, delay: number) {
  await emailQueue.add('send-email', data, {
    delay  // milliseconds
  });
}

// Scheduled job (cron)
export async function scheduleReports() {
  await reportQueue.add('generate-report', {
    reportType: 'daily',
    // ...
  }, {
    repeat: {
      pattern: '0 8 * * *'  // daily at 8:00
    }
  });
}
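
To make the retry schedule behind `attempts` and `backoff` concrete, here is a small sketch that computes the waiting times for the built-in exponential strategy. It assumes the formula given in the BullMQ documentation, `delay * 2^(attemptsMade - 1)`; `exponentialRetryDelays` is a hypothetical helper for illustration, not a BullMQ function.

```typescript
// Returns the delay (in ms) before each retry for an exponential backoff.
// `attempts` counts the first try as well, so there are attempts - 1 retries.
function exponentialRetryDelays(attempts: number, delay: number): number[] {
  const delays: number[] = [];
  for (let attemptsMade = 1; attemptsMade < attempts; attemptsMade++) {
    delays.push(delay * Math.pow(2, attemptsMade - 1));
  }
  return delays;
}

// The options used for the welcome email: attempts: 3, delay: 1000
const welcomeEmailRetries = exponentialRetryDelays(3, 1000);
// welcomeEmailRetries is [1000, 2000]
```

With `attempts: 3` the job therefore runs at most three times, with waits of one and two seconds in between. A 4-second wait would only occur with `attempts: 4` or more.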

Implementing Workers

// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import { Resend } from 'resend';
import { EmailJob } from '@/types/jobs';

const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null
});

const resend = new Resend(process.env.RESEND_API_KEY);

const emailWorker = new Worker<EmailJob>(
  'email',
  async (job: Job<EmailJob>) => {
    console.log(`Processing job ${job.id}: ${job.data.type}`);

    const { to, subject, template, data } = job.data;

    // Render the template (renderTemplate is an application-specific helper, not shown)
    const html = await renderTemplate(template, data);

    // Send the email. Recent Resend SDK versions return { data, error }
    // instead of throwing, so rethrow to let BullMQ retry the job.
    const { data: sent, error } = await resend.emails.send({
      from: 'noreply@example.com',
      to,
      subject,
      html
    });
    if (error) {
      throw new Error(`Resend error: ${error.message}`);
    }

    console.log(`Email sent: ${sent?.id}`);

    return { emailId: sent?.id };
  },
  {
    connection,
    concurrency: 5,  // up to 5 jobs in parallel
    limiter: {
      max: 100,       // at most 100 jobs
      duration: 60000 // per minute (rate limiting)
    }
  }
);

// Event handlers
emailWorker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result:`, result);
});

emailWorker.on('failed', (job, error) => {
  console.error(`Job ${job?.id} failed:`, error.message);
});

emailWorker.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

export { emailWorker };

// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import sharp from 'sharp';
import { ImageProcessingJob } from '@/types/jobs';

// loadImage, loadWatermark and uploadToStorage are application-specific helpers (not shown)

const connection = new IORedis(process.env.REDIS_URL!, {
  maxRetriesPerRequest: null
});

const imageWorker = new Worker<ImageProcessingJob>(
  'image-processing',
  async (job: Job<ImageProcessingJob>) => {
    const { imageId, operations, outputFormat } = job.data;

    // Load the image
    const image = await loadImage(imageId);
    let pipeline = sharp(image);

    // Apply the operations
    for (let i = 0; i < operations.length; i++) {
      const op = operations[i];

      // Report progress (share of operations already finished)
      await job.updateProgress(Math.round((i / operations.length) * 100));

      switch (op.type) {
        case 'resize':
          // params is typed as unknown values, so narrow before use
          pipeline = pipeline.resize(op.params.width as number, op.params.height as number);
          break;
        case 'crop':
          // extract() expects { left, top, width, height }
          pipeline = pipeline.extract(op.params as unknown as sharp.Region);
          break;
        case 'watermark':
          pipeline = pipeline.composite([{
            input: await loadWatermark(),
            gravity: 'southeast'
          }]);
          break;
      }
    }

    // Encode the output
    const output = await pipeline[outputFormat]().toBuffer();
    await job.updateProgress(100);

    // Store the result
    const url = await uploadToStorage(output, `${imageId}.${outputFormat}`);

    return { url, size: output.length };
  },
  {
    connection,
    concurrency: 2  // CPU-intensive, so less parallelism
  }
);

export { imageWorker };
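
The image worker reports progress from inside its loop as the share of operations already finished, so the value passed to `job.updateProgress` depends on where in the loop it is computed. As a small sketch, the calculation could be pulled into a helper that clamps the result and handles an empty operation list. `progressPercent` is a hypothetical name, not a BullMQ API.

```typescript
// Converts "completed out of total" into a whole-number percentage in [0, 100].
function progressPercent(completed: number, total: number): number {
  // Nothing to do counts as done; also avoids dividing by zero
  if (total <= 0) return 100;
  const ratio = Math.min(Math.max(completed / total, 0), 1);
  return Math.round(ratio * 100);
}

// Progress values for a job with three operations,
// reported after each operation has finished
const steps = [1, 2, 3].map((done) => progressPercent(done, 3));
// steps is [33, 67, 100]
```

Inside a worker this would be used as `await job.updateProgress(progressPercent(i + 1, operations.length))` after each operation, which makes the last report land on 100.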

Job Progress & Events

// API Route: Job Status
// app/api/jobs/[id]/route.ts
import { emailQueue } from '@/lib/queue';

export async function GET(
  request: Request,
  { params }: { params: { id: string } }
) {
  const job = await emailQueue.getJob(params.id);

  if (!job) {
    return Response.json({ error: 'Job not found' }, { status: 404 });
  }

  const state = await job.getState();
  const progress = job.progress;

  return Response.json({
    id: job.id,
    state,
    progress,
    data: job.data,
    returnValue: job.returnvalue,
    failedReason: job.failedReason,
    timestamp: job.timestamp,
    processedOn: job.processedOn,
    finishedOn: job.finishedOn
  });
}
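
The status route above returns BullMQ's internal state name as-is. A client usually needs fewer distinctions, so here is a sketch of a mapping onto a smaller set of statuses. The list of state names is an assumption based on what `job.getState()` returns in recent BullMQ versions; `toClientStatus` and the `ClientStatus` values are hypothetical names chosen for this example.

```typescript
// State names assumed from job.getState() in recent BullMQ versions
type JobState =
  | 'waiting'
  | 'waiting-children'
  | 'prioritized'
  | 'delayed'
  | 'active'
  | 'completed'
  | 'failed'
  | 'unknown';

// Simplified statuses for API consumers (hypothetical naming)
type ClientStatus = 'queued' | 'running' | 'done' | 'error' | 'unknown';

function toClientStatus(state: JobState): ClientStatus {
  switch (state) {
    case 'waiting':
    case 'waiting-children':
    case 'prioritized':
    case 'delayed':
      return 'queued'; // not picked up by a worker yet
    case 'active':
      return 'running';
    case 'completed':
      return 'done';
    case 'failed':
      return 'error';
    default:
      return 'unknown';
  }
}

// A client can stop polling once the job has reached a final status
function isFinished(status: ClientStatus): boolean {
  return status === 'done' || status === 'error';
}
```

In the route, `toClientStatus(state)` could be returned alongside or instead of the raw `state`.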

// Real-time updates via SSE
// app/api/jobs/[id]/stream/route.ts
import { emailQueueEvents } from '@/lib/queue';

export async function GET(
  request: Request,
  { params }: { params: { id: string } }
) {
  const encoder = new TextEncoder();
  // Set in start(); also called from cancel() so listeners are always detached
  let detach = () => {};

  const stream = new ReadableStream<Uint8Array>({
    start(controller) {
      const send = (payload: unknown) => {
        controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`));
      };

      const handleCompleted = ({ jobId, returnvalue }: { jobId: string; returnvalue: unknown }) => {
        if (jobId !== params.id) return;
        send({ event: 'completed', result: returnvalue });
        detach();
        controller.close();
      };

      const handleFailed = ({ jobId, failedReason }: { jobId: string; failedReason: string }) => {
        if (jobId !== params.id) return;
        send({ event: 'failed', error: failedReason });
        detach();
        controller.close();
      };

      const handleProgress = ({ jobId, data }: { jobId: string; data: unknown }) => {
        if (jobId !== params.id) return;
        send({ event: 'progress', progress: data });
      };

      emailQueueEvents.on('completed', handleCompleted);
      emailQueueEvents.on('failed', handleFailed);
      emailQueueEvents.on('progress', handleProgress);

      detach = () => {
        emailQueueEvents.off('completed', handleCompleted);
        emailQueueEvents.off('failed', handleFailed);
        emailQueueEvents.off('progress', handleProgress);
      };
    },
    // Called when the client disconnects. A function returned from start()
    // is ignored by ReadableStream, so cleanup has to happen here.
    cancel() {
      detach();
    }
  });

  return new Response(stream, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive'
    }
  });
}
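
The stream route writes each update in the Server-Sent Events wire format: a `data:` line followed by a blank line. As a minimal sketch, that framing can be isolated in one function, optionally with a named event. `formatSseEvent` is a hypothetical helper; the `event:` and `data:` field names come from the SSE format itself.

```typescript
// Serializes one Server-Sent Events message.
// A message ends with a blank line; `event:` is optional.
function formatSseEvent(payload: unknown, eventName?: string): string {
  const lines: string[] = [];
  if (eventName) {
    lines.push(`event: ${eventName}`);
  }
  // JSON.stringify output contains no raw newlines, so one data line suffices
  lines.push(`data: ${JSON.stringify(payload)}`);
  return lines.join('\n') + '\n\n';
}

const progressMessage = formatSseEvent({ event: 'progress', progress: 50 });
// progressMessage is 'data: {"event":"progress","progress":50}\n\n'
```

On the client, a browser `EventSource` pointed at the stream URL receives each message in its `onmessage` handler, where `JSON.parse(event.data)` recovers the payload. Messages sent with an `event:` name are delivered to listeners registered for that name instead.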

Job Scheduling (Cron)

// Set up recurring jobs
async function setupScheduledJobs() {
  // Daily at 8:00 UTC (without tz, the pattern uses the server's local time zone)
  await reportQueue.add('daily-report', {
    reportType: 'daily'
  }, {
    repeat: {
      pattern: '0 8 * * *',
      tz: 'UTC'
    },
    jobId: 'daily-report'  // Identifies this schedule; identical repeat options are not added twice
  });

  // Weekly, Mondays at 9:00
  await reportQueue.add('weekly-report', {
    reportType: 'weekly'
  }, {
    repeat: {
      pattern: '0 9 * * 1'
    },
    jobId: 'weekly-report'
  });

  // Every 5 minutes
  await emailQueue.add('process-email-queue', {}, {
    repeat: {
      every: 5 * 60 * 1000
    }
  });
}

// Manage scheduled jobs (repeatable jobs are stored per queue;
// these helpers only cover emailQueue)
async function listScheduledJobs() {
  const repeatableJobs = await emailQueue.getRepeatableJobs();
  return repeatableJobs;
}

async function removeScheduledJob(key: string) {
  await emailQueue.removeRepeatableByKey(key);
}
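
The `pattern` strings above use the classic five-field cron layout: minute, hour, day of month, month, day of week. As an illustrative sketch, the fields can be split and named to make a pattern easier to read and to catch a wrong field count before it reaches the queue. `parseCronPattern` is a hypothetical helper and handles only the five-field form, not every syntax a cron library may accept.

```typescript
interface CronFields {
  minute: string;
  hour: string;
  dayOfMonth: string;
  month: string;
  dayOfWeek: string;
}

// Splits a five-field cron pattern into named fields (no validation of values)
function parseCronPattern(pattern: string): CronFields {
  const parts = pattern.trim().split(/\s+/);
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}`);
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = parts;
  return { minute, hour, dayOfMonth, month, dayOfWeek };
}

const weekly = parseCronPattern('0 9 * * 1');
// weekly.hour is '9' and weekly.dayOfWeek is '1' (Monday)
```

Read this way, `'0 8 * * *'` is minute 0 of hour 8 on every day, and `'0 9 * * 1'` is minute 0 of hour 9 on Mondays.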

Dashboard & Monitoring

// Bull Board Integration
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [
    new BullMQAdapter(emailQueue),
    new BullMQAdapter(imageQueue),
    new BullMQAdapter(reportQueue)
  ],
  serverAdapter
});

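// Mount in an existing Express app (`app`). This adapter is Express-only;
// other frameworks need their own bull-board adapter.
app.use('/admin/queues', serverAdapter.getRouter());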

Best Practices

// 1. Idempotent jobs (safe to run more than once)
async function processOrder(job: Job<{ orderId: string }>) {
  const order = await db.order.findUnique({
    where: { id: job.data.orderId }
  });

  if (!order) {
    throw new Error(`Order ${job.data.orderId} not found`);
  }

  // Skip if already processed
  if (order.status === 'processed') {
    return { skipped: true, reason: 'Already processed' };
  }

  // Process
  await db.order.update({
    where: { id: order.id },
    data: { status: 'processed' }
  });

  return { skipped: false };
}

// 2. Graceful shutdown
process.on('SIGTERM', async () => {
  console.log('Shutting down workers...');
  await emailWorker.close();
  await imageWorker.close();
  process.exit(0);
});

// 3. Dead letter queue
const dlQueue = new Queue('dead-letter', { connection });

emailWorker.on('failed', async (job, error) => {
  // job can be undefined, e.g. if it was removed in the meantime
  if (!job) return;

  // Forward only once all attempts are used up (attempts defaults to 1)
  if (job.attemptsMade >= (job.opts.attempts ?? 1)) {
    await dlQueue.add('failed-email', {
      originalJob: job.data,
      error: error.message,
      failedAt: new Date().toISOString()
    });
  }
});
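
The dead letter handler hinges on one decision: has this job used up all of its attempts? As a sketch, that rule can be written as a pure function that is easy to unit-test without Redis. It assumes that a job added without an `attempts` option is tried exactly once; `isFinalFailure` is a hypothetical helper name.

```typescript
// True when a failed job will not be retried again.
// `attempts` is the job option of the same name and may be undefined.
function isFinalFailure(attemptsMade: number, attempts?: number): boolean {
  // Assumption: without an explicit attempts option a job runs once
  const maxAttempts = attempts ?? 1;
  return attemptsMade >= maxAttempts;
}

// A job configured with attempts: 3 that has failed for the second time
const retryStillPending = !isFinalFailure(2, 3);
// retryStillPending is true
```

Handling the undefined case explicitly matters here: a comparison against `undefined` is always false, so jobs without an `attempts` option would otherwise never reach the dead letter queue.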

Conclusion

BullMQ offers:

  1. Reliability: retries, persistence, acknowledgement
  2. Scalability: multiple workers, rate limiting
  3. Flexibility: scheduling, priorities, progress
  4. Monitoring: Bull Board, events, metrics

Indispensable for production-grade async processing.

