HDFS源码解析---写数据流程

太长不看版

1、写入(create)创建DFSOutputStream,启动DataStreamer线程run (主线程)

2、setPipeline -> nextBlockOutputStream -> locateFollowingBlock(addBlock)

2、createBlockOutputStream (client -> dn1 -> dn2 -> dn3)启动blockStream(实际用来写数据)

4、new ResponseProcessor 并启动线程run

5、按照packet粒度发送 packet 到datanode

a、writeChunk -> waitAndQueueCurrentPacket -> dataQueue

b、DataStreamer run方法不断从dataQueue队列take出来发送

c、收到ack后放入ackQueue

6、写完一个block 后endBlock -> 关闭response 线程 -> 关闭blockStream 线程

7、写下一个block 重复2 - 6

8、complete

一、总体流程

1、客户端向NameNode发出写文件请求。

  2、检查是否已存在文件、检查权限。若通过检查,直接先将操作写入EditLog,并返回输出流对象。

    (注:WAL,write ahead log,先写Log,再写内存,因为EditLog记录的是最新的HDFS客户端执行所有的写操作。如果后续真实写操作失败了,

    由于在真实写操作之前,操作就被写入EditLog中了,故EditLog中仍会有记录)

  3、client端按128MB的块切分文件。

  4、client将NameNode返回的DataNode列表和Data数据一同发送给最近的第一个DataNode节点,此后client端和多个DataNode构成pipeline管道。

    client向第一个DataNode写入一个packet,这个packet便会在pipeline里传给第二个、第三个…DataNode。

    在pipeline反方向上,逐个发送ack(命令正确应答),最终由pipeline中第一个DataNode节点将ack发送给client。

  5、写完数据,关闭输输出流.

  6、发送完成信号给NameNode。

二、代码细节

1、创建文件

        通常情况下,我们在创建文件的时候会新建一个FileSystem对象,然后调用create方法。

FileSystem.java

  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param f the file name to open
   * @param permission
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize
   * @param progress
   * @throws IOException
   * @see #setPermission(Path, FsPermission)
   */
  public abstract FSDataOutputStream create(Path f,
      FsPermission permission,
      boolean overwrite,
      int bufferSize,
      short replication,
      long blockSize,
      Progressable progress) throws IOException;

        这是一个抽象类,DistributedFileSystem 继承了这个抽象类,并实现了create方法。 

DistributedFileSystem.java

@Override
  public FSDataOutputStream create(Path f, FsPermission permission,
      boolean overwrite, int bufferSize, short replication, long blockSize,
      Progressable progress) throws IOException {
    return this.create(f, permission,
        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
            : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
        blockSize, progress, null);
  }
  @Override
  public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize, final Progressable progress,
    final ChecksumOpt checksumOpt) throws IOException {
    statistics.incrementWriteOps(1);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolverWithStatistics<FSDataOutputStream>(proxyParameter) {
      @Override
      public FSDataOutputStream doCall(final Path p)
          throws IOException, UnresolvedLinkException {
        final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
                cflags, replication, blockSize, progress, bufferSize,
                checksumOpt);
        return dfs.createWrappedOutputStream(dfsos, statistics);
      }
      @Override
      public FSDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        return fs.create(p, permission, cflags, bufferSize,
            replication, blockSize, progress, checksumOpt);
      }
    }.resolve(this, absF, OpType.CREATE, OpType.CREATE_SPAN, OpType.CREATE_EXC, OpType.CREATE_EXC_SPAN);
  }

        然后调用DFSClient

DFSClient.java

/**
   * Call {@link #create(String, FsPermission, EnumSet, boolean, short, 
   * long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
   *  set to true.
   */
  public DFSOutputStream create(String src, 
                             FsPermission permission,
                             EnumSet<CreateFlag> flag, 
                             short replication,
                             long blockSize,
                             Progressable progress,
                             int buffersize,
                             ChecksumOpt checksumOpt)
      throws IOException {
    return create(src, permission, flag, true,
        replication, blockSize, progress, buffersize, checksumOpt, null);
  }

        往下看,最终调用了底层的ClientProtocal.create()方法通知Namenode进行操作

final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
        src, masked, flag, createParent, replication, blockSize, progress,
        buffersize, dfsClientConf.createChecksum(checksumOpt),
        favoredNodeStrs);
    beginFileLease(result.getFileId(), result);

 2、创建文件

@AtMostOnce
public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize, 
    CryptoProtocolVersion[] supportedVersions)
    throws AccessControlException, AlreadyBeingCreatedException,
    DSQuotaExceededException, FileAlreadyExistsException,
    FileNotFoundException, NSQuotaExceededException,
    ParentNotDirectoryException, SafeModeException, UnresolvedLinkException,
    SnapshotAccessControlException, IOException;

        NamenodeRPCServer.create() 实现了这了ClientProtocal的接口(响应了这个请求)。最终我们获得了一个DFSOutputStream对象。

                下面我们来看下NamenodeRPCServer.create()。

