订阅
纠错
加入自媒体

一文详解Flink知识体系

2021-09-13 09:58
园陌
关注

四、Flink 算子大全

Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。

所以下面将Flink的算子分为两大类:一类是DataSet,一类是DataStream。

DataSet 批处理算子一、Source算子1. fromCollection

fromCollection:从本地集合读取数据

例:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
 List("1,张三", "2,李四", "3,王五", "4,赵六")
)
2. readTextFile

readTextFile:从文件中读取

val textDataSet: DataSet[String]  = env.readTextFile("/data/a.txt")
3. readTextFile:遍历目录

readTextFile可以对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式

val parameters = new Configuration
// recursive.file.enumeration 开启递归
parameters.setBoolean("recursive.file.enumeration", true)
val file = env.readTextFile("/data").withParameters(parameters)
4. readTextFile:读取压缩文件

对于以下压缩类型,不需要指定任何额外的inputformat方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

压缩方法文件扩展名是否可并行读取DEFLATE.deflatenoGZip.gz .gzipnoBzip2.bz2noXZ.xznoval file = env.readTextFile("/data/file.gz")
二、Transform转换算子

因为Transform算子基于Source算子操作,所以首先构建Flink执行环境及Source算子,后续Transform算子操作基于此:

val env = ExecutionEnvironment.getExecutionEnvironment
val textDataSet: DataSet[String] = env.fromCollection(
 List("张三,1", "李四,2", "王五,3", "张三,4")
)
1. map

将DataSet中的每一个元素转换为另外一个元素

// 使用map将List转换为一个Scala的样例类
case class User(name: String, id: String)
val userDataSet: DataSet[User] = textDataSet.map {
 text =>
   val fieldArr = text.split(",")
   User(fieldArr(0), fieldArr(1))
}
userDataSet.print()
2. flatMap

将DataSet中的每一个元素转换为0...n个元素。

// 使用flatMap操作,将集合中的数据:
// 根据第一个元素,进行分组
// 根据第二个元素,进行聚合求值
val result = textDataSet.flatMap(line => line)
     .groupBy(0) // 根据第一个元素,进行分组
     .sum(1) // 根据第二个元素,进行聚合求值
     
result.print()
3. mapPartition

将一个分区中的元素转换为另一个元素

// 使用mapPartition操作,将List转换为一个scala的样例类
case class User(name: String, id: String)
val result: DataSet[User] = textDataSet.mapPartition(line => {
     line.map(index => User(index._1, index._2))
   })
   
result.print()
4. filter

过滤出来一些符合条件的元素,返回boolean值为true的元素

val source: DataSet[String] = env.fromElements("java", "scala", "java")
val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据
filter.print()
5. reduce

可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素

// 使用 fromElements 构建数据源
val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// 使用map转换成DataSet元组
val mapData: DataSet[(String, Int)] = source.map(line => line)
// 根据首个元素分组
val groupData = mapData.groupBy(_._1)
// 使用reduce聚合
val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
// 打印测试
reduceData.print()
6. reduceGroup

将一个dataset或者一个group聚合成一个或多个元素。
reduceGroup是reduce的一种优化方案;
它会先分组reduce,然后在做整体的reduce;这样做的好处就是可以减少网络IO

