
IoT Sensor Data with InfluxDB

Storing and analyzing time series data with InfluxDB: Flux queries, Grafana dashboards, and IoT data pipelines.



Introduction

InfluxDB is one of the most widely used time series databases for IoT and monitoring workloads. It is purpose-built for sensor data, metrics, and events, and combines the expressive Flux query language with seamless Grafana integration.


InfluxDB Architecture

┌─────────────────────────────────────────────────────────────┐
│              INFLUXDB IOT ARCHITECTURE                       │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Data Sources:                                              │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐       │
│  │ Sensors │  │ Devices │  │ Gateways│  │   APIs  │       │
│  └────┬────┘  └────┬────┘  └────┬────┘  └────┬────┘       │
│       │            │            │            │              │
│       └────────────┴────────────┴────────────┘              │
│                         │                                   │
│  Ingestion:             ▼                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  Line Protocol / Telegraf / Client Libraries        │   │
│  │  MQTT / HTTP API / Native UDP                       │   │
│  └─────────────────────────────────────────────────────┘   │
│                         │                                   │
│  Storage:               ▼                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │                   InfluxDB                          │   │
│  │  ┌───────────┐  ┌───────────┐  ┌───────────┐      │   │
│  │  │  Bucket   │  │  Bucket   │  │  Bucket   │      │   │
│  │  │ (sensors) │  │ (metrics) │  │ (events)  │      │   │
│  │  └───────────┘  └───────────┘  └───────────┘      │   │
│  │                                                    │   │
│  │  Time Series Index (TSI) + Time Structured Merge  │   │
│  └─────────────────────────────────────────────────────┘   │
│                         │                                   │
│  Query & Visualization: ▼                                   │
│  ┌─────────────────────────────────────────────────────┐   │
│  │  ┌─────────┐  ┌─────────┐  ┌─────────┐             │   │
│  │  │  Flux   │  │ Grafana │  │   API   │             │   │
│  │  │ Queries │  │Dashboard│  │ Clients │             │   │
│  │  └─────────┘  └─────────┘  └─────────┘             │   │
│  └─────────────────────────────────────────────────────┘   │
│                                                             │
└─────────────────────────────────────────────────────────────┘
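All of the ingestion paths shown above ultimately produce InfluxDB's line protocol: one line per point, consisting of a measurement, optional tags, fields, and a timestamp. As a minimal sketch of the format (escaping of spaces/commas and the `i` suffix for integer fields are omitted; the official client's `Point` class handles all of this for you):

```typescript
// Minimal line-protocol builder sketch: measurement,tag=... field=... [timestamp]
// Note: deliberately simplified — no escaping, no integer "i" suffix.
function toLineProtocol(
  measurement: string,
  tags: Record<string, string>,
  fields: Record<string, number | boolean | string>,
  timestampNs?: string
): string {
  // Tags are appended to the measurement, separated by commas
  const tagStr = Object.entries(tags).map(([k, v]) => `,${k}=${v}`).join('');
  // String field values are quoted; numbers and booleans are not
  const fieldStr = Object.entries(fields)
    .map(([k, v]) => (typeof v === 'string' ? `${k}="${v}"` : `${k}=${v}`))
    .join(',');
  return `${measurement}${tagStr} ${fieldStr}${timestampNs ? ' ' + timestampNs : ''}`;
}

// e.g. toLineProtocol('environment', { location: 'living_room' }, { temperature: 21.5 })
// → 'environment,location=living_room temperature=21.5'
```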

Setup & Installation

# docker-compose.yml
version: '3.8'
services:
  influxdb:
    image: influxdb:2.7
    container_name: influxdb
    ports:
      - 8086:8086
    volumes:
      - influxdb-data:/var/lib/influxdb2
      - influxdb-config:/etc/influxdb2
    environment:
      - DOCKER_INFLUXDB_INIT_MODE=setup
      - DOCKER_INFLUXDB_INIT_USERNAME=admin
      - DOCKER_INFLUXDB_INIT_PASSWORD=supersecret
      - DOCKER_INFLUXDB_INIT_ORG=myorg
      - DOCKER_INFLUXDB_INIT_BUCKET=sensors
      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=my-super-secret-token

  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    ports:
      - 3000:3000
    volumes:
      - grafana-data:/var/lib/grafana
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
    depends_on:
      - influxdb

volumes:
  influxdb-data:
  influxdb-config:
  grafana-data:

Node.js Client

// lib/influxdb-client.ts
import { InfluxDB, Point, WriteApi, QueryApi } from '@influxdata/influxdb-client';

interface InfluxConfig {
  url: string;
  token: string;
  org: string;
  bucket: string;
}

interface SensorReading {
  measurement: string;
  tags: Record<string, string>;
  fields: Record<string, number | string | boolean>;
  timestamp?: Date;
}

class InfluxDBClient {
  private client: InfluxDB;
  private writeApi: WriteApi;
  private queryApi: QueryApi;
  private config: InfluxConfig;

  constructor(config: InfluxConfig) {
    this.config = config;
    this.client = new InfluxDB({
      url: config.url,
      token: config.token
    });

    this.writeApi = this.client.getWriteApi(config.org, config.bucket, 'ms');
    this.queryApi = this.client.getQueryApi(config.org);

    // Default tags attached to every written point
    this.writeApi.useDefaultTags({ source: 'node-app' });
  }

  // Write a single point
  writePoint(reading: SensorReading): void {
    const point = new Point(reading.measurement);

    // Add tags
    Object.entries(reading.tags).forEach(([key, value]) => {
      point.tag(key, value);
    });

    // Add fields
    Object.entries(reading.fields).forEach(([key, value]) => {
      if (typeof value === 'number') {
        if (Number.isInteger(value)) {
          point.intField(key, value);
        } else {
          point.floatField(key, value);
        }
      } else if (typeof value === 'boolean') {
        point.booleanField(key, value);
      } else {
        point.stringField(key, value);
      }
    });

    // Timestamp
    if (reading.timestamp) {
      point.timestamp(reading.timestamp);
    }

    this.writeApi.writePoint(point);
  }

  // Write multiple points
  writePoints(readings: SensorReading[]): void {
    readings.forEach(reading => this.writePoint(reading));
  }

  // Flush buffered writes immediately
  async flush(): Promise<void> {
    await this.writeApi.flush();
  }

  // Run a Flux query
  async query<T>(fluxQuery: string): Promise<T[]> {
    const results: T[] = [];

    return new Promise((resolve, reject) => {
      this.queryApi.queryRows(fluxQuery, {
        next: (row, tableMeta) => {
          const obj = tableMeta.toObject(row) as T;
          results.push(obj);
        },
        error: (error) => reject(error),
        complete: () => resolve(results)
      });
    });
  }

  // Fetch the latest reading
  async getLatest(
    measurement: string,
    tags?: Record<string, string>
  ): Promise<any> {
    let tagFilter = '';
    if (tags) {
      tagFilter = Object.entries(tags)
        .map(([key, value]) => `r["${key}"] == "${value}"`)
        .join(' and ');
    }

    const query = `
      from(bucket: "${this.config.bucket}")
        |> range(start: -1h)
        |> filter(fn: (r) => r["_measurement"] == "${measurement}")
        ${tagFilter ? `|> filter(fn: (r) => ${tagFilter})` : ''}
        |> last()
    `;

    const results = await this.query(query);
    return results[0];
  }

  // Fetch aggregated data
  async getAggregated(
    measurement: string,
    field: string,
    aggregation: 'mean' | 'max' | 'min' | 'sum' | 'count',
    window: string,
    range: string = '-24h',
    tags?: Record<string, string>
  ): Promise<any[]> {
    let tagFilter = '';
    if (tags) {
      tagFilter = Object.entries(tags)
        .map(([key, value]) => `r["${key}"] == "${value}"`)
        .join(' and ');
    }

    const query = `
      from(bucket: "${this.config.bucket}")
        |> range(start: ${range})
        |> filter(fn: (r) => r["_measurement"] == "${measurement}")
        |> filter(fn: (r) => r["_field"] == "${field}")
        ${tagFilter ? `|> filter(fn: (r) => ${tagFilter})` : ''}
        |> aggregateWindow(every: ${window}, fn: ${aggregation}, createEmpty: false)
        |> yield(name: "${aggregation}")
    `;

    return this.query(query);
  }

  // Cleanup
  async close(): Promise<void> {
    await this.writeApi.close();
  }
}

export { InfluxDBClient };

Sensor Data Collection

// services/sensor-collector.ts
import { InfluxDBClient } from '../lib/influxdb-client';
import mqtt from 'mqtt';

interface MQTTSensorMessage {
  device_id: string;
  temperature?: number;
  humidity?: number;
  pressure?: number;
  battery?: number;
  rssi?: number;
  timestamp?: string;
}

class SensorDataCollector {
  private influx: InfluxDBClient;
  private mqtt: mqtt.MqttClient;
  private flushInterval: NodeJS.Timeout | null = null;

  constructor(
    influxConfig: { url: string; token: string; org: string; bucket: string },
    mqttUrl: string
  ) {
    this.influx = new InfluxDBClient(influxConfig);
    this.mqtt = mqtt.connect(mqttUrl);

    this.mqtt.on('connect', () => {
      console.log('MQTT connected');
      this.mqtt.subscribe('sensors/+/data');
      this.mqtt.subscribe('zigbee2mqtt/+');
    });

    this.mqtt.on('message', (topic, payload) => {
      this.handleMessage(topic, payload.toString());
    });

    // Flush buffered points periodically
    this.flushInterval = setInterval(() => {
      this.influx.flush().catch(console.error);
    }, 10000);
  }

  private handleMessage(topic: string, payload: string): void {
    try {
      const data = JSON.parse(payload);

      if (topic.startsWith('sensors/')) {
        this.processSensorData(topic, data);
      } else if (topic.startsWith('zigbee2mqtt/')) {
        this.processZigbeeData(topic, data);
      }
    } catch (error) {
      console.error('Failed to parse message:', error);
    }
  }

  private processSensorData(topic: string, data: MQTTSensorMessage): void {
    const deviceId = topic.split('/')[1];

    // Environment Data
    if (data.temperature !== undefined || data.humidity !== undefined) {
      this.influx.writePoint({
        measurement: 'environment',
        tags: {
          device_id: deviceId,
          location: this.getDeviceLocation(deviceId)
        },
        fields: {
          ...(data.temperature !== undefined && { temperature: data.temperature }),
          ...(data.humidity !== undefined && { humidity: data.humidity }),
          ...(data.pressure !== undefined && { pressure: data.pressure })
        },
        timestamp: data.timestamp ? new Date(data.timestamp) : new Date()
      });
    }

    // Device Metrics
    if (data.battery !== undefined || data.rssi !== undefined) {
      this.influx.writePoint({
        measurement: 'device_metrics',
        tags: {
          device_id: deviceId
        },
        fields: {
          ...(data.battery !== undefined && { battery: data.battery }),
          ...(data.rssi !== undefined && { rssi: data.rssi })
        }
      });
    }
  }

  private processZigbeeData(topic: string, data: any): void {
    const deviceName = topic.split('/')[1];

    // Skip bridge topics
    if (deviceName === 'bridge') return;

    const fields: Record<string, number> = {};

    // Extract known fields
    if (data.temperature !== undefined) fields.temperature = data.temperature;
    if (data.humidity !== undefined) fields.humidity = data.humidity;
    if (data.pressure !== undefined) fields.pressure = data.pressure;
    if (data.battery !== undefined) fields.battery = data.battery;
    if (data.linkquality !== undefined) fields.linkquality = data.linkquality;
    if (data.illuminance !== undefined) fields.illuminance = data.illuminance;
    if (data.occupancy !== undefined) fields.occupancy = data.occupancy ? 1 : 0;
    if (data.contact !== undefined) fields.contact = data.contact ? 1 : 0;

    if (Object.keys(fields).length > 0) {
      this.influx.writePoint({
        measurement: 'zigbee_sensors',
        tags: {
          device: deviceName,
          type: this.inferDeviceType(data)
        },
        fields
      });
    }
  }

  private getDeviceLocation(deviceId: string): string {
    const locationMap: Record<string, string> = {
      'living-room-01': 'living_room',
      'bedroom-01': 'bedroom',
      'kitchen-01': 'kitchen',
      'outdoor-01': 'outdoor'
    };
    return locationMap[deviceId] || 'unknown';
  }

  private inferDeviceType(data: any): string {
    if (data.occupancy !== undefined) return 'motion_sensor';
    if (data.contact !== undefined) return 'contact_sensor';
    if (data.illuminance !== undefined) return 'light_sensor';
    if (data.temperature !== undefined) return 'climate_sensor';
    return 'unknown';
  }

  async stop(): Promise<void> {
    if (this.flushInterval) {
      clearInterval(this.flushInterval);
    }
    await this.influx.close();
    this.mqtt.end();
  }
}

// Usage
const collector = new SensorDataCollector(
  {
    url: process.env.INFLUX_URL || 'http://localhost:8086',
    token: process.env.INFLUX_TOKEN!,
    org: 'myorg',
    bucket: 'sensors'
  },
  process.env.MQTT_URL || 'mqtt://localhost:1883'
);

Flux Queries

// Flux Query Examples

// 1. Average temperature per room (last 24h)
const avgTempByRoom = `
from(bucket: "sensors")
  |> range(start: -24h)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> group(columns: ["location"])
  |> mean()
  |> yield(name: "avg_temperature")
`;

// 2. Min/max temperature per day
const tempMinMax = `
from(bucket: "sensors")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> aggregateWindow(every: 1d, fn: min, createEmpty: false)
  |> yield(name: "min")

from(bucket: "sensors")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> aggregateWindow(every: 1d, fn: max, createEmpty: false)
  |> yield(name: "max")
`;

// 3. Motion detection events (group by device first, then count)
const motionEvents = `
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_measurement"] == "zigbee_sensors")
  |> filter(fn: (r) => r["type"] == "motion_sensor")
  |> filter(fn: (r) => r["_field"] == "occupancy")
  |> filter(fn: (r) => r["_value"] == 1)
  |> group(columns: ["device"])
  |> count()
`;

// 4. Battery status of all devices
const batteryStatus = `
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r["_field"] == "battery")
  |> last()
  |> filter(fn: (r) => r["_value"] < 20)
  |> yield(name: "low_battery")
`;

// 5. Anomaly detection (values outside 2 standard deviations)
const anomalies = `
data = from(bucket: "sensors")
  |> range(start: -7d)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> filter(fn: (r) => r["location"] == "living_room")

// extract mean and standard deviation as scalars
m = (data |> mean() |> findRecord(fn: (key) => true, idx: 0))._value
sd = (data |> stddev() |> findRecord(fn: (key) => true, idx: 0))._value

data
  |> filter(fn: (r) => r._value > m + 2.0 * sd or r._value < m - 2.0 * sd)
`;

// 6. Downsampling for dashboards
const downsampled = `
from(bucket: "sensors")
  |> range(start: -30d)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> filter(fn: (r) => r["_field"] == "temperature")
  |> aggregateWindow(
      every: 1h,
      fn: mean,
      createEmpty: false
    )
  |> yield(name: "hourly_avg")
`;
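Query 6 can also run on a schedule. As a sketch (bucket names assumed, matching the Retention Policies section below), a Flux task that rolls raw data up into a longer-retention bucket looks like this:

```flux
// Task sketch: write hourly means into a longer-retention bucket
option task = {name: "downsample-hourly", every: 1h}

from(bucket: "sensors")
  |> range(start: -task.every)
  |> filter(fn: (r) => r["_measurement"] == "environment")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "sensors_hourly", org: "myorg")
```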

API Endpoints

// api/metrics-api.ts
import express from 'express';
import { InfluxDBClient } from '../lib/influxdb-client';

const router = express.Router();
const influx = new InfluxDBClient({
  url: process.env.INFLUX_URL!,
  token: process.env.INFLUX_TOKEN!,
  org: 'myorg',
  bucket: 'sensors'
});

// GET /api/metrics/temperature?location=living_room&range=24h
router.get('/metrics/temperature', async (req, res) => {
  const { location, range = '24h' } = req.query;

  try {
    const data = await influx.getAggregated(
      'environment',
      'temperature',
      'mean',
      '15m',
      `-${range}`,
      location ? { location: location as string } : undefined
    );

    res.json(data.map(d => ({
      time: d._time,
      value: d._value,
      location: d.location
    })));
  } catch (error) {
    res.status(500).json({ error: 'Query failed' });
  }
});

// GET /api/metrics/latest
router.get('/metrics/latest', async (req, res) => {
  const query = `
    from(bucket: "sensors")
      |> range(start: -1h)
      |> filter(fn: (r) => r["_measurement"] == "environment")
      |> last()
      |> pivot(
          rowKey: ["_time", "location"],
          columnKey: ["_field"],
          valueColumn: "_value"
        )
  `;

  try {
    const data = await influx.query(query);
    res.json(data);
  } catch (error) {
    res.status(500).json({ error: 'Query failed' });
  }
});

// GET /api/metrics/summary
router.get('/metrics/summary', async (req, res) => {
  const query = `
    from(bucket: "sensors")
      |> range(start: -24h)
      |> filter(fn: (r) => r["_measurement"] == "environment")
      |> filter(fn: (r) => r["_field"] == "temperature")
      |> group(columns: ["location"])
      |> reduce(
          fn: (r, accumulator) => ({
            count: accumulator.count + 1,
            sum: accumulator.sum + r._value,
            min: if r._value < accumulator.min then r._value else accumulator.min,
            max: if r._value > accumulator.max then r._value else accumulator.max
          }),
          identity: {count: 0, sum: 0.0, min: 100.0, max: -100.0}
        )
      |> map(fn: (r) => ({
          location: r.location,
          count: r.count,
          avg: r.sum / float(v: r.count),
          min: r.min,
          max: r.max
        }))
  `;

  try {
    const data = await influx.query(query);
    res.json(data);
  } catch (error) {
    res.status(500).json({ error: 'Query failed' });
  }
});

export default router;

Retention Policies

// Data Retention Management

// InfluxDB 2.x manages retention per bucket

// Create a bucket with a retention rule (via the HTTP API)
async function createBucketWithRetention(
  influxUrl: string,
  token: string,
  org: string,
  bucketName: string,
  retentionSeconds: number
) {
  const response = await fetch(`${influxUrl}/api/v2/buckets`, {
    method: 'POST',
    headers: {
      'Authorization': `Token ${token}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: bucketName,
      orgID: org, // note: must be the organization ID, not the org name
      retentionRules: [{
        type: 'expire',
        everySeconds: retentionSeconds
      }]
    })
  });

  return response.json();
}

// Example: separate buckets for different retention periods
const buckets = {
  'sensors_raw': 7 * 24 * 60 * 60,       // 7 days of raw data
  'sensors_hourly': 30 * 24 * 60 * 60,   // 30 days of hourly rollups
  'sensors_daily': 365 * 24 * 60 * 60,   // 1 year of daily rollups
  'sensors_archive': 0                   // unlimited, for archival
};

Grafana Dashboard

{
  "dashboard": {
    "title": "IoT Sensor Dashboard",
    "panels": [
      {
        "title": "Temperature",
        "type": "timeseries",
        "datasource": "InfluxDB",
        "targets": [
          {
            "query": "from(bucket: \"sensors\")\n  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"environment\")\n  |> filter(fn: (r) => r[\"_field\"] == \"temperature\")\n  |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)"
          }
        ]
      },
      {
        "title": "Humidity",
        "type": "gauge",
        "datasource": "InfluxDB",
        "targets": [
          {
            "query": "from(bucket: \"sensors\")\n  |> range(start: -5m)\n  |> filter(fn: (r) => r[\"_measurement\"] == \"environment\")\n  |> filter(fn: (r) => r[\"_field\"] == \"humidity\")\n  |> last()"
          }
        ]
      }
    ]
  }
}
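Instead of configuring the datasource by hand in the UI, Grafana can provision the InfluxDB connection from a file. A sketch matching the compose setup above (file path and credentials are assumptions; adjust to your deployment):

```yaml
# grafana/provisioning/datasources/influxdb.yml
apiVersion: 1
datasources:
  - name: InfluxDB
    type: influxdb
    access: proxy
    url: http://influxdb:8086
    jsonData:
      version: Flux
      organization: myorg
      defaultBucket: sensors
    secureJsonData:
      token: my-super-secret-token
```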

Conclusion

InfluxDB for IoT offers:

  1. Purpose-built for time series: high write throughput
  2. Flux language: powerful queries and transformations
  3. Retention policies: automatic data expiry per bucket
  4. Grafana integration: professional visualization

A proven default choice for IoT analytics.

