时序数据库-InfluxDB
简介
提示
TDengine是国产的时序数据库,貌似功能更强,允许使用标准的 SQL 语法进行数据写入,但是还没来得及研究。。。
InfluxDB是一个用 Go 语言编写的时序数据库,他可用于程序的指标检测,如将 CPU、内存以及其指标写入时序数据库,再通过 FLUX 语言每隔一段时间查询,发现异常触发报警。
时序数据库通常运用于监控场景,比如运维和 IOT(物联网)领域,设备会不停地向数据中插入数据,数据量会非常大,在应用程序端,通过读取某个区间的数据,分析数据状况并执行报警。它底层数据结构使用的日志结构化合并树。
时序数据库一般会对冷、热数据进行分离以节省空间(压缩冷数据),有价值的数据通常是较新的数据,而且大多数不支持事务、数据的修改操作。
安装 InfluxDB 非常简单,因为 Go 语言编写的程序是可成执行文件,下载官网提供的压缩包,解压直接运行即可,运行后访问 http://localhost:8086/查看效果。
时序数据库大致分为数据采集、数据存储、定期查询、报警这四个步骤,在 InfluxDB 1.X 版本中提供了 TICK 生态分别实现了这四个功能:
- T:Telegraf --------数据采集组件,收集、发送数据到 InfluxDB
- I:InfluxDB --------数据存储、发送数据到 Chronograf
- C:Chronograf ----提供 UI 界面,图形化展示数据
- K: Kapacitor -------处理报警任务、定时任务
在InfluxDB 2.X 版本中,Chronograf 和 Kapacitor 的功能全部整合到了 InfluxDB 中,因此只需关注 Telegraf 和 InfluxDB 即可。
InfluxDB 需要购买企业版才支持集群功能,但是国内有大佬提供了开源的集群方案参考地址
注意
InfluxDB 采用 http 协议和客户端通信,在第一次创建 InfluxDB 用户的时候记得复制一下 API Token,客户端必须携带令牌才能和服务端通信,不复制也行,可以重新生成。
数据插入
数据类型
以下是行协议常见的数据类型
| 数据类型 | 示例 |
|---|---|
| 浮点数 | -3.1415、1.0、1 |
| 整数 | -999i、1i、100i |
| 无符号整数(正数) | 0u、100u |
| 字符串 | "you are good!" |
| 布尔 true | t、T、true、True、TRUE |
| 布尔 false | f、F、false、False、FALSE |
行协议
在 InfluxDB 提供的 UI 操作界面中,提供行协议(Line Protocol)方式导入数据,基本语法为:

// 提前在Buckets菜单中新建一个 test_one 存储桶,并通过行协议向桶中插入数据,如下
mytest,mytag1=value1,mytag2=value1 myname="zjx",myage=18 1724055334000名称(measurement,测量名称)可简单理解为数据表,不可省略。
标签集相当于数据的索引,对应的数据值不宜过多(如性别、地区等),标签可以省略。
字段集存放业务数据,至少需有一个字段,每个字段会被保存为单独的行
时间戳若不指定,默认为当前。
InfluxDB 通过测量名称+标签集+一个字段将数据划分为不同序列,再通过时间定位到具体数据,通过这两个索引,能快速查询到指定时间范围内的数据。
Telegraf
Telegraf 是 Go 语言开发的数据采集工具,可以在 InfluxDB 中创建 Telegraf 的配置文件,不同机器上的 Telegraf 通过读取配置文件,集中向 InfluxDB 发送指标数据。
InfluxDB 整合 Telegraf 十分简单,找到 InfluxDB 控制台中的 Telegraf 选项,新建配置,它默认提供了非常多的 Telegraf 配置项,包括 MySQL、Tomcat、Redis、AMQP、ES、操作系统(System)等,配置创建成功后,在每个安装 Telegraf 的机器上执行控制台中提示的指令(配置 token 、绑定 InfluxDB 地址)即可完成绑定。
Scrapers
InfluxDB 可通过 Scrapers 主动抓取指标数据(仅需指定一个目标 url),所需数据格式和Prometheus(一个开源的服务监控系统和时间序列数据库)所需要的格式一致,指标提供方只需对外暴露一个或多个 http 协议的 API,采集方定期从这些 url 中主动拉取数据。
因为 Prometheus 十分受欢迎,其生态十分完善,支持将监控对象的指标,暴露成 Prometheus 格式数据,并提供 http 服务,这个服务称为 Exporter。Prometheus 官方提供的 Api 支持自定义 Exporter,也有非常多开源的 Exporter 可以直接使用,比如mysql_exporter。
由此可以看到,InfluxDB 对于 Prometheus 格式的数据支持十分友好,紧跟主流。
在控制台中新建的 Scrapers 默认每 10 秒抓取一次,目前不支持在 UI 页面上修改间隔时间。可以在 Telegraf 中安装 Prometheus 数据源插件,收集 Prometheus 格式的数据,再发送给 InfluxDB,Telegraf 可自定义时间。
数据查询
Flux 语法
Flux 语法是 InfluxDB2.X 版本发布的,他是一种全新的语法,有点类似 Java 中的流操作。旧版本使用InfluxQL,它和 SQL 语法相似。这里只简单介绍一下 Flux 语法,使用非常简单,详情可参考官网。
使用 InfluxDB 提供的 UI 控制面板编写 Flux 语言非常方便(Data Explorer 菜单栏),能给出友好提示,可参考官网查看 Flux 语言 API 文档,以下是一个简单示例:
import "array"
import "internal/debug"
// 声明一个String类型的空值
a=debug.null(type:"string")
// 判断是否存在
b=exists a
// 定义一个函数
myFunc= (x,y) => x + y
// 构造一个表,便于查看输出
array.from(rows: [{name: "zs", age: myFunc(x:10, y: 8), isMarry: b}])查询示例
// 前一个小时 from(bucket:"test_one") |> range(start: -1h) // 指定时间段 from(bucket:"test_one") |> range(start: -6h, stop: -10m) |> filter(fn: (r) => r._measurement == "mytest" and r._field == "myage" and r.mytag1 == "value1") |> yield()
Http API
InfluxDB 采用 Http 协议通信,并提供了 Http API 操作数据库,可以访问官方文档或 http://localhost:8086/docs查看各个 Http API 的功能,所有接口符合 OpenAPI 规范,能一键导入到接口测试工具(如 ApiFox)中便于测试。
使用 Http API 操作 InfluxDB 时,需要在请求头中附带上 API Token(可在 UI 页面中生成),格式如下:
curl --request GET "INFLUX_URL/api/v2/buckets?limit=1&offset=50" \
--header "Authorization: Token INFLUX_API_TOKEN"开启 Https 需要在运行 InfluxDB 时指定证书路径,如下:
# Start InfluxDB with TLS
influxd.exe \
--tls-cert="<path-to-crt>" \
--tls-key="<path-to-key>"Java 客户端
InfluxDB 提供了多种编程语言的客户端库,在 UI 控制台的 Source 菜单中,可查看支持的语言和详细接入方式,而且文档十分美观,这里以 Java 客户端为例。
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>7.2.0</version>
</dependency>@Data
@Measurement(name = "myTest")
@AllArgsConstructor
public class TestPojo {
// 地理坐标
@Column(tag = true)
String location;
// 季节
@Column(tag = true)
String season;
// 水位
@Column
String waterLevel;
// 温度
@Column
Integer temp;
@Column(timestamp = true)
Instant time;
}// api token,这个API Token 需要拥有操作桶的权限
private static char[] token = "v7IcublyA33MMNKIlPxZK0QoBmWrRqAqwkfvW-jleFjZa1JfYr_aoyURX4Tgm5RSzf4fRfAEZ9757BeA7Y73ng==".toCharArray();
private static String influxDBUrl = "http://localhost:8086";
private static String orgID = "3e990f484138c88f"; // 组织ID,这个可以在UI界面的用户信息中查看到
// 新增桶
public static String createBucket(String bucketName) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token);
// 设置存储桶规则,数据保留 3600 秒
BucketRetentionRules retention = new BucketRetentionRules();
retention.setEverySeconds(3600);
Bucket bucket = influxDBClient.getBucketsApi().createBucket(bucketName, retention, orgID);
// 设置存储桶权限
PermissionResource resource = new PermissionResource();
resource.setId(bucket.getId());
resource.setOrgID(orgID);
resource.setType(PermissionResource.TYPE_BUCKETS);
// 可读
Permission read = new Permission();
read.setResource(resource);
read.setAction(Permission.ActionEnum.READ);
// 可写
Permission write = new Permission();
write.setResource(resource);
write.setAction(Permission.ActionEnum.WRITE);
Authorization authorization = influxDBClient.getAuthorizationsApi()
.createAuthorization(orgID, Arrays.asList(read, write));
// 返回操作当前桶需携带的Token
String token = authorization.getToken();
logger.info("新建存储桶:{},向当前存储桶写入数据需携带的token为:{}", bucketName, token);
logger.info("新建存储桶名称:{},桶id为:{}", bucketName, bucket.getId());
influxDBClient.close();
return token;
}
// 删除桶
public static void delBucketById(String bucketId) {
InfluxDBClient influxDBClient = InfluxDBClientFactory.create(influxDBUrl, token);
influxDBClient.getBucketsApi().deleteBucket(bucketId);
influxDBClient.close();
}// 某个桶附带的客户端 Api Token
private static final String BUCKET_TOKEN = "e6jvFhYKDzneiy4JAbRWm1BejP7xhT8e58S7LX67Gg3BbhCMAfEMjANY_T2zatWqxNfrl2FBWwa84OWr2ErZUg==";
private static final String BUCKET_NAME = "test-spring-boot1";
@Test
void createBucket() {
String bucket = MyInfluxDBBucketUtil.createBucket(BUCKET_NAME);
assertNotNull(bucket);
}
@Test
void delBucket() {
MyInfluxDBBucketUtil.delBucketById("6e5f2d8ffbb232cc");
}
@Test
void writeByPoint() {
Point point = Point.measurement("myTest")
.addTag("location", "四川").addTag("season", "春")
.addField("waterLevel", "-6米")
.addField("temp", 6)
.time(Instant.now().toEpochMilli(), WritePrecision.MS);
ReadWriteUtil.writeByPoint(BUCKET_NAME, BUCKET_TOKEN, point);
}
@Test
void writeByLineProtocol() {
String line = "myTest,location=重庆,season=冬 waterLevel=\"-15米\",temp=-3i";
ReadWriteUtil.writeByLineProtocol(BUCKET_NAME, BUCKET_TOKEN, line);
}
@Test
void writeByPojo() {
List<TestPojo> pojos = List.of(
new TestPojo("四川", "春", "-10米", 8, Instant.now().minusSeconds(3)),
new TestPojo("四川", "春", "-8米", 10, Instant.now().minusSeconds(2)),
new TestPojo("四川", "春", "-5米", 12, Instant.now().minusSeconds(1)),
new TestPojo("四川", "夏", "+10米", 34, Instant.now())
);
ReadWriteUtil.writeByPojo(BUCKET_NAME, BUCKET_TOKEN, pojos);
}
@Test
void readByFlux() {
String flux = "from(bucket:\"" + BUCKET_NAME + "\") |> range(start: -8h)";
List<FluxTable> fluxTables = ReadWriteUtil.readByFlux(BUCKET_NAME, BUCKET_TOKEN, flux);
fluxTables.forEach(table -> {
log.info("表序列编号为:{} -- 所包含的数据总条数:{}", table.getRecords().get(0).getTable(), table.getRecords().size());
table.getRecords().forEach(fluxRecord ->
log.info("录入时间:{},数据键:{},数据值: {}", fluxRecord.getTime(), fluxRecord.getValueByKey("_field"), fluxRecord.getValueByKey("_value"))
);
});
assertNotNull(fluxTables);
}模板
InfluxDB 存在一个固定的流程,类似:数据收集->数据查询->数据展示->监控报警;
这一套下来手动去实现比较无聊,已经存在很多可用的模板供我们使用,如 InfluxDB 官方提供的社区模板,能监控 MySQL、Redis、kafka、docker 等众多应用,其部署流程可参官网,流程大致为:
- 在 InfluxDB 中导入所需模板
- 在被监控的机器上安装 Telegraf,设置好连接 InfluxDB 的凭据信息并运行
- 查看 InfluxDB 的 UI 界面观察数据桶、仪表盘、告警等数据是否添加成功
但是官网提供的模板常年未更新,社区也不活跃,推荐使用 Grafana,一个开源的数据可视化和监控、告警平台,支持多种数据源,包括 Elasticsearch、InfluxDB、Prometheus 等。
InfluxDB 结合 Grafana,InfluxDB 负责保存数据,Grafana 负责数据可视化和监控、告警等。
告警
告警包含检查规则、报警终端、报警规则三个部分,其含义如下:
- 检查规则:每隔一段时间查询一次数据,如温度、水位线等,并给定阈值
- 报警终端:实际处理报警请求的终端,如微信通知、邮件、短信等
- 报警规则:将检查规则和报警终端绑定,如温度过高发短信,水位线过高发邮箱
检查规则
检查规则(checks)有阈值检查(Threshold Check)和死亡检查(Deadman Check,用于检测能否收到指标数据)两种,新建一个阈值检查(Threshold Check),根据气温的最大值设置检查规则:


消息模板的定义可参考官网。
报警的数据存储在系统自带的
_monitoring桶中。报警终端
报警终端(Notification Endpoint)配置非常简单,添加一个 Webhook 的地址即可,这里就懒得演示啦。
可以使用睿象云的 Rest Api 工具来接收告警信息,并设置触发通知(如短信、电话等)的规则,或者其他任何支持接收告警的工具,也可以自己写个服务端玩儿一下。
报警规则
将报警消息(格式为消息模板中定义的格式)根据不同规则发送到指定的报警终端。

FluxDB-alert-报警规则
NoteBooks
上面配置告警的方式,我们无法编辑发送给 Webhook 服务端的数据格式,使用 UI 提供的 NoteBooks 能更灵活的封装告警消息,在 NoteBooks 中新建一个报警(Alert),编辑完成后点击右下角导出报警任务,如下:

导出成功后,在 Task 菜单中编辑刚才导出的任务,会得到符合 Flux 语法规范的数据项,在里面能自由编辑告警信息的数据格式(body 部分),部分示例如下:
task_data
|> schema["fieldsAsCols"]()
|> filter(fn: (r) => r["_measurement"] == "myTest")
|> monitor["notify"](
data: notification,
endpoint:
http.endpoint(url: "https://www.example.com/endpoint")(
mapFn: (r) => {
body = {
"myTest": r,
"myMsg": "温度告警:" + r["temp"]
}
return {
headers: {"Content-Type": "application/json"},
data: json.encode(v: body),
}
},
),
)
