前言: 什么是 kuiper

如果接触过大数据,一定不会对flink,spark, storm等数据处理框架陌生。这些框架承担着大数据中流处理和批处理的任务。

这些框架功能强大,但是对于物联网的边缘段来说,无疑有些重了。那么在资源受限的设备上如何进行数据清洗? kuiper就是答案。

kuiper 全称 EMQ X Kuiper, 由golang 实现的轻量级物联网边缘分析,流式处理开源软件, 是和运行在各类资源首先的边缘设备上,这正是Kuiper的设计目标: 将云端运行的实时流式处理框架迁移到边缘端。

所以,你可以在kuiper中见到非常熟悉的概念: source(数据源), sink(目标), SQL(业务处理逻辑)

优势

  • 超轻量

    • 核心服务安装包约 4.5MB,初始运行时占用内存约 10MB
  • 跨平台

    • 流行 CPU 架构:X86 AMD * 32, X86 AMD * 64; ARM * 32, ARM * 64位; PPC
    • 常见 Linux 发行版、OpenWrt 嵌入式系统、MacOS、Docker
    • 工控机、树莓派、工业网关、家庭网关、MEC 边缘云等
  • 完整的数据分析

    • 通过 SQL 支持数据抽取、转换和过滤
    • 数据排序、分组、聚合、连接
    • 60+ 各类函数,覆盖数学运算、字符串处理、聚合运算和哈希运算等
    • 4 类时间窗口,以及计数窗口
  • 高可扩展性

    提供插件扩展机制,可以支持在源 (Source)SQL 函数, 目标 (Sink) 三个方面的扩展

    • 源 (Source) :内置支持 MQTT 数据的接入,提供了扩展点支持任意的类型的接入
    • 目标(Sink):内置支持 MQTT、HTTP,提供扩展点支持任意数据目标的支持
    • SQL 函数:内置支持60+常见的函数,提供扩展点可以扩展自定义函数
  • 管理能力

  • 与 EMQ X Edge 集成

    提供了与 EMQ X Neuron 和 EMQ X Edge 的无缝集成,实现在边缘端从消息接入到数据分析端到端的场景实现能力

安装

安装方案一般有压缩包和docker镜像两种, 还可以选择dep/rpm包的方式进行安装,这里不介绍

方案1:下载压缩包

可以通过

或者

获取安装包

这里以kuiper-1.1.1-linux-x86_64.tar.gz为例

1
2
3
4
$ wget wget https://github.com/emqx/kuiper/releases/download/1.1.1/kuiper-1.1.1-linux-x86_64.tar.gz   # 下载压缩包 
$ tar -zxvf kuiper-1.1.1-linux-x86_64.tar.gz # 解压
$ cd cd kuiper-1.1.1-linux-x86_64/ #
$ bin/kuiperd # 启动
ps

在启动的时候可能会遇到 bin/kuiperd: /lib64/libc.so.6: version GLIBC_2.28 not found (required by bin/kuiperd) 的异常

可以通过 strings /lib64/libc.so.6 | grep GLIBC查看当前系统的版本,

可以在 http://ftp.gnu.org/gnu/glibc/ 下载对应的版本进行编译安装所需版本,

(当然也可以直接选择docker 安装, 推荐,)

1
2
3
4
5
$ wget http://ftp.gnu.org/gnu/glibc/glibc-2.28.tar.gz 
$ tar -zxvf glibc-2.28.tar.gz
$ cd glibc-2.28
$ ./configure --prefix=/usr --disable-profile --enable-add-ons --with-headers=/usr/include --with-binutils=/usr/bin
$ make -j 8 && make install
方案2:docker
1
2
$ docker pull emqx/kuiper:1.1.1
$ docker run -p 9081:9081 -d --name kuiper -e MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] emqx/kuiper:1.1.1

这样kuiper服务就起来了。

ps:

MQTT_SOURCE__DEFAULT__SERVERS=[tcp://broker.emqx.io:1883] 是设置了一个默认的MQTT服务器。

tcp://broker.emqx.io:1883 由 EMQ提供的公有服务器。

你可以使用任意MQTT客户端连接发布消息到该服务器。

也可以将默认设置为我们自己的服务起。

端口9081是kuiper默认端口。

使用

如何使用kuiper? 通常需要三步。

  1. 创建流, 也就是数据源(source)
  2. 创建规则
    • sql(处理逻辑)
    • 指定数据处理结果的保存目标,即 sink
  3. 部署运行规则
1. 源

kuiper支持三种内置源:

  • MQTT源
  • HTTP定时拉取,按照指定的间隔,定时从HTTP服务拉取数据
  • EdgeX源

除了三种内置源,kuiper支持 用户自定义源。

2. 规则
参数名必需说明
id自定义规则id, 需要保证在同一个kuiper实例中具有唯一性
sqlKuiper 提供了一种类似于 SQL 的查询语言,用于对事件流执行转换和计算
actions处理完成之后,需要执行的sink动作,可以有多个,数组形式
options选项

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"id": "rule1",
"sql": "SELECT demo.temperature, demo1.temp FROM demo left join demo1 on demo.timestamp = demo1.timestamp where demo.temperature > demo1.temp GROUP BY demo.temperature, HOPPINGWINDOW(ss, 20, 10)",
"actions": [
{
"log": {}
},
{
"mqtt": {
"server": "tcp://47.52.67.87:1883",
"topic": "demoSink"
}
}
]
}
① sql

