Flink的单元测试介绍及示例

本文详细的介绍了Flink的单元测试,分为有状态、无状态以及作业的测试,特别是针对无状态的单元测试给出了常见的使用示例。

  本文除了maven依赖外,没有其他依赖。

  一、Flink测试概述

  Apache Flink 同样提供了在测试金字塔的多个级别上测试应用程序代码的工具。

  本文示例的maven依赖:

<properties>
  <encoding>UTF-8</encoding>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <maven.compiler.source>1.8</maven.compiler.source>
  <maven.compiler.target>1.8</maven.compiler.target>
  <java.version>1.8</java.version>
  <scala.version>2.12</scala.version>
  <flink.version>1.17.0</flink.version>
  </properties>
  <dependencies>
  <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
  </dependency>
  <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
  </dependency>
  <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
  </dependency>
  <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-csv</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
  </dependency>
  <dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-json</artifactId>
  <version>${flink.version}</version>
  <scope>provided</scope>
  </dependency>
  <dependency>
  <groupId>junit</groupId>
  <artifactId>junit</artifactId>
  <version>4.13</version>
  </dependency>
  <dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-core</artifactId>
  <version>4.0.0</version>
  <scope>test</scope>
  </dependency>
  </dependencies>

 

二、测试用户自定义函数

  可以假设 Flink 在用户自定义函数之外产生了正确的结果。因此,建议尽可能多的用单元测试来测试那些包含主要业务逻辑的类。

  1、单元测试无状态、无时间限制的 UDF

  1)、示例-mapFunction

  以下无状态的 MapFunction 为例:

public class IncrementMapFunction implements MapFunction<Long, Long> {
  @Override
  public Long map(Long record) throws Exception {
  return record + 1;
  }
  }

  通过传递合适地参数并验证输出,可以很容易的使用你喜欢的测试框架对这样的函数进行单元测试。

import static org.junit.Assert.assertEquals;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.junit.Test;
  /**
   * @author alanchan
   *
   */
  public class TestDemo {
  public class IncrementMapFunction implements MapFunction<Long, Long> {
  @Override
  public Long map(Long record) throws Exception {
  return record + 1;
  }
  }
  @Test
  public void testIncrement() throws Exception {
  IncrementMapFunction incrementer = new IncrementMapFunction();
  assertEquals((Long) 3L, incrementer.map(2L));
  }
  }

2)、示例-flatMapFunction

  对于使用 org.apache.flink.util.Collector 的用户自定义函数(例如FlatMapFunction 或者 ProcessFunction),可以通过提供模拟对象而不是真正的 collector 来轻松测试。具有与 IncrementMapFunction 相同功能的 FlatMapFunction 可以按照以下方式进行单元测试。

 import static org.mockito.Mockito.mock;
  import static org.mockito.Mockito.times;
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.util.Collector;
  import org.junit.Test;
  import org.junit.runner.RunWith;
  import org.mockito.Mockito;
  import org.mockito.junit.MockitoJUnitRunner;
  /**
   * @author alanchan
   *
   */
  @RunWith(MockitoJUnitRunner.class)
  public class TestDemo2 {
  public static class IncrementFlatMapFunction implements FlatMapFunction<String, Long> {
  @Override
  public void flatMap(String value, Collector<Long> out) throws Exception {
  Long sum = 0L;
  for (String num : value.split(",")) {
  sum += Long.valueOf(num);
  }
  out.collect(sum);
  }
  }
  @Test
  public void testSum() throws Exception {
  IncrementFlatMapFunction incrementer = new IncrementFlatMapFunction();
  Collector<Long> collector = mock(Collector.class);
  incrementer.flatMap("1,2,3,4,5", collector);
  Mockito.verify(collector, times(1)).collect(15L);
  }
  }

  2、对有状态或及时 UDF 和自定义算子进行单元测试

  对使用管理状态或定时器的用户自定义函数的功能测试会更加困难,因为它涉及到测试用户代码和 Flink 运行时的交互。 为此,Flink 提供了一组所谓的测试工具,可用于测试用户自定义函数和自定义算子:

  ·OneInputStreamOperatorTestHarness (适用于 DataStream 上的算子)

  · KeyedOneInputStreamOperatorTestHarness (适用于 KeyedStream 上的算子)

  · TwoInputStreamOperatorTestHarness (f适用于两个 DataStream 的 ConnectedStreams 算子)

  · KeyedTwoInputStreamOperatorTestHarness (适用于两个 KeyedStream 上的 ConnectedStreams 算子)

  要使用测试工具,还需要一组其他的依赖项,比如DataStream和TableAPI的依赖。

  1)、DataStream API 测试依赖

  如果要为使用 DataStream API 构建的作业开发测试用例,则需要添加以下依赖项:

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>1.17.2</version>
      <scope>test</scope>
  </dependency>

 

 在各种测试实用程序中,该模块提供了 MiniCluster (一个可配置的轻量级 Flink 集群,能在 JUnit 测试中运行),可以直接执行作业。

  2)、Table API 测试依赖

  如果您想在您的 IDE 中本地测试 Table API 和 SQL 程序,除了前述提到的 flink-test-utils 之外,您还要添加以下依赖项:

 <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-test-utils</artifactId>
      <version>1.17.2</version>
      <scope>test</scope>
  </dependency>