// 使用 fromElements 构建数据源
val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
// 根据首个元素分组
val groupData = source.groupBy(_._1)
// 使用reduceGroup聚合
val result: DataSet[(String, Int)] = groupData.reduceGroup {
     (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
       val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
       out.collect(tuple)
   }
// 打印测试
result.print()
7. minBy和maxBy

选择具有最小值或最大值的元素

// 使用minBy操作,求List中每个人的最小值
// List("张三,1", "李四,2", "王五,3", "张三,4")
case class User(name: String, id: String)
// 将List转换为一个scala的样例类
val text: DataSet[User] = textDataSet.mapPartition(line => {
     line.map(index => User(index._1, index._2))
   })
   
val result = text
         .groupBy(0) // 按照姓名分组
         .minBy(1)   // 每个人的最小值
8. Aggregate

在数据集上进行聚合求最值(最大值、最小值)

val data = new mutable.MutableList[(Int, String, Double)]
   data.+=((1, "yuwen", 89.0))
   data.+=((2, "shuxue", 92.2))
   data.+=((3, "yuwen", 89.99))
// 使用 fromElements 构建数据源
val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
// 使用group执行分组操作
val value = input.groupBy(1)
           // 使用aggregate求最大值元素
           .aggregate(Aggregations.MAX, 2)
// 打印测试
value.print()      

Aggregate只能作用于元组上

注意:
要使用aggregate,只能使用字段索引名或索引名称来进行分组 groupBy(0) ,否则会报一下错误:
Exception in thread "main" java.lang.UnsupportedOperationException: Aggregate does not supportgrouping with KeySelector functions, yet.

9. distinct

去除重复的数据

// 数据源使用上一题的
// 使用distinct操作,根据科目去除集合中重复的元组数据
val value: DataSet[(Int, String, Double)] = input.distinct(1)
value.print()
10. first

取前N个数

input.first(2) // 取前两个数
11. join

将两个DataSet按照一定条件连接到一起,形成新的DataSet

// s1 和 s2 数据集格式如下:
// DataSet[(Int, String,String, Double)]
val joinData = s1.join(s2)  // s1数据集 join s2数据集
            .where(0).equalTo(0) {     // join的条件
     (s1, s2) => (s1._1, s1._2, s2._2, s1._3)
   }
12. leftOuterJoin

左外连接,左边的Dataset中的每一个元素,去连接右边的元素

此外还有:

rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素

fullOuterJoin:全外连接,左右两边的元素,全部连接

下面以 leftOuterJoin 进行示例:

val data1 = ListBuffer[Tuple2[Int,String]]()
   data1.append((1,"zhangsan"))
   data1.append((2,"lisi"))
   data1.append((3,"wangwu"))
   data1.append((4,"zhaoliu"))
val data2 = ListBuffer[Tuple2[Int,String]]()
   data2.append((1,"beijing"))
   data2.append((2,"shanghai"))
   data2.append((4,"guangzhou"))
val text1 = env.fromCollection(data1)
val text2 = env.fromCollection(data2)
text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
     if(second==null){
       (first._1,first._2,"null")
     }else{
       (first._1,first._2,second._2)
     }
   }).print()
13. cross

交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集

和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作

val cross = input1.cross(input2){
     (input1 , input2) => (input1._1,input1._2,input1._3,input2._2)
   }
cross.print()
14. union

联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重

val unionData: DataSet[String] = elements1.union(elements2).union(elements3)
// 去除重复数据
val value = unionData.distinct(line => line)
15. rebalance

Flink也有数据倾斜的时候,比如当前有数据量大概10亿条数据需要处理,在处理过程中可能会发生如图所示的状况:

这个时候本来总体数据量只需要10分钟解决的问题,出现了数据倾斜,机器1上的任务需要4个小时才能完成,那么其他3台机器执行完毕也要等待机器1执行完毕后才算整体将任务完成;所以在实际的工作中,出现这种情况比较好的解决方案就是接下来要介绍的—rebalance(内部使用round robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。)

// 使用rebalance操作,避免数据倾斜
val rebalance = filterData.rebalance()
16. partitionByHash

按照指定的key进行hash分区

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
val collection = env.fromCollection(data)
val unique = collection.partitionByHash(1).mapPartition{
 line =>
   line.map(x => (x._1 , x._2 , x._3))
}
unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
env.execute()
17. partitionByRange

根据指定的key对数据集进行范围分区

val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
val collection = env.fromCollection(data)
val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map{
 x=>
   (x._1 , x._2 , x._3)
})
unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
env.execute()
18. sortPartition

根据指定的字段值进行分区的排序

