Apache Hudi初探(二)(与flink的结合)--flink写hudi的操作(JobManager端的提交操作)

背景

在Apache Hudi初探(一)(与flink的结合)中,我们提到了Pipelines.hoodieStreamWrite 写hudi文件,这个操作真正写hudi是在Pipelines.hoodieStreamWrite方法下的transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory),具体分析一下写入的过程。

分析

对于transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)这个代码片段,我们主要看operatorFactory 这个对象(transform这个操作是Flink框架的操作):

public class StreamWriteOperator<I> extends AbstractWriteOperator<I> {

  public StreamWriteOperator(Configuration conf) {
    super(new StreamWriteFunction<>(conf));
  }

  public static <I> WriteOperatorFactory<I> getFactory(Configuration conf) {
    return WriteOperatorFactory.instance(conf, new StreamWriteOperator<>(conf));
  }
}

最主要的hudi算子为StreamWriteOperator,其中最主要的操作是由StreamWriteFunction来完成的:

// StreamWriteFunction
  

   @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    this.taskID = getRuntimeContext().getIndexOfThisSubtask();
    this.metaClient = StreamerUtil.createMetaClient(this.config);
    this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext());
    this.writeStatuses = new ArrayList<>();
    this.writeMetadataState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>(
            "write-metadata-state",
            TypeInformation.of(WriteMetadataEvent.class)
        ));

    this.ckpMetadata = CkpMetadata.getInstance(this.metaClient.getFs(), this.metaClient.getBasePath());
    this.currentInstant = lastPendingInstant();
    if (context.isRestored()) {
      restoreWriteMetadata();
    } else {
      sendBootstrapEvent();
    }
    // blocks flushing until the coordinator starts a new instant
    this.confirming = true;
  }

  @Override
  public void open(Configuration parameters) throws IOException {
    this.tracer = new TotalSizeTracer(this.config);
    initBuffer();
    initWriteFunction();
  }

  @Override
  public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    if (inputEnded) {
      return;
    }
    snapshotState();
    // Reload the snapshot state as the current state.
    reloadWriteMetaState();
  }

  @Override
  public void snapshotState() {
    // Based on the fact that the coordinator starts the checkpoint first,
    // it would check the validity.
    // wait for the buffer data flush out and request a new instant
    flushRemaining(false);
  }

  @Override
  public void processElement(I value, ProcessFunction<I, Object>.Context ctx, Collector<Object> out) throws Exception {
    bufferRecord((HoodieRecord<?>) value);
  }

  • initializeState操作,主要是做一些初始化的操作

    • this.taskID = getRuntimeContext().getIndexOfThisSubtask();
      获取当前的task的索引下标,用来向operator coordinator发送event给operator coordinator,之后 StreamWriteOperatorCoordinator(operator coordinator) 进行处理,后续会说到StreamWriteOperatorCoordinator

    • metaClient = StreamerUtil.createMetaClient(this.config)
      writeClient = FlinkWriteClients.createWriteClient
      初始化hudi的元数据客户端(这里是HoodieTableMetaClient)和写入客户端(这里是HoodieFlinkWriteClient)

    • writeStatuses = new ArrayList<>()
      记录后续的写入hudi文件的信息

    • writeMetadataState = context.getOperatorStateStore().getListState
      记录写入hudi的元数据事件,会在后续的操作中,会包装成event发送给operator coordinator(StreamWriteOperatorCoordinator)

    • ckpMetadata = CkpMetadata.getInstance
      Flink的checkpoint的元数据信息路径,默认的路径是/${hoodie.basePath}/.hoodie/.aux/ckp_meta

    • currentInstant = lastPendingInstant()
      获取上次还没有完成的commit

    • restoreWriteMetadata或者sendBootstrapEvent,根据是否是从checkpoint恢复过来的进行不同消息的发送,
      这里的operator coordinator(StreamWriteOperatorCoordinator)会进行统一的处理,并初始化一个commit

  • open操作
    写入hudi前的前置操作,比如说 初始化TotalSizeTracer记录maxBufferSize便于flush操作
    根据write.operation的值(默认是upsert)选择后续的操作是insert或upsert或overwrite,这里是upsert

  • processElement操作
    这里对传入的HoodieRecord进行缓存,主要是bufferRecord做的事情,

    • 首先会获取bucketID,之后再往对应的bucket中插入数据
    • 如果超出write.batch.size(默认是128MB),则会进行flushBucket操作,该操作主要是写入hudi操作 //TODO: 具体的写入hudi操作
      • 首先会获取新的需要提交的commit
      • 再进行写入的实际操作
      • 写入的文件元数据信息回传到operator coordinator进行统一处理
  • snapshotState 操作

    • 调用flushRemaining 写入剩下的数据到hudi存储中
    • 重新加载当前写入的hudi文件元数据信息到当前flink的state中