这将自动引入查询计划器和运行时,分别用于计划和执行查询。

  flink-table-test-utils 模块已在 Flink 1.15 中引入,截至Flink 1.17版本被认为是实验性的。

  3)、flatmap function 单元测试

  现在,可以使用测试工具将记录和 watermark 推送到用户自定义函数或自定义算子中,控制处理时间,最后对算子的输出(包括旁路输出)进行校验。

  示例如下:

/*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: 单元测试flatmap,如果是偶数则存储原值及平方数
   */
  import java.util.concurrent.ConcurrentLinkedQueue;
  import org.apache.flink.api.common.functions.FlatMapFunction;
  import org.apache.flink.streaming.api.operators.StreamFlatMap;
  import org.apache.flink.streaming.api.watermark.Watermark;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.TestHarnessUtil;
  import org.apache.flink.util.Collector;
  import org.junit.Before;
  import org.junit.Test;
  public class TestStatefulFlatMapDemo3 {
      static class AlanFlatMapFunction implements FlatMapFunction<Integer, Integer> {
          @Override
          public void flatMap(Integer value, Collector<Integer> out) throws Exception {
              if (value % 2 == 0) {
                  out.collect(value);
                  out.collect(value * value);
              }
          }
      }
      OneInputStreamOperatorTestHarness<Integer, Integer> testHarness;
      @Before
      public void setupTestHarness() throws Exception {
          StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new AlanFlatMapFunction());
          testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
          testHarness.open();
      }
      @Test
      public void testFlatMap2() throws Exception {
          long initialTime = 0L;
          ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
          testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
          testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
          testHarness.processWatermark(new Watermark(initialTime + 2));
          testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
          testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
          testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
          testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
          testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
          testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
          expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
          expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
          expectedOutput.add(new Watermark(initialTime + 2));
          expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
          expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
          expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
          expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
          expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
          expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
          TestHarnessUtil.assertOutputEquals("输出结果", expectedOutput, testHarness.getOutput());
      }
  }

 KeyedOneInputStreamOperatorTestHarness 和 KeyedTwoInputStreamOperatorTestHarness 可以通过为键的类另外提供一个包含 TypeInformation 的 KeySelector 来实例化。

  示例如下:

 /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: 按照城市分类,并将城市缩写变成大写
   */
  import com.google.common.collect.Lists;
  import org.apache.flink.api.common.functions.RichFlatMapFunction;
  import org.apache.flink.api.common.state.ValueState;
  import org.apache.flink.api.common.state.ValueStateDescriptor;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.configuration.Configuration;
  import org.apache.flink.streaming.api.operators.StreamFlatMap;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.NoArgsConstructor;
  public class TestStatefulFlatMapDemo2 {
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class User {
          private int id;
          private String name;
          private int age;
          private String city;
      }
      static class AlanFlatMapFunction extends RichFlatMapFunction<User, User> {
          // The state is only accessible by functions applied on a {@code KeyedStream}
          ValueState<User> previousInput;
          @Override
          public void open(Configuration parameters) throws Exception {
              super.open(parameters);
              previousInput = getRuntimeContext()
                      .getState(new ValueStateDescriptor<User>("previousInput", User.class));
          }
          @Override
          public void flatMap(User input, Collector<User> out) throws Exception {
              previousInput.update(input);
              input.setCity(input.getCity().toUpperCase());
              out.collect(input);
          }
      }
      AlanFlatMapFunction alanFlatMapFunction = new AlanFlatMapFunction();
      OneInputStreamOperatorTestHarness<User, User> testHarness;
      @Before
      public void setupTestHarness() throws Exception {
          alanFlatMapFunction = new AlanFlatMapFunction();
          testHarness = new KeyedOneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(alanFlatMapFunction),
                  new KeySelector<User, String>() {
                      @Override
                      public String getKey(User value) throws Exception {
                          return value.getCity();
                      }
                  }, Types.STRING);
          
          testHarness.open();
      }
      @Test
      public void testFlatMap() throws Exception {
          testHarness.processElement(new User(1, "alanchan", 18, "sh"), 10);
          ValueState<User> previousInput = alanFlatMapFunction.getRuntimeContext().getState(
                  new ValueStateDescriptor<>("previousInput", User.class));
          User stateValue = previousInput.value();
          Assert.assertEquals(
                  Lists.newArrayList(new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10)),
                  testHarness.extractOutputStreamRecords());
          Assert.assertEquals(new User(1, "alanchan", 18, "sh".toUpperCase()), stateValue);
          testHarness.processElement(new User(2, "alan", 19, "bj"), 10000);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>(new User(1, "alanchan", 18, "sh".toUpperCase()), 10),
                          new StreamRecord<>(new User(2, "alan", 19, "bj".toUpperCase()), 10000)),
                  testHarness.extractOutputStreamRecords());
          Assert.assertEquals(new User(2, "alan", 19, "bj".toUpperCase()), previousInput.value());
      }
  }

 4)、Process Function 单元测试

  除了之前可以直接用于测试 ProcessFunction 的测试工具之外,Flink 还提供了一个名为 ProcessFunctionTestHarnesses 的测试工具工厂类,可以简化测试工具的实例化。

  ·OneInputStreamOperatorTestHarness示例

