【Flink网络传输】ShuffleMaster与ShuffleEnvironment创建细节与提供的能力

文章目录

  • 一. Taskmanager之间传递数据细节
  • 二. ShuffleService的设计与实现
  • 三. 在JobMaster中创建ShuffleMaster
  • 四. 在TaskManager中创建ShuffleEnvironment
  • 五. 基于ShuffleEnvironment创建ResultPartition
    • 1. 在task启动时创建ResultPartition
    • 2. ResultPartition的创建与对数据的行为
    • 3. 创建ResultSubpartitions与 应用与流或批场景
  • 六. 基于ShuffleEnvironment创建InputGate
    • 1. 在哪里创建的InputGate
    • 2. SingleInputGate的创建和提供的能力
      • 2.1. 创建SingleInputGate
      • 2.2. InputChannel的创建与处理同一个tm的数据或跨tm的数据的能力

一. Taskmanager之间传递数据细节

Flink作业最终会被转换为ExecutionGraph并拆解成Task,在TaskManager中调度并执行,Task实例之间会发生跨TaskManager节点的数据交换,尤其是在DataStream API中使用了物理分区操作的情况。

ResultPartition组件存放中间结果等待下游节点消费:

从ExecutionGraph到物理执行图的转换中可以看出,ExecutionVertex最终会被转换为Task实例运行,在ExecutionGraph中上游节点产生的数据被称为IntermediateResult,物理执行图对应ResultPartition组件。在ResultPartition组件中会根据分区的数量再细分为ResultSubPartition。在ResultSubPartition中主要有BufferConsumer队列,用于本地存储Buffer数据,供下游的Task节点消费使用。

InputChannel读取上游数据

对下游的Task实例来讲,主要依赖InputGate组件读取上游数据,在InputGate组件中InputChannel和上游的ResultSubPartition数量相同(发送逻辑是?起到shuffle的作用)。
因此RecordWriter向ResultPartition中的ResultSubPartition写入Buffer数据,就是在向下游的InputChannel写入数据,因为最终会从ResultSubPartition的队列中读取Buffer数据再经过TCP网络连接发送到对应的InputChannel中。

在这里插入图片描述

ResultPartition(存储中间结果集)和InputGate(读取中间结果集)组件的创建

TaskManager接收到JobManager的Task创建请求时,会根据TaskDeploymentDescriptor中的参数创建并初始化ResultPartition和InputGate组件。Task启动成功并开始接入数据后,使用ResultPartition和InputGate组件实现上下游算子之间的跨网络数据传输

ShuffleMaster管理ResultPartition和InputGate。

在TaskManager实例中,主要通过ShuffleEnvironment统一创建ResultPartition和InputGate组件。在JobMaster中也会创建ShuffleMaster统一管理和监控作业中所有的ResultPartition和InputGate组件

 

因此在介绍ResultPartition和InputGate之前,我们先了解一下ShuffleMaster和ShuffleEnvironment的主要作用和创建过程。

 

二. ShuffleService的设计与实现

如图,创建ShuffleMaster和ShuffleEnvironment组件主要依赖ShuffleServiceFactory实现。同时为了实现可插拔的ShuffleService服务,ShuffleServiceFactory的实现类通过Java SPI的方式加载到ClassLoader中,即通过ShuffleServiceLoader从配置文件中加载系统配置的ShuffleServiceFactory实现类,因此用户也可以自定义实现Shuffle服务。

基于SPI的方式加载ShuffleServiceFactory

在JobManager内部创建JobManagerRunner实例的过程中会创建ShuffeServiceLoader,用于通过Java SPI服务的方式加载配置的ShuffleServiceFactory,同时在TaskManager的TaskManagerServices中创建ShuffeServiceLoader并加载ShuffleServiceFactory。

ShuffleServiceFactory提供了创建ShuffleMaster和ShuffleEnvironment的能力
ShuffleServiceFactory接口定义中包含创建ShuffleMaster和ShuffleEnvironment的方法。Flink提供了基于Netty通信框架实现的NettyShuffleServiceFactory,作为ShuffleServiceFactory接口的默认实现类。

ShuffleEnvironment组件提供了创建Task实例中ResultPartition和InputGate组件的方法,同时Flink中默认提供了NettyShuffleEnvironment实现。

ShuffleMaster组件实现了对ResultPartition和InputGate的注册功能

ShuffleMaster组件实现了对ResultPartition和InputGate的注册功能,同时每个作业都有ShuffleMaster管理当前作业的ResultPartition和InputGate等信息,Flink中提供了NettyShuffleMaster默认实现。

ShuffleService UML关系图

在这里插入图片描述

 

三. 在JobMaster中创建ShuffleMaster