hudi StreamWriteOperatorCoordinator作用

总的来说,StreamWriteOperatorCoordinator扮演的角色和在Spark中driver的角色一样,都是来最后来提交 元数据信息到huid中。
具体的作用还是得从具体的方法来看:

  @Override
  public void handleEventFromOperator(int i, OperatorEvent operatorEvent) {
    ValidationUtils.checkState(operatorEvent instanceof WriteMetadataEvent,
        "The coordinator can only handle WriteMetaEvent");
    WriteMetadataEvent event = (WriteMetadataEvent) operatorEvent;

    if (event.isEndInput()) {
      // handle end input event synchronously
      // wrap handleEndInputEvent in executeSync to preserve the order of events
      executor.executeSync(() -> handleEndInputEvent(event), "handle end input event for instant %s", this.instant);
    } else {
      executor.execute(
          () -> {
            if (event.isBootstrap()) {
              handleBootstrapEvent(event);
            } else {
              handleWriteMetaEvent(event);
            }
          }, "handle write metadata event for instant %s", this.instant
      );
    }
  }
  ...
  @Override
  public void notifyCheckpointComplete(long checkpointId) {
    executor.execute(
        () -> {
          // The executor thread inherits the classloader of the #notifyCheckpointComplete
          // caller, which is a AppClassLoader.
          Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
          // for streaming mode, commits the ever received events anyway,
          // the stream write task snapshot and flush the data buffer synchronously in sequence,
          // so a successful checkpoint subsumes the old one(follows the checkpoint subsuming contract)
          final boolean committed = commitInstant(this.instant, checkpointId);

          if (tableState.scheduleCompaction) {
            // if async compaction is on, schedule the compaction
            CompactionUtil.scheduleCompaction(metaClient, writeClient, tableState.isDeltaTimeCompaction, committed);
          }

          if (tableState.scheduleClustering) {
            // if async clustering is on, schedule the clustering
            ClusteringUtil.scheduleClustering(conf, writeClient, committed);
          }

          if (committed) {
            // start new instant.
            startInstant();
            // sync Hive if is enabled
            syncHiveAsync();
          }
        }, "commits the instant %s", this.instant
    );
  }
  • handleEventFromOperator方法用来接受task发送的消息

    • 对于BootStrap类型的WriteMetadataEvent(在StreamWriteFunction方法initializeState中),相当于函数初始化也就会触发
      该类型的消息由handleBootstrapEvent来处理(我们这里假设每个任务operator都完成了初始化的操作),对应的数据流如下:

      initInstant
         ||
         \/
      reset => startInstant
      

      startInstant 这里就会初始化一个hudi写操作的commit信息

    • 对于一般的write的信息的event,(比如说在processElement的flushBucket函数中),由handleWriteMetaEvent来处理:

       if (this.eventBuffer[event.getTaskID()] != null) {
       this.eventBuffer[event.getTaskID()].mergeWith(event);
       } else {
         this.eventBuffer[event.getTaskID()] = event;
       }
      

      这里只是加到变量名为eventBuffer 的WriteMetadataEvent类型的数组中,后续中会进行处理

    • 对于isEndInputtrue的event,这种一般source是基于文件的这种,这里先不讨论

  • notifyCheckpointComplete 当对应的checkpointId完成以后,该方法会被调用

    • commitInstant 提交hudi元数据,如果如果有发生异常,则回滚当前hudi对应的commit
    • scheduleCompaction && scheduleClustering 进行hui的CompcationClustering
    • 如果成功的提交了,则会开启一个新的commit,如果开了hive同步(hive_sync.enabled默认为false),则会同步元数据信息到hive