import com.google.common.collect.Lists;
  import org.apache.flink.api.common.typeinfo.Types;
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
  import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
  /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: 
   */
  public class TestProcessOperatorDemo1 {
      // public abstract class KeyedProcessFunction<K, I, O>
      static class AlanProcessFunction extends KeyedProcessFunction<String, String, String> {
          @Override
          public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                  Collector<String> out) throws Exception {
              ctx.timerService().registerProcessingTimeTimer(50);
              out.collect("vx->" + value);
          }
          @Override
          public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
              // 到达时间点触发事件操作
              out.collect(String.format("定时器在 %d 被触发", timestamp));
          }
      }
      private OneInputStreamOperatorTestHarness<String, String> testHarness;
      private AlanProcessFunction processFunction;
      @Before
      public void setupTestHarness() throws Exception {
          processFunction = new AlanProcessFunction();
          testHarness = new KeyedOneInputStreamOperatorTestHarness<>(
                  new KeyedProcessOperator<>(processFunction),
                  x -> "1",
                  Types.STRING);
          // Function time is initialized to 0
          testHarness.open();
      }
      @Test
      public void testProcessElement() throws Exception {
          testHarness.processElement("alanchanchn", 10);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>("vx->alanchanchn", 10)),
                  testHarness.extractOutputStreamRecords());
      }
      @Test
      public void testOnTimer() throws Exception {
          // test first record
          testHarness.processElement("alanchanchn", 10);
          Assert.assertEquals(1, testHarness.numProcessingTimeTimers());
          // Function time 设置为 100
          testHarness.setProcessingTime(100);
          Assert.assertEquals(
                  Lists.newArrayList(
                          new StreamRecord<>("vx->alanchanchn", 10),
                          new StreamRecord<>("定时器在 100 被触发")),
                  testHarness.extractOutputStreamRecords());
      }
  }

