1 min read
KI-EntwicklungBackground Jobs & Queues mit BullMQ
Background Job Processing mit BullMQ und Redis. Queues, Workers, Retries und Job Scheduling für Node.js.
BullMQBackground JobsQueue ProcessingRedis QueueJob SchedulingWorker Threads

Background Jobs & Queues mit BullMQ
Meta-Description: Background Job Processing mit BullMQ und Redis. Queues, Workers, Retries und Job Scheduling für Node.js.
Keywords: BullMQ, Background Jobs, Queue Processing, Redis Queue, Job Scheduling, Worker Threads, Async Processing
Einführung
Nicht jede Aufgabe sollte im Request-Response-Zyklus laufen. Background Jobs ermöglichen Email-Versand, Image Processing, Reports – alles asynchron mit Retries, Prioritäten und Scheduling.
Warum Background Jobs?
┌─────────────────────────────────────────────────────────────┐
│ SYNCHRON vs ASYNCHRON │
├─────────────────────────────────────────────────────────────┤
│ │
│ Synchron (schlecht für): │
│ ├── Email-Versand (3-5s) │
│ ├── Image/Video Processing (10s+) │
│ ├── PDF Generation (2-5s) │
│ ├── Externe API Calls (variabel) │
│ └── Batch Operations (Minuten) │
│ │
│ User wartet... ⏳ │
│ │
│ Asynchron (Background): │
│ ├── Request → Response (50ms) │
│ ├── Job in Queue → Worker verarbeitet │
│ ├── Retries bei Fehlern │
│ ├── Rate Limiting │
│ └── Scheduling (Cron) │
│ │
│ User bekommt sofort Antwort! ✓ │
│ │
└─────────────────────────────────────────────────────────────┘BullMQ Setup
npm install bullmq ioredis// lib/queue.ts
import { Queue, Worker, QueueEvents } from 'bullmq';
import IORedis from 'ioredis';
const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null
});
// Queue definieren
export const emailQueue = new Queue('email', { connection });
export const imageQueue = new Queue('image-processing', { connection });
export const reportQueue = new Queue('reports', { connection });
// Queue Events (für Monitoring)
export const emailQueueEvents = new QueueEvents('email', { connection });Job Types definieren
// types/jobs.ts
export interface EmailJob {
type: 'welcome' | 'reset-password' | 'notification';
to: string;
subject: string;
template: string;
data: Record<string, unknown>;
}
export interface ImageProcessingJob {
imageId: string;
operations: Array<{
type: 'resize' | 'crop' | 'watermark';
params: Record<string, unknown>;
}>;
outputFormat: 'webp' | 'avif' | 'jpeg';
}
export interface ReportJob {
reportType: 'daily' | 'weekly' | 'monthly';
userId: string;
dateRange: {
start: Date;
end: Date;
};
}Jobs hinzufügen
// services/email.ts
import { emailQueue } from '@/lib/queue';
import { EmailJob } from '@/types/jobs';
export async function sendWelcomeEmail(userId: string, email: string) {
const job = await emailQueue.add('send-email', {
type: 'welcome',
to: email,
subject: 'Welcome to Our Platform!',
template: 'welcome',
data: { userId }
} satisfies EmailJob, {
// Job Options
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000 // 1s, 2s, 4s
},
removeOnComplete: {
count: 1000 // Behalte letzte 1000 completed Jobs
},
removeOnFail: {
count: 5000 // Behalte letzte 5000 failed Jobs
}
});
return job.id;
}
// Mit Priorität
export async function sendUrgentEmail(data: EmailJob) {
await emailQueue.add('send-email', data, {
priority: 1 // Niedrigere Zahl = höhere Priorität
});
}
// Verzögerter Job
export async function scheduleEmail(data: EmailJob, delay: number) {
await emailQueue.add('send-email', data, {
delay // Millisekunden
});
}
// Geplanter Job (Cron)
export async function scheduleReports() {
await reportQueue.add('generate-report', {
reportType: 'daily',
// ...
}, {
repeat: {
pattern: '0 8 * * *' // Täglich um 8:00
}
});
}Worker implementieren
// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import IORedis from 'ioredis';
import { Resend } from 'resend';
import { EmailJob } from '@/types/jobs';
const connection = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null
});
const resend = new Resend(process.env.RESEND_API_KEY);
const emailWorker = new Worker<EmailJob>(
'email',
async (job: Job<EmailJob>) => {
console.log(`Processing job ${job.id}: ${job.data.type}`);
const { to, subject, template, data } = job.data;
// Template rendern
const html = await renderTemplate(template, data);
// Email senden
const result = await resend.emails.send({
from: 'noreply@example.com',
to,
subject,
html
});
console.log(`Email sent: ${result.id}`);
return { emailId: result.id };
},
{
connection,
concurrency: 5, // 5 Jobs parallel
limiter: {
max: 100, // Max 100 Jobs
duration: 60000 // Pro Minute (Rate Limiting)
}
}
);
// Event Handlers
emailWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Job ${job?.id} failed:`, error.message);
});
emailWorker.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
export { emailWorker };// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import sharp from 'sharp';
import { ImageProcessingJob } from '@/types/jobs';
const imageWorker = new Worker<ImageProcessingJob>(
'image-processing',
async (job: Job<ImageProcessingJob>) => {
const { imageId, operations, outputFormat } = job.data;
// Bild laden
const image = await loadImage(imageId);
let pipeline = sharp(image);
// Operationen anwenden
for (let i = 0; i < operations.length; i++) {
const op = operations[i];
// Progress updaten
await job.updateProgress(Math.round((i / operations.length) * 100));
switch (op.type) {
case 'resize':
pipeline = pipeline.resize(op.params.width, op.params.height);
break;
case 'crop':
pipeline = pipeline.extract(op.params);
break;
case 'watermark':
pipeline = pipeline.composite([{
input: await loadWatermark(),
gravity: 'southeast'
}]);
break;
}
}
// Output
const output = await pipeline[outputFormat]().toBuffer();
// Speichern
const url = await uploadToStorage(output, `${imageId}.${outputFormat}`);
return { url, size: output.length };
},
{
connection,
concurrency: 2 // CPU-intensive, weniger parallel
}
);Job Progress & Events
// API Route: Job Status
// app/api/jobs/[id]/route.ts
import { emailQueue } from '@/lib/queue';
export async function GET(
request: Request,
{ params }: { params: { id: string } }
) {
const job = await emailQueue.getJob(params.id);
if (!job) {
return Response.json({ error: 'Job not found' }, { status: 404 });
}
const state = await job.getState();
const progress = job.progress;
return Response.json({
id: job.id,
state,
progress,
data: job.data,
returnValue: job.returnvalue,
failedReason: job.failedReason,
timestamp: job.timestamp,
processedOn: job.processedOn,
finishedOn: job.finishedOn
});
}// Real-time Updates mit SSE
// app/api/jobs/[id]/stream/route.ts
import { emailQueueEvents } from '@/lib/queue';
export async function GET(
request: Request,
{ params }: { params: { id: string } }
) {
const encoder = new TextEncoder();
const stream = new ReadableStream({
start(controller) {
const handleCompleted = ({ jobId, returnvalue }) => {
if (jobId === params.id) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({
event: 'completed',
result: returnvalue
})}\n\n`)
);
controller.close();
}
};
const handleFailed = ({ jobId, failedReason }) => {
if (jobId === params.id) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({
event: 'failed',
error: failedReason
})}\n\n`)
);
controller.close();
}
};
const handleProgress = ({ jobId, data }) => {
if (jobId === params.id) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({
event: 'progress',
progress: data
})}\n\n`)
);
}
};
emailQueueEvents.on('completed', handleCompleted);
emailQueueEvents.on('failed', handleFailed);
emailQueueEvents.on('progress', handleProgress);
return () => {
emailQueueEvents.off('completed', handleCompleted);
emailQueueEvents.off('failed', handleFailed);
emailQueueEvents.off('progress', handleProgress);
};
}
});
return new Response(stream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive'
}
});
}Job Scheduling (Cron)
// Wiederkehrende Jobs einrichten
async function setupScheduledJobs() {
// Täglich um 8:00 UTC
await reportQueue.add('daily-report', {
reportType: 'daily'
}, {
repeat: {
pattern: '0 8 * * *'
},
jobId: 'daily-report' // Verhindert Duplikate
});
// Wöchentlich Montag 9:00
await reportQueue.add('weekly-report', {
reportType: 'weekly'
}, {
repeat: {
pattern: '0 9 * * 1'
},
jobId: 'weekly-report'
});
// Alle 5 Minuten
await emailQueue.add('process-email-queue', {}, {
repeat: {
every: 5 * 60 * 1000
}
});
}
// Scheduled Jobs verwalten
async function listScheduledJobs() {
const repeatableJobs = await emailQueue.getRepeatableJobs();
return repeatableJobs;
}
async function removeScheduledJob(key: string) {
await emailQueue.removeRepeatableByKey(key);
}Dashboard & Monitoring
// Bull Board Integration
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(imageQueue),
new BullMQAdapter(reportQueue)
],
serverAdapter
});
// In Express/Hono einbinden
app.use('/admin/queues', serverAdapter.getRouter());Best Practices
// 1. Idempotente Jobs (können mehrfach ausgeführt werden)
async function processOrder(job: Job<{ orderId: string }>) {
const order = await db.order.findUnique({
where: { id: job.data.orderId }
});
// Prüfen ob schon verarbeitet
if (order?.status === 'processed') {
return { skipped: true, reason: 'Already processed' };
}
// Verarbeiten
await db.order.update({
where: { id: order.id },
data: { status: 'processed' }
});
}
// 2. Graceful Shutdown
process.on('SIGTERM', async () => {
console.log('Shutting down workers...');
await emailWorker.close();
await imageWorker.close();
process.exit(0);
});
// 3. Dead Letter Queue
const dlQueue = new Queue('dead-letter', { connection });
emailWorker.on('failed', async (job, error) => {
if (job.attemptsMade >= job.opts.attempts!) {
await dlQueue.add('failed-email', {
originalJob: job.data,
error: error.message,
failedAt: new Date()
});
}
});Fazit
BullMQ bietet:
- Zuverlässigkeit: Retries, Persistence, Acknowledgement
- Skalierbarkeit: Mehrere Worker, Rate Limiting
- Flexibilität: Scheduling, Prioritäten, Progress
- Monitoring: Bull Board, Events, Metrics
Unverzichtbar für Production-Grade Async Processing.
Bildprompts
- "Queue of tasks being processed by workers, assembly line concept"
- "Background process running while user continues, async workflow"
- "Redis connecting multiple workers, distributed job processing"