# flink简介

# flink特性

Flink 区别与传统数据处理框架的特性如下。

  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink 提供了事件时间(event-time)和处理时间(processing-time)语义。对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的存储系统,如 Apache Kafka、Apache Cassandra、Elasticsearch、JDBC、Kinesis 和(分布式)文件系统,如 HDFS 和 S3。
  • 高可用。本身高可用的设置,加上与 K8s,YARN 和 Mesos 的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink 能做到以极少的停机时间 7×24 全天候运行。
  • 能够更新应用程序代码并将作业(jobs)迁移到不同的 Flink 集群,而不会丢失应用程序的状态。

# 分层API

zoQry4.png (opens new window)

最底层级的抽象仅仅提供了有状态流,它将处理函数( Process Function)嵌入到了DataStream API 中。底层处理函数(Process Function)与 DataStream API 相集成,可以对某些操作进行抽象,它允许用户可以使用自定义状态处理来自一个或多个数据流的事件,且状态具有一致性和容错保证。除此之外,用户可以注册事件时间并处理时间回调,从而使程序可以处理复杂的计算。

实际上,大多数应用并不需要上述的底层抽象,而是直接针对核心 API(Core APIs) 进行编程,比如 DataStream API(用于处理有界或无界流数据)以及 DataSet API(用于处理有界数据集)。这些 API 为数据处理提供了通用的构建模块,比如由用户定义的多种形式的转换 (transformations)、连接(joins)、聚合(aggregations)、窗口(windows)操作等。DataSet API为有界数据集提供了额外的支持,例如循环与迭代。这些 API 处理的数据类型以类(classes) 的形式由各自的编程语言所表示。

WARNING

todo

# 快速上手

# 编写代码

# 批处理

package com.chengyi.chapter02;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWorldCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
        DataSource<String> lineDS = env.readTextFile("input/words.txt");
        // 3. 转换数据格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne =
                lineDS.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                    String[] words = line.split(" ");
                    for(String word : words) {
                        out.collect(Tuple2.of(word, 1L));
                    }
                }).returns(Types.TUPLE(Types.STRING, Types.LONG));// 当lambda表达式使用Java泛型的时候,由于泛型擦除的存在,需要显示的声明类型信息

        // 4. 按照word进行分组
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分组内聚合统计
        AggregateOperator<Tuple2<String,Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印结果
        sum.print();
    }
}

运行结果

(java,1)
(flink,1)
(world,1)
(hello,3)

# 流处理

# 读取文件
package com.chengyi.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class BoundedStreamWordCount {

    public static void main(String[] args) throws Exception {
        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文件
        DataStreamSource<String> lineDSS = env.readTextFile("input/words.txt");

        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap((String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                }).returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);

        // 6. 打印
        result.print();

        // 7. 执行
        env.execute();
    }
}

运行结果

1> (java,1)
2> (hello,1)
2> (hello,2)
2> (hello,3)
3> (world,1)
4> (flink,1)

主要观察与批处理程序BatchWordCount的不同:

  • 创建的执行环境不同,流处理程序使用的是StreamExecutionEnvironment。
  • 每一步处理转换之后,得到的数据对象类型不同。
  • 分组操作调用的是keyBy方法,可以传入一个匿名函数作为作为键选择器(KeySelector),指定当前分组的key是什么
  • 代码末尾需要调用env的execute方法,开始执行任务
# 读取文本流
package com.chengyi.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class StreamWordCount {
    public static void main (String[] args) throws Exception {

        // 1. 创建流式执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 读取文本流
        DataStreamSource<String> lineDSS = env.socketTextStream("node001", 7777);

        // 3. 转换数据格式
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOne = lineDSS
                .flatMap( (String line, Collector<String> words) -> {
                    Arrays.stream(line.split(" ")).forEach(words::collect);
                })
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1L))
                .returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4. 分组
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKS = wordAndOne.keyBy(t -> t.f0);

        // 5. 求和
        SingleOutputStreamOperator<Tuple2<String, Long>> result = wordAndOneKS.sum(1);

        // 6. 打印
        result.print();
        // 7. 执行
        env.execute();
    }
}

node001 作为发送数据的socket端口 [root@node001 ~]$ nc -lk 7777

发送数据

hello flink
hello world
hello java

接收数据

2> (hello,1)
4> (flink,1)
2> (hello,2)
4> (world,1)
1> (java,1)
2> (hello,3)

# flink部署

TIP

todo

# DataStream API

代码基本上都由以下几部分组成

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute) zoTqhQ.png (opens new window)

# 执行环境

# 创建执行环境

  • DataStream执行环境
  1. getExecutionEvironment 这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. createLocalEnvironment 这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数。
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
  1. createRemoteEnvironment 这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包。
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment
    .createRemoteEnvironment(
    "host", // JobManager 主机名
    1234, // JobManager 进程端口号
    "path/to/jarFile.jar" // 提交给 JobManager 的 JAR 包
);
  • DataSet执行环境 批处理的执行环境与流处理类似,是调用类 ExecutionEnvironment的静态方法
ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();

# 执行模式

  1. 流执行模式(STREAMING) 默认情况下,程序使用的就是 STREAMING 执行模式。

  2. 批执行模式(BATCH)

    1. 通过命令行设置 在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH.
    bin/flink run -Dexecution.runtime-mode=BATCH ...
    
    1. 通过代码配置
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    

    TIP

    不要在代码中配置,而是使用命令行。这同设置并行度是类似的:在提交作业时指定参数可以更加灵活,同一段应用程序写好之后,既可以用于批处理也可以用于流处理。而在代码中硬编码(hard code)的方式可扩展性比较差,一般都不推荐。

  3. 什么时候选择BATCH模式 在 STREAMING模式下,每来一条数据,就会输出一次结果(即使输入数据是有界的);而 BATCH 模式下,只有数据全部处理完之后,才会一次性输出结果。 用 BATCH 模式处理批量数据,用 STREAMING模式处理流式数据。因为数据有界的时候,直接输出结果会更加高效;而当数据无界的时候, 我们没得选择——只有 STREAMING 模式才能处理持续的数据流。

# 触发程序执行

显式地调用执行环境的 execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

env.execute();

# 源算子

  source就是我们整个处理程序的输入端。

DataStream<String> stream = env.addSource(...);

  方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource。这里的DataStreamSource 类继承自 SingleOutputStreamOperator 类,又进一步继承自DataStream。以很明显,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)。

# 准备工作

为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的 urrl,用户访问 url 的时间戳),所以在这里,我们可以创建一个类 Event,将用户行为包装成它的一个对象。

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

这里需要注意,我们定义的 Event,有这样几个特点:

  • 类是公有(public)的
  • 有一个无参的构造方法
  • 所有属性都是公有(public)的
  • 所有属性的类型都是可以序列化的

Flink 会把这样的类作为一种特殊的 POJO 数据类型来对待,方便数据的解析和序列化。另外我们在类中还重写了 toString 方法,主要是为了测试输出显示更清晰。关于 Flink 支持的数据类型,我们会在后面章节做详细说明。

# 从集合中读取数据

在代码中创建一个 Java 集合,然后调用执行环境的fromCollection 方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ArrayList<Event> clicks = new ArrayList<>();
    clicks.add(new Event("Mary","./home",1000L));
    clicks.add(new Event("Bob","./cart",2000L));
    DataStream<Event> stream = env.fromCollection(clicks);
    stream.print();
    env.execute();
}

我们也可以不构建集合,直接将元素列举出来,调用 fromElements 方法进行读取数据:

DataStreamSource<Event> stream2 = env.fromElements(
 new Event("Mary", "./home", 1000L),
 new Event("Bob", "./cart", 2000L)
);

# 从文件读取数据

DataStream<String> stream = env.readTextFile("clicks.csv");

说明:

  • 参数可以是目录,也可以是文件
  • 路径可以使相对路径,也可以是绝对路径
  • 相对路径是从系统属性 user.dir 获取路径: idea 下是 project 的根目录, standalone 模式下是集群节点根目录;
  • 也可以从 hdfs 目录下读取, 使用路径 hdfs://..., 由于 Flink 没有提供 hadoop 相关依赖, 需要 pom 中添加相关依赖:
    <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.5</version>
    <scope>provided</scope>
    </dependency>
    

# 从Socket 读取数据

这种方式由于吞吐量小、稳定性较差,一般也是用于测试。

DataStream<String> stream = env.socketTextStream("localhost", 7777);

# 从Kafka读取数据

zTSTHJ.png (opens new window) 想要以 Kafka 作为数据源获取数据,我们需要引入 Kafka 连接器的依赖。

<dependency>
 <groupId>org.apache.flink</groupId>
 <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
 <version>${flink.version}</version>
</dependency>

然后调用 env.addSource(),传入 FlinkKafkaConsumer 的对象实例就可以了。

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "node004:6667");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("clicks", new SimpleStringSchema(), properties));

        stream.print("Kafka"); // 输出前缀

        env.execute();
    }

# 自定义Source

# 单线程Source

实现 SourceFunction 接口,重写两个关键方法:run()和 cancel()。

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据;
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。

自定义数据源

package com.chengyi.chapter05;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Calendar;
import java.util.Random;

public class ClickSource implements SourceFunction<Event> {

    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;

    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        // 在指定的数据集中随机选取数据
        Random random = new Random();
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));

            // 间隔1秒生成一个点击时间,方便观测
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

有了自定义的 source function,接下来只要调用 addSource()就可以了:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 有了自定义的source function, 即用addSource方法
    DataStreamSource<Event> stream = env.addSource(new ClickSource());

    stream.print("SourceCustom");

    env.execute();
}

WARNING

SourceFunction 接口定义的数据源,并行度只能设置为 1,如果数据源设置为大于 1 的并行度,则会抛出异常。 Exception in thread "main" java.lang.IllegalArgumentException: The parallelism of non parallel operator must be 1.

# 并行Source
public class ParallelSourceExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new CustomSource()).setParallelism(2).print();

        env.execute();
    }

    private static class CustomSource implements ParallelSourceFunction<Integer> {
        private boolean running = true;
        private Random random = new Random();

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while (running) {
                ctx.collect(random.nextInt());
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

# 转换算子

以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream,可以进行分流、合流等多流转换操作,从而组合成复杂的数据流拓扑。多流转换的内容我们将在后续章节展开

# 基本转换算子

# 映射(map)

基于 DataStrema 调用 map()方法就可以进行转换处理。传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream,不过泛型(流中的元素类型)可能改变。

public class TransMapTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        // 传入匿名类,实现MapFunction
        /*stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return  value.user;
            }
        }).print();*/

        // 传入MapFunction的实现类
        stream.map(new UserExtractor()).print();

        env.execute();

    }


    private static class UserExtractor implements MapFunction<Event, String> {
        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}
# 过滤(filter)

通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。 进行 filter 转换之后的新数据流的数据类型与原数据流是相同的。filter 转换需要传入的参数需要实现 FilterFunction 接口,而 FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Event> stream = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );

    // 传入匿名类实现FilterFunction
    /* stream.filter(new FilterFunction<Event>() {
        @Override
        public boolean filter(Event value) throws Exception {
            return value.user.equals("Mary");
        }
    });*/
    
    // 传入FilterFunction实现类
    stream.filter(new UserFilter()).print();

    env.execute();
}


private static class UserFilter implements FilterFunction<Event> {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.user.equals("Mary");
    }
}
# 扁平映射(flatMap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten) 和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分 后的元素做转换处理。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Event> stream = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );

    stream.flatMap(new MyFlatMapper()).print();

    env.execute();
}


private static class MyFlatMapper implements FlatMapFunction<Event, String> {
    @Override
    public void flatMap(Event value, Collector<String> out) throws Exception {
        if(value.user.equals("Mary")) {
            out.collect(value.user);
        }else if (value.user.equals("Bob")) {
            out.collect(value.user);
            out.collect(value.url);
        }
    }
}

# 聚合算子(Aggregation)

把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

# 按键分区(keyBy)

DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的。

keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

基于不同的 key,流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了。

在内部,是通过计算 key 的哈希值(hash code),对分区数进行取模运算来实现的。所以 这里key 如果是 POJO 的话,必须要重写 hashCode()方法。

keyBy()方法需要传入一个参数,这个参数指定了一个或一组 key。有很多不同的方法来指定 key:比如对于 Tuple 数据类型,可以指定字段的位置或者多个位置的组合;对于 POJO 类型,可以指定字段的名称(String);另外,还可以传入 Lambda 表达式或者实现一个键选择器(KeySelector),用于说明从数据中提取 key 的逻辑。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Event> stream = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );
    
    // 使用lambda表达式
    KeyedStream<Event, String> keyedStream = stream.keyBy(e -> e.user);
    
    // 使用匿名函数类实现KeySelector
    KeyedStream<Event, String> keyedStream1 = stream.keyBy(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event value) throws Exception {
            return value.user;
        }
    });

    keyedStream1.print();
    
    env.execute();
}

WARNING

keyBy 得到的结果将不再是 DataStream,而是会将 DataStream 转换为KeyedStream。KeyedStream 可以认为是“分区流”或者“键控流”,它是对 DataStream 按照key 的一个逻辑分区,所以泛型有两个类型:除去当前流中的元素类型外,还需要指定 key 的类型。

KeyedStream 也继承自 DataStream,所以基于它的操作也都归属于 DataStream API。但它 跟之前的转换操作得到的 SingleOutputStreamOperator 不同,只是一个流的分区操作,并不是 一个转换算子。KeyedStream 是一个非常重要的数据结构,只有基于它才可以做后续的聚合操 作(比如 sum,reduce);而且它可以将当前算子任务的状态(state)也按照 key 进行划分、限 定为仅对当前 key 有效。关于状态的相关知识我们会在后面章节继续讨论。

# 简单聚合
  • sum(): 在输入流上,对指定的字段做叠加求和的操作。
  • min(): 在输入流上,对指定的字段求最小值。
  • max(): 在输入流上,对指定的字段求最大值。
  • minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  • maxBy(): 与max()类似,在输入流上针对指定字段求最大值。参数minby()
  1. 对元组数据流进行聚合的测试:
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStreamSource<Tuple3<String, Integer, String>> stream = env.fromElements(
            Tuple3.of("a", 1, "cy1"),
            Tuple3.of("a", 3, "cy2"),
            Tuple3.of("b", 3, "cy1"),
            Tuple3.of("b", 4, "cy2")
    );

    stream.keyBy(r -> r.f0).sum(1).print("sum_position");
    stream.keyBy(r -> r.f0).sum("f1").print("sum_field");
    stream.keyBy(r -> r.f0).max(1).print("max_position");
    stream.keyBy(r -> r.f0).max("f1").print("max_field");
    stream.keyBy(r -> r.f0).min(1).print("min_position");
    stream.keyBy(r -> r.f0).min("f1").print("min_field");
    stream.keyBy(r -> r.f0).maxBy(1).print("maxBy_position");
    stream.keyBy(r -> r.f0).maxBy("f1").print("maxBy_field");
    stream.keyBy(r -> r.f0).minBy(1).print("minBy_position");
    stream.keyBy(r -> r.f0).minBy("f1").print("minBy_field");

    env.execute();
}

输出结果

max_position:1> (b,3,cy1)
max_position:1> (b,4,cy1)
sum_field:3> (a,1,cy1)
sum_field:3> (a,4,cy1)
sum_position:1> (b,3,cy1)
sum_position:1> (b,7,cy1)
sum_position:3> (a,1,cy1)
sum_position:3> (a,4,cy1)
sum_field:1> (b,3,cy1)
sum_field:1> (b,7,cy1)
max_position:3> (a,1,cy1)
max_position:3> (a,3,cy1)
max_field:1> (b,3,cy1)
max_field:1> (b,4,cy1)
max_field:3> (a,1,cy1)
max_field:3> (a,3,cy1)
minBy_field:3> (a,1,cy1)
minBy_field:3> (a,1,cy1)
minBy_field:1> (b,3,cy1)
minBy_field:1> (b,3,cy1)
minBy_position:3> (a,1,cy1)
minBy_position:3> (a,1,cy1)
maxBy_field:3> (a,1,cy1)
maxBy_field:3> (a,3,cy2)
maxBy_field:1> (b,3,cy1)
maxBy_field:1> (b,4,cy2)
min_field:3> (a,1,cy1)
min_field:3> (a,1,cy1)
min_field:1> (b,3,cy1)
min_field:1> (b,3,cy1)
min_position:3> (a,1,cy1)
min_position:3> (a,1,cy1)
min_position:1> (b,3,cy1)
min_position:1> (b,3,cy1)
maxBy_position:1> (b,3,cy1)
maxBy_position:1> (b,4,cy2)
maxBy_position:3> (a,1,cy1)
maxBy_position:3> (a,3,cy2)
minBy_position:1> (b,3,cy1)
minBy_position:1> (b,3,cy1)
  1. 对pojo类型的聚合测试
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    DataStreamSource<Event> stream = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );

    stream.keyBy(e -> e.user).max("timestamp").print();
    
    env.execute();
}

TIP

简单聚合算子返回的,同样是一个 SingleOutputStreamOperator,也就是从 KeyedStream 又转换成了常规的 DataStream。所以可以这样理解:keyBy 和聚合是成对出现的,先分区、后聚合,得到的依然是一个 DataStream。而且经过简单聚合之后的数据流,元素的数据类型保持不变。

WARNING

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子,应该只用在含有有限个 key 的数据流上。

# 规约聚合(reduce)

reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。

public interface ReduceFunction<T> extends Function, Serializable {   
    T reduce(T value1, T value2) throws Exception;
}

与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStrema。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

/*将数据流按照用户 id 进行分区,然后用一个 reduce 算子实现 sum 的功能,统计每个
用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce 算子实现 maxBy 的功能,
记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。*/

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 这里使用自定义数据源ClickSource作为数据源
    env.addSource(new ClickSource())
            // 将Event数据类型转换成元组类型
            .map(new MapFunction<Event, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(Event value) throws Exception {
                    return Tuple2.of(value.user, 1L);
                }
            })
            .keyBy(r -> r.f0) //使用用户名来进行分流
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                    // 每到一条数据,用户PV的统计值加1
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            })
            .keyBy(r -> true) // 使所有数据进入同一条流,将聚合结果发送到一条流中去
            .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                    // 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
                    return value1.f1 > value2.f1 ? value1 : value2;
                }
            })
            .print();

    env.execute();
}

WARNING

reduce 同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以我们需要将 reduce 算子作用在一个有限 key 的流上。

# 用户自定义函数

# 函数类

对于大部分操作而言,都需要传入一个用户自定义函数(UDF),实现相关操作的接口,来完成处理逻辑的定义。Flink 暴露了所有 UDF 函数的接口,具体实现方式为接口或者抽象类,例如 MapFunction、FilterFunction、ReduceFunction 等。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Event> clicks = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );

    DataStream<Event> stream = clicks.filter(new FlinkFilter());

    // 将用于过滤的关键字"home"抽象出来作为类的属性,调用构造方法时传进去。
    DataStream<Event> stream1 = clicks.filter(new KeyWordFilter("home"));

    stream.print();

    env.execute();
}