创建ShuffleMaster,ShuffleEnvironment的大致过程

  • 通过ShuffleServiceFactory可以创建ShuffleMaster和ShuffleEnvironment服务,其中ShuffleMaster主要用在JobMaster调度和执行Execution时,维护当前作业中的ResultPartition信息,例如ResourceID、ExecutionAttemptID等
  • 紧接着JobManager会将ShuffleMaster创建的NettyShuffleDescriptor参数信息发送给对应的TaskExecutor实例,在TaskExecutor中就会基于NettyShuffleDescriptor的信息,通过ShuffleEnvironment组件创建ResultPartition、InputGate等组件。

分配slot资源,并将分区信息注册到ShuffleMaster中

如代码清单,在JobMaster开始向Execution分配Slot资源时,会通过分配的Slot计算资源获取TaskManagerLocation信息,然后调用Execution.registerProducedPartitions()方法将分区信息注册到ShuffleMaster中。

CompletableFuture<Execution> allocateResourcesForExecution(
      SlotProviderStrategy slotProviderStrategy,
      LocationPreferenceConstraint locationPreferenceConstraint,
      @Nonnull Set<AllocationID> allPreviousExecutionGraphAllocationIds) {
   return allocateAndAssignSlotForExecution(
      slotProviderStrategy,
      locationPreferenceConstraint,
      allPreviousExecutionGraphAllocationIds)
      .thenCompose(slot -> registerProducedPartitions(slot.getTaskManagerLocation()));
}

Execution.registerProducedPartitions()方法逻辑如下。

  1. 创建ProducerDescriptor对象,其中包含了分区生产者的基本信息,例如网络连接地址和端口以及TaskManagerLocation信息
  2. 获取当前ExecutionVertex节点对应的IntermediateResultPartition信息,在IntermediateResultPartition结构中包含了ExecutionVertex、IntermediateResultPartitionID以及ExecutionEdge等逻辑分区信息。
  3. 遍历IntermediateResultPartition列表,将IntermediateResultPartition转换为PartitionDescriptor数据结构,然后调用ExecutionGraph的ShuffleMaster服务,将创建的PartitionDescriptor和ProducerDescriptor注册到ShuffleMaster服务中
  4. 根据ShuffleDescriptor创建ResultPartitionDeploymentDescriptor并添加到partitionRegistrations集合中。(producedPartitions信息会被TaskManager的ShuffleEnvironment用于创建ResultPartition和InputGate等组件。
static CompletableFuture<Map<IntermediateResultPartitionID, ResultPartitionDep
   loymentDescriptor>> registerProducedPartitions(
      ExecutionVertex vertex,
      TaskManagerLocation location,
      ExecutionAttemptID attemptId,
      boolean sendScheduleOrUpdateConsumersMessage) {
     // 创建ProducerDescriptor
   ProducerDescriptor producerDescriptor = 
       ProducerDescriptor.create(location, attemptId);
     // 获取当前节点的partition信息
   Collection<IntermediateResultPartition> partitions = 
       vertex.getProducedPartitions().values();
   Collection<CompletableFuture<ResultPartitionDeploymentDescriptor>> 
      partitionRegistrations =
      new ArrayList<>(partitions.size());
     // 向ShuffleMaster注册partition信息
   for (IntermediateResultPartition partition : partitions) {
      PartitionDescriptor partitionDescriptor = PartitionDescriptor.from(partition);
      int maxParallelism = getPartitionMaxParallelism(partition);
      // 调用ShuffleMaster注册partitionDescriptor和producerDescriptor
      CompletableFuture<? extends ShuffleDescriptor> shuffleDescriptorFuture = vertex
         .getExecutionGraph()
         .getShuffleMaster()
         .registerPartitionWithProducer(partitionDescriptor, producerDescriptor);
      Preconditions.checkState(shuffleDescriptorFuture.isDone(), 
         "ShuffleDescriptor future is incomplete.");
      // 创建ResultPartitionDeploymentDescriptor实例
      CompletableFuture<ResultPartitionDeploymentDescriptor> 
         partitionRegistration = 
          shuffleDescriptorFuture
         .thenApply(shuffleDescriptor -> new ResultPartitionDeploymentDescriptor(
            partitionDescriptor,
            shuffleDescriptor,
            maxParallelism,
            sendScheduleOrUpdateConsumersMessage));
      // 添加到partitionRegistrations集合中
      partitionRegistrations.add(partitionRegistration);
   }
   // 转换存储结构
   return FutureUtils.combineAll(partitionRegistrations).thenApply(rpdds -> {
      Map<IntermediateResultPartitionID, ResultPartitionDeploymentDescriptor> 
         producedPartitions =
         new LinkedHashMap<>(partitions.size());
      rpdds.forEach(rpdd -> producedPartitions.put(rpdd.getPartitionId(), rpdd));
      return producedPartitions;
   });
}

 

四. 在TaskManager中创建ShuffleEnvironment

从fromConfiguration创建并启动shuffleEnvironment

在TaskManagerServices的启动过程中会创建并启动ShuffleEnvironment。如代码,在TaskManagerServices.fromConfiguration()方法中包含创建和启动ShuffleEnvironment的过程。和ShuffleMaster的创建过程一样,在TaskManagerServices.createShuffleEnvironment()方法中,也会通过Java SPI的方式加载ShuffleServiceFactory实现类,然后创建ShuffleEnvironment。

public static TaskManagerServices fromConfiguration(...)  
        throws Exception {
        。。。
   // 调用createShuffleEnvironment创建ShuffleEnvironment
final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
   taskManagerServicesConfiguration,
   taskEventDispatcher,
   taskManagerMetricGroup);
// 启动shuffleEnvironment
final int dataPort = shuffleEnvironment.start();
...
}

 