·ProcessFunctionTestHarnesses示例:

  本示例通过ProcessFunctionTestHarnesses验证了ProcessFunction、KeyedProcessFunction、CoProcessFunction、KeyedCoProcessFunction和BroadcastProcessFunction,基本完成了覆盖。

 import java.util.Arrays;
  import java.util.Collections;
  import org.apache.flink.api.common.state.MapStateDescriptor;
  import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
  import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
  import org.apache.flink.api.common.typeinfo.TypeInformation;
  import org.apache.flink.api.java.functions.KeySelector;
  import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
  import org.apache.flink.streaming.api.functions.ProcessFunction;
  import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
  import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
  import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
  import org.apache.flink.streaming.util.BroadcastOperatorTestHarness;
  import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
  import org.apache.flink.streaming.util.ProcessFunctionTestHarnesses;
  import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
  import org.apache.flink.util.Collector;
  import org.junit.Assert;
  import org.junit.Test;
  import lombok.AllArgsConstructor;
  import lombok.Data;
  import lombok.NoArgsConstructor;
  /*
   * @Author: alanchan
   * 
   * @LastEditors: alanchan
   * 
   * @Description:
   */
  public class TestProcessOperatorDemo3 {
      @Data
      @NoArgsConstructor
      @AllArgsConstructor
      static class User {
          private int id;
          private String name;
          private int age;
          private String city;
      }
      // 测试ProcessFunction 的 processElement
      @Test
      public void testProcessFunction() throws Exception {
          // public abstract class ProcessFunction<I, O>
          ProcessFunction<String, String> function = new ProcessFunction<String, String>() {
              @Override
              public void processElement(
                      String value, Context ctx, Collector<String> out) throws Exception {
                  out.collect("vx->" + value);
              }
          };
          OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                  .forProcessFunction(function);
          harness.processElement("alanchanchn", 10);
          Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList("vx->alanchanchn"));
      }
      // 测试KeyedProcessFunction 的 processElement
      @Test
      public void testKeyedProcessFunction() throws Exception {
          // public abstract class KeyedProcessFunction<K, I, O>
          KeyedProcessFunction<String, String, String> function = new KeyedProcessFunction<String, String, String>() {
              @Override
              public void processElement(String value, KeyedProcessFunction<String, String, String>.Context ctx,
                      Collector<String> out) throws Exception {
                  out.collect("vx->" + value);
              }
          };
          OneInputStreamOperatorTestHarness<String, String> harness = ProcessFunctionTestHarnesses
                  .forKeyedProcessFunction(function, x -> "name", BasicTypeInfo.STRING_TYPE_INFO);
          harness.processElement("alanchan", 10);
          Assert.assertEquals(harness.extractOutputValues(), Collections.singletonList(1));
      }
      // 测试CoProcessFunction 的 processElement1、processElement2
      @Test
      public void testCoProcessFunction() throws Exception {
          // public abstract class CoProcessFunction<IN1, IN2, OUT>
          CoProcessFunction<String, User, User> function = new CoProcessFunction<String, User, User>() {
              @Override
              public void processElement1(String value, CoProcessFunction<String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  String[] userStr = value.split(",");
                  out.collect(
                          new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
              }
              @Override
              public void processElement2(User value, CoProcessFunction<String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  out.collect(value);
              }
          };
          TwoInputStreamOperatorTestHarness<String, User, User> harness = ProcessFunctionTestHarnesses
                  .forCoProcessFunction(function);
          harness.processElement2(new User(2, "alan", 19, "bj"), 100);
          harness.processElement1("1,alanchan,18,sh", 10);
          Assert.assertEquals(harness.extractOutputValues(),
                  Arrays.asList(new User(1, "alanchan", 18, "sh"), new User(2, "alan", 19, "bj")));
      }
      // 测试KeyedCoProcessFunction 的 processElement1和processElement2
      @Test
      public void testKeyedCoProcessFunction() throws Exception {
          // public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT>
          KeyedCoProcessFunction<String, String, User, User> function = new KeyedCoProcessFunction<String, String, User, User>() {
              @Override
              public void processElement1(String value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  String[] userStr = value.split(",");
                  out.collect(
                          new User(Integer.parseInt(userStr[0]), userStr[1], Integer.parseInt(userStr[2]), userStr[3]));
              }
              @Override
              public void processElement2(User value, KeyedCoProcessFunction<String, String, User, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  out.collect(value);
              }
          };
          // public static <K,IN1,IN2,OUT>
          // KeyedTwoInputStreamOperatorTestHarness<K,IN1,IN2,OUT>
          // forKeyedCoProcessFunction(
          // KeyedCoProcessFunction<K,IN1,IN2,OUT> function,
          // KeySelector<IN1,K> keySelector1,
          // KeySelector<IN2,K> keySelector2,
          // TypeInformation<K> keyType)
          KeyedTwoInputStreamOperatorTestHarness<String, String, User, User> harness = ProcessFunctionTestHarnesses
                  .forKeyedCoProcessFunction(function, new KeySelector<String, String>() {
                      @Override
                      public String getKey(String value) throws Exception {
                          return value.split(",")[3];
                      }
                  }, new KeySelector<User, String>() {
                      @Override
                      public String getKey(User value) throws Exception {
                          return value.getCity();
                      }
                  }, TypeInformation.of(String.class));
          harness.processElement2(new User(2, "alan", 19, "bj"), 100);
          harness.processElement1("1,alanchan,18,sh", 10);
          Assert.assertEquals(harness.extractOutputValues(),
                  Arrays.asList(new User(1, "alanchan", 18, "sh"), new User(2, "alan", 19, "bj")));
      }
      // 测试 BroadcastProcessFunction 的 processElement 和 processBroadcastElement
      @Test
      public void testBroadcastOperator() throws Exception {
          // 定义广播
          // 数据格式:
          // sh,上海
          // bj,北京
          // public class MapStateDescriptor<UK, UV>
          MapStateDescriptor<String, String> broadcastDesc = new MapStateDescriptor("Alan_RulesBroadcastState",
                  String.class,
                  String.class);
          // public abstract class BroadcastProcessFunction<IN1, IN2, OUT>
          // * @param <IN1> The input type of the non-broadcast side.
          // * @param <IN2> The input type of the broadcast side.
          // * @param <OUT> The output type of the operator.
          BroadcastProcessFunction<User, String, User> function = new BroadcastProcessFunction<User, String, User>() {
              // 负责处理广播流的元素
              @Override
              public void processBroadcastElement(String value, BroadcastProcessFunction<User, String, User>.Context ctx,
                      Collector<User> out) throws Exception {
                  System.out.println("收到广播数据:" + value);
                  // 得到广播流的存储状态
                  ctx.getBroadcastState(broadcastDesc).put(value.split(",")[0], value.split(",")[1]);
              }
              // 处理非广播流,关联维度
              @Override
              public void processElement(User value, BroadcastProcessFunction<User, String, User>.ReadOnlyContext ctx,
                      Collector<User> out) throws Exception {
                  // 得到广播流的存储状态
                  ReadOnlyBroadcastState<String, String> state = ctx.getBroadcastState(broadcastDesc);
                  value.setCity(state.get(value.getCity()));
                  out.collect(value);
              }
          };
          BroadcastOperatorTestHarness<User, String, User> harness = ProcessFunctionTestHarnesses
                  .forBroadcastProcessFunction(function, broadcastDesc);
          harness.processBroadcastElement("sh,上海", 10);
          harness.processBroadcastElement("bj,北京", 20);
          harness.processElement(new User(2, "alan", 19, "bj"), 10);
          harness.processElement(new User(1, "alanchan", 18, "sh"), 30);
          Assert.assertEquals(harness.extractOutputValues(),
                  Arrays.asList(new User(1, "alanchan", 18, "上海"), new User(2, "alan", 19, "北京")));
      }
  }

 三、测试 Flink 作业

  1、JUnit 规则 MiniClusterWithClientResource

  Apache Flink 提供了一个名为 MiniClusterWithClientResource 的 Junit 规则,用于针对本地嵌入式小型集群测试完整的作业。 叫做 MiniClusterWithClientResource.

  要使用 MiniClusterWithClientResource,需要添加一个额外的依赖项(测试范围)。

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-test-utils</artifactId>
      <version>1.17.2</version>    
      <scope>test</scope>
  </dependency>

 让我们采用与前面几节相同的简单 MapFunction来做示例。

 /*
   * @Author: alanchan
   * @LastEditors: alanchan
   * @Description: 
   */
  package com.win;
  import static org.junit.Assert.assertFalse;
  import static org.junit.Assert.assertTrue;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.List;
  import org.apache.flink.api.common.functions.MapFunction;
  import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  import org.apache.flink.streaming.api.functions.sink.SinkFunction;
  import org.apache.flink.test.util.MiniClusterResourceConfiguration;
  import org.apache.flink.test.util.MiniClusterWithClientResource;
  import org.junit.ClassRule;
  import org.junit.Test;
  public class TestExampleIntegrationDemo {
      static class AlanIncrementMapFunction implements MapFunction<Long, Long> {
          @Override
          public Long map(Long record) throws Exception {
              return record + 1;
          }
      }
      @ClassRule
      public static MiniClusterWithClientResource flinkCluster = new MiniClusterWithClientResource(
              new MiniClusterResourceConfiguration.Builder()
                      .setNumberSlotsPerTaskManager(2)
                      .setNumberTaskManagers(1)
                      .build());
      @Test
      public void testIncrementPipeline() throws Exception {
          StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          // configure your test environment
          env.setParallelism(2);
          // values are collected in a static variable
          CollectSink.values.clear();
          // create a stream of custom elements and apply transformations
          env.fromElements(1L, 21L, 22L)
                  .map(new AlanIncrementMapFunction())
                  .addSink(new CollectSink());
          // execute
          env.execute();
          // verify your results
          assertTrue(CollectSink.values.containsAll(Arrays.asList(2L, 22L, 23L)));
      }
      // create a testing sink
      private static class CollectSink implements SinkFunction<Long> {
          // must be static
          public static final List<Long> values = Collections.synchronizedList(new ArrayList<>());
          @Override
          public void invoke(Long value, SinkFunction.Context context) throws Exception {
              values.add(value);
          }
      }
  }

 关于使用 MiniClusterWithClientResource 进行集成测试的几点备注:

  ·为了不将整个 pipeline 代码从生产复制到测试,请将你的 source 和 sink 在生产代码中设置成可插拔的,并在测试中注入特殊的测试 source 和测试 sink。

  · 这里使用 CollectSink 中的静态变量,是因为Flink 在将所有算子分布到整个集群之前先对其进行了序列化。 解决此问题的一种方法是与本地 Flink 小型集群通过实例化算子的静态变量进行通信。 或者,你可以使用测试的 sink 将数据写入临时目录的文件中。

  · 如果你的作业使用事件时间计时器,则可以实现自定义的 并行 源函数来发出 watermark。

  · 建议始终以 parallelism > 1 的方式在本地测试 pipeline,以识别只有在并行执行 pipeline 时才会出现的 bug。

  · 优先使用 @ClassRule 而不是 @Rule,这样多个测试可以共享同一个 Flink 集群。这样做可以节省大量的时间,因为 Flink 集群的启动和关闭通常会占用实际测试的执行时间。

  · 如果你的 pipeline 包含自定义状态处理,则可以通过启用 checkpoint 并在小型集群中重新启动作业来测试其正确性。为此,你需要在 pipeline 中(仅测试)抛出用户自定义函数的异常来触发失败。

  以上,本文详细的介绍了Flink的单元测试,分为有状态、无状态以及作业的测试,特别是针对无状态的单元测试给出了常见的使用示例。

 感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

 

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/397321.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