private static class FlinkFilter implements FilterFunction<Event> {
    @Override
    public boolean filter(Event value) throws Exception {
        return value.url.contains("home");
    }
}

private static class KeyWordFilter implements FilterFunction<Event> {
    private String keyWord;

    public KeyWordFilter(String keyWord) {
        this.keyWord = keyWord;
    }

    @Override
    public boolean filter(Event value) throws Exception {
        return value.url.contains(keyWord);
    }
}
# 匿名函数

匿名函数(Lambda 表达式)是 Java 8 引入的新特性,方便我们更加快速清晰地写代码。Lambda 表达式允许以简洁的方式实现函数,以及将函数作为参数来进行传递,而不必声明额外的(匿名)类。

Flink 的所有算子都可以使用 Lambda 表达式的方式来进行编码,但是,当 Lambda 表达式使用 Java 的泛型时,我们需要显式的声明类型信息。

如果OUT输出的是基本类型,Flink可以从函数签名 OUT map(IN value) 的实现中自动提取出结果的类型信息。

但是对于像 flatMap() 这样的函数,它的函数签名 void flatMap(IN value, Collector<OUT>out) 被 Java 编译器编译成了 void flatMap(IN value, Collector out),也就是说将Collector 的泛型信息擦除掉了。这样 Flink 就无法自动推断输出的类型信息了。

泛型擦除的问题通过一下方式解决:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Event> clicks = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L)
    );
    
    // 想要转换成二元组类型,需要进行一下处理
    // 1) 使用显示的".returns(...)"
    DataStream<Tuple2<String, Long>> stream1 = clicks.map(event -> Tuple2.of(event.user, 1L))
            .returns(Types.TUPLE(Types.STRING, Types.LONG));
    stream1.print();
    
    // 2) 使用类来代替lambda表达式
    clicks.map(new MyTuple2Mapper()).print();
    
    // 3) 使用匿名类来代替lambda表达式
    clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> map(Event value) throws Exception {
            return Tuple2.of(value.user, 1L);
        }
    }).print();
    
    env.execute();
}

private static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> map(Event value) throws Exception {
        return Tuple2.of(value.user, 1L);
    }
}
# 富函数类(Rich Function Classes)

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等。

富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成。
  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作。

TIP

生命周期方法,对于一个并行子任务来说只会调用一次;而对应的,实际工作方法,例如 RichMapFunction 中的 map(),在每条数据到来后都会触发一次调用。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(2);

    DataStreamSource<Event> clicks = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L),
            new Event("Alice", "./prod?id=1", 5 * 1000L),
            new Event("Cary", "./home", 60 * 1000L)
    );

    // 将点击事件转换成长整形的时间戳输出
    clicks.map(new RichMapFunction<Event, Long>() {
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("索引为" + getRuntimeContext().getIndexOfThisSubtask() + "的任务开始!");
        }

        @Override
        public Long map(Event value) throws Exception {
            return value.timestamp;
        }

        @Override
        public void close() throws Exception {
            super.close();
            System.out.println("索引为" + getRuntimeContext().getIndexOfThisSubtask() + "的任务结束!");
        }
    }).print();

    env.execute();
}

输出的结果是:

索引为0的任务开始!
索引为1的任务开始!
1> 1000
1> 5000
2> 2000
2> 60000
索引为0的任务结束!
索引为1的任务结束!
DETAILS

常见的应用场景就是,如果我们希望连接到一个外部数据库进行读写操作,那么将连接操作放在 map()中显然不是个好选择——因为每来一条数据就会重新连接一次数据库;所以我们可以在 open()中建立连接,在 map()中读写数据,而在 close()中关闭连接。

# 物理分区(Physical Partitioning)

“分区”(partitioning)操作就是要将数据进行重新分布,传递到不同的流分区去进行下一步处理。其实我们对分区操作并不陌生,前面介绍聚合算子时,已经提到了 keyBy,它就是一种按照键的哈希值来进行重新分区的操作。只不过这种分区操作只能保证把数据按key“分开”,至于分得均不均匀、每个 key 的数据具体会分到哪一区去,这些是完全无从控制的——所以我们有时也说,keyBy 是一种逻辑分区(logical partitioning)操作。

控制分区策略,精准地调配数据

手动控制数据分区分配策略

物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个 KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式。

常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale) 和广播(Broadcast)。

# 随机分区(shuffle)

通过调用 DataStream 的.shuffle()方法,将数据随机地分配到下游算子的并行任务中去。

经过随机分区之后,得到的依然是一个 DataStream。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 读取数据源,并行度为1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());

    // 经洗牌后打印输出,并行度为4
    stream.shuffle().print("shuffle").setParallelism(4);

    env.execute();
}

打印输出结果

shuffle:4> Event{user='Bob', url='./prod?id=1', timestamp=2022-12-16 11:33:56.628}
shuffle:3> Event{user='Bob', url='./prod?id=2', timestamp=2022-12-16 11:33:57.641}
shuffle:3> Event{user='Mary', url='./prod?id=2', timestamp=2022-12-16 11:33:58.651}
shuffle:4> Event{user='Mary', url='./cart', timestamp=2022-12-16 11:33:59.665}
shuffle:1> Event{user='Cary', url='./prod?id=2', timestamp=2022-12-16 11:34:00.678}
shuffle:4> Event{user='Cary', url='./prod?id=2', timestamp=2022-12-16 11:34:01.693}
shuffle:2> Event{user='Bob', url='./fav', timestamp=2022-12-16 11:34:02.706}
shuffle:2> Event{user='Mary', url='./home', timestamp=2022-12-16 11:34:03.72}
shuffle:1> Event{user='Mary', url='./cart', timestamp=2022-12-16 11:34:04.733}
shuffle:2> Event{user='Bob', url='./fav', timestamp=2022-12-16 11:34:05.733}
shuffle:2> Event{user='Alice', url='./cart', timestamp=2022-12-16 11:34:06.74}
# 轮询分区(Round-Robin)

按照先后顺序将数据做依次分发,调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去。

public static void main(String[] args) throws Exception {

    // 创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 读取数据源,并行度为1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());

    // 经轮询重分区后打印输出,并行度为4
    stream.rebalance().print("rebalance").setParallelism(4);

    env.execute();
}

输出结果

rebalance:1> Event{user='Cary', url='./fav', timestamp=2022-12-16 11:39:32.13}
rebalance:2> Event{user='Alice', url='./prod?id=1', timestamp=2022-12-16 11:39:33.14}
rebalance:3> Event{user='Mary', url='./prod?id=2', timestamp=2022-12-16 11:39:34.155}
rebalance:4> Event{user='Alice', url='./prod?id=2', timestamp=2022-12-16 11:39:35.156}
rebalance:1> Event{user='Bob', url='./prod?id=2', timestamp=2022-12-16 11:39:36.171}
rebalance:2> Event{user='Mary', url='./prod?id=2', timestamp=2022-12-16 11:39:37.186}
rebalance:3> Event{user='Alice', url='./home', timestamp=2022-12-16 11:39:38.201}
rebalance:4> Event{user='Alice', url='./cart', timestamp=2022-12-16 11:39:39.215}
rebalance:1> Event{user='Bob', url='./home', timestamp=2022-12-16 11:39:40.227}
rebalance:2> Event{user='Cary', url='./fav', timestamp=2022-12-16 11:39:41.24}
rebalance:3> Event{user='Bob', url='./prod?id=2', timestamp=2022-12-16 11:39:42.24}
rebalance:4> Event{user='Alice', url='./prod?id=1', timestamp=2022-12-16 11:39:43.252}
# 重缩放分区(rescale)

只会将数据轮询发送到下游并行任务的一部分中,比如上游有2个分区,下游有6个分区,使用重缩放,就是上游1个分区,对应下游3个分区。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 这里使用了并行数据源的富函数版本
    // 这样可以调用getRuntimeContext方法来获取运行时上下文的一些信息
    env.addSource(new RichParallelSourceFunction<Integer>() {

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            // 将奇数发送到索引为1的并行子任务
            // 将偶数发送到索引为0的并行子任务
            for (int i = 0; i < 8 ; i++) {
                if( (i+1) % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                    ctx.collect(i + 1);
                }
            }
        }

        @Override
        public void cancel() {

        }
    }).setParallelism(2).rescale().print().setParallelism(4);

    env.execute();
}

输出结果

2> 4
2> 8
4> 3
1> 2
1> 6
3> 1
3> 5
4> 7
# 广播(broadcast)

将输入数据复制并发送到下游算子的所有并行任务中去。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 读取数据源,并行度为1
    DataStreamSource<Event> stream = env.addSource(new ClickSource());

    // 经广播后打印输出,并行度为4
    stream.broadcast().print("broadcast").setParallelism(4);

    env.execute();
}

输出结果

broadcast:1> Event{user='Mary', url='./prod?id=1', timestamp=2022-12-16 12:31:57.165}
broadcast:2> Event{user='Mary', url='./prod?id=1', timestamp=2022-12-16 12:31:57.165}
broadcast:3> Event{user='Mary', url='./prod?id=1', timestamp=2022-12-16 12:31:57.165}
broadcast:4> Event{user='Mary', url='./prod?id=1', timestamp=2022-12-16 12:31:57.165}
broadcast:1> Event{user='Alice', url='./fav', timestamp=2022-12-16 12:31:58.179}
broadcast:2> Event{user='Alice', url='./fav', timestamp=2022-12-16 12:31:58.179}
broadcast:3> Event{user='Alice', url='./fav', timestamp=2022-12-16 12:31:58.179}
broadcast:4> Event{user='Alice', url='./fav', timestamp=2022-12-16 12:31:58.179}
# 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力。

# 自定义分区(Custom)

通过使用partitionCustom()方法来自定义分区策略,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector。

/*自然数按照奇偶性进行重分区*/
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 将自然数按照奇偶分区
    env.fromElements(1,2,3,4,5,6,7,8)
            .partitionCustom(new Partitioner<Integer>() {

                @Override
                public int partition(Integer key, int numPartitions) {
                    return key % 2;
                }
            }, new KeySelector<Integer, Integer>() {
                @Override
                public Integer getKey(Integer value) throws Exception {
                    return value;
                }
            }).print().setParallelism(2);

    env.execute();
}

输出结果

1> 2
1> 4
1> 6
1> 8
2> 1
2> 3
2> 5
2> 7

# 输出算子

# 连接到外部系统

Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的,addSink 方法需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

flink官方提供的连接器
z7l390.png

Flink 还有些一些额外的连接器通过 Apache Bahir 发布, 包括:
z7lJjU.png

# 输出到文件

Flink 提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);

    DataStreamSource<Event> stream = env.fromElements(
            new Event("Mary", "./home", 1000L),
            new Event("Bob", "./cart", 2000L),
            new Event("Alice", "./prod?id=100", 3000L),
            new Event("Alice", "./prod?id=200", 3500L),
            new Event("Bob", "./prod?id=2", 2500L),
            new Event("Alice", "./prod?id=300", 3600L),
            new Event("Bob", "./home", 3000L),
            new Event("Bob", "./prod?id=1", 2300L),
            new Event("Bob", "./prod?id=3", 3300L)
    );

    StreamingFileSink<String> fileSink = StreamingFileSink
            .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                            .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                            .withMaxPartSize(1024 * 1024 * 1024)
                            .build()
            ).build();

    // 将event转换成String写入文件
    stream.map(Event::toString).addSink(fileSink);

    env.execute();
}

这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:

  • 至少包含 15 分钟的数据
  • 最近 5 分钟没有收到新的数据
  • 文件大小已达到 1 GB

# 输出到kafka

Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。

完整测试:

  1. 添加 Kafka 连接器依赖由于我们已经测试过从 Kafka 数据源读取数据。
  2. 启动 Kafka 集群
  3. 编写输出到 Kafka 的示例代码
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    Properties properties = new Properties();
    properties.put("bootstrap.servers", "node004:6667");

    DataStreamSource<String> stream = env.readTextFile("input/clicks.csv");

    stream.addSink(new FlinkKafkaProducer<String>(
        "clicks",
        new SimpleStringSchema(),
        properties
    ));

    env.execute();
}

# 输出到redis

Flink 没有直接提供官方的 Redis 连接器,由apache Bahir提供

  1. 导入redis连接器依赖(目前已有1.1版本)
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  2. 连接器为我们提供了一个 RedisSink,它继承了抽象类 RichSinkFunction,这就是已经实现好的 向 Redis 写入数据的 SinkFunction。我们可以直接将 Event 数据输出到 Redis:
    public static void main(String[] args) throws Exception {
    
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
    
        // 创建一个Redis连接配置
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").build();
    
        env.addSource(new ClickSource()).addSink(new RedisSink<Event>(conf, new MyRedisMapper()));
    
        env.execute();
    }
    
    这里的RedisSink的构造方法需要传入两个参数:
    • JFlinkJedisConfigBase:Jedis 的连接配置
    • RedisMapper:Redis 映射类接口,说明怎样将数据转换成可以写入 Redis 的类型
  3. 实现 RedisMapper 接口
    private static class MyRedisMapper implements RedisMapper<Event> {
         @Override
         public RedisCommandDescription getCommandDescription() {
             return new RedisCommandDescription(RedisCommand.HSET, "clicks");
         }
    
         @Override
         public String getKeyFromData(Event event) {
             return event.user;
         }
    
         @Override
         public String getValueFromData(Event event) {
             return event.url;
         }
     }
    
    保存到 Redis 时调用的命令是 HSET,所以是保存为哈希表(hash),表名为“clicks”;保存的数据以 user 为 key,以 url 为 value,每来一条数据就会做一次转换。
  4. 执行查看结果 z7tiqK.png 发送了多条数据, Redis 中只有4条数据. 原因是 hash 中的 key 重复了, 后面的会把前面的覆盖掉

