Menu
Nazad na Blog
2 min read
IoT

MQTT Protocol für IoT-Kommunikation

MQTT Protokoll für IoT. Publish/Subscribe Pattern, QoS Levels, Broker Setup und Node.js Integration mit MQTT.js.

MQTTIoTPublish SubscribeMQTT.jsMosquittoMessage Broker
MQTT Protocol für IoT-Kommunikation

MQTT Protocol für IoT-Kommunikation

Meta-Description: MQTT Protokoll für IoT. Publish/Subscribe Pattern, QoS Levels, Broker Setup und Node.js Integration mit MQTT.js.

Keywords: MQTT, IoT, Publish Subscribe, MQTT.js, Mosquitto, Message Broker, QoS, Real-time


Einführung

MQTT (Message Queuing Telemetry Transport) ist das Standard-Protokoll für IoT-Kommunikation. Leichtgewichtig, effizient und für instabile Netzwerke optimiert – perfekt für Sensor-Netzwerke und Smart Home Anwendungen.


MQTT Architecture

┌─────────────────────────────────────────────────────────────┐
│                    MQTT ARCHITECTURE                         │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Publishers              Broker              Subscribers    │
│  ┌─────────┐         ┌──────────┐         ┌─────────┐     │
│  │ Sensor  │────────→│          │────────→│Dashboard│     │
│  └─────────┘         │          │         └─────────┘     │
│                      │ Mosquitto│                          │
│  ┌─────────┐         │  / EMQX  │         ┌─────────┐     │
│  │ Device  │────────→│          │────────→│ Backend │     │
│  └─────────┘         │          │         └─────────┘     │
│                      │          │                          │
│  ┌─────────┐         │          │         ┌─────────┐     │
│  │ Gateway │←───────→│          │←───────→│ Mobile  │     │
│  └─────────┘         └──────────┘         └─────────┘     │
│                                                             │
│  Topic Structure:                                           │
│  home/living-room/temperature → 21.5                       │
│  home/living-room/humidity → 45                            │
│  home/bedroom/light/state → on                             │
│  home/bedroom/light/brightness → 80                        │
│                                                             │
│  QoS Levels:                                                │
│  0: At most once (Fire & Forget)                           │
│  1: At least once (Acknowledged)                           │
│  2: Exactly once (Assured)                                 │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Mosquitto Broker Setup

# Docker Installation
docker run -d \
  --name mosquitto \
  -p 1883:1883 \
  -p 9001:9001 \
  -v $(pwd)/mosquitto/config:/mosquitto/config \
  -v $(pwd)/mosquitto/data:/mosquitto/data \
  -v $(pwd)/mosquitto/log:/mosquitto/log \
  eclipse-mosquitto

# mosquitto/config/mosquitto.conf
listener 1883
listener 9001
protocol websockets

allow_anonymous false
password_file /mosquitto/config/password.txt

persistence true
persistence_location /mosquitto/data/

log_dest file /mosquitto/log/mosquitto.log
log_type all

# TLS Konfiguration
listener 8883
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
require_certificate false
# docker-compose.yml
version: '3.8'
services:
  mosquitto:
    image: eclipse-mosquitto:2
    container_name: mosquitto
    ports:
      - "1883:1883"    # MQTT
      - "8883:8883"    # MQTT/TLS
      - "9001:9001"    # WebSocket
    volumes:
      - ./mosquitto/config:/mosquitto/config
      - ./mosquitto/data:/mosquitto/data
      - ./mosquitto/log:/mosquitto/log
    restart: unless-stopped

MQTT.js Client

// lib/mqtt-client.ts
import mqtt, { MqttClient, IClientOptions, QoS } from 'mqtt';

interface MQTTConfig {
  brokerUrl: string;
  options?: IClientOptions;
}

interface PublishOptions {
  qos?: QoS;
  retain?: boolean;
}

type MessageHandler = (topic: string, message: string, packet: any) => void;

class IoTMQTTClient {
  private client: MqttClient | null = null;
  private subscriptions: Map<string, Set<MessageHandler>> = new Map();
  private connected = false;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;

  constructor(private config: MQTTConfig) {}

