Data Extraction Pipelines: From Raw Data to Insights
Meta-Description: Building robust data extraction pipelines: ETL patterns, data validation, error handling, and scalable architecture.
Keywords: Data Extraction, ETL Pipeline, Data Processing, Web Scraping Pipeline, Data Validation, Stream Processing
Introduction
Web scraping is only the first step. Data extraction pipelines turn raw data into usable, validated datasets, complete with error handling, deduplication, and monitoring.
Pipeline Architecture
┌────────────────────────────────────────────────┐
│            DATA EXTRACTION PIPELINE            │
├────────────────────────────────────────────────┤
│                                                │
│  1. EXTRACT                                    │
│     ├── Web Scraping (Playwright/Puppeteer)    │
│     ├── API Calls                              │
│     ├── File Imports (CSV, JSON, XML)          │
│     └── Database Queries                       │
│              ↓                                 │
│  2. TRANSFORM                                  │
│     ├── Data Cleaning                          │
│     ├── Normalization                          │
│     ├── Validation (Zod)                       │
│     ├── Enrichment                             │
│     └── Deduplication                          │
│              ↓                                 │
│  3. LOAD                                       │
│     ├── Database Insert                        │
│     ├── File Export                            │
│     ├── API Push                               │
│     └── Event Stream                           │
│              ↓                                 │
│  4. MONITOR                                    │
│     ├── Success/Failure Rates                  │
│     ├── Data Quality Metrics                   │
│     └── Alerting                               │
│                                                │
└────────────────────────────────────────────────┘

Pipeline Class Structure
import { z } from 'zod';
import { EventEmitter } from 'events';

// Base Types
interface PipelineResult<T> {
  success: boolean;
  data?: T;
  errors?: string[];
  metadata: {
    duration: number;
    recordsProcessed: number;
    recordsFailed: number;
  };
}

// Abstract Pipeline
abstract class DataPipeline<TInput, TOutput> extends EventEmitter {
  protected name: string;

  constructor(name: string) {
    super();
    this.name = name;
  }

  async run(input: TInput): Promise<PipelineResult<TOutput[]>> {
    const startTime = Date.now();
    let recordsProcessed = 0;
    let recordsFailed = 0;
    const errors: string[] = [];

    try {
      this.emit('start', { pipeline: this.name });

      // Extract
      this.emit('phase', { phase: 'extract' });
      const rawData = await this.extract(input);

      // Transform
      this.emit('phase', { phase: 'transform' });
      const transformedData: TOutput[] = [];
      for (const item of rawData) {
        try {
          const transformed = await this.transform(item);
          if (transformed) {
            transformedData.push(transformed);
            recordsProcessed++;
          }
        } catch (error) {
          recordsFailed++;
          errors.push(`Transform error: ${(error as Error).message}`);
          this.emit('error', { phase: 'transform', error });
        }
      }

      // Load
      this.emit('phase', { phase: 'load' });
      await this.load(transformedData);

      this.emit('complete', {
        pipeline: this.name,
        recordsProcessed,
        recordsFailed
      });

      return {
        success: true,
        data: transformedData,
        errors: errors.length > 0 ? errors : undefined,
        metadata: {
          duration: Date.now() - startTime,
          recordsProcessed,
          recordsFailed
        }
      };
    } catch (error) {
      this.emit('error', { phase: 'pipeline', error });
      return {
        success: false,
        errors: [(error as Error).message],
        metadata: {
          duration: Date.now() - startTime,
          recordsProcessed,
          recordsFailed
        }
      };
    }
  }

  protected abstract extract(input: TInput): Promise<any[]>;
  protected abstract transform(item: any): Promise<TOutput | null>;
  protected abstract load(data: TOutput[]): Promise<void>;
}

Concrete Pipeline Implementation
import * as cheerio from 'cheerio';
import axios from 'axios';

// Schema Definition
const ProductSchema = z.object({
  id: z.string(),
  name: z.string().min(1),
  price: z.number().positive(),
  currency: z.enum(['EUR', 'USD', 'GBP']),
  description: z.string().optional(),
  category: z.string(),
  imageUrl: z.string().url().optional(),
  sourceUrl: z.string().url(),
  scrapedAt: z.date()
});

type Product = z.infer<typeof ProductSchema>;

// Product Scraping Pipeline
class ProductScrapingPipeline extends DataPipeline<string[], Product> {
  private seenIds = new Set<string>();

  constructor() {
    super('ProductScrapingPipeline');
  }

  protected async extract(urls: string[]): Promise<any[]> {
    const rawProducts: any[] = [];

    for (const url of urls) {
      try {
        const { data: html } = await axios.get(url, {
          headers: { 'User-Agent': 'Mozilla/5.0' },
          timeout: 10000
        });
        const $ = cheerio.load(html);

        $('.product-card').each((_, element) => {
          const $el = $(element);
          rawProducts.push({
            // data() may return a number; the schema expects a string
            id: String($el.data('product-id')),
            name: $el.find('.product-name').text().trim(),
            priceText: $el.find('.price').text().trim(),
            description: $el.find('.description').text().trim(),
            category: $el.find('.category').text().trim(),
            imageUrl: $el.find('img').attr('src'),
            sourceUrl: url
          });
        });

        // Rate Limiting
        await new Promise(r => setTimeout(r, 1000));
      } catch (error) {
        this.emit('error', { phase: 'extract', url, error });
      }
    }

    return rawProducts;
  }

  protected async transform(item: any): Promise<Product | null> {
    // Deduplication
    if (this.seenIds.has(item.id)) {
      return null;
    }
    this.seenIds.add(item.id);

    // Price Parsing
    const priceMatch = item.priceText?.match(/[\d,.]+/);
    if (!priceMatch) return null;
    const price = parseFloat(priceMatch[0].replace(',', '.'));

    // Currency Detection
    let currency: 'EUR' | 'USD' | 'GBP' = 'EUR';
    if (item.priceText.includes('$')) currency = 'USD';
    if (item.priceText.includes('£')) currency = 'GBP';

    // Construct Product
    const product = {
      id: item.id,
      name: item.name,
      price,
      currency,
      description: item.description || undefined,
      category: item.category || 'Uncategorized',
      imageUrl: item.imageUrl?.startsWith('http')
        ? item.imageUrl
        : item.imageUrl
          ? new URL(item.imageUrl, item.sourceUrl).toString()
          : undefined,
      sourceUrl: item.sourceUrl,
      scrapedAt: new Date()
    };

    // Validation
    const result = ProductSchema.safeParse(product);
    if (!result.success) {
      this.emit('validation_error', {
        item,
        errors: result.error.errors
      });
      return null;
    }

    return result.data;
  }

  protected async load(products: Product[]): Promise<void> {
    // Batch Insert ('db' is assumed to be a Prisma client imported elsewhere)
    const batchSize = 100;
    for (let i = 0; i < products.length; i += batchSize) {
      const batch = products.slice(i, i + batchSize);
      await db.product.createMany({
        data: batch,
        skipDuplicates: true
      });
      this.emit('progress', {
        loaded: i + batch.length,
        total: products.length
      });
    }
  }
}

// Usage
const pipeline = new ProductScrapingPipeline();

pipeline.on('start', (data) => console.log('Pipeline started:', data));
pipeline.on('phase', (data) => console.log('Phase:', data.phase));
pipeline.on('progress', (data) => console.log('Progress:', data));
pipeline.on('error', (data) => console.error('Error:', data));
pipeline.on('complete', (data) => console.log('Complete:', data));

const result = await pipeline.run([
  'https://shop.example.com/category/electronics?page=1',
  'https://shop.example.com/category/electronics?page=2'
]);

console.log('Result:', result.metadata);

Stream Processing for Large Datasets
import { Transform, Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Extract Stream
function createExtractStream(urls: string[]): Readable {
  let index = 0;

  return new Readable({
    objectMode: true,
    async read() {
      // Loop over URLs so a failed fetch doesn't stall the stream:
      // if nothing were pushed, read() would never be called again
      while (index < urls.length) {
        const url = urls[index++];
        try {
          const { data: html } = await axios.get(url);
          const $ = cheerio.load(html);
          $('.item').each((_, el) => {
            this.push({
              url,
              html: $(el).html()
            });
          });
          return; // wait for the next read() call
        } catch (error) {
          console.error(`Failed to fetch ${url}`);
        }
      }
      this.push(null); // all URLs consumed
    }
  });
}

// Transform Stream
function createTransformStream(schema: z.ZodSchema): Transform {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      try {
        const $ = cheerio.load(chunk.html);
        const data = {
          name: $('h2').text().trim(),
          price: parseFloat($('.price').text().replace(/[^0-9.]/g, '')),
          sourceUrl: chunk.url
        };

        const result = schema.safeParse(data);
        if (result.success) {
          callback(null, result.data);
        } else {
          callback(); // Skip invalid items
        }
      } catch (error) {
        callback(error as Error);
      }
    }
  });
}

// Load Stream ('saveBatch' is assumed to persist a batch to your store)
function createLoadStream(batchSize: number = 100): Writable {
  let batch: any[] = [];

  return new Writable({
    objectMode: true,
    async write(chunk, encoding, callback) {
      batch.push(chunk);
      if (batch.length >= batchSize) {
        await saveBatch(batch);
        batch = [];
      }
      callback();
    },
    async final(callback) {
      if (batch.length > 0) {
        await saveBatch(batch);
      }
      callback();
    }
  });
}

// The transform stream above only extracts name, price, and sourceUrl,
// so validate against a matching schema; the full ProductSchema would
// reject every item for missing fields
const StreamItemSchema = z.object({
  name: z.string().min(1),
  price: z.number().positive(),
  sourceUrl: z.string().url()
});

// Stream Pipeline
async function runStreamPipeline(urls: string[]) {
  await pipeline(
    createExtractStream(urls),
    createTransformStream(StreamItemSchema),
    createLoadStream(100)
  );
}
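
A quick usage sketch; the URL list below is made up, but it shows the point of the stream approach: each stage handles one item at a time and the Writable batches inserts, so memory use stays flat even across hundreds of pages.

// Hypothetical: stream 500 category pages without buffering them all
const manyUrls = Array.from({ length: 500 }, (_, i) =>
  `https://shop.example.com/category/electronics?page=${i + 1}`
);

await runStreamPipeline(manyUrls);
console.log('Stream pipeline finished');

Data Quality & Monitoring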
interface DataQualityMetrics {
  totalRecords: number;
  validRecords: number;
  invalidRecords: number;
  duplicates: number;
  nullFields: Record<string, number>;
  validationErrors: Record<string, number>;
}

class DataQualityMonitor {
  private metrics: DataQualityMetrics = {
    totalRecords: 0,
    validRecords: 0,
    invalidRecords: 0,
    duplicates: 0,
    nullFields: {},
    validationErrors: {}
  };

  recordValid() {
    this.metrics.totalRecords++;
    this.metrics.validRecords++;
  }

  recordInvalid(errors: z.ZodError) {
    this.metrics.totalRecords++;
    this.metrics.invalidRecords++;

    errors.errors.forEach(err => {
      const path = err.path.join('.');
      this.metrics.validationErrors[path] =
        (this.metrics.validationErrors[path] || 0) + 1;
    });
  }

  recordDuplicate() {
    this.metrics.duplicates++;
  }

  recordNullField(fieldName: string) {
    this.metrics.nullFields[fieldName] =
      (this.metrics.nullFields[fieldName] || 0) + 1;
  }

  getReport(): DataQualityMetrics & { qualityScore: number } {
    // Guard against division by zero when no records were processed
    const qualityScore = this.metrics.totalRecords > 0
      ? this.metrics.validRecords / this.metrics.totalRecords * 100
      : 0;
    return {
      ...this.metrics,
      qualityScore: Math.round(qualityScore * 100) / 100
    };
  }

  async sendAlert(threshold: number = 80) {
    const report = this.getReport();
    if (report.qualityScore < threshold) {
      // Send alert (email, Slack, etc.)
      console.warn(`Data quality below threshold: ${report.qualityScore}%`);
    }
  }
}
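
A minimal sketch of how the monitor plugs into validation. The rawItems sample is hypothetical; in practice these would be the records coming out of the extract phase:

const monitor = new DataQualityMonitor();

// Hypothetical raw records (in practice: output of the extract phase)
const rawItems = [
  {
    id: 'p-1',
    name: 'Laptop',
    price: 999,
    currency: 'EUR',
    category: 'Electronics',
    sourceUrl: 'https://shop.example.com/p-1',
    scrapedAt: new Date()
  },
  { id: 'p-2', name: '', price: -1 } // fails validation
];

for (const raw of rawItems) {
  const parsed = ProductSchema.safeParse(raw);
  if (parsed.success) {
    monitor.recordValid();
  } else {
    monitor.recordInvalid(parsed.error);
  }
}

console.log(monitor.getReport());
await monitor.sendAlert(90); // warn if the quality score drops below 90%

Scheduling & Orchestration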
import { CronJob } from 'cron';

// Scheduled Pipeline Execution
const pipelineJob = new CronJob(
  '0 2 * * *', // Daily at 2:00 AM
  async () => {
    console.log('Starting scheduled pipeline run');
    const pipeline = new ProductScrapingPipeline();
    // getUrlsToScrape() is assumed to load the URL list from config or DB
    const urls = await getUrlsToScrape();
    const result = await pipeline.run(urls);

    // Persist metrics
    await db.pipelineRun.create({
      data: {
        pipelineName: 'ProductScrapingPipeline',
        status: result.success ? 'success' : 'failed',
        recordsProcessed: result.metadata.recordsProcessed,
        recordsFailed: result.metadata.recordsFailed,
        duration: result.metadata.duration,
        errors: result.errors
      }
    });

    // Alert on failures
    if (!result.success || result.metadata.recordsFailed > 100) {
      await sendPipelineAlert(result);
    }
  },
  null,
  true,
  'Europe/Berlin'
);

// Manual Trigger
async function triggerPipeline(pipelineId: string) {
  // Queue the job for async execution (jobQueue: e.g. a BullMQ queue)
  await jobQueue.add('run-pipeline', { pipelineId });
}
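
The jobQueue.add call above matches the BullMQ API, so here is a sketch of the consuming side. The queue name 'pipelines' and the worker wiring are assumptions, not part of the original setup:

import { Worker } from 'bullmq';

// Consumes 'run-pipeline' jobs queued by triggerPipeline() above
// (assumes the queue is named 'pipelines'; adjust to your setup)
const worker = new Worker('pipelines', async (job) => {
  if (job.name === 'run-pipeline') {
    const pipeline = new ProductScrapingPipeline();
    const urls = await getUrlsToScrape();
    return pipeline.run(urls); // stored as the job's return value
  }
});

worker.on('failed', (job, err) => {
  console.error(`Pipeline job ${job?.id} failed:`, err.message);
});

Conclusion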
Robust data pipelines need:
- Validation: Zod for schema enforcement
- Error handling: graceful degradation on bad records
- Monitoring: quality metrics and alerts
- Scalability: stream processing for large datasets
From raw data to structured, trustworthy data.