2 min read
IoTMQTT Protocol für IoT-Kommunikation
MQTT Protokoll für IoT. Publish/Subscribe Pattern, QoS Levels, Broker Setup und Node.js Integration mit MQTT.js.
MQTTIoTPublish SubscribeMQTT.jsMosquittoMessage Broker

MQTT Protocol für IoT-Kommunikation
Meta-Description: MQTT Protokoll für IoT. Publish/Subscribe Pattern, QoS Levels, Broker Setup und Node.js Integration mit MQTT.js.
Keywords: MQTT, IoT, Publish Subscribe, MQTT.js, Mosquitto, Message Broker, QoS, Real-time
Einführung
MQTT (Message Queuing Telemetry Transport) ist das Standard-Protokoll für IoT-Kommunikation. Leichtgewichtig, effizient und für instabile Netzwerke optimiert – perfekt für Sensor-Netzwerke und Smart Home Anwendungen.
MQTT Architecture
┌─────────────────────────────────────────────────────────────┐
│ MQTT ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Publishers Broker Subscribers │
│ ┌─────────┐ ┌──────────┐ ┌─────────┐ │
│ │ Sensor │────────→│ │────────→│Dashboard│ │
│ └─────────┘ │ │ └─────────┘ │
│ │ Mosquitto│ │
│ ┌─────────┐ │ / EMQX │ ┌─────────┐ │
│ │ Device │────────→│ │────────→│ Backend │ │
│ └─────────┘ │ │ └─────────┘ │
│ │ │ │
│ ┌─────────┐ │ │ ┌─────────┐ │
│ │ Gateway │←───────→│ │←───────→│ Mobile │ │
│ └─────────┘ └──────────┘ └─────────┘ │
│ │
│ Topic Structure: │
│ home/living-room/temperature → 21.5 │
│ home/living-room/humidity → 45 │
│ home/bedroom/light/state → on │
│ home/bedroom/light/brightness → 80 │
│ │
│ QoS Levels: │
│ 0: At most once (Fire & Forget) │
│ 1: At least once (Acknowledged) │
│ 2: Exactly once (Assured) │
│ │
└─────────────────────────────────────────────────────────────┘Mosquitto Broker Setup
# Docker Installation
docker run -d \
--name mosquitto \
-p 1883:1883 \
-p 9001:9001 \
-v $(pwd)/mosquitto/config:/mosquitto/config \
-v $(pwd)/mosquitto/data:/mosquitto/data \
-v $(pwd)/mosquitto/log:/mosquitto/log \
eclipse-mosquitto
# mosquitto/config/mosquitto.conf
listener 1883
listener 9001
protocol websockets
allow_anonymous false
password_file /mosquitto/config/password.txt
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
log_type all
# TLS Konfiguration
listener 8883
cafile /mosquitto/certs/ca.crt
certfile /mosquitto/certs/server.crt
keyfile /mosquitto/certs/server.key
require_certificate false# docker-compose.yml
version: '3.8'
services:
mosquitto:
image: eclipse-mosquitto:2
container_name: mosquitto
ports:
- "1883:1883" # MQTT
- "8883:8883" # MQTT/TLS
- "9001:9001" # WebSocket
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
restart: unless-stoppedMQTT.js Client
// lib/mqtt-client.ts
import mqtt, { MqttClient, IClientOptions, QoS } from 'mqtt';
interface MQTTConfig {
brokerUrl: string;
options?: IClientOptions;
}
interface PublishOptions {
qos?: QoS;
retain?: boolean;
}
type MessageHandler = (topic: string, message: string, packet: any) => void;
class IoTMQTTClient {
private client: MqttClient | null = null;
private subscriptions: Map<string, Set<MessageHandler>> = new Map();
private connected = false;
private reconnectAttempts = 0;
private maxReconnectAttempts = 10;
constructor(private config: MQTTConfig) {}
async connect(): Promise<void> {
return new Promise((resolve, reject) => {
const options: IClientOptions = {
clientId: `mqtt_${Math.random().toString(16).slice(2, 10)}`,
clean: true,
connectTimeout: 30000,
reconnectPeriod: 5000,
keepalive: 60,
...this.config.options
};
this.client = mqtt.connect(this.config.brokerUrl, options);
this.client.on('connect', () => {
console.log('Connected to MQTT broker');
this.connected = true;
this.reconnectAttempts = 0;
// Re-subscribe nach Reconnect
this.resubscribe();
resolve();
});
this.client.on('message', (topic, payload, packet) => {
const message = payload.toString();
this.handleMessage(topic, message, packet);
});
this.client.on('error', (error) => {
console.error('MQTT Error:', error);
if (!this.connected) {
reject(error);
}
});
this.client.on('close', () => {
console.log('MQTT connection closed');
this.connected = false;
});
this.client.on('reconnect', () => {
this.reconnectAttempts++;
console.log(`Reconnecting... Attempt ${this.reconnectAttempts}`);
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
this.client?.end();
console.error('Max reconnection attempts reached');
}
});
this.client.on('offline', () => {
console.log('MQTT client offline');
});
});
}
private handleMessage(topic: string, message: string, packet: any) {
// Exakte Matches
const handlers = this.subscriptions.get(topic);
if (handlers) {
handlers.forEach(handler => handler(topic, message, packet));
}
// Wildcard Matches
for (const [pattern, patternHandlers] of this.subscriptions) {
if (this.matchTopic(pattern, topic)) {
patternHandlers.forEach(handler => handler(topic, message, packet));
}
}
}
private matchTopic(pattern: string, topic: string): boolean {
if (pattern === topic) return false; // Already handled above
const patternParts = pattern.split('/');
const topicParts = topic.split('/');
for (let i = 0; i < patternParts.length; i++) {
if (patternParts[i] === '#') {
return true; // Multi-level wildcard matches rest
}
if (patternParts[i] === '+') {
continue; // Single-level wildcard matches any single level
}
if (patternParts[i] !== topicParts[i]) {
return false;
}
}
return patternParts.length === topicParts.length;
}
private resubscribe() {
for (const [topic] of this.subscriptions) {
this.client?.subscribe(topic, { qos: 1 });
}
}
// Publish
async publish(
topic: string,
message: string | object,
options: PublishOptions = {}
): Promise<void> {
return new Promise((resolve, reject) => {
if (!this.client || !this.connected) {
reject(new Error('Not connected to broker'));
return;
}
const payload = typeof message === 'object'
? JSON.stringify(message)
: message;
this.client.publish(
topic,
payload,
{
qos: options.qos || 1,
retain: options.retain || false
},
(error) => {
if (error) reject(error);
else resolve();
}
);
});
}
// Subscribe
subscribe(
topic: string,
handler: MessageHandler,
qos: QoS = 1
): () => void {
if (!this.subscriptions.has(topic)) {
this.subscriptions.set(topic, new Set());
this.client?.subscribe(topic, { qos });
}
this.subscriptions.get(topic)!.add(handler);
// Unsubscribe function
return () => {
const handlers = this.subscriptions.get(topic);
if (handlers) {
handlers.delete(handler);
if (handlers.size === 0) {
this.subscriptions.delete(topic);
this.client?.unsubscribe(topic);
}
}
};
}
// Disconnect
disconnect(): Promise<void> {
return new Promise((resolve) => {
this.client?.end(false, {}, () => {
this.connected = false;
resolve();
});
});
}
isConnected(): boolean {
return this.connected;
}
}
export { IoTMQTTClient };Sensor Data Publishing
// devices/temperature-sensor.ts
import { IoTMQTTClient } from '../lib/mqtt-client';
interface SensorReading {
device_id: string;
timestamp: string;
temperature: number;
humidity: number;
battery: number;
}
class TemperatureSensor {
private client: IoTMQTTClient;
private deviceId: string;
private baseTopic: string;
private intervalId?: NodeJS.Timeout;
constructor(
brokerUrl: string,
deviceId: string,
options?: { username?: string; password?: string }
) {
this.deviceId = deviceId;
this.baseTopic = `sensors/${deviceId}`;
this.client = new IoTMQTTClient({
brokerUrl,
options: {
username: options?.username,
password: options?.password,
will: {
topic: `${this.baseTopic}/status`,
payload: Buffer.from('offline'),
qos: 1,
retain: true
}
}
});
}
async start(intervalMs: number = 60000): Promise<void> {
await this.client.connect();
// Online Status
await this.client.publish(
`${this.baseTopic}/status`,
'online',
{ retain: true }
);
// Device Info
await this.client.publish(
`${this.baseTopic}/info`,
{
device_id: this.deviceId,
type: 'temperature_humidity',
firmware: '1.0.0',
started_at: new Date().toISOString()
},
{ retain: true }
);
// Periodisch Daten senden
this.intervalId = setInterval(() => this.publishReading(), intervalMs);
// Initial Reading
await this.publishReading();
console.log(`Sensor ${this.deviceId} started`);
}
private async publishReading(): Promise<void> {
const reading: SensorReading = {
device_id: this.deviceId,
timestamp: new Date().toISOString(),
temperature: this.readTemperature(),
humidity: this.readHumidity(),
battery: this.readBattery()
};
// JSON Format
await this.client.publish(
`${this.baseTopic}/data`,
reading,
{ qos: 1 }
);
// Home Assistant Discovery Format
await this.client.publish(
`homeassistant/sensor/${this.deviceId}/temperature/state`,
reading.temperature.toString()
);
await this.client.publish(
`homeassistant/sensor/${this.deviceId}/humidity/state`,
reading.humidity.toString()
);
console.log(`Published: ${JSON.stringify(reading)}`);
}
// Simulierte Sensor-Werte (in Produktion: echte Hardware-Reads)
private readTemperature(): number {
return Math.round((20 + Math.random() * 5) * 10) / 10;
}
private readHumidity(): number {
return Math.round((40 + Math.random() * 20) * 10) / 10;
}
private readBattery(): number {
return Math.round((80 + Math.random() * 20) * 10) / 10;
}
async stop(): Promise<void> {
if (this.intervalId) {
clearInterval(this.intervalId);
}
await this.client.publish(
`${this.baseTopic}/status`,
'offline',
{ retain: true }
);
await this.client.disconnect();
console.log(`Sensor ${this.deviceId} stopped`);
}
}
// Verwendung
const sensor = new TemperatureSensor(
'mqtt://localhost:1883',
'living-room-01',
{
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS
}
);
await sensor.start(30000); // Alle 30 Sekunden
// Graceful Shutdown
process.on('SIGTERM', async () => {
await sensor.stop();
process.exit(0);
});Data Subscriber Service
// services/mqtt-data-collector.ts
import { IoTMQTTClient } from '../lib/mqtt-client';
import { InfluxDBClient } from '../lib/influxdb';
interface SensorData {
device_id: string;
timestamp: string;
[key: string]: any;
}
class MQTTDataCollector {
private mqtt: IoTMQTTClient;
private influx: InfluxDBClient;
private subscriptions: Array<() => void> = [];
constructor(
mqttUrl: string,
influxConfig: { url: string; token: string; org: string; bucket: string }
) {
this.mqtt = new IoTMQTTClient({
brokerUrl: mqttUrl,
options: {
clientId: 'data-collector',
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS
}
});
this.influx = new InfluxDBClient(influxConfig);
}
async start(): Promise<void> {
await this.mqtt.connect();
// Subscribe to all sensor data
const unsubSensors = this.mqtt.subscribe(
'sensors/+/data',
async (topic, message) => {
try {
const data = JSON.parse(message) as SensorData;
await this.processSensorData(topic, data);
} catch (error) {
console.error('Failed to process sensor data:', error);
}
}
);
this.subscriptions.push(unsubSensors);
// Subscribe to device status
const unsubStatus = this.mqtt.subscribe(
'sensors/+/status',
(topic, message) => {
const deviceId = topic.split('/')[1];
console.log(`Device ${deviceId} is ${message}`);
this.updateDeviceStatus(deviceId, message);
}
);
this.subscriptions.push(unsubStatus);
// Subscribe to commands
const unsubCommands = this.mqtt.subscribe(
'commands/#',
this.handleCommand.bind(this)
);
this.subscriptions.push(unsubCommands);
console.log('Data collector started');
}
private async processSensorData(topic: string, data: SensorData): Promise<void> {
const deviceId = topic.split('/')[1];
// In InfluxDB speichern
await this.influx.writePoint({
measurement: 'sensor_readings',
tags: {
device_id: deviceId,
location: this.getDeviceLocation(deviceId)
},
fields: {
temperature: data.temperature,
humidity: data.humidity,
battery: data.battery
},
timestamp: new Date(data.timestamp)
});
// Alerts prüfen
await this.checkAlerts(deviceId, data);
}
private async checkAlerts(deviceId: string, data: SensorData): Promise<void> {
// Temperatur-Alert
if (data.temperature > 30) {
await this.mqtt.publish('alerts/temperature/high', {
device_id: deviceId,
temperature: data.temperature,
timestamp: data.timestamp,
message: `High temperature alert: ${data.temperature}°C`
});
}
// Batterie-Alert
if (data.battery < 20) {
await this.mqtt.publish('alerts/battery/low', {
device_id: deviceId,
battery: data.battery,
timestamp: data.timestamp,
message: `Low battery alert: ${data.battery}%`
});
}
}
private handleCommand(topic: string, message: string): void {
const parts = topic.split('/');
const target = parts[1];
const action = parts[2];
console.log(`Command: ${action} for ${target}`);
// Command weiterleiten an Gerät
this.mqtt.publish(`devices/${target}/command`, {
action,
payload: JSON.parse(message),
timestamp: new Date().toISOString()
});
}
private getDeviceLocation(deviceId: string): string {
const locationMap: Record<string, string> = {
'living-room-01': 'living_room',
'bedroom-01': 'bedroom',
'kitchen-01': 'kitchen'
};
return locationMap[deviceId] || 'unknown';
}
private updateDeviceStatus(deviceId: string, status: string): void {
// Status in DB speichern
console.log(`Device ${deviceId} status: ${status}`);
}
async stop(): Promise<void> {
this.subscriptions.forEach(unsub => unsub());
await this.mqtt.disconnect();
}
}WebSocket Bridge (Browser)
// lib/mqtt-websocket.ts
import mqtt, { MqttClient } from 'mqtt';
class MQTTWebSocketClient {
private client: MqttClient | null = null;
async connect(wsUrl: string, options?: {
username?: string;
password?: string;
}): Promise<void> {
return new Promise((resolve, reject) => {
this.client = mqtt.connect(wsUrl, {
protocol: 'wss',
clientId: `web_${Math.random().toString(16).slice(2, 8)}`,
...options
});
this.client.on('connect', () => {
console.log('WebSocket MQTT connected');
resolve();
});
this.client.on('error', reject);
});
}
subscribe(topic: string, callback: (topic: string, message: string) => void): () => void {
this.client?.subscribe(topic);
const handler = (t: string, payload: Buffer) => {
if (this.topicMatches(topic, t)) {
callback(t, payload.toString());
}
};
this.client?.on('message', handler);
return () => {
this.client?.unsubscribe(topic);
this.client?.removeListener('message', handler);
};
}
publish(topic: string, message: any): void {
const payload = typeof message === 'object'
? JSON.stringify(message)
: String(message);
this.client?.publish(topic, payload);
}
private topicMatches(pattern: string, topic: string): boolean {
const patternParts = pattern.split('/');
const topicParts = topic.split('/');
for (let i = 0; i < patternParts.length; i++) {
if (patternParts[i] === '#') return true;
if (patternParts[i] === '+') continue;
if (patternParts[i] !== topicParts[i]) return false;
}
return patternParts.length === topicParts.length;
}
}
// React Hook
import { useState, useEffect, useCallback } from 'react';
export function useMQTT(wsUrl: string) {
const [client] = useState(() => new MQTTWebSocketClient());
const [connected, setConnected] = useState(false);
useEffect(() => {
client.connect(wsUrl).then(() => setConnected(true));
}, [wsUrl]);
const subscribe = useCallback((
topic: string,
callback: (topic: string, message: string) => void
) => {
return client.subscribe(topic, callback);
}, [client]);
const publish = useCallback((topic: string, message: any) => {
client.publish(topic, message);
}, [client]);
return { connected, subscribe, publish };
}
// Verwendung in React Component
function SensorDashboard() {
const { connected, subscribe } = useMQTT('wss://mqtt.example.com:9001');
const [temperature, setTemperature] = useState<number | null>(null);
useEffect(() => {
if (!connected) return;
const unsub = subscribe('sensors/+/data', (topic, message) => {
const data = JSON.parse(message);
setTemperature(data.temperature);
});
return unsub;
}, [connected, subscribe]);
return (
<div>
{connected ? (
<p>Temperature: {temperature ?? 'Loading...'}°C</p>
) : (
<p>Connecting...</p>
)}
</div>
);
}Secure MQTT with TLS
// lib/secure-mqtt.ts
import mqtt from 'mqtt';
import fs from 'fs';
import path from 'path';
const secureClient = mqtt.connect('mqtts://mqtt.example.com:8883', {
clientId: `secure_${Date.now()}`,
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS,
// TLS Optionen
ca: fs.readFileSync(path.join(__dirname, 'certs/ca.crt')),
cert: fs.readFileSync(path.join(__dirname, 'certs/client.crt')),
key: fs.readFileSync(path.join(__dirname, 'certs/client.key')),
// Bei selbstsignierten Zertifikaten
rejectUnauthorized: process.env.NODE_ENV === 'production',
// Protokoll-Version
protocolVersion: 5, // MQTT 5.0
// Clean Start (MQTT 5.0)
clean: true,
// Session Expiry (MQTT 5.0)
properties: {
sessionExpiryInterval: 3600
}
});
secureClient.on('connect', (connack) => {
console.log('Secure connection established');
console.log('Session present:', connack.sessionPresent);
});QoS Level Comparison
| QoS | Name | Garantie | Use Case |
|---|---|---|---|
| 0 | At most once | Keine | Telemetrie, unwichtige Daten |
| 1 | At least once | Zustellung garantiert | Sensor-Daten, Commands |
| 2 | Exactly once | Keine Duplikate | Zahlungen, kritische Daten |
Fazit
MQTT für IoT bietet:
- Leichtgewichtig: Minimaler Overhead
- Publish/Subscribe: Entkoppelte Kommunikation
- QoS Levels: Flexible Zustellgarantien
- Wildcards: Flexible Topic-Struktur
Standard für IoT-Kommunikation.
Bildprompts
- "IoT devices connected through message broker, publish subscribe pattern"
- "Sensor data flowing to central hub, MQTT network visualization"
- "Quality of Service levels diagram, message delivery guarantee"