  async connect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const options: IClientOptions = {
        clientId: `mqtt_${Math.random().toString(16).slice(2, 10)}`,
        clean: true,
        connectTimeout: 30000,
        reconnectPeriod: 5000,
        keepalive: 60,
        ...this.config.options
      };

      this.client = mqtt.connect(this.config.brokerUrl, options);

      this.client.on('connect', () => {
        console.log('Connected to MQTT broker');
        this.connected = true;
        this.reconnectAttempts = 0;

        // Re-subscribe nach Reconnect
        this.resubscribe();
        resolve();
      });

      this.client.on('message', (topic, payload, packet) => {
        const message = payload.toString();
        this.handleMessage(topic, message, packet);
      });

      this.client.on('error', (error) => {
        console.error('MQTT Error:', error);
        if (!this.connected) {
          reject(error);
        }
      });

      this.client.on('close', () => {
        console.log('MQTT connection closed');
        this.connected = false;
      });

      this.client.on('reconnect', () => {
        this.reconnectAttempts++;
        console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);

        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
          this.client?.end();
          console.error('Max reconnection attempts reached');
        }
      });

      this.client.on('offline', () => {
        console.log('MQTT client offline');
      });
    });
  }

  private handleMessage(topic: string, message: string, packet: any) {
    // Exakte Matches
    const handlers = this.subscriptions.get(topic);
    if (handlers) {
      handlers.forEach(handler => handler(topic, message, packet));
    }

    // Wildcard Matches
    for (const [pattern, patternHandlers] of this.subscriptions) {
      if (this.matchTopic(pattern, topic)) {
        patternHandlers.forEach(handler => handler(topic, message, packet));
      }
    }
  }

  private matchTopic(pattern: string, topic: string): boolean {
    if (pattern === topic) return false; // Already handled above

    const patternParts = pattern.split('/');
    const topicParts = topic.split('/');

    for (let i = 0; i < patternParts.length; i++) {
      if (patternParts[i] === '#') {
        return true; // Multi-level wildcard matches rest
      }
      if (patternParts[i] === '+') {
        continue; // Single-level wildcard matches any single level
      }
      if (patternParts[i] !== topicParts[i]) {
        return false;
      }
    }

    return patternParts.length === topicParts.length;
  }

  private resubscribe() {
    for (const [topic] of this.subscriptions) {
      this.client?.subscribe(topic, { qos: 1 });
    }
  }

  // Publish
  async publish(
    topic: string,
    message: string | object,
    options: PublishOptions = {}
  ): Promise<void> {
    return new Promise((resolve, reject) => {
      if (!this.client || !this.connected) {
        reject(new Error('Not connected to broker'));
        return;
      }

      const payload = typeof message === 'object'
        ? JSON.stringify(message)
        : message;

      this.client.publish(
        topic,
        payload,
        {
          qos: options.qos || 1,
          retain: options.retain || false
        },
        (error) => {
          if (error) reject(error);
          else resolve();
        }
      );
    });
  }

  // Subscribe
  subscribe(
    topic: string,
    handler: MessageHandler,
    qos: QoS = 1
  ): () => void {
    if (!this.subscriptions.has(topic)) {
      this.subscriptions.set(topic, new Set());
      this.client?.subscribe(topic, { qos });
    }

    this.subscriptions.get(topic)!.add(handler);

    // Unsubscribe function
    return () => {
      const handlers = this.subscriptions.get(topic);
      if (handlers) {
        handlers.delete(handler);
        if (handlers.size === 0) {
          this.subscriptions.delete(topic);
          this.client?.unsubscribe(topic);
        }
      }
    };
  }

  // Disconnect
  disconnect(): Promise<void> {
    return new Promise((resolve) => {
      this.client?.end(false, {}, () => {
        this.connected = false;
        resolve();
      });
    });
  }

  isConnected(): boolean {
    return this.connected;
  }
}

export { IoTMQTTClient };

Sensor Data Publishing

// devices/temperature-sensor.ts
import { IoTMQTTClient } from '../lib/mqtt-client';

interface SensorReading {
  device_id: string;
  timestamp: string;
  temperature: number;
  humidity: number;
  battery: number;
}