离谱,华为食堂也要搞末位淘汰

华为饭堂 末位淘汰 今天逛职场 App&#xff0c;无意间翻到一篇帖子&#xff1a; 点开图片之前&#xff0c;我还以为只是普通的争霸赛被网友解读为末位淘汰。 点开图片后我却发现 ... 可以看出&#xff0c;是深圳华为的行政部做的海报&#xff0c;里面清晰写到&#xff1a;员工的…

QT-地形3D

QT-地形3D 一、 演示效果二、关键程序三、下载链接 一、 演示效果 二、关键程序 #include "ShaderProgram.h"namespace t3d::core {void ShaderProgram::init() {initializeOpenGLFunctions();loadShaders(); }void ShaderProgram::addShader(const QString &fil…

如何使用Docker搭建YesPlayMusic网易云音乐播放器并发布至公网访问

文章目录 1. 安装Docker2. 本地安装部署YesPlayMusic3. 安装cpolar内网穿透4. 固定YesPlayMusic公网地址 本篇文章讲解如何使用Docker搭建YesPlayMusic网易云音乐播放器&#xff0c;并且结合cpolar内网穿透实现公网访问音乐播放器。 YesPlayMusic是一款优秀的个人音乐播放器&am…

JS逆向进阶篇【去哪儿旅行登录】【中篇-滑动轨迹破解补浏览器环境破参数】

