1 min read
Backend
IoT Sensor Data mit InfluxDB
Time Series Daten mit InfluxDB speichern und analysieren. Flux Queries, Grafana Dashboards und IoT Data Pipelines.
InfluxDB, Time Series, IoT, Sensor Data, Grafana, Flux

IoT Sensor Data mit InfluxDB
Meta-Description: Time Series Daten mit InfluxDB speichern und analysieren. Flux Queries, Grafana Dashboards und IoT Data Pipelines.
Keywords: InfluxDB, Time Series, IoT, Sensor Data, Grafana, Flux, Data Analytics, Monitoring
Einführung
InfluxDB ist die führende Time Series Database für IoT und Monitoring. Optimiert für Sensor-Daten, Metriken und Events – mit der mächtigen Flux Query Language und nahtloser Grafana Integration.
InfluxDB Architecture
┌─────────────────────────────────────────────────────────────┐
│ INFLUXDB IOT ARCHITECTURE │
├─────────────────────────────────────────────────────────────┤
│ │
│ Data Sources: │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │
│ │ Sensors │ │ Devices │ │ Gateways│ │ APIs │ │
│ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │
│ │ │ │ │ │
│ └────────────┴────────────┴────────────┘ │
│ │ │
│ Ingestion: ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ Line Protocol / Telegraf / Client Libraries │ │
│ │ MQTT / HTTP API / Native UDP │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ Storage: ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ InfluxDB │ │
│ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │
│ │ │ Bucket │ │ Bucket │ │ Bucket │ │ │
│ │ │ (sensors) │ │ (metrics) │ │ (events) │ │ │
│ │ └───────────┘ └───────────┘ └───────────┘ │ │
│ │ │ │
│ │ Time Series Index (TSI) + Time-Structured Merge Tree (TSM) │ │
│ └─────────────────────────────────────────────────────┘ │
│ │ │
│ Query & Visualization: ▼ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │
│ │ │ Flux │ │ Grafana │ │ API │ │ │
│ │ │ Queries │ │Dashboard│ │ Clients │ │ │
│ │ └─────────┘ └─────────┘ └─────────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘

Setup & Installation
# docker-compose.yml
# Local development stack: InfluxDB 2.7 with an auto-provisioned org/bucket, plus Grafana.
#
# NOTE(review): every credential below is a placeholder. Replace the password and
# admin token (for example via an env file or Docker secrets) before this stack is
# reachable from anything other than a local machine.
version: '3.8'

services:
  influxdb:
    image: influxdb:2.7
    container_name: influxdb
    ports:
      - "8086:8086"
    volumes:
      - influxdb-data:/var/lib/influxdb2
      - influxdb-config:/etc/influxdb2
    environment:
      # The DOCKER_INFLUXDB_INIT_* variables drive the image's first-start setup.
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=supersecret
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=sensors
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-token

  grafana:
    # NOTE(review): "latest" is a floating tag; pin a version for reproducible setups.
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - "3000:3000"
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - influxdb

volumes:
  influxdb-data:
  influxdb-config:
  grafana-data:

Node.js Client
// lib/influxdb-client.ts
import { InfluxDB, Point, WriteApi, QueryApi, flux } from '@influxdata/influxdb-client';
/** Connection settings consumed by InfluxDBClient. */
interface InfluxConfig {
  /** Base URL of the InfluxDB server, handed to the client library. */
  url: string;
  /** API token, handed to the client library together with the URL. */
  token: string;
  /** Organization used when creating the write and query APIs. */
  org: string;
  /** Bucket that points are written to and queries read from. */
  bucket: string;
}
/** One data point to be written: a measurement name plus its tags and fields. */
interface SensorReading {
  /** Measurement the point belongs to. */
  measurement: string;
  /** Tag key/value pairs attached to the point. */
  tags: Record<string, string>;
  /** Field values; numbers, strings and booleans are supported. */
  fields: Record<string, number | string | boolean>;
  /** Optional point time; when omitted no explicit timestamp is set on the point. */
  timestamp?: Date;
}
/**
 * Convenience wrapper around the InfluxDB 2.x JavaScript client that bundles one
 * write API and one query API for a single org/bucket pair.
 *
 * Every value interpolated into Flux source as a string (bucket, measurement,
 * field, tag keys and tag values) is emitted as an escaped Flux string literal.
 * Values interpolated unquoted (range start, window duration, aggregation name)
 * are validated first, so caller-supplied input cannot alter the query structure.
 */