class TemperatureSensor {
  private client: IoTMQTTClient;
  private deviceId: string;
  private baseTopic: string;
  private intervalId?: NodeJS.Timeout;

  constructor(
    brokerUrl: string,
    deviceId: string,
    options?: { username?: string; password?: string }
  ) {
    this.deviceId = deviceId;
    this.baseTopic = `sensors/${deviceId}`;

    this.client = new IoTMQTTClient({
      brokerUrl,
      options: {
        username: options?.username,
        password: options?.password,
        will: {
          topic: `${this.baseTopic}/status`,
          payload: Buffer.from('offline'),
          qos: 1,
          retain: true
        }
      }
    });
  }

  async start(intervalMs: number = 60000): Promise<void> {
    await this.client.connect();

    // Online Status
    await this.client.publish(
      `${this.baseTopic}/status`,
      'online',
      { retain: true }
    );

    // Device Info
    await this.client.publish(
      `${this.baseTopic}/info`,
      {
        device_id: this.deviceId,
        type: 'temperature_humidity',
        firmware: '1.0.0',
        started_at: new Date().toISOString()
      },
      { retain: true }
    );

    // Periodisch Daten senden
    this.intervalId = setInterval(() => this.publishReading(), intervalMs);

    // Initial Reading
    await this.publishReading();

    console.log(`Sensor ${this.deviceId} started`);
  }

  private async publishReading(): Promise<void> {
    const reading: SensorReading = {
      device_id: this.deviceId,
      timestamp: new Date().toISOString(),
      temperature: this.readTemperature(),
      humidity: this.readHumidity(),
      battery: this.readBattery()
    };

    // JSON Format
    await this.client.publish(
      `${this.baseTopic}/data`,
      reading,
      { qos: 1 }
    );

    // Home Assistant Discovery Format
    await this.client.publish(
      `homeassistant/sensor/${this.deviceId}/temperature/state`,
      reading.temperature.toString()
    );

    await this.client.publish(
      `homeassistant/sensor/${this.deviceId}/humidity/state`,
      reading.humidity.toString()
    );

    console.log(`Published: ${JSON.stringify(reading)}`);
  }

  // Simulierte Sensor-Werte (in Produktion: echte Hardware-Reads)
  private readTemperature(): number {
    return Math.round((20 + Math.random() * 5) * 10) / 10;
  }

  private readHumidity(): number {
    return Math.round((40 + Math.random() * 20) * 10) / 10;
  }

  private readBattery(): number {
    return Math.round((80 + Math.random() * 20) * 10) / 10;
  }

  async stop(): Promise<void> {
    if (this.intervalId) {
      clearInterval(this.intervalId);
    }

    await this.client.publish(
      `${this.baseTopic}/status`,
      'offline',
      { retain: true }
    );

    await this.client.disconnect();
    console.log(`Sensor ${this.deviceId} stopped`);
  }
}

// Verwendung
const sensor = new TemperatureSensor(
  'mqtt://localhost:1883',
  'living-room-01',
  {
    username: process.env.MQTT_USER,
    password: process.env.MQTT_PASS
  }
);

await sensor.start(30000); // Alle 30 Sekunden

// Graceful Shutdown
process.on('SIGTERM', async () => {
  await sensor.stop();
  process.exit(0);
});

Data Subscriber Service

// services/mqtt-data-collector.ts
import { IoTMQTTClient } from '../lib/mqtt-client';
import { InfluxDBClient } from '../lib/influxdb';

interface SensorData {
  device_id: string;
  timestamp: string;
  [key: string]: any;
}

class MQTTDataCollector {
  private mqtt: IoTMQTTClient;
  private influx: InfluxDBClient;
  private subscriptions: Array<() => void> = [];

  constructor(
    mqttUrl: string,
    influxConfig: { url: string; token: string; org: string; bucket: string }
  ) {
    this.mqtt = new IoTMQTTClient({
      brokerUrl: mqttUrl,
      options: {
        clientId: 'data-collector',
        username: process.env.MQTT_USER,
        password: process.env.MQTT_PASS
      }
    });

    this.influx = new InfluxDBClient(influxConfig);
  }

