Edge Computing for IoT
Meta description: Edge computing for IoT applications: local data processing, ML inference, fog computing, and edge-cloud architecture.
Keywords: Edge Computing, IoT, Fog Computing, ML Inference, Local Processing, Edge Gateway, TensorFlow Lite
Introduction
Edge computing brings compute power to where data originates. For IoT, that means low latency, offline capability, data privacy, and reduced bandwidth, because data is processed locally on the device or gateway.
Edge Computing Architecture
┌─────────────────────────────────────────────────────────────┐
│                 EDGE COMPUTING ARCHITECTURE                 │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Cloud Layer:                                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Cloud Platform (AWS IoT / Azure IoT / GCP IoT)      │    │
│  │ - Long-term Storage                                 │    │
│  │ - Complex Analytics                                 │    │
│  │ - ML Model Training                                 │    │
│  │ - Global Dashboards                                 │    │
│  └─────────────────────────────────────────────────────┘    │
│                      ↑↓ Aggregated Data                     │
│  Edge Layer (Fog):                                          │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ Edge Gateway / Server                               │    │
│  │ ┌───────────┐ ┌───────────┐ ┌───────────┐           │    │
│  │ │ Local DB  │ │ ML Infer- │ │ Rules     │           │    │
│  │ │ (InfluxDB)│ │ ence      │ │ Engine    │           │    │
│  │ └───────────┘ └───────────┘ └───────────┘           │    │
│  │ - Data Aggregation                                  │    │
│  │ - Real-time Processing                              │    │
│  │ - Local Decisions                                   │    │
│  └─────────────────────────────────────────────────────┘    │
│                      ↑↓ Filtered Data                       │
│  Device Layer:                                              │
│  ┌─────────────────────────────────────────────────────┐    │
│  │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌───────┐       │    │
│  │ │ Sensor  │ │ Camera  │ │ Actuator│ │ PLC   │       │    │
│  │ │         │ │ (CV)    │ │         │ │       │       │    │
│  │ └─────────┘ └─────────┘ └─────────┘ └───────┘       │    │
│  │ - Data Collection                                   │    │
│  │ - Simple Processing                                 │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  Latency: Cloud ~100ms | Edge ~10ms | Device ~1ms           │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Edge Gateway with Node.js
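One piece of the gateway listing that follows is worth isolating first: the offline buffer. The listing keeps a plain array, drops the oldest entry once a size limit is exceeded, and drains the array in batches. The sketch below shows that same policy as a small standalone class. The name `BoundedBuffer` and its methods are hypothetical and not part of the original code; treat it as one possible way to structure the idea, not the article's implementation.

```typescript
// Hypothetical helper illustrating the drop-oldest buffering policy.
class BoundedBuffer<T> {
  private items: T[] = [];
  private dropped = 0;

  constructor(private readonly capacity: number) {
    if (!Number.isInteger(capacity) || capacity <= 0) {
      throw new Error('capacity must be a positive integer');
    }
  }

  /** Append an item; when the buffer is full, discard the oldest entry first. */
  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift();
      this.dropped++;
    }
    this.items.push(item);
  }

  /** Remove and return up to `max` of the oldest items. */
  takeBatch(max: number): T[] {
    return this.items.splice(0, max);
  }

  /** Put a failed batch back at the front; trim the oldest entries if over capacity. */
  requeue(batch: T[]): void {
    this.items.unshift(...batch);
    const overflow = this.items.length - this.capacity;
    if (overflow > 0) {
      this.items.splice(0, overflow);
      this.dropped += overflow;
    }
  }

  get size(): number {
    return this.items.length;
  }

  get droppedCount(): number {
    return this.dropped;
  }
}
```

Counting dropped messages is a design choice: it lets the gateway report to the cloud how much data was lost during a long outage instead of discarding it silently.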
// edge-gateway/index.ts
import mqtt from 'mqtt';
import { InfluxDBClient } from './lib/influxdb';
import { RulesEngine } from './lib/rules-engine';
import { MLInference } from './lib/ml-inference';

interface EdgeConfig {
  mqttBroker: string;
  cloudEndpoint: string;
  localDb: {
    url: string;
    token: string;
    org: string;
    bucket: string;
  };
  syncInterval: number; // milliseconds
  offlineBufferSize: number;
}

class EdgeGateway {
  // Assigned in start(), hence the definite-assignment assertion
  private localMqtt!: mqtt.MqttClient;
  private cloudMqtt: mqtt.MqttClient | null = null;
  private influx: InfluxDBClient;
  private rules: RulesEngine;
  private ml: MLInference;
  private offlineBuffer: any[] = [];
  private isCloudConnected = false;

  constructor(private config: EdgeConfig) {
    this.influx = new InfluxDBClient(config.localDb);
    this.rules = new RulesEngine();
    this.ml = new MLInference();
  }

  async start(): Promise<void> {
    // Load the ML models first so inference is ready before messages arrive
    await this.ml.loadModels();

    // Connect to the local MQTT broker
    this.localMqtt = mqtt.connect(this.config.mqttBroker);
    this.localMqtt.on('connect', () => {
      console.log('Connected to local MQTT broker');
      this.localMqtt.subscribe('sensors/#');
      this.localMqtt.subscribe('devices/#');
    });
    this.localMqtt.on('message', (topic, payload) => {
      this.handleLocalMessage(topic, payload.toString());
    });

    // Connect to the cloud (the client reconnects automatically)
    this.connectToCloud();

    // Periodic cloud synchronization
    setInterval(() => this.syncToCloud(), this.config.syncInterval);

    console.log('Edge Gateway started');
  }

  private async handleLocalMessage(topic: string, payload: string): Promise<void> {
    try {
      const data = JSON.parse(payload);
      const timestamp = new Date();

      // 1. Store locally
      await this.storeLocally(topic, data, timestamp);

      // 2. Real-time processing
      const processed = await this.processData(topic, data);

      // 3. Rules engine
      const actions = this.rules.evaluate(topic, processed);
      await this.executeActions(actions);

      // 4. ML inference (when relevant)
      if (this.shouldRunInference(topic, data)) {
        // detectAnomaly() builds the numeric feature vector that predict() requires
        const prediction = await this.ml.detectAnomaly(data);
        await this.handlePrediction(topic, prediction);
      }

      // 5. Queue aggregated data for the cloud
      this.queueForCloud(topic, processed, timestamp);
    } catch (error) {
      console.error('Error processing message:', error);
    }
  }

  private async storeLocally(
    topic: string,
    data: any,
    timestamp: Date
  ): Promise<void> {
    // Topic layout: <measurement>/<deviceId>/...
    const [measurement, deviceId = 'unknown'] = topic.split('/');
    await this.influx.writePoint({
      measurement,
      tags: { device_id: deviceId },
      fields: data,
      timestamp
    });
  }

  private async processData(topic: string, data: any): Promise<any> {
    // Normalize, filter, and aggregate the data
    const processed = { ...data };

    // Outlier detection: readings outside -50..100 are flagged and removed
    if (data.temperature !== undefined) {
      if (data.temperature < -50 || data.temperature > 100) {
        processed.temperature_valid = false;
        processed.temperature_original = data.temperature;
        delete processed.temperature;
      }
    }

    // Unit conversion (if needed)
    // ...

    return processed;
  }

  private queueForCloud(topic: string, data: any, timestamp: Date): void {
    // Forward only cloud-relevant data
    const cloudData = {
      topic,
      data: this.aggregateForCloud(data),
      timestamp: timestamp.toISOString(),
      gatewayId: process.env.GATEWAY_ID
    };

    if (this.isCloudConnected) {
      // If the publish fails, keep the message for the next sync
      this.sendToCloud(cloudData).catch(() => this.bufferOffline(cloudData));
    } else {
      this.bufferOffline(cloudData);
    }
  }

  private bufferOffline(message: any): void {
    this.offlineBuffer.push(message);
    // Enforce the buffer limit by discarding the oldest entry
    if (this.offlineBuffer.length > this.config.offlineBufferSize) {
      this.offlineBuffer.shift();
    }
  }

  private aggregateForCloud(data: any): any {
    // Keep only the fields the cloud needs
    const { temperature, humidity, battery, state } = data;
    return { temperature, humidity, battery, state };
  }

  private async syncToCloud(): Promise<void> {
    if (!this.isCloudConnected || this.offlineBuffer.length === 0) {
      return;
    }
    console.log(`Syncing ${this.offlineBuffer.length} messages to cloud`);

    // Upload in batches of up to 100 messages
    const batch = this.offlineBuffer.splice(0, 100);
    try {
      await this.sendToCloud({
        type: 'batch',
        messages: batch
      });
    } catch (error) {
      // On failure, put the batch back at the front of the buffer
      this.offlineBuffer.unshift(...batch);
      console.error('Cloud sync failed:', error);
    }
  }

  // Resolves once the broker acknowledges the publish (QoS 1); rejects on error,
  // so callers can actually detect a failed upload
  private sendToCloud(data: any): Promise<void> {
    return new Promise((resolve, reject) => {
      if (!this.cloudMqtt) {
        reject(new Error('Cloud MQTT client not initialized'));
        return;
      }
      this.cloudMqtt.publish(
        `gateways/${process.env.GATEWAY_ID}/data`,
        JSON.stringify(data),
        { qos: 1 },
        (err) => (err ? reject(err) : resolve())
      );
    });
  }
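
  private async executeActions(actions: any[]): Promise<void> {
    for (const action of actions) {
      console.log('Executing action:', action);
      switch (action.type) {
        case 'publish':
          this.localMqtt.publish(action.topic, JSON.stringify(action.payload));
          break;
        case 'alert':
          await this.sendAlert(action.message, action.severity);
          break;
        case 'device_command':
          this.localMqtt.publish(
            `devices/${action.deviceId}/command`,
            JSON.stringify(action.command)
          );
          break;
      }
    }
  }

  private connectToCloud(): void {
    this.cloudMqtt = mqtt.connect(this.config.cloudEndpoint, {
      clientId: `gateway-${process.env.GATEWAY_ID}`,
      username: process.env.CLOUD_USER,
      password: process.env.CLOUD_PASS,
      reconnectPeriod: 10000 // retry every 10 seconds
    });

    this.cloudMqtt.on('connect', () => {
      console.log('Connected to cloud');
      this.isCloudConnected = true;
      // Subscribe to cloud commands
      this.cloudMqtt?.subscribe(`gateways/${process.env.GATEWAY_ID}/command`);
    });

    this.cloudMqtt.on('close', () => {
      console.log('Cloud connection closed');
      this.isCloudConnected = false;
    });

    this.cloudMqtt.on('message', (topic, payload) => {
      try {
        this.handleCloudCommand(JSON.parse(payload.toString()));
      } catch (error) {
        // A malformed command must not crash the gateway
        console.error('Invalid cloud command:', error);
      }
    });
  }

  private handleCloudCommand(command: any): void {
    console.log('Cloud command received:', command);
    switch (command.type) {
      case 'update_config':
        this.updateConfig(command.config);
        break;
      case 'update_rules':
        this.rules.updateRules(command.rules);
        break;
      case 'update_model':
        this.ml.updateModel(command.modelId, command.modelUrl);
        break;
      case 'device_command':
        this.localMqtt.publish(
          `devices/${command.deviceId}/command`,
          JSON.stringify(command.payload)
        );
        break;
    }
  }

  // The helpers below are called above but were not part of the original
  // listing. These minimal versions are assumptions for illustration only.

  private shouldRunInference(topic: string, data: any): boolean {
    // Assumption: run anomaly detection only on complete sensor readings
    return topic.startsWith('sensors/') &&
      ['temperature', 'humidity', 'pressure', 'vibration']
        .every(key => typeof data[key] === 'number');
  }

  private async handlePrediction(
    topic: string,
    prediction: { isAnomaly: boolean; confidence: number }
  ): Promise<void> {
    if (prediction.isAnomaly) {
      await this.sendAlert(`Anomaly detected on ${topic}`, 'warning');
    }
  }

  private async sendAlert(message: string, severity: string): Promise<void> {
    // Assumption: alerts are published on a local topic
    this.localMqtt.publish(
      `alerts/${severity}`,
      JSON.stringify({ message, severity, timestamp: new Date().toISOString() })
    );
  }

  private updateConfig(config: Partial<EdgeConfig>): void {
    // Assumption: shallow merge; a changed syncInterval applies after a restart
    this.config = { ...this.config, ...config };
  }
}

ML Inference at the Edge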
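The `detectAnomaly` method in the module below fills the last three slots of its feature vector with zeros as placeholders for historical features. One way such features could be derived, sketched here under assumptions, is a short rolling window per device and signal that yields the mean, the standard deviation, and the change since the previous reading. The class name `RollingWindow`, the choice of these three statistics, and the window size are all hypothetical; a real model would need to be trained on whatever features are actually supplied.

```typescript
// Hypothetical helper: fixed-size rolling window over recent readings.
class RollingWindow {
  private values: number[] = [];

  constructor(private readonly size: number) {
    if (!Number.isInteger(size) || size <= 0) {
      throw new Error('size must be a positive integer');
    }
  }

  /** Add a reading and evict the oldest one once the window is full. */
  push(value: number): void {
    this.values.push(value);
    if (this.values.length > this.size) {
      this.values.shift();
    }
  }

  /** Arithmetic mean of the window, or 0 when it is empty. */
  mean(): number {
    if (this.values.length === 0) return 0;
    return this.values.reduce((a, b) => a + b, 0) / this.values.length;
  }

  /** Population standard deviation of the window, or 0 when it is empty. */
  stdDev(): number {
    if (this.values.length === 0) return 0;
    const m = this.mean();
    const variance =
      this.values.reduce((acc, v) => acc + (v - m) ** 2, 0) / this.values.length;
    return Math.sqrt(variance);
  }

  /** Difference between the two most recent readings, or 0 with fewer than two. */
  lastDelta(): number {
    const n = this.values.length;
    return n < 2 ? 0 : this.values[n - 1] - this.values[n - 2];
  }

  /** Three historical features in a fixed order: mean, stdDev, lastDelta. */
  features(): [number, number, number] {
    return [this.mean(), this.stdDev(), this.lastDelta()];
  }
}
```

A gateway could keep one window per device in a `Map` and spread `window.features()` into the feature array in place of the three zeros.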
// lib/ml-inference.ts
import * as tf from '@tensorflow/tfjs-node';

interface Model {
  id: string;
  model: tf.LayersModel;
  inputShape: number[];
  labels: string[];
}

class MLInference {
  private models: Map<string, Model> = new Map();

  async loadModels(): Promise<void> {
    // Anomaly detection model
    const anomalyModel = await tf.loadLayersModel(
      'file://./models/anomaly/model.json'
    );
    this.models.set('anomaly', {
      id: 'anomaly',
      model: anomalyModel,
      inputShape: [1, 10], // 10 features
      labels: ['normal', 'anomaly']
    });

    // Predictive maintenance model
    const maintenanceModel = await tf.loadLayersModel(
      'file://./models/maintenance/model.json'
    );
    this.models.set('maintenance', {
      id: 'maintenance',
      model: maintenanceModel,
      inputShape: [1, 24], // 24 hours of history
      labels: ['ok', 'warning', 'critical']
    });

    console.log(`Loaded ${this.models.size} ML models`);
  }

  async predict(
    modelId: string,
    input: number[]
  ): Promise<{ label: string; confidence: number }> {
    const modelInfo = this.models.get(modelId);
    if (!modelInfo) {
      throw new Error(`Model ${modelId} not found`);
    }
    // Reject inputs that do not match the model's expected feature count
    if (input.length !== modelInfo.inputShape[1]) {
      throw new Error(
        `Model ${modelId} expects ${modelInfo.inputShape[1]} features, got ${input.length}`
      );
    }

    // Build the input tensor
    const inputTensor = tf.tensor2d([input], modelInfo.inputShape as [number, number]);

    // Run inference
    const prediction = modelInfo.model.predict(inputTensor) as tf.Tensor;
    const probabilities = await prediction.data();

    // Free tensor memory
    inputTensor.dispose();
    prediction.dispose();

    // Pick the class with the highest probability
    const maxIndex = probabilities.indexOf(Math.max(...Array.from(probabilities)));
    return {
      label: modelInfo.labels[maxIndex],
      confidence: probabilities[maxIndex]
    };
  }

  async detectAnomaly(
    sensorData: {
      temperature: number;
      humidity: number;
      pressure: number;
      vibration: number;
    }
  ): Promise<{ isAnomaly: boolean; confidence: number }> {
    // Feature engineering: 10 values, matching the anomaly model's input shape
    const features = [
      sensorData.temperature,
      sensorData.humidity,
      sensorData.pressure,
      sensorData.vibration,
      // Normalized values
      (sensorData.temperature - 20) / 30,
      (sensorData.humidity - 50) / 50,
      // Interaction term
      sensorData.temperature * sensorData.humidity / 1000,
      // Placeholders for historical features
      0, 0, 0
    ];
    const result = await this.predict('anomaly', features);
    return {
      isAnomaly: result.label === 'anomaly',
      confidence: result.confidence
    };
  }

  async updateModel(modelId: string, modelUrl: string): Promise<void> {
    console.log(`Updating model ${modelId} from ${modelUrl}`);
    const existing = this.models.get(modelId);
    if (!existing) {
      // Input shape and labels are unknown for models that were never registered
      console.error(`Cannot update unknown model ${modelId}`);
      return;
    }
    try {
      const newModel = await tf.loadLayersModel(modelUrl);
      // Swap in the new model before disposing of the old one
      const oldModel = existing.model;
      existing.model = newModel;
      oldModel.dispose();
      console.log(`Model ${modelId} updated successfully`);
    } catch (error) {
      console.error(`Failed to update model ${modelId}:`, error);
    }
  }
}

export { MLInference };

Rules Engine
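The engine listed below evaluates every enabled rule on every incoming message, so a sensor that stays above the temperature threshold would raise the same alert with each reading. A common mitigation is a per-rule cooldown. The sketch below is one possible shape for it; the class name `CooldownGate`, the key format, and the injectable clock are assumptions and do not appear in the original engine.

```typescript
// Hypothetical helper: suppresses repeated firings of the same key within a cooldown.
class CooldownGate {
  private lastFired = new Map<string, number>();

  constructor(
    private readonly cooldownMs: number,
    // Injectable clock so the behavior can be tested without waiting
    private readonly now: () => number = () => Date.now()
  ) {}

  /** Returns true if `key` may fire now and records the firing time. */
  tryFire(key: string): boolean {
    const current = this.now();
    const last = this.lastFired.get(key);
    if (last !== undefined && current - last < this.cooldownMs) {
      return false; // still cooling down
    }
    this.lastFired.set(key, current);
    return true;
  }

  /** Forget a key, e.g. once the condition has cleared. */
  reset(key: string): void {
    this.lastFired.delete(key);
  }
}

// Usage: at most one firing per rule and device per minute.
const gate = new CooldownGate(60_000);
const key = 'high-temp-alert:device-1'; // hypothetical "<ruleId>:<deviceId>" key
const first = gate.tryFire(key);  // true: nothing recorded yet
const second = gate.tryFire(key); // false: inside the cooldown window
```

Keying by rule and device rather than by rule alone keeps one noisy sensor from masking alerts for the others.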
// lib/rules-engine.ts
interface Rule {
  id: string;
  name: string;
  condition: RuleCondition;
  actions: RuleAction[];
  enabled: boolean;
  priority: number;
}

interface RuleCondition {
  type: 'simple' | 'compound';
  field?: string;
  operator?: '>' | '<' | '==' | '!=' | '>=' | '<=';
  value?: any;
  logic?: 'AND' | 'OR';
  conditions?: RuleCondition[];
}

interface RuleAction {
  type: 'publish' | 'alert' | 'device_command' | 'log';
  [key: string]: any;
}

class RulesEngine {
  private rules: Rule[] = [];

  constructor() {
    this.loadDefaultRules();
  }

  private loadDefaultRules(): void {
    this.rules = [
      // High Temperature Alert
      {
        id: 'high-temp-alert',
        name: 'High Temperature Alert',
        condition: {
          type: 'simple',
          field: 'temperature',
          operator: '>',
          value: 35
        },
        actions: [
          {
            type: 'alert',
            message: 'High temperature detected',
            severity: 'warning'
          },
          {
            type: 'publish',
            topic: 'alerts/temperature',
            payload: { alert: 'high_temperature' }
          }
        ],
        enabled: true,
        priority: 1
      },
      // Low Battery Warning
      {
        id: 'low-battery',
        name: 'Low Battery Warning',
        condition: {
          type: 'simple',
          field: 'battery',
          operator: '<',
          value: 20
        },
        actions: [
          {
            type: 'alert',
            message: 'Device battery low',
            severity: 'info'
          }
        ],
        enabled: true,
        priority: 2
      },
      // Motion → Light On
      {
        id: 'motion-light',
        name: 'Motion Activated Light',
        condition: {
          type: 'compound',
          logic: 'AND',
          conditions: [
            { type: 'simple', field: 'occupancy', operator: '==', value: true },
            { type: 'simple', field: 'ambient_light', operator: '<', value: 100 }
          ]
        },
        actions: [
          {
            type: 'device_command',
            deviceId: 'hallway_light',
            command: { state: 'ON', brightness: 80 }
          }
        ],
        enabled: true,
        priority: 1
      }
    ];
  }

  evaluate(topic: string, data: any): RuleAction[] {
    const actions: RuleAction[] = [];

    // Sort enabled rules by priority (lower number first)
    const sortedRules = [...this.rules]
      .filter(r => r.enabled)
      .sort((a, b) => a.priority - b.priority);

    for (const rule of sortedRules) {
      if (this.evaluateCondition(rule.condition, data)) {
        console.log(`Rule triggered: ${rule.name}`);
        actions.push(...rule.actions);
      }
    }
    return actions;
  }

  private evaluateCondition(condition: RuleCondition, data: any): boolean {
    if (condition.type === 'simple') {
      return this.evaluateSimple(condition, data);
    }
    if (condition.type === 'compound' && condition.conditions) {
      const results = condition.conditions.map(c =>
        this.evaluateCondition(c, data)
      );
      return condition.logic === 'AND'
        ? results.every(r => r)
        : results.some(r => r);
    }
    return false;
  }

  private evaluateSimple(condition: RuleCondition, data: any): boolean {
    const value = data[condition.field!];
    // A missing field never matches
    if (value === undefined) return false;
    switch (condition.operator) {
      case '>': return value > condition.value;
      case '<': return value < condition.value;
      case '>=': return value >= condition.value;
      case '<=': return value <= condition.value;
      case '==': return value === condition.value;
      case '!=': return value !== condition.value;
      default: return false;
    }
  }

  updateRules(newRules: Rule[]): void {
    this.rules = newRules;
    console.log(`Updated ${newRules.length} rules`);
  }

  addRule(rule: Rule): void {
    this.rules.push(rule);
  }

  removeRule(ruleId: string): void {
    this.rules = this.rules.filter(r => r.id !== ruleId);
  }

  getRules(): Rule[] {
    return this.rules;
  }
}

export { RulesEngine };

Edge-Cloud Synchronization
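The `aggregate` method in the listing below reduces whatever array it receives to one summary per device, so the reporting period is simply whatever the caller passes in. To obtain fixed periods, for example one summary per minute, readings can first be grouped into tumbling windows and each window aggregated separately. The following is a sketch under assumptions: the `Reading` shape mirrors the fields used in the listing, while the function name `groupIntoWindows` and the ISO-8601 `timestamp` string are hypothetical.

```typescript
// Assumed reading shape, mirroring the fields used by aggregate()
interface Reading {
  device_id: string;
  timestamp: string; // assumed to be an ISO-8601 string
  temperature?: number;
  humidity?: number;
}

/**
 * Groups readings into tumbling (non-overlapping, fixed-size) windows.
 * Keys are window start times in epoch milliseconds.
 */
function groupIntoWindows(
  readings: Reading[],
  windowMs: number
): Map<number, Reading[]> {
  if (windowMs <= 0) {
    throw new Error('windowMs must be positive');
  }
  const windows = new Map<number, Reading[]>();
  for (const reading of readings) {
    const t = Date.parse(reading.timestamp);
    if (Number.isNaN(t)) continue; // skip unparseable timestamps
    const windowStart = Math.floor(t / windowMs) * windowMs;
    const bucket = windows.get(windowStart);
    if (bucket) {
      bucket.push(reading);
    } else {
      windows.set(windowStart, [reading]);
    }
  }
  return windows;
}

// Usage: three readings fall into two one-minute windows.
const sample: Reading[] = [
  { device_id: 'a', timestamp: '2024-01-01T00:00:10Z', temperature: 21 },
  { device_id: 'a', timestamp: '2024-01-01T00:00:50Z', temperature: 22 },
  { device_id: 'a', timestamp: '2024-01-01T00:01:05Z', temperature: 23 }
];
const grouped = groupIntoWindows(sample, 60_000);
```

Each value of the resulting map could then be passed through the existing per-device aggregation, which makes `period_start` and `period_end` line up with the window boundaries.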
// lib/edge-cloud-sync.ts
import { gzipSync } from 'zlib';

interface SyncConfig {
  syncInterval: number;
  batchSize: number;
  retentionLocal: number; // days
  compressionEnabled: boolean;
}

class EdgeCloudSync {
  private config: SyncConfig;
  private pendingSync: any[] = [];
  private lastSyncTime: Date | null = null;

  constructor(config: SyncConfig) {
    this.config = config;
  }

  // Prepare data for cloud sync
  prepareForSync(data: any[]): any {
    // Aggregation
    const aggregated = this.aggregate(data);
    // Compression (optional)
    if (this.config.compressionEnabled) {
      return this.compress(aggregated);
    }
    return aggregated;
  }

  private aggregate(data: any[]): any {
    // Group by device
    const byDevice = new Map<string, any[]>();
    data.forEach(item => {
      const deviceId = item.device_id;
      if (!byDevice.has(deviceId)) {
        byDevice.set(deviceId, []);
      }
      byDevice.get(deviceId)!.push(item);
    });

    // Aggregated statistics per device
    const result: any[] = [];
    byDevice.forEach((items, deviceId) => {
      const temps = items
        .map(i => i.temperature)
        .filter(t => t !== undefined);
      const humidities = items
        .map(i => i.humidity)
        .filter(h => h !== undefined);
      result.push({
        device_id: deviceId,
        // Assumes items arrive in chronological order
        period_start: items[0].timestamp,
        period_end: items[items.length - 1].timestamp,
        sample_count: items.length,
        temperature: temps.length > 0 ? {
          min: Math.min(...temps),
          max: Math.max(...temps),
          avg: temps.reduce((a, b) => a + b, 0) / temps.length
        } : null,
        humidity: humidities.length > 0 ? {
          min: Math.min(...humidities),
          max: Math.max(...humidities),
          avg: humidities.reduce((a, b) => a + b, 0) / humidities.length
        } : null
      });
    });
    return result;
  }

  private compress(data: any): Buffer {
    const json = JSON.stringify(data);
    return gzipSync(json);
  }

  // Conflict resolution during cloud sync
  resolveConflict(localData: any, cloudData: any): any {
    // Last-write-wins strategy: fields from the newer record override the older one
    const localTime = new Date(localData.updated_at);
    const cloudTime = new Date(cloudData.updated_at);
    if (localTime > cloudTime) {
      return { ...cloudData, ...localData, conflict_resolved: true };
    }
    return { ...localData, ...cloudData, conflict_resolved: true };
  }
}

export { EdgeCloudSync };

Edge Deployment
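The compose file below hands settings to the gateway through environment variables, but the listings never show how those variables become an `EdgeConfig`. The following is a minimal sketch of such a loader. `MQTT_BROKER`, `CLOUD_ENDPOINT`, and `INFLUX_URL` come from the compose file; the names `INFLUX_TOKEN`, `INFLUX_ORG`, `INFLUX_BUCKET`, `SYNC_INTERVAL_MS`, and `OFFLINE_BUFFER_SIZE`, as well as the default values, are assumptions made for illustration.

```typescript
interface EdgeConfig {
  mqttBroker: string;
  cloudEndpoint: string;
  localDb: { url: string; token: string; org: string; bucket: string };
  syncInterval: number; // milliseconds
  offlineBufferSize: number;
}

type Env = Record<string, string | undefined>;

/** Returns the variable's value or throws with a clear message. */
function requireVar(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable ${name}`);
  }
  return value;
}

/** Parses an optional positive integer, falling back to a default. */
function positiveInt(env: Env, name: string, fallback: number): number {
  const raw = env[name];
  if (raw === undefined || raw === '') return fallback;
  const parsed = Number(raw);
  if (!Number.isInteger(parsed) || parsed <= 0) {
    throw new Error(`${name} must be a positive integer, got "${raw}"`);
  }
  return parsed;
}

function loadConfig(env: Env): EdgeConfig {
  return {
    mqttBroker: requireVar(env, 'MQTT_BROKER'),
    cloudEndpoint: requireVar(env, 'CLOUD_ENDPOINT'),
    localDb: {
      url: requireVar(env, 'INFLUX_URL'),
      token: requireVar(env, 'INFLUX_TOKEN'), // assumed variable name
      // Defaults match the org and bucket initialized in the compose file
      org: env['INFLUX_ORG'] || 'edge',
      bucket: env['INFLUX_BUCKET'] || 'sensors'
    },
    // Assumed variable names and defaults
    syncInterval: positiveInt(env, 'SYNC_INTERVAL_MS', 60_000),
    offlineBufferSize: positiveInt(env, 'OFFLINE_BUFFER_SIZE', 10_000)
  };
}
```

In the gateway this would be called as `loadConfig(process.env)` at startup. Failing fast on a missing variable tends to be easier to diagnose on a remote device than an MQTT client silently connecting to `undefined`.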
# docker-compose.edge.yml
version: '3.8'

services:
  edge-gateway:
    build: ./edge-gateway
    restart: always
    environment:
      - GATEWAY_ID=${GATEWAY_ID}
      - MQTT_BROKER=mqtt://mosquitto:1883
      - CLOUD_ENDPOINT=${CLOUD_MQTT_URL}
      - INFLUX_URL=http://influxdb:8086
      # Read by the gateway's cloud MQTT client
      - CLOUD_USER=${CLOUD_USER}
      - CLOUD_PASS=${CLOUD_PASS}
    volumes:
      - ./models:/app/models
      - ./config:/app/config
    depends_on:
      - mosquitto
      - influxdb

  # Note: Mosquitto 2.x accepts only local connections unless a listener
  # is configured in mosquitto.conf
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1883:1883"
    volumes:
      - mosquitto-data:/mosquitto/data

  influxdb:
    image: influxdb:2.7
    ports:
      - "8086:8086"
    volumes:
      - influxdb-data:/var/lib/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=edgepassword
      - DOCKER_INFLUXDB_INIT_ORG=edge
      - DOCKER_INFLUXDB_INIT_BUCKET=sensors

volumes:
  mosquitto-data:
  influxdb-data:

Edge vs Cloud Comparison
| Aspect | Edge Computing | Cloud Computing |
|---|---|---|
| **Latency** | 1-10 ms | 50-200 ms (network round trip) |
| **Bandwidth** | Low (only aggregates leave the site) | High (all raw data uploaded) |
| **Offline** | Fully operational | Requires connectivity |
| **Privacy** | Data stays local | Data in cloud |
| **Compute** | Limited by gateway hardware | Elastically scalable |
| **Storage** | Limited by local disk | Elastically scalable |
| **Cost** | Hardware upfront | Pay-per-use |
| **ML Training** | Not practical | Ideal |
| **ML Inference** | Real-time | Batch, or online with added network latency |
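The bandwidth row can be made concrete with back-of-the-envelope arithmetic. The sketch below compares the daily uplink volume of forwarding every reading with that of sending one aggregated summary per device per interval. The function name and every number in the usage example (device count, message sizes, rates) are illustrative assumptions, not measurements.

```typescript
interface UplinkScenario {
  devices: number;
  readingsPerSecond: number;      // per device
  bytesPerReading: number;
  summaryIntervalSeconds: number; // one summary per device per interval
  bytesPerSummary: number;
}

/** Daily uplink volume in bytes for raw forwarding versus edge aggregation. */
function dailyUplinkBytes(s: UplinkScenario): {
  raw: number;
  aggregated: number;
  reductionFactor: number;
} {
  const secondsPerDay = 86_400;
  const raw = s.devices * s.readingsPerSecond * s.bytesPerReading * secondsPerDay;
  const summariesPerDay = secondsPerDay / s.summaryIntervalSeconds;
  const aggregated = s.devices * summariesPerDay * s.bytesPerSummary;
  return { raw, aggregated, reductionFactor: raw / aggregated };
}

// Illustrative numbers only: 100 devices, one 200-byte reading per second,
// versus one 400-byte summary per device per minute.
const estimate = dailyUplinkBytes({
  devices: 100,
  readingsPerSecond: 1,
  bytesPerReading: 200,
  summaryIntervalSeconds: 60,
  bytesPerSummary: 400
});
```

With these assumed inputs, raw forwarding comes to 1,728,000,000 bytes per day and aggregation to 57,600,000 bytes per day, a reduction by a factor of 30. Protocol overhead (MQTT and TLS framing) is ignored here and would shift both figures.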
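Conclusion
Edge computing for IoT offers:
- Low latency: local decisions within milliseconds
- Offline capability: keeps working without the cloud
- Data privacy: sensitive data stays local
- Bandwidth: only aggregated data goes to the cloud
The optimal architecture combines both: the edge handles real-time processing and local decisions, while the cloud handles long-term storage, complex analytics, and model training.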