# 输出到Elasticsearch

  1. 添加 Elasticsearch 连接器依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    
  2. 编写输出到Elasticsearch代码

    public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
    
         DataStreamSource<Event> stream = env.fromElements(
                 new Event("Mary", "./home", 1000L),
                 new Event("Bob", "./cart", 2000L),
                 new Event("Alice", "./prod?id=100", 3000L),
                 new Event("Alice", "./prod?id=200", 3500L),
                 new Event("Bob", "./prod?id=2", 2500L),
                 new Event("Alice", "./prod?id=300", 3600L),
                 new Event("Bob", "./home", 3000L),
                 new Event("Bob", "./prod?id=1", 2300L),
                 new Event("Bob", "./prod?id=3", 3300L)
         );
    
         ArrayList<HttpHost> httpHosts = new ArrayList<HttpHost>();
         httpHosts.add(new HttpHost("192.168.207.102", 9200, "http"));
    
         // 创建一个ElasticsearchSinkFunction
         ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
             @Override
             public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                 HashMap<String, String> data = new HashMap<>();
                 data.put(element.user, element.url);
    
                 IndexRequest request = Requests.indexRequest()
                         .index("clicks")
                         .type("type")
                         .source(data);
                 indexer.add(request);
             }
         };
    
         stream.addSink(new ElasticsearchSink.Builder<Event>(httpHosts, elasticsearchSinkFunction).build());
    
         env.execute();
     }
    

    与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction——ElasticsearchSink。区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。 Builder 的构造方法中又有两个参数:

    • httpHosts:连接到的 Elasticsearch 集群主机列表
    • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数,重写其中的process方法,将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求。
  3. 结果

    {
        "took": 4,
        "timed_out": false,
        "_shards": {
            "total": 1,
            "successful": 1,
            "skipped": 0,
            "failed": 0
        },
        "hits": {
            "total": {
                "value": 9,
                "relation": "eq"
            },
            "max_score": 1.0,
            "hits": [
                {
                    "_index": "clicks",
                    "_type": "type",
                    "_id": "1gjwGYUB9t6HkP9fABAX",
                    "_score": 1.0,
                    "_source": {
                        "Mary": "./home"
                    }
                },
                {
                    "_index": "clicks",
                    "_type": "type",
                    "_id": "1wjwGYUB9t6HkP9fABAX",
                    "_score": 1.0,
                    "_source": {
                        "Bob": "./cart"
                    }
                },
    

# 输出到MySQL(JDBC)

  1. 添加依赖

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    
  2. 启动 MySQL,在 database 库下建表 clicks

     mysql> create table clicks(
         -> user varchar(20) not null,
         -> url varchar(100) not null);
    
  3. 编写输出到MySQL代码

     public static void main(String[] args) throws Exception {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
    
         DataStreamSource<Event> stream = env.fromElements(
                 new Event("Mary", "./home", 1000L),
                 new Event("Bob", "./cart", 2000L),
                 new Event("Alice", "./prod?id=100", 3000L),
                 new Event("Alice", "./prod?id=200", 3500L),
                 new Event("Bob", "./prod?id=2", 2500L),
                 new Event("Alice", "./prod?id=300", 3600L),
                 new Event("Bob", "./home", 3000L),
                 new Event("Bob", "./prod?id=1", 2300L),
                 new Event("Bob", "./prod?id=3", 3300L));
    
         SinkFunction<Event> jdbcSink = JdbcSink.sink("INSERT INTO clicks (user, url) VALUES (?, ?)",
                 (statement, r) -> {
                     statement.setString(1, r.user);
                     statement.setString(2, r.url);
                 },
                 JdbcExecutionOptions.builder()
                         .withBatchSize(1000)
                         .withBatchIntervalMs(200)
                         .withMaxRetries(2)
                         .build(),
                 new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                         .withUrl("jdbc:mysql://localhost:3306/test")
                         .withDriverName("com.mysql.jdbc.Driver")
                         .withUsername("root")
                         .withPassword("chengyi123")
                         .build()
    
         );
    
         stream.addSink(jdbcSink);
    
         env.execute();
     }
    
  4. 输出结果 z7rO6e.png (opens new window)

# 自定义Sink输出

Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkFunction抽象类,在实现 SinkFunction 的时候,需要重写的一个关键方法 invoke(),在这个方法中我们就可以实现将流里的数据发送出去的逻辑。

使用了 SinkFunction 的富函数版本,因为使用到了生命周期的概念,创建 HBase 的连接以及关闭 HBase 的连接需要分别放在 open()方法和 close()方法中。


TIP

TODO

# flink中得时间和窗口

“窗口”一般就是划定的一段时间范围,也就是“时间窗”;对在这范围内的数据进行处理,就是所谓的窗口计算。所以窗口和时间往往是分不开的。

# 时间语义

# flink中的时间语义

# 处理时间(Processing Time)

执行处理操作的机器的系统时间。

这种方法非常简单粗暴,不需要各个节点之间进行协调同步,也不需要考虑数据在流中的位置,简单来说就是“我的地盘听我的”。所以处理时间是最简单的时间语义。

# 事件时间(Event Time)

指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

这里有个前提,就是“先产生的数据先被处理”,这要求我们可以保证数据到达的顺序。但是由于分布式系统中网络传输延迟的不确定性,实际应用中我们要面对的数据流往往是乱序的。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

# 水位线

# 事件事件和窗口

水位线是基于事件时间提出的概念。在介绍水位线前,首先梳理下事件时间和窗口的关系。

# 什么是水位线

用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)。

水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

  1. 有序流中的水位线 数据按照先后顺序到来,水位线也会不断增长、时间时钟不断向前推进。

    TIP

    对于水位线的周期性生成,周期时间是指处理时间(系统时间),而不是事件时间。

  2. 乱序流中水位线 数据到来的先后顺序不一样,通过设置延迟,来保证正确处理乱序数据

  3. 水位线的特性

    • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
    • 水位线的主要的内容是一个时间戳,用来标识当前事件时间的进展
    • 水位线是基于数据的时间戳生成的
    • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
    • 水位线可以通过设置延迟,来保证正确处理乱序数据
    • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

# 如何生成水位线

  1. 生成水位线的总体原则
    水位线是流处理中对低延迟和结果正确性的一个权衡机制,而且把控制的权利交给了程序员

  2. 水位线生成策略
    DataStream中,单独生成水位线的方法:assignTimestampsAndWatermarks()

    public SingleOutputStreamOperator<T>assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
    

    .assignTimestampsAndWatermarks()方法需要传入一个WatemarkStrategy作为参数,这就是所谓的“水位线生成策略”。WatermarkStrategy中包含了一个“时间戳分配器”timestampAssigner和一个“水位线生成器”WatermarkGenerator.

    public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> {
        @Override
        TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
        @Override
        WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
    }
    
    • TimestampAssigner:主要负责从流中数据元素的某个字段提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础。
    • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()。
    • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作
    • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms。
    env.getConfig().setAutoWatermarkInterval(60 * 1000L);
    
  3. Flink内置水位线生成器
    通过调用WatermarkStrategy的静态辅助方法来创建,都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

    • 有序流
    stream.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
    .withTimestampAssigner(newSerializableTimestampAssigner<Event>() {
        @Override
        public long extractTimestamp(Event element, long recordTimestamp) 
        {
            return element.timestamp;
            }
        })
    );
    
    • 乱序流
    // 插入水位线的逻辑
    .assignTimestampsAndWatermarks(
        // 针对乱序流插入水位线,延迟时间设置为 5s
        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                // 抽取时间戳的逻辑
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
        })
    )
    

    事实上,有序流的水位线生成器本质上和乱序流是一样的,相当于延迟为0的乱序流水位线生成器,两者完全相同:

    WatermarkStrategy.forMonotonousTimestamps()
    WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
    

    乱序流中生成的水位线真正的时间戳,其实是 当前最大时间戳 – 延迟时间 – 1,这里的单位是毫秒。为什么要减 1 毫秒呢?我们可以回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳≤t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在 BoundedOutOfOrdernessWatermarks 的源码中明显地看到:

    public void onPeriodicEmit(WatermarkOutput output) { 
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1)); 
    }
    
  4. 自定义水位线策略
    在 WatermarkStrategy 中,时间戳分配器 TimestampAssigner 都是大同小异的,指定字段提取时间戳就可以了;而不同策略的关键就在于 WatermarkGenerator 的实现。整体说来,Flink有两种不同的生成水位线的方式:一种是周期性的(Periodic),另一种是断点式(Punctuated)。

    onEvent()和 onPeriodicEmit(),前者是在每个事件到来时调用,而后者由框架周期性调用。周期性调用的方法中发出水位线,自然就是周期性生成水位线;而在事件触发的方法中发出水位线,自然就是断点式生成了。两种方式的不同就集中体现在这两个方法的实现上。

    • 周期性水位线生成器(Periodic Generator) 周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线。
    /*在 onPeriodicEmit()里调用 output.emitWatermark(),就可以发出水位线了;这个方法
    由系统框架周期性地调用,默认 200ms 一次。所以水位线的时间戳是依赖当前已有数据的最
    大时间戳的(这里的实现与内置生成器类似,也是减去延迟时间再减 1),但具体什么时候生
    成与数据无关。*/
    
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = new StreamExecutionEnvironment();
        env.setParallelism(1);
    
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                .print();
    
        env.execute();
    }
    
    private static class CustomWatermarkStrategy implements WatermarkStrategy<Event>
    
    {
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event event, long l) {
                    // 告诉程序数据源里的时间戳是哪一个字段
                    return event.timestamp;
                }
            };
        }
    
        @Override
        public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    
        private class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
    
            private Long delayTime = 5000L; //延迟时间
            private Long maxTs = Long.MIN_VALUE + delayTime + 1L; //观察到的最大时间戳
    
            @Override
            public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
                // 每来一条数据就调用一次
                maxTs = Math.max(event.timestamp, maxTs); //更新最大时间戳
            }
    
            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                // 发射水位线,默认200ms调用一次
                watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
            }
        }
    }
    
    • 断点式水位线生成器(Punctuated Generator) 断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。
    private class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        @Override
        public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
            // 只有在遇到特定的itemId时,才发送水位线
            if(event.user.equals("Mary")) {
                watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));
            }
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            // 不需要做任何事情,因为我们在onEvent方法中发射了水位线
        }
    }
    

    我们在onEvent()中判断当前时间的user字段,只有遇到”Mary"这个特殊的值时,才调用output.emitWatermark()发出水位线,这个过程是完全依靠事件来触发的,所以水位线的生成一定在某个数据到来之后。

  5. 在自定义数据源中发送水位线
    自定义的数据源中抽取事件时间,然后发送水位线。

    注意:在自定义数据源中发送了水位线以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方法 来 生 成 水 位 线 了 。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = new StreamExecutionEnvironment();
        env.setParallelism(1);
    
        env.addSource(new ClickSourceWithWatermark()).print();
    
        env.execute();
    }
    
    private static class ClickSourceWithWatermark implements SourceFunction<Event> {
    
        private Boolean running = true;
    
        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while(running) {
                long currTs = Calendar.getInstance().getTimeInMillis();// 毫秒时间戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
    
                // 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 发送水位线
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    }
    

    在自定义水位线中生成水位线相比 assignTimestampsAndWatermarks 方法更加灵活,可以任意的产生周期性的、非周期性的水位线,以及水位线的大小也完全由我们自定义。所以非常适合用来编写 Flink 的测试程序,测试 Flink 的各种各样的特性。

# 水位线的传递

zH3TZF.png (opens new window) 例:当前任务上游有四个并行子任务,所以会接受到来自四个分区的水位线,而下游有三个并行子任务,所以会像三个分区发出子任务。

  1. 上游并行子任务发来不同的水位线,当前任务会为每一个分区设置一个“分区水位线” (Partition Watermark),这是一个分区时钟;而当前任务自己的时钟,就是所有分区时钟里最小的那个。
  2. 当有一个新的水位线(第一分区的4)从上游传来时,当前任务会首先更新对应的分区时钟;然后再次判断所有分区中时钟的最小值,如果比之前大,说明事件时间有了进展,当前任务的时钟就可以更新了。
  3. 再次收到新的水位线(第二分区的7)后,执行同样的处理流程。首先将第二个分区时钟更新为7,然后比较所有分区的时钟;返现最小值没有变化,那么当前任务的时钟也不变,也不会向下游发送水位线。
  4. 同样的道理,当又一次收到新的水位线(第三分区的6)之后,第三个分区时钟更新为6,同时所有分区时钟最小值变成了第一个分区的4,所以当前任务的时钟推进到4,并发出时间戳为4的水位线,广播到下游各个分区任务。

# 窗口

我们往往需要面对的是连续不断、无休无止的无界流,不可能等到所有所有数据都到齐了才开始处理。所以聚合计算其实只能针对当前已有的数据——之后再有数据到来,就需要继续叠加、再次输出结果。这样似乎很“实时”,但现实中大量数据一般会同时到来,需要并行处理,这样频繁地更新结果就会给系统带来很大负担了。

把无界流进行切分,每一段数据分别进行聚合,结果只输出一次。这就相当于将无界流的聚合转化为了有界数据集的聚合,这就是所谓的“窗口”(Window)聚合操作。窗口聚合其实是对实时性和处理效率的一个权衡。

# 窗口的概念

Flink 是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 zHICJe.png

窗口都是包含起始时间、不包含结束时间.

由于乱序数据,我们需要设置一个延迟时间来等所有数据到齐。 zHLqDH.png

这样虽然把包含了迟到的9秒数据,但是连11秒和12秒的数据也包含进去了。所以窗口起始并不是一个“框”,我门更应该理解成一个“桶”,每个时间段的数据发送到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。 zHOeP0.png

TIP

Flink中窗口并不是静态准备好的,而是动态创建的--当有落在这个窗口区间范围的数据到达时,才创建对应的窗口。

当到达窗口结束时间时,”触发计算“和”窗口关闭“两个行为可以分开。

# 窗口的分类

# 按照驱动类型分类

以什么标准来开始和结束数据的截取,我们把它叫作窗口的“驱动类型”。

  1. 时间窗口(Time Window)
    时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收集数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”。

  2. 计数窗口(Count Window)
    计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小。

TIP

flink内部也没有对应的类来表示计数窗口,底层是通过“全局窗口”(Global Window)来实现的。

# 按照窗口分配数据的规则分类

窗口的具体实现可以分为 4 类:滚动窗口(Tumbling Window)、滑动窗口(Sliding Window)、会话窗口(Session Window),以及全局窗口(Global Window)。

  1. 滚动窗口(Tumbling Windows)
    滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。
    滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。 zHXRt1.png

  2. 滑动窗口(Sliding Windows)
    与滚动窗口类似,滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。
    除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率。 zHjuB4.png

  3. 会话窗口(Session Window)
    数据来了之后就开启一个会话窗口,如果接下来还有数据陆续到来,那么就一直保持会话;如果一段时间一直没收到数据,那就认为会话超时失效,窗口自动关闭。

    会话窗口只能基于时间来定义,而没有“会话计数窗口”的概念。

    会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。

    Flink 底层,对会话窗口的处理会比较特殊:每来一个新的数据,都会创建一个新的会话窗口;然后判断已有窗口之间的距离,如果小于给定的 size,就对它们进行合并(merge)操作。

    zb1UsK.png

  4. 全局窗口(Global Windows)
    比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)。 zb1yRI.png

# 窗口API概览

# 按键分区(Keyed)和非按键分区(Non-Keyed)

在定义窗口操作之前,首先需要确定,到底是基于按键分区(Keyed)的数据流 KeyedStream来开窗,还是直接在没有按键分区的 DataStream 上开窗。也就是说,在调用窗口算子之前,是否有 keyBy 操作。

  1. 按键分区窗口(Keyed Windows)
    经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
    stream.keyBy(...).window(...)
    
  2. 非按键分区(Non-Keyed Windows)
    如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。
    stream.windowAll(...) //直接基于windowAll()定义窗口
    
# 代码中窗口API的调用

窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(Window Functions)。

stream.keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(<window function>)

其中.window()方法需要传入一个窗口分配器,它指明了窗口的类型。.aggregate()方法传入一个窗口函数做为参数,它用来定义窗口具体的处理逻辑。

TIP

窗口分脾气有各种形式,窗口函数的调用方法也不只是.aggregate()一种

# 窗口分配器(Window Assigners)

定义数据应该被“分配”到哪个窗口。可以理解为窗口分配器其实就是在指定窗口的类型。

窗口分配器最通用的定义方式,就是调用.window()方法。这个方法需要传入一个WindowAssigner 作为参数,返回 WindowedStream。如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个 WindowAssigner,返回的是 AllWindowedStream。

# 时间窗口

时间窗口是最常用的窗口类型,又可以细分为滚动、滑动和会话三种。

直接调用.window(),在里面传入对应时间语义下的窗口分配器。不需要专门定义时间语义,默认就是事件时间;如果想用处理时间,那么在这里传入处理时间的窗口分配器。

  1. 滚动处理时间窗口
    窗口分配器由类TumblingProcessingTimeWindows提供,需要嗲用它的静态方法.of()。

    stream.keyBy(...)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .aggregate(...)
    

    这里.of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小,我们这里创建了一个长度为 5 秒的滚动窗口。 .of()还有一个重载方法,可以传入两个 Time 类型的参数:size 和 offset。用来处理各个国家的时区问题。

  2. 滑动处理时间窗口
    窗口分配器由类 SlidingProcessingTimeWindows 提供,同样需要调用它的静态方法.of()。

    stream.keyBy(...)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(...)
    

    .of()方法需要传入两个 Time 类型的参数:size 和 slide,前者表示滑动窗口的大小,后者表示滑动窗口的滑动步长。我们这里创建了一个长度为 10 秒、滑动步长为 5 秒的滑动窗口。 滑动窗口同样可以追加第三个参数,用于指定窗口起始点的偏移量。

  3. 处理时间会话窗口
    ProcessingTimeSessionWindows 提供,需要调用它的静态方法.withGap()或者.withDynamicGap()。

    stream.keyBy(...)
        .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
        .aggregate(...)
    

    这里.withGap()方法需要传入一个 Time 类型的参数 size,表示会话的超时时间,也就是最小间隔 session gap。我们这里创建了静态会话超时时间为 10 秒的会话窗口。

    .window(ProcessingTimeSessionWindows.withDynamicGap(
        new SessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
            @Override
            public long extract(Tuple2<String, Long> element) { 
                // 提取 session gap 值返回, 单位毫秒
                return element.f0.length() * 1000;
            }
        }
    ))
    

    .withDynamicGap()方法需要传入一个SessionWindowTimeGapExtractor 作为参数,用来定义 session gap 的动态提取逻辑。在这里,我们提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔。

  4. 滚动事件时间窗口
    窗口分配器由类TumblingEventTimeWindows提供,用法与滚动处理事件窗口完全一致。

    stream.keyBy(...)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(...)
    

    .of()方法也可以传入第二个参数 offset,用于设置窗口起始点的偏移量。

  5. 滑动事件时间窗口
    窗口分配器由类 SlidingEventTimeWindows 提供,用法与滑动处理事件窗口完全一致。

    stream.keyBy(...)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .aggregate(...)
    
  6. 事件时间会话窗口
    窗口分配器由类 EventTimeSessionWindows 提供,用法与处理事件会话窗口完全一致。

    stream.keyBy(...)
        .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
        .aggregate(...)
    
# 计数窗口

计数窗口概念非常简单,本身底层是基于全局窗口(Global Window)实现的。Flink 为我们提供了非常方便的接口:直接调用.countWindow()方法。根据分配规则的不同,又可以分为滚动计数窗口和滑动计数窗口两类

  1. 滚动计数窗口
    滚动计数窗口只需要传入一个长整型的参数 size,表示窗口的大小。
    stream.keyBy(...)
        .countWindow(10)
    
    这里定义了一个长度为 10 的滚动计数窗口,当窗口中元素数量达到 10 的时候,就会触发计算执行并关闭窗口。
  2. 滑动计数窗口
    与滚动计数窗口类似,不过需要在.countWindow()调用时传入两个参数:size 和 slide,前者表示窗口大小,后者表示滑动步长。
    stream.keyBy(...)
        .countWindow(10,3)
    
    我们定义了一个长度为 10、滑动步长为 3 的滑动计数窗口。每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果。
# 全局窗口

全局窗口是计数窗口的底层实现,一般在需要自定义窗口时使用。它的定义同样是直接调用.window(),分配器由 GlobalWindows 类提供。

stream.keyBy(...)
    .window(GlobalWindows.create());

TIP

需要注意使用全局窗口,必须自行定义触发器才能实现窗口计算,否则起不到任何作用。

# 窗口函数(Window Functions)

经窗口分配器处理之后,数据可以分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是 DataStream,所以并不能直接进行其他转换,而必须进一步调用窗口函数,对收集到的数据进行处理计算之后,才能最终再次得到 DataStream zLff76.png

# 增量聚合函数
  1. ReduceFunction
public class WindowsReduceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从自定义数据源读取数据,并提取时间戳、生成水位线
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(Event value) throws Exception {
                // 将数据转换成二元组,方便计算
                return Tuple2.of(value.user, 1L);
            }
        }).keyBy(r -> r.f0)
        // 设置滚动事件事件窗口
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new ReduceFunction<Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                return Tuple2.of(value1.f0, value1.f1 + value2.f1);
            }
        }).print();
        
        env.execute();
    }
}
  1. AggregateFunction 直接基于 WindowedStream 调用.aggregate()方法,这个方法需要传入一个AggregateFunction 的实现类作为参数。
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{
    ACC createAccumulator();
    ACC add(IN value, ACC accumulator);
    OUT getResult(ACC accumulator);
    Acc merge(ACC a, ACC b);
}

三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
在电商网站中,PV(页面浏览量)和 UV(独立访客数)是非常重要的两个流量指标。一般来说,
PV 统计的是所有的点击量;而对用户 id 进行去重之后,得到的就是 UV。所以有时我们会用 PV/UV 
这个比值,来表示“人均重复访问量”,也就是平均每个用户会访问多少次页面,这在一定程度上代表了
用户的粘度。
public class WindowAggregateFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                    @Override
                                    public long extractTimestamp(Event element, long recordTimestamp) {
                                        return element.timestamp;
                                    }
                                }));

        // 所有数据设置相同的key,发送到同一个分区统计PV和UV,再相除
        stream.keyBy(data -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                .aggregate(new AvgPv())
                .print();

        env.execute();
    }

    private static class AvgPv implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
        @Override
        public Tuple2<HashSet<String>, Long> createAccumulator() {
            // 创建累加器
            return Tuple2.of(new HashSet<String>(), 0L);
        }

        @Override
        public Tuple2<HashSet<String>, Long> add(Event value, Tuple2<HashSet<String>, Long> accumulator) {
            // 属于本窗口的数据来一条累加一次,并返回累加器
            accumulator.f0.add(value.user);
            return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
        }

        @Override
        public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
            // 窗口闭时,增量聚合结束,将计算结果发送到下游
            return (double)accumulator.f1 /accumulator.f0.size();
        }

        @Override
        public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> a, Tuple2<HashSet<String>, Long> b) {
            return null;
        }
    }
}
# 全窗口函数(full window functions)
  1. 窗口函数(WindowFunction) 以基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类。
stream
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());

这个类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口(Window)本身的信息。

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, 
Serializable {
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws 
Exception;
}

当窗口到达结束时间需要触发计算时,就会调用这里的 apply 方法。我们可以从 input 集合中取出窗口收集的数据,结合 key 和 window 信息,通过收集器(Collector)输出结果。

  1. 处理窗口函数(ProcessWindowFunction) ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个160“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)

ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员

基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类。

电商网站统计每小时UV的例子
public class UvCountByWindowExample {
    // 电商网站统计每小时UV的例子
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

        // 将数据全部发往同一个分区,按窗口统计UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();

    }

    private static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            HashSet<String> userSet = new HashSet<>();
            // 遍历所有数据,放到set里去重
            for(Event event : elements)  {
                userSet.add(event.user);
            }

            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口:" + new Timestamp(start) + "~" + new Timestamp(end) + "的独立访客数量是:" + userSet.size());
        }
    }
}
# 增量聚合和全窗口函数的结合使用

增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际使用中,我们希望兼具这两者的优点,把它们结合在一起使用。
在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。

public class UrlViewCountExample {