val data = new mutable.MutableList[(Int, Long, String)]
   data.+=((1, 1L, "Hi"))
   data.+=((2, 2L, "Hello"))
   data.+=((3, 2L, "Hello world"))
   data.+=((4, 3L, "Hello world, how are you?"))
val ds = env.fromCollection(data)
   val result = ds
     .map { x => x }.setParallelism(2)
     .sortPartition(1, Order.DESCENDING)//第一个参数代表按照哪个字段进行分区
     .mapPartition(line => line)
     .collect()
println(result)
三、Sink算子1. collect

将数据输出到本地集合

result.collect()
2. writeAsText

将数据输出到文件

Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等

Flink支持多种文件的存储格式,包括text文件,CSV文件等

// 将数据写入本地文件
result.writeAsText("/data/a", WriteMode.OVERWRITE)
// 将数据写入HDFS
result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
DataStream流处理算子

和DataSet一样,DataStream也包括一系列的Transformation操作

一、Source算子

Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
Flink 已经提供了若干实现好了的 source functions,当然我们也可以通过实现 SourceFunction 来自定义非并行的source或者实现 ParallelSourceFunction 接口或者扩展 RichParallelSourceFunction 来自定义并行的 source。

Flink在流处理上的source和在批处理上的source基本一致。大致有4大类:

基于本地集合的source(Collection-based-source)基于文件的source(File-based-source)- 读取文本文件,即符合 TextInputFormat 规范的文件,并将其作为字符串返回基于网络套接字的source(Socket-based-source)- 从 socket 读取。元素可以用分隔符切分。自定义的source(Custom-source)

下面使用addSource将Kafka数据写入Flink为例:

如果需要外部数据源对接,可使用addSource,如将Kafka数据写入Flink,先引入依赖:


将Kafka数据写入Flink:

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "consumer-group")
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
properties.setProperty("auto.offset.reset", "latest")
val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))

基于网络套接字的:

val source = env.socketTextStream("IP", PORT)
二、Transform转换算子1. map

将DataSet中的每一个元素转换为另外一个元素

dataStream.map { x => x * 2 }
2. FlatMap

采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数

dataStream.flatMap { str => str.split(" ") }
3. Filter

计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器

dataStream.filter { _ != 0 }
4. KeyBy

逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。

此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。

dataStream.keyBy(0)
5. Reduce

被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值

keyedStream.reduce { _ + _ }  
6. Fold

具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值

val result: DataStream[String] =  keyedStream.fold("start")((str, i) => { str + "-" + i })
// 解释:当上述代码应用于序列(1,2,3,4,5)时,输出结果“start-1”,“start-1-2”,“start-1-2-3”,...
7. Aggregations

在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。

keyedStream.sum(0);
keyedStream.min(0);
keyedStream.max(0);
keyedStream.minBy(0);
keyedStream.maxBy(0);
8. Window

可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。这里不再对窗口进行详解,有关窗口的完整说明,请查看这篇文章:Flink 中极其重要的 Time 与 Window 详细解析

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
9. WindowAll

Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。

注意:在许多情况下,这是非并行转换。所有记录将收集在windowAll 算子的一个任务中。

dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
10. Window Apply

将一般函数应用于整个窗口。

注意:如果您正在使用windowAll转换,则需要使用AllWindowFunction。

下面是一个手动求和窗口数据元的函数

windowedStream.apply { WindowFunction }
allWindowedStream.apply { AllWindowFunction }
11. Window Reduce

将函数缩减函数应用于窗口并返回缩小的值

windowedStream.reduce { _ + _ }
12. Window Fold

将函数折叠函数应用于窗口并返回折叠值

val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })
// 上述代码应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”
13. Union

两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元

dataStream.union(otherStream1, otherStream2, ...)
14. Window Join

在给定Keys和公共窗口上连接两个数据流