  async start(): Promise<void> {
    await this.mqtt.connect();

    // Subscribe to all sensor data
    const unsubSensors = this.mqtt.subscribe(
      'sensors/+/data',
      async (topic, message) => {
        try {
          const data = JSON.parse(message) as SensorData;
          await this.processSensorData(topic, data);
        } catch (error) {
          console.error('Failed to process sensor data:', error);
        }
      }
    );
    this.subscriptions.push(unsubSensors);

    // Subscribe to device status
    const unsubStatus = this.mqtt.subscribe(
      'sensors/+/status',
      (topic, message) => {
        const deviceId = topic.split('/')[1];
        console.log(`Device ${deviceId} is ${message}`);
        this.updateDeviceStatus(deviceId, message);
      }
    );
    this.subscriptions.push(unsubStatus);

    // Subscribe to commands
    const unsubCommands = this.mqtt.subscribe(
      'commands/#',
      this.handleCommand.bind(this)
    );
    this.subscriptions.push(unsubCommands);

    console.log('Data collector started');
  }

  private async processSensorData(topic: string, data: SensorData): Promise<void> {
    const deviceId = topic.split('/')[1];

    // In InfluxDB speichern
    await this.influx.writePoint({
      measurement: 'sensor_readings',
      tags: {
        device_id: deviceId,
        location: this.getDeviceLocation(deviceId)
      },
      fields: {
        temperature: data.temperature,
        humidity: data.humidity,
        battery: data.battery
      },
      timestamp: new Date(data.timestamp)
    });

    // Alerts prüfen
    await this.checkAlerts(deviceId, data);
  }

  private async checkAlerts(deviceId: string, data: SensorData): Promise<void> {
    // Temperatur-Alert
    if (data.temperature > 30) {
      await this.mqtt.publish('alerts/temperature/high', {
        device_id: deviceId,
        temperature: data.temperature,
        timestamp: data.timestamp,
        message: `High temperature alert: ${data.temperature}°C`
      });
    }

    // Batterie-Alert
    if (data.battery < 20) {
      await this.mqtt.publish('alerts/battery/low', {
        device_id: deviceId,
        battery: data.battery,
        timestamp: data.timestamp,
        message: `Low battery alert: ${data.battery}%`
      });
    }
  }

  private handleCommand(topic: string, message: string): void {
    const parts = topic.split('/');
    const target = parts[1];
    const action = parts[2];

    console.log(`Command: ${action} for ${target}`);

    // Command weiterleiten an Gerät
    this.mqtt.publish(`devices/${target}/command`, {
      action,
      payload: JSON.parse(message),
      timestamp: new Date().toISOString()
    });
  }

  private getDeviceLocation(deviceId: string): string {
    const locationMap: Record<string, string> = {
      'living-room-01': 'living_room',
      'bedroom-01': 'bedroom',
      'kitchen-01': 'kitchen'
    };
    return locationMap[deviceId] || 'unknown';
  }

  private updateDeviceStatus(deviceId: string, status: string): void {
    // Status in DB speichern
    console.log(`Device ${deviceId} status: ${status}`);
  }

  async stop(): Promise<void> {
    this.subscriptions.forEach(unsub => unsub());
    await this.mqtt.disconnect();
  }
}

WebSocket Bridge (Browser)

// lib/mqtt-websocket.ts
import mqtt, { MqttClient } from 'mqtt';

class MQTTWebSocketClient {
  private client: MqttClient | null = null;

  async connect(wsUrl: string, options?: {
    username?: string;
    password?: string;
  }): Promise<void> {
    return new Promise((resolve, reject) => {
      this.client = mqtt.connect(wsUrl, {
        protocol: 'wss',
        clientId: `web_${Math.random().toString(16).slice(2, 8)}`,
        ...options
      });

      this.client.on('connect', () => {
        console.log('WebSocket MQTT connected');
        resolve();
      });

      this.client.on('error', reject);
    });
  }

