Menu
Zurück zum Blog
2 min read
IoT

Edge Computing für IoT

Edge Computing für IoT-Anwendungen. Lokale Datenverarbeitung, ML Inference, Fog Computing und Edge-Cloud Architektur.

Edge ComputingIoTFog ComputingML InferenceLocal ProcessingEdge Gateway
Edge Computing für IoT

Edge Computing für IoT

Meta-Description: Edge Computing für IoT-Anwendungen. Lokale Datenverarbeitung, ML Inference, Fog Computing und Edge-Cloud Architektur.

Keywords: Edge Computing, IoT, Fog Computing, ML Inference, Local Processing, Edge Gateway, TensorFlow Lite


Einführung

Edge Computing bringt Rechenleistung dorthin, wo Daten entstehen. Für IoT bedeutet das: niedrige Latenz, Offline-Fähigkeit, Datenschutz und reduzierte Bandbreite durch lokale Verarbeitung direkt am Gerät oder Gateway.


Edge Computing Architecture

┌─────────────────────────────────────────────────────────────┐
│              EDGE COMPUTING ARCHITECTURE                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Cloud Layer:                                               │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Cloud Platform (AWS IoT / Azure IoT / GCP IoT)     │   │
│  │  - Long-term Storage                                │   │
│  │  - Complex Analytics                                │   │
│  │  - ML Model Training                                │   │
│  │  - Global Dashboards                                │   │
│  └─────────────────────────────────────────────────────┘   │
│                         ↑↓ Aggregated Data                  │
│  Edge Layer (Fog):                                          │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Edge Gateway / Server                              │   │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────┐      │   │
│  │  │ Local DB  │  │ ML Infer- │  │   Rules   │      │   │
│  │  │ (InfluxDB)│  │   ence    │  │  Engine   │      │   │
│  │  └───────────┘  └───────────┘  └───────────┘      │   │
│  │  - Data Aggregation                                │   │
│  │  - Real-time Processing                            │   │
│  │  - Local Decisions                                 │   │
│  └─────────────────────────────────────────────────────┘   │
│                         ↑↓ Filtered Data                    │
│  Device Layer:                                              │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌───────┐ │   │
│  │  │ Sensor  │  │ Camera  │  │ Actuator│  │  PLC  │ │   │
│  │  │         │  │ (CV)    │  │         │  │       │ │   │
│  │  └─────────┘  └─────────┘  └─────────┘  └───────┘ │   │
│  │  - Data Collection                                 │   │
│  │  - Simple Processing                               │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
│  Latency: Cloud ~100ms | Edge ~10ms | Device ~1ms          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Edge Gateway mit Node.js

// edge-gateway/index.ts
import mqtt from 'mqtt';
import { InfluxDBClient } from './lib/influxdb';
import { RulesEngine } from './lib/rules-engine';
import { MLInference } from './lib/ml-inference';

interface EdgeConfig {
  mqttBroker: string;
  cloudEndpoint: string;
  localDb: {
    url: string;
    token: string;
    org: string;
    bucket: string;
  };
  syncInterval: number;  // Millisekunden
  offlineBufferSize: number;
}

class EdgeGateway {
  private localMqtt: mqtt.MqttClient;
  private cloudMqtt: mqtt.MqttClient | null = null;
  private influx: InfluxDBClient;
  private rules: RulesEngine;
  private ml: MLInference;
  private offlineBuffer: any[] = [];
  private isCloudConnected = false;

  constructor(private config: EdgeConfig) {
    this.influx = new InfluxDBClient(config.localDb);
    this.rules = new RulesEngine();
    this.ml = new MLInference();
  }

  async start(): Promise<void> {
    // Local MQTT Broker verbinden
    this.localMqtt = mqtt.connect(this.config.mqttBroker);

    this.localMqtt.on('connect', () => {
      console.log('Connected to local MQTT broker');
      this.localMqtt.subscribe('sensors/#');
      this.localMqtt.subscribe('devices/#');
    });

    this.localMqtt.on('message', (topic, payload) => {
      this.handleLocalMessage(topic, payload.toString());
    });

    // Cloud Connection (mit Retry)
    this.connectToCloud();

    // Periodische Cloud-Synchronisation
    setInterval(() => this.syncToCloud(), this.config.syncInterval);

    // ML Models laden
    await this.ml.loadModels();

    console.log('Edge Gateway started');
  }