Kuiper 提供的一种类似于 SQL 的查询语言,用于对事件流执行转换和计算。

具体可以查看 SQL页面

② actions

actions即需要执行的sink操作,当前支持将处理结果发送至:

  • log: 日志文件
  • mqtt: MQTT服务器
  • edgex: Edgex消息总线
  • rest: Rest HTTP服务器
  • nop: nop操作

每个动作可以定义自己的属性。当前有以下的公共属性:

属性名类型和默认值描述
concurrencyint: 1设置运行的线程数。该参数值大于1时,消息发出的顺序可能无法保证。
bufferLengthint: 1024设置可缓存消息数目。若缓存消息数超过此限制,sink将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。
runAsyncbool:false设置是否异步运行输出操作以提升性能。请注意,异步运行的情况下,输出结果顺序不能保证。
retryIntervalint:1000设置信息发送失败后重试等待时间,单位为毫秒。如果该值的设置 <= 0,那么不会尝试重新发送。
retryCountint:3设置信息发送失败后重试次数,如果该值的设置 <= 0,那么不会尝试重新发送。
cacheLengthint:1024设置最大消息缓存数量。缓存的消息会一直保留直到消息发送成功。缓存消息将按顺序发送,除非运行在异步或者并发模式下。缓存消息会定期存储到磁盘中。
cacheSaveIntervalint:1000设置缓存存储间隔时间。需要注意的是,当规则关闭时,缓存会自动存储。该值越大,则缓存保存开销越小,但系统意外退出时缓存丢失的风险变大。
omitIfEmptybool: false如果配置项设置为 true,则当 SELECT 结果为空时,该结果将不提供给目标运算符。
sendSingletrue输出消息以数组形式接收,该属性意味着是否将结果一一发送。 如果为false,则输出消息将为{"result":"${the string of received message}"}。 例如,{"result":"[{\"count\":30},"\"count\":20}]"}。否则,结果消息将与实际字段名称一一对应发送。 对于与上述相同的示例,它将发送 {"count":30},然后发送{"count":20}到 RESTful 端点。默认为 false。
dataTemplatetruegolang 模板 (opens new window)格式字符串,用于指定输出数据格式。 模板的输入是目标消息,该消息始终是映射数组。 如果未指定数据模板,则将数据作为原始输入。
③ options

当前的选项包括:

选项名类型和默认值说明
isEventTimebool:false使用事件时间还是将时间用作事件的时间戳。 如果使用事件时间,则将从有效负载中提取时间戳。 必须通过 stream 定义指定时间戳记。
lateToleranceint64:0在使用事件时间窗口时,可能会出现元素延迟到达的情况。 LateTolerance 可以指定在删除元素之前可以延迟多少时间(单位为 ms)。 默认情况下,该值为0,表示后期元素将被删除。
concurrencyint: 1一条规则运行时会根据 sql 语句分解成多个 plan 运行。该参数设置每个 plan 运行的线程数。该参数值大于1时,消息处理顺序可能无法保证。
bufferLengthint: 1024指定每个 plan 可缓存消息数。若缓存消息数超过此限制,plan 将阻塞消息接收,直到缓存消息被消费使得缓存消息数目小于限制为止。此选项值越大,则消息吞吐能力越强,但是内存占用也会越多。
sendMetaToSinkbool:false指定是否将事件的元数据发送到目标。 如果为 true,则目标可以获取元数据信息。
qosint:0指定流的 qos。 值为0对应最多一次; 1对应至少一次,2对应恰好一次。 如果 qos 大于0,将激活检查点机制以定期保存状态,以便可以从错误中恢复规则。
checkpointIntervalint:300000指定触发检查点的时间间隔(单位为 ms)。 仅当 qos 大于0时才有效。

有关 qoscheckpointInterval 的详细信息,请查看状态和容错

可以在 rules 下属的 etc/kuiper.yaml 中全局定义规则选项。 规则 json 中定义的选项将覆盖全局设置。