  subscribe(topic: string, callback: (topic: string, message: string) => void): () => void {
    this.client?.subscribe(topic);

    const handler = (t: string, payload: Buffer) => {
      if (this.topicMatches(topic, t)) {
        callback(t, payload.toString());
      }
    };

    this.client?.on('message', handler);

    return () => {
      this.client?.unsubscribe(topic);
      this.client?.removeListener('message', handler);
    };
  }

  publish(topic: string, message: any): void {
    const payload = typeof message === 'object'
      ? JSON.stringify(message)
      : String(message);
    this.client?.publish(topic, payload);
  }

  private topicMatches(pattern: string, topic: string): boolean {
    const patternParts = pattern.split('/');
    const topicParts = topic.split('/');

    for (let i = 0; i < patternParts.length; i++) {
      if (patternParts[i] === '#') return true;
      if (patternParts[i] === '+') continue;
      if (patternParts[i] !== topicParts[i]) return false;
    }

    return patternParts.length === topicParts.length;
  }
}

// React Hook
import { useState, useEffect, useCallback } from 'react';

export function useMQTT(wsUrl: string) {
  const [client] = useState(() => new MQTTWebSocketClient());
  const [connected, setConnected] = useState(false);

  useEffect(() => {
    client.connect(wsUrl).then(() => setConnected(true));
  }, [wsUrl]);

  const subscribe = useCallback((
    topic: string,
    callback: (topic: string, message: string) => void
  ) => {
    return client.subscribe(topic, callback);
  }, [client]);

  const publish = useCallback((topic: string, message: any) => {
    client.publish(topic, message);
  }, [client]);

  return { connected, subscribe, publish };
}

// Verwendung in React Component
function SensorDashboard() {
  const { connected, subscribe } = useMQTT('wss://mqtt.example.com:9001');
  const [temperature, setTemperature] = useState<number | null>(null);

  useEffect(() => {
    if (!connected) return;

    const unsub = subscribe('sensors/+/data', (topic, message) => {
      const data = JSON.parse(message);
      setTemperature(data.temperature);
    });

    return unsub;
  }, [connected, subscribe]);

  return (
    <div>
      {connected ? (
        <p>Temperature: {temperature ?? 'Loading...'}°C</p>
      ) : (
        <p>Connecting...</p>
      )}
    </div>
  );
}

Secure MQTT with TLS

// lib/secure-mqtt.ts
import mqtt from 'mqtt';
import fs from 'fs';
import path from 'path';

const secureClient = mqtt.connect('mqtts://mqtt.example.com:8883', {
  clientId: `secure_${Date.now()}`,
  username: process.env.MQTT_USER,
  password: process.env.MQTT_PASS,

  // TLS Optionen
  ca: fs.readFileSync(path.join(__dirname, 'certs/ca.crt')),
  cert: fs.readFileSync(path.join(__dirname, 'certs/client.crt')),
  key: fs.readFileSync(path.join(__dirname, 'certs/client.key')),

  // Bei selbstsignierten Zertifikaten
  rejectUnauthorized: process.env.NODE_ENV === 'production',

  // Protokoll-Version
  protocolVersion: 5,  // MQTT 5.0

  // Clean Start (MQTT 5.0)
  clean: true,

  // Session Expiry (MQTT 5.0)
  properties: {
    sessionExpiryInterval: 3600
  }
});

secureClient.on('connect', (connack) => {
  console.log('Secure connection established');
  console.log('Session present:', connack.sessionPresent);
});

QoS Level Comparison

QoSNameGarantieUse Case
0At most onceKeineTelemetrie, unwichtige Daten
1At least onceZustellung garantiertSensor-Daten, Commands
2Exactly onceKeine DuplikateZahlungen, kritische Daten

Fazit

MQTT für IoT bietet:

  1. Leichtgewichtig: Minimaler Overhead
  2. Publish/Subscribe: Entkoppelte Kommunikation
  3. QoS Levels: Flexible Zustellgarantien
  4. Wildcards: Flexible Topic-Struktur

Standard für IoT-Kommunikation.


Bildprompts

  1. "IoT devices connected through message broker, publish subscribe pattern"
  2. "Sensor data flowing to central hub, MQTT network visualization"
  3. "Quality of Service levels diagram, message delivery guarantee"

Quellen