基本语法

select select_list
from stream_def [as name] [, stream_def [as name]] [,...]
[where search_conditions]
[group by grouping_expression_list]
[having grouping_search_conditions]
[output output_specification]
[order by order_by_expression_list]
[limit num_rows]

选择事件流

1.进入与流出

select [istream | irstream | rstream] [distinct] * from userbuy.win:time(5 sec)

我们划分了一个5秒的滑动时间窗口,并往里边写入数据
select后的4个选项影响事件流的类型,分别对应进入时间窗口、从窗口移出、二者包含、以及不重复的事件
分别编写istream和rstream的规则,在我们插入一批数据后,istream在5秒后输出进入窗口的数据,5秒过后,rstream会输出同样的内容

将规则改为

select [istream | rstream] count(*) from userbuy.win:time(15 sec)

每隔一秒插入一条数据,插入3次,会得到如下的统计结果:

 

time istream rstream
18:33:35 1 0
18:33:38 2 1
18:33:43 3 2
18:33:50 2 3
18:33:53 1 2
18:33:58 0 1

在滑动时间窗口中,CEP会在每条事件进入和移出窗口时,执行EPL语句,比如istream分析窗口内的事件,当有事件移出时,也会对当前窗口内的事件进行统计
rstream分析移出窗口的事件,但当有事件进入时,也会统计当前移出窗口的事件的总数,当窗口中无数据时,看上去像是把一个空数据移出了

2.过滤

select * from userbuy(lottery='ssc').win:time(15 sec)
在事件进入窗口前先进行了过滤

views

即数据窗口,分别针对时间、数量等因素制定了各类窗口

win

win:length

win:length(10) 滑动窗口,存最近的5条数据

win:length_batch

win:length_batch(10) 间隔窗口,存满10条触发查询

win:time

win:time(5 sec) 滑动窗口,存最近5秒内的数据

win:ext_timed

win:ext_timed(timestamp,10 sec) 滑动窗口,存最近10秒内的数据,不再基于EPL引擎,而是系统时间戳

win:time_batch

win:time_batch(10 sec) 10秒间隔的窗口
win:time_batch(10 sec,"FORCE_UPDATE, START_EAGER") 加上这两个参数后,会在窗口初始化时就执行查询,并且在没有数据进入和移出时,强制查询出结果

win:time_length_batch

win:time_length_batch(10 sec,5),当时间或数量任意一个满足条件时,触发查询

win:time_accum

select rstream * from userbuy.win:time_accum(15 sec) 阻塞输出,直到15秒内没有新进入的数据时,才输出并触发查询. 这些数据视为已移出窗口

win:keepall

win:keepall() 无参数,记录所有进入的数据,除非使用delete操作,才能从窗口移出数据,详见name window

win:firstlength

win:firstlength(10) 保留一批数据的前10条,需配合delete操作

win:firsttime

win:firsttime(10 sec) 保留窗口初始化后10秒内的所有数据

std

std:unique

std:unique(id) 对不同的id保留其最近的一条事件

std:groupwin

std:groupwin(id).win:length(5) 一般与其它窗口组合使用,此例子中将进入的数据按id分组,相同id的数据条目数不大于5
select sum(amount),id from std:groupwin(id).win:length(5) group by id 统计每组id最近5次消费的总金额数

std:size

select size from userbuy.win:time(1 min).std:size() 统计一分钟内事件总数

std:firstunique

std:firstunique(id) 相同id的事件只取第一条

stat

进行一些统计数学统计,统计项固定
stat:uni, stat:linest, stat:correl,stat:weighted_avg

ext

ext:sort

对指定的字段进行排序,限定个数
select sum(amount) from userbuy.ext:sort(3, amount desc) 将3个最高金额求和 
select * from userbuy.ext:sort(3,amount desc,id asc) 找出最高金额的3条事件,并按id排序

output 与 stream


与数据库关联查询

EPL可以与各种关系型及非关系型数据库通讯,进行关联查询,基本语法

select custId, cust_name from CustomerCallEvent,

  sql:MyCustomerDB [' select cust_name from Customer where cust_id = ${custId} ']

name window

在EPL中可以创建命名窗口,此窗口可供很多语句来查询,并且可以对此窗口进行insert update delete操作,一些不自动移出数据的view可以派上用场了,当然,也可以对常用的那些view使用


create

create window USERBUY.win:keepall() select id,amount,lottery from userbuy

创建命名窗口,并声明包含的字段和类型


insert

insert into USERBUY select id,amount,lottery from userbuy

update

update和delete属于触发式操作,当有事件进入时产生

on lotterychange.win:time(5 sec)
update USERBUY set amount = 1

where USERBUY.id = lotterychange.id

delete

on ticket.win:time(10 sec)
delete from USERBUY

where USERBUY.amount != 1

trigger

与标准sql不同,EPL基于数据流,操作都需要事件触发,比如动态窗口的每次查询都是在事件发生的时候。
一个适合的例子是子查询

select (select count(*) from OLDUSERBUY) as oldcount,
       (select sum(amount) from NEWUSERBUY) as newsum    //标准SQL到此为止
from trigger.win:time(5 sec)       //EPL由事件触发子查询

此查询只会在trigger接收到事件时触发

分割与复制

将事件进行条件判断,把数据流划分到不同命名窗口

on userbuy
insert into OLDUSERBUY select id,amount,lottery where id > 100
insert into NEWUSERBUY select id,amount,lottery where amount > 1000
[output all]

当事件进入时,以从上往下的速度去执行,成功则停止,只进入一个命名窗口,实现分割;加上output all后,逐一判断,复制到多个命名窗口

自定义变量

创建一个变量,此值可以用在各种语句中作为条件判断,也可以被触发修改


Pattern

Match Recognize

以正则表达式的形式匹配事件,做出复杂分析,语法如下:

match_recognize (
  [ partition by partition_expression [, partition_expression] [,...]  ]
  measures  measure_expression as col_name [, measure_expression as col_name ] [,...]
  [ all matches ]
  [ after match skip (past last row | to next row | to current row) ]
  pattern ( variable_regular_expr [, variable_regular_expr] [,...] )
  [ interval time_period ]
  define  variable as variable_condition [, variable as variable_condition]  [,...]
)

假定现在有人对单个url逐页进行抓取,我们可以从日志中取出IP、URL、页码,发给cep进行分析

select * from webaccess.win:time_batch(60 sec)
  match_recognize(
     partition by ip,url
     measures A.url as a_url,count(B.page) as b_count,A.ip as a_ip
     pattern (A B+)
     define
     B as B.page - prev(1,B.page) = 1)
having b_count > 10

从webaccess数据流接收事件,并开始匹配,按相同的ip和url来分组,需要获取访问的url、ip及访问次数
接下来要确认什么样的事件符合我们的需求,它在抓取第一页后,抓取第二页时,页码数都要比之前大1位,之后一直是这类情况,于是统计此情况发生的次数,
如果此次数在1分钟的窗口内大于100次,便认为它是抓取