前言

对于kuiper的使用来说,可以通过3个步骤来完成一个完整的流程。

  1. 定义数据源, 内置支持 MQTT 数据的接入,提供了扩展点支持任意的类型的接入
  2. 定义处理sql
  3. 定义目标(sink), 内置支持 MQTT、HTTP,提供扩展点支持任意数据目标的支持

第一步和第三步没什么好说的,无非是数据从哪里来,要到哪里去。而我们使用流处理的环境并不是或者说并不仅仅是完成数据的传输的,而是要对数据进行处理分析,这才是流处理的意义和价值。那么sql就是来做处理分析的。通过sql可以对流数据进行抽取,转换和过滤。

截至目前(2021-02-01), 除了基本的查询语法,kuiper内置了 60+的内置函数,如数学运算函数,json处理函数,聚合函数和哈希运算函数等,4类时间窗口和计数窗口。

数据类型

kuiper sql 是一种类sql查询语言,暂时支持以下数据类型。(datetime计划支持中)

数据类型说明
datetime日期时间类型-目前尚不支持
bigint整数型
float浮点型
string文本值,由 Unicode 字符组成。
boolean布尔类型,值可以是true 或者 false
array数组类型可以是简单数据或结构类型中的任何类型。
struct复杂类型。 名称/值对的集合。 值必须是受支持的数据类型。
① 数据类型比较和计算的兼容性

比较和计算,就比如 >+, 如果运算符两侧的数据类型不一样,可能报错。kuiper的兼容性如下表所示(Y: yes, N: no):

bigintfloatstringdatetimeboolean
bigintYYNN
floatYYNN
stringNNYN
datetimeYYY, 如为合法数据类型Y
booleanNNNN

另外,

  • 如果做比较的双方有nil, 返回false
  • 如果做计算的双方有nil, 返回nil

SQL语法关键字

kuiper sql 支持以下关键字进行查询和筛选。

比如: select * from table where a>b order by id

元素总结
SELECTSELECT 用于从输入流中检索行,并允许从 Kuiper 中的一个或多个输入流中选择一个或多个列。
FROMFROM 指定输入流。 任何 SELECT 语句始终需要 FROM 子句。
JOINJOIN 用于合并来自两个或更多输入流的记录。 JOIN 包括 LEFT,RIGHT,FULL 和 CROSS。
WHEREWHERE 指定查询返回的行的搜索条件。
GROUP BYGROUP BY 将一组选定的行分组为一组汇总行,这些汇总行按一个或多个列或表达式的值分组。
ORDER BY按一列或多列的值对行进行排序。
HAVINGHAVING 为组或集合指定搜索条件。 HAVING 只能与 SELECT 表达式一起使用。

内置函数