dataStream.join(otherStream)
   .where(

在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

am.intervalJoin(otherKeyedStream)
   .between(Time.milliseconds(-2), Time.milliseconds(2))
   .upperBoundExclusive(true)
   .lowerBoundExclusive(true)
   .process(new IntervalJoinFunction() {...})
16. Window CoGroup

在给定Keys和公共窗口上对两个数据流进行Cogroup

dataStream.coGroup(otherStream)
   .where(0).equalTo(1)
   .window(TumblingEventTimeWindows.of(Time.seconds(3)))
   .apply (new CoGroupFunction () {...})
17. Connect

“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态

DataStream

类似于连接数据流上的map和flatMap

connectedStreams.map(
   (_ : Int) => true,
   (_ : String) => false)connectedStreams.flatMap(
   (_ : Int) => true,
   (_ : String) => false)
19. Split

根据某些标准将流拆分为两个或更多个流

val split = someDataStream.split(
 (num: Int) =>
   (num % 2) match {
     case 0 => List("even")
     case 1 => List("odd")
   })      
20. Select

从拆分流中选择一个或多个流

SplitStream

支持将数据输出到:

本地文件(参考批处理)本地集合(参考批处理)HDFS(参考批处理)

除此之外,还支持:

sink到kafkasink到mysqlsink到redis

下面以sink到kafka为例:

val sinkTopic = "test"
//样例类
case class Student(id: Int, name: String, addr: String, sex: String)
val mapper: ObjectMapper = new ObjectMapper()
//将对象转换成字符串
def toJsonString(T: Object): String = {
   mapper.registerModule(DefaultScalaModule)
   mapper.writeValueAsString(T)
}
def main(args: Array[String]): Unit = {
   //1.创建流执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //2.准备数据
   val dataStream: DataStream[Student] = env.fromElements(
     Student(8, "xiaoming", "beijing biejing", "female")
   )
   //将student转换成字符串
   val studentStream: DataStream[String] = dataStream.map(student =>
     toJsonString(student) // 这里需要显示SerializerFeature中的某一个,否则会报同时匹配两个方法的错误
   )
   //studentStream.print()
   val prop = new Properties()
   prop.setProperty("bootstrap.servers", "node01:9092")
   val myProducer = new FlinkKafkaProducer011[String](sinkTopic, new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), prop)
   studentStream.addSink(myProducer)
   studentStream.print()
   env.execute("Flink add sink")
}

五、流处理中的Time与Window

Flink 是流式的、实时的 计算引擎。

上面一句话就有两个概念,一个是流式,一个是实时。

流式:就是数据源源不断的流进来,也就是数据没有边界,但是我们计算的时候必须在一个有边界的范围内进行,所以这里面就有一个问题,边界怎么确定?无非就两种方式,根据时间段或者数据量进行确定,根据时间段就是每隔多长时间就划分一个边界,根据数据量就是每来多少条数据划分一个边界,Flink 中就是这么划分边界的,本文会详细讲解。

实时:就是数据发送过来之后立马就进行相关的计算,然后将结果输出。这里的计算有两种:

一种是只有边界内的数据进行计算,这种好理解,比如统计每个用户最近五分钟内浏览的新闻数量,就可以取最近五分钟内的所有数据,然后根据每个用户分组,统计新闻的总数。

另一种是边界内数据与外部数据进行关联计算,比如:统计最近五分钟内浏览新闻的用户都是来自哪些地区,这种就需要将五分钟内浏览新闻的用户信息与 hive 中的地区维表进行关联,然后在进行相关计算。

本节所讲的 Flink 内容就是围绕以上概念进行详细剖析的!

1. Time

在Flink中,如果以时间段划分边界的话,那么时间就是一个极其重要的字段。

Flink中的时间有三种类型,如下图所示:

Event Time:是事件创建的时间。它通常由事件中的时间戳描述,例如采集的日志数据中,每一条日志都会记录自己的生成时间,Flink通过时间戳分配器访问事件时间戳。

Ingestion Time:是数据进入Flink的时间。

Processing Time:是每一个执行基于时间操作的算子的本地系统时间,与机器相关,默认的时间属性就是Processing Time。