目录&#xff1a; 每篇前言&#xff1a;0、整体分析1、逆向轨迹snapshot&#xff08;1&#xff09;分析&#xff1a;&#xff08;2&#xff09;Python轨迹生成&#xff1a;&#xff08;3&#xff09;AES加密&#xff1a;&#xff08;4&#xff09;轨迹加密&#xff1a;&#xf…

springcloud:1.Eureka详细讲解

Eureka 是 Netflix 开源的一个服务注册和发现工具,被广泛应用于微服务架构中。作为微服务架构中的核心组件之一,Eureka 提供了服务注册、发现和失效剔除等功能,帮助构建弹性、高可用的分布式系统。在现代软件开发领域,使用 Eureka 可以有效地管理和监控服务实例,实现服务之…

Qt Creator在#include第三方库不带.h后缀的文件时,没有智能提示和自动补全

1、问题截图 OSG文件目录下有很多头文件&#xff08;均不带.h后缀&#xff09;&#xff0c;Qt Creator可以识别到OSG目录&#xff0c;但是OSG目录下的所有头文件识别不到 2、原因 找到原因是因为Qt Creator开启了ClanCodeModel插件导致的 3、解决方法 1、在Qt Creator中…

GenAI的“关键一跃”:推理与知识

当前的人工智能领域正通过生成式人工智能&#xff08;GenAI&#xff09;经历一场重大转变。这一转变不仅代表了技术上的飞跃&#xff0c;更标志着人工智能领域的范式转变&#xff0c;引发了有关GenAI的独特特性及其深远影响的关键问题讨论。 植根于计算革命的丰富历史&#xff…

