Flink 基础学习 笔记
DataStream API
DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。
可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。
程序剖析
程序结构
每个程序由相同的基本部分组成:
1、获取一个执行环境(execution environment)
2、加载/创建初始数据
3、指定数据相关的转换
4、指定计算结果的存储位置
5、触发程序执行
Java DataStream API 的所有核心类都可以在 org.apache.flink.streaming.api.scala 中找到
1、获取一个执行环境
在DataStream API 中分为 StreamExecutionEnvironment 、 ExecutionEnvironment StreamExecutionEnvironment 用于处理流式数据,而 ExecutionEnvironment 用于处理批处理数据。StreamExecutionEnvironment 主要用于构建处理流数据的作业,而 ExecutionEnvironment 则用于构建批处理作业。StreamExecutionEnvironment 是所有 Flink 程序的基础.
在StreamExecutionEnvironment 下有getExecutionEnvironment、createLocalEnvironment()和createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
一般用getExecutionEnvironment即可,会返回一个执行环境以在集群上你执行的程序。(在IDE里运行会生成一个本地环境;如果提交到flink集群上flink会自动运行main方法)
例如:
val environment = StreamExecutionEnvironment.getExecutionEnvironment()2、加载/创建初始数据
多并行Source
非并行的Source
Flink 提供了一些Data Source数据源方法:fromElements(T …) 非并行readFile(FileInputFormat inputFormat, String filePath) 并行readTextFile(String filePath) socketTextStream(String hostname, int port) 非并行fromCollection(Collection) 非并行fromParallelCollection(SplittableIterator, Class) 并行generateSequence(long from, long to) 并行
第三方 SourceaddSource
读取后会返回一个 DataSteam,例如:
val text = environment.socketTextStream("192.168.37.159", 9999)3、指定数据相关的转换
这个时候可以调用 DataStream 上具有转换功能的方法来应用转换,例如:
val newText = text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.sum(1)执行完成后将会创建返回一个新的DataSteam
4、指定计算结果的存储位置
当DataSteam处理好后,可以通过sink写到外部,例如:
newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)会将文件写到text文件夹中,每次数据如果相同并没有定义windows 数据将会累加
配置windows后
5、触发程序执行
一旦指定了完整的程序,需要调用 StreamExecutionEnvironment 的 execute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在本地机器上触发,或将程序提交到某个集群上执行。
environment.execute()完整代码
package org.example
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.core.fs.{FSDataOutputStream, FileSystem}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object Flink1 {
def main(args: Array[String]): Unit = {
// 1、获取一个执行环境
val environment = StreamExecutionEnvironment.createLocalEnvironment()
environment.setParallelism(1)
// 2、加载/创建初始数据
val text = environment.socketTextStream("192.168.37.159", 9999)
// 3、指定数据相关的转换
val newText = text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
// 4、指定计算结果的存储位置
newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)
newText.print("cN")
text.print("NN")
// 5、触发程序执行
environment.execute()
}
}