新聞中心
這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
大數(shù)據(jù)(9g)FlinkCEP-創(chuàng)新互聯(lián)
文章目錄
概述
網(wǎng)頁名稱:大數(shù)據(jù)(9g)FlinkCEP-創(chuàng)新互聯(lián)
當(dāng)前網(wǎng)址:http://fisionsoft.com.cn/article/dgicho.html
- 概述
- 示例代碼
- 環(huán)境和依賴
- Java代碼
- 上面代碼可改成下面
概述
- CEP
Complex Event Processing:復(fù)合事件處理
通過分析事件間的關(guān)系,從事件流中查詢出符合要求的事件序列 - 例如【切菜=>洗菜=>炒菜】3個(gè)事件按時(shí)間序串聯(lián),是正常的事件流
當(dāng)發(fā)現(xiàn)【切菜=>炒菜】忽略洗菜的事件流,可認(rèn)為是異常事件
WIN10+JDK1.8+IDEA2021+Maven3.6.3
CEP額外依賴為flink-cep
8 8 1.14.6 2.12 1.18.24 org.apache.flink flink-java ${flink.version} org.apache.flink flink-streaming-java_${scala.binary.version} ${flink.version} org.apache.flink flink-clients_${scala.binary.version} ${flink.version} org.apache.flink flink-runtime-web_${scala.binary.version} ${flink.version} org.apache.flink flink-cep_${scala.binary.version} ${flink.version}
Java代碼監(jiān)測(cè) 嚴(yán)格近鄰的連續(xù)三次a的事件流
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CepPractice {public static void main(String[] args) throws Exception {//創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//添加數(shù)據(jù)源,確定水位線策略
SingleOutputStreamOperatord = env.fromElements("c", "a", "a", "a", "a", "b", "a", "a")
.assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) ->1L));
//定義模式
Patternp = Pattern
.begin("first")
.where(new SimpleCondition() {@Override
public boolean filter(String value) {return value.equals("a");
}
})
.next("second")
.where(new SimpleCondition() {@Override
public boolean filter(String value) {return value.equals("a");
}
})
.next("third")
.where(new SimpleCondition() {@Override
public boolean filter(String value) {return value.equals("a");
}
});
//在流上匹配模型
PatternStreampatternStream = CEP.pattern(d, p);
//使用select方法將匹配到的事件流取出
patternStream.select((PatternSelectFunction) map ->{//Map的key是事件名稱(上面的first、second和third)
//Map的key對(duì)應(yīng)的value是列表,儲(chǔ)存匹配到的事件
String first = map.get("first").toString();
String second = map.get("second").toString();
String third = map.get("third").toString();
return first + "->" + second + "->" + third;
}).print();
//執(zhí)行
env.execute();
}
}
打印結(jié)果
[a]->[a]->[a]
[a]->[a]->[a]
上面代碼可改成下面留意.times(3).consecutive()
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.List;
public class CepPractice2 {public static void main(String[] args) throws Exception {//創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
//添加數(shù)據(jù)源,確定水位線策略
SingleOutputStreamOperator>d = env.fromElements(
Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 3000L),
Tuple2.of("a", 4000L), Tuple2.of("b", 5000L), Tuple2.of("a", 6000L))
.assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
.withTimestampAssigner((element, recordTimestamp) ->element.f1));
//定義模式
Pattern, Tuple2>p = Pattern
.>begin("=a")
.where(new SimpleCondition>() {@Override
public boolean filter(Tuple2value) {return value.f0.equals("a");
}
})
.times(3)
.consecutive(); //嚴(yán)格連續(xù)
//在流上匹配模型
PatternStream>patternStream = CEP.pattern(d, p);
//使用select方法將匹配到的事件流取出
patternStream.select((PatternSelectFunction, String>) map ->{//Map的key是事件名稱(上面的first、second和third)
//Map的key對(duì)應(yīng)的value是列表,儲(chǔ)存匹配到的事件
List>ls = map.get("=a");
String first = ls.get(0).f0;
String second = ls.get(1).f0;
String third = ls.get(2).f0;
return String.join("=>", first, second, third);
}).print();
//執(zhí)行
env.execute();
}
}
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
網(wǎng)頁名稱:大數(shù)據(jù)(9g)FlinkCEP-創(chuàng)新互聯(lián)
當(dāng)前網(wǎng)址:http://fisionsoft.com.cn/article/dgicho.html