OpenCV人脸检测案例实战

人脸检测是一种计算机视觉技术&#xff0c;旨在识别图像或视频中的人脸。这项技术的基本内容包括使用特定的算法和模型来定位和识别人脸&#xff0c;通常涉及在图像中寻找面部特征&#xff0c;如眼睛、鼻子、嘴巴等&#xff0c;以便准确地确定人脸的位置和边界。人脸检测技术的…

LeetCode JS专栏刷题笔记(一)

一、前言 LeetCode 在前不久出了一个 JavaScript 专栏&#xff0c;这个专栏一个目的是为了非前端工程师学习 JS&#xff0c;另一个是为了前端工程师提升 JS 能力。 因此在这个专栏中&#xff0c;基本不涉及什么具体算法问题&#xff0c;都是一些 JS 的入门语法与常见的 JS 面…

安卓游戏开发之图形渲染技术优劣分析

一、引言 随着移动设备的普及和性能的提升&#xff0c;安卓游戏开发已经成为一个热门领域。在安卓游戏开发中&#xff0c;图形渲染技术是关键的一环。本文将对安卓游戏开发中常用的图形渲染技术进行分析&#xff0c;比较它们的优劣&#xff0c;并探讨它们在不同应用场景下的适用…

从零开始:开发多商户商城APP的技术指南