@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
      String clientName, EnumSetWritable<CreateFlag> flag,
      boolean createParent, short replication, long blockSize, 
      CryptoProtocolVersion[] supportedVersions)
      throws IOException {
    checkNNStartup();
    String clientMachine = getClientMachine();
    if (stateChangeLog.isDebugEnabled()) {
      stateChangeLog.debug("*DIR* NameNode.create: file "
                         +src+" for "+clientName+" at "+clientMachine);
    }
    if (!checkPathLength(src)) {
      throw new IOException("create: Pathname too long.  Limit "
          + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
    }
    namesystem.checkOperation(OperationCategory.WRITE);
    HdfsFileStatus fileStatus = namesystem.startFile(src, new PermissionStatus(
        getRemoteUser().getShortUserName(), null, masked),
        clientName, clientMachine, flag.get(), createParent, replication,
        blockSize, supportedVersions);
    metrics.incrFilesCreated();
    metrics.incrCreateFileOps();
    return fileStatus;
  }

         这里的namenode.startFIle方法会调用startFileInt方法,创建文件目录。

        startFileInternal方法最终用于创建文件或者覆盖一个文件。

2、pipeline写数据

        获取了DFSOutputStream之后,HDFS 客户端就可以向数据流管道写数据了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/407401.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【前端素材】推荐优质后台管理系统Qovex平台模板(附源码)

一、需求分析 1、定义 后台管理系统是一种用于管理和监控网站、应用程序或系统的在线工具。它通常是通过网页界面进行访问和操作&#xff0c;用于管理网站内容、用户权限、数据分析等。后台管理系统是网站或应用程序的控制中心&#xff0c;管理员可以通过后台系统进行各种管理…

关于字符集(彻底搞清楚一个中文占几个字节?)

目录 一、字符集二、ASCII码(字符编码)三、ISO-8859-1(字符集)四、GBxxx(字符集)五、Unicode码(字符集)六、UTF-8(字符编码)总结 一、字符集 编码与解码 计算机中储存的信息都是用二进制数表示的而我们在屏幕上看到的数字、英文、标点符号、汉字等字符是二进制数转换之后的结…

Ubuntu20.04和Windows11下配置StarCraft II环境

1.Ubuntu20.04 根据下面这篇博客就可以顺利安装&#xff1a; 强化学习实战(九) Linux下配置星际争霸Ⅱ环境https://blog.csdn.net/weixin_39059031/article/details/117247635?spm1001.2014.3001.5506 Ubuntu下显示游戏界面目前还没有解决掉。 大家可以根据以下链接看看能…

腾讯云ICP备案服务器多少钱?

腾讯云备案服务器多少钱&#xff1f;备案服务器只要62元一年&#xff0c;可以备案5个网站。备案服务器申请页面 https://curl.qcloud.com/oRMoSucP 链接打开如下图&#xff1a; 腾讯云备案服务器价格 腾讯云支持ICP备案的服务器有以下三点限制条件&#xff1a; 1、地域必须是中…

PMP认证有什么用?含金量高吗?如何备考?

PMP备考多久能参加PMP考试&#xff0c;培训机构是关键点 依我这几年的持证体验来看&#xff0c;PMP认证的用处还是比较多的&#xff0c;也有一定的含金量&#xff0c;这两个方面基本都是随便一百度就能得到结果的&#xff0c;在考PMP的人群中唯一不同的可能就是备考方面的问题…

设计一个 shell 命令行程序

目录 实现 shell 主要思路 代码&#xff08;Linux&#xff09;系统 实现 shell 主要思路 1、要知道一个 shell 进程在运行起来都会在命令行呈现什么&#xff0c;如图是Xshell 登录成功后的界面&#xff1a;所以第一步要做的就是打印命令行提示符。 Xshell 命令行提示符的组…

SQL-Labs46关order by注入姿势

君衍. 四十六关 ORDER BY数字型注入1、源码分析2、rand()盲注3、if语句盲注4、时间盲注5、报错注入6、Limit注入7、盲注脚本 四十六关 ORDER BY数字型注入 请求方式注入类型拼接方式GET报错、布尔盲注、延时盲注ORDER BY $id 我们直接可以从界面中得知传参的参数为SORT&#x…

力扣 169. 多数元素

思路&#xff1a; 因为题目说一定存在多数元素&#xff0c;就说明一定有一个数的个数多于n/2 将数组采用冒泡从小到大排序&#xff0c;最中间的那个元素一定是多数元素&#xff08;因为在大小排好序后&#xff0c;中位数也一定是众数&#xff09; 答案&#xff1a; int maj…

【鸿蒙 HarmonyOS 4.0】UIAbility、页面及组件的生命周期

一、背景 主要梳理下鸿蒙系统开发中常用的生命周期 二、UIAbility组件 UIAbility组件是一种包含UI界面的应用组件&#xff0c;主要用于和用户交互。 UIAbility组件是系统调度的基本单元&#xff0c;为应用提供绘制界面的窗口&#xff1b;一个UIAbility组件中可以通过多个页…

数据价值在线化丨TiDB 在企查查数据中台的应用及 v7.1 版本升级体验

本文介绍了企查查在数据中台建设中使用 TiDB 的经验和应用。通过从 MySQL 到 TiDB 的迁移&#xff0c;企查查构建了基于 TiDB Flink 的实时数仓框架 &#xff0c;充分利用了 TiDB 的分布式架构、MySQL 兼容性和完善的周边工具等特性&#xff0c;实现了数据的在线化处理。2023 年…

svn客户端下载、安装、使用

下载、使用 打开360软件管家&#xff0c;选怎宝库&#xff0c;搜索svn&#xff0c;点击安装 可以修改安装路径 使用 在桌面右键弹出菜单&#xff0c;点击 输入地址&#xff0c;点击ok 输入用户名、密码 &#xff0c;等待检出完成

Java智慧工地管理云平台源码 带AI识别、桌面管理+大屏指挥+手机APP

目录 智慧工地云系统技术说明 1.什么是 AI 危险源识别 2.什么是标养室监测 3.什么是塔机监测 4.什么是塔机黑匣子 5.什么是升降机监测 6.什么是升降机黑匣子 7.什么是标养箱 8.什么是吊钩可视化 9.什么是吊钩追踪控制设备 10.什么是扬尘监测 11.什么是扬尘监测设备…

基于SpringBoot的停车场管理系统

基于SpringBootVue的停车场管理系统的设计与实现~ 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBootMyBatis工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 前台首页 停车位 个人中心 管理员界面 摘要 摘要&#xff1a;随着城市化进程的…

测绘测量行业CRM功能大揭秘:哪家才是最佳选择?

测绘测量行业面临着处理及管理海量数据的难题。办公软件进行数据记录是非常繁琐的&#xff0c;往往需要花费大量的时间来查找所需的信息&#xff0c;甚至造成内容丢失。测绘测量企业运用CRM管理系统至关重要。本文将向您介绍测绘测量行业CRM功能、哪家好&#xff1f; CRM软件的…

03 表数据基本操作

文章目录 插入(insert)查询(select)where子句更新表记录(update)删除表记录&#xff08;delete&#xff09;表字段的操作(alter)时间类型数据 插入(insert) insert into 表名 values(值1&#xff0c;值2...),(值1&#xff0c;值2...),...; insert into 表名 (字段1,...) value…

【办公类-16-10-02】“2023下学期 6个中班 自主游戏观察记录(python 排班表系列)

背景需求&#xff1a; 已经制作了本学期的中4班自主游戏观察记录表 【办公类-16-10-01】“2023下学期 中4班 自主游戏观察记录&#xff08;python 排班表系列&#xff09;-CSDN博客文章浏览阅读398次&#xff0c;点赞10次&#xff0c;收藏3次。【办公类-16-10-01】“2023下学…

Python零基础安装教程(包含各种安装方案)以及PyCharm安装步骤

文章目录 安装方案介绍安装Python解释器安装Pycharm安装Anaconda方案参考 安装方案介绍 方案1&#xff1a;只安装Python解释器&#xff08;适合小型开发&#xff09; 方案2&#xff1a;安装集成开发环境Anaconda&#xff08;集成了Spyder&#xff0c;JupyterNotebook等编辑器&…

代码随想录算法训练营Day27 || leetCode 93.复原IP地址 || 78.子集 || 90.子集II

93.复原IP地址 与分割回文串的代码相近&#xff0c;先写出判断函数&#xff0c;之后以判断结果为标准&#xff0c;执行回溯的代码。此题中使用点的个数来决定回溯的终止&#xff0c;需要学习一下。 class Solution { private:vector<string> result;bool isValid(const …

从事游戏开发类职业岗位的任职资格

游戏开发工程师主要是指致力于游戏总体设计&#xff0c;负责游戏开发和维护工具的设计与开发工作。并配合主程序完成游戏架构的设计&#xff0c;与其他技术支持的工作。从事这项工作&#xff0c;需要掌握的知识和技能非常多&#xff0c;下面就跟着小编来一起了解一下吧。 1、任…

高级RAG:使用RAGAs + LlamaIndex进行RAG评估,包括原理、图和代码

原文地址&#xff1a;Using RAGAs LlamaIndex for RAG evaluation 2024 年 2 月 5 日 如果您已经为实际的业务系统开发了检索增强生成&#xff08;Retrieval Augmented Generation, RAG&#xff09;应用程序&#xff0c;那么您可能会关心它的有效性。换句话说&#xff0c;您…