  private async handleLocalMessage(topic: string, payload: string): Promise<void> {
    try {
      const data = JSON.parse(payload);
      const timestamp = new Date();

      // 1. Lokale Speicherung
      await this.storeLocally(topic, data, timestamp);

      // 2. Echtzeit-Verarbeitung
      const processed = await this.processData(topic, data);

      // 3. Rules Engine
      const actions = this.rules.evaluate(topic, processed);
      await this.executeActions(actions);

      // 4. ML Inference (wenn relevant)
      if (this.shouldRunInference(topic, data)) {
        const prediction = await this.ml.predict(data);
        await this.handlePrediction(topic, prediction);
      }

      // 5. Cloud-Queue (aggregierte Daten)
      this.queueForCloud(topic, processed, timestamp);

    } catch (error) {
      console.error('Error processing message:', error);
    }
  }

  private async storeLocally(
    topic: string,
    data: any,
    timestamp: Date
  ): Promise<void> {
    const measurement = topic.split('/')[0];
    const deviceId = topic.split('/')[1];

    await this.influx.writePoint({
      measurement,
      tags: { device_id: deviceId },
      fields: data,
      timestamp
    });
  }

  private async processData(topic: string, data: any): Promise<any> {
    // Daten normalisieren, filtern, aggregieren
    const processed = { ...data };

    // Outlier Detection
    if (data.temperature !== undefined) {
      if (data.temperature < -50 || data.temperature > 100) {
        processed.temperature_valid = false;
        processed.temperature_original = data.temperature;
        delete processed.temperature;
      }
    }

    // Unit Conversion (wenn nötig)
    // ...

    return processed;
  }

  private queueForCloud(topic: string, data: any, timestamp: Date): void {
    // Nur relevante Daten für Cloud
    const cloudData = {
      topic,
      data: this.aggregateForCloud(data),
      timestamp: timestamp.toISOString(),
      gatewayId: process.env.GATEWAY_ID
    };

    if (this.isCloudConnected) {
      this.sendToCloud(cloudData);
    } else {
      // Offline Buffer
      this.offlineBuffer.push(cloudData);

      // Buffer Limit
      if (this.offlineBuffer.length > this.config.offlineBufferSize) {
        this.offlineBuffer.shift();  // Älteste Daten verwerfen
      }
    }
  }

  private aggregateForCloud(data: any): any {
    // Nur wichtige Felder für Cloud
    const { temperature, humidity, battery, state } = data;
    return { temperature, humidity, battery, state };
  }

  private async syncToCloud(): Promise<void> {
    if (!this.isCloudConnected || this.offlineBuffer.length === 0) {
      return;
    }

    console.log(`Syncing ${this.offlineBuffer.length} messages to cloud`);

    // Batch Upload
    const batch = this.offlineBuffer.splice(0, 100);

    try {
      await this.sendToCloud({
        type: 'batch',
        messages: batch
      });
    } catch (error) {
      // Bei Fehler: zurück in Buffer
      this.offlineBuffer.unshift(...batch);
      console.error('Cloud sync failed:', error);
    }
  }

  private sendToCloud(data: any): void {
    this.cloudMqtt?.publish(
      `gateways/${process.env.GATEWAY_ID}/data`,
      JSON.stringify(data)
    );
  }

  private async executeActions(actions: any[]): Promise<void> {
    for (const action of actions) {
      console.log('Executing action:', action);

      switch (action.type) {
        case 'publish':
          this.localMqtt.publish(action.topic, JSON.stringify(action.payload));
          break;

        case 'alert':
          await this.sendAlert(action.message, action.severity);
          break;

        case 'device_command':
          this.localMqtt.publish(
            `devices/${action.deviceId}/command`,
            JSON.stringify(action.command)
          );
          break;
      }
    }
  }