当下&#xff0c;电子商务正在飞速发展&#xff0c;多商户商城APP的需求也与日俱增。本篇文章&#xff0c;小编将为大家深度详解如何开发多商户商城APP。 1.确定功能需求 在着手开发多商户商城APP之前&#xff0c;首先需要明确功能需求。这包括但不限于&#xff1a; -用户注…

如何在CentOS安装SQL Server数据库并实现无公网ip环境远程连接

文章目录 前言1. 安装sql server2. 局域网测试连接3. 安装cpolar内网穿透4. 将sqlserver映射到公网5. 公网远程连接6.固定连接公网地址7.使用固定公网地址连接 前言 简单几步实现在Linux centos环境下安装部署sql server数据库&#xff0c;并结合cpolar内网穿透工具&#xff0…

MongoDB文档插入

文章目录 MongoDB文档插入对比增删改查文档插入 MongoDB写安全机制非确认式写入 MongoDB文档查询参数说明查询操作符比较查询操作符逻辑查询操作符元素查询操作符数组查询操作符 模糊查询区别:$regex操作符中的option选项 MongoDB游标介绍游标函数手动迭代游标示例游标介绍 Mon…

揭秘智能商品计划管理系统:为何服装企业老板争相引入?

在如今日新月异的商业环境中&#xff0c;服装企业老板们纷纷将目光转向了一种名为“智能商品计划管理系统”的创新工具。这种系统不仅具有高度的自动化和智能化特性&#xff0c;还能显著提升企业的运营效率、减少库存积压&#xff0c;并帮助企业在激烈的市场竞争中占据优势地位…

xilinx除法器的使用

平台&#xff1a;Vivado2018.3. 芯片&#xff1a;xcku115-flva1517-2-i (active) 最近学习使用了xilinx除法器&#xff0c;在使用过程中出现了很多次除法器的结果和我预计的结果不一致&#xff0c;特此记录学习一下。 参考文件&#xff1a;pg151.下载地址 pg151-div-gen.pdf …

简单了解一下加密算法

1.1单向散列算法 单向散列函数算法也称 Hash(哈希)算法&#xff0c;是一种将任意长度的消息压缩到某一固定长度(消 息摘要)的函数(该过程不可逆)。Hash 函数可用于数字签名、消息的完整性检测、消息起源的认 证检测等。常见的散列算法有MD5 、SHA 、RIPE-MD 、HAVAL 、N-Hash等…

【Pygame手册03/20】用 pygame 模块draw绘制形状

目录 一、说明二、画图函数2.1 接口draw下的函数2.2 pygame.draw.rect()2.3 pygame.draw.polygon()2.4 pygame.draw.circle()2.5 pygame.draw.ellipse()2.6 pygame.draw.arc()2.7 pygame.draw.line ()2.8 pygame.draw.lines()2.9 pygame.draw.aaline()2.10 pygame.draw.aaline…

【EI会议征稿通知】2024年通信安全与信息处理国际学术会议(CSIP 2024)

2024年通信安全与信息处理国际学术会议&#xff08;CSIP 2024) 2024 International Conference on Communication Security and Information Processing 随着全球信息化的深入发展&#xff0c;通信安全与信息处理已成为当今社会关注的热点问题。为了加强国际间的学术交流与合…

Java之获取Nginx代理之后的客户端IP

Java之获取Nginx代理之后的客户端IP Nginx代理接口之后&#xff0c;后台获取的IP地址都是127.0.0.1&#xff0c;解决办法是需要配置Nginx搭配后台获取的方法&#xff0c;获得设备的真实地址。我们想要获取的就是nginx代理日志中的这个IP nginx配置 首先在nginx代理的对应lo…

opencv安装介绍以及基本图像处理详解

文章目录 一、什么是OpenCV &#xff1f;二. OpenCV 安装1. 下载地址2.安装命令&#xff1a;pip install opencv-python 三、图像基础1. 基本概念2. 坐标系3. 基本操作&#xff08;彩色图片&#xff09;&#xff08;1&#xff09;读取图片&#xff1a;cv2.imread( )&#xff08…