package com.mischen.it;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @ClassName Flink01_TransForm_Map_Anonymous
* @Description 得到一个新的数据流: 新的流的元素是原来流的元素的平方
* @Author mischen
* @Date 2021/6/29 0029 9:36
* @Version 1.0
**/
public class Flink01_TransForm_Map_Anonymous {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(5);
env
.fromElements(1, 2, 3, 4, 5)
.map(new MyRichMapFunction()).setParallelism(2)
.print();
env.execute();
}
public static class MyRichMapFunction extends RichMapFunction
// 默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open ... 执行一次");
}
// 默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次
@Override
public void close() throws Exception {
System.out.println("close ... 执行一次");
}
@Override
public Integer map(Integer value) throws Exception {
System.out.println("map ... 一个元素执行一次");
return value * value;
}
}
}
运行结果:
所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction
默认生命周期方法, 初始化方法, 在每个并行度上只会被调用一次, 而且先被调用默认生命周期方法, 最后一个方法, 做一些清理工作, 在每个并行度上只调用一次, 而且是最后被调用getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行的并行度,任务的名字,以及state状态. 开发人员在需要的时候自行调用获取运行时上下文对象.