  private connectToCloud(): void {
    this.cloudMqtt = mqtt.connect(this.config.cloudEndpoint, {
      clientId: `gateway-${process.env.GATEWAY_ID}`,
      username: process.env.CLOUD_USER,
      password: process.env.CLOUD_PASS,
      reconnectPeriod: 10000
    });

    this.cloudMqtt.on('connect', () => {
      console.log('Connected to cloud');
      this.isCloudConnected = true;

      // Subscribe to cloud commands
      this.cloudMqtt?.subscribe(`gateways/${process.env.GATEWAY_ID}/command`);
    });

    this.cloudMqtt.on('close', () => {
      console.log('Cloud connection closed');
      this.isCloudConnected = false;
    });

    this.cloudMqtt.on('message', (topic, payload) => {
      this.handleCloudCommand(JSON.parse(payload.toString()));
    });
  }

  private handleCloudCommand(command: any): void {
    console.log('Cloud command received:', command);

    switch (command.type) {
      case 'update_config':
        this.updateConfig(command.config);
        break;

      case 'update_rules':
        this.rules.updateRules(command.rules);
        break;

      case 'update_model':
        this.ml.updateModel(command.modelId, command.modelUrl);
        break;

      case 'device_command':
        this.localMqtt.publish(
          `devices/${command.deviceId}/command`,
          JSON.stringify(command.payload)
        );
        break;
    }
  }
}

ML Inference am Edge

// lib/ml-inference.ts
import * as tf from '@tensorflow/tfjs-node';

interface Model {
  id: string;
  model: tf.LayersModel;
  inputShape: number[];
  labels: string[];
}

class MLInference {
  private models: Map<string, Model> = new Map();

  async loadModels(): Promise<void> {
    // Anomaly Detection Model
    const anomalyModel = await tf.loadLayersModel(
      'file://./models/anomaly/model.json'
    );
    this.models.set('anomaly', {
      id: 'anomaly',
      model: anomalyModel,
      inputShape: [1, 10],  // 10 Features
      labels: ['normal', 'anomaly']
    });

    // Predictive Maintenance Model
    const maintenanceModel = await tf.loadLayersModel(
      'file://./models/maintenance/model.json'
    );
    this.models.set('maintenance', {
      id: 'maintenance',
      model: maintenanceModel,
      inputShape: [1, 24],  // 24 Stunden Historie
      labels: ['ok', 'warning', 'critical']
    });

    console.log(`Loaded ${this.models.size} ML models`);
  }

  async predict(
    modelId: string,
    input: number[]
  ): Promise<{ label: string; confidence: number }> {
    const modelInfo = this.models.get(modelId);
    if (!modelInfo) {
      throw new Error(`Model ${modelId} not found`);
    }

    // Input Tensor erstellen
    const inputTensor = tf.tensor2d([input], modelInfo.inputShape as [number, number]);

    // Inference
    const prediction = modelInfo.model.predict(inputTensor) as tf.Tensor;
    const probabilities = await prediction.data();

    // Cleanup
    inputTensor.dispose();
    prediction.dispose();

    // Beste Prediction finden
    const maxIndex = probabilities.indexOf(Math.max(...Array.from(probabilities)));

    return {
      label: modelInfo.labels[maxIndex],
      confidence: probabilities[maxIndex]
    };
  }

  async detectAnomaly(
    sensorData: {
      temperature: number;
      humidity: number;
      pressure: number;
      vibration: number;
    }
  ): Promise<{ isAnomaly: boolean; confidence: number }> {
    // Feature Engineering
    const features = [
      sensorData.temperature,
      sensorData.humidity,
      sensorData.pressure,
      sensorData.vibration,
      // Normalisierte Werte
      (sensorData.temperature - 20) / 30,
      (sensorData.humidity - 50) / 50,
      // Interaktionen
      sensorData.temperature * sensorData.humidity / 1000,
      // Placeholder für historische Features
      0, 0, 0
    ];

    const result = await this.predict('anomaly', features);

    return {
      isAnomaly: result.label === 'anomaly',
      confidence: result.confidence
    };
  }

  async updateModel(modelId: string, modelUrl: string): Promise<void> {
    console.log(`Updating model ${modelId} from ${modelUrl}`);

    try {
      const newModel = await tf.loadLayersModel(modelUrl);
      const existing = this.models.get(modelId);

      if (existing) {
        existing.model.dispose();
        existing.model = newModel;
      }

      console.log(`Model ${modelId} updated successfully`);
    } catch (error) {
      console.error(`Failed to update model ${modelId}:`, error);
    }
  }
}

export { MLInference };

Rules Engine