① 聚合函数
函数示例说明
avgavg(age)平均值, null值会被忽略
countcount(*)统计总数,null值会被忽略
maxmax(age)最大值, null值会被忽略
minmin(age)最小值,null值会被忽略
sumsum(age)求和, null值会被忽略
collectcollect(*), collect(col1)返回组中指定的列或整个消息(参数为*时)的值组成的数组。
deduplicatededuplicate(col, false)返回当前组去重的结果,通常用在窗口中。
其中,第一个参数指定用于去重的列;第二个参数指定是否返回全部结果。
若为 false ,则仅返回最近的未重复的项;若最近的项有重复,则返回空数组;此时可以设置 sink 参数,使得 sink 接到空结果后不触发。
② 数学函数
函数示例说明
absabs(col1)绝对值
acosacos(col1)弧度数的反余弦值
asinasin(col1)弧度数的反正弦值
atanatan(col1)弧度数的反正切值
atan2atan2(col1, col2)正x轴与两个自变量中定义的(x,y)点之间的弧度角
bitandbitand(col1, col2)对两个Int(-converted)参数的位表示执行按位“与”运算
bitorbitor(col1, col2)对两个参数的位表示进行或运算
bitxorbitxor(col1, col2)对两个Int(-converted)参数的位表示执行逐位异或运算
bitnotbitnot(col1)在Int(-converted)参数的位表示形式上执行按位NOT运算
ceilceil(col1)将值舍入到最接近的BIGINT值。
coscos(col1)返回以弧度为单位的数字的余弦值。
coshcosh(col1)返回弧度数的双曲余弦值。
expexp(col1)返回小数点参数的e。
lnln(col1)返回参数的自然对数。
loglog(col1)返回参数的以10为底的对数。
modmod(col1, col2)返回第一个参数除以第二个参数的余数。
powerpower(x, y)返回 x 的 y 次方。
randrand()返回一个伪随机数,其均匀分布在0.0和1.0之间。
roundround(col1)将值四舍五入到最接近的 BIGINT 值。
signsign(col1)返回给定数字的符号。 当参数的符号为正时,将返回1。 当参数的符号为负数时,返回-1。 如果参数为0,则返回0。
sinsin(col1)返回弧度数的正弦值。
sinhsinh(col1)返回弧度数的双曲正弦值。
sqrtsqrt(col1)返回数字的平方根。
tantan(col1)返回以弧度表示的数字的正切值。
tanhtanh(col1)返回弧度数的双曲正切值。
③ 字符串函数
函数示例说明
concatconcat(col1…)连接数组或字符串。 此函数接受任意数量的参数并返回 String 或 Array
endswithendswith(col1, col2)返回一个布尔值,该布尔值指示第一个 String参数是否以第二个 String 参数结尾。
format_timeformat_time(col1, format)将日期时间格式化为字符串。
indexofindexof(col1, col2)返回第二个参数的第一个索引(从0开始),作为第一个参数中的子字符串。
lengthlength(col1)返回提供的字符串中的字符数。
lowerlower(col1)返回给定 String 的小写版本。
lpadlpad(col1, 2)返回 String,在左侧用第二个参数指定的空格数填充。
ltrimltrim(col1)从提供的字符串中删除所有前导空格(制表符和空格)。
numbytesnumbytes(col1)以提供的字符串的 UTF-8 编码返回字节数。
regexp_matchesregexp_matches(col1, regex)如果字符串(第一个参数)包含正则表达式的匹配项,则返回 true。
regexp_replaceregexp_matches(col1, regex, str)将第一个参数中所有出现的第二个参数(正则表达式)替换为第三个参数。
regexp_substrregexp_substr(col1, regex)在第一个参数中找到第二个参数(regex)的第一个匹配项。
rpadrpad(col1, 2)返回字符串参数,在右侧填充第二个参数指定的空格数。
rtrimrtrim(col1)从提供的字符串中删除所有尾随空白(制表符和空格)。
substringsubstring(col1, start, end)从提供的 Int 索引(从0开始,包括0)到字符串的结尾,返回提供的String的子字符串。
startswithstartswith(col1, str)返回布尔值,是否第一个字符串参数是否以第二个字符串参数开头。
trimtrim(col1)从提供的字符串中删除所有前导和尾随空格(制表符和空格)。
upperupper(col1)返回给定 String 的大写版本。
④ 转换函数
函数示例说明
castcast(col, “bigint”)将值从一种数据类型转换为另一种数据类型。 支持的类型包括:bigint,float,string,boolean 和 datetime(现在不支持)。
chrchr(col1)返回与给定 Int 参数对应的 ASCII 字符
encodeencode(col1, “base64”)使用 encode 函数根据编码方案将负载(payload)(可能是非 JSON 数据)编码为其字符串表示形式。目前,只支持”base64”econding 类型。
trunctrunc(dec, int)将第一个参数截断为第二个参数指定的小数位数。 如果第二个参数小于零,则将其设置为零。 如果第二个参数大于34,则将其设置为34。从结果中去除尾随零。
⑤ 哈希函数
函数示例说明
md5md5(col1)参数的md5哈希值
sha1sha1(col1)参数的sha1哈希值
sha256sha256(col1)参数的sha256哈希值
sha384sha384(col1)参数的sha384哈希值
sha512sha512(col1)参数的sha512哈希值
⑥ 功能函数/其他函数
函数示例说明
isNullisNull(col1)如果参数为空值,则返回 true。
newuuidnewuuid()返回一个随机的16字节 UUID。
tstamptstamp()返回当前时间戳,以1970年1月1日星期四00:00:00协调世界时(UTC)为单位。
mqttmqtt(topic)返回指定键的 MQTT 元数据。 当前支持的键包括
-topic:返回消息的主题。 如果有多个流源,则在参数中指定源名称。 如 mqtt(src1.topic)
- messageid:返回消息的消息ID。 如果有多个流源,则在参数中指定源名称。 如 mqtt(src2.messageid)
metameta(topic)返回指定键的元数据。 键可能是:
-如果 from 子句中只有一个来源,则为独立键,例如meta(device)
-用于指定流的合格键,例如 meta(src1.device)
-用于多级元数据的带有箭头的键,例如 meta(src1.reading->device->name)。这里假定读取是地图结构元数据。

内置函数 补充 - json函数

截至目前,kuiper 内置了三个json 函数。

  • json_path_exists
  • json_path_query
  • json_path_query_first

