Learning Flink (18): Unit Testing

Updated for Flink 1.9.0.

Flink provides flink-test-utils, a JUnit-based utility library for unit testing.

Dependencies

Edit pom.xml and add the following dependencies:

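<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-test-utils_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-runtime_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.11</artifactId>
  <version>1.9.0</version>
  <scope>test</scope>
  <classifier>tests</classifier>
</dependency>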

Note: because the test JARs org.apache.flink:flink-runtime_2.11:tests:1.9.0 and
org.apache.flink:flink-streaming-java_2.11:tests:1.9.0 are needed, these two dependencies must set their classifier to tests.

Function Unit Tests

For a stateless, time-independent Function, it is enough to give it an input and verify the output.
Define an even-number filter Function:

import org.apache.flink.api.common.functions.FilterFunction;

// Keeps only even numbers
public class EvenNumberFilterFunction implements FilterFunction<Long> {

    @Override
    public boolean filter(Long input) throws Exception {
        return input % 2 == 0L;
    }

}

Write the unit test:

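import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class EvenNumberFilterFunctionTest {

    @Test
    public void testEvenNumber() throws Exception {
        EvenNumberFilterFunction filter = new EvenNumberFilterFunction();

        // an odd number is filtered out, an even number passes
        assertFalse(filter.filter(1L));
        assertTrue(filter.filter(2L));
    }

}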
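Stateless Functions that emit results through a Collector (such as a FlatMapFunction) can be tested the same way, by handing them a collector that records what they emit. A minimal sketch, assuming flink-core's ListCollector (org.apache.flink.api.common.functions.util.ListCollector), which appends every collected element to a java.util.List; SplitFunction is a hypothetical Function used only for illustration.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.util.ListCollector;
import org.apache.flink.util.Collector;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class SplitFunctionTest {

    // Hypothetical stateless FlatMapFunction: splits a line into words on spaces
    public static class SplitFunction implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        }
    }

    @Test
    public void testSplit() throws Exception {
        List<String> output = new ArrayList<>();
        // ListCollector appends every collected element to the backing list
        new SplitFunction().flatMap("hello flink test", new ListCollector<>(output));

        assertEquals(Arrays.asList("hello", "flink", "test"), output);
    }
}
```

No Flink runtime is involved here: the Function is an ordinary object, so the test runs as fast as any plain JUnit test.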

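For stateful or time-dependent Functions, testing is harder because they depend on the Flink runtime (keyed state, timers). Fortunately, Flink provides test harnesses for such Functions: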

  • OneInputStreamOperatorTestHarness: for operators on a DataStream;
  • KeyedOneInputStreamOperatorTestHarness: for operators on a KeyedStream;
  • TwoInputStreamOperatorTestHarness: for operators on the ConnectedStreams of two DataStreams;
  • KeyedTwoInputStreamOperatorTestHarness: for operators on the ConnectedStreams of two KeyedStreams.
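The two-input harnesses are used in the same way, except that each input side is fed separately. A minimal sketch, assuming Flink's CoStreamMap operator (org.apache.flink.streaming.api.operators.co) wrapping a hypothetical TagCoMapFunction defined only for this example: processElement1(...) and processElement2(...) push StreamRecords into the first and second input, and the harness output is filtered down to record values.

```java
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class TagCoMapFunctionTest {

    // Hypothetical CoMapFunction: tags each element with the input it came from
    public static class TagCoMapFunction implements CoMapFunction<String, Long, String> {
        @Override
        public String map1(String value) {
            return "left:" + value;
        }

        @Override
        public String map2(Long value) {
            return "right:" + value;
        }
    }

    @Test
    public void testTagging() throws Exception {
        TwoInputStreamOperatorTestHarness<String, Long, String> testHarness =
                new TwoInputStreamOperatorTestHarness<>(new CoStreamMap<>(new TagCoMapFunction()));
        testHarness.open();

        // feed each side separately; the second StreamRecord argument is the timestamp
        testHarness.processElement1(new StreamRecord<>("a", 1L));
        testHarness.processElement2(new StreamRecord<>(42L, 2L));

        // getOutput() may also contain watermarks; keep only record values
        List<String> values = new ArrayList<>();
        for (Object element : testHarness.getOutput()) {
            if (element instanceof StreamRecord) {
                values.add((String) ((StreamRecord<?>) element).getValue());
            }
        }
        assertEquals(Arrays.asList("left:a", "right:42"), values);

        testHarness.close();
    }
}
```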

Define a counting Function:

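import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Counts how many times each key has been seen and emits the running count
public class CountFunction extends KeyedProcessFunction<String, String, Long> {

    private ValueState<Long> countValueState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // keyed state: one count per key
        ValueStateDescriptor<Long> countVSD = new ValueStateDescriptor<>(
                "count",
                Types.LONG
        );
        this.countValueState = getRuntimeContext().getState(countVSD);
    }

    @Override
    public void processElement(String value, Context context, Collector<Long> collector) throws Exception {
        // the state is null until the first element for this key arrives
        long count = 0;
        if (this.countValueState.value() != null) {
            count = this.countValueState.value();
        }

        count++;

        this.countValueState.update(count);

        collector.collect(count);
    }

}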

Write the unit test:

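import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class CountFunctionTest {

    private KeyedOneInputStreamOperatorTestHarness<String, String, Long> testHarness;

    private CountFunction countFunction;

    @Before
    public void setupTestHarness() throws Exception {
        // ①
        this.countFunction = new CountFunction();
        this.testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(this.countFunction),
                (KeySelector<String, String>) input -> input,
                Types.STRING
        );

        // ②
        this.testHarness.open();
    }

    @Test
    public void testCountFunction() throws Exception {
        // ③ the second argument is the element's timestamp
        testHarness.processElement("a", 100L);
        testHarness.processElement("a", 101L);
        testHarness.processElement("a", 102L);

        // ④ the output queue holds StreamRecords; extract and check their values
        List<Long> counts = new ArrayList<>();
        for (Object record : testHarness.getOutput()) {
            counts.add((Long) ((StreamRecord<?>) record).getValue());
        }
        assertEquals(Arrays.asList(1L, 2L, 3L), counts);
    }

}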

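① Create a TestHarness for the Function under test (wrapped in a KeyedProcessOperator, together with a KeySelector and the key type);
② Call the TestHarness's open() method, which initializes the operator and its internal state;
③ Feed in test data (the second argument of processElement is the element's timestamp);
④ Retrieve the output.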
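For the time-dependent case, the harness also controls time: processWatermark(...) advances event time and fires any event-time timers that are due, and setProcessingTime(...) does the same for processing-time timers. The sketch below assumes a hypothetical TimeoutFunction, not part of the original example, that registers an event-time timer 10 ms after each element's timestamp and emits the current key when the timer fires.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class TimeoutFunctionTest {

    // Hypothetical function: emits the key 10 ms (event time) after each element
    public static class TimeoutFunction extends KeyedProcessFunction<String, String, String> {
        @Override
        public void processElement(String value, Context ctx, Collector<String> out) {
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10L);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
            out.collect(ctx.getCurrentKey());
        }
    }

    // Collects the values of all StreamRecords in the output, skipping watermarks
    private static List<String> values(KeyedOneInputStreamOperatorTestHarness<String, String, String> h) {
        List<String> result = new ArrayList<>();
        for (Object element : h.getOutput()) {
            if (element instanceof StreamRecord) {
                result.add((String) ((StreamRecord<?>) element).getValue());
            }
        }
        return result;
    }

    @Test
    public void testTimerFiresOnWatermark() throws Exception {
        KeyedOneInputStreamOperatorTestHarness<String, String, String> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        new KeyedProcessOperator<>(new TimeoutFunction()),
                        (KeySelector<String, String>) value -> value,
                        Types.STRING);
        testHarness.open();

        testHarness.processElement("a", 100L); // registers a timer for 110
        testHarness.processWatermark(105L);     // too early: nothing emitted yet
        assertTrue(values(testHarness).isEmpty());

        testHarness.processWatermark(110L);     // watermark reaches 110: the timer fires
        assertEquals(Collections.singletonList("a"), values(testHarness));

        testHarness.close();
    }
}
```

Because the test decides exactly when the watermark moves, the timer behavior is deterministic, which is hard to achieve when running a real Job.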

Job Unit Tests

Flink provides a local cluster environment for testing a complete Job.

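import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

public class IntegrationTest {

    // ①
    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void testPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // ② build the Job under test (this example pipeline reuses EvenNumberFilterFunction)
        env.fromElements(1L, 2L, 3L, 4L)
                .filter(new EvenNumberFilterFunction())
                .print();

        env.execute();
    }

}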

① Create a MiniClusterWithClientResource instance, which starts a local cluster (here one TaskManager with two slots) to run the Job;
② Build and execute the Job.
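To assert on a Job's results instead of printing them, a common pattern is a test sink that stores elements in a static, synchronized list. The field has to be static because the sink instance is serialized and shipped to the MiniCluster's tasks, so only static state is shared with the test method. A minimal sketch, assuming a hypothetical CollectSink helper and an inline lambda filter (so the block is self-contained) in place of a real Job:

```java
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class EvenNumberPipelineIntegrationTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(2)
                            .setNumberTaskManagers(1)
                            .build());

    @Test
    public void testEvenNumberPipeline() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // the static list outlives the serialized sink instances; reset it per test
        CollectSink.VALUES.clear();

        env.fromElements(1L, 2L, 3L, 4L, 5L, 6L)
                .filter(value -> value % 2 == 0)
                .addSink(new CollectSink());

        env.execute();

        // with parallelism 2 the arrival order is not deterministic, so sort first
        List<Long> result = new ArrayList<>(CollectSink.VALUES);
        Collections.sort(result);
        assertEquals(Arrays.asList(2L, 4L, 6L), result);
    }

    // Hypothetical test sink: every parallel instance appends to the same static list
    private static class CollectSink implements SinkFunction<Long> {

        static final List<Long> VALUES = Collections.synchronizedList(new ArrayList<>());

        @Override
        public void invoke(Long value, Context context) {
            VALUES.add(value);
        }
    }
}
```

Sorting before the assertion keeps the test stable regardless of how the two parallel filter instances interleave their output.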