class InfluxDBClient {
  /** Aggregation function names that may be interpolated into Flux source. */
  private static readonly AGGREGATIONS: readonly string[] = ['mean', 'max', 'min', 'sum', 'count'];

  /** Positive Flux duration literal such as `15m` or `1h30m`. */
  private static readonly DURATION = /^(?:\d+(?:ns|us|µs|ms|mo|s|m|h|d|w|y))+$/;

  /** Accepted `range(start: …)` values: signed duration, integer, or date/RFC3339 timestamp. */
  private static readonly RANGE_START =
    /^(?:-?(?:\d+(?:ns|us|µs|ms|mo|s|m|h|d|w|y))+|\d+|\d{4}-\d{2}-\d{2}(?:T\d{2}:\d{2}:\d{2}(?:\.\d+)?(?:Z|[+-]\d{2}:\d{2}))?)$/;

  private client: InfluxDB;
  private writeApi: WriteApi;
  private queryApi: QueryApi;
  private config: InfluxConfig;

  /**
   * Creates the underlying client plus its write and query APIs.
   *
   * @param config Connection settings (URL, token, org, bucket).
   */
  constructor(config: InfluxConfig) {
    this.config = config;
    this.client = new InfluxDB({
      url: config.url,
      token: config.token
    });
    // 'ms' is passed as the write precision for this write API.
    this.writeApi = this.client.getWriteApi(config.org, config.bucket, 'ms');
    this.queryApi = this.client.getQueryApi(config.org);
    // Default tag added to the points written through this write API.
    this.writeApi.useDefaultTags({ source: 'node-app' });
  }