NettyShuffleEnvironment的创建过程,以及它提供的能力:

在Flink中默认提供基于Netty通信框架实现的NettyShuffleServiceFactory实现类,创建NettyShuffleEnvironment。
ShuffleEnvironment控制了TaskManager中网络数据交换需要的全部服务和组件信息,包括创建上下游数据传输的ResultPartition、SingleInput以及用于网络栈中Buffer数据缓存的NetworkBufferPool等

这里了解NettyShuffleEnvironment的创建过程:

  1. 从NettyShuffleEnvironmentConfiguration参数中获取Netty相关配置,例如TransportType、InetAddress、serverPort以及numberOfSlots等信息。
  2. 创建ResultPartitionManager实例,注册和管理TaskManager中的ResultPartition信息,并提供创建ResultSubpartitionView的方法,专门用于消费ResultSubpartition中的Buffer数据
  3. 创建FileChannelManager实例,指定配置中的临时文件夹,然后创建并获取文件的FileChannel。对于离线类型的作业,会将数据写入文件系统,再对文件进行处理,这里的实现和MapReduce算法类似(ing)。
  4. 创建ConnectionManager实例,主要用于InputChannel组件。
    InputChannel会通过ConnectionManager创建PartitionRequestClient,实现和ResultPartition之间的网络连接。ConnectionManager会根据NettyConfig是否为空,选择创建NettyConnectionManager还是LocalConnectionManager。
  5. 创建NetworkBufferPool组件,用于向ResultPartition和InputGate组件提供Buffer内存存储空间,实际上就是分配和管理MemorySegment内存块
  6. 向系统中注册ShuffleMetrics,用于跟踪Shuffle过程的监控信息
  7. 创建ResultPartitionFactory工厂类,用于创建ResultPartition。
  8. 创建SingleInputGateFactory工厂类,用于创建SingleInputGate。

将以上创建的组件或服务作为参数来创建NettyShuffleEnvironment。

NettyShuffleServiceFactory.createNettyShuffleEnvironment()
static NettyShuffleEnvironment createNettyShuffleEnvironment(
      NettyShuffleEnvironmentConfiguration config,
      ResourceID taskExecutorResourceId,
      TaskEventPublisher taskEventPublisher,
      MetricGroup metricGroup) {
   // 检查参数都不能为空
。。。
   // 获取Netty相关的配置参数
   NettyConfig nettyConfig = config.nettyConfig();
   // 创建ResultPartitionManager实例
   ResultPartitionManager resultPartitionManager = new ResultPartitionManager();
   // 创建FileChannelManager实例
   FileChannelManager fileChannelManager = 
       new FileChannelManagerImpl(config.getTempDirs(), DIR_NAME_PREFIX);
   // 创建ConnectionManager实例
   ConnectionManager connectionManager = 
       nettyConfig != null ?
       new NettyConnectionManager(resultPartitionManager, 
                                  taskEventPublisher, nettyConfig)
       : new LocalConnectionManager();
   // 创建NetworkBufferPool实例
   NetworkBufferPool networkBufferPool = new NetworkBufferPool(
      config.numNetworkBuffers(),
      config.networkBufferSize(),
      config.networkBuffersPerChannel(),
      config.getRequestSegmentsTimeout());
   // 注册ShuffleMetrics信息
   registerShuffleMetrics(metricGroup, networkBufferPool);
   // 创建ResultPartitionFactory实例
   ResultPartitionFactory resultPartitionFactory = new ResultPartitionFactory(
      resultPartitionManager,
      fileChannelManager,
      networkBufferPool,
      config.getBlockingSubpartitionType(),
      config.networkBuffersPerChannel(),
      config.floatingNetworkBuffersPerGate(),
      config.networkBufferSize(),
      config.isForcePartitionReleaseOnConsumption(),
      config.isBlockingShuffleCompressionEnabled(),
      config.getCompressionCodec());
   // 创建SingleInputGateFactory实例
   SingleInputGateFactory singleInputGateFactory = new SingleInputGateFactory(
      taskExecutorResourceId,
      config,
      connectionManager,
      resultPartitionManager,
      taskEventPublisher,
      networkBufferPool);
   // 最后返回NettyShuffleEnvironment
   return new NettyShuffleEnvironment(
      taskExecutorResourceId,
      config,
      networkBufferPool,
      connectionManager,
      resultPartitionManager,
      fileChannelManager,
      resultPartitionFactory,
      singleInputGateFactory);
}