    public static void main(String[] args) throws Exception {
        //统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        // 需要按照url分组,开滑动窗口统计
        stream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // 同时传入增量聚合函数和全窗口函数
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult())
                .print();
        env.execute();
    }

    // 自定义增量聚合函数,来一条数据就加一
    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 自定义窗口处理函数,只需要包装窗口信息
    private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>  {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();

            // 迭代器中只有一个元素,就是增量聚合函数的计算结果
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }

    private static class UrlViewCount {
        public String url;

        public Long count;

        public Long windowStart;

        public Long windowEnd;

        // 省略无参、全参、tostring方法
    }

}

# 测试水位线和窗口的使用

当水位线到达窗口结束时间时,窗口就会闭合不再接收迟到的数据,因为根据水位线的定义,所有小于等于水位线的数据都已经到达,所以显然 Flink 会认为窗口中的数据都到达了(尽管可能存在迟到数据,也就是时间戳小于当前水位线的数据)。我们可以在之前生成水位线代码 WatermarkTest 的基础上,增加窗口应用做一下测试:

public class WatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 将数据源改为socket文本流,并转换成event类型
        env.socketTextStream("192.168.207.102", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                    }
                })
                // 插入水位线的逻辑
                .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event element, long recordTimestamp) {
                                    return element.timestamp;
                                }
                            })
                )
                .keyBy(data -> data.user)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new WatermarkTestResult())
                .print();
        
        env.execute();
    }

    private static class WatermarkTestResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            Long currentWatermark = context.currentWatermark();
            Long count = elements.spliterator().getExactSizeIfKnown();
            out.collect("窗口" + start + " ~ " + end + "中共有" + count + "个元素,窗口闭合计算时,水位线处于:" + currentWatermark);
        }
    }
}

这里设置的最大延迟时间是 5 秒,所以当我们在终端启动 nc 程序,也就是 nc –lk 7777 然后输入如下数据时

Alice, ./home, 1000
Alice, ./cart, 2000
Alice, ./prod?id=100, 10000
Alice, ./prod?id=200, 8000
Alice, ./prod?id=300, 15000

输出结果

窗口0 ~ 10000中共有3个元素,窗口闭合计算时,水位线处于:9999

# 其他API

# 触发器(Trigger)

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程 基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。

stream.keyBy(...)
    .window(...)
    .trigger(new MyTrigger())

Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现。例如,所有事件时间窗口,默认的触发器都是 EventTimeTrigger;类似还有 ProcessingTimeTrigger 和 CountTrigger。所以一般情况下是不需要自定义触发器的,不过我们依然有必要了解它的原理。

Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口

Trigger 除了可以控制触发计算,还可以定义窗口什么时候关闭(销毁)。上面的四种类型,其实也就是这两个操作交叉配对产生的结果。一般我们会认为,到了窗口的结束时间,那么就会触发计算输出结果,然后关闭窗口——似乎这两个操作应该是同时发生的;但 TriggerResult 的定义告诉我们,两者可以分开。稍后我们就会看到它们分开操作的场景。

在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的
pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以
使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗
口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。
public class TriggerExample {

    //在日常业务场景中,我们经常会开比较大的窗口来计算每个窗口的
    //pv 或者 uv 等数据。但窗口开的太大,会使我们看到计算结果的时间间隔变长。所以我们可以
    //使用触发器,来隔一段时间触发一次窗口计算。我们在代码中计算了每个 url 在 10 秒滚动窗
    //口的 pv 指标,然后设置了触发器,每隔 1 秒钟触发一次窗口的计算。
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }))
                .keyBy(r -> r.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .trigger(new MyTrigger())
                .process(new WindowResult())
                .print();

        env.execute();
    }

    private static class MyTrigger extends Trigger<Event, TimeWindow> {
        @Override
        public TriggerResult onElement(Event element, long timestamp, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            if(isFirstEvent.value() == null) {
                for(long i = timeWindow.getStart(); i< timeWindow.getEnd(); i = i + 1000L) {
                    triggerContext.registerEventTimeTimer(i);
                }
                isFirstEvent.update(true);
            }
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.FIRE;
        }

        @Override
        public void clear(TimeWindow window, TriggerContext triggerContext) throws Exception {
            ValueState<Boolean> isFirstEvent = triggerContext.getPartitionedState(new ValueStateDescriptor<Boolean>("first-event", Types.BOOLEAN));
            isFirstEvent.clear();
        }
    }

    private static class WindowResult extends ProcessWindowFunction<Event, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String s, Context context, Iterable<Event> iterable, Collector<UrlViewCount> out) throws Exception {
            out.collect(new UrlViewCount(
                    s,
                    // 获取迭代器中的元素个数
                    iterable.spliterator().getExactSizeIfKnown(),
                    context.window().getStart(),
                    context.window().getEnd()
            ));
        }
    }

    private static class UrlViewCount {
        public String url;

        public Long count;

        public Long windowStart;

        public Long windowEnd;

        // 省略无参、全参、tostring方法
    }
}

输出结果

UrlViewCount{url='./fav', count=1, windowStart=1672732410000, windowEnd=1672732420000}
UrlViewCount{url='./fav', count=1, windowStart=1672732410000, windowEnd=1672732420000}
UrlViewCount{url='./prod?id=2', count=1, windowStart=1672732410000, windowEnd=1672732420000}
UrlViewCount{url='./prod?id=2', count=1, windowStart=1672732410000, windowEnd=1672732420000}
UrlViewCount{url='./fav', count=1, windowStart=1672732410000, windowEnd=1672732420000}
UrlViewCount{url='./prod?id=2', count=1, windowStart=1672732410000, windowEnd=1672732420000}
# 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器。

stream.keyBy(...)
 .window(...)
 .evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的。

# 允许延迟(Allowed Lateness)

当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃。
不过在多数情况下,直接丢弃数据也会导致统计结果不准确,我们还是希望该上车的人都能上来。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。
基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可 以表示允许这段时间内的延迟数据。

stream.keyBy(...)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .allowedLateness(Time.minutes(1))

从这里我们就可以看到,窗口的触发计算(Fire)和清除(Purge)操作确实可以分开。不过在默认情况下,允许的延迟是 0,这样一旦水位线到达了窗口结束时间就会触发计算并清除窗口,两个操作看起来就是同时发生了。当窗口被清除(关闭)之后,再来的数据就会被丢弃。

# 将迟到的数据放入侧输出流

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。
基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同。

DataStream<Event> stream = env.addSource(...);
OutputTag<Event> outputTag = new OutputTag<Event>("late") {};

stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)

将迟到数据放入侧输出流之后,还应该可以将它提取出来。基于窗口处理完成之后的DataStream,调用.getSideOutput()方法,传入对应的输出标签,就可以获取到迟到数据所在的流了。

SingleOutputStreamOperator<AggResult> winAggStream = stream.keyBy(...)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .sideOutputLateData(outputTag)
    .aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

这里注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据 类型应该和 OutputTag 指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

# 迟到数据的处理

所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

# 设置水位线的延迟时间

# 允许窗口处理迟到数据

# 将迟到数据放入窗口侧输出流

用窗口的侧输出流来收集关窗以后的迟到数据。这种方式是最后“兜底”的方法,只能保证数据不丢失;因为窗口已经真正关闭,所以是无法基于之前窗口的结果直接做更新的。我们只能将之前的窗口计算结果保存下来,然后获取侧输出流中的迟到数据,判断数据所属的窗口,手动对结果进行合并更新。尽管有些烦琐,实时性也不够强,但能够保证最终结果一定是正确的。

public class ProcessLateDataExample {

    public static void main(String[] args) throws Exception {
        //统计 10 秒钟的 url 浏览量,每 5 秒钟更新一次;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.socketTextStream("192.168.207.102", 7777)
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String value) throws Exception {
                        String[] fields = value.split(",");
                        return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                    }
                })
                // 方式一:设置watermark延迟时间,2秒钟
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

        // 定义侧输出标签
        OutputTag<Event> outputTag = new OutputTag<Event>("late") {
        };

        // 需要按照url分组,开滑动窗口统计
        SingleOutputStreamOperator<UrlViewCount> result = stream.keyBy(data -> data.url)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
                .allowedLateness(Time.minutes(1))
                // 方式三:将最后的迟到数据输出到侧输出流
                .sideOutputLateData(outputTag)
                // 同时传入增量聚合函数和全窗口函数
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        result.print("result");

        result.getSideOutput(outputTag).print("late");

        // 为了方便观察,将原始数据进行输出
        stream.print("input");

        env.execute();
    }

    // 自定义增量聚合函数,来一条数据就加一
    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    // 自定义窗口处理函数,只需要包装窗口信息
    private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow>  {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();

            // 迭代器中只有一个元素,就是增量聚合函数的计算结果
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }

    private static class UrlViewCount {
        public String url;

        public Long count;

        public Long windowStart;

        public Long windowEnd;

        //省略无参、全参、toString方法

}

启动 nc –lk 7777,然后依次输入以下数据:

Alice, ./home, 1000
Alice, ./home, 2000
Alice, ./home, 10000
Alice, ./home, 9000
Alice, ./cart, 12000
Alice, ./prod?id=100, 15000
Alice, ./home, 9000
Alice, ./home, 8000
Alice, ./prod?id=200, 70000
Alice, ./home, 8000
Alice, ./prod?id=300, 72000
Alice, ./home, 8000

输出

input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:01.0}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:02.0}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:10.0}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:09.0}
input> Event{user='Alice', url='./cart', timestamp=1970-01-01 08:00:12.0}
result> UrlViewCount{url='./home', count=3, windowStart=0, windowEnd=10000}
input> Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:15.0}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:09.0}
result> UrlViewCount{url='./home', count=4, windowStart=0, windowEnd=10000}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:08.0}
result> UrlViewCount{url='./home', count=5, windowStart=0, windowEnd=10000}
input> Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:01:10.0}
result> UrlViewCount{url='./cart', count=1, windowStart=10000, windowEnd=20000}
result> UrlViewCount{url='./home', count=1, windowStart=10000, windowEnd=20000}
result> UrlViewCount{url='./prod?id=100', count=1, windowStart=10000, windowEnd=20000}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:08.0}
result> UrlViewCount{url='./home', count=6, windowStart=0, windowEnd=10000}
input> Event{user='Alice', url='./prod?id=300', timestamp=1970-01-01 08:01:12.0}
input> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:08.0}
late> Event{user='Alice', url='./home', timestamp=1970-01-01 08:00:08.0}

# 处理函数

在更底层,我们可以不定义任何具体的算子(比如 map,filter,或者 window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function)。
数据流中最基本的元素:数据事件(event)、状态(state)以及时间(time)。这就相当于对流有了完全的控制权。处理函数比较抽象,没有具体的操作,所以对于一些常见的简单应用(比如求和、开窗口)会显得有些麻烦;不过正是因为它不限定具体做什么,所以理论上我们可以做任何事情,实现所有需求。所以可以说,处理函数是我们进行 Flink 编程的“大招”,轻易不用,一旦放出来必然会扫平一切。

# 基本处理函数(ProcessFunction)

***处理函数主要是定义数据流的转换操作,所以也可以把它归到转换算子中。***所对应的函数 类,叫作 ProcessFunction。

# 处理函数的功能和使用

处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册“定时事件”。而且处理函数继承了 AbstractRichFunction 抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑;同时也是整个 DataStream API 的底层基础。

基于 DataStream 调用.process()方法传入一个 ProcessFunction 作为参数,用来定义处理逻辑。

stream.process(new MyProcessFunction()) //processFunction是一个抽象类,继承了AbstractRichFunction,这里的MyProcessFunction是他的一个实现

例子

public class ProcessFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                )
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        if(value.user.equals("Mary")) {
                            out.collect(value.user);
                        }else if (value.user.equals("Bob")) {
                            out.collect(value.user);
                            out.collect(value.user);
                        }
                        System.out.println(ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}

# ProcessFunction解析

# 处理函数的分类

Flink 提供了 8 个不同的处理函数:

  1. ProcessFunction
    最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
  2. KeyedProcessFunction
    对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用定时器,比如基于 KeyedStream。
  3. ProcessWindowFunction
    开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作为参数传入。
  4. ProcessAllWindowFunction
    同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
  5. CoProcessFunction
    合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。关于流的连接合并操作,我们会在后续章节详细介绍。
  6. ProcessJoinFunction
    间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入
  7. BroadcastProcessFunction
    广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
  8. KeyedBroadcastProcessFunction
    按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。

# 按键分区处理函数(KeyedProcessFunction)

为了实现数据的聚合统计,或者开窗计算之类的功能,我们一般都要先用 keyBy 算子对数据流进行“按键分区”,得到一个 KeyedStream。也就是指定一个键(key),按照它的哈希值(hash code)将数据分成不同的“组”,然后分配到不同的并行子任务上执行计算;这相当于做了一个逻辑分流的操作,从而可以充分利用并行计算的优势实时处理海量数据。

在上节中也提到,只有在 KeyedStream 中才支持使用 TimerService 设置定时器的操作。

# 定时器(Timer)和定时服务(TimerService)

KeyedProcessFunction 的一个特色,就是可以灵活地使用定时器。

定时器(timers)是处理函数中进行时间相关操作的主要机制。在.onTimer()方法中可以实现定时处理的逻辑,而它能触发的前提,就是之前曾经注册过定时器、并且现在已经到了触发时间。注册定时器的功能,是通过上下文中提供的“定时服务”(TimerService)来实现的。

ProcessFunction 的上下文(Context)中提供了.timerService()方法,可以直接返回一个 TimerService 对象:

public abstract TimerService timerService();

// TimerService 是 Flink 关于时间和定时器的基础服务接口,包含以下六个方法:

// 获取当前的处理时间
long currentProcessingTime();

// 获取当前的水位线(事件时间)
long currentWatermark();

// 注册处理时间定时器,当处理时间超过 time 时触发
void registerProcessingTimeTimer(long time);

// 注册事件时间定时器,当水位线超过 time 时触发
void registerEventTimeTimer(long time);

// 删除触发时间为 time 的处理时间定时器
void deleteProcessingTimeTimer(long time);

// 删除触发时间为 time 的处理时间定时器
void deleteEventTimeTimer(long time);

WARNING

尽管处理函数中都可以直接访问TimerService,不过只有基于 KeyedStream 的处理函数,才能去调用注册和删除定时器的方法;未作按键分区的 DataStream 不支持定时器操作,只能获取当前时间。

对于每个 key 和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。

Flink 的定时器同样具有容错性,它和状态一起都会被保存到一致性检查点(checkpoint)中。当发生故障时,Flink 会重启并读取检查点中的状态,恢复定时器。如果是处理时间的定时器,有可能会出现已经“过期”的情况,这时它们会在重启时被立刻触发。

# KeyProcessFunction的使用

KeyedProcessFunction 可以说是处理函数中的“嫡系部队”,可以认为是 ProcessFunction 的一个扩展。我们只要基于 keyBy 之后的 KeyedStream,直接调用.process()方法,这时需要传入的参数就是 KeyedProcessFunction 的实现类。

stream.keyBy( t -> t.f0 )
    .process(new MyKeyedProcessFunction())

与 ProcessFunction 的定义几乎完全一样,区别只是在于类型参数多了一个 K,这是当前按键分区的 key 的类型。同样地,我们必须实现一个.processElement()抽象方法,用来处理流中的每一个数据;另外还有一个非抽象方法.onTimer(),用来定义定时器触发时的回调操作。由于定时器只能在 KeyedStream 上使用,所以到了 KeyedProcessFunction 这里,我们才真正对时间有了精细的控制,定时方法.onTimer()才真正派上了用场。

public class ProcessingTimeTimerTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 处理时间语义,不需要分配时间戳和watermark
        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());

        // 要用定时器,必须基于KeyedStream
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        Long currTs = ctx.timerService().currentProcessingTime();
                        out.collect("数据到达,到达时间:" + new Timestamp(currTs));

                        // 注册一个10秒后的定时器
                        ctx.timerService().registerProcessingTimeTimer(currTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }
}

上面的例子是处理时间的定时器,所以我们是真的需要等待 10 秒才会看到结果。事件时间语义下,又会有什么不同呢?我们可以对上面的代码略作修改,做一个测试:

public class EventTimeTimerTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 处理时间语义,不需要分配时间戳和watermark
        SingleOutputStreamOperator<Event> stream = env.addSource(new CustomSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

        // 要用定时器,必须基于KeyedStream
        stream.keyBy(data -> true)
                .process(new KeyedProcessFunction<Boolean, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("数据到达,到达时间:" + new Timestamp(ctx.timestamp()));
                        out.collect(" 数据到达,水位线为: " +
                                ctx.timerService().currentWatermark() + "\n-------分割线-------");
                        // 注册一个10秒后的定时器
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("定时器触发,触发时间:" + new Timestamp(timestamp));
                    }
                })
                .print();

        env.execute();
    }

    private static class CustomSource implements SourceFunction<Event> {
        @Override
        public void run(SourceContext<Event> ctx) throws Exception {
            // 直接发出测试数据
            ctx.collect(new Event("Mary", "./home", 1000L));
            // 为了更加明显,中间停顿 5 秒钟
            Thread.sleep(5000L);
            // 发出 10 秒后的数据
            ctx.collect(new Event("Mary", "./home", 11000L));
            Thread.sleep(5000L);
            // 发出 10 秒+1ms 后的数据
            ctx.collect(new Event("Alice", "./cart", 11001L));
            Thread.sleep(5000L);
        }

        @Override
        public void cancel() {

        }
    }
}

执行结果

数据到达,到达时间:1000
 数据到达,水位线为: -9223372036854775808
-------分割线-------
数据到达,到达时间:11000
 数据到达,水位线为: 999
-------分割线-------
数据到达,到达时间:11001
 数据到达,水位线为: 10999
-------分割线-------
定时器触发,触发时间:11000
定时器触发,触发时间:21000
定时器触发,触发时间:21001

当第一条数据到来,时间戳为 1000,可水位线的生成是周期性的(默认 200ms 一次),不会立即发生改变,所以依然是最小值 Long.MIN_VALUE;随后只要到了水位线生成的时间点(200ms 到了),就会依据当前的最大时间戳 1000 来生成水位线了。这里我们没有设置水位线延迟,默认需要减去 1 毫秒,所以水位线推进到了 999。而当时间戳为 11000 的第二条数据到来之后,水位线同样没有立即改变,仍然是 999,就好像总是“滞后”数据一样。

# 窗口处理函数

除 了 KeyedProcessFunction , 另 外 一 大 类 常 用 的 处 理 函 数 , 就 是 基 于 窗 口 的ProcessWindowFunction 和 ProcessAllWindowFunction 了。

# 窗口处理函数的使用

基于WindowedStream 直接调用.process()。

stream.keyBy( t -> t.f0 )
    .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessWindowFunction())

# ProcessWindowFunction 解析

ProcessWindowFunction 既是处理函数又是全窗口函数。从名字上也可以推测出,它的本质似乎更倾向于“窗口函数”一些。

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 extends AbstractRichFunction {
    ...
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    public void clear(Context context) throws Exception {}
    public abstract class Context implements java.io.Serializable {...}
}

这里的参数不再是一个输入数据,而是窗口中所有数据的集合。而上下文context 所包含的内容也跟其他处理函数有所差别:

public abstract class Context implements java.io.Serializable {
    public abstract W window();
    public abstract long currentProcessingTime();
    public abstract long currentWatermark();
    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
    public abstract <X> void output(OutputTag<X> outputTag, X value);
}