// lib/rules-engine.ts

interface Rule {
  id: string;
  name: string;
  condition: RuleCondition;
  actions: RuleAction[];
  enabled: boolean;
  priority: number;
}

interface RuleCondition {
  type: 'simple' | 'compound';
  field?: string;
  operator?: '>' | '<' | '==' | '!=' | '>=' | '<=';
  value?: any;
  logic?: 'AND' | 'OR';
  conditions?: RuleCondition[];
}

interface RuleAction {
  type: 'publish' | 'alert' | 'device_command' | 'log';
  [key: string]: any;
}

class RulesEngine {
  private rules: Rule[] = [];

  constructor() {
    this.loadDefaultRules();
  }

  private loadDefaultRules(): void {
    this.rules = [
      // High Temperature Alert
      {
        id: 'high-temp-alert',
        name: 'High Temperature Alert',
        condition: {
          type: 'simple',
          field: 'temperature',
          operator: '>',
          value: 35
        },
        actions: [
          {
            type: 'alert',
            message: 'High temperature detected',
            severity: 'warning'
          },
          {
            type: 'publish',
            topic: 'alerts/temperature',
            payload: { alert: 'high_temperature' }
          }
        ],
        enabled: true,
        priority: 1
      },

      // Low Battery Warning
      {
        id: 'low-battery',
        name: 'Low Battery Warning',
        condition: {
          type: 'simple',
          field: 'battery',
          operator: '<',
          value: 20
        },
        actions: [
          {
            type: 'alert',
            message: 'Device battery low',
            severity: 'info'
          }
        ],
        enabled: true,
        priority: 2
      },

      // Motion → Light On
      {
        id: 'motion-light',
        name: 'Motion Activated Light',
        condition: {
          type: 'compound',
          logic: 'AND',
          conditions: [
            { type: 'simple', field: 'occupancy', operator: '==', value: true },
            { type: 'simple', field: 'ambient_light', operator: '<', value: 100 }
          ]
        },
        actions: [
          {
            type: 'device_command',
            deviceId: 'hallway_light',
            command: { state: 'ON', brightness: 80 }
          }
        ],
        enabled: true,
        priority: 1
      }
    ];
  }

  evaluate(topic: string, data: any): RuleAction[] {
    const actions: RuleAction[] = [];

    // Regeln nach Priorität sortieren
    const sortedRules = [...this.rules]
      .filter(r => r.enabled)
      .sort((a, b) => a.priority - b.priority);

    for (const rule of sortedRules) {
      if (this.evaluateCondition(rule.condition, data)) {
        console.log(`Rule triggered: ${rule.name}`);
        actions.push(...rule.actions);
      }
    }

    return actions;
  }

  private evaluateCondition(condition: RuleCondition, data: any): boolean {
    if (condition.type === 'simple') {
      return this.evaluateSimple(condition, data);
    }

    if (condition.type === 'compound' && condition.conditions) {
      const results = condition.conditions.map(c =>
        this.evaluateCondition(c, data)
      );

      return condition.logic === 'AND'
        ? results.every(r => r)
        : results.some(r => r);
    }

    return false;
  }

  private evaluateSimple(condition: RuleCondition, data: any): boolean {
    const value = data[condition.field!];

    if (value === undefined) return false;

    switch (condition.operator) {
      case '>': return value > condition.value;
      case '<': return value < condition.value;
      case '>=': return value >= condition.value;
      case '<=': return value <= condition.value;
      case '==': return value === condition.value;
      case '!=': return value !== condition.value;
      default: return false;
    }
  }

  updateRules(newRules: Rule[]): void {
    this.rules = newRules;
    console.log(`Updated ${newRules.length} rules`);
  }

  addRule(rule: Rule): void {
    this.rules.push(rule);
  }

  removeRule(ruleId: string): void {
    this.rules = this.rules.filter(r => r.id !== ruleId);
  }

  getRules(): Rule[] {
    return this.rules;
  }
}

export { RulesEngine };

Edge-Cloud Synchronisation

// lib/edge-cloud-sync.ts

interface SyncConfig {
  syncInterval: number;
  batchSize: number;
  retentionLocal: number;  // Tage
  compressionEnabled: boolean;
}

class EdgeCloudSync {
  private config: SyncConfig;
  private pendingSync: any[] = [];
  private lastSyncTime: Date | null = null;

