Flink 基础学习 笔记

2023-12-25T15:09:00

DataStream API

参考文章

DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。
DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。
可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filterAPI 方法把 DataStream 和派生的流连接在一起。

程序剖析

程序结构

每个程序由相同的基本部分组成:
1、获取一个执行环境(execution environment
2、加载/创建初始数据
3、指定数据相关的转换
4、指定计算结果的存储位置
5、触发程序执行

Java DataStream API 的所有核心类都可以在 org.apache.flink.streaming.api.scala 中找到

1、获取一个执行环境

DataStream API 中分为 StreamExecutionEnvironmentExecutionEnvironment
StreamExecutionEnvironment 用于处理流式数据,而 ExecutionEnvironment 用于处理批处理数据。StreamExecutionEnvironment 主要用于构建处理流数据的作业,而 ExecutionEnvironment 则用于构建批处理作业。
StreamExecutionEnvironment 是所有 Flink 程序的基础.
StreamExecutionEnvironment 下有getExecutionEnvironmentcreateLocalEnvironment()createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
一般用getExecutionEnvironment即可,会返回一个执行环境以在集群上你执行的程序。(在IDE里运行会生成一个本地环境;如果提交到flink集群上flink会自动运行main方法)
例如:

val environment = StreamExecutionEnvironment.getExecutionEnvironment()

2、加载/创建初始数据

参考文章


多并行Source

非并行的Source
Flink 提供了一些Data Source数据源方法:
fromElements(T …) 非并行
readFile(FileInputFormat inputFormat, String filePath) 并行
readTextFile(String filePath)
socketTextStream(String hostname, int port) 非并行
fromCollection(Collection) 非并行
fromParallelCollection(SplittableIterator, Class) 并行
generateSequence(long from, long to) 并行
第三方 Source
addSource
读取后会返回一个 DataSteam,例如:

val text = environment.socketTextStream("192.168.37.159", 9999)

3、指定数据相关的转换

这个时候可以调用 DataStream 上具有转换功能的方法来应用转换,例如:

    val newText = text.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(_._1)
      .sum(1)

执行完成后将会创建返回一个新的DataSteam

4、指定计算结果的存储位置

DataSteam处理好后,可以通过sink写到外部,例如:

newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)

会将文件写到text文件夹中,每次数据如果相同并没有定义windows 数据将会累加


配置windows

5、触发程序执行

一旦指定了完整的程序,需要调用 StreamExecutionEnvironmentexecute() 方法来触发程序执行。根据 ExecutionEnvironment 的类型,执行会在本地机器上触发,或将程序提交到某个集群上执行。

environment.execute()

完整代码

package org.example

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, createTypeInformation}
import org.apache.flink.core.fs.{FSDataOutputStream, FileSystem}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object Flink1 {
  def main(args: Array[String]): Unit = {
    // 1、获取一个执行环境
    val environment = StreamExecutionEnvironment.createLocalEnvironment()
    environment.setParallelism(1)
    // 2、加载/创建初始数据
    val text = environment.socketTextStream("192.168.37.159", 9999)
    // 3、指定数据相关的转换
    val newText = text.flatMap(_.split(" "))
      .map((_,1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .sum(1)

    // 4、指定计算结果的存储位置
    newText.writeAsText("./text",FileSystem.WriteMode.OVERWRITE)
    newText.print("cN")
    text.print("NN")
    
    // 5、触发程序执行
    environment.execute()
  }
}


当前页面是本站的「Baidu MIP」版。发表评论请点击:完整版 »