除了可以通过.output()方法定义侧输出流不变外,其他部分都有所变化。这里不再持有TimerService 对象,只能通过 currentProcessingTime()和 currentWatermark()来获取当前时间,所以失去了设置定时器的功能;另外由于当前不是只处理一个数据,所以也不再提供.timestamp()方法。与此同时,也增加了一些获取其他信息的方法:比如可以通过.window()直接获取到当前的窗口对象,也可以通过.windowState()和.globalState()获取到当前自定义的窗口状态和全局状态。注意这里的“窗口状态”是自定义的,不包括窗口本身已经有的状态,针对当前 key、当前窗口有效;而“全局状态”同样是自定义的状态,针对当前 key 的所有窗口有效。

ProcessWindowFunction 中除了.process()方法外,并没有.onTimer()方法,而是多出了一个.clear()方法。从名字就可以看出,这主要是方便我们进行窗口的清理工作。如果我们自定义了窗口状态,那么必须在.clear()方法中进行显式地清除,避免内存溢出。

这里有一个问题:没有了定时器,那窗口处理函数就失去了一个最给力的武器,如果我们希望有一些定时操作又该怎么做呢?其实仔细思考会发现,对于窗口而言,它本身的定义就包含了一个触发计算的时间点,其实一般情况下是没有必要再去做定时操作的。如果非要这么干,Flink也提供了另外的途径——使用窗口触发器(Trigger)。在触发器中也有一个TriggerContext,它可以起到类似 TimerService 的作用:获取当前时间、注册和删除定时器,另外还可以获取当前的状态。这样设计无疑会让处理流程更加清晰——定时操作也是一种“触发”,所以我们就让所有的触发操作归触发器管,而所有处理数据的操作则归窗口函数管。

至于另一种窗口处理函数 ProcessAllWindowFunction,它的用法非常类似。区别在于它基于的是 AllWindowedStream,相当于对没有 keyBy 的数据流直接开窗并调用.process()方法:

stream.windowAll( TumblingEventTimeWindows.of(Time.seconds(10)) )
    .process(new MyProcessAllWindowFunction())

# 应用案例——TopN

需要统计最近10 秒钟内最热门的两个 url 链接,并且每 5 秒钟更新一次。我们知道,这可以用一个滑动窗口来实现,而“热门度”一般可以直接用访问量来表示。于是就需要开滑动窗口收集 url 的访问数据,按照不同的 url 进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。

简单的增量聚合可以得到 url 链接的访问量,但是后续的排序输出 Top N 就很难实现了。所以接下来我们用窗口处理函数进行实现。

# 使用ProcessAllWindowFunction

一种最简单的想法是,我们干脆不区分 url 链接,而是将所有访问数据都收集起来,统一进行统计计算。所以可以不做 keyBy,直接基于 DataStream 开窗,然后使用全窗口函数ProcessAllWindowFunction 来进行处理。

在窗口中可以用一个 HashMap 来保存每个 url 的访问次数,只要遍历窗口中的所有数据,自然就能得到所有 url 的热门度。最后把 HashMap 转成一个列表 ArrayList,然后进行排序、取出前两名输出就可以了。

public class ProcessAllWindowTopN {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
        );

        SingleOutputStreamOperator<String> result = eventStream.map(new MapFunction<Event, String>() {
                    @Override
                    public String map(Event value) throws Exception {
                        return value.url;
                    }
                }).windowAll(
                        SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))
                )
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        // 遍历窗口中数据,将浏览量保存到一个HashMap中
                        for(String url : elements) {
                            if (urlCountMap.containsKey(url)) {
                                long count = urlCountMap.get(url);
                                urlCountMap.put(url, count + 1L);
                            } else {
                                urlCountMap.put(url, 1L);
                            }
                        }

                        ArrayList<Tuple2<String, Long>> mapList = new ArrayList<>();
                        // 将浏览量数据放入ArrayList,进行排序
                        for(String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }

                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return o2.f1.intValue() - o1.f1.intValue();
                            }
                        });
                        // 取排序后的前两名,构建输出结果
                        StringBuilder result = new StringBuilder();

                        result.append("======================================================\n");

                        for(int i = 0; i<2; i++) {
                            Tuple2<String,Long> temp = mapList.get(i);
                            String info = "浏览量 No." + (i + 1) +
                                    " url:" + temp.f0 +
                                    " 浏览量:" + temp.f1 +
                                    " 窗 口 结 束 时 间 : " + new Timestamp(context.window().getEnd()) + "\n";

                            result.append(info);
                        }

                        result.append("======================================================\n");

                        out.collect(result.toString());

                    }
                });

        result.print();

        env.execute();
    }
}

结果如图所示

======================================================
浏览量 No.1 url:./prod?id=2 浏览量:2 窗 口 结 束 时 间 : 2023-01-04 15:29:30.0
浏览量 No.2 url:./cart 浏览量:1 窗 口 结 束 时 间 : 2023-01-04 15:29:30.0
======================================================

# 使用KeyedProcessFunction

处理流程

  1. 读取数据源
  2. 筛选浏览行为(PV)
  3. 提取时间戳并生成水位线
  4. 按照url进行keyBy分区操作
  5. 开长度为 1 小时、步长为 5 分钟的事件时间滑动窗口
  6. 使用增量聚合函数 AggregateFunction,并结合全窗口函数 WindowFunction 进行窗口聚合,得到每个 url、在每个统计窗口内的浏览量,包装成 UrlViewCount;
  7. 按照窗口结束时间进行 keyBy 分区操作;
  8. 对同一窗口的统计结果数据,使用 KeyedProcessFunction 进行收集并排序输出。
public class KeyedProcessTopN {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从自定义数据源读取数据
        SingleOutputStreamOperator<Event> eventStream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Event>() {
                                            @Override
                                            public long extractTimestamp(Event element, long recordTimestamp) {
                                                return element.timestamp;
                                            }
                                        }
                                )
                );

        // 需要按照url分组,求出每个url的访问量
        SingleOutputStreamOperator<UrlViewCount> urlCountStream = eventStream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        // 对结果中同一个窗口的统计数据,进行排序
        SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd)
                .process(new TopN(2));

        result.print("result");

        env.execute();
    }

    private static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    private static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
        @Override
        public void process(String url, Context context, Iterable<Long> elements, Collector<UrlViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlViewCount(url, elements.iterator().next(), start, end));
        }
    }

    private static class TopN extends KeyedProcessFunction<Long, UrlViewCount, String> {

        //将n 作为属性
        private Integer n;

        // 定义一个列表状态
        private ListState<UrlViewCount> urlViewCountListState;
        public TopN(Integer n) {
            this.n = n;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // 从环境中获取列表状态句柄
            urlViewCountListState = getRuntimeContext()
                    .getListState(
                            new ListStateDescriptor<UrlViewCount>(
                                    "url-view-count-list",
                                    Types.POJO(UrlViewCount.class))
                    );
        }


        @Override
        public void processElement(UrlViewCount value, Context ctx, Collector<String> out) throws Exception {
            // 将count数据添加到列表状态中,保存起来
            urlViewCountListState.add(value);
            // 注册window end + 1ms 后的定时器, 等待所有数据到齐开始排序
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);

        }

        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, UrlViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // 将数据从列表状态变量中取出,放入ArrayList,方便排序
            ArrayList<UrlViewCount> urlViewCountArrayList = new ArrayList<>();
            for(UrlViewCount urlViewCount: urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }

            // 清空状态,释放资源
            urlViewCountListState.clear();

            //排序
            urlViewCountArrayList.sort(new Comparator<UrlViewCount>() {
                @Override
                public int compare(UrlViewCount o1, UrlViewCount o2) {
                    return o2.count.intValue() - o1.count.intValue();
                }
            });

            // 取排序后的前两名,构建输出结果
            StringBuilder result = new StringBuilder();

            result.append("======================================================\n");

            for(int i = 0; i<this.n; i++) {
                UrlViewCount urlViewCount = urlViewCountArrayList.get(i);
                String info = "浏览量 No." + (i + 1) +
                        " url:" + urlViewCount.url +
                        " 浏览量:" + urlViewCount.count +
                        " 窗 口 结 束 时 间 : " + urlViewCount.windowEnd + "\n";

                result.append(info);
            }

            result.append("======================================================\n");

            out.collect(result.toString());

        }
    }
}

在 open 方法中初始化了列表状态变量,我们初始化的时候使用了 ListStateDescriptor描述符,这个描述符用来告诉 Flink 列表状态变量的名字和类型。列表状态变量是单例,也就是说只会被实例化一次。这个列表状态变量的作用域是当前 key 所对应的逻辑分区。我们使用add 方法向列表状态变量中添加数据,使用 get 方法读取列表状态变量中的所有元素。

# 侧输出流(Side Output)

处理函数还有另外一个特有功能,就是将自定义的数据放入“侧输出流”(side output)输出。

侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了。

public class ProcessSideOutputExample {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        OutputTag<Event> outputTag = new OutputTag<Event>("side-output"){};

        SingleOutputStreamOperator<Event> result = env.addSource(new ClickSource()).process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    ctx.output(outputTag, value);
                } else {
                    out.collect(value);
                }
            }
        });

        result.print("result");

        result.getSideOutput(outputTag).print("side-output");

        env.execute();
    }
}

# 多流转换

多流转换可以分为“分流”和“合流”两大类。目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作。

# 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子 DataStream

# 简单实现

针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。

# 使用侧输出流

public class SplitStreamByOutputTag {

    // 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)
    private static OutputTag<Tuple3<String, String, Long>> marryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};
    private static OutputTag<Tuple3<String, String, Long>> bogTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        SingleOutputStreamOperator<Event> process = stream.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    ctx.output(marryTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")) {
                    ctx.output(bogTag, new Tuple3<>(value.user, value.url, value.timestamp));
                } else {
                    out.collect(value);
                }
            }
        });

        process.getSideOutput(marryTag).print("Mary PV");
        process.getSideOutput(bogTag).print("Bog pv");
        process.print("else");

        env.execute();

    }
}

结果输出

Bog pv> (Bob,./prod?id=1,1672976877462)
else> Event{user='Cary', url='./cart', timestamp=2023-01-06 11:47:58.463}
else> Event{user='Cary', url='./fav', timestamp=2023-01-06 11:47:59.466}
else> Event{user='Cary', url='./home', timestamp=2023-01-06 11:48:00.467}
else> Event{user='Alice', url='./prod?id=1', timestamp=2023-01-06 11:48:01.469}
else> Event{user='Alice', url='./cart', timestamp=2023-01-06 11:48:02.472}
Mary PV> (Mary,./home,1672976883475)
else> Event{user='Alice', url='./prod?id=2', timestamp=2023-01-06 11:48:04.479}

# 基本合流操作

在实际应用中,经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的API 也更加丰富。

# 联合

直接将多条流合在一起,叫作流的“联合”(union)

联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。

基于DataStream直接调用.union()方法,传入其他DataStream作为参数,就可以实现流的联合,得到的依然是一个DataStream

stream1.union(stream2, stream3, ...)

union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

以对于合流之后的水位线,也是以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据

public class UnionExample {

    public static void main(String[] args) throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777).map(data -> {
            String[] filed = data.split(",");
            return new Event(filed[0].trim(), filed[1].trim(), Long.valueOf(filed[2].trim()));
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );
        stream1.print("hadoop102");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777).map(data -> {
            String[] filed = data.split(",");
            return new Event(filed[0].trim(), filed[1].trim(), Long.valueOf(filed[2].trim()));
        }).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );
        stream2.print("hadoop103");

        // 合并两条流
        stream1.union(stream2).process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                out.collect("水位线:" + ctx.timerService().currentWatermark());
            }
        }).print("union");

        env.execute();

    }
}

输出结果

hadoop103> Event{user='bob', url='/home', timestamp=1970-01-01 08:00:01.0}
union> 水位线:-9223372036854775808
hadoop102> Event{user='mary', url='/index', timestamp=1970-01-01 08:00:02.0}
union> 水位线:-9223372036854775808
hadoop103> Event{user='cici', url='/pay', timestamp=1970-01-01 08:00:03.0}
union> 水位线:-1001
hadoop102> Event{user='lala', url='/detail', timestamp=1970-01-01 08:00:04.0}
union> 水位线:-1
hadoop103> Event{user='java', url='/java', timestamp=1970-01-01 08:00:05.0}
union> 水位线:999
hadoop102> Event{user='python', url='/python', timestamp=1970-01-01 08:00:06.0}
union> 水位线:1999
hadoop103> Event{user='go', url='/go', timestamp=1970-01-01 08:00:07.0}
union> 水位线:2999
hadoop103> Event{user='c', url='/c', timestamp=1970-01-01 08:00:08.0}
union> 水位线:3999

# 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。顾名 思义,这种操作就是直接把两条流像接线一样对接起来。

  1. 连接流(ConnectedStreams) 两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个 DataStream 中。
public class CoMapExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStreamSource<Long> stream2 = env.fromElements(1L, 2L, 3L);

        ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);

        SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "Integer: " + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long: " + value;
            }
        });

        result.print();

        env.execute();

    }
}

输出结果:

Integer: 1
Long: 1
Integer: 2
Long: 2
Integer: 3
Long: 3

,ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams:

connectedStreams.keyBy(keySelector1, keySelector2);

TIP

两条流的连接(connect),与联合(union)操作相比,最大的优势就是可以处理不同类型的流的合并,使用更灵活、应用更广泛。当然它也有限制,就是合并流的数量只能是 2,而 union可以同时进行多条流的合并。这也非常容易理解:union 限制了类型不变,所以直接合并没有问题;而 connect 是“一国两制”,后续处理的接口只定义了两个转换方法,如果扩展需要重新定义接口,所以不能“一国多制”

  1. CoProcessFunction CoProcessFunction 也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。
// 实现一个实时对账的需求,也就是
//app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将
//会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息。
public class BillCheckExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 来自app的支付日志
        SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                }));

        // 来自第三方支付平台的支付日志
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy
                .<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                }));

        // 检测同一支付单在两条流中是否匹配,不匹配就报警
        appStream.connect(thirdpartStream)
                .keyBy(data -> data.f0, data -> data.f0)
                .process(new OrderMatchResult())
                .print();

        env.execute();
    }

    private static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String> {

        // 定义状态变量,用来保存已经到达的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>(
                            "app-event",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)
                    )
            );

            thirdPartyEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple4<String, String, String, Long>>(
                            "thirdparty-event",
                            Types.TUPLE(Types.STRING, Types.STRING, Types.STRING, Types.LONG)
                    )
            );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            // 看另一条流中事件是否来过
            if(thirdPartyEventState.value() != null) {
                out.collect("对账成功:" + value + " " + thirdPartyEventState.value());
                // 清空状态
                thirdPartyEventState.clear();
            }else {
                // 更新状态
                appEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);
            }
        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            if(appEventState.value() != null) {
                out.collect("对账成功:" + appEventState.value() + " " + value);
                // 清空状态
                appEventState.clear();
            }else {
                // 更新状态
                thirdPartyEventState.update(value);
                // 注册一个5秒后的定时器,开始等待另一条流的事件
                ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // 定时触发,判断状态,如果某个状态不为空,说明另一条流中事件没来
            if(appEventState.value() != null) {
                out.collect("对账失败:" + appEventState.value() + " " + "第三方支付凭条信息未到!");
            }

            if(thirdPartyEventState.value() != null) {
                out.collect("对账失败:" + thirdPartyEventState.value() + " " + "app信息未到!");
            }

            appEventState.clear();

            thirdPartyEventState.clear();
        }
    }
}

输出结果

对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账失败:(order-2,app,2000) 第三方支付凭条信息未到!
对账失败:(order-3,third-party,success,4000) app信息未到!
  1. 广播连接流(BroadcastConnectedStream) DataStream 调用.connect()方法时,传入的参数也可以不是一个 DataStream,而是一个“广播流”(BroadcastStream),这时合并两条流得到的就变成了一个“广播连接流”(BroadcastConnectedStream)。

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)。

广播状态底层是用一个“映射”(map)结构来保存的。在代码实现上,可以直接调用DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到规则数据的“广播流”(BroadcastStream)

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

接下来就可以将要处理的数据流,与这条广播流进行连接(connect),得到的就是所谓的“广播连接流”(BroadcastConnectedStream)。基于 BroadcastConnectedStream 调用.process()方法,就可以同时获取规则和数据,进行动态处理了。

这里既然调用了.process()方法,当然传入的参数也应该是处理函数大家族中一员——如果对数据流调用过 keyBy 进行了按键分区,那么要传入的就是 KeyedBroadcastProcessFunction;如果没有按键分区,就传入 BroadcastProcessFunction。

DataStream<String> output = stream
    .connect(ruleBroadcastStream)
    .process( new BroadcastProcessFunction<>() {...} );

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement()和.processBroadcastElement()。

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends 
BaseBroadcastProcessFunction {
...
 public abstract void processElement(IN1 value, ReadOnlyContext ctx, 
Collector<OUT> out) throws Exception;
 public abstract void processBroadcastElement(IN2 value, Context ctx, 
Collector<OUT> out) throws Exception;
...
}

案例在状态编程-广播状态

WARNING

todo 添加锚点,跳转至状态编程-广播状态

# 基于时间的合流——双流联结(Join)

为了更方便地实现基于时间的合流操作,Flink 的 DataStrema API 提供了两种内置的 join 算子,以及coGroup 算子。

SQL 中 join 一般会翻译为“连接”;我们这里为了区分不同的算子,一般的合流操作connect 翻译为“连接”,而把 join 翻译为“联结”。

# 窗口联结(Window Join)

如果我们希望将两条流的数据进行合并、且同样针对某段时间进行处理和统计。Flink 为这种场景专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

  1. 窗口联结的调用
    调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。

    stream1.join(stream2)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(<WindowAssigner>)
        .apply(<JoinFunction>)
    

    代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的 key;而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。

    .window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。

    后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。

    传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。JoinFunction 在源码中的定义如下:

    public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
        OUT join(IN1 first, IN2 second) throws Exception;
    }
    

    注意,JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。

    当然,既然是窗口计算,在.window()和.apply()之间也可以调用可选 API 去做一些自定义,比如用.trigger()定义触发器,用.allowedLateness()定义允许延迟时间,等等。

  2. 窗口联结的处理流程
    两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理,得到的结果直接输出如图 8-8 所示。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果。

    除了 JoinFunction,在.apply()方法中还可以传入 FlatJoinFunction,用法非常类似,只是内部需要实现的.join()方法没有返回值。结果的输出是通过收集器(Collector)来实现的,所以对于一对匹配数据可以输出任意条结果。

  3. 窗口联结实例

    //在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,
    //按照用户 ID 进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如
    //1 小时)来统计的,那我们就可以使用窗口 join 来实现这样的需求。
    public class WindowJoinExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
                    Tuple2.of("a", 1000L),
                    Tuple2.of("b", 1000L),
                    Tuple2.of("a", 2000L),
                    Tuple2.of("b", 2000L)
            ).assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                    return element.f1;
                                }
                            })
            );
    
            SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env.fromElements(
                    Tuple2.of("a", 3000L),
                    Tuple2.of("b", 3000L),
                    Tuple2.of("a", 4000L),
                    Tuple2.of("b", 4000L)
            ).assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                    return element.f1;
                                }
                            })
            );
    
            stream1.join(stream2)
                    .where(r -> r.f0)
                    .equalTo(r -> r.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                        @Override
                        public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                            return first + "=>" + second;
                        }
                    })
                    .print();
    
            env.execute();
        }
    }
    

    输出结果

    (a,1000)=>(a,3000)
    (a,1000)=>(a,4000)
    (a,2000)=>(a,3000)
    (a,2000)=>(a,4000)
    (b,1000)=>(b,3000)
    (b,1000)=>(b,4000)
    (b,2000)=>(b,3000)
    (b,2000)=>(b,4000)
    