总结

用一张图总结一下交互方式,如下:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/85708.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Vue的Ajax请求-axios、前后端分离练习

Vue的Ajax请求 axios简介 ​ Axios&#xff0c;是Web数据交互方式&#xff0c;是一个基于promise [5]的网络请求库&#xff0c;作用于node.js和浏览器中&#xff0c;它是 isomorphic 的(即同一套代码可以运行在浏览器和node.js中)。在服务端它使用原生node.js http模块, 而在…

手写模拟SpringBoot核心流程(一):实现极简一个SpringBoot——模拟SpringBoot启动过程

前言 Spring Boot 是一个开源的框架&#xff0c;用于简化 Spring 应用程序的开发和部署。它建立在 Spring Framework 的基础上&#xff0c;内置了web服务器——tomcat和jetty&#xff0c;使得 Spring 应用的构建变得更加快速、简单和可维护。 本文通过实现一个SpringBoot&…

HTTP连接管理

基础知识&#xff1a;非持久连接 HTTP初始时1.0版本在浏览器每一次向服务器请求完资源都会立即断开TCP连接&#xff0c;如果想要请求多个资源&#xff0c;就必须建立多个连接&#xff0c;这就导致了服务端和客户端维护连接的开销。 例如&#xff1a;一个网页中包含文字资源也包…

第一百三十三天学习记录:数据结构与算法基础:串、数组和广义表(串Ⅱ)(王卓教学视频)

注&#xff1a;在之前学习C语言的时候&#xff0c;了解过这一块。其中对KMP算法进行了自学&#xff0c;前面的学习记录也有提到过。这一次根据视频教学再系统性的学习学习一次。 串的模式匹配算法 KMP算法

[oneAPI] 基于BERT预训练模型的SQuAD问答任务

[oneAPI] 基于BERT预训练模型的SQuAD问答任务 Intel Optimization for PyTorch and Intel DevCloud for oneAPI基于BERT预训练模型的SQuAD问答任务语料介绍数据下载构建 模型 结果参考资料 比赛&#xff1a;https://marketing.csdn.net/p/f3e44fbfe46c465f4d9d6c23e38e0517 Int…

GPT-4一纸重洗:从97.6%降至2.4%的巨大挑战

斯坦福大学和加州大学伯克利分校合作进行的一项 “How Is ChatGPTs Behavior Changing Over Time?” 研究表明&#xff0c;随着时间的推移&#xff0c;GPT-4 的响应能力非但没有提高&#xff0c;反而随着语言模型的进一步更新而变得更糟糕。 研究小组评估了 2023 年 3 月和 20…

Django视图-HttpRequest请求对象和HttpResponse响应对象

文章目录 HttpRequestHttpResponse实践request对象的属性和方法响应 def index(request): 这个request其实就是内部已经封装好的Http请求HttpRequest&#xff0c;它是一个请求对象Django中的视图主要用来接受Web请求&#xff0c;并做出响应。 视图的本质就是一个Python中的函数…

Eduma主题 - 线上教育WordPress主题/网站

Eduma主题 – 线上教育WordPress主题是为教育网站、LMS、培训中心、课程中心、学院、大学、学校、幼儿园而制作的。基于我们使用以前的主题eLearning WP构建WordPress LMS的经验&#xff0c;Education WP是下一代&#xff0c;也是围绕WordPress最好的教育主题之一&#xff0c;它…

Qt下拉菜单

1&#xff0c;QComboBox 2&#xff0c;setMenu()---设置下拉菜单 AI对话未来丨智能写作对话: setMenu()是QWidget类的一个成员函数&#xff0c;在Qt中用于将一个菜单作为一个控件的下拉菜单设置。具体来说&#xff0c;它会把相应的菜单对象与该控件关联&#xff0c;并在控件上…

PySpark-核心编程

2. PySpark——RDD编程入门 文章目录 2. PySpark——RDD编程入门2.1 程序执行入口SparkContext对象2.2 RDD的创建2.2.1 并行化创建2.2.2 获取RDD分区数2.2.3 读取文件创建 2.3 RDD算子2.4 常用Transformation算子2.4.1 map算子2.4.2 flatMap算子2.4.3 reduceByKey算子2.4.4 Wor…