例如,一条日志进入Flink的时间为2021-01-22 10:00:00.123,到达Window的系统时间为2021-01-22 10:00:01.234,日志的内容如下:
2021-01-06 18:37:15.624 INFO Fail over to rm2

对于业务来说,要统计1min内的故障日志个数,哪个时间是最有意义的?—— eventTime,因为我们要根据日志的生成时间进行统计。

2. Window

Window,即窗口,我们前面一直提到的边界就是这里的Window(窗口)。

官方解释:流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集,而window是一种切割无限数据为有限块进行处理的手段。

所以Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。

Window类型

本文刚开始提到,划分窗口就两种方式:

根据时间进行截取(time-driven-window),比如每1分钟统计一次或每10分钟统计一次。根据数据进行截取(data-driven-window),比如每5个数据统计一次或每50个数据统计一次。

窗口类型

对于TimeWindow(根据时间划分窗口), 可以根据窗口实现原理的不同分成三类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)和会话窗口(Session Window)。

滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片。

特点:时间对齐,窗口长度固定,没有重叠。

滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

例如:如果你指定了一个5分钟大小的滚动窗口,窗口的创建如下图所示:

滚动窗口

适用场景:适合做BI统计等(做每个时间段的聚合计算)。

滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。

特点:时间对齐,窗口长度固定,有重叠。

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:

滑动窗口

适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。

会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

特点:时间无对齐。

session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

会话窗口3. Window API1) TimeWindow

TimeWindow是将指定时间范围内的所有数据组成一个window,一次对一个window里面的所有数据进行计算(就是本文开头说的对一个边界内的数据进行计算)。

我们以 红绿灯路口通过的汽车数量 为例子:

红绿灯路口会有汽车通过,一共会有多少汽车通过,无法计算。因为车流源源不断,计算没有边界。

所以我们统计每15秒钟通过红路灯的汽车数量,如第一个15秒为2辆,第二个15秒为3辆,第三个15秒为1辆 ...

tumbling-time-window (无重叠数据)

我们使用 Linux 中的 nc 命令模拟数据的发送方

1.开启发送端口,端口号为9999
nc -lk 9999
2.发送内容(key 代表不同的路口,value 代表每次通过的车辆)
一次发送一行,发送的时间间隔代表汽车经过的时间间隔
9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,4

Flink 进行采集数据并计算:

object Window {
 def main(args: Array[String]): Unit = {
   //TODO time-window
   //1.创建运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //2.定义数据流来源
   val text = env.socketTextStream("localhost", 9999)
   //3.转换数据格式,text->CarWc
   case class CarWc(sensorId: Int, carCnt: Int)
   val ds1: DataStream[CarWc] = text.map {
     line => {
       val tokens = line.split(",")
       CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
     }
   }
   //4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5秒
   //也就是说,每5秒钟统计一次,在这过去的5秒钟内,各个路口通过红绿灯汽车的数量。
   val ds2: DataStream[CarWc] = ds1
     .keyBy("sensorId")
     .timeWindow(Time.seconds(5))
     .sum("carCnt")
   //5.显示统计结果
   ds2.print()
   //6.触发流计算
   env.execute(this.getClass.getName)
 }
}

我们发送的数据并没有指定时间字段,所以Flink使用的是默认的 Processing Time,也就是Flink系统处理数据时的时间。

sliding-time-window (有重叠数据)//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
 line => {
   val tokens = line.split(",")
   CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
 }
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口时间10秒,滑动时间5秒
//也就是说,每5秒钟统计一次,在这过去的10秒钟内,各个路口通过红绿灯汽车的数量。
val ds2: DataStream[CarWc] = ds1
 .keyBy("sensorId")
 .timeWindow(Time.seconds(10), Time.seconds(5))
 .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)
2) CountWindow

CountWindow根据窗口中相同key元素的数量来触发执行,执行时只计算元素数量达到窗口大小的key对应的结果。

注意:CountWindow的window_size指的是相同Key的元素的个数,不是输入的所有元素的总数。

