InfluxDb 重要概念和操作记录

11 minute

前言

InfluxDB 是一个用于存储和分析时间序列数据的开源数据库,对与时间相关联的数据进行分析、处理和存储时有较大的优势,比如针对某一个智能设备连续不断收集的数据,可以将其进行合理地存储和整理,从时间角度对数据进行分析以得到相关的统计数据,通过内置的 CQ 与 RP 操作,可以对大量的数据进行采样和保留。

本文介绍的 InfluxDb 版本为 v2。

安装与启动

这里暂时只记录了 MacOS 的操作步骤。

1# install influxdb and its cli tool
2brew install influxdb influxdb-cli
3# start influxdb | listen 8086
4influxd

启动成功后,访问 localhost:8086 根据提示完成用户初始化,并得到一个认证 token 用于 API 调用。influxdb 默认将数据保存在 ~/.influxdbv2 目录下面。

重要概念

数据组织形式

  1. bucket: 存储时序数据的地方,一个 bucket 可以包含多个 measurement,相当于 database
  2. measurement: 用于描述时序数据,包含 timestamp, tags 和 fields,相当于 table
  3. timestamp: 记录时序数据的时间
  4. tag: 键值对,用于标识一个时序数据的特点
  5. field: 键值对,用于记录时序数据的具体值

第一次接触 bucket 的概念,可能会有所疑惑,下面是官方的解释:

A bucket is a named location where data is stored that has a retention policy. It’s similar to an InfluxDB v1. x “database,” but is a combination of both a database and a retention policy.

也就是说它类似于 v1 版本中的 “database”,但是多了一个数据保存策略。

时序数据通常是海量的,对其进行合适的数据清理和保存是十分重要的,每个数据库对应着一个数据保存策略以适应于实际场景的使用。

在一个 bucket 中,插入每条数据需要遵循以下格式(行协议):

重要定义

Point 和 Series 是官方列出的两个重要定义。

Point: Single data record identified by its measurement, tag keys, tag values, field key, and timestamp.

Point 其实就相当于数据库表中的一条记录。

Series: A group of points with the same measurement, tag keys, and tag values.

Series 可以理解为同一 measurement 中具有相同 tag 的数据组成的数据集。

存储引擎

TSM 采用了类似于 LSM 的存储思路,LSM 是许多 NOSQL 采用的存储引擎,大致的过程如下图(来源网络):

LSM 树核心思想的核心就是放弃部分读能力,换取写入的最大化能力。

LSM 假定不需要每次有数据更新就必须同时将数据写入到磁盘中,而可以先将最新的数据驻留在内存中,等到积累到足够多之后再使用归并排序的方式将内存内的数据合并追加到磁盘队尾(因为所有待排序的树都是有序的,可以通过合并排序的方式快速合并到一起)。

这也就是 LSM 在某些情况比 B+ 树更合适的原因,如果你的场景一直需要大量的写入,而不需要很多读取和修改操作,推荐使用 LSM,比如时序数据的存储,一般不会进行任何修改操作而会有海量的写入操作。

TSM 存储流程大同小异,如下所述:

Batches of points are sent to InfluxDB, compressed, and written to a WAL for immediate durability. Points are also written to an in-memory cache and become immediately queryable. The in-memory cache is periodically written to disk in the form of TSM files. As TSM files accumulate, the storage engine combines and compacts accumulated them into higher level TSM files.

批量的 points 将会被写或更新到 cache 中并将操作写入 wal 中以确保在宕机之类的情况发生时数据的可恢复性。cache 中的数据按 series key 有序地被组织好,且会周期性地写入到磁盘中,以 TSM 文件的格式存储着。

对于 TSM 文件,TSM 全称 Time-Structured Merge Tree,即时间结构化归并树,从名字可以大概推知其组织数据的形式,下面是官方解释:

To efficiently compact and store data, the storage engine groups field values by series key, and then orders those field values by time.

根据 series key 分组数据,并且将他们根据时间排序好。

其中 series key 由 measurement, tag key & value, field key 进行定义。

那么 TSM 如何一步步地合并与分组数据的呢?它大致是基于分级压缩的策略来进行的,当多个一级数据达到一定大小后会合并压缩为二级数据,直到达到四级数据,然后将会为这些数据分出一个新的 TSM 文件,所有具有相同 series key 的数据将为位于同一 TSM 文件中,这样确保了索引数据的效率,因为数据是连续的。

数据操作

创建数据库

1influx bucket create \
2--org="$org" \
3--token="$token" \
4--name="$bucket" \
5--retention "3d" \
6--shard-group-duration "1h"

该语句创建了一个 bucket,其中数据保留 3 天,分片时间跨度为 1 小时。

分片是时间序列数据的横向分区,每个分片包含其指定时间范围内的时间序列数据子集。在这个例子中,由于数据最多保留 3 天,分片时间跨度为 1 小时,那么将产生 3 * 24 个分片。