至此,创建NettyShuffleEnvironment的过程就基本完成了,接下来TaskManager会接受JobMaster提交的Task申请(这是一个被动过程?为了开口子接收其他task的数据?),然后通过ShuffleEnvironment为Task实例创建ResultPartition和InputGate组件。创建这些组件的信息来自ShuffleMaster中注册的ResultPartition和ExecutionEdge等信息。

 
接下来我们具体了解如何通过ShuffleEnvironment创建ResultPartition和InputGate两个重要组件。

 

五. 基于ShuffleEnvironment创建ResultPartition

1. 在task启动时创建ResultPartition

task启动时就创建ResultPartition

当TaskManager接收到JobMaster提交的Task作业申请后,就会创建并启动Task线程。
如代码所示,Task的构造器方法包含了NettyShuffleEnvironment创建ResultPartitionWriter的实现,可以理解为在创建Task线程的时候就通过ShuffleEnvironment创建了ResultPartition

反压控制:动态控制数据向下游输出

创建好ResultPartitionWriter后,对ResultPartitionWriter进行装饰,目的是让ResultPartition可以向下游节点发送ResultPartition是否可消费的信息,以便实现动态控制ResultPartitionWriter内的数据输出

org.apache.flink.runtime.taskmanager.Task
public Task(...){
final ShuffleIOOwnerContext taskShuffleContext = shuffleEnvironment
    .createShuffleIOOwnerContext(taskNameWithSubtaskAndId, executionId, 
                                 metrics.getIOMetricGroup());
// 创建ResultPartitonWriter
final ResultPartitionWriter[] resultPartitionWriters = 
   shuffleEnvironment.createResultPartitionWriters(
   taskShuffleContext,
   resultPartitionDeploymentDescriptors).toArray(new ResultPartitionWriter[] {});
// 对ResultPartiton进行装饰
this.consumableNotifyingPartitionWriters = 
   ConsumableNotifyingResultPartitionWriterDecorator.decorate(
   resultPartitionDeploymentDescriptors,
   resultPartitionWriters,
   this,
   jobId,
   resultPartitionConsumableNotifier);

}

2. ResultPartition的创建与对数据的行为

如代码,接着看创建ResultPartition的主要逻辑。

  1. 根据resultPartitionDeploymentDescriptors的大小初始化ResultPartition数组。
  2. 通过resultPartitionFactory创建ResultPartition。
  3. 调用registerOutputMetrics()方法注册resultPartitions相关的监控指标信息。
  4. 返回创建的ResultPartition数组。
NettyShuffleEnvironment.createResultPartitionWriters()
public Collection<ResultPartition> createResultPartitionWriters(
      ShuffleIOOwnerContext ownerContext,
      Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeployment
         Descriptors) {
   synchronized (lock) {
      Preconditions
          .checkState(!isClosed, 
                      "The NettyShuffleEnvironment has already been shut down.");
      // 根据resultPartitionDeploymentDescriptors创建ResultPartition数组
      ResultPartition[] resultPartitions = 
          new ResultPartition[resultPartitionDeploymentDescriptors.size()];
      int counter = 0;
      // 遍历ResultPartitionDeploymentDescriptor创建ResultPartition
      for (ResultPartitionDeploymentDescriptor rpdd : 
           resultPartitionDeploymentDescriptors) {
         resultPartitions[counter++] = 
             resultPartitionFactory.create(ownerContext.getOwnerName(), rpdd);
      }
      registerOutputMetrics(config.isNetworkDetailedMetrics(), 
                            ownerContext.getOutputGroup(), resultPartitions);
      return  Arrays.asList(resultPartitions);
   }
}

 

继续了解ResultPartition的创建过程

  1. 判断ResultPartitionType是否为Blocking类型,如果是则需要创建BufferCompressor,用于压缩Buffer数据,即在离线数据处理过程中通过BufferCompressor压缩Buffer数据。
  2. 根据numberOfSubpartitions对应的数量创建ResultSubpartition数组,并存储当前ResultPartition中的ResultSubpartition。
  3. 根据ResultPartitionType参数创建ResultPartition,如果ResultPartitionType是Blocking类型,则创建ReleaseOnConsumptionResultPartition,即数据消费完便立即释放ResultPartition。否则创建ResultSubpartition,即不会随着数据消费完之后进行释放,适用于流数据处理场景
  4. 调用createSubpartitions()方法创建ResultSubpartition。ResultSubpartition会有ID进行区分,并和InputGate中的InputChannel一一对应