  /** Renders `value` as a double-quoted, escaped Flux string literal. */
  private static fluxString(value: string): string {
    const escaped = String(value)
      .replace(/\\/g, '\\\\')
      .replace(/"/g, '\\"')
      .replace(/\n/g, '\\n')
      .replace(/\r/g, '\\r')
      .replace(/\t/g, '\\t')
      // "${" would otherwise start a Flux string interpolation.
      .replace(/\$\{/g, '\\${');
    return `"${escaped}"`;
  }

  /** Builds an `a and b and …` equality predicate over the tags; '' when there are none. */
  private static tagPredicate(tags?: Record<string, string>): string {
    if (!tags) {
      return '';
    }
    return Object.entries(tags)
      .map(([key, value]) => `r[${InfluxDBClient.fluxString(key)}] == ${InfluxDBClient.fluxString(value)}`)
      .join(' and ');
  }

  /**
   * Queues a single point on the write API (it is not sent until a flush happens).
   *
   * @param reading Measurement, tags, fields and optional timestamp of the point.
   */
  writePoint(reading: SensorReading): void {
    const point = new Point(reading.measurement);

    for (const [key, value] of Object.entries(reading.tags)) {
      point.tag(key, value);
    }

    for (const [key, value] of Object.entries(reading.fields)) {
      if (typeof value === 'number') {
        // NOTE(review): the field type depends on the value (21 -> integer, 21.5 -> float).
        // InfluxDB rejects writes whose type differs from the field's existing type, so a
        // sensor alternating between whole and fractional readings can hit field type
        // conflicts. Left unchanged because switching the type affects already stored data.
        if (Number.isInteger(value)) {
          point.intField(key, value);
        } else {
          point.floatField(key, value);
        }
      } else if (typeof value === 'boolean') {
        point.booleanField(key, value);
      } else {
        point.stringField(key, value);
      }
    }

    if (reading.timestamp) {
      point.timestamp(reading.timestamp);
    }

    this.writeApi.writePoint(point);
  }

  /**
   * Queues several points, one `writePoint` call per reading.
   *
   * @param readings Points to queue, in order.
   */
  writePoints(readings: SensorReading[]): void {
    for (const reading of readings) {
      this.writePoint(reading);
    }
  }

  /** Sends all queued points now. */
  async flush(): Promise<void> {
    await this.writeApi.flush();
  }

  /**
   * Runs a Flux query and collects every result row as an object.
   *
   * @param fluxQuery Complete Flux source. It is executed as given, so callers must
   *   not build it from unescaped external input.
   * @returns All rows, converted with the table metadata's `toObject`.
   */
  async query<T>(fluxQuery: string): Promise<T[]> {
    const results: T[] = [];
    return new Promise((resolve, reject) => {
      this.queryApi.queryRows(fluxQuery, {
        next: (row, tableMeta) => {
          const obj = tableMeta.toObject(row) as T;
          results.push(obj);
        },
        error: (error) => reject(error),
        complete: () => resolve(results)
      });
    });
  }

  /**
   * Fetches the first row of `last()` over the past hour for a measurement.
   *
   * @param measurement Measurement name to filter on.
   * @param tags Optional tag equality filters, combined with `and`.
   * @returns The first result row, or `undefined` when the query yields no rows.
   */
  async getLatest(
    measurement: string,
    tags?: Record<string, string>
  ): Promise<any> {
    const tagFilter = InfluxDBClient.tagPredicate(tags);
    const query = [
      `from(bucket: ${InfluxDBClient.fluxString(this.config.bucket)})`,
      '  |> range(start: -1h)',
      `  |> filter(fn: (r) => r["_measurement"] == ${InfluxDBClient.fluxString(measurement)})`,
      ...(tagFilter ? [`  |> filter(fn: (r) => ${tagFilter})`] : []),
      '  |> last()'
    ].join('\n');

    const results = await this.query(query);
    return results[0];
  }

  /**
   * Fetches windowed aggregates of one field.
   *
   * @param measurement Measurement name to filter on.
   * @param field Field name to filter on.
   * @param aggregation Aggregate function applied per window.
   * @param window Window size as a positive Flux duration (e.g. `15m`).
   * @param range Range start: a Flux duration such as `-24h`, an integer, or a
   *   date/RFC3339 timestamp.
   * @param tags Optional tag equality filters, combined with `and`.
   * @returns The aggregated rows.
   * @throws Error when `aggregation`, `window` or `range` is not in an accepted form.
   */
  async getAggregated(
    measurement: string,
    field: string,
    aggregation: 'mean' | 'max' | 'min' | 'sum' | 'count',
    window: string,
    range: string = '-24h',
    tags?: Record<string, string>
  ): Promise<any[]> {
    // These three values are interpolated unquoted, so they are checked rather than escaped.
    if (!InfluxDBClient.AGGREGATIONS.includes(aggregation)) {
      throw new Error(`Invalid Flux aggregation: ${String(aggregation)}`);
    }
    if (typeof window !== 'string' || !InfluxDBClient.DURATION.test(window)) {
      throw new Error(`Invalid Flux window duration: ${String(window)}`);
    }
    if (typeof range !== 'string' || !InfluxDBClient.RANGE_START.test(range)) {
      throw new Error(`Invalid Flux range start: ${String(range)}`);
    }

    const tagFilter = InfluxDBClient.tagPredicate(tags);
    const query = [
      `from(bucket: ${InfluxDBClient.fluxString(this.config.bucket)})`,
      `  |> range(start: ${range})`,
      `  |> filter(fn: (r) => r["_measurement"] == ${InfluxDBClient.fluxString(measurement)})`,
      `  |> filter(fn: (r) => r["_field"] == ${InfluxDBClient.fluxString(field)})`,
      ...(tagFilter ? [`  |> filter(fn: (r) => ${tagFilter})`] : []),
      `  |> aggregateWindow(every: ${window}, fn: ${aggregation}, createEmpty: false)`,
      `  |> yield(name: "${aggregation}")`
    ].join('\n');

    return this.query(query);
  }

  /** Closes the write API; call once when the client is no longer needed. */
  async close(): Promise<void> {
    await this.writeApi.close();
  }
}
export { InfluxDBClient };

Sensor Data Collection
// services/sensor-collector.ts
import { InfluxDBClient } from '../lib/influxdb-client';
import mqtt from 'mqtt';
/**
 * JSON payload expected on the `sensors/<device>/data` MQTT topics.
 * Every reading is optional; a message may carry any subset of them.
 */
interface MQTTSensorMessage {
  /** Device identifier as reported in the payload. */
  device_id: string;
  /** Environment readings. */
  temperature?: number;
  humidity?: number;
  pressure?: number;
  /** Device health readings. */
  battery?: number;
  rssi?: number;
  /** Optional reading time as a string that `new Date()` can parse. */
  timestamp?: string;
}
/**
 * Subscribes to sensor topics on an MQTT broker and forwards the readings to
 * InfluxDB as points. Queued points are flushed every 10 seconds.
 */
class SensorDataCollector {
  /** Numeric readings copied as-is from zigbee2mqtt payloads. */
  private static readonly ZIGBEE_NUMERIC_KEYS: readonly string[] = [
    'temperature',
    'humidity',
    'pressure',
    'battery',
    'linkquality',
    'illuminance'
  ];

  private influx: InfluxDBClient;
  private mqtt: mqtt.MqttClient;
  private flushInterval: NodeJS.Timeout | null = null;

  /**
   * Connects to the broker, registers the message handlers and starts the
   * periodic flush.
   *
   * @param influxConfig Connection settings for the InfluxDB client.
   * @param mqttUrl URL of the MQTT broker.
   */
  constructor(
    influxConfig: { url: string; token: string; org: string; bucket: string },
    mqttUrl: string
  ) {
    this.influx = new InfluxDBClient(influxConfig);
    this.mqtt = mqtt.connect(mqttUrl);

    this.mqtt.on('connect', () => {
      console.log('MQTT connected');
      this.mqtt.subscribe('sensors/+/data');
      this.mqtt.subscribe('zigbee2mqtt/+');
    });

    // An 'error' event without a listener is thrown by the emitter and would
    // take the process down, so log it instead.
    this.mqtt.on('error', (error: Error) => {
      console.error('MQTT error:', error);
    });

    this.mqtt.on('message', (topic, payload) => {
      this.handleMessage(topic, payload.toString());
    });

    // Flush queued points periodically.
    this.flushInterval = setInterval(() => {
      this.influx.flush().catch(console.error);
    }, 10000);
  }

  /** Copies the listed keys whose values are finite numbers; everything else is skipped. */
  private static pickNumbers(source: object, keys: readonly string[]): Record<string, number> {
    const record = source as Record<string, unknown>;
    const picked: Record<string, number> = {};
    for (const key of keys) {
      const value = record[key];
      if (typeof value === 'number' && Number.isFinite(value)) {
        picked[key] = value;
      }
    }
    return picked;
  }

  /** Parses a payload timestamp; falls back to the current time when missing or unparseable. */
  private static parseTimestamp(raw: unknown): Date {
    if (raw && (typeof raw === 'string' || typeof raw === 'number')) {
      const parsed = new Date(raw);
      if (!Number.isNaN(parsed.getTime())) {
        return parsed;
      }
    }
    return new Date();
  }

  /** Parses a JSON payload and dispatches it by topic prefix. Failures are logged, never thrown. */
  private handleMessage(topic: string, payload: string): void {
    try {
      const data: unknown = JSON.parse(payload);
      // Valid JSON can still be a bare value (null, number, string); only objects carry readings.
      if (typeof data !== 'object' || data === null) {
        console.error('Ignoring non-object payload on topic:', topic);
        return;
      }
      if (topic.startsWith('sensors/')) {
        this.processSensorData(topic, data as MQTTSensorMessage);
      } else if (topic.startsWith('zigbee2mqtt/')) {
        this.processZigbeeData(topic, data);
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  }

  /**
   * Writes an `environment` point (temperature/humidity/pressure) and a
   * `device_metrics` point (battery/rssi) for whichever readings are present.
   * Both points carry the same timestamp.
   */
  private processSensorData(topic: string, data: MQTTSensorMessage): void {
    const deviceId = topic.split('/')[1] || 'unknown';
    const timestamp = SensorDataCollector.parseTimestamp(data.timestamp);

    // Any one of the three readings is enough; a pressure-only message is valid too.
    const environment = SensorDataCollector.pickNumbers(data, ['temperature', 'humidity', 'pressure']);
    if (Object.keys(environment).length > 0) {
      this.influx.writePoint({
        measurement: 'environment',
        tags: {
          device_id: deviceId,
          location: this.getDeviceLocation(deviceId)
        },
        fields: environment,
        timestamp
      });
    }

    const metrics = SensorDataCollector.pickNumbers(data, ['battery', 'rssi']);
    if (Object.keys(metrics).length > 0) {
      this.influx.writePoint({
        measurement: 'device_metrics',
        tags: {
          device_id: deviceId
        },
        fields: metrics,
        timestamp
      });
    }
  }

  /**
   * Writes a `zigbee_sensors` point for a zigbee2mqtt device message.
   * Boolean states (occupancy, contact) are stored as 1/0.
   */
  private processZigbeeData(topic: string, data: any): void {
    const deviceName = topic.split('/')[1] || 'unknown';

    // Skip bridge topics.
    if (deviceName === 'bridge') return;

    const fields = SensorDataCollector.pickNumbers(data, SensorDataCollector.ZIGBEE_NUMERIC_KEYS);

    // A null state carries no reading, so it is skipped rather than stored as 0.
    if (data.occupancy !== undefined && data.occupancy !== null) {
      fields.occupancy = data.occupancy ? 1 : 0;
    }
    if (data.contact !== undefined && data.contact !== null) {
      fields.contact = data.contact ? 1 : 0;
    }

    if (Object.keys(fields).length > 0) {
      this.influx.writePoint({
        measurement: 'zigbee_sensors',
        tags: {
          device: deviceName,
          type: this.inferDeviceType(data)
        },
        fields
      });
    }
  }

  /** Maps a known device id to its location tag; unknown ids map to 'unknown'. */
  private getDeviceLocation(deviceId: string): string {
    const locationMap: Record<string, string> = {
      'living-room-01': 'living_room',
      'bedroom-01': 'bedroom',
      'kitchen-01': 'kitchen',
      'outdoor-01': 'outdoor'
    };
    return locationMap[deviceId] || 'unknown';
  }

  /** Derives a device type tag from the first characteristic property present in the payload. */
  private inferDeviceType(data: any): string {
    if (data.occupancy !== undefined) return 'motion_sensor';
    if (data.contact !== undefined) return 'contact_sensor';
    if (data.illuminance !== undefined) return 'light_sensor';
    if (data.temperature !== undefined) return 'climate_sensor';
    return 'unknown';
  }

  /**
   * Stops the periodic flush, ends the MQTT connection and closes the InfluxDB client.
   * The message source is ended first so no further points are queued on a client
   * that is being closed.
   */
  async stop(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
      this.flushInterval = null;
    }
    this.mqtt.end();
    await this.influx.close();
  }
}
// Usage
// Creates the collector at module load. The InfluxDB URL and the MQTT broker URL
// fall back to local defaults when their environment variables are unset.
// NOTE(review): INFLUX_TOKEN has no fallback; the non-null assertion only silences
// the compiler, so an unset variable yields an undefined token at runtime.
const collector = new SensorDataCollector(
  {
    url: process.env.INFLUX_URL || 'http://localhost:8086',
    token: process.env.INFLUX_TOKEN!,
    org: 'myorg',
    bucket: 'sensors'
  },
  process.env.MQTT_URL || 'mqtt://localhost:1883'
);

Flux Queries
// Flux Query Examples
// 1. Average temperature per room (last 24h)
// Regroups the "temperature" field of the "environment" measurement by the
// "location" tag and takes the mean of each group.
const avgTempByRoom = `
from(bucket: "sensors")
|> range(start: -24h)
|> filter(fn: (r) => r["_measurement"] == "environment")
|> filter(fn: (r) => r["_field"] == "temperature")
|> group(columns: ["location"])
|> mean()
|> yield(name: "avg_temperature")
`;
// 2. Min/max temperature per day
// Two independent pipelines over the last 7 days, each aggregating into 1-day
// windows; the results are yielded under the names "min" and "max".
const tempMinMax = `
from(bucket: "sensors")
|> range(start: -7d)
|> filter(fn: (r) => r["_measurement"] == "environment")
|> filter(fn: (r) => r["_field"] == "temperature")
|> aggregateWindow(every: 1d, fn: min, createEmpty: false)
|> yield(name: "min")
from(bucket: "sensors")
|> range(start: -7d)
|> filter(fn: (r) => r["_measurement"] == "environment")
|> filter(fn: (r) => r["_field"] == "temperature")
|> aggregateWindow(every: 1d, fn: max, createEmpty: false)
|> yield(name: "max")
`;
// 3. Motion detection events
// Counts the "occupancy" rows with value 1 written by motion sensors during the
// last hour, then regroups the counts by the "device" tag.
const motionEvents = `
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "zigbee_sensors")
|> filter(fn: (r) => r["type"] == "motion_sensor")
|> filter(fn: (r) => r["_field"] == "occupancy")
|> filter(fn: (r) => r["_value"] == 1)
|> count()
|> group(columns: ["device"])
`;
// 4. Battery status of all devices
// Takes the most recent "battery" value of each table from the last hour (no
// measurement filter is applied) and keeps only values below 20.
const batteryStatus = `
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r["_field"] == "battery")
|> last()
|> filter(fn: (r) => r["_value"] < 20)
|> yield(name: "low_battery")
`;
// 5. Anomaly detection (values more than 2 standard deviations from the mean)
// The mean and standard deviation of the living-room temperature over the last
// 7 days are first extracted as scalar records with findRecord(); a stream of
// tables cannot be read like a record. Each row is then annotated with the
// resulting bounds and only rows outside them are kept.
const anomalies = `
data = from(bucket: "sensors")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> filter(fn: (r) => r["location"] == "living_room")

avgRecord = data
  |> group()
  |> mean()
  |> findRecord(fn: (key) => true, idx: 0)

stdRecord = data
  |> group()
  |> stddev()
  |> findRecord(fn: (key) => true, idx: 0)

data
  |> map(fn: (r) => ({
    r with
    upper: avgRecord._value + 2.0 * stdRecord._value,
    lower: avgRecord._value - 2.0 * stdRecord._value
  }))
  |> filter(fn: (r) => r._value > r.upper or r._value < r.lower)
`;
// 6. Downsampling for the dashboard
// Reduces 30 days of temperature data to one mean value per hour; windows
// without data are omitted (createEmpty: false).
const downsampled = `
from(bucket: "sensors")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "environment")
|> filter(fn: (r) => r["_field"] == "temperature")
|> aggregateWindow(
every: 1h,
fn: mean,
createEmpty: false
)
|> yield(name: "hourly_avg")
`;

API Endpoints
// api/metrics-api.ts
import express from 'express';
import { InfluxDBClient } from '../lib/influxdb-client';
// Router that the metric endpoints in this module are registered on.
const router = express.Router();

// InfluxDB client shared by all handlers in this module.
// NOTE(review): both environment variables are read with non-null assertions and
// have no fallback; if unset, undefined is passed to the client at runtime.
const influx = new InfluxDBClient({
  url: process.env.INFLUX_URL!,
  token: process.env.INFLUX_TOKEN!,
  org: 'myorg',
  bucket: 'sensors'
});
// GET /api/metrics/temperature?location=living_room&range=24h
// Returns 15-minute mean temperatures, optionally restricted to one location.
// `range` is a positive Flux duration (default 24h) that is negated to form the
// range start. Both parameters end up in Flux source, so they are validated here
// and rejected with 400 when malformed.
router.get('/metrics/temperature', async (req, res) => {
  const { location, range = '24h' } = req.query;

  // Express may deliver arrays or nested objects for repeated/bracketed parameters.
  if (typeof range !== 'string' || !/^(?:\d+(?:mo|s|m|h|d|w|y))+$/.test(range)) {
    res.status(400).json({ error: 'Invalid range' });
    return;
  }
  // Characters that could terminate or alter a Flux string literal are not allowed.
  if (location !== undefined && (typeof location !== 'string' || /["\\\r\n$]/.test(location))) {
    res.status(400).json({ error: 'Invalid location' });
    return;
  }

  try {
    const data = await influx.getAggregated(
      'environment',
      'temperature',
      'mean',
      '15m',
      `-${range}`,
      location ? { location } : undefined
    );
    res.json(data.map(d => ({
      time: d._time,
      value: d._value,
      location: d.location
    })));
  } catch (error) {
    // Keep the client response generic, but leave a trace for diagnosis.
    console.error('Temperature query failed:', error);
    res.status(500).json({ error: 'Query failed' });
  }
});
// GET /api/metrics/latest
// Returns the most recent environment readings of the last hour, pivoted so that
// each field becomes a column of the row identified by time and location.
router.get('/metrics/latest', async (req, res) => {
  const query = `
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r["_measurement"] == "environment")
|> last()
|> pivot(
rowKey: ["_time", "location"],
columnKey: ["_field"],
valueColumn: "_value"
)
`;
  try {
    const data = await influx.query(query);
    res.json(data);
  } catch (error) {
    // Keep the client response generic, but leave a trace for diagnosis.
    console.error('Latest metrics query failed:', error);
    res.status(500).json({ error: 'Query failed' });
  }
});
// GET /api/metrics/summary
// Returns count, average, minimum and maximum temperature per location for the
// last 24 hours, computed in a single reduce() pass.
router.get('/metrics/summary', async (req, res) => {
  // min/max start at +/- infinity so that any real reading replaces them; fixed
  // sentinels such as 100/-100 give wrong results for readings outside that range.
  // Values are converted with float() so integer-typed fields work with the float
  // accumulator as well.
  const query = `
import "math"

from(bucket: "sensors")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> group(columns: ["location"])
  |> reduce(
    fn: (r, accumulator) => ({
      count: accumulator.count + 1,
      sum: accumulator.sum + float(v: r._value),
      min: if float(v: r._value) < accumulator.min then float(v: r._value) else accumulator.min,
      max: if float(v: r._value) > accumulator.max then float(v: r._value) else accumulator.max
    }),
    identity: {count: 0, sum: 0.0, min: math.mInf(sign: 1), max: math.mInf(sign: -1)}
  )
  |> map(fn: (r) => ({
    location: r.location,
    count: r.count,
    avg: r.sum / float(v: r.count),
    min: r.min,
    max: r.max
  }))
`;
  try {
    const data = await influx.query(query);
    res.json(data);
  } catch (error) {
    // Keep the client response generic, but leave a trace for diagnosis.
    console.error('Summary query failed:', error);
    res.status(500).json({ error: 'Query failed' });
  }
});
export default router;

Retention Policies
// Data retention management
// InfluxDB 2.x configures retention per bucket.

/**
 * Creates a bucket with an expiry retention rule through the HTTP API
 * (`POST /api/v2/buckets`).
 *
 * @param influxUrl Base URL of the InfluxDB server; a trailing slash is tolerated.
 * @param token API token sent in the `Authorization: Token …` header.
 * @param org Sent as `orgID` in the request body, so this must be the
 *   organization ID rather than its name.
 * @param bucketName Name of the bucket to create.
 * @param retentionSeconds Retention period in seconds; must be a non-negative
 *   integer. The examples in this file use 0 for "unlimited".
 * @returns The parsed JSON response body.
 * @throws RangeError when `retentionSeconds` is not a non-negative integer.
 * @throws Error when the server answers with a non-2xx status.
 */
async function createBucketWithRetention(
  influxUrl: string,
  token: string,
  org: string,
  bucketName: string,
  retentionSeconds: number
) {
  if (!Number.isInteger(retentionSeconds) || retentionSeconds < 0) {
    throw new RangeError(`retentionSeconds must be a non-negative integer, got ${retentionSeconds}`);
  }

  // Avoid "//api/…" when the base URL already ends with a slash.
  const baseUrl = influxUrl.replace(/\/+$/, '');

  const response = await fetch(`${baseUrl}/api/v2/buckets`, {
    method: 'POST',
    headers: {
      'Authorization': `Token ${token}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: bucketName,
      orgID: org,
      retentionRules: [{
        type: 'expire',
        everySeconds: retentionSeconds
      }]
    })
  });

  // Without this check an error body would be returned as if it were the bucket.
  if (!response.ok) {
    const detail = await response.text();
    throw new Error(
      `Failed to create bucket "${bucketName}": ${response.status} ${response.statusText} ${detail}`.trim()
    );
  }

  return response.json();
}
// Example: separate buckets for different retention periods.
// Values are retention periods in seconds — presumably intended as the
// retentionSeconds argument of createBucketWithRetention.
const buckets = {
  'sensors_raw': 7 * 24 * 60 * 60, // 7 days for raw data
  'sensors_hourly': 30 * 24 * 60 * 60, // 30 days for hourly data
  'sensors_daily': 365 * 24 * 60 * 60, // 1 year for daily data
  'sensors_archive': 0 // Unlimited for the archive
};

Grafana Dashboard
{
"dashboard": {
"title": "IoT Sensor Dashboard",
"panels": [
{
"title": "Temperature",
"type": "timeseries",
"datasource": "InfluxDB",
"targets": [
{
"query": "from(bucket: \"sensors\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r[\"_measurement\"] == \"environment\")\n |> filter(fn: (r) => r[\"_field\"] == \"temperature\")\n |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)"
}
]
},
{
"title": "Humidity",
"type": "gauge",
"datasource": "InfluxDB",
"targets": [
{
"query": "from(bucket: \"sensors\")\n |> range(start: -5m)\n |> filter(fn: (r) => r[\"_measurement\"] == \"environment\")\n |> filter(fn: (r) => r[\"_field\"] == \"humidity\")\n |> last()"
}
]
}
]
}
}

Fazit
InfluxDB für IoT bietet:
- Optimiert für Time Series: Hohe Write-Performance
- Flux Language: Mächtige Queries & Transformationen
- Retention Policies: Automatische Datenbereinigung
- Grafana Integration: Professionelle Visualisierung
Die Standard-Lösung für IoT Analytics.
Bildprompts
- "Time series graph showing sensor data over time, temperature humidity"
- "Grafana dashboard with IoT metrics, colorful panels and gauges"
- "Data pipeline from sensors through InfluxDB to visualization"