# 间隔联结(Interval Join)

间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配。

  1. 间隔联结的原理 间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:

    a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

    注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound 应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。

    pSEiSeA.png

    下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)。

  2. 间隔联结的调用 间隔联结在代码中,是基于 KeyedStream 的联结(join)操作。DataStream 在 keyBy 得到KeyedStream 之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个 KeyedStream,两者的 key 类型应该一致;得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”ProcessJoinFunction.

    stream1
        .keyBy(<KeySelector>)
        .intervalJoin(stream2.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process (new ProcessJoinFunction<Integer, Integer, String(){
            @Override
            public void processElement(Integer left, Integer right, Context ctx, 
            Collector<String> out) {
            out.collect(left + "," + right);
        }
        });
    

    可以看到,抽象类 ProcessJoinFunction 就像是 ProcessFunction 和 JoinFunction 的结合,内部同样有一个抽象方法.processElement()。与其他处理函数不同的是,它多了一个参数,这自然是因为有来自两条流的数据。参数中 left 指的就是第一条流中的数据,right 则是第二条流中与它匹配的数据。每当检测到一组匹配,就会调用这里的.processElement()方法,经处理转换之后输出结果。

  3. 间隔联结实例

    // 一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。
    public class IntervalJoinExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                            Tuple3.of("Mary", "order-1", 5000L),
                            Tuple3.of("Alice", "order-2", 5000L),
                            Tuple3.of("Bob", "order-3", 20000L),
                            Tuple3.of("Alice", "order-4", 20000L),
                            Tuple3.of("Cary", "order-5", 51000L)
                    )
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                                    return element.f2;
                                }
                            }));
    
            SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                            new Event("Bob", "./cart", 2000L),
                            new Event("Alice", "./prod?id=100", 3000L),
                            new Event("Alice", "./prod?id=200", 3500L),
                            new Event("Bob", "./prod?id=2", 2500L),
                            new Event("Alice", "./prod?id=300", 36000L),
                            new Event("Bob", "./home", 30000L),
                            new Event("Bob", "./prod?id=1", 23000L),
                            new Event("Bob", "./prod?id=3", 33000L)
                    )
                    .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event element, long recordTimestamp) {
                                    return element.timestamp;
                                }
                            }));
    
            orderStream.keyBy(data -> data.f0)
                    .intervalJoin(clickStream.keyBy(data -> data.user))
                    .between(Time.seconds(-5), Time.seconds(10))
                    .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                        @Override
                        public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                            out.collect(right + "=>" + left);
                        }
                    })
                    .print();
    
            env.execute();
        }
    }
    

    输出结果

    Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0}=>(Alice,order-2,5000)
    Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.5}=>(Alice,order-2,5000)
    Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:30.0}=>(Bob,order-3,20000)
    Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:23.0}=>(Bob,order-3,20000)
    

# 窗口同组联结(Window CoGroup)

除窗口联结和间隔联结之外,Flink 还提供了一个“窗口同组联结”(window coGroup)操作。它的用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了。

stream1.coGroup(stream2)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .apply(<CoGroupFunction>)

与 window join 的区别在于,调用.apply()方法定义具体操作时,传入的是一个CoGroupFunction。这也是一个函数类接口,源码中定义如下:

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
 void coGroup(Iterable<IN1> first, Iterable<IN2>    second, Collector<O> out) 
throws Exception;
}

内部的.coGroup()方法,有些类似于 FlatJoinFunction 中.join()的形式,同样有三个参数,分别代表两条流中的数据以及用于输出的收集器(Collector)。不同的是,这里的前两个参数不再是单独的每一组“配对”数据了,而是传入了可遍历的数据集合。也就是说,现在不会再去计算窗口中两条流数据集的笛卡尔积,而是直接把收集到的所有数据一次性传入,至于要怎样配对完全是自定义的。这样.coGroup()方法只会被调用一次,而且即使一条流的数据没有任何另一条流的数据匹配,也可以出现在集合中、当然也可以定义输出结果了。

coGroup 操作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的“内连接”(inner join),也可以实现左外连接(left outer join)、右外连接(right outer join)和全外连接(full outer join)。事实上,窗口 join 的底层,也是通过 coGroup 来实现的。

public class CoGroupExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        })
        );

        SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env.fromElements(
                Tuple2.of("a", 3000L),
                Tuple2.of("b", 3000L),
                Tuple2.of("a", 4000L),
                Tuple2.of("b", 4000L)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                                return element.f1;
                            }
                        })
        );

        stream1.coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
                        out.collect(first + "=>" + second);
                    }
                })
                .print();

        env.execute();
    }
}

输出结果:

[(a,1000), (a,2000)]=>[(a,3000), (a,4000)]
[(b,1000), (b,2000)]=>[(b,3000), (b,4000)]

# 状态编程

不论是简单聚合、窗口聚合,还是处理函数的应用,都会有状态的身影出现。状态就如同事务处理时数据库中保存的信息一样,是用来辅助进行任务计算的数据.而在 Flink 这样的分布式系统中,我们不仅需要定义出状态在任务并行时的处理方式,还需要考虑如何持久化保存、以便发生障时正确地恢复。

# Flink中的状态

每个任务进行计算处理时,可以基于当前数据直接转换得到输出结果;也可以依赖一些其他数据。这些由一个任务维护,并且用来计算输出结果的所有数据,就叫作这个任务的状态。

# 有状态算子

在Flink中,算子任务可以分为无状态和有状态两种情况。

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果,如 map、filter、flatMap,计算时不依赖其他数据,就都属于无状态的算子。

有状态的算子任务,则除当前数据之外,还需要一些其他数据来得到计算结果。这里的“其他数据”,就是所谓的状态(state),最常见的就是之前到达的数据,或者由之前数据计算出的某个结果。之前讲过的聚合算子、窗口算子都属于有状态的算子。

# 状态的管理

Flink 的解决方案是,将状态直接保存在内存中来保证性能,并通过分布式扩展来提高吞吐量。

大数据的场景下,我们必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题就会随之而来了

  • 状态的访问权限
  • 容错性
  • 横向扩展性

# 状态的分类

  1. 托管状态(Managed State)和原始状态(Raw State)

    • 托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;
    • 原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
  2. 算子状态(Operator State)和按键分区状态(Keyed State)

    • 算子状态:在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

    状态作用范围限定为当前的算子任务实例,也就是只对当前并行子任务实例有效。这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态,状态对于同一任务而言是共享的

    • 按键分区状态:而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。

    可以通过富函数类(Rich Function)来自定义 Keyed State,所以只要提供了富函数类接口的算子,也都可以使用 Keyed State。

    即使是 map、filter 这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State,或者实现 CheckpointedFunction 接口来定义 Operator State;从这个角度讲,Flink 中所有的算子都可以是有状态的,不愧是“有状态的流处理”。

TIP

无论是 Keyed State 还是 Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

# 按键分区状态(Keyed State)

keyBy 之后的聚合、窗口计算,算子所持有的状态,都是 Keyed State。通过富函数类(Rich Function)对转换算子进行扩展、实现自定义功能,比如 RichMapFunction、RichFilterFunction。在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自 定义的状态也是 Keyed State。

# 基本概念和特点

按键分区状态(Keyed State)顾名思义,是任务按照键(key)来访问和维护的状态。它的特点非常鲜明,就是以 key 为作用范围进行隔离。

因为一个并行子任务可能会处理多个 key 的数据,所以 Flink 需要对 Keyed State 进行一些特殊优化。在底层,Keyed State 类似于一个分布式的映射(map)数据结构,所有的状态会根据 key 保存成键值对(key-value)的形式。这样当一条数据到来时,任务就会自动将状态的访问范围限定为当前数据的 key,从 map 存储中读取出对应的状态值。所以具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的。

WARNING

使用 Keyed State 必须基于 KeyedStream。没有进行 keyBy 分区的 DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问 Keyed State。

# 支持的结构类型

  1. 值状态(ValueState)
    状态中只保存一个“值”(value)

    public interface ValueState<T> extends State {
        T value() throws IOException;
        void update(T value) throws IOException;
    }
    

    在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。

    public ValueStateDescriptor(String name, Class<T> typeClass) {
        super(name, typeClass, null);
    }
    

    这里需要传入状态的名称和类型——这跟我们声明一个变量时做的事情完全一样。有了这个描述器,运行时环境就可以获取到状态的控制句柄(handler)了。

  2. 列表状态(ListState)
    将需要保存的数据,以列表(List)的形式组织起来。在 ListState<T>接口中同样有一个类型参数 T,表示列表中数据的类型。

    - Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型 Iterable<T>; 
    > 查看源码并没有找到该方法
    - update(List<T> values):传入一个列表 values,直接对状态进行覆盖;
    - add(T value):在状态列表中添加一个元素 value;
    - addAll(List<T> values):向列表中添加多个元素,以列表 values 形式传入。
    

    ListState 的状态描述器就叫作 ListStateDescriptor,用法跟 ValueStateDescriptor完全一致

  3. 映射状态(MapState) 把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的keyvalue的类型。

    - UV get(UK key):传入一个 key 作为参数,查询对应的 value 值;
    - put(UK key, UV value):传入一个键值对,更新 key 对应的 value 值;
    - putAll(Map<UK, UV> map):将传入的映射 map 中所有的键值对,全部添加到映射状态中;
    - remove(UK key):将指定 key 对应的键值对删除;
    - boolean contains(UK key):判断是否存在指定的 key,返回一个 boolean 值。另外,MapState 也提供了获取整个映射相关信息的方法:
    - Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
    - Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代 Iterable 类型;
    - Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代 Iterable类型;
    - boolean isEmpty():判断映射是否为空,返回一个 boolean 值。
    
  4. 规约状态(ReducingState) 类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducintState<T>这个接口调用的方法类似于 ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

    归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍 reduce 聚合算子时讲到的 ReduceFunction,所以状态类型跟输入的数据类型是一样的。

    public ReducingStateDescriptor(
        String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
    
  5. 聚合状态(AggregatingState) 与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与 ReducingState 不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的 AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

    AggregatingState 接口调用方法也与 ReducingState 相同,调用.add()方法添加元素时,会直接使用指定的 AggregateFunction 进行聚合并更新状态。

# 代码实现

  1. 整体介绍
    Flink 中,状态始终是与特定算子相关联的;算子在使用状态前首先需要“注册”,其实就是告诉 Flink 当前上下文中定义状态的信息,这样运行时的 Flink 才能知道算子有哪些状态。

    状态的注册,主要是通过“状态描述器”(StateDescriptor)来实现的。状态描述器中最重要的内容,就是状态的名称(name)和类型(type)

    因为状态的访问需要获取运行时上下文,这只能在富函数类(Rich Function)中获取到,所以自定义的 Keyed State 只能在富函数中使用。当然,底层的处理函数(Process Function)本身继承了 AbstractRichFunction 抽象类,所以也可以使用。

    在富函数中,调用.getRuntimeContext()方法获取到运行时上下文之后,RuntimeContext 有以下几个获取状态的方法:

    ValueState<T> getState(ValueStateDescriptor<T>)
    
    MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
    
    ListState<T> getListState(ListStateDescriptor<T>)
    
    ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
    
    AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
    

    获取到状态对象之后,就可以调用它们各自的方法进行读写操作了。另外,所有类型的状态都有一个方法.clear(),用于清除当前状态。

  2. 值状态

    // 使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想
    //每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时
    //间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保
    //存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间
    //戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,
    //注册完定时器之后将定时器的时间戳继续保存在状态变量中。
    public class PeriodicPvExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                        @Override
                                        public long extractTimestamp(Event event, long l) {
                                            return event.timestamp;
                                        }
                                    })
                    );
    
            stream.print("input");
    
            // 统计每个用户的pv,隔一段时间(10s)输出一次结果
            stream.keyBy(data -> data.user)
                    .process(new PeriodicPvResult())
                    .print();
    
            env.execute();
        }
    
        private static class PeriodicPvResult extends KeyedProcessFunction<String, Event, String> {
    
            // 定义两个状态,保存当前PV值,以及定时器时间戳
            ValueState<Long> countState;
            ValueState<Long> timerTsState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
                timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timerTs", Long.class));
            }
    
            @Override
            public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                // 更新count值
                Long count = countState.value();
                if(count == null) {
                    countState.update(1L);
                } else {
                    countState.update(count + 1);
                }
    
                // 注册定时器
                if(timerTsState.value() == null) {
                    ctx.timerService().registerEventTimeTimer(value.timestamp + 10 * 1000);
                    timerTsState.update(value.timestamp + 10 * 1000L);
                }
            }
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + "pv:" + countState.value());
                // 清空状态
                timerTsState.clear();
            }
        }
    }
    
  3. 列表状态(ListState)

    //在 Flink SQL 中,支持两条流的全量 Join,语法如下:
    //SELECT * FROM A INNER JOIN B WHERE A.id = B.id;
    //这样一条 SQL 语句要慎用,因为 Flink 会将 A 流和 B 流的所有数据都保存下来,然后进
    //行 Join。不过在这里我们可以用列表状态变量来实现一下这个 SQL 语句的功能。
    public class TwoStreamFullJoinExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple3<String, String, Long>> stream1 = env.fromElements(
                    Tuple3.of("a", "stream-1", 1000L),
                    Tuple3.of("b", "stream-1", 2000L)
            ).assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                    return t.f2;
                                }
                            })
            );
    
            SingleOutputStreamOperator<Tuple3<String, String, Long>> stream2 = env.fromElements(
                    Tuple3.of("a", "stream-2", 1000L),
                    Tuple3.of("b", "stream-2", 2000L)
            ).assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Tuple3<String, String, Long>>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                                @Override
                                public long extractTimestamp(Tuple3<String, String, Long> t, long l) {
                                    return t.f2;
                                }
                            })
            );
    
            stream1.keyBy(r -> r.f0)
                    .connect(stream2.keyBy(r -> r.f0))
                    .process(new CoProcessFunction<Tuple3<String,String,Long>, Tuple3<String,String,Long>, String>() {
                        private ListState<Tuple3<String, String, Long>> stream1ListState;
    
                        private ListState<Tuple3<String, String, Long>> stream2ListState;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
    
                            stream1ListState = getRuntimeContext().getListState(
                                    new ListStateDescriptor<Tuple3<String, String, Long>>(
                                            "stream1-list",
                                            Types.TUPLE(Types.STRING, Types.STRING)
                                    )
                            );
    
                            stream2ListState = getRuntimeContext().getListState(
                                    new ListStateDescriptor<Tuple3<String, String, Long>>(
                                            "stream2-list",
                                            Types.TUPLE(Types.STRING, Types.STRING)
                                    )
                            );
                        }
    
                        @Override
                        public void processElement1(Tuple3<String, String, Long> left, Context ctx, Collector<String> out) throws Exception {
                            stream1ListState.add(left);
                            for(Tuple3<String, String, Long> right : stream2ListState.get()) {
                                out.collect(left + "=>" + right);
                            }
                        }
    
                        @Override
                        public void processElement2(Tuple3<String, String, Long> right, Context ctx, Collector<String> out) throws Exception {
                            stream2ListState.add(right);
                            for(Tuple3<String, String, Long> left : stream1ListState.get()) {
                                out.collect(right + "=>" + left);
                            }
                        }
                    })
                    .print();
    
            env.execute();
        }
    }
    

    输出结果

    (a,stream-2,1000)=>(a,stream-1,1000)
    (b,stream-2,2000)=>(b,stream-1,2000)
    
  4. 映射状态(MapState)

    // 映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探
    //索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个
    //滚动窗口。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和
    //全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下。
    // 使用keyedProcessFunction模拟滚动窗口
    public class FakeWindowExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Event>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event event, long l) {
                                    return event.timestamp;
                                }
                            })
            );
    
            // 统计每10s窗口内,每个url的pv
            stream.keyBy(data -> data.url)
                    .process(new FakeWindowResult(10000L))
                    .print();
    
            env.execute();
        }
    
        private static class FakeWindowResult extends KeyedProcessFunction<String, Event, String> {
            // 定义属性,窗口长度
            private Long windowSize;
    
    
            public FakeWindowResult(Long windowSize) {
                this.windowSize = windowSize;
            }
    
            // 声明状态,用map保存pv值(窗口start, count)
            MapState<Long, Long> windowPvMapState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                windowPvMapState = getRuntimeContext().getMapState(
                        new MapStateDescriptor<Long, Long>("window-pv", Long.class, Long.class)
                );
            }
    
            @Override
            public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                // 每来一条数据,就根据时间戳判断属于哪个窗口
                Long windowStart = value.timestamp / windowSize * windowSize;
                Long windowEnd = windowStart + windowSize;
    
                // 注册end-1的定时器,窗口触发计算
                ctx.timerService().registerEventTimeTimer(windowEnd -1 );
    
                // 更新状态中的pv值
                if(windowPvMapState.contains(windowStart)) {
                    Long pv = windowPvMapState.get(windowStart);
                    windowPvMapState.put(windowStart, pv + 1);
                }else {
                    windowPvMapState.put(windowStart, 1L);
                }
            }
    
            // 定时器触发,直接输出统计的PV结果
    
            @Override
            public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                Long windowEnd = timestamp + 1;
                Long windowStart = windowEnd - windowSize;
                Long pv = windowPvMapState.get(windowStart);
    
                out.collect("url: " + ctx.getCurrentKey()
                        + "访问量:" + pv + "窗口:"
                        + new Timestamp(windowStart)
                        + " ~ " + new Timestamp(windowEnd));
    
                // 模拟窗口的销毁,清楚map中的key
                windowPvMapState.remove(windowStart);
            }
        }
    }
    
  5. 聚合状态(aggregatingState)

    //对用户点击事件流每 5 个数据统计一次平均时间戳。这是一个类
    //似计数窗口(CountWindow)求平均值的计算,这里我们可以使用一个有聚合状态的
    //RichFlatMapFunction 来实现。
    public class AverageTimestampExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource()).assignTimestampsAndWatermarks(
                    WatermarkStrategy
                            .<Event>forMonotonousTimestamps()
                            .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                @Override
                                public long extractTimestamp(Event event, long l) {
                                    return event.timestamp;
                                }
                            })
            );
    
            // 统计每个用户的点击频次,到达5次就输出统计结果
            stream.keyBy(data -> data.user)
                    .flatMap(new AvgTsResult())
                    .print();
    
            env.execute();
        }
    
        private static class AvgTsResult extends RichFlatMapFunction<Event, String> {
    
            // 定义聚合状态,用来计算平均时间戳
            AggregatingState<Event, Long> avgTsAggState;
    
            // 定义一个值状态,用来保存当前用户访问频次
            ValueState<Long> countState;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                avgTsAggState = getRuntimeContext().getAggregatingState(
                        new AggregatingStateDescriptor<Event, Tuple2<Long, Long>, Long>(
                                "avg-ts",
                                new AggregateFunction<Event, Tuple2<Long, Long>, Long>() {
                                    @Override
                                    public Tuple2<Long, Long> createAccumulator() {
                                        return Tuple2.of(0L, 0L);
                                    }
    
                                    @Override
                                    public Tuple2<Long, Long> add(Event value, Tuple2<Long, Long> accumulator) {
                                        return Tuple2.of(accumulator.f0 + value.timestamp, accumulator.f1 + 1);
                                    }
    
                                    @Override
                                    public Long getResult(Tuple2<Long, Long> accumulator) {
                                        return accumulator.f0 / accumulator.f1;
                                    }
    
                                    @Override
                                    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
                                        return null;
                                    }
                                },
                                Types.TUPLE(Types.LONG, Types.LONG)
                        ));
    
                countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("count", Long.class));
            }
    
            @Override
            public void flatMap(Event event, Collector<String> collector) throws Exception {
                Long count = countState.value();
    
                if(count == null) {
                    count = 1L;
                }else {
                    count ++;
                }
    
                countState.update(count);
                avgTsAggState.add(event);
    
                // 达到5次就输出结果,并清空状态
                if(count == 5) {
                    collector.collect(event.user + "平均时间戳:" + new Timestamp(avgTsAggState.get()));
                    countState.clear();
                }
            }
        }
    }
    