在说明这三个函数的使用之前,先来看一下kuiper sql中的基本json操作方法。

  1. 假如有一份样例json数据如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    {
    "name": "Link",
    "age": 18,
    "friends": ["Mipha","Revail","Daruk","Urbosa"],
    "saddle_horse": {"name": "Audrey", "color": "white", "level": 5},
    "tasks": [
    {"name": "t1", "location": "l1", "level": 1},
    {"name": "t2", "location": "l2", "level": 2},
    {"name": "t3", "location": "l3", "level": 3},
    {"name": "t4", "location": "l4", "level": 4},
    {"name": "t5", "location": "l5", "level": 5}
    ]
    }
  1. 以该json为数据样例,创建 流:

    1
    2
    3
    {
    "sql": "CREATE STREAM json_stream ( name string, age bigint, friends array(string), saddle_horse struct(name string, color string, level bigint), tasks array(struct(name string, location string, level bigint))) WITH (DATASOURCE=\"json_topic\", FORMAT=\"json\", CONF_KEY=\"default\", TYPE=\"mqtt\")"
    }
  1. 创建规则模板为:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    {
    "id": "${id}", # id 随意编写
    "sql": "${sql}", # 这里替换为真实处理sql
    "actions":[
    {
    "mqtt":{
    "server":"tcp://192.168.21.23:8883",
    "topic":"json_sinker"},
    "log":{}
    }
    ],
    "options":{
    "qos":1
    }
    }
  1. 创建规则后,发送样例数据, 可以订阅tcp://192.168.21.23:8883json_sinker看到输出。

    具体规则和对应输出有以下几种情况。

① 取json第一层字段值: .

通过 流名 + . + key 来获取,如:json_stream.name,

ps : 流名可写可不写, 因为 from关键字已指定流,这里示例sql都写上是为了方便展示json的层级关系。

${sql}:

1
select json_stream.name from json_stream

输出为:

1
[{"name":"Link"}]
② 取嵌套json的键: ->

${sql}:

1
SELECT json_stream.saddle_horse->name from json_stream 

输出为:

1
[{"rengine_field_0":"Audrey"}]
③ 添加别名: as

${sql}:

1
SELECT json_stream.saddle_horse->name as newName from json_stream 

输出为:

1
[{"newName":"Audrey"}]
④ 数组按索引下标取值

索引下标从 0 开始,支持索引表达式

${sql}:

1
SELECT json_stream.friends[0:2] as partFriends from json_stream 

输出为:

1
[{"partFriends":["Mipha","Revail"]}]
⑤ 通配符: * 暂时不支持,计划支持

${sql}:

1
SELECT json_stream.tasks[*]->name as gotNames from json_stream 

输出为:

1
[{"gotNames":["t1","t2", "t3", "t4", "t5"]}]

上面的几种取值方法可以灵活配合使用。

json 函数的使用

知道了kuiper 提供的 json基本解析方法,接下来可以了解三个json 函数的使用。

函数示例说明
json_path_existsjson_path_exists(col1, “$.name”)检查 JSON 路径是否存在指定JSON 值的任何项目。 返回布尔值。
json_path_queryjson_path_query(col1, “$.name”)获取 JSON 路径返回的指定 JSON值的所有项目。
json_path_query_firstjson_path_query_first(col1, “$.name”)获取 JSON 路径返回的指定 JSON值的第一项。

三个函数的作用已经知道了,那么参数是什么?

  1. 第一个参数 col1, 这个就是json数据了,需要注意的是,不能是用 流 名,需要指定到具体的字段,比如 json_stream.tasks, json_stream.name, 直接写json_stream的话会报错导致应用崩溃

  2. 第二个参数$.name, 其实就是上面基本解析方法的组合, 相较于基本的解析方法,多了两个特殊符号:$@, 而且支持通配符 *

    • $是对 col1的引用,
    • @是对应json路径的value。可以用该值来进行比较和筛选。

来看几个示例:

① json_path_query: 取出示例数据tasks的所有task name.

${sql}:

1
select json_path_query(json_stream.tasks, "$[*].name") from json_stream

输出:

1
[{"json_path_query":["t1","t2","t3","t4","t5"]}]
② json_path_query_first: 取出示例数据tasks的第一项task name.

json_path_query_firstjson_path_query 用法类似,区别在于后者取出符合条件的所有数据, 前者取出符合条件的第一条数据。

${sql}:

1
select json_path_query_first(json_stream.tasks, "$[*].name") from json_stream

输出:

1
[{"json_path_query_first":"t1"}]
③. json_path_exists: 判断数据流中如果有任意一条task level>3

${sql}:

1
SELECT json_stream.tasks[:] as tasks_detail FROM json_stream where json_path_exists(json_stream.tasks, "$[? @.level>3]")

输出:

1
2
3
4
5
6
7
8
9
10
11
[
{
"tasks_detail":[
{"level":1,"location":"l1","name":"t1"},
{"level":2,"location":"l2","name":"t2"},
{"level":3,"location":"l3","name":"t3"},
{"level":4,"location":"l4","name":"t4"},
{"level":5,"location":"l5","name":"t5"}
]
}
]

