Flink在开发中的7点建议 Flink在开发中的7点建议

1. 使用DataStreamAPI还是Table API/SQL?

在 DataStream API 和 Table API/SQL 的选择上,如果有强烈的需求控制状态和每条状态到来的处理,则需要使用 DataStream API;

如果是简单的数据提取和关系代数的运算,可以选择 Table API/SQL。注意在如下几个场景中只能采用 DataStream API:

  • 在升级过程中需要改变状态
  • 迟到的数据不能丢失
  • 在运行时更改程序的行为

总结:

流处理,尽量用API ,感觉好用.

2. 数据类型切勿误用

在开发过程中,关于数据类型,有如下两种误用场景:

  • 使用多层深度嵌套的复杂数据类型
  • KeySelector 中使用任意类型

正确的做法是选择尽可能简单的状态类型,在 KeySelector 中不要使用 Flink 无法自动识别的类型。

总结 :

使用基本数据类型.

3. 序列化

数据类型越简单越好,基于序列化成本的考虑,尽量使用POJO 和 Avro SpecificRecords

序列化器 Opts/s
PojoSeriallizer 813
Avro(SpecificRecord API) 632
Kryo 294
Avro(Reflect API) 114

总结 :

就是pojo

4. 并发性

两种误用场景及相应容易造成的问题:

  • 任务之间共享静态变量

容易引起 bug;容易造成死锁和竞争问题;带来额外的同步开销。

  • 在用户函数中生成线程

检查点变得复杂易错。对于想用线程的情况,如果是需要加速作业,可以调整并行度和资源,使用异步IO;

如果是需要一些定时任务的触发,可以使用 Flink 自带的 Timer 定时调度任务

总结

不要在算子中开启线程.

5. 窗口

sourceStream.flatMap(new Deserializer())     
.keyBy("contry")         
.timeWindow()        
.count()        
.filter(new ContryFilter("Canada"));

上面是一种效率较低的处理过程,我们应该先进行过滤和投影操作,防止不需要的数据进行多余的处理

sourceStream.keyBy("key")
        .window(GlobalWindows.create())
        .trigger(new CustomTrigger())
        .evictor(new CustomEvictor())
        .reduce/aggregate/fold/apply();

尽量避免像上面这样自定义 Window,使用 KeyedProcessFunction 可以使得实现更加简单和稳定。

sourceStream.keyBy("key")
        .timeWindow(Time.of(30, DAYS), Time.of(5, SECONDS))
        .apply(new MyWindowFunction());

另外,也要避免上面的这种滑动窗口,在图7中每个记录被50万个窗口计算,无论是计算资源还是业务延迟都会非常糟糕。

6. 可查询状态

Queryable State 目前还在不断的完善中,可以用于监控和查询,但在实际投产时还是有一些问题需要注意的,比如对于线程安全访问,RocksDB 状态后端是支持的,而 FS 状态后端是不支持的,另外还有性能和一致性保障等问题需要注意。

总结:

看不懂

7. DataStream API的应用

对下面这种场景,可以使用 DataStreamUtils#reinterpretAsKeyedStream 这个方法,避免面对相同的 key 进行多次 shuffle 。

sourceStream.keyBy("key")      
.flatmap(..)       
.keyBy("key")       
.process()        
.keyBy("key")     
.timeWindow(...);

对下面这种场景,应该把一些初始化的逻辑写在 RichFunction 的 open 方法里。

public void flatMap(Bean bean, Collector<Bean> out) throws Exception{    
		MyFactory factory = MyFactory.newInstance();  
		MyParser parser = factory.newParser();   
		out.collect(parser.parse(bean));
}

总结:

初始化方法,放在open里面 , 并在close中关闭.