# 状态生存时间(TTL)

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

实现逻辑:状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的.enableTimeToLive()方法启动 TTL 功能。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
  • .newBuilder() 状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
  • .setUpdateType() 设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
  • .setStateVisibility() 设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compaction filter)进行后台清理。

TIP

目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。

# 算子状态(Operator State)

算子状态是更底层的状态类型,因为它只针对当前算子并行任务有效,不需要考虑不同 key 的隔离。算子状态功能不如按键分区状态丰富,应用场景较少,它的调用方法也会有一些区别。

# 基本概念和特点

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。

算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

# 状态类型

  1. 列表状态(ListState)
    与 Keyed State 中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似, 是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。

算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

  1. 联合列表状态(UnionListState)
    与 ListState 类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

UnionListState 的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

  1. 广播状态(BroadcastState)
    有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

在底层,广播状态是以类似映射结构(map)的键值对(key-value)来保存的,必须基于一个“广播流”(BroadcastStream)来创建。

# 代码实现

对于 Operator State 来说因为不存在 key,所有数据发往哪个分区是不可预测的;也就是说,当发生故障重启之后,我们不能保证某个数据跟之前一样,进入到同一个并行子任务、访问同一个状态。所以 Flink 无法直接判断该怎样保存和恢复状态,而是提供了接口,让我们根据业务需求自行设计状态的快照保存(snapshot)和恢复(restore)逻辑。

  1. CheckpointedFunction接口
    在 Flink 中,对状态进行持久化保存的快照机制叫作“检查点”(Checkpoint)。于是使用算子状态时,就需要对检查点的相关操作进行定义,实现一个 CheckpointedFunction 接口。

    public interface CheckpointedFunction {
        // 保存状态快照到检查点时,调用这个方法
        void snapshotState(FunctionSnapshotContext context) throws Exception
        // 初始化状态时调用这个方法,也会在恢复状态时调用
        void initializeState(FunctionInitializationContext context) throws Exception;
    }
    

    每次应用保存检查点做快照时,都会调用.snapshotState()方法,将状态进行外部持久化。而在算子任务进行初始化时,会调用. initializeState()方法。这又有两种情况:一种是整个应用第一次运行,这时状态会被初始化为一个默认值(default value);另一种是应用重启时,从检查点(checkpoint)或者保存点(savepoint)中读取之前状态的快照,并赋给本地状态。所以,接口中的.snapshotState()方法定义了检查点的快照保存逻辑,而. initializeState()方法不仅定义了初始化逻辑,也定义了恢复逻辑。

    CheckpointedFunction 接口中的两个方法,分别传入了一个上下文(context)作为参数。不同的是,.snapshotState()方法拿到的是快照的上下文 FunctionSnapshotContext,它可以提供检查点的相关信息,不过无法获取状态句柄;而. initializeState()方法拿到的是FunctionInitializationContext,这是函数类进行初始化时的上下文,是真正的“运行时上下文”。FunctionInitializationContext 中提供了“算子状态存储”(OperatorStateStore)和“按键分区状态存储(” KeyedStateStore),在这两个存储对象中可以非常方便地获取当前任务实例中的 Operator State 和 Keyed State。

    ListStateDescriptor<String> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            Types.of(String)
        );
    ListState<String> checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    

    算子状态的注册和使用跟 Keyed State 非常类似,也是需要先定义一个状态描述器(StateDescriptor),告诉 Flink 当前状态的名称和类型,然后从上下文提供的算子状态存储(OperatorStateStore)中获取对应的状态对象。如果想要从 KeyedStateStore 中获取 Keyed State也是一样的,前提是必须基于定义了 key 的 KeyedStream,这和富函数类中的方式并不矛盾。通过这里的描述可以发现,CheckpointedFunction 是 Flink 中非常底层的接口,它为有状态的流处理提供了灵活且丰富的应用。

  2. 示例代码

    // 自定义的 SinkFunction 会在
    //CheckpointedFunction 中进行数据缓存,然后统一发送到下游。这个例子演示了列表状态的平
    //均分割重组(event-split redistribution)。
    public class BufferingSinkExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                    .assignTimestampsAndWatermarks(
                            WatermarkStrategy
                                    .<Event>forMonotonousTimestamps()
                                    .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                                        @Override
                                        public long extractTimestamp(Event element, long recordTimestamp) {
                                            return element.timestamp;
                                        }
                                    })
                    );
    
            stream.print("input");
    
            // 批量缓存输出
            stream.addSink(new BufferingSink(10));
            env.execute();
        }
    
        private static class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {
    
            private final int threshold;
    
            private transient ListState<Event> checkpointedState;
    
            private List<Event> bufferedElement;
    
    
            public BufferingSink(int threshold) {
                this.threshold = threshold;
                this.bufferedElement = new ArrayList<>();
            }
    
            @Override
            public void invoke(Event value, Context context) throws Exception {
                bufferedElement.add(value);
    
                if(bufferedElement.size() == threshold) {
                    for(Event element : bufferedElement) {
                        // 输出到外部系统,这里用控制台打印模式
                        System.out.println(element);
                    }
                    System.out.println("========输出完毕======");
    
                    bufferedElement.clear();
                }
            }
    
            @Override
            public void snapshotState(FunctionSnapshotContext context) throws Exception {
                checkpointedState.clear();
                // 把当前局部变量中的所有元素写入到检查点中
                for(Event element : bufferedElement) {
                    checkpointedState.add(element);
                }
            }
    
            @Override
            public void initializeState(FunctionInitializationContext context) throws Exception {
                ListStateDescriptor<Event> decriptor =
                        new ListStateDescriptor<>("buffered-elements", Types.POJO(Event.class));
    
                checkpointedState = context.getOperatorStateStore().getListState(decriptor);
    
                // 如果从故障中恢复,就将ListState中的所有元素添加到局部变量中
                if(context.isRestored()) {
                    for(Event element : checkpointedState.get()) {
                        bufferedElement.add(element);
                    }
                }
            }
        }
    }
    

    对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() ,对应就会使用联合重组(union redistribution) 算法。

# 广播状态(Broadcast State)

算子状态中有一类很特殊,就是广播状态(Broadcast State)。从概念和原理上讲,广播状态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接复制就可以了。然而在应用上,广播状态却与其他算子状态大不相同。

# 基本用法

流处理的“事件驱动”思路——我们可以将这动态的配置数据看作一条流,将这条流和本身要处理的数据流进行连接(connect),就可以实时地更新配置进行计算了。

由于配置或者规则数据是全局有效的,我们需要把它广播给所有的并行子任务。而子任务需要把它作为一个算子状态保存起来,以保证故障恢复后处理结果是一致的。这时的状态,就是一个典型的广播状态。我们知道,广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)。

直接调用 DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到一个“广播流”(BroadcastStream);进而将要处理的数据流与这条广播流进行连接(connect),就会得到“广播连接流”(BroadcastConnectedStream)。注意广播状态只能用在广播连接流中。

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);
DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );

对 于 广 播 连 接 流 调 用 .process() 方 法 , 可 以 传 入 “ 广 播 处 理 函 数 ”KeyedBroadcastProcessFunction 或者 BroadcastProcessFunction 来进行处理计算。广播处理函数里面有两个方法.processElement()和.processBroadcastElement()

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
...
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
...
}

这里的.processElement()方法,处理的是正常数据流,第一个参数 value 就是当前到来的流数据;而.processBroadcastElement()方法就相当于是用来处理广播流的,它的第一个参数 value就是广播流中的规则或者配置数据。两个方法第二个参数都是一个上下文 ctx,都可以通过调用.getBroadcastState()方法获取到当前的广播状态;区别在于,.processElement()方法里的上下文 是 “ 只 读 ” 的 ( ReadOnly ), 因 此 获 取 到 的 广 播 状 态 也 只 能 读 取 不 能 更 改 ;而.processBroadcastElement()方法里的 Context 则没有限制,可以根据当前广播流中的数据更新状态。

Rule rule = ctx.getBroadcastState( new MapStateDescriptor<>("rules", Types.String, Types.POJO(Rule.class))).get("my rule");

# 代码实例

//在电商应用中,往往需要判断用户先后发生
//的行为的“组合模式”,比如“登录-下单”或者“登录-支付”,检测出这些连续的行为进行统
//计,就可以了解平台的运用状况以及用户的行为习惯。
public class ProadcastStateExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Action> actionStream = env.fromElements(
                new Action("Alice", "login"),
                new Action("Alice", "pay"),
                new Action("Bob", "login"),
                new Action("Bob", "buy")
        );

        DataStreamSource<Pattern> patternStream = env.fromElements(
                new Pattern("login", "pay"),
                new Pattern("login", "buy")
        );

        // 定义广播流的描述器,创建广播流
        MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<Void, Pattern>("patterns", Types.VOID, Types.POJO(Pattern.class));
        BroadcastStream<Pattern> bcPatterns = patternStream.broadcast(bcStateDescriptor);

        // 将事件流和广播流连接起来,进行处理
        SingleOutputStreamOperator<Tuple2<String, Pattern>> matches = actionStream.keyBy(data -> data.userId)
                .connect(bcPatterns)
                .process(new PatternEvaluator());

        matches.print();

        env.execute();

    }

    private static class PatternEvaluator extends KeyedBroadcastProcessFunction<String, Action, Pattern, Tuple2<String, Pattern>> {

        // 定义一个值状态,保存上一次用户行为
        ValueState<String> prevActionState;

        @Override
        public void open(Configuration parameters) throws Exception {
            prevActionState  = getRuntimeContext().getState(new ValueStateDescriptor<String>("lastAction", Types.STRING));
        }

        @Override
        public void processElement(Action action, ReadOnlyContext ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
            Pattern pattern = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class))).get(null);
            String prevAction = prevActionState.value();
            if(pattern != null && prevAction != null) {
                // 如果前后两次行为都符合模式的定义,输出一组匹配
                if(pattern.action1.equals(prevAction) && pattern.action2.equals(action.action)) {
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }
            // 更新状态
            prevActionState.update(action.action);
        }

        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx, Collector<Tuple2<String, Pattern>> out) throws Exception {
            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)));

            // 将广播状态更新为当前的pattern
            bcState.put(null, pattern);
        }
    }

    // 定义用户行为时事件POJO类
    public static class Action {
        public String userId;

        public String action;

        public Action() {
        }

        public Action(String userId, String action) {
            this.userId = userId;
            this.action = action;
        }

        @Override
        public String toString() {
            return "Action{" +
                    "userId='" + userId + '\'' +
                    ", action='" + action + '\'' +
                    '}';
        }
    }

    // 定义行为模式POJO类,包含先后发生的两个行为
    public static class Pattern {
        public String action1;
        public String action2;

        public Pattern() {
        }

        public Pattern(String action1, String action2) {
            this.action1 = action1;
            this.action2 = action2;
        }

        @Override
        public String toString() {
            return "Pattern{" +
                    "action1='" + action1 + '\'' +
                    ", action2='" + action2 + '\'' +
                    '}';
        }
    }
}

这里我们将检测的行为模式定义为 POJO 类 Pattern,里面包含了连续的两个行为。由于广播状态中只保存了一个 Pattern,并不关心 MapState 中的 key,所以也可以直接将 key 的类型指定为 Void,具体值就是 null。在具体的操作过程中,我们将广播流中的 Pattern 数据保存为广播变量;在行为数据 Action 到来之后读取当前广播变量,确定行为模式,并将之前的一次行为保存为一个 ValueState——这是针对当前用户的状态保存,所以用到了 Keyed State。检测到如果前一次行为与 Pattern 中的 action1 相同,而当前行为与 action2 相同,则发现了匹配模式的一组行为,输出检测结果。

# 状态持久化和状态后端

Flink 对状态进行持久化的方式,就是将当前所有分布式状态进行“快照”保存,写入一个“检查点”(checkpoint)或者保存点(savepoint)保存到外部存储系统中。具体的存储介质,一般是分布式文件系统(distributed file system)。

# 检查点(Checkpoint)

默认情况下,检查点是被禁用的,需要在代码中手动开启。直接调用执行环境的.enableCheckpointing()方法就可以开启检查点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
env.enableCheckpointing(1000);

除了检查点之外,Flink 还提供了“保存点”(savepoint)的功能。保存点在原理和形式上跟检查点完全一样,也是状态持久化保存的一个快照;区别在于,保存点是自定义的镜像保存,所以不会由 Flink 自动创建,而需要用户手动触发。这在有计划地停止、重启应用时非常有用。

# 状态后端(State Backends)

检查点的保存离不开 JobManager 和 TaskManager,以及外部存储系统的协调。在应用进行检查点保存时,首先会由 JobManager 向所有 TaskManager 发出触发检查点的命令;TaskManger 收到之后,将当前任务的所有状态进行快照保存,持久化到远程的存储介质中;完成之后向 JobManager 返回确认信息。这个过程是分布式的,当JobManger 收到所有TaskManager 的返回信息后,就会确认当前检查点成功保存。

pSZm3ZT.png

在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

  1. 状态后端的分类
    状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。Flink 中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌 RocksDB 状态后端”(EmbeddedRocksDBStateBackend)。如果没有特别配置,系统默认的状态后端是 HashMapStateBackend。

    • 哈希表状态后端(HashMapStateBackend) 这种方式状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表 (HashMap),这种状态后端也因此得名。

      对于检查点的保存,一般是放在持久化的分布式文件系统(file system)中,也可以通过配置“检查点存储”(CheckpointStorage)来另外指定。

    • 内嵌 RocksDB 状态后端(EmbeddedRocksDBStateBackend) RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。

      EmbeddedRocksDBStateBackend 始终执行的是异步快照,也就是不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

      由于它会把状态数据落盘,而且支持增量化的检查点,所以在状态非常大、窗口非常长、键/值状态很大的应用场景中是一个好选择,同样对所有高可用性设置有效。

  2. 如何选择正确的状态后端
    HashMap 和 RocksDB 两种状态后端最大的区别,就在于本地状态存放在哪里:前者是内存,后者是 RocksDB。

    HashMapStateBackend 是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

    RocksDB 是硬盘存储,所以可以根据可用的磁盘空间进行扩展,而且是唯一支持增量检查点的状态后端,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比 HashMapStateBackend 慢一个数量级。

  3. 状态后端的配置
    在不做配置的时候,应用程序使用的默认状态后端是由集群配置文件 flink-conf.yaml 中指定的,配置的键名称为 state.backend。这个默认配置对集群上运行的所有作业都有效,我们可以通过更改配置值来改变默认的状态后端。另外,我们还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

    • 配置默认的状态后端 在 flink-conf.yaml 中,可以使用 state.backend 来配置默认状态后端。

      配置项的可能值为 hashmap,这样配置的就是 HashMapStateBackend;也可以是 rocksdb,这样配置的就是 EmbeddedRocksDBStateBackend。另外,也可以是一个实现了状态后端工厂StateBackendFactory 的类的完全限定类名。

      # 默认状态后端
      state.backend: hashmap
      # 存放检查点的文件路径
      state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
      
    • 为每个作业(Per-job)单独配置状态后端 在代码中,基于作业的执行环境直接设置

      StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStateBackend(new HashMapStateBackend());
      

      设置 EmbeddedRocksDBStateBackend

      StreamExecutionEnvironment env = 
      StreamExecutionEnvironment.getExecutionEnvironment();
      env.setStateBackend(new EmbeddedRocksDBStateBackend());
      

      如果想在 IDE 中使用EmbeddedRocksDBStateBackend,需要为 Flink 项目添加依赖:

      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
          <version>1.13.0</version>
      </dependency>
      

# 容错机制

# 检查点(Checkpoint)

将之前某个时间点所有的状态保存下来,这份“存档”就是所谓的“检查点”(checkpoint)。

检查点是 Flink 容错机制的核心。这里所谓的“检查”,其实是针对故障恢复的结果而言的:故障恢复之后继续处理的结果,应该与发生故障前完全一致,我们需要“检查”结果的正确性。所以,有时又会把 checkpoint 叫作“一致性检查点”。

# 检查点的保存

  1. 周期性的触发保存 检查点作为应用状态的一份“存档”,其实就是所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。具体来说,当每隔一段时间检查点保存操作被触发时,就把每个任务当前的状态复制一份,按照一定的逻辑结构放在一起持久化保存起来,就构成了检查点。
  2. 保存的时间点 当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。如果出现故障,我们恢复到之前保存的状态,故障时正在处理的所有数据都需要重新处理;所以我们只需要让源(source)任务向数据源重新提交偏移量、请求重放数据就可以了。
  3. 保存的具体流程 检查点的保存,最关键的就是要等所有任务将“同一个数据”处理完毕。下面我们通过一个具体的例子,来详细描述一下检查点具体的保存过程。 pSZJRqH.png 上图中,已经处理了 3 条数据:“hello”“world”“hello”,所以我们会看到 Source 算子的偏移量为 3;后面的 Sum 算子处理完第三条数据“hello”之后,此时已经有 2 个“hello”和 1 个“world”,所以对应的状态为“hello”-> 2,“world”-> 1(这里 KeyedState底层会以 key-value 形式存储)。此时所有任务都已经处理完了前三个数据,所以我们可以把当前的状态保存成一个检查点,写入外部存储中。至于具体保存到哪里,这是由状态后端的配置项 “ 检 查 点 存 储 ”( CheckpointStorage )来决定的,可以有作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)两种选择。一般情况下,我们会将检查点写入持久化的分布式文件系统。

# 从检查点恢复状态

例如在上节的 word count 示例中,我们处理完三个数据后保存了一个检查点。之后继续运行,又正常处理了一个数据“flink”,在处理第五个数据“hello”时发生了故障。