需要注意的是,这里的过滤是针对一条完整的数据流的,数据流是kuiper sql操作的最小单位。也就是说,where 后面的 json_path_exists判断为真时,select 会对整条数据的task进行选择,如果判断为假(比如 json_path_exists(json_stream.tasks, "$[? @.level>5]")), 则这条数据流会被放弃

窗口函数

截至目前,kuiper 共支持五种窗口函数:

  • 滚动窗口
  • 跳跃窗口
  • 滑动窗口
  • 会话窗口
  • 计数窗口

窗口操作会在末尾输出结果, 窗口的输出是基于所用聚合函数的单个时间。

可以使用五种时间单位来定义一个窗口。

标识示例说明
SSTUMBLINGWINDOW(ss, 10)10秒间隔的窗口
DDTUMBLINGWINDOW(dd, 1)1天间隔
HHTUMBLINGWINDOW(hh, 24)24小时间隔
MITUMBLINGWINDOW(mi, 30)30分钟间隔
MSTUMBLINGWINDOW(ms, 30)30毫秒间隔
① 滚动窗口

示例:

1
SELECT count(*) FROM demo GROUP BY ID, TUMBLINGWINDOW(ss, 10);

以10秒为单位,统计一个窗口时间内数据流的总量。

② 跳跃窗口

示例:

1
SELECT count(*) FROM demo GROUP BY ID, TUMBLINGWINDOW(ss, 10, 5);

以10秒为单位,统计一个窗口时间内数据流的总量。但是和滚动窗口的区别在于,示例中的窗口是有重叠部分的,每5秒会重新计算一个窗口。

举个栗子,

00:00:00 –> 00:00:10 是一个窗口,

00:00:05 –> 00:00:15 也是一个窗口。

两个窗口的统计在 00:00:05 –> 00:00:10是重叠的。

当然也可以自由选择任意的跳跃值,比如说 10,此时会和滚动窗口效果一样

③ 滑动窗口

示例:

1
SELECT count(*) FROM demo GROUP BY ID, SLIDINGWINDOW(mm, 1);

滑动窗口功能与翻转或跳动窗口不同,仅在事件发生时会产生输出。 每个窗口至少会有一个事件,并且该窗口连续向前移动€(ε)。 就像跳跃窗口一样,事件可以属于多个滑动窗口

④ 会话窗口

示例:

1
SELECT count(*) FROM demo GROUP BY ID, SESSIONWINDOW(mm, 2, 1);

会话窗口功能对在相似时间到达的事件进行分组,以过滤掉没有数据的时间段。 它有两个主要参数:超时和最大持续时间。

当第一个事件发生时,会话窗口开始。 如果从上一次摄取的事件起在指定的超时时间内发生了另一个事件,则窗口将扩展为包括新事件。 否则,如果在超时时间内未发生任何事件,则该窗口将在超时时关闭。

如果事件在指定的超时时间内持续发生,则会话窗口将继续扩展直到达到最大持续时间。 最大持续时间检查间隔设置为与指定的最大持续时间相同的大小。 例如,如果最大持续时间为10,则检查窗口是否超过最大持续时间将在 t = 0、10、20、30等处进行。

⑤ 计数窗口

计数窗口不关注时间,只关注事件发生的次数。

@1 滚动计数窗口
1
SELECT * FROM demo WHERE temperature > 20 GROUP BY COUNTWINDOW(5)

按照 5 次对事件进行分组,并且只获取 temperature 大于 20 的数据。

窗口过滤

在某些情况下,窗口不需要所有输入。filter 子句用于过滤给定条件下的输入数据。与 where 子句不同,filter 子句在窗口分区之前运行。结果会有所不同,特别是计数窗口。如果对带有长度为 3 的计数窗口的数据使用 where 子句进行过滤,则输出长度将随窗口的不同而变化;而使用 filter 子句进行筛选时,输出长度将始终为 3。filter 子句必须跟在 window 函数后面。filter子句必须类似于 FILTER(WHERE expr)。例如:

1
SELECT * FROM demo GROUP BY COUNTWINDOW(3,1) FITLER(where revenue > 100)
时间戳

每个事件都有一个与之关联的时间戳。 时间戳将用于计算窗口。 默认情况下,当事件输入到源时,将添加时间戳,称为处理时间。 我们还支持将某个字段指定为时间戳,称为事件时间。 时间戳字段在流定义中指定。 在下面的定义中,字段 ts 被指定为时间戳字段。

1
CREATE STREAM demo ( color STRING, size BIGINT, ts BIGINT ) WITH (DATASOURCE="demo", FORMAT="json", KEY="ts", TIMESTAMP="ts"

在事件时间模式下,水印算法用于计算窗口。

窗口运行时错误

如果窗口从上游接收到错误(例如,数据类型不符合流定义),则错误事件将立即转发到目标(sink)。 当前窗口计算将忽略错误事件