Learning Flink (18): Unit Testing
May 3, 2009
Updated for Flink 1.9.0
Flink provides flink-test-utils, a unit-testing utility library built on JUnit.
Dependencies
Edit pom.xml and add the following dependencies:
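    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
        <classifier>tests</classifier>
    </dependency>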
Note: the test JARs org.apache.flink:flink-runtime_2.11:tests:1.9.0 and org.apache.flink:flink-streaming-java_2.11:tests:1.9.0 are required, so these two dependencies must specify the classifier tests.
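Function unit tests
A Function that is stateless and does not depend on time can be tested directly: supply an input and verify the output.
Define an even-number filter Function: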
    import org.apache.flink.api.common.functions.FilterFunction;

    // Keeps only even numbers.
    public class EvenNumberFilterFunction implements FilterFunction<Long> {
        @Override
        public boolean filter(Long input) throws Exception {
            return input % 2 == 0L;
        }
    }
Write the unit test:
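    import static org.junit.Assert.assertFalse;
    import static org.junit.Assert.assertTrue;

    import org.junit.Test;

    public class EvenNumberFilterFunctionTest {
        @Test
        public void testEvenNumber() throws Exception {
            EvenNumberFilterFunction filter = new EvenNumberFilterFunction();
            assertFalse(filter.filter(1L)); // odd input is rejected
            assertTrue(filter.filter(2L));  // even input is accepted
        }
    }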
A Function that is stateful or depends on time is harder to test, because it needs the Flink runtime. Fortunately, Flink provides Test Harnesses for testing this kind of Function:
- OneInputStreamOperatorTestHarness: for testing operators on a DataStream;
- KeyedOneInputStreamOperatorTestHarness: for testing operators on a KeyedStream;
- TwoInputStreamOperatorTestHarness: for testing operators on ConnectedStreams made of two DataStreams;
- KeyedTwoInputStreamOperatorTestHarness: for testing operators on ConnectedStreams made of two KeyedStreams.
Define a counting Function:
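    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Counts the elements seen per key and emits the running count.
    public class CountFunction extends KeyedProcessFunction<String, String, Long> {
        private ValueState<Long> countValueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            ValueStateDescriptor<Long> countVSD = new ValueStateDescriptor<>(
                "count",
                Types.LONG
            );
            this.countValueState = getRuntimeContext().getState(countVSD);
        }

        // The input element is itself the key (see the identity KeySelector in the test).
        @Override
        public void processElement(String key, Context context, Collector<Long> collector) throws Exception {
            long count = 0;
            // The state is null the first time a key is seen.
            if (this.countValueState.value() != null) {
                count = this.countValueState.value();
            }
            count++;
            this.countValueState.update(count);
            collector.collect(count);
        }
    }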
Write the unit test:
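    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
    import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
    import org.junit.Before;
    import org.junit.Test;

    public class CountFunctionTest {
        private KeyedOneInputStreamOperatorTestHarness<String, String, Long> testHarness;
        private CountFunction countFunction;

        @Before
        public void setupTestHarness() throws Exception {
            // ①
            this.countFunction = new CountFunction();
            this.testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(this.countFunction),
                (KeySelector<String, String>) input -> input,
                Types.STRING
            );
            // ②
            this.testHarness.open();
        }

        @Test
        public void testSumFunction() throws Exception {
            // ③ Each call passes an element and its timestamp.
            testHarness.processElement("a", 100L);
            testHarness.processElement("a", 101L);
            testHarness.processElement("a", 102L);
            // ④ The returned queue is not asserted on here.
            testHarness.getOutput();
        }
    }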
① Create a TestHarness for the Function under test;
② Call the TestHarness's open() method to create the internal state;
③ Feed in the test data;
④ Fetch the output.
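The test above feeds three elements with the key "a" but does not assert on what comes out. As a rough guide to the values such an assertion would expect, here is a plain-Java model of the per-key counting logic. It is only a sketch: `KeyedCountModel` and its `HashMap` are hypothetical stand-ins for Flink's keyed `ValueState`, not part of any Flink API. In the real test the counts arrive as records in the queue returned by getOutput().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of per-key counting; a hypothetical class, not a Flink API.
class KeyedCountModel {
    // Stands in for the keyed ValueState: one counter per key.
    private final Map<String, Long> countByKey = new HashMap<>();

    // Mirrors CountFunction.processElement: read, increment, store, emit.
    long processElement(String key) {
        long count = countByKey.getOrDefault(key, 0L) + 1;
        countByKey.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        KeyedCountModel model = new KeyedCountModel();
        List<Long> output = new ArrayList<>();
        for (String key : new String[] {"a", "a", "b", "a"}) {
            output.add(model.processElement(key));
        }
        System.out.println(output); // prints [1, 2, 1, 3]
    }
}
```

Under this model, the three "a" elements in the test would produce the counts 1, 2, 3, and an element with a different key would start again from 1.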
Job unit tests
Flink provides a local cluster environment for testing a complete Job.
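    import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.test.util.MiniClusterWithClientResource;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class IntegrationTest {
        // ①
        @ClassRule
        public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                new MiniClusterResourceConfiguration.Builder()
                    .setNumberSlotsPerTaskManager(2)
                    .setNumberTaskManagers(1)
                    .build());

        @Test
        public void testIncrementPipeline() throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // The job's sources, operators, and sinks must be defined on env before executing.
            // ②
            env.execute();
        }
    }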
① Create a MiniClusterWithClientResource instance, which starts a local cluster to run the Job;
② Execute the Job.
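The Job test above executes the pipeline but does not show how results get back to the test. One thing to keep in mind is that Flink serializes user functions before distributing them to tasks, so the sink object created in the test is not the object that runs in the Job. The sketch below models this with plain Java serialization. `CollectingSinkModel`, `shipCopy`, and `invoke(Long)` are hypothetical names for illustration and do not implement Flink's sink interface. It suggests why results collected in an instance field would be lost, while a static field is still visible when the local cluster runs in the same JVM as the test.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a test sink; not Flink's SinkFunction interface.
class CollectingSinkModel implements Serializable {
    private static final long serialVersionUID = 1L;

    // Static, so every copy of the sink in this JVM shares it.
    static final List<Long> SHARED = Collections.synchronizedList(new ArrayList<>());

    // Instance field: each deserialized copy gets its own list.
    final List<Long> local = new ArrayList<>();

    void invoke(Long value) {
        SHARED.add(value);
        local.add(value);
    }

    // Round-trips the sink through Java serialization, as a model of shipping it to a task.
    static CollectingSinkModel shipCopy(CollectingSinkModel sink) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sink);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CollectingSinkModel) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        CollectingSinkModel original = new CollectingSinkModel();
        CollectingSinkModel shipped = shipCopy(original);
        shipped.invoke(2L);
        shipped.invoke(3L);
        System.out.println(original.local); // prints []
        System.out.println(SHARED);         // prints [2, 3]
    }
}
```

In a real test, a shared static collection like this would need to be cleared before each run so that results from one test do not leak into another.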