pSZYpWV.png

这里 Source 任务已经处理完毕,所以偏移量为 5;Map 任务也处理完成了。而 Sum 任务在处理中发生了故障,此时状态并未保存。

恢复步骤:

  1. 重启应用
    我们将应用重新启动后,所有任务的状态会清空
  2. 读取检查点,重置状态 找到最近一次保存的检查点,从中读出每个算子任务状态的快照,分别填充到对应的状态中。这样,Flink 内部所有任务的状态,就恢复到了保存检查点的那一时刻,也就是刚好处理完第三个数据的时候。
  3. 重放数据 从保存检查点后开始重新读取数据,这可以通过 Source 任务向外部数据源重新提交偏移量(offset)来实现
  4. 继续处理数据 重放数据,继续读取后面的数据。

我们既没有丢掉数据也没有重复计算数据,这就保证了计算结果的正确性。在分布式系统中,这叫作实现了“精确一次”(exactly-once)的状态一致性保证。

# 检查点算法

在不暂停整体流处理的前提下,将状态备份保存到检查点。在 Flink中,采用了基于 Chandy-Lamport 算法的分布式快照

  1. 检查点分界线(Barrier)
    把一条流上的数据按照不同的检查点分隔开,所以就叫作检查点的“分界线”(Checkpoint Barrier)

    与水位线很类似,检查点分界线也是一条特殊的数据,由 Source 算子注入到常规的数据流中,它的位置是限定好的,不能超过其他数据,也不能被后面的数据超过。检查点分界线中带有一个检查点 ID,这是当前要保存的检查点的唯一标识

    分界线就将一条流逻辑上分成了两部分:分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所表示的检查点中;而基于分界线之后的数据导致的状态更改,则会被包含在之后的检查点中。

    在 JobManager 中有一个“检查点协调器”(checkpoint coordinator),专门用来协调处理检查点的相关工作。检查点协调器会定期向 TaskManager 发出指令,要求保存检查点(带着检查点 ID);TaskManager 会让所有的 Source 任务把自己的偏移量(算子状态)保存起来,并将带有检查点 ID 的分界线(barrier)插入到当前的数据流中,然后像正常的数据一样像下游传递;之后 Source 任务就可以继续读入新的数据了。

    每个算子任务只要处理到这个 barrier,就把当前的状态进行快照;在收到 barrier 之前,还是正常地处理之前的数据,完全不受影响。

  2. 分布式快照算法
    Flink 使用了 Chandy-Lamport 算法的一种变体,被称为“异步分界线快照”(asynchronous barrier snapshotting)算法。算法的核心就是两个原则:当上游任务向多个并行下游任务发送 barrier 时,需要广播出去;而当多个上游任务向同一个下游任务传递 barrier 时,需要在下游任务执行“分界线对齐”(barrier alignment)操作,也就是需要等到所有并行分区的 barrier 都到齐,才可以开始状态的保存。

    具体过程如下:

    1. JobManager 发送指令,触发检查点的保存;Source 任务保存状态,插入分界线
    2. 状态快照保存完成,分界线向下游传递
    3. 向下游多个并行子任务广播分界线,执行分界线对齐
    4. 分界线对齐后,保存状态到持久化存储
    5. 先处理缓存数据,然后正常继续处理
    6. JobManager 收到所有任务成功保存状态的信息,就可以确认当前检查点成功保存。之后遇到故障就可以从这里恢复了。

    TIP

    分界线对齐要求先到达的分区做缓存等待,一定程度上会影响处理的速度;当出现背压(backpressure)时,下游任务会堆积大量的缓冲数据,检查点可能需要很久才可以保存完毕。为了应对这种场景,Flink 1.11 之后提供了不对齐的检查点保存方式,可以将未处理的缓冲数据(in-flight data)也保存进检查点。这样,当我们遇到一个分区 barrier 时就不需等待对齐,而是可以直接启动状态的保存了。

# 检查点配置

  1. 启用检查点
    默认情况下,Flink 程序是禁用检查点的。如果想要为 Flink 应用开启自动保存快照的功能需要在代码中显式地调用执行环境的.enableCheckpointing()方法:

    StreamExecutionEnvironment env = 
    StreamExecutionEnvironment.getExecutionEnvironment();
    // 每隔 1 秒启动一次检查点保存
    env.enableCheckpointing(1000); // 传入一个毫秒数,标识周期性保存检查点的间隔时间
    
  2. 检查点存储(Checkpoint Storage)
    通过调用检查点配置的 .setCheckpointStorage() 来 配 置 , 需 要 传 入 一 个CheckpointStorage 的实现类。Flink 主要提供了两种 CheckpointStorage:作业管理器的堆内存(JobManagerCheckpointStorage)和文件系统(FileSystemCheckpointStorage)。

    // 配置存储检查点到 JobManager 堆内存
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    // 配置存储检查点到文件系统
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("hdfs://namenode:40010/flink/checkpoints"));
    
  3. 其他高级配置
    检查点还有很多可以配置的选项,可以通过获取检查点配置(CheckpointConfig)来进行设置。

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    
    1. 检查点模式(CheckpointingMode)
      设置检查点一致性的保证级别,有“精确一次”(exactly-once)和“至少一次”(at-least-once)两个选项。默认级别为 exactly-once,而对于大多数低延迟的流处理程序,at-least-once 就够用了,而且处理效率会更高。
    2. 超时时间(checkpointTimeout)
      用于指定检查点保存的超时时间,超时没完成就会被丢弃掉。传入一个长整型毫秒数作为参数,表示超时时间。
    3. 最小间隔时间(minPauseBetweenCheckpoints)
      用于指定在上一个检查点完成之后,检查点协调器(checkpoint coordinator)最快等多久可以出发保存下一个检查点的指令。这就意味着即使已经达到了周期触发的时间点,只要距离上一个检查点完成的间隔不够,就依然不能开启下一次检查点的保存。这就为正常处理数据留下了充足的间隙。当指定这个参数时,maxConcurrentCheckpoints 的值强制为 1。
    4. 最大并发检查点数量(maxConcurrentCheckpoints) 用于指定运行中的检查点最多可以有多少个。由于每个任务的处理进度不同,完全可能出现后面的任务还没完成前一个检查点的保存、前面任务已经开始保存下一个检查点了。这个参数就是限制同时进行的最大数量。
    5. 开启外部持久化存储(enableExternalizedCheckpoints) 用于开启检查点的外部持久化,而且默认在作业失败的时候不会自动清理,如果想释放空间需要自己手工清理。里面传入的参数 ExternalizedCheckpointCleanup 指定了当作业取消的时候外部的检查点该如何清理。
      • DELETE_ON_CANCELLATION:在作业取消的时候会自动删除外部检查点,但是如果是作业失败退出,则会保留检查点。
      • RETAIN_ON_CANCELLATION:作业取消的时候也会保留外部检查点。
    6. 检查点异常时是否让整个任务失败(failOnCheckpointingErrors) 用于指定在检查点发生异常的时候,是否应该让任务直接失败退出。默认为 true,如果设置为 false,则任务会丢弃掉检查点然后继续运行。
    7. 不对齐检查点(enableUnalignedCheckpoints) 不再执行检查点的分界线对齐操作,启用之后可以大大减少产生背压时的检查点保存时间。这个设置要求检查点模式(CheckpointingMode)必须为 exctly-once,并且并发的检查点个数为 1。
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 启用检查点,间隔时间 1 秒
    env.enableCheckpointing(1000);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    // 设置精确一次模式
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 最小间隔时间 500 毫秒
    checkpointConfig.setMinPauseBetweenCheckpoints(500);
    // 超时时间 1 分钟
    checkpointConfig.setCheckpointTimeout(60000);
    // 同时只能有一个检查点
    checkpointConfig.setMaxConcurrentCheckpoints(1);
    // 开启检查点的外部持久化保存,作业取消后依然保留
    checkpointConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // 启用不对齐的检查点保存方式
    checkpointConfig.enableUnalignedCheckpoints();
    // 设置检查点存储,可以直接传入一个 String,指定文件系统的路径
    checkpointConfig.setCheckpointStorage("hdfs://my/checkpoint/dir")
    

# 保存点(Savepoint)

这也是一个存盘的备份,它的原理和算法与检查点完全相同,只是多了一些额外的元数据。事实上,保存点就是通过检查点的机制来创建流式作业状态的一致性镜像(consistent image)的。

保存点中的状态快照,是以算子 ID 和状态名称组织起来的,相当于一个键值对。从保存点启动应用程序时,Flink 会将保存点的状态数据重新分配给相应的算子任务。

  1. 保存点的用途
    保存点与检查点最大的区别,就是触发的时机。检查点是由 Flink 自动管理的,定期创建,发生故障之后自动读取进行恢复,这是一个“自动存盘”的功能;而保存点不会自动创建,必须由用户明确地手动触发保存操作,所以就是“手动存盘”。因此两者尽管原理一致,但用途就有所差别了:检查点主要用来做故障恢复,是容错机制的核心;保存点则更加灵活,可以用来做有计划的手动备份和恢复。

    应用环境:

    • 版本管理和归档存储
    • 更新Flink版本
    • 更新应用程序
    • 调整并行度
    • 暂停应用程序

    保存点能够在程序更改的时候依然兼容,前提是状态的拓扑结构和数据类型不变。我们知道保存点中状态都是以算子 ID-状态名称这样的 key-value 组织起来的,算子ID 可以在代码中直接调用 SingleOutputStreamOperator 的.uid()方法来进行指定:

    DataStream<String> stream = env
        .addSource(new StatefulSource())
        .uid("source-id")
        .map(new StatefulMapper())
        .uid("mapper-id")
        .print();
    

    对于没有设置 ID 的算子,Flink 默认会自动进行设置,所以在重新启动应用后可能会导致ID 不同而无法兼容以前的状态。所以为了方便后续的维护,强烈建议在程序中为每一个算子手动指定 ID。

  2. 使用保存点

    1. 创建保存点

      bin/flink savepoint :jobId [:targetDirectory]
      

      这里 jobId 需要填充要做镜像保存的作业 ID,目标路径 targetDirectory 可选,表示保存点存储的路径。

      对于保存点的默认路径,可以通过配置文件 flink-conf.yaml 中的 state.savepoints.dir 项来设定:

      state.savepoints.dir: hdfs:///flink/savepoints
      

      对于单独的作业,我们也可以在程序代码中通过执行环境来设置:

      env.setDefaultSavepointDir("hdfs:///flink/savepoints");
      

      停掉一个作业时直接创建保存点:

      bin/flink stop --savepointPath [:targetDirectory] :jobId
      
    2. 从保存点重启应用

      bin/flink run -s :savepointPath [:runArgs]
      

      这里只要增加一个-s 参数,指定保存点的路径就可以了,其他启动时的参数还是完全一样的。

# 状态一致性

# 一致性的概念和级别

,一致性其实就是结果的正确性。对于分布式系统而言,强调的是不同节点中相同数据的副本应该总是“一致的”,也就是从不同节点读取时总能得到相同的值;而对于事务而言,是要求提交更新操作后,能够读取到新的数据。对于 Flink 来说,多个节点并行处理不同的任务,我们要保证计算结果是正确的,就必须不漏掉任何一个数据,而且也不会重复处理同一个数据。流式计算本身就是一个一个来的,所以正常处理的过程中结果肯定是正确的;但在发生故障、需要恢复状态进行回滚时就需要更多的保障机制了。我们通过检查点的保存来保证状态恢复后结果的正确,所以主要讨论的就是“状态的一致性”。

  1. 最多一次(AT-MOST-ONCE)
    当任务发生故障时,最简单的做法就是直接重启,别的什么都不干;既不恢复丢失的状态,也不重放丢失的数据。每个数据在正常情况下会被处理一次,遇到故障时就会丢掉,所以就是“最多处理一次”。

  2. 至少一次(AT-LEAST-ONCE)
    在实际应用中,我们一般会希望至少不要丢掉数据。这种一致性级别就叫作“至少一次”(at-least-once),就是说是所有数据都不会丢,肯定被处理了;不过不能保证只处理一次,有些数据会被重复处理

  3. 精确一次(EXACTLY-ONCE)
    最严格的一致性保证,就是所谓的“精确一次”(exactly-once,有时也译作“恰好一次”)。这也是最难实现的状态一致性语义。exactly-once 意味着所有数据不仅不会丢失,而且只被处理一次,不会重复处理。也就是说对于每一个数据,最终体现在状态和输出结果上,只能有一次统计。

# 端到端的状态一致性

完整的流处理应用,应该包括了数据源、流处理器和外部存储系统三个部分。这个完整应用的一致性,就叫作“端到端(end-to-end)的状态一致性”,它取决于三个组件中最弱的那一环。一般来说,能否达到 at-least-once 一致性级别,主要看数据源能够重放数据;而能否达到 exactly-once 级别,流处理器内部、数据源、外部存储都要有相应的保证机制。

# 端到端精确一次(end-to-end exactly-once)

输入的数据源端和输出的外部存储端。

# 输入端保证

想要在故障恢复后不丢数据,外部数据源就必须拥有重放数据的能力。常见的做法就是对数据进行持久化保存,并且可以重设数据的读取位置。一个最经典的应用就是 Kafka。在 Flink的 Source 任务中将数据读取的偏移量保存为状态,这样就可以在故障恢复时从检查点中读取出来,对数据源重置偏移量,重新获取数据。

# 输出端保证

因为检查点保存之后,继续到来的数据也会一一处理,任务的状态也会更新,最终通过Sink 任务将计算结果输出到外部系统;只是状态改变还没有存到下一个检查点中。这时如果出现故障,这些数据都会重新来一遍,就计算了两次。我们知道对 Flink 内部状态来说,重复计算的动作是没有影响的,因为状态已经回滚,最终改变只会发生一次;但对于外部系统来说,已经写入的结果就是泼出去的水,已经无法收回了,再次执行写入就会把同一个数据写入两次。

为了实现端到端 exactly-once,我们还需要对外部存储系统、以及 Sink 连接器有额外的要求。能够保证 exactly-once 一致性的写入方式有两种:

  1. 幂等(idempotent)写入
    所谓“幂等”操作,就是说一个操作可以重复执行很多次,但只导致一次结果更改。也就是说,后面再重复执行就不会对结果起作用了。

    以这种方式主要的限制在于外部存储系统必须支持这样的幂等写入:比如 Redis 中键值存储,或者关系型数据库(如 MySQL)中满足查询条件的更新操作。

  2. 事务(transactional)写入
    事务写入是更一般化的保证一致性的方式。

    用一个事务来进行数据向外部系统的写入,这个事务是与检查点绑定在一起的。当 Sink 任务遇到 barrier 时,开始保存状态的同时就开启一个事务,接下来所有数据的写入都在这个事务中;待到当前检查点保存完毕时,将事务提交,所有写入的数据就真正可用了。如果中间过程出现故障,状态会回退到上一个检查点,而当前事务没有正常关闭(因为当前检查点没有保存完),所以也会回滚,写入到外部的数据就被撤销了。

    具体来说,又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)

    1. 预写日志(write-ahead-log,WAL)

      1. 先把结果数据作为日志(log)状态保存起来
      2. 进行检查点保存时,也会将这些结果数据一并做持久化存储
      3. 在收到检查点完成的通知时,将所有结果一次性写入外部系统

      这种方式类似于检查点完成时做一个批处理,一次性的写入会带来一些性能上的问题;而优点就是比较简单,由于数据提前在状态后端中做了缓存,所以无论什么外部存储系统,理论上都能用这种方式一批搞定。在 Flink 中 DataStream API 提供了一个模板类GenericWriteAheadSink,用来实现这种事务型的写入方式。

    2. 两阶段提交(two-phase-commit,2PC)
      先做“预提交”,等检查点完成之后再正式提交。这种提交方式是真正基于事务的,它需要外部系统提供事务支持

      1. 当第一条数据到来时,或者收到检查点的分界线时,Sink 任务都会启动一个事务
      2. 接下来接收到的所有数据,都通过这个事务写入外部系统;这时由于事务没有提交,所以数据尽管写入了外部系统,但是不可用,是“预提交”的状态。
      3. 当 Sink 任务收到 JobManager 发来检查点完成的通知时,正式提交事务,写入的结果就真正可用了。

      Flink 提供了 TwoPhaseCommitSinkFunction 接口,方便我们自定义实现两阶段提交的SinkFunction 的实现,提供了真正端到端的 exactly-once 保证。

      两阶段提交虽然精巧,却对外部系统有很高的要求。这里将 2PC 对外部系统的要求列举如下:

      • 外部系统必须提供事务支持,或者 Sink 任务必须能够模拟外部系统上的事务。
      • 在检查点的间隔期间里,必须能够开启一个事务并接受数据写入。
      • 在收到检查点完成的通知之前,事务必须是“等待提交”的状态。在故障恢复的情况下,这可能需要一些时间。如果这个时候外部系统关闭事务(例如超时了),那么未提交的数据就会丢失。
      • Sink 任务必须能够在进程失败后恢复事务。
      • 提交事务必须是幂等操作。也就是说,事务的重复提交应该是无效的。
  1. 整体介绍

    1. flink内部
      Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义。
    2. 输入端
      输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)。所以我们可以在 Source 任务(FlinkKafkaConsumer)中将当前读取的偏移量保存为算子状态,写入到检查点中;当发生故障时,从检查点中读取恢复状态,并由连接器 FlinkKafkaConsumer 向 Kafka重新提交偏移量,就可以重新消费数据、保证结果的一致性了。
    3. 输出端
      Flink 官方实现的 Kafka 连接器中,提供了写入到 Kafka 的 FlinkKafkaProducer,它就实现了 TwoPhaseCommitSinkFunction 接口:
      public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, 
      FlinkKafkaProducer.KafkaTransactionState, 
      FlinkKafkaProducer.KafkaTransactionContext> {
      ...
      }
      
      也就是说,我们写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”。如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。
  2. 具体步骤

    1. 启动检查点保存
    2. 算子任务对状态做快照
    3. Sink 任务开启事务,进行预提交
    4. 检查点保存完成,提交事务
  3. 需要的配置

    1. 必须启用检查点;
    2. 在 FlinkKafkaProducer 的构造函数中传入参数 Semantic.EXACTLY_ONCE;
    3. 配置 Kafka 读取数据的消费者的隔离级别 这里所说的 Kafka,是写入的外部系统。预提交阶段数据已经写入,只是被标记为“未提交”(uncommitted),而 Kafka 中默认的隔离级别 isolation.level 是 read_uncommitted,也就是可以读取未提交的数据。这样一来,外部应用就可以直接消费未提交的数据,对于事务性的保证就失效了。所以应该将隔离级别配置为 read_committed,表示消费者遇到未提交的消息时,会停止从分区中消费数据,直到消息被标记为已提交才会再次恢复消费。当然,这样做的话,外部应用消费数据就会有显著的延迟。
    4. 事务超时配置 Flink 的 Kafka连接器中配置的事务超时时间 transaction.timeout.ms 默认是 1小时,而Kafka集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是 15 分钟。所以在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而 Sink 任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。所以这两个超时时间,前者应该小于等于后者
Last Updated: 7/2/2024, 4:27:38 PM