分片持续期的选择影响查询性能、数据压缩和存储效率,需要仔细考虑后进行使用。

数据写入

influxdb 在插入数据时自动创建 measurement,插入数据的格式需要符合行协议。

 1influx write \
 2--org="$org" \
 3--token="$token" \
 4--bucket $bucket \
 5--precision s "
 6health_data_t1,cardId='20181214062',type=1 heartrate=80,bloodOxygen=97,microcirculation=80 1706002825
 7health_data_t1,cardId='20181215062',type=1 heartrate=81,bloodOxygen=87,microcirculation=81 1706002825
 8health_data_t1,cardId='20181214072',type=1 heartrate=82,bloodOxygen=98,microcirculation=80 1706002825
 9health_data_t1,cardId='20181214052',type=1 heartrate=80,bloodOxygen=97,microcirculation=84 1706002825
10health_data_t1,cardId='20181212062',type=1 heartrate=86,bloodOxygen=99,microcirculation=80 1706002825
11health_data_t1,cardId='20181214162',type=1 heartrate=80,bloodOxygen=97,microcirculation=82 1706002825
12"

以上操作将创建一个名为 health_data_t1 的 measurement,有 cardId,type 这两个 tag,有 heartrate,bloodOxygen,microcirculation 这三个 field,最后跟着的是时间戳,通过 --precision 指定其单位为 s(如果不是纳秒级都需要自己指定单位)。

数据查询

influx 支持通过 flux 和 influxql(sql) 进行查询,flux 更为灵活而强大,下面只介绍 flux。

如下是一个查询示例:

1influx query \
2--org="$org" \
3--token="$token" '
4from(bucket: "equipments")
5    |> range(start: 2024-01-23T16:00:00Z, stop: 2024-01-23T19:00:01Z)
6    |> filter(fn: (r) => r._measurement == "health_data_t1")
7    |> filter(fn: (r) => r._field == "heartrate")
8'

需要注意数据中时间的时区以 UTC 作为标准,如果需要使用当地时间,比如中国,则修改时间范围为如下:

1range(start: 2024-01-23T16:00:00+08:00, stop: 2024-01-23T19:00:01+08:00)

更多用法记录:

 1# 映射操作
 2|> map(fn: (r) => ({r with _value: r._value / 100.0}))
 3
 4# 求平均值
 5|> mean()
 6
 7# 定时任务
 8option task = {name: "downsample_5m_precision", every: 1h, offset: 0m}
 9from(bucket: "equipments")
10    |> range(start: -task.every)
11    |> filter(fn: (r) => r._measurement == "mem" and r.host == "myHost")
12    |> aggregateWindow(every: 5m, fn: mean)
13    |> to(bucket: "equipments-downsample")

其中,定时任务每一小时执行一次,时间偏移为 0,时间窗口设为五分钟,对五分钟内的数据取平均值得到新的数据,然后存入新的 bucket 中,一般新的 bucket 会采用时间更长的保存策略。

第三方接口使用

这里记录下 Java 的接入方式。

1<dependency>
2  <groupId>com.influxdb</groupId>
3  <artifactId>influxdb-client-java</artifactId>
4  <version>6.6.0</version>
5</dependency>

官方提供了三种写入方式:

  1. Use InfluxDB Line Protocol to write data(行协议的方式)
  2. Use a Data Point to write data(Point 方式)
  3. Use POJO and corresponding class to write data(POJO 映射方式)

这里采用第二种作为示例:

 1String url = "http://localhost:8086";
 2String token = "your_token";
 3String bucket = "test";
 4String org = "xdu";
 5
 6try(InfluxDBClient client = InfluxDBClientFactory.create(url,
 7        token.toCharArray(), org, bucket)) {
 8    Point p = Point.measurement("cpu")
 9            .addTag("host", "serverB")
10            .addTag("region", "cn")
11            .addField("value", 0.2318)
12            .addField("value1", 0.8238)
13            .addField("value2", "s1")
14            .time(Instant.now(), WritePrecision.NS);
15    client.getWriteApiBlocking().writePoint(p);
16} catch (Exception e) {
17    System.out.println(e.getMessage());
18}

查询操作可以直接通过 flux 脚本完成。找出过去 12h 内 measurement 为 “cpu”,host 为 “serverB”,且展现 field 为 “value1”:

1String query = "from(bucket:\"test\")\n" +
2        "  |> range(start: -12h)\n" +
3        "  |> filter(fn: (r) =>\n" +
4        "    r._measurement == \"cpu\" and\n" +
5        "    r._field == \"value1\" and\n" +
6        "    r.host == \"serverB\"\n" +
7        "  )";

接着调用该脚本:

1List<FluxTable> tables = client.getQueryApi().query(query, org);
2
3for (FluxTable table : tables) {
4    for (FluxRecord record : table.getRecords()) {
5        System.out.println(record.getValues());
6    }
7}

参考文档