//ResultPartitionFactory.create()
public ResultPartition create(
      String taskNameWithSubtaskAndId,
      ResultPartitionID id,
      ResultPartitionType type,
      int numberOfSubpartitions,
      int maxParallelism,
      FunctionWithException<BufferPoolOwner, BufferPool, IOException> 
         bufferPoolFactory)
{
   BufferCompressor bufferCompressor = null;
   // 如果ResultPartitionType是Blocking类型,则需要创建BufferCompressor,用于数据压缩
   if (type.isBlocking() && blockingShuffleCompressionEnabled) {
      bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec);
   }
   // 创建ResultSubpartition数组
   ResultSubpartition[] subpartitions = new ResultSubpartition
      [numberOfSubpartitions];
   // 根据条件创建ResultPartition
   ResultPartition partition = forcePartitionReleaseOnConsumption || !type.isBlocking()
      ? new ReleaseOnConsumptionResultPartition(
         taskNameWithSubtaskAndId,
         id,
         type,
         subpartitions,
         maxParallelism,
         partitionManager,
         bufferCompressor,
         bufferPoolFactory)
      : new ResultPartition(
         taskNameWithSubtaskAndId,
         id,
         type,
         subpartitions,
         maxParallelism,
         partitionManager,
         bufferCompressor,
         bufferPoolFactory);
   // 创建Subpartitions
   createSubpartitions(partition, type, blockingSubpartitionType, subpartitions);
   LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);
   return partition;
}

 

3. 创建ResultSubpartitions与 应用与流或批场景

  • 在创建ResultSubpartitions的时候,也会根据ResultPartitionType是否为Blocking类型,选择创建BoundedBlockingPartitions(用于有界批计算处理场景)或PipelinedSubpartition(用于无界流式数据集处理场景)。
  • 在PipelinedSubpartition中会以subpartitions的数组索引作为ResultPartition中的index,也就是说,ResultPartition主要通过index确认数据写入哪个ResultSubPartition。
private void createSubpartitions(
      ResultPartition partition,
      ResultPartitionType type,
      BoundedBlockingSubpartitionType blockingSubpartitionType,
      ResultSubpartition[] subpartitions) {
   // 创建ResultSubpartitions.
   if (type.isBlocking()) {
      initializeBoundedBlockingPartitions(
         subpartitions,
         partition,
         blockingSubpartitionType,
         networkBufferSize,
         channelManager);
   } else {
      for (int i = 0; i < subpartitions.length; i++) {
         subpartitions[i] = new PipelinedSubpartition(i, partition);
      }
   }
}

 

六. 基于ShuffleEnvironment创建InputGate

1. 在哪里创建的InputGate

和ResultPartition创建过程相似,Task的初始化过程中也会创建InputGate。如代码,Task构造器方法中涵盖了InputGate的创建逻辑。

final InputGate[] gates = shuffleEnvironment.createInputGates(
   taskShuffleContext,
   this,
   inputGateDeploymentDescriptors).toArray(new InputGate[] {});
this.inputGates = new InputGate[gates.length];
int counter = 0;
for (InputGate gate : gates) {
   inputGates[counter++] = new InputGateWithMetrics(gate, metrics.
      getIOMetricGroup().getNumBytesInCounter());
}

接下来具体看NettyShuffleEnvironment.createInputGates()的逻辑

  1. 获取networkInputGroup信息,用于创建InputChannelMetrics。
  2. 根据inputGateDeploymentDescriptorsShufflemanager传递的,那这个数量是怎么确定的?ing)数组的大小创建SingleInputGate数组,用于存储SingleInputGate组件。
  3. 根据InputGateDeploymentDescriptor创建SingleInputGate
  4. 注册InputGate的监控信息,并返回SingleInputGate集合。
