Flink 基础学习 笔记
DataStream API
DataStream API
得名于特殊的 DataStream
类,该类用于表示 Flink
程序中的数据集合。可以认为 它们是可以包含重复项的不可变数据集合
。这些数据可以是有界(有限)
的,也可以是无界(无限)
的,但用于处理它们的API
是相同的。DataStream
在用法上类似于常规的 Java
集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API
操作来处理它们,DataStream API
操作也叫作转换(transformation
)。
可以通过在 Flink
程序中添加 source
创建一个初始的 DataStream
。然后,你可以基于 DataStream
派生新的流,并使用 map、filter
等 API
方法把 DataStream
和派生的流连接在一起。
程序剖析
程序结构
每个程序由相同的基本部分组成:
1、获取一个执行环境(execution environment
)
2、加载/创建初始数据
3、指定数据相关的转换
4、指定计算结果的存储位置
5、触发程序执行
Java
DataStream API
的所有核心类都可以在 org.apache.flink.streaming.api.scala
中找到
1、获取一个执行环境
在DataStream API
中分为 StreamExecutionEnvironment
、 ExecutionEnvironment
StreamExecutionEnvironment
用于处理流式数据,而 ExecutionEnvironment
用于处理批处理数据。StreamExecutionEnvironment
主要用于构建处理流数据的作业,而 ExecutionEnvironment
则用于构建批处理作业。StreamExecutionEnvironment
是所有 Flink
程序的基础.
在StreamExecutionEnvironment
下有getExecutionEnvironment
、createLocalEnvironment()
和createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
一般用getExecutionEnvironment
即可,会返回一个执行环境以在集群上你执行的程序。(在IDE
里运行会生成一个本地环境;如果提交到flink
集群上flink
会自动运行main
方法)
例如:
val environment = StreamExecutionEnvironment.getExecutionEnvironment()
2、加载/创建初始数据
多并行Source
非并行的Source
Flink 提供了一些Data Source
数据源方法:fromElements(T …)
非并行readFile(FileInputFormat inputFormat, String filePath)
并行readTextFile(String filePath)
socketTextStream(String hostname, int port)
非并行fromCollection(Collection)
非并行fromParallelCollection(SplittableIterator, Class)
并行generateSequence(long from, long to)
并行
第三方 SourceaddSource
读取后会返回一个 DataSteam
,例如:
val text = environment.socketTextStream("192.168.37.159", 9999)
3、指定数据相关的转换
这个时候可以调用 DataStream
上具有转换功能的方法来应用转换,例如:
val newText = text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.sum(1)
执行完成后将会创建返回一个新的DataSteam
4、指定计算结果的存储位置
当DataSteam
处理好后,可以通过sink
写到外部,例如:
newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)
会将文件写到text
文件夹中,每次数据如果相同并没有定义windows
数据将会累加
配置windows
后
5、触发程序执行
一旦指定了完整的程序,需要调用 StreamExecutionEnvironment
的 execute()
方法来触发程序执行。根据 ExecutionEnvironment
的类型,执行会在本地机器上触发,或将程序提交到某个集群上执行。
environment.execute()
完整代码
package org.example
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.core.fs.{FSDataOutputStream, FileSystem}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object Flink1 {
def main(args: Array[String]): Unit = {
// 1、获取一个执行环境
val environment = StreamExecutionEnvironment.createLocalEnvironment()
environment.setParallelism(1)
// 2、加载/创建初始数据
val text = environment.socketTextStream("192.168.37.159", 9999)
// 3、指定数据相关的转换
val newText = text.flatMap(_.split(" "))
.map((_,1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.sum(1)
// 4、指定计算结果的存储位置
newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)
newText.print("cN")
text.print("NN")
// 5、触发程序执行
environment.execute()
}
}
当前页面是本站的「Google AMP」版。查看和发表评论请点击:完整版 »