tumbling-count-window (无重叠数据)//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
 (f) => {
   val tokens = f.split(",")
   CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
 }
}
//4.执行统计操作,每个sensorId一个tumbling窗口,窗口的大小为5
//按照key进行收集,对应的key出现的次数达到5次作为一个结果
val ds2: DataStream[CarWc] = ds1
 .keyBy("sensorId")
 .countWindow(5)
 .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)
sliding-count-window (有重叠数据)

同样也是窗口长度和滑动窗口的操作:窗口长度是5,滑动长度是3

//1.创建运行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//2.定义数据流来源
val text = env.socketTextStream("localhost", 9999)
//3.转换数据格式,text->CarWc
case class CarWc(sensorId: Int, carCnt: Int)
val ds1: DataStream[CarWc] = text.map {
 (f) => {
   val tokens = f.split(",")
   CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt)
 }
}
//4.执行统计操作,每个sensorId一个sliding窗口,窗口大小3条数据,窗口滑动为3条数据
//也就是说,每个路口分别统计,收到关于它的3条消息时统计在最近5条消息中,各自路口通过的汽车数量
val ds2: DataStream[CarWc] = ds1
 .keyBy("sensorId")
 .countWindow(5, 3)
 .sum("carCnt")
//5.显示统计结果
ds2.print()
//6.触发流计算
env.execute(this.getClass.getName)
Window 总结

flink支持两种划分窗口的方式(time和count)

如果根据时间划分窗口,那么它就是一个time-window

如果根据数据划分窗口,那么它就是一个count-window

flink支持窗口的两个重要属性(size和interval)

如果size=interval,那么就会形成tumbling-window(无重叠数据)

如果size>interval,那么就会形成sliding-window(有重叠数据)

如果size

通过组合可以得出四种基本窗口

time-tumbling-window 无重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5))

time-sliding-window  有重叠数据的时间窗口,设置方式举例:timeWindow(Time.seconds(5), Time.seconds(3))

count-tumbling-window无重叠数据的数量窗口,设置方式举例:countWindow(5)

count-sliding-window 有重叠数据的数量窗口,设置方式举例:countWindow(5,3)

3) Window Reduce

WindowedStream → DataStream:给window赋一个reduce功能的函数,并返回一个聚合的结果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowReduce {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
   // 引入时间窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行聚合操作
   val streamReduce = streamWindow.reduce(
     (item1, item2) => (item1._1, item1._2 + item2._2)
   )
   // 将聚合数据写入文件
   streamReduce.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
4) Window Apply

apply方法可以进行一些自定义处理,通过匿名内部类的方法来实现。当有一些复杂计算时使用。

用法

实现一个 WindowFunction 类指定该类的泛型为 [输入数据类型, 输出数据类型, keyBy中使用分组字段的类型, 窗口类型]

示例:使用apply方法来实现单词统计

步骤:

获取流处理运行环境构建socket流数据源,并指定IP地址和端口号对接收到的数据转换成单词元组使用 keyBy 进行分流(分组)使用 timeWinodw 指定窗口的长度(每3秒计算一次)实现一个WindowFunction匿名内部类apply方法中实现聚合计算使用Collector.collect收集数据

核心代码如下:

   //1. 获取流处理运行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   //2. 构建socket流数据源,并指定IP地址和端口号
   val textDataStream = env.socketTextStream("node01", 9999).flatMap(_.split(" "))
   //3. 对接收到的数据转换成单词元组
   val wordDataStream = textDataStream.map(_->1)
   //4. 使用 keyBy 进行分流(分组)
   val groupedDataStream: KeyedStream[(String, Int), String] = wordDataStream.keyBy(_._1)
   //5. 使用 timeWinodw 指定窗口的长度(每3秒计算一次)
   val windowDataStream: WindowedStream[(String, Int), String, TimeWindow] = groupedDataStream.timeWindow(Time.seconds(3))
   //6. 实现一个WindowFunction匿名内部类
   val reduceDatStream: DataStream[(String, Int)] = windowDataStream.apply(new RichWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
     //在apply方法中实现数据的聚合
     override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
       println("hello world")
       val tuple = input.reduce((t1, t2) => {
         (t1._1, t1._2 + t2._2)
       })
       //将要返回的数据收集起来,发送回去
       out.collect(tuple)
     }
   })
   reduceDatStream.print()
   env.execute()