public Collection<SingleInputGate> createInputGates(
      ShuffleIOOwnerContext ownerContext,
      PartitionProducerStateProvider partitionProducerStateProvider,
      Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
   synchronized (lock) {
      Preconditions.checkState(!isClosed, "The NettyShuffleEnvironment has 
         already been shut down.");
      MetricGroup networkInputGroup = ownerContext.getInputGroup();
      @SuppressWarnings("deprecation")
      InputChannelMetrics inputChannelMetrics = 
          new InputChannelMetrics(networkInputGroup, ownerContext.
             getParentGroup());
      SingleInputGate[] inputGates = 
          new SingleInputGate[inputGateDeploymentDescriptors.size()];
      int counter = 0;
      //遍历igdd通过singleInputGateFactory创建inputGate
      for (InputGateDeploymentDescriptor igdd : inputGateDeploymentDescriptors) {
         SingleInputGate inputGate = singleInputGateFactory.create(
            ownerContext.getOwnerName(),
            igdd,
            partitionProducerStateProvider,
            inputChannelMetrics);
         InputGateID id = new InputGateID(igdd.getConsumedResultId(), 
                                          ownerContext.
                                              getExecutionAttemptID());
         inputGatesById.put(id, inputGate);
         inputGate.getCloseFuture().thenRun(() -> inputGatesById.remove(id));
         inputGates[counter++] = inputGate;
      }
      //注册metric
      registerInputMetrics(config.isNetworkDetailedMetrics(), networkInputGroup,
                           inputGates);
      return Arrays.asList(inputGates);
   }
}

 

2. SingleInputGate的创建和提供的能力

2.1. 创建SingleInputGate

继续看SingleInputGateFactory创建SingleInputGate的过程,如代码

  1. 创建createBufferPoolFactory,用于创建LocalBufferPool。通过LocalBufferPool可以为InputGate提供Buffer数据的存储空间,实现本地缓冲接入InputGate中的二进制数据。
  2. 根据结果分区类型和是否支持压缩决定是否创建BufferDecompressor,这里和ResultPartition中的BufferCompressor是对应的,即通过BufferDecompressor解压经过BufferCompressor压缩后的Buffer数据。
  3. 通过InputGateDeploymentDescriptor中的参数BufferCompressor和BufferPoolFactory创建SingleInputGate对象。
  4. 调用createInputChannels()方法创建SingleInputGate中的InputChannels。
  5. 将创建完成的inputGate返回给Task实例。
public SingleInputGate create(
      @Nonnull String owningTaskName,
      @Nonnull InputGateDeploymentDescriptor igdd,
      @Nonnull PartitionProducerStateProvider partitionProducerStateProvider,
      @Nonnull InputChannelMetrics metrics) {
   SupplierWithException<BufferPool, IOException> bufferPoolFactory = 
       createBufferPoolFactory(
      networkBufferPool,
      networkBuffersPerChannel,
      floatingNetworkBuffersPerGate,
      igdd.getShuffleDescriptors().length,
      igdd.getConsumedPartitionType());
   BufferDecompressor bufferDecompressor = null;
   if (igdd.getConsumedPartitionType().isBlocking() 
       && blockingShuffleCompressionEnabled) {
      bufferDecompressor = new BufferDecompressor(networkBufferSize, 
         compressionCodec);
   }
   SingleInputGate inputGate = new SingleInputGate(
      owningTaskName,
      igdd.getConsumedResultId(),
      igdd.getConsumedPartitionType(),
      igdd.getConsumedSubpartitionIndex(),
      igdd.getShuffleDescriptors().length,
      partitionProducerStateProvider,
      bufferPoolFactory,
      bufferDecompressor);
      //创建SingleInputGate中的InputChannels。
   createInputChannels(owningTaskName, igdd, inputGate, metrics);
   return inputGate;
}

SingleInputGateFactory.createInputChannels()方法定义了创建指定SingleInputGate对应的InputChannel集合。

  1. 获取ShuffleDescriptor列表,ShuffleDescriptor是在ShuffleMaster中创建和生成的,描述了数据生产者和ResultPartition等信息。
  2. 创建InputChannel数组,最后将其存储到inputGate中。可以看出每个resultPartitionID对应一个InputChannel。
private void createInputChannels(
      String owningTaskName,
      InputGateDeploymentDescriptor inputGateDeploymentDescriptor,
      SingleInputGate inputGate,
      InputChannelMetrics metrics) {
   ShuffleDescriptor[] shuffleDescriptors = 
      inputGateDeploymentDescriptor.getShuffleDescriptors();
   // 创建InputChannel
   InputChannel[] inputChannels = new InputChannel[shuffleDescriptors.length];
   ChannelStatistics channelStatistics = new ChannelStatistics();
   for (int i = 0; i < inputChannels.length; i++) {
      inputChannels[i] = createInputChannel(
         inputGate,
         i,
         shuffleDescriptors[i],
         channelStatistics,
         metrics);
      ResultPartitionID resultPartitionID = inputChannels[i].getPartitionId();
      inputGate.setInputChannel(resultPartitionID.getPartitionId(), inputChannels[i]);
   }
   LOG.debug("{}: Created {} input channels ({}).",
      owningTaskName,
      inputChannels.length,
      channelStatistics);
}

2.2. InputChannel的创建与处理同一个tm的数据或跨tm的数据的能力

概述

在SingleInputGateFactory.createInputChannel()方法中定义了创建InputChannel的具体逻辑,
同时会根据ShuffleDescriptor实现类是否为NettyShuffleDescriptor决定创建UnknownInputChannel还是系统内置的LocalInputChannel和RemoteInputChannel。

重点了解LocalInputChannel和RemoteInputChannel的创建过程。

创建内置InputChannel的主要逻辑:

[!NOTE]
判断消费数据的Task实例和数据生产的Task实例是否运行在同一个TaskManager中。这一步主要是在判断producerLocation和consumerLocation是否相等,

  • 如果相等则说明上下游Task属于同一TaskManager,创建的InputChannel就为LocalInputChannel,下游InputChannel不经过网络获取数据
  • 不相等,则说明上下游Task不在同一个TaskManager中,此时创建基于Netty框架实现的RemoteInputChannel,帮助下游Task实例从网络中消费上游Task中的Buffer数据。

在RemoteInputChannel中需要networkBufferPool、connectionManager等组件,对于LocalInputChannel则不需要这些组件。在ShuffleMaster注册分区信息的时候(when:在申请好tm资源后?),创建上下游Task的连接信息,此时会根据Task分配的Slot信息,传入ProducerLocation和ConsumerLocation等配置信息,然后创建不同的InputChannel,从而实现上下游Task的网络连接。

private InputChannel createKnownInputChannel(
      SingleInputGate inputGate,
      int index,
      NettyShuffleDescriptor inputChannelDescriptor,
      ChannelStatistics channelStatistics,
      InputChannelMetrics metrics) {
   ResultPartitionID partitionId = inputChannelDescriptor.getResultPartitionID();
   if (inputChannelDescriptor.isLocalTo(taskExecutorResourceId)) {
      // Task实例属于同一个TaskManager
      channelStatistics.numLocalChannels++;
      return new LocalInputChannel(
         inputGate,
         index,
         partitionId,
         partitionManager,
         taskEventPublisher,
         partitionRequestInitialBackoff,
         partitionRequestMaxBackoff,
         metrics);
   } else {
      // Task实例属于不同的TaskManager
      channelStatistics.numRemoteChannels++;
      return new RemoteInputChannel(
         inputGate,
         index,
         partitionId,
         inputChannelDescriptor.getConnectionId(),
         connectionManager,
         partitionRequestInitialBackoff,
         partitionRequestMaxBackoff,
         metrics,
         networkBufferPool);
   }
}

 

到这里,ResultPartition和InputGate组件就全部创建完毕了。Task实例会将ResultPartition和InputGate组件封装在环境信息中,然后传递给StreamTask。StreamTask获取ResultPartition和InputGate,用于创建StreamNetWorkTaskInput和RecordWriter组件,从而完成Task中数据的输入和输出。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/436683.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WSL2安装+深度学习环境配置

WSL2安装深度学习环境配置 1 安装WSL22 配置深度学习环境1.1 设置用户名、密码1.2 安装cuda修改WSL安装路径 1.3 安装Anaconda 参考&#xff1a;搭建Windows Linux子系统&#xff08;WSL2&#xff09;CUDA环境 参考&#xff1a;深度学习环境配置 WindowsWSL2 1 安装WSL2 WSL …

java-springboot 源码 01

01.springboot 是一个启动器 先安装maven&#xff0c;按照网上的流程来。主要是安装完成后&#xff0c;要修改conf目录下的setting.xml文件。 添加&#xff1a;阿里云镜像 <mirror><id>aliyunmaven</id><mirrorOf>*</mirrorOf><name>ali…

判断连续数据同意特征的方法:插旗法

bool isMonotonic(int* nums, int numsSize) {int flag 2;for (int i 1; i < numsSize; i) {if (nums[i-1] > nums[i]) {if (flag 0)return false;flag 1;}else if (nums[i-1] < nums[i]) {if (flag 1)return false;flag 0;}}return true; }此代码较为简单&…

【Python】新手入门(8):什么是迭代?迭代的作用是什么?

【Python】新手入门&#xff08;8&#xff09;&#xff1a;什么是迭代&#xff1f;迭代有什么应用&#xff1f; &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】…

《时代教育》是什么级别的期刊?是正规期刊吗?能评职称吗?

问题解答 问&#xff1a;《时代教育》杂志是正规刊物吗&#xff1f; 答&#xff1a;是的&#xff0c;国家新闻出版总署正式备案的期刊 问&#xff1a;《时代教育》是什么级别刊物&#xff1f; 答&#xff1a;省级 主管单位&#xff1a;成都日报报业集团 主办单位&#x…

<C++>【继承篇】

​ ✨前言✨ &#x1f393;作者&#xff1a;【 教主 】 &#x1f4dc;文章推荐&#xff1a; ☕博主水平有限&#xff0c;如有错误&#xff0c;恳请斧正。 &#x1f4cc;机会总是留给有准备的人&#xff0c;越努力&#xff0c;越幸运&#xff01; &#x1f4a6;导航助手&#x1…

主题乐园如何让新客变熟客,让游客变“留客”?

群硕跨越时间结识了一位爱讲故事的父亲&#xff0c;他汇集了一群幻想工程师&#xff0c;打算以故事为基础&#xff0c;建造一个梦幻的主题乐园。 这个乐园后来成为全球游客最多、收入最高的乐园之一&#xff0c;不仅在2023财年创下了近90亿&#xff08;美元&#xff09;的营收…

软件测试必学的16个高频数据库操作及命令

数据库作为软件系统数据的主要存取与操作中心&#xff0c;广泛应用于企业当中。在企业中常用的数据库管理系统有 ORACLE、MS SQL SERVER、MySQL等。其中以免费的 MySQL 最多&#xff0c;特别在中小型互联网公司里。 因此&#xff0c;本文的数据库操作是基于 MySQL 数据库系统下…

c# 二分查找(迭代与递归)

二分搜索被定义为一种在排序数组中使用的搜索算法&#xff0c;通过重复将搜索间隔一分为二。二分查找的思想是利用数组已排序的信息&#xff0c;将时间复杂度降低到O(log N)。 二分查找算法示例 何时在数据结构中应用二分查找的条件&#xff1a; 应用二分查找算法&#xff1a…

平台工程师的崛起:如何应对日益复杂的软件

平台工程只是 DevOps 专业化的另一个术语&#xff0c;还是有什么不同&#xff1f;事实可能介于两者之间。DevOps 及其相关的 DevXOps 风格具有浓厚的文化色彩&#xff0c;将各个团队置于中心位置。不幸的是&#xff0c;在许多地方&#xff0c;DevOps 导致了新的问题&#xff0c…

OpenAI 大声朗读出来

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Kubernetes: 本地部署dashboard

本篇文章主要是介绍如何在本地部署kubernetes dashboard, 部署环境是mac m2 下载dashboard.yaml 官网release地址: kubernetes/dashboard/releases 本篇文章下载的是kubernetes-dashboard-v2.7.0的版本&#xff0c;通过wget命令下载到本地: wget https://raw.githubusercont…

成都正信:亲戚借了钱一直不还怎么委婉的说

在中国传统文化中&#xff0c;亲情关系往往被视为最为重要和敏感的部分。当亲戚间发生借贷时&#xff0c;若出现拖欠不还的情形&#xff0c;处理起来尤为棘手。面对这样的尴尬局面&#xff0c;采取委婉而有效的沟通方式至关重要。 张华最近就遇到了这样的困扰。他的表弟去年因急…

vue3中的生命周期有哪些和怎么使用?

目录 前言&#xff1a; 正文&#xff1a; 总结: 前言&#xff1a; Vue.js 3是Vue.js框架的最新主要版本&#xff0c;引入了一些重大的改变和增强。在Vue 3中&#xff0c;由于Composition API的引入&#xff0c;生命周期钩子被替换为生命周期函数。 正文&#xff1a; 以下是…

回调函数、回调地狱、解放方法Promise的用法

回调函数 回调函数的定义非常简单&#xff1a;一个函数被当做一个实参传入到另一个函数(外部函数)&#xff0c;并且这个函数在外部函数内被调用&#xff0c;用来完成某些任务的函数。就称为回调函数回调函数的两种写法(实现效果相同)&#xff1a; const text () > {docum…

Python算法题集_N 皇后

Python算法题集_N 皇后 题51&#xff1a;N 皇后1. 示例说明2. 题目解析- 题意分解- 优化思路- 测量工具 3. 代码展开1) 标准求解【规则遍历合理性回溯】2) 改进版一【线状态检测合理性回溯】3) 改进版二【单行矩阵回溯】 4. 最优算法5. 相关资源 本文为Python算法题集之一的代码…

文生视频Sora模型发布,是否引爆AI芯片热潮

文生视频Sora模型发布&#xff0c;是否引爆AI芯片热潮 1. 引言 在人工智能的历史长河中&#xff0c;每一次技术的飞跃都伴随着社会生产力的巨大变革。自2015年以来&#xff0c;深度学习技术的突破性进展&#xff0c;尤其是在自然语言处理、图像识别和机器学习等领域的成功应…

检测螺栓扭矩的方法有哪些——SunTorque智能扭矩系统

螺栓扭矩的检测是确保螺栓连接紧固程度和安全性的重要环节。正确的扭矩检测能够预防螺栓松动、断裂等潜在风险&#xff0c;从而保障设备和结构的稳定运行。SunTorque智能扭矩系统接下来将详细介绍螺栓扭矩的检测方法。 螺栓扭矩的检测是确保螺栓连接紧固程度和安全性的重要环节…

刷题笔记day27-回溯算法3

39. 组合总和 var path []int var tmp []int var result [][]int// 还是需要去重复&#xff0c;题目中要求的是至少一个数字备选的数量不同。 // 所以需要剪枝操作&#xff0c;右边的要比左边的> func combinationSum(candidates []int, target int) [][]int {// 组合问题pa…

Ubuntu环境配置-LinuxQQ篇

本教程下载Linux QQ的版本是linuxqq_3.0.0-571_amd64.deb 一、下载LinuxQQ 直接使用wget命令下载链接&#xff0c;下载文件 wget https://dldir1.qq.com/qqfile/qq/QQNT/c005c911/linuxqq_3.0.0-571_amd64.deb 二、安装LinuxQQ 当下载完成后&#xff0c;运行命令&#xff1a;…