(一)idea连接GitHub的全部流程(注册GitHub、idea集成GitHub、增加合作伙伴、跨团队合作、分支操作)

&#xff08;二&#xff09;Git在公司中团队内合作和跨团队合作和分支操作的全部流程&#xff08;一篇就够&#xff09;https://blog.csdn.net/m0_65992672/article/details/132336481 4.1、简介 Git是一个免费的、开源的*分布式**版本控制**系统*&#xff0c;可以快速高效地…

mongodb 数据库管理(数据库、集合、文档)

目录 一、数据库操作 1、创建数据库 2、删除数据库 二、集合操作 1、创建集合 2、删除集合 三、文档操作 1、创建文档 2、 插入文档 3、查看文档 4、更新文档 1&#xff09;update() 方法 2&#xff09;replace() 方法 一、数据库操作 1、创建数据库 创建数据库…

hive表的全关联full join用法

背景&#xff1a;实际开发中需要用到全关联的用法&#xff0c;之前没遇到过&#xff0c;现在记录一下。需求是找到两张表的并集。 全关联的解释如下&#xff1b; 下面建两张表进行测试 test_a表的数据如下 test_b表的数据如下&#xff1b; 写第一个full join 的SQL进行查询…

树莓派时间更新为中国区时间

一、测试环境为&#xff1a;树莓派3B piraspberrypi:~/workfile/lorawan/lorawan-gw $ uname -a Linux raspberrypi 6.1.21-v7 #1642 SMP Mon Apr 3 17:20:52 BST 2023 armv7l GNU/Linux 测试过程中&#xff0c;请确保树莓派连接网络 &#xff1b; 二、安装ntp相关命令&…

RK3399平台开发系列讲解(内核调试篇)Valgrind使用案例

🚀返回专栏总目录 文章目录 一、使用未初始化的内存案例二、内存泄露三、在内存被释放后进行读/写案例四、从已分配内存块的尾部进行读/写案例五、两次释放内存案例沉淀、分享、成长,让自己和他人都能有所收获!😄 📢Valgrind 是一个开源的内存调试和性能分析工具,用于…

【计算机网络篇】TCP协议

✅作者简介&#xff1a;大家好&#xff0c;我是小杨 &#x1f4c3;个人主页&#xff1a;「小杨」的csdn博客 &#x1f433;希望大家多多支持&#x1f970;一起进步呀&#xff01; TCP协议 1&#xff0c;TCP 简介 TCP&#xff08;Transmission Control Protocol&#xff09;是…

CSS中如何实现文字溢出省略号(text-overflow: ellipsis)效果?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ CSS中如何实现文字溢出省略号&#xff08;text-overflow: ellipsis&#xff09;效果&#xff1f;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 …

阿里云服务器-修改ecs操作系统,把window系统更换成Linux操作系统

其他sql格式也在更新中&#xff0c;可直接查看这个系列&#xff0c;要是没有你需要的格式&#xff0c;可在评论或私信我 总目录 目录-后期更新打算 hive的nvl中的子查询 总目录我这个是window&#xff0c;默认应该都是window&#xff0c;我需要改成Linux系统第一步&#xff…

如何在Windows、Mac和Linux操作系统上安装Protocol Buffers(protobuf)编译器

&#x1f337;&#x1f341; 博主猫头虎 带您 Go to New World.✨&#x1f341; &#x1f984; 博客首页——猫头虎的博客&#x1f390; &#x1f433;《面试题大全专栏》 文章图文并茂&#x1f995;生动形象&#x1f996;简单易学&#xff01;欢迎大家来踩踩~&#x1f33a; &a…

排序算法之详解冒泡排序

引入 冒泡排序顾名思义&#xff0c;就是像冒泡一样&#xff0c;泡泡在水里慢慢升上来&#xff0c;由小变大。虽然冒泡排序和冒泡并不完全一样&#xff0c;但却可以帮助我们理解冒泡排序。 思路 一组无序的数组&#xff0c;要求我们从小到大排列 我们可以先将最大的元素放在数组…