5) Window Fold

WindowedStream → DataStream:给窗口赋一个fold功能的函数,并返回一个fold后的结果。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
object StreamWindowFold {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999,'',3)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item, 1)).keyBy(0)
   // 引入滚动窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行fold操作
   val streamFold = streamWindow.fold(100){
     (begin, item) =>
       begin + item._2
   }
   // 将聚合数据写入文件
   streamFold.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
6) Aggregation on Window

WindowedStream → DataStream:对一个window内的所有元素做聚合操作。min和 minBy的区别是min返回的是最小值,而minBy返回的是包含最小值字段的元素(同样的原理适用于 max 和 maxBy)。

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.api.scala._
object StreamWindowAggregation {
 def main(args: Array[String]): Unit = {
   // 获取执行环境
   val env = StreamExecutionEnvironment.getExecutionEnvironment
   // 创建SocketSource
   val stream = env.socketTextStream("node01", 9999)
   // 对stream进行处理并按key聚合
   val streamKeyBy = stream.map(item => (item.split(" ")(0), item.split(" ")(1))).keyBy(0)
   // 引入滚动窗口
   val streamWindow = streamKeyBy.timeWindow(Time.seconds(5))
   // 执行聚合操作
   val streamMax = streamWindow.max(1)
   // 将聚合数据写入文件
   streamMax.print()
   // 执行程序
   env.execute("TumblingWindow")
 }
}
4. EventTime与Window1) EventTime的引入与现实世界中的时间是不一致的,在flink中被划分为事件时间,提取时间,处理时间三种。如果以EventTime为基准来定义时间窗口那将形成EventTimeWindow,要求消息本身就应该携带EventTime如果以IngesingtTime为基准来定义时间窗口那将形成IngestingTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口那将形成ProcessingTimeWindow,以operator的systemTime为准。

在Flink的流式处理中,绝大部分的业务都会使用eventTime,一般只在eventTime无法使用时,才会被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那么需要引入EventTime的时间属性,引入方式如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特征
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2) Watermark

我们知道,流处理从事件产生,到流经 source,再到 operator,中间是有一个过程和时间的,虽然大部分情况下,流到 operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生,所谓乱序,就是指 Flink 接收到的事件的先后顺序不是严格按照事件的 Event Time 顺序排列的,所以 Flink 最初设计的时候,就考虑到了网络延迟,网络乱序等问题,所以提出了一个抽象概念:水印(WaterMark);

如上图所示,就出现一个问题,一旦出现乱序,如果只根据 EventTime 决定 Window 的运行,我们不能明确数据是否全部到位,但又不能无限期的等下去,此时必须要有个机制来保证一个特定的时间后,必须触发 Window 去进行计算了,这个特别的机制,就是 Watermark。

Watermark 是用于处理乱序事件的,而正确的处理乱序事件,通常用 Watermark 机制结合 Window 来实现。

数据流中的 Watermark 用于表示 timestamp 小于 Watermark 的数据,都已经到达了,因此,Window 的执行也是由 Watermark 触发的。

Watermark 可以理解成一个延迟触发机制,我们可以设置 Watermark 的延时时长 t,每次系统会校验已经到达的数据中最大的 maxEventTime,然后认定 EventTime 小于 maxEventTime - t 的所有数据都已经到达,如果有窗口的停止时间等于 maxEventTime – t,那么这个窗口被触发执行。

有序流的Watermarker如下图所示:(Watermark设置为0)

有序数据的Watermark

乱序流的Watermarker如下图所示:(Watermark设置为2)

无序数据的Watermark

当 Flink 接收到每一条数据时,都会产生一条 Watermark,这条 Watermark 就等于当前所有到达数据中的 maxEventTime - 延迟时长,也就是说,Watermark 是由数据携带的,一旦数据携带的 Watermark 比当前未触发的窗口的停止时间要晚,那么就会触发相应窗口的执行。由于 Watermark 是由数据携带的,因此,如果运行过程中无法获取新的数据,那么没有被触发的窗口将永远都不被触发。

上图中,我们设置的允许最大延迟到达时间为2s,所以时间戳为7s的事件对应的Watermark是5s,时间戳为12s的事件的Watermark是10s,如果我们的窗口1是1s~5s,窗口2是6s~10s,那么时间戳为7s的事件到达时的Watermarker恰好触发窗口1,时间戳为12s的事件到达时的Watermark恰好触发窗口2。

3) Flink对于迟到数据的处理

waterMark和Window机制解决了流式数据的乱序问题,对于因为延迟而顺序有误的数据,可以根据eventTime进行业务处理,于延迟的数据Flink也有自己的解决办法,主要的办法是给定一个允许延迟的时间,在该时间范围内仍可以接受处理延迟数据。

设置允许延迟的时间是通过 allowedLateness(lateness: Time) 设置

保存延迟数据则是通过 sideOutputLateData(outputTag: OutputTag[T]) 保存

获取延迟数据是通过 DataStream.getSideOutput(tag: OutputTag[X]) 获取

具体的用法如下:

allowedLateness(lateness: Time)

def allowedLateness(lateness: Time): WindowedStream[T, K, W] = {
 javaStream.allowedLateness(lateness)
 this
}

该方法传入一个Time值,设置允许数据迟到的时间,这个时间和 WaterMark 中的时间概念不同。再来回顾一下:

WaterMark=数据的事件时间-允许乱序时间值

随着新数据的到来,waterMark的值会更新为最新数据事件时间-允许乱序时间值,但是如果这时候来了一条历史数据,waterMark值则不会更新。总的来说,waterMark是为了能接收到尽可能多的乱序数据。

那这里的Time值,主要是为了等待迟到的数据,在一定时间范围内,如果属于该窗口的数据到来,仍会进行计算,后面会对计算方式仔细说明

注意:该方法只针对于基于event-time的窗口,如果是基于processing-time,并且指定了非零的time值则会抛出异常。

sideOutputLateData(outputTag: OutputTag[T])

def sideOutputLateData(outputTag: OutputTag[T]): WindowedStream[T, K, W] = {
 javaStream.sideOutputLateData(outputTag)
 this
}

该方法是将迟来的数据保存至给定的outputTag参数,而OutputTag则是用来标记延迟数据的一个对象。

DataStream.getSideOutput(tag: OutputTag[X])

通过window等操作返回的DataStream调用该方法,传入标记延迟数据的对象来获取延迟的数据。

对延迟数据的理解

延迟数据是指:

在当前窗口【假设窗口范围为10-15】已经计算之后,又来了一个属于该窗口的数据【假设事件时间为13】,这时候仍会触发 Window 操作,这种数据就称为延迟数据。

那么问题来了,延迟时间怎么计算呢?

假设窗口范围为10-15,延迟时间为2s,则只要 WaterMark<15+2,并且属于该窗口,就能触发 Window 操作。而如果来了一条数据使得 WaterMark>=15+2,10-15这个窗口就不能再触发 Window 操作,即使新来的数据的 Event Time 属于这个窗口时间内 。

<上一页  1  2  3  4  下一页>  
声明: 本文由入驻维科号的作者撰写,观点仅代表作者本人,不代表OFweek立场。如有侵权或其他问题,请联系举报。

发表评论

0条评论,0人参与

请输入评论内容...

请输入评论/评论长度6~500个字

您提交的评论过于频繁,请输入验证码继续

暂无评论

暂无评论

人工智能 猎头职位 更多
扫码关注公众号
OFweek人工智能网
获取更多精彩内容
文章纠错
x
*文字标题:
*纠错内容:
联系邮箱:
*验 证 码:

粤公网安备 44030502002758号