Data Extraction Pipelines: From Raw Data to Insights

Robust data extraction pipelines: ETL patterns, data validation, error handling, and scalable architecture.


Introduction

Web scraping is only the first step. Data extraction pipelines turn raw data into usable, validated datasets, complete with error handling, deduplication, and monitoring.


Pipeline Architecture

┌─────────────────────────────────────────────────────────────┐
│              DATA EXTRACTION PIPELINE                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  1. EXTRACT                                                 │
│  ├── Web Scraping (Playwright/Puppeteer)                   │
│  ├── API Calls                                             │
│  ├── File Imports (CSV, JSON, XML)                         │
│  └── Database Queries                                      │
│           ↓                                                │
│  2. TRANSFORM                                               │
│  ├── Data Cleaning                                         │
│  ├── Normalization                                         │
│  ├── Validation (Zod)                                      │
│  ├── Enrichment                                            │
│  └── Deduplication                                         │
│           ↓                                                │
│  3. LOAD                                                    │
│  ├── Database Insert                                       │
│  ├── File Export                                           │
│  ├── API Push                                              │
│  └── Event Stream                                          │
│           ↓                                                │
│  4. MONITOR                                                 │
│  ├── Success/Failure Rates                                 │
│  ├── Data Quality Metrics                                  │
│  └── Alerting                                              │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Pipeline Class Structure

import { z } from 'zod';
import { EventEmitter } from 'events';

// Base Types
interface PipelineResult<T> {
  success: boolean;
  data?: T[];
  errors?: string[];
  metadata: {
    duration: number;
    recordsProcessed: number;
    recordsFailed: number;
  };
}

// Abstract Pipeline
abstract class DataPipeline<TInput, TOutput> extends EventEmitter {
  protected name: string;

  constructor(name: string) {
    super();
    this.name = name;
  }

  async run(input: TInput): Promise<PipelineResult<TOutput>> {
    const startTime = Date.now();
    let recordsProcessed = 0;
    let recordsFailed = 0;
    const errors: string[] = [];

    try {
      this.emit('start', { pipeline: this.name });

      // Extract
      this.emit('phase', { phase: 'extract' });
      const rawData = await this.extract(input);

      // Transform
      this.emit('phase', { phase: 'transform' });
      const transformedData: TOutput[] = [];

      for (const item of rawData) {
        try {
          const transformed = await this.transform(item);
          if (transformed) {
            transformedData.push(transformed);
            recordsProcessed++;
          }
        } catch (error) {
          recordsFailed++;
          errors.push(`Transform error: ${(error as Error).message}`);
          this.emit('error', { phase: 'transform', error });
        }
      }

      // Load
      this.emit('phase', { phase: 'load' });
      await this.load(transformedData);

      this.emit('complete', {
        pipeline: this.name,
        recordsProcessed,
        recordsFailed
      });

      return {
        success: true,
        data: transformedData,
        errors: errors.length > 0 ? errors : undefined,
        metadata: {
          duration: Date.now() - startTime,
          recordsProcessed,
          recordsFailed
        }
      };
    } catch (error) {
      this.emit('error', { phase: 'pipeline', error });

      return {
        success: false,
        errors: [(error as Error).message],
        metadata: {
          duration: Date.now() - startTime,
          recordsProcessed,
          recordsFailed
        }
      };
    }
  }

  protected abstract extract(input: TInput): Promise<any[]>;
  protected abstract transform(item: any): Promise<TOutput | null>;
  protected abstract load(data: TOutput[]): Promise<void>;
}

Concrete Pipeline Implementation

import * as cheerio from 'cheerio';
import axios from 'axios';
// Assumes a Prisma-style client for load(), e.g.: import { db } from './db';

// Schema Definition
const ProductSchema = z.object({
  id: z.string(),
  name: z.string().min(1),
  price: z.number().positive(),
  currency: z.enum(['EUR', 'USD', 'GBP']),
  description: z.string().optional(),
  category: z.string(),
  imageUrl: z.string().url().optional(),
  sourceUrl: z.string().url(),
  scrapedAt: z.date()
});

type Product = z.infer<typeof ProductSchema>;

// Product Scraping Pipeline
class ProductScrapingPipeline extends DataPipeline<string[], Product> {
  private seenIds = new Set<string>();

  constructor() {
    super('ProductScrapingPipeline');
  }

  protected async extract(urls: string[]): Promise<any[]> {
    const rawProducts: any[] = [];

    for (const url of urls) {
      try {
        const { data: html } = await axios.get(url, {
          headers: { 'User-Agent': 'Mozilla/5.0' },
          timeout: 10000
        });

        const $ = cheerio.load(html);

        $('.product-card').each((_, element) => {
          const $el = $(element);

          rawProducts.push({
            id: $el.data('product-id'),
            name: $el.find('.product-name').text().trim(),
            priceText: $el.find('.price').text().trim(),
            description: $el.find('.description').text().trim(),
            category: $el.find('.category').text().trim(),
            imageUrl: $el.find('img').attr('src'),
            sourceUrl: url
          });
        });

        // Rate Limiting
        await new Promise(r => setTimeout(r, 1000));
      } catch (error) {
        this.emit('error', { phase: 'extract', url, error });
      }
    }

    return rawProducts;
  }

  protected async transform(item: any): Promise<Product | null> {
    // Deduplication
    if (this.seenIds.has(item.id)) {
      return null;
    }
    this.seenIds.add(item.id);

    // Price parsing (assumes simple formats like "19,99 €";
    // thousands separators such as "1.299,99" need extra handling)
    const priceMatch = item.priceText?.match(/[\d,.]+/);
    if (!priceMatch) return null;

    const price = parseFloat(priceMatch[0].replace(',', '.'));

    // Currency Detection
    let currency: 'EUR' | 'USD' | 'GBP' = 'EUR';
    if (item.priceText.includes('$')) currency = 'USD';
    if (item.priceText.includes('£')) currency = 'GBP';

    // Construct Product
    const product = {
      id: item.id,
      name: item.name,
      price,
      currency,
      description: item.description || undefined,
      category: item.category || 'Uncategorized',
      imageUrl: item.imageUrl?.startsWith('http')
        ? item.imageUrl
        : item.imageUrl
          ? new URL(item.imageUrl, item.sourceUrl).toString()
          : undefined,
      sourceUrl: item.sourceUrl,
      scrapedAt: new Date()
    };

    // Validation
    const result = ProductSchema.safeParse(product);

    if (!result.success) {
      this.emit('validation_error', {
        item,
        errors: result.error.errors
      });
      return null;
    }

    return result.data;
  }

  protected async load(products: Product[]): Promise<void> {
    // Batch Insert
    const batchSize = 100;

    for (let i = 0; i < products.length; i += batchSize) {
      const batch = products.slice(i, i + batchSize);

      await db.product.createMany({
        data: batch,
        skipDuplicates: true
      });

      this.emit('progress', {
        loaded: i + batch.length,
        total: products.length
      });
    }
  }
}

// Usage
const pipeline = new ProductScrapingPipeline();

pipeline.on('start', (data) => console.log('Pipeline started:', data));
pipeline.on('phase', (data) => console.log('Phase:', data.phase));
pipeline.on('progress', (data) => console.log('Progress:', data));
pipeline.on('error', (data) => console.error('Error:', data));
pipeline.on('complete', (data) => console.log('Complete:', data));

const result = await pipeline.run([
  'https://shop.example.com/category/electronics?page=1',
  'https://shop.example.com/category/electronics?page=2'
]);

console.log('Result:', result.metadata);
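
The extract phase above gives up on a URL after a single failed request. As a hardening sketch, a retry helper with exponential backoff could wrap the HTTP call; fetchWithRetry and its retry counts are illustrative, not part of the original pipeline:

// Illustrative helper (not in the original): retry with exponential backoff
async function fetchWithRetry(url: string, retries = 3): Promise<string> {
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      const { data } = await axios.get(url, {
        headers: { 'User-Agent': 'Mozilla/5.0' },
        timeout: 10000
      });
      return data;
    } catch (error) {
      if (attempt === retries) throw error;
      // Back off 1s, 2s, 4s, ... before the next attempt
      await new Promise(r => setTimeout(r, 2 ** attempt * 1000));
    }
  }
  throw new Error('unreachable');
}

Replacing axios.get with fetchWithRetry inside extract() keeps transient network errors from dropping whole pages.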

Stream Processing for Large Datasets

import { Transform, Readable, Writable } from 'stream';
import { pipeline } from 'stream/promises';

// Extract Stream
function createExtractStream(urls: string[]): Readable {
  let index = 0;

  return new Readable({
    objectMode: true,
    async read() {
      // Loop until something is pushed or all URLs are consumed;
      // returning without a push would stall the stream, because
      // read() is only called again after a successful push.
      while (index < urls.length) {
        const url = urls[index++];

        try {
          const { data: html } = await axios.get(url);
          const $ = cheerio.load(html);

          let pushed = false;
          $('.item').each((_, el) => {
            this.push({
              url,
              html: $(el).html()
            });
            pushed = true;
          });

          if (pushed) return;
        } catch (error) {
          console.error(`Failed to fetch ${url}`);
        }
      }

      this.push(null);  // End of stream
    }
  });
}

// Transform Stream
function createTransformStream(schema: z.ZodSchema): Transform {
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) {
      try {
        const $ = cheerio.load(chunk.html);

        const data = {
          name: $('h2').text().trim(),
          price: parseFloat($('.price').text().replace(/[^0-9.]/g, '')),
          sourceUrl: chunk.url
        };

        const result = schema.safeParse(data);

        if (result.success) {
          callback(null, result.data);
        } else {
          callback(null);  // Skip invalid items
        }
      } catch (error) {
        callback(error as Error);
      }
    }
  });
}

// Load Stream
function createLoadStream(batchSize: number = 100): Writable {
  let batch: any[] = [];

  return new Writable({
    objectMode: true,
    async write(chunk, encoding, callback) {
      batch.push(chunk);

      if (batch.length >= batchSize) {
        try {
          await saveBatch(batch);
          batch = [];
        } catch (error) {
          return callback(error as Error);  // Propagate load failures
        }
      }

      callback();
    },
    async final(callback) {
      try {
        if (batch.length > 0) {
          await saveBatch(batch);
        }
        callback();
      } catch (error) {
        callback(error as Error);
      }
    }
  });
}

// Stream Pipeline
async function runStreamPipeline(urls: string[]) {
  await pipeline(
    createExtractStream(urls),
    createTransformStream(ProductSchema),
    createLoadStream(100)
  );
}
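
The load stream above calls saveBatch, which the snippet leaves undefined. A minimal sketch, assuming the same Prisma-style db client as in the class-based pipeline:

// Hypothetical saveBatch, reusing the Prisma-style client from load() above
async function saveBatch(batch: Product[]): Promise<void> {
  await db.product.createMany({
    data: batch,
    skipDuplicates: true
  });
}

With that in place, the stream variant runs over the same URL list as before:

await runStreamPipeline([
  'https://shop.example.com/category/electronics?page=1',
  'https://shop.example.com/category/electronics?page=2'
]);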

Data Quality & Monitoring

interface DataQualityMetrics {
  totalRecords: number;
  validRecords: number;
  invalidRecords: number;
  duplicates: number;
  nullFields: Record<string, number>;
  validationErrors: Record<string, number>;
}

class DataQualityMonitor {
  private metrics: DataQualityMetrics = {
    totalRecords: 0,
    validRecords: 0,
    invalidRecords: 0,
    duplicates: 0,
    nullFields: {},
    validationErrors: {}
  };

  recordValid() {
    this.metrics.totalRecords++;
    this.metrics.validRecords++;
  }

  recordInvalid(errors: z.ZodError) {
    this.metrics.totalRecords++;
    this.metrics.invalidRecords++;

    errors.errors.forEach(err => {
      const path = err.path.join('.');
      this.metrics.validationErrors[path] =
        (this.metrics.validationErrors[path] || 0) + 1;
    });
  }

  recordDuplicate() {
    this.metrics.duplicates++;
  }

  recordNullField(fieldName: string) {
    this.metrics.nullFields[fieldName] =
      (this.metrics.nullFields[fieldName] || 0) + 1;
  }

  getReport(): DataQualityMetrics & { qualityScore: number } {
    // Guard against division by zero when no records have been seen
    const qualityScore = this.metrics.totalRecords > 0
      ? (this.metrics.validRecords / this.metrics.totalRecords) * 100
      : 100;

    return {
      ...this.metrics,
      qualityScore: Math.round(qualityScore * 100) / 100
    };
  }

  async sendAlert(threshold: number = 80) {
    const report = this.getReport();

    if (report.qualityScore < threshold) {
      // Send alert (email, Slack, etc.)
      console.warn(`Data quality below threshold: ${report.qualityScore}%`);
    }
  }
}
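
A usage sketch wiring the monitor into validation; rawItems stands in for whatever raw objects the extract phase produced:

const monitor = new DataQualityMonitor();

// rawItems: hypothetical array of scraped raw objects
for (const raw of rawItems) {
  const result = ProductSchema.safeParse(raw);
  if (result.success) {
    monitor.recordValid();
  } else {
    monitor.recordInvalid(result.error);
  }
}

console.log(monitor.getReport());
await monitor.sendAlert(80);  // Warn if the quality score drops below 80%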

Scheduling & Orchestration

import { CronJob } from 'cron';

// Scheduled Pipeline Execution
const pipelineJob = new CronJob(
  '0 2 * * *',  // Daily at 02:00
  async () => {
    console.log('Starting scheduled pipeline run');

    const pipeline = new ProductScrapingPipeline();
    const urls = await getUrlsToScrape();  // App-specific URL source

    const result = await pipeline.run(urls);

    // Persist run metrics
    await db.pipelineRun.create({
      data: {
        pipelineName: 'ProductScrapingPipeline',
        status: result.success ? 'success' : 'failed',
        recordsProcessed: result.metadata.recordsProcessed,
        recordsFailed: result.metadata.recordsFailed,
        duration: result.metadata.duration,
        errors: result.errors
      }
    });

    // Alert on failures
    if (!result.success || result.metadata.recordsFailed > 100) {
      await sendPipelineAlert(result);
    }
  },
  null,
  true,
  'Europe/Berlin'
);

// Manual Trigger
async function triggerPipeline(pipelineId: string) {
  // Queue the job for async execution (one possible jobQueue setup is sketched below)
  await jobQueue.add('run-pipeline', { pipelineId });
}
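
jobQueue is left undefined above. One possible backing, sketched with BullMQ; the Redis connection, queue name, and job name are assumptions:

import { Queue, Worker } from 'bullmq';

// Sketch only: assumes BullMQ and a local Redis instance
const connection = { host: 'localhost', port: 6379 };

const jobQueue = new Queue('pipelines', { connection });

// Worker process: picks up queued runs and executes them
new Worker('pipelines', async (job) => {
  if (job.name === 'run-pipeline') {
    const pipeline = new ProductScrapingPipeline();
    return pipeline.run(await getUrlsToScrape());
  }
}, { connection });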

Conclusion

Robust data pipelines need:

  1. Validation: Zod for schema enforcement
  2. Error handling: graceful degradation
  3. Monitoring: quality metrics & alerts
  4. Scalability: stream processing for large datasets

From raw data to structured, trustworthy data.