  constructor(config: SyncConfig) {
    this.config = config;
  }

  // Daten für Cloud-Sync vorbereiten
  prepareForSync(data: any[]): any {
    // Aggregation
    const aggregated = this.aggregate(data);

    // Kompression (optional)
    if (this.config.compressionEnabled) {
      return this.compress(aggregated);
    }

    return aggregated;
  }

  private aggregate(data: any[]): any {
    // Nach Device gruppieren
    const byDevice = new Map<string, any[]>();

    data.forEach(item => {
      const deviceId = item.device_id;
      if (!byDevice.has(deviceId)) {
        byDevice.set(deviceId, []);
      }
      byDevice.get(deviceId)!.push(item);
    });

    // Aggregierte Statistiken pro Device
    const result: any[] = [];

    byDevice.forEach((items, deviceId) => {
      const temps = items
        .map(i => i.temperature)
        .filter(t => t !== undefined);

      const humidities = items
        .map(i => i.humidity)
        .filter(h => h !== undefined);

      result.push({
        device_id: deviceId,
        period_start: items[0].timestamp,
        period_end: items[items.length - 1].timestamp,
        sample_count: items.length,
        temperature: temps.length > 0 ? {
          min: Math.min(...temps),
          max: Math.max(...temps),
          avg: temps.reduce((a, b) => a + b, 0) / temps.length
        } : null,
        humidity: humidities.length > 0 ? {
          min: Math.min(...humidities),
          max: Math.max(...humidities),
          avg: humidities.reduce((a, b) => a + b, 0) / humidities.length
        } : null
      });
    });

    return result;
  }

  private compress(data: any): Buffer {
    const zlib = require('zlib');
    const json = JSON.stringify(data);
    return zlib.gzipSync(json);
  }

  // Conflict Resolution bei Cloud-Sync
  resolveConflict(localData: any, cloudData: any): any {
    // Last-Write-Wins Strategie
    const localTime = new Date(localData.updated_at);
    const cloudTime = new Date(cloudData.updated_at);

    if (localTime > cloudTime) {
      return { ...cloudData, ...localData, conflict_resolved: true };
    }
    return { ...localData, ...cloudData, conflict_resolved: true };
  }
}

Edge Deployment

# docker-compose.edge.yml
version: '3.8'
services:
  edge-gateway:
    build: ./edge-gateway
    restart: always
    environment:
      - GATEWAY_ID=${GATEWAY_ID}
      - MQTT_BROKER=mqtt://mosquitto:1883
      - CLOUD_ENDPOINT=${CLOUD_MQTT_URL}
      - INFLUX_URL=http://influxdb:8086
    volumes:
      - ./models:/app/models
      - ./config:/app/config
    depends_on:
      - mosquitto
      - influxdb

  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - mosquitto-data:/mosquitto/data

  influxdb:
    image: influxdb:2.7
    ports:
      - "8086:8086"
    volumes:
      - influxdb-data:/var/lib/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=edgepassword
      - DOCKER_INFLUXDB_INIT_ORG=edge
      - DOCKER_INFLUXDB_INIT_BUCKET=sensors

volumes:
  mosquitto-data:
  influxdb-data:

Edge vs Cloud Comparison

AspectEdge ComputingCloud Computing
**Latency**1-10 ms50-200 ms
**Bandwidth**Minimal (local)High (all data)
**Offline**Fully operationalLimited
**Privacy**Data stays localData in cloud
**Compute**LimitedUnlimited
**Storage**LimitedUnlimited
**Cost**Hardware upfrontPay-per-use
**ML Training**Not practicalIdeal
**ML Inference**Real-timeBatch

Fazit

Edge Computing für IoT bietet:

  1. Low Latency: Lokale Entscheidungen in Millisekunden
  2. Offline-Fähigkeit: Funktioniert ohne Cloud
  3. Datenschutz: Sensible Daten bleiben lokal
  4. Bandbreite: Nur aggregierte Daten zur Cloud

Die optimale Architektur kombiniert Edge und Cloud.


Bildprompts

  1. "Edge computing diagram with local processing near sensors"
  2. "IoT gateway device processing data streams locally"
  3. "Fog computing layers between devices and cloud"

Quellen