Hadoop 3.x Source Code Analysis

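Contents

  • I. How RPC Communication Works
    • 1. Overview
    • 2. Code demo
  • II. NameNode Startup Source Code
    • 1. Overview
    • 2. Starting the HTTP service on port 9870
    • 3. Loading the image file and edit log
    • 4. Initializing the NN RPC server
    • 5. NN startup resource check
    • 6. How the NN detects heartbeat timeouts
    • 7. Safe mode
  • III. DataNode Startup Source Code
    • 1. Overview
    • 2. Initializing DataXceiverServer
    • 3. Initializing the HTTP service
    • 4. Initializing the DN RPC server
    • 5. DN registers with the NN
    • 6. Sending heartbeats to the NN
  • IV. HDFS Upload Source Code
    • 1. Overview
    • 2. The create process
      • 2.1 DN sends the create request to the NN
      • 2.2 NN handles the DN's create request
      • 2.3 DataStreamer startup flow
    • 3. The write (upload) process
      • 3.1 Writing data into the DataStreamer queue
      • 3.2 Building the pipeline: rack awareness (block placement)
      • 3.3 Building the pipeline: sending over the socket
      • 3.4 Building the pipeline: receiving over the socket
      • 3.5 Client receives the DN's write response
  • V. Yarn Source Code
    • 1. Overview
    • 2. Yarn client submits a job to the RM
    • 3. RM launches MRAppMaster
    • 4. Scheduler task execution (YarnChild)
  • VI. MapReduce Source Code
    • 1. Job submission flow and input split source code
    • 2. MapTask & ReduceTask source code
  • VII. Compiling the Hadoop Source
    • 1. Environment preparation
    • 2. Installing the toolchain
    • 3. Compiling the source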

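I. How RPC Communication Works

1. Overview

This chapter simulates how the three parts of RPC (the client, the server, and the communication protocol) work together.

2. Code demo

In the HDFSClient project, create the package com.atguigu.rpc, then define the RPC protocol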

public interface RPCProtocol {

    long versionID = 666;

    void mkdirs(String path);
}

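Create the RPC server

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;

public class NNServer implements RPCProtocol {

    @Override
    public void mkdirs(String path) {
        System.out.println("Server: creating path " + path);
    }

    public static void main(String[] args) throws IOException {

        // Bind the protocol interface to this implementation on localhost:8888
        Server server = new RPC.Builder(new Configuration())
                .setBindAddress("localhost")
                .setPort(8888)
                .setProtocol(RPCProtocol.class)
                .setInstance(new NNServer())
                .build();

        System.out.println("Server is running");

        server.start();
    }
}

Create the RPC client

import java.io.IOException;
import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

public class HDFSClient {

    public static void main(String[] args) throws IOException {
        // Obtain a proxy that implements RPCProtocol and talks to the server
        RPCProtocol client = RPC.getProxy(
                RPCProtocol.class,
                RPCProtocol.versionID,
                new InetSocketAddress("localhost", 8888),
                new Configuration());

        System.out.println("Client started");

        client.mkdirs("/input");
    }
}

To test, start the server and check that its console prints "Server is running". Then run jps in a terminal window; the NNServer process should be listed.

Start the client. The client console prints "Client started", and the server console prints "Server: creating path /input".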
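
The client above calls mkdirs on a plain interface, which works because RPC.getProxy hands back a proxy object. The following is a minimal JDK-only sketch of that idea, not Hadoop code: LocalRpcSketch, DirProtocol, DirServer and this getProxy are hypothetical names, and the call is forwarded in-process instead of being serialized and sent over a socket as a real RPC engine would do.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical names throughout; this is an illustration, not Hadoop code.
class LocalRpcSketch {

    // Plays the role of RPCProtocol: the contract shared by both sides.
    interface DirProtocol {
        String mkdirs(String path);
    }

    // Plays the role of NNServer: the real implementation on the "server".
    static class DirServer implements DirProtocol {
        final List<String> created = new ArrayList<>();

        @Override
        public String mkdirs(String path) {
            created.add(path);
            return "created " + path;
        }
    }

    // Plays the role of RPC.getProxy: returns an object that implements the
    // protocol interface and forwards every call to the server instance.
    // A real RPC engine would serialize the method name and arguments here
    // and send them over the network instead of invoking directly.
    static DirProtocol getProxy(DirServer server) {
        InvocationHandler handler =
                (proxy, method, args) -> method.invoke(server, args);
        return (DirProtocol) Proxy.newProxyInstance(
                DirProtocol.class.getClassLoader(),
                new Class<?>[] {DirProtocol.class},
                handler);
    }

    public static void main(String[] args) {
        DirServer server = new DirServer();
        DirProtocol client = getProxy(server);
        System.out.println(client.mkdirs("/input")); // prints "created /input"
    }
}
```

The caller only ever sees the interface, which is why the same client code keeps working when the implementation lives in another process.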

II. NameNode Startup Source Code

1. Overview

First prepare the environment by adding the following dependencies

<dependencies>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.1.3</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.1.3</version>
  </dependency>

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs-client</artifactId>
    <version>3.1.3</version>
    <scope>provided</scope>
  </dependency>
</dependencies>

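Press Ctrl+H or double-tap Shift to search globally for namenode and open NameNode.java. Press Ctrl+F and find the main method, click createNameNode, click the NameNode constructor returned in the final default branch, then click initialize. The core startup logic lives in that method.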

protected void initialize(Configuration conf) throws IOException {
  ... ...

  if (NamenodeRole.NAMENODE == role) {
    // Start the HTTP server (port 9870)
    startHttpServer(conf);
  }

  // Load the image file and edit log into memory
  loadNamesystem(conf);
  startAliasMapServerIfNecessary(conf);

  // Create the NN RPC server
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using 
    // the RPC server's bind address.
    clientNamenodeAddress = 
        NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // NN startup resource check
  startCommonServices(conf);
  startMetricsLogger(conf);
}

2. Starting the HTTP service on port 9870

Click startHttpServer

private void startHttpServer(final Configuration conf) throws IOException {
  httpServer = new NameNodeHttpServer(conf, this, getHttpServerBindAddress(conf));
  httpServer.start();
  httpServer.setStartupProgress(startupProgress);
}

protected InetSocketAddress getHttpServerBindAddress(Configuration conf) {
  InetSocketAddress bindAddress = getHttpServerAddress(conf);

  ... ...
  return bindAddress;
}

protected InetSocketAddress getHttpServerAddress(Configuration conf) {
  return getHttpAddress(conf);
}

public static InetSocketAddress getHttpAddress(Configuration conf) {
  return  NetUtils.createSocketAddr(
      conf.getTrimmed(DFS_NAMENODE_HTTP_ADDRESS_KEY, DFS_NAMENODE_HTTP_ADDRESS_DEFAULT));
}

public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_NAMENODE_HTTP_PORT_DEFAULT;

public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT =
HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

int  DFS_NAMENODE_HTTP_PORT_DEFAULT = 9870;
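
The chain of constants above boils down to "read dfs.namenode.http-address, and fall back to 0.0.0.0:9870 when it is not set". A minimal sketch of that lookup, using a plain Map in place of Hadoop's Configuration; HttpAddressSketch and resolve are hypothetical names, and the host:port parsing is a simplification of what NetUtils.createSocketAddr does.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; a Map stands in for Hadoop's Configuration.
class HttpAddressSketch {
    static final String KEY = "dfs.namenode.http-address";
    static final int DEFAULT_PORT = 9870;
    static final String DEFAULT_ADDRESS = "0.0.0.0:" + DEFAULT_PORT;

    // Look up the key, fall back to the default, trim, then split host:port.
    static InetSocketAddress resolve(Map<String, String> conf) {
        String value = conf.getOrDefault(KEY, DEFAULT_ADDRESS).trim();
        int colon = value.lastIndexOf(':');
        String host = value.substring(0, colon);
        int port = Integer.parseInt(value.substring(colon + 1));
        // Unresolved on purpose: the sketch should not trigger a DNS lookup.
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolve(conf).getPort()); // prints 9870

        conf.put(KEY, " nn1.example.com:50070 ");
        System.out.println(resolve(conf).getPort()); // prints 50070
    }
}
```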

In startHttpServer, click httpServer.start();

void start() throws IOException {
  ... ...
  // Hadoop wraps the HTTP server in its own class, HttpServer2
  HttpServer2.Builder builder = DFSUtil.httpServerTemplateForNNAndJN(conf,
      httpAddr, httpsAddr, "hdfs",
      DFSConfigKeys.DFS_NAMENODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY,
      DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY);
  ... ...

  httpServer = builder.build();

  ... ...

  httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
  httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
  setupServlets(httpServer, conf);
  httpServer.start();

  ... ...
}

Click setupServlets. It registers the servlets behind the individual pages and endpoints of the web console.

private static void setupServlets(HttpServer2 httpServer, Configuration conf) {
  httpServer.addInternalServlet("startupProgress",
    StartupProgressServlet.PATH_SPEC, StartupProgressServlet.class);
  httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class,
    true);
  httpServer.addInternalServlet("imagetransfer", ImageServlet.PATH_SPEC,
      ImageServlet.class, true);
}

3. Loading the image file and edit log

Click loadNamesystem

protected void loadNamesystem(Configuration conf) throws IOException {
  this.namesystem = FSNamesystem.loadFromDisk(conf);
}

static FSNamesystem loadFromDisk(Configuration conf) throws IOException {

  checkConfiguration(conf);

  FSImage fsImage = new FSImage(conf,
      FSNamesystem.getNamespaceDirs(conf),
      FSNamesystem.getNamespaceEditsDirs(conf));

  FSNamesystem namesystem = new FSNamesystem(conf, fsImage, false);
  StartupOption startOpt = NameNode.getStartupOption(conf);
  if (startOpt == StartupOption.RECOVER) {
    namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
  }

  long loadStart = monotonicNow();
  try {
    namesystem.loadFSImage(startOpt);
  } catch (IOException ioe) {
    LOG.warn("Encountered exception loading fsimage", ioe);
    fsImage.close();
    throw ioe;
  }
  long timeTakenToLoadFSImage = monotonicNow() - loadStart;
  LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
  NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
  if (nnMetrics != null) {
    nnMetrics.setFsImageLoadTime((int) timeTakenToLoadFSImage);
  }
  namesystem.getFSDirectory().createReservedStatuses(namesystem.getCTime());
  return namesystem;
}

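4. Initializing the NN RPC server

Click createRpcServer. As with the RPC server in Chapter I, this starts the server side that clients talk to; clients send their commands to it over the RPC protocol.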

protected NameNodeRpcServer createRpcServer(Configuration conf)
    throws IOException {
  return new NameNodeRpcServer(conf, this);
}

public NameNodeRpcServer(Configuration conf, NameNode nn)
      throws IOException {
  ... ....  
    serviceRpcServer = new RPC.Builder(conf)
        .setProtocol(
            org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB.class)
        .setInstance(clientNNPbService)
        .setBindAddress(bindHost)
        .setPort(serviceRpcAddr.getPort())
        .setNumHandlers(serviceHandlerCount)
        .setVerbose(false)
        .setSecretManager(namesystem.getDelegationTokenSecretManager())
        .build();
  ... ....  
}

5. NN startup resource check

Click startCommonServices

private void startCommonServices(Configuration conf) throws IOException {

  namesystem.startCommonServices(conf, haContext);

  registerNNSMXBean();
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }
  rpcServer.start();
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
        ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  ......
}

Click namesystem.startCommonServices, which is implemented in FSNamesystem

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // Check that there is enough disk space for the metadata (fsimage and edit log; the reserved space defaults to 100 MB)
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);
    long completeBlocksTotal = getCompleteBlocksTotal();

    // Safe mode
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // Start the block services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

Click NameNodeResourceChecker

public NameNodeResourceChecker(Configuration conf) throws IOException {
  this.conf = conf;
  volumes = new HashMap<String, CheckedVolume>();
  
  // dfs.namenode.resource.du.reserved defaults to 1024 * 1024 * 100 bytes = 100 MB
  duReserved = conf.getLong(DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_KEY,
      DFSConfigKeys.DFS_NAMENODE_DU_RESERVED_DEFAULT);
  
  Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
      .getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
  
  Collection<URI> localEditDirs = Collections2.filter(
      FSNamesystem.getNamespaceEditsDirs(conf),
      new Predicate<URI>() {
        @Override
        public boolean apply(URI input) {
          if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
            return true;
          }
          return false;
        }
      });

  // Register every local edits directory for the resource check
  for (URI editsDirToCheck : localEditDirs) {
    addDirToCheck(editsDirToCheck,
        FSNamesystem.getRequiredNamespaceEditsDirs(conf).contains(
            editsDirToCheck));
  }

  // All extra checked volumes are marked "required"
  for (URI extraDirToCheck : extraCheckedVolumes) {
    addDirToCheck(extraDirToCheck, true);
  }
  
  minimumRedundantVolumes = conf.getInt(
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY,
      DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_DEFAULT);
}

Click checkAvailableResources

void checkAvailableResources() {
  long resourceCheckTime = monotonicNow();
  Preconditions.checkState(nnResourceChecker != null,
    "nnResourceChecker not initialized");

  // Check whether resources are sufficient; false means they are not
  hasResourcesAvailable = nnResourceChecker.hasAvailableDiskSpace();

  resourceCheckTime = monotonicNow() - resourceCheckTime;
  NameNode.getNameNodeMetrics().addResourceCheckTime(resourceCheckTime);
}

public boolean hasAvailableDiskSpace() {
  return NameNodeResourcePolicy.areResourcesAvailable(volumes.values(),
      minimumRedundantVolumes);
}

static boolean areResourcesAvailable(
    Collection<? extends CheckableNameNodeResource> resources,
    int minimumRedundantResources) {

  // TODO: workaround:
  // - during startup, if there are no edits dirs on disk, then there is
  // a call to areResourcesAvailable() with no dirs at all, which was
  // previously causing the NN to enter safemode
  if (resources.isEmpty()) {
    return true;
  }
  
  int requiredResourceCount = 0;
  int redundantResourceCount = 0;
  int disabledRedundantResourceCount = 0;

  // Check each resource in turn
  for (CheckableNameNodeResource resource : resources) {
    if (!resource.isRequired()) {
      redundantResourceCount++;
      if (!resource.isResourceAvailable()) {
        disabledRedundantResourceCount++;
      }
    } else {
      requiredResourceCount++;
      if (!resource.isResourceAvailable()) {
        // Short circuit - a required resource is not available, so return false.
        return false;
      }
    }
  }
  
  if (redundantResourceCount == 0) {
    // If there are no redundant resources, return true if there are any
    // required resources available.
    return requiredResourceCount > 0;
  } else {
    return redundantResourceCount - disabledRedundantResourceCount >=
        minimumRedundantResources;
  }
}
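
The policy in areResourcesAvailable is easier to follow with concrete volumes. Below is a minimal JDK-only sketch that mirrors the same decision rules; ResourcePolicySketch and Volume are hypothetical stand-ins for NameNodeResourcePolicy and CheckableNameNodeResource, and availability is passed in as a flag rather than measured on disk.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-ins that mirror the decision rules shown above.
class ResourcePolicySketch {

    static final class Volume {
        final boolean required;
        final boolean available;

        Volume(boolean required, boolean available) {
            this.required = required;
            this.available = available;
        }
    }

    static boolean areResourcesAvailable(List<Volume> volumes, int minimumRedundant) {
        if (volumes.isEmpty()) {
            return true; // nothing to check
        }
        int required = 0;
        int redundant = 0;
        int disabledRedundant = 0;
        for (Volume v : volumes) {
            if (v.required) {
                required++;
                if (!v.available) {
                    return false; // one missing required volume is fatal
                }
            } else {
                redundant++;
                if (!v.available) {
                    disabledRedundant++;
                }
            }
        }
        if (redundant == 0) {
            return required > 0;
        }
        return redundant - disabledRedundant >= minimumRedundant;
    }

    public static void main(String[] args) {
        // Two redundant volumes, one of them full, minimum of one healthy volume.
        List<Volume> volumes = Arrays.asList(
                new Volume(false, true), new Volume(false, false));
        System.out.println(areResourcesAvailable(volumes, 1)); // prints true

        // A required volume that is full fails the check on its own.
        System.out.println(areResourcesAvailable(
                Collections.singletonList(new Volume(true, false)), 1)); // prints false
    }
}
```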

interface CheckableNameNodeResource {
  
  public boolean isResourceAvailable();
  
  public boolean isRequired();
}


On the call if (!resource.isResourceAvailable()), press Ctrl+Alt+B to jump to the implementing class

public boolean isResourceAvailable() {

  // Get the space available on the volume that holds this directory
  long availableSpace = df.getAvailable();

  if (LOG.isDebugEnabled()) {
    LOG.debug("Space available on volume '" + volume + "' is "
        + availableSpace);
  }

  // If the available space is below the reserved amount (100 MB by default), return false
  if (availableSpace < duReserved) {
    LOG.warn("Space available on volume '" + volume + "' is "
        + availableSpace +
        ", which is below the configured reserved amount " + duReserved);
    return false;
  } else {
    return true;
  }
}
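
The same comparison can be tried outside Hadoop. This is a rough sketch that assumes java.io.File.getUsableSpace is an acceptable substitute for Hadoop's DF helper; DiskReserveSketch and hasEnoughSpace are hypothetical names, and the temp directory is only used as a convenient volume to probe.

```java
import java.io.File;

// Hypothetical sketch of the reserved-space comparison.
class DiskReserveSketch {
    // Same value as the default of dfs.namenode.resource.du.reserved: 100 MB.
    static final long DU_RESERVED_DEFAULT = 100L * 1024 * 1024;

    // Mirrors the check above: strictly below the reserve means "not available".
    static boolean hasEnoughSpace(long availableBytes, long reservedBytes) {
        return availableBytes >= reservedBytes;
    }

    public static void main(String[] args) {
        File dir = new File(System.getProperty("java.io.tmpdir"));
        long available = dir.getUsableSpace();
        System.out.println("available bytes: " + available);
        System.out.println("enough space: "
                + hasEnoughSpace(available, DU_RESERVED_DEFAULT));
    }
}
```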

6. How the NN detects heartbeat timeouts

Press Ctrl+N and search for namenode, then press Ctrl+F and search for startCommonServices. Click namesystem.startCommonServices(conf, haContext); then blockManager.activate(conf, completeBlocksTotal); then datanodeManager.activate(conf);

void activate(final Configuration conf) {
  datanodeAdminManager.activate(conf);
  heartbeatManager.activate();
}

void activate() {
  // This starts a thread, so look for its run method
  heartbeatThread.start();
}

public void run() {
  while(namesystem.isRunning()) {
    restartHeartbeatStopWatch();
    try {
      final long now = Time.monotonicNow();
      if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
        // Heartbeat check
        heartbeatCheck();
        lastHeartbeatCheck = now;
      }
      if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
        synchronized(HeartbeatManager.this) {
          for(DatanodeDescriptor d : datanodes) {
            d.setNeedKeyUpdate(true);
          }
        }
        lastBlockKeyUpdate = now;
      }
    } catch (Exception e) {
      LOG.error("Exception while checking heartbeat", e);
    }
    try {
      Thread.sleep(5000);  // 5 seconds
    } catch (InterruptedException ignored) {
    }
    // avoid declaring nodes dead for another cycle if a GC pause lasts
    // longer than the node recheck interval
    if (shouldAbortHeartbeatCheck(-5000)) {
      LOG.warn("Skipping next heartbeat scan due to excessive pause");
      lastHeartbeatCheck = Time.monotonicNow();
    }
  }
}

void heartbeatCheck() {
  final DatanodeManager dm = blockManager.getDatanodeManager();

  boolean allAlive = false;
  while (!allAlive) {
    // locate the first dead node.
    DatanodeDescriptor dead = null;

    // locate the first failed storage that isn't on a dead node.
    DatanodeStorageInfo failedStorage = null;

    // check the number of stale nodes
    int numOfStaleNodes = 0;
    int numOfStaleStorages = 0;
    synchronized(this) {
      for (DatanodeDescriptor d : datanodes) {
        // check if an excessive GC pause has occurred
        if (shouldAbortHeartbeatCheck(0)) {
          return;
        }
        // Check whether this DN is dead
        if (dead == null && dm.isDatanodeDead(d)) {
          stats.incrExpiredHeartbeats();
          dead = d;
        }
        if (d.isStale(dm.getStaleInterval())) {
          numOfStaleNodes++;
        }
        DatanodeStorageInfo[] storageInfos = d.getStorageInfos();
        for(DatanodeStorageInfo storageInfo : storageInfos) {
          if (storageInfo.areBlockContentsStale()) {
            numOfStaleStorages++;
          }

          if (failedStorage == null &&
              storageInfo.areBlocksOnFailedStorage() &&
              d != dead) {
            failedStorage = storageInfo;
          }
        }
      }
      
      // Set the number of stale nodes in the DatanodeManager
      dm.setNumStaleNodes(numOfStaleNodes);
      dm.setNumStaleStorages(numOfStaleStorages);
    }
    ... ...
  }
}

boolean isDatanodeDead(DatanodeDescriptor node) {
  return (node.getLastUpdateMonotonic() <
          (monotonicNow() - heartbeatExpireInterval));
}

private long heartbeatExpireInterval;
// With the defaults: 2 * 5 minutes + 10 * 3 seconds = 10 minutes 30 seconds
this.heartbeatExpireInterval = 2 * heartbeatRecheckInterval + 10 * 1000 * heartbeatIntervalSeconds;

private volatile int heartbeatRecheckInterval;
heartbeatRecheckInterval = conf.getInt(
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes

private volatile long heartbeatIntervalSeconds;
heartbeatIntervalSeconds = conf.getTimeDuration(
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
        DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT, TimeUnit.SECONDS);
public static final long    DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
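
Plugging the defaults into the formula gives the familiar 10 minutes 30 seconds. A small sketch of the arithmetic and of the isDatanodeDead comparison; HeartbeatExpirySketch and its method names are hypothetical, and the timestamps are plain longs rather than values from Time.monotonicNow().

```java
// Hypothetical sketch of the expiry arithmetic shown above.
class HeartbeatExpirySketch {

    // recheckIntervalMs is in milliseconds, heartbeatIntervalSeconds in seconds.
    static long expireIntervalMs(long recheckIntervalMs, long heartbeatIntervalSeconds) {
        return 2 * recheckIntervalMs + 10 * 1000 * heartbeatIntervalSeconds;
    }

    // A node is dead once its last heartbeat is older than the expiry interval.
    static boolean isDead(long lastUpdateMs, long nowMs, long expireIntervalMs) {
        return lastUpdateMs < nowMs - expireIntervalMs;
    }

    public static void main(String[] args) {
        long recheckMs = 5 * 60 * 1000; // default recheck interval: 5 minutes
        long heartbeatSeconds = 3;      // default heartbeat interval: 3 seconds
        long expire = expireIntervalMs(recheckMs, heartbeatSeconds);
        System.out.println(expire);                         // prints 630000
        System.out.println(isDead(0, expire, expire));      // prints false
        System.out.println(isDead(0, expire + 1, expire));  // prints true
    }
}
```

Because the monitor thread only wakes every 5 seconds and runs heartbeatCheck once per recheck interval, a dead node may be noticed somewhat later than the 630000 ms mark.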

7. Safe mode

Safe mode is also set up in FSNamesystem.startCommonServices

void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    nnResourceChecker = new NameNodeResourceChecker(conf);
    // Check that there is enough disk space for the metadata (fsimage and edit log; the reserved space defaults to 100 MB)
    checkAvailableResources();

    assert !blockManager.isPopulatingReplQueues();
    StartupProgress prog = NameNode.getStartupProgress();

    // Begin the safe mode phase
    prog.beginPhase(Phase.SAFEMODE);

    // Count the blocks that are complete and usable
    long completeBlocksTotal = getCompleteBlocksTotal();

    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // Start the block services
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}

Click getCompleteBlocksTotal

public long getCompleteBlocksTotal() {
  // Calculate number of blocks under construction
  long numUCBlocks = 0;
  readLock();
  try {
    // Number of blocks still under construction
    numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
    // Total blocks - blocks under construction = complete, usable blocks
    return getBlocksTotal() - numUCBlocks;
  } finally {
    readUnlock("getCompleteBlocksTotal");
  }
}

Click activate

public void activate(Configuration conf, long blockTotal) {
  pendingReconstruction.start();
  datanodeManager.activate(conf);

  this.redundancyThread.setName("RedundancyMonitor");
  this.redundancyThread.start();

  storageInfoDefragmenterThread.setName("StorageInfoMonitor");
  storageInfoDefragmenterThread.start();
  this.blockReportThread.start();

  mxBeanName = MBeans.register("NameNode", "BlockStats", this);

  bmSafeMode.activate(blockTotal);
}

void activate(long total) {
  assert namesystem.hasWriteLock();
  assert status == BMSafeModeStatus.OFF;

  startTime = monotonicNow();

  // Record the block total and compute the block-count thresholds
  setBlockTotal(total);

  // Check whether the DataNode count and reported blocks already meet the criteria for leaving safe mode
  if (areThresholdsMet()) {
    boolean exitResult = leaveSafeMode(false);
    Preconditions.checkState(exitResult, "Failed to leave safe mode.");
  } else {
    // enter safe mode
    status = BMSafeModeStatus.PENDING_THRESHOLD;

    initializeReplQueuesIfNecessary();

    reportStatus("STATE* Safe mode ON.", true);
    lastStatusReport = monotonicNow();
  }
}

Click setBlockTotal

void setBlockTotal(long total) {
  assert namesystem.hasWriteLock();
  synchronized (this) {
    this.blockTotal = total;
    // Compute the threshold, for example: 1000 complete blocks * 0.999 = 999
    this.blockThreshold = (long) (total * threshold);
  }
  
  this.blockReplQueueThreshold = (long) (total * replQueueThreshold);
}

this.threshold = conf.getFloat(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
        DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT);

public static final float   DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.999f;

Click areThresholdsMet

private boolean areThresholdsMet() {
  assert namesystem.hasWriteLock();
  // Calculating the number of live datanodes is time-consuming
  // in large clusters. Skip it when datanodeThreshold is zero.
  int datanodeNum = 0;

  if (datanodeThreshold > 0) {
    datanodeNum = blockManager.getDatanodeManager().getNumLiveDataNodes();
  }
  synchronized (this) {
    // Safely reported blocks >= block threshold, and live DataNodes >= DataNode threshold
    return blockSafe >= blockThreshold && datanodeNum >= datanodeThreshold;
  }
}
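
The two methods above can be checked with the numbers from the comment. A minimal sketch, assuming a DataNode threshold of 0 for the demo; SafeModeThresholdSketch and its method names are hypothetical, and only the arithmetic is reproduced, not the locking or state transitions.

```java
// Hypothetical sketch of the safe mode threshold arithmetic.
class SafeModeThresholdSketch {

    // Same cast-and-truncate computation as setBlockTotal.
    static long blockThreshold(long total, double thresholdPct) {
        return (long) (total * thresholdPct);
    }

    // Same condition as areThresholdsMet.
    static boolean thresholdsMet(long blockSafe, long blockThreshold,
                                 int liveDatanodes, int datanodeThreshold) {
        return blockSafe >= blockThreshold && liveDatanodes >= datanodeThreshold;
    }

    public static void main(String[] args) {
        long threshold = blockThreshold(1000, 0.999f);
        System.out.println(threshold);                           // prints 999
        System.out.println(thresholdsMet(998, threshold, 3, 0)); // prints false
        System.out.println(thresholdsMet(999, threshold, 3, 0)); // prints true

        // An empty namespace has a threshold of 0, so the condition holds at once.
        System.out.println(thresholdsMet(0, blockThreshold(0, 0.999f), 0, 0)); // prints true
    }
}
```

The last case is why a freshly formatted cluster with no blocks can leave safe mode during activate without waiting for block reports.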

III. DataNode Startup Source Code

1. Overview

Working mechanism

Startup source code flow

Search for DataNode.java and find the main method

public static void main(String args[]) {
  if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
    System.exit(0);
  }

  secureMain(args, null);
}

public static void secureMain(String args[], SecureResources resources) {
  int errorCode = 0;
  try {
    StringUtils.startupShutdownMessage(DataNode.class, args, LOG);

    DataNode datanode = createDataNode(args, null, resources);

    ......
  } catch (Throwable e) {
    LOG.error("Exception in secureMain", e);
    terminate(1, e);
  } finally {
    LOG.warn("Exiting Datanode");
    terminate(errorCode);
  }
}

public static DataNode createDataNode(String args[], Configuration conf,
    SecureResources resources) throws IOException {
  // Instantiate the DN
  DataNode dn = instantiateDataNode(args, conf, resources);

  if (dn != null) {
    // Start the DN daemon
    dn.runDatanodeDaemon();
  }
  return dn;
}

public static DataNode instantiateDataNode(String args [], Configuration conf,
    SecureResources resources) throws IOException {
  ... ...
  
  return makeInstance(dataLocations, conf, resources);
}

static DataNode makeInstance(Collection<StorageLocation> dataDirs,
    Configuration conf, SecureResources resources) throws IOException {
  ... ...
  return new DataNode(conf, locations, storageLocationChecker, resources);
}

DataNode(final Configuration conf,
         final List<StorageLocation> dataDirs,
         final StorageLocationChecker storageLocationChecker,
         final SecureResources resources) throws IOException {
  super(conf);
  ... ...

  try {
    hostName = getHostName(conf);
    LOG.info("Configured hostname is {}", hostName);
    // Start the DN
    startDataNode(dataDirs, resources);
  } catch (IOException ie) {
    shutdown();
    throw ie;
  }
  ... ...
}

void startDataNode(List<StorageLocation> dataDirectories,
                   SecureResources resources
                   ) throws IOException {
  ... ...
  // Create the data storage object
  storage = new DataStorage();
  
  // global DN settings
  registerMXBean();
  // Initialize the DataXceiverServer
  initDataXceiver();
  
  // Start the HTTP server
  startInfoServer();

  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(getConf());
  pauseMonitor.start();

  // BlockPoolTokenSecretManager is required to create ipc server.
  this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager();

  // Login is done by now. Set the DN user name.
  dnUserName = UserGroupInformation.getCurrentUser().getUserName();
  LOG.info("dnUserName = {}", dnUserName);
  LOG.info("supergroup = {}", supergroup);
  
  // Initialize the RPC server
  initIpcServer();

  metrics = DataNodeMetrics.create(getConf(), getDisplayName());
  peerMetrics = dnConf.peerStatsEnabled ?
      DataNodePeerMetrics.create(getDisplayName(), getConf()) : null;
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  ecWorker = new ErasureCodingWorker(getConf(), this);
  blockRecoveryWorker = new BlockRecoveryWorker(this);
  
  // Create the BlockPoolManager
  blockPoolManager = new BlockPoolManager(this);
  // Connect to the NameNodes; registration and heartbeats start from here
  blockPoolManager.refreshNamenodes(getConf());

  // Create the ReadaheadPool from the DataNode context so we can
  // exit without having to explicitly shutdown its thread pool.
  readaheadPool = ReadaheadPool.getInstance();
  saslClient = new SaslDataTransferClient(dnConf.getConf(),
      dnConf.saslPropsResolver, dnConf.trustedChannelResolver);
  saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager);
  startMetricsLogger();

  if (dnConf.diskStatsEnabled) {
    diskMetrics = new DataNodeDiskMetrics(this,
        dnConf.outliersReportIntervalMs);
  }
}

2. Initializing DataXceiverServer

Click initDataXceiver

private void initDataXceiver() throws IOException {
  // dataXceiverServer is the service the DN uses to receive data sent by clients and by other DNs
  this.dataXceiverServer = new Daemon(threadGroup, xserver);
  this.threadGroup.setDaemon(true); // auto destroy when empty

  ... ...
}

3. Initializing the HTTP service

Click startInfoServer();

private void startInfoServer()
  throws IOException {
  // SecureDataNodeStarter will bind the privileged port to the channel if
  // the DN is started by JSVC, pass it along.
  ServerSocketChannel httpServerChannel = secureResources != null ?
      secureResources.getHttpServerChannel() : null;

  httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
  httpServer.start();
  if (httpServer.getHttpAddress() != null) {
    infoPort = httpServer.getHttpAddress().getPort();
  }
  if (httpServer.getHttpsAddress() != null) {
    infoSecurePort = httpServer.getHttpsAddress().getPort();
  }
}

public DatanodeHttpServer(final Configuration conf,
    final DataNode datanode,
    final ServerSocketChannel externalHttpChannel)
  throws IOException {
  
  ... ...
  HttpServer2.Builder builder = new HttpServer2.Builder()
      .setName("datanode")
      .setConf(confForInfoServer)
      .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")))
      .hostName(getHostnameForSpnegoPrincipal(confForInfoServer))
      .addEndpoint(URI.create("http://localhost:" + proxyPort))
      .setFindPort(true);
  ... ...
}

4. Initializing the DN RPC server

Click initIpcServer

private void initIpcServer() throws IOException {
  InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
      getConf().getTrimmed(DFS_DATANODE_IPC_ADDRESS_KEY));
  
  ... ...
  ipcServer = new RPC.Builder(getConf())
      .setProtocol(ClientDatanodeProtocolPB.class)
      .setInstance(service)
      .setBindAddress(ipcAddr.getHostName())
      .setPort(ipcAddr.getPort())
      .setNumHandlers(
          getConf().getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
              DFS_DATANODE_HANDLER_COUNT_DEFAULT)).setVerbose(false)
      .setSecretManager(blockPoolTokenSecretManager).build();
  ... ...
}

5. DN registers with the NN

Click refreshNamenodes

void refreshNamenodes(Configuration conf)
    throws IOException {
  ... ...

  synchronized (refreshNamenodesLock) {
    doRefreshNamenodes(newAddressMap, newLifelineAddressMap);
  }
}

private void doRefreshNamenodes(
    Map<String, Map<String, InetSocketAddress>> addrMap,
    Map<String, Map<String, InetSocketAddress>> lifelineAddrMap)
    throws IOException {
  ......
  
  synchronized (this) {
   ......

    // Step 3. Start new nameservices
    if (!toAdd.isEmpty()) {

      for (String nsToAdd : toAdd) {
        ... ...
        BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
        bpByNameserviceId.put(nsToAdd, bpos);
        offerServices.add(bpos);
      }
    }
    startAll();
  }

 ......
}

protected BPOfferService createBPOS(
    final String nameserviceId,
    List<InetSocketAddress> nnAddrs,
    List<InetSocketAddress> lifelineNnAddrs) {
  // Create one BPOfferService per nameservice, covering each of its NameNodes
  return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
}

Click startAll()

synchronized void startAll() throws IOException {
  try {
    UserGroupInformation.getLoginUser().doAs(
        new PrivilegedExceptionAction<Object>() {
          @Override
          public Object run() throws Exception {
            for (BPOfferService bpos : offerServices) {
              // Start the service
              bpos.start();
            }
            return null;
          }
        });
  } catch (InterruptedException ex) {
    ... ...
  }
}

void start() {
  for (BPServiceActor actor : bpServices) {
    actor.start();
  }
}

void start() {
  ... ...
  bpThread = new Thread(this);
  bpThread.setDaemon(true); // needed for JUnit testing
  // This starts a thread, so look for this thread's run method
  bpThread.start();

  if (lifelineSender != null) {
    lifelineSender.start();
  }
}

Press Ctrl+F and search for the run method

public void run() {
  LOG.info(this + " starting to offer service");

  try {
    while (true) {
      // init stuff
      try {
        // setup storage
        // Register with the NN
        connectToNNAndHandshake();
        break;
      } catch (IOException ioe) {
        // Initial handshake, storage recovery or registration failed
        runningState = RunningState.INIT_FAILED;
        if (shouldRetryInit()) {
          // Retry until all namenode's of BPOS failed initialization
          LOG.error("Initialization failed for " + this + " "
              + ioe.getLocalizedMessage());
          // Registration failed; retry after 5 seconds
          sleepAndLogInterrupts(5000, "initializing");
        } else {
          runningState = RunningState.FAILED;
          LOG.error("Initialization failed for " + this + ". Exiting. ", ioe);
          return;
        }
      }
    }
    ... ...
    while (shouldRun()) {
      try {
        // Send heartbeats
        offerService();
      } catch (Exception ex) {
        ... ...
      }
    }
    ... ...
  } catch (Throwable ex) {
    ... ...
  } finally {
    ... ...
  }
}
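
The first loop in run is a retry-until-handshake pattern: try to register, and on an IOException sleep and try again. A simplified sketch of that pattern follows; HandshakeRetrySketch, Handshake and connectWithRetry are hypothetical names, and a fixed attempt limit stands in for the shouldRetryInit() decision, which is an assumption made only to keep the demo finite.

```java
import java.io.IOException;

// Hypothetical sketch of the retry loop around the NN handshake.
class HandshakeRetrySketch {

    // Stand-in for connectToNNAndHandshake().
    interface Handshake {
        void run() throws IOException;
    }

    // Retries until the handshake succeeds or maxAttempts is used up.
    // Returns the number of attempts used, or -1 if none succeeded.
    static int connectWithRetry(Handshake handshake, int maxAttempts, long sleepMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handshake.run();
                return attempt;
            } catch (IOException e) {
                System.out.println("attempt " + attempt + " failed: " + e.getMessage());
                try {
                    Thread.sleep(sleepMs); // the real loop sleeps 5000 ms
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, as if the NameNode were still starting, then succeeds.
        Handshake flaky = () -> {
            if (++calls[0] < 3) {
                throw new IOException("NameNode not ready");
            }
        };
        System.out.println("registered after " + connectWithRetry(flaky, 5, 10) + " attempts");
    }
}
```

Only after this loop exits successfully does the thread move on to the heartbeat loop, which is why a DataNode started before its NameNode simply keeps retrying.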

private void connectToNNAndHandshake() throws IOException {
  // get NN proxy: obtain the RPC client object for the NN
  bpNamenode = dn.connectToNN(nnAddr);

  // First phase of the handshake with NN - get the namespace
  // info.
  NamespaceInfo nsInfo = retrieveNamespaceInfo();

  // Verify that this matches the other NN in this HA pair.
  // This also initializes our block pool in the DN if we are
  // the first NN connection for this BP.
  bpos.verifyAndSetNamespaceInfo(this, nsInfo);

  /* set thread name again to include NamespaceInfo when it's available. */
  this.bpThread.setName(formatThreadName("heartbeating", nnAddr));

  // Register
  register(nsInfo);
}

DatanodeProtocolClientSideTranslatorPB connectToNN(
    InetSocketAddress nnAddr) throws IOException {
  return new DatanodeProtocolClientSideTranslatorPB(nnAddr, getConf());
}

public DatanodeProtocolClientSideTranslatorPB(InetSocketAddress nameNodeAddr,
    Configuration conf) throws IOException {
  RPC.setProtocolEngine(conf, DatanodeProtocolPB.class,
      ProtobufRpcEngine.class);
  UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
  rpcProxy = createNamenode(nameNodeAddr, conf, ugi);
}

private static DatanodeProtocolPB createNamenode(
    InetSocketAddress nameNodeAddr, Configuration conf,
    UserGroupInformation ugi) throws IOException {
  return RPC.getProxy(DatanodeProtocolPB.class,
      RPC.getProtocolVersion(DatanodeProtocolPB.class), nameNodeAddr, ugi,
      conf, NetUtils.getSocketFactory(conf, DatanodeProtocolPB.class));
}

Go back and click register

void register(NamespaceInfo nsInfo) throws IOException {
  // Build the registration information
  DatanodeRegistration newBpRegistration = bpos.createRegistration();

  LOG.info(this + " beginning handshake with NN");

  while (shouldRun()) {
    try {
      // Use returned registration from namenode with updated fields
      // Send the registration to the NN (the DN calls the interface method; it executes on the NN)
      newBpRegistration = bpNamenode.registerDatanode(newBpRegistration);
      newBpRegistration.setNamespaceInfo(nsInfo);
      bpRegistration = newBpRegistration;
      break;
    } catch(EOFException e) {  // namenode might have just restarted
      LOG.info("Problem connecting to server: " + nnAddr + " :"
          + e.getLocalizedMessage());
      sleepAndLogInterrupts(1000, "connecting to server");
    } catch(SocketTimeoutException e) {  // namenode is busy
      LOG.info("Problem connecting to server: " + nnAddr);
      sleepAndLogInterrupts(1000, "connecting to server");
    }
  }
  ... ...
}

Back on the NN side, search for NameNodeRpcServer and find its registerDatanode method

public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  checkNNStartup();
  verifySoftwareVersion(nodeReg);
  // Register the DN
  namesystem.registerDatanode(nodeReg);

  return nodeReg;
}

void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
  writeLock();
  try {
    blockManager.registerDatanode(nodeReg);
  } finally {
    writeUnlock("registerDatanode");
  }
}

public void registerDatanode(DatanodeRegistration nodeReg)
    throws IOException {
  assert namesystem.hasWriteLock();
  datanodeManager.registerDatanode(nodeReg);
  bmSafeMode.checkSafeMode();
}

public void registerDatanode(DatanodeRegistration nodeReg)
    throws DisallowedDatanodeException, UnresolvedTopologyException {
  ... ...
    // register new datanode
    addDatanode(nodeDescr);
    blockManager.getBlockReportLeaseManager().register(nodeDescr);
    // also treat the registration message as a heartbeat
    // no need to update its timestamp
    // because it is done when the descriptor is created
    // Add the DN to heartbeat management
    heartbeatManager.addDatanode(nodeDescr);
    heartbeatManager.updateDnStat(nodeDescr);
    incrementVersionCount(nodeReg.getSoftwareVersion());
    startAdminOperationIfNecessary(nodeDescr);
    success = true;
  ... ...
}

void addDatanode(final DatanodeDescriptor node) {
  // To keep host2DatanodeMap consistent with datanodeMap,
  // remove  from host2DatanodeMap the datanodeDescriptor removed
  // from datanodeMap before adding node to host2DatanodeMap.
  synchronized(this) {
    host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
  }

  networktopology.add(node); // may throw InvalidTopologyException
  host2DatanodeMap.add(node);
  checkIfClusterIsNowMultiRack(node);
  resolveUpgradeDomain(node);
  ......
}
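
The one-liner `host2DatanodeMap.remove(datanodeMap.put(...))` in addDatanode relies on `Map.put` returning the previous value for the key. A minimal sketch of that idiom, keeping two indexes consistent, might look like this. `DatanodeMapsSketch` is a hypothetical class, and plain strings stand in for DatanodeDescriptor.

```java
import java.util.HashMap;
import java.util.Map;

class DatanodeMapsSketch {
    // Primary index: UUID -> host (a String stands in for DatanodeDescriptor).
    private final Map<String, String> byUuid = new HashMap<>();
    // Secondary index: host -> UUID.
    private final Map<String, String> byHost = new HashMap<>();

    synchronized void add(String uuid, String host) {
        // put() returns the value previously mapped to the key, or null if none.
        String oldHost = byUuid.put(uuid, host);
        if (oldHost != null) {
            // Drop the stale entry before adding the new one.
            byHost.remove(oldHost);
        }
        byHost.put(host, uuid);
    }

    synchronized String uuidOnHost(String host) {
        return byHost.get(host);
    }

    public static void main(String[] args) {
        DatanodeMapsSketch maps = new DatanodeMapsSketch();
        maps.add("uuid-1", "hostA");
        maps.add("uuid-1", "hostB"); // same node re-registers from another host
        System.out.println(maps.uuidOnHost("hostA") + " / " + maps.uuidOnHost("hostB"));
    }
}
```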

6. Sending heartbeats to the NN

In BPServiceActor.java, click into the offerService method called from the run method.

private void offerService() throws Exception {

  while (shouldRun()) {
        ... ...
        HeartbeatResponse resp = null;
        if (sendHeartbeat) {

          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
                  scheduler.isBlockReportDue(startTime);
          if (!dn.areHeartbeatsDisabledForTests()) {
            // Send the heartbeat
            resp = sendHeartBeat(requestBlockReportLease);
            assert resp != null;
            if (resp.getFullBlockReportLeaseId() != 0) {
              if (fullBlockReportLeaseId != 0) {
        ... ...
              }
              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
            }
            ... ...
          }
        }
    ... ...
  }
}

HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
    throws IOException {
  ... ...
  // Send it to the NN through the NN RPC client proxy
  HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
        reports,
        dn.getFSDataset().getCacheCapacity(),
        dn.getFSDataset().getCacheUsed(),
        dn.getXmitsInProgress(),
        dn.getXceiverCount(),
        numFailedVolumes,
        volumeFailureSummary,
        requestBlockReportLease,
        slowPeers,
        slowDisks);
  ... ...
}

Back on the NN side, open the NameNodeRpcServer class and press Ctrl+F to search for sendHeartbeat in NameNodeRpcServer.java.

public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
    int xmitsInProgress, int xceiverCount,
    int failedVolumes, VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {

  checkNNStartup();
  verifyRequest(nodeReg);

  // Handle the heartbeat sent by the DN
  return namesystem.handleHeartbeat(nodeReg, report,
      dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
      failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
      slowPeers, slowDisks);
}

HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int xmitsInProgress, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    boolean requestFullBlockReportLease,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  readLock();
  try {
    //get datanode commands
    final int maxTransfer = blockManager.getMaxReplicationStreams()
        - xmitsInProgress;
    // Handle the heartbeat sent by the DN
    DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
        nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
        xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
        slowPeers, slowDisks);

    long blockReportLeaseId = 0;
    if (requestFullBlockReportLease) {
      blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
    }
    //create ha status
    final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
        haContext.getState().getServiceState(),
        getFSImage().getCorrectLastAppliedOrWrittenTxId());

    // Respond to the DN's heartbeat
    return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
        blockReportLeaseId);
  } finally {
    readUnlock("handleHeartbeat");
  }
}


public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
    StorageReport[] reports, final String blockPoolId,
    long cacheCapacity, long cacheUsed, int xceiverCount, 
    int maxTransfers, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary,
    @Nonnull SlowPeerReports slowPeers,
    @Nonnull SlowDiskReports slowDisks) throws IOException {
  ... ...
  heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,
      cacheUsed, xceiverCount, failedVolumes, volumeFailureSummary);
  ... ...  
}

synchronized void updateHeartbeat(final DatanodeDescriptor node,
    StorageReport[] reports, long cacheCapacity, long cacheUsed,
    int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {
  stats.subtract(node);
  blockManager.updateHeartbeat(node, reports, cacheCapacity, cacheUsed,
      xceiverCount, failedVolumes, volumeFailureSummary);
  stats.add(node);
}
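
HeartbeatManager.updateHeartbeat brackets the per-node update with `stats.subtract(node)` and `stats.add(node)`, so the cluster-wide totals never count a node's old and new values together. A minimal sketch of that subtract, update, add pattern is shown below. `ClusterStatsSketch` and its single `capacity` field are hypothetical simplifications.

```java
class ClusterStatsSketch {
    /** Hypothetical stand-in for DatanodeDescriptor with a single metric. */
    static class Node {
        long capacity;
    }

    private long totalCapacity;

    synchronized void add(Node n) {
        totalCapacity += n.capacity;
    }

    synchronized void subtract(Node n) {
        totalCapacity -= n.capacity;
    }

    /** Remove the node's old contribution, apply the report, add the new contribution. */
    synchronized void updateHeartbeat(Node n, long reportedCapacity) {
        subtract(n);
        n.capacity = reportedCapacity;
        add(n);
    }

    synchronized long total() {
        return totalCapacity;
    }

    public static void main(String[] args) {
        ClusterStatsSketch stats = new ClusterStatsSketch();
        Node n1 = new Node();
        n1.capacity = 100;
        stats.add(n1);
        stats.updateHeartbeat(n1, 150);
        System.out.println("total capacity = " + stats.total());
    }
}
```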


void updateHeartbeat(DatanodeDescriptor node, StorageReport[] reports,
    long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes,
    VolumeFailureSummary volumeFailureSummary) {

  for (StorageReport report: reports) {
    providedStorageMap.updateStorage(node, report.getStorage());
  }
  node.updateHeartbeat(reports, cacheCapacity, cacheUsed, xceiverCount,
      failedVolumes, volumeFailureSummary);
}


void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  updateHeartbeatState(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  heartbeatedSinceRegistration = true;
}

void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  // Update storage statistics
  updateStorageStats(reports, cacheCapacity, cacheUsed, xceiverCount,
      volFailures, volumeFailureSummary);
  // Update the last-heartbeat timestamps
  setLastUpdate(Time.now());
  setLastUpdateMonotonic(Time.monotonicNow());
  rollBlocksScheduled(getLastUpdateMonotonic());
}
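
updateHeartbeatState records two timestamps: a wall-clock one and a monotonic one. The sketch below illustrates why a timeout check is usually done against the monotonic value, which does not jump when the system clock is adjusted. `HeartbeatClockSketch`, `isStale` and the expiry threshold are hypothetical and only illustrate the idea, not the NN's actual timeout logic.

```java
class HeartbeatClockSketch {
    private long lastUpdateWall;       // wall-clock millis, suitable for display
    private long lastUpdateMonotonic;  // monotonic millis, suitable for timeouts

    /** Monotonic milliseconds derived from System.nanoTime(). */
    static long monotonicNowMillis() {
        return System.nanoTime() / 1_000_000L;
    }

    void recordHeartbeat() {
        recordHeartbeat(System.currentTimeMillis(), monotonicNowMillis());
    }

    /** Overload with injected clocks so the logic can be tested deterministically. */
    void recordHeartbeat(long wallMillis, long monotonicMillis) {
        lastUpdateWall = wallMillis;
        lastUpdateMonotonic = monotonicMillis;
    }

    /** A node is considered stale if no heartbeat arrived within expiryMillis. */
    boolean isStale(long monotonicNow, long expiryMillis) {
        return monotonicNow - lastUpdateMonotonic > expiryMillis;
    }

    long lastUpdateWall() {
        return lastUpdateWall;
    }

    public static void main(String[] args) {
        HeartbeatClockSketch node = new HeartbeatClockSketch();
        node.recordHeartbeat();
        System.out.println("stale right after heartbeat? "
            + node.isStale(monotonicNowMillis(), 30_000L));
    }
}
```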

private void updateStorageStats(StorageReport[] reports, long cacheCapacity,
    long cacheUsed, int xceiverCount, int volFailures,
    VolumeFailureSummary volumeFailureSummary) {
  long totalCapacity = 0;
  long totalRemaining = 0;
  long totalBlockPoolUsed = 0;
  long totalDfsUsed = 0;
  long totalNonDfsUsed = 0;
  … …

  setCacheCapacity(cacheCapacity);
  setCacheUsed(cacheUsed);
  setXceiverCount(xceiverCount);
  this.volumeFailures = volFailures;
  this.volumeFailureSummary = volumeFailureSummary;
  for (StorageReport report : reports) {

    DatanodeStorageInfo storage =
        storageMap.get(report.getStorage().getStorageID());
    if (checkFailedStorages) {
      failedStorageInfos.remove(storage);
    }

    storage.receivedHeartbeat(report);
    // skip accounting for capacity of PROVIDED storages!
    if (StorageType.PROVIDED.equals(storage.getStorageType())) {
      continue;
    }

    totalCapacity += report.getCapacity();
    totalRemaining += report.getRemaining();
    totalBlockPoolUsed += report.getBlockPoolUsed();
    totalDfsUsed += report.getDfsUsed();
    totalNonDfsUsed += report.getNonDfsUsed();
  }

  // Update total metrics for the node.
  // Update the node's storage totals
  setCapacity(totalCapacity);
  setRemaining(totalRemaining);
  setBlockPoolUsed(totalBlockPoolUsed);
  setDfsUsed(totalDfsUsed);
  setNonDfsUsed(totalNonDfsUsed);
  if (checkFailedStorages) {
    updateFailedStorage(failedStorageInfos);
  }
  long storageMapSize;
  synchronized (storageMap) {
    storageMapSize = storageMap.size();
  }
  if (storageMapSize != reports.length) {
    pruneStorageMap(reports);
  }
}
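
The core of updateStorageStats is a fold over the storage reports that skips PROVIDED storages when summing capacity. A stripped-down sketch of that aggregation is given here. `StorageTotalsSketch`, `Report` and `Type` are hypothetical simplifications of StorageReport and StorageType with only two of the metrics.

```java
import java.util.Arrays;
import java.util.List;

class StorageTotalsSketch {
    enum Type { DISK, SSD, PROVIDED }

    /** Hypothetical, reduced form of a storage report. */
    static final class Report {
        final Type type;
        final long capacity;
        final long remaining;

        Report(Type type, long capacity, long remaining) {
            this.type = type;
            this.capacity = capacity;
            this.remaining = remaining;
        }
    }

    /** Returns {totalCapacity, totalRemaining}, ignoring PROVIDED storages. */
    static long[] totals(List<Report> reports) {
        long totalCapacity = 0;
        long totalRemaining = 0;
        for (Report r : reports) {
            if (r.type == Type.PROVIDED) {
                continue; // externally provided storage is not counted
            }
            totalCapacity += r.capacity;
            totalRemaining += r.remaining;
        }
        return new long[] {totalCapacity, totalRemaining};
    }

    public static void main(String[] args) {
        long[] t = totals(Arrays.asList(
            new Report(Type.DISK, 100, 40),
            new Report(Type.PROVIDED, 1000, 1000)));
        System.out.println("capacity=" + t[0] + " remaining=" + t[1]);
    }
}
```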

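IV. HDFS Upload Source Code Analysis

1. Overview

[Figure: the HDFS write data flow]

[Figure: HDFS upload source code analysis]

2. The create process

2.1 The client sends a create request to the NN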

@Test
public void testPut2() throws IOException {
  FSDataOutputStream fos = fs.create(new Path("/input"));

  fos.write("hello world".getBytes());
}

// Click into create and follow it down to the abstract method
public abstract FSDataOutputStream create(Path f,
  FsPermission permission,
  boolean overwrite,
  int bufferSize,
  short replication,
  long blockSize,
  Progressable progress) throws IOException;

Press Ctrl+Alt+B and choose the DistributedFileSystem implementation.

@Override
public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite, int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {
  
  return this.create(f, permission,
  overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
    : EnumSet.of(CreateFlag.CREATE), bufferSize, replication,
  blockSize, progress, null);
}
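
This overload converts the boolean `overwrite` parameter into a set of flags before delegating. A minimal sketch of that conversion with `EnumSet` follows. The `Flag` enum is a hypothetical, reduced mirror of CreateFlag, and `CreateFlagsSketch` is not a Hadoop class.

```java
import java.util.EnumSet;

class CreateFlagsSketch {
    /** Hypothetical, reduced mirror of Hadoop's CreateFlag. */
    enum Flag { CREATE, OVERWRITE, APPEND }

    /** Maps the legacy boolean parameter onto a flag set. */
    static EnumSet<Flag> flagsFor(boolean overwrite) {
        return overwrite
            ? EnumSet.of(Flag.CREATE, Flag.OVERWRITE)
            : EnumSet.of(Flag.CREATE);
    }

    public static void main(String[] args) {
        System.out.println(flagsFor(true));
        System.out.println(flagsFor(false));
    }
}
```

Passing a flag set instead of several booleans lets later code ask `flags.contains(...)` and lets new flags be added without changing method signatures.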

@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
  final EnumSet<CreateFlag> cflags, final int bufferSize,
  final short replication, final long blockSize,
  final Progressable progress, final ChecksumOpt checksumOpt)
  throws IOException {
  
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  
  return new FileSystemLinkResolver<FSDataOutputStream>() {

    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {

    // Create the underlying output stream object
    final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
      cflags, replication, blockSize, progress, bufferSize,
      checksumOpt);
    // Wrap the dfsos created above and return it
    return dfs.createWrappedOutputStream(dfsos, statistics);
    }

    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
      throws IOException {
    return fs.create(p, permission, cflags, bufferSize,
      replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, short replication, long blockSize,
  Progressable progress, int buffersize, ChecksumOpt checksumOpt)
  throws IOException {
  
  return create(src, permission, flag, true,
  replication, blockSize, progress, buffersize, checksumOpt, null);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
  throws IOException {
  
  return create(src, permission, flag, createParent, replication, blockSize,
  progress, buffersize, checksumOpt, favoredNodes, null);
}

public DFSOutputStream create(String src, FsPermission permission,
  EnumSet<CreateFlag> flag, boolean createParent, short replication,
  long blockSize, Progressable progress, int buffersize,
  ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
  String ecPolicyName) throws IOException {
  
  checkOpen();
  
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
    src, masked, flag, createParent, replication, blockSize, progress,
    dfsClientConf.createChecksum(checksumOpt),
    getFavoredNodesStr(favoredNodes), ecPolicyName);
    
  beginFileLease(result.getFileId(), result);
  
  return result;
}

Click into newStreamForCreate, which opens DFSOutputStream.java.

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize, Progressable progress,
  DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
  throws IOException {
  
  try (TraceScope ignored =
       dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;

    while (shouldRetry) {
    shouldRetry = false;
    try {
      // The client sends the create request to the NN (RPC)
      stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
        new EnumSetWritable<>(flag), createParent, replication,
        blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
      break;
    } catch (RemoteException re) {
      … ….
    }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;

    if(stat.getErasureCodingPolicy() != null) {
    out = new DFSStripedOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
    } else {
    out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes, true);
    }

    // Start the streamer thread (run); DataStreamer extends Daemon extends Thread
    out.start();

    return out;
  }
}

2.2 The NN handles the client's create request

Click into create.

HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException;

Find the implementations of create, choose NameNodeRpcServer, and search for create in NameNodeRpcServer.java.

public HdfsFileStatus create(String src, FsPermission masked,
    String clientName, EnumSetWritable<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
    throws IOException {
  // Check that the NN has started
  checkNNStartup();
  ... ...

  HdfsFileStatus status = null;
  try {
    PermissionStatus perm = new PermissionStatus(getRemoteUser()
        .getShortUserName(), null, masked);
    // Key call: start creating the file
    status = namesystem.startFile(src, perm, clientName, clientMachine,
        flag.get(), createParent, replication, blockSize, supportedVersions,
        ecPolicyName, cacheEntry != null);
  } finally {
    RetryCache.setState(cacheEntry, status != null, status);
  }

  metrics.incrFilesCreated();
  metrics.incrCreateFileOps();
  return status;
}

HdfsFileStatus startFile(String src, PermissionStatus permissions,
    String holder, String clientMachine, EnumSet<CreateFlag> flag,
    boolean createParent, short replication, long blockSize,
    CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
    boolean logRetryCache) throws IOException {

  HdfsFileStatus status;
  try {
    status = startFileInt(src, permissions, holder, clientMachine, flag,
        createParent, replication, blockSize, supportedVersions, ecPolicyName,
        logRetryCache);
  } catch (AccessControlException e) {
    logAuditEvent(false, "create", src);
    throw e;
  }
  logAuditEvent(true, "create", src, status);
  return status;
}

private HdfsFileStatus startFileInt(String src,
    PermissionStatus permissions, String holder, String clientMachine,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, CryptoProtocolVersion[] supportedVersions,
    String ecPolicyName, boolean logRetryCache) throws IOException {       
  ... ...
  stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
        clientMachine, flag, createParent, replication, blockSize, feInfo,
        toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
  ... ...
}

static HdfsFileStatus startFile(
    ... ...)
    throws IOException {
  
  ... ...
  FSDirectory fsd = fsn.getFSDirectory();

  // Check whether the file path already exists
  if (iip.getLastINode() != null) {
    if (overwrite) {
      List<INode> toRemoveINodes = new ChunkedArrayList<>();
      List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
      long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,
                                      toRemoveINodes, toRemoveUCFiles, now());
      if (ret >= 0) {
        iip = INodesInPath.replace(iip, iip.length() - 1, null);
        FSDirDeleteOp.incrDeletedFileCount(ret);
        fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
      }
    } else {
      // If lease soft limit time is expired, recover the lease
      fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
                               src, holder, clientMachine, false);
      throw new FileAlreadyExistsException(src + " for client " +
          clientMachine + " already exists");
    }
  }
  fsn.checkFsObjectLimit();
  INodeFile newNode = null;
  INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
  if (parent != null) {
    // Add the file's metadata
    iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
        replication, blockSize, holder, clientMachine, shouldReplicate,
        ecPolicyName);
    newNode = iip != null ? iip.getLastINode().asFile() : null;
  }
  ... ...
  setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
  fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
  if (NameNode.stateChangeLog.isDebugEnabled()) {
    NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
        src + " inode " + newNode.getId() + " " + holder);
  }
  return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
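
The existence check at the top of startFile boils down to: if the path exists and overwrite is set, delete the old file first; if it exists and overwrite is not set, fail; otherwise add the new entry. A toy in-memory sketch of that decision is shown below. `NamespaceSketch`, its map of paths to ids and `FileExistsException` are hypothetical, and lease recovery, quotas and the edit log are left out.

```java
import java.util.HashMap;
import java.util.Map;

class NamespaceSketch {
    /** Hypothetical stand-in for FileAlreadyExistsException. */
    static class FileExistsException extends Exception {
        FileExistsException(String message) {
            super(message);
        }
    }

    // Path -> inode id; a toy replacement for the INode directory tree.
    private final Map<String, Long> inodes = new HashMap<>();
    private long nextId = 1;

    /** Creates the file entry and returns its new inode id. */
    synchronized long startFile(String path, boolean overwrite) throws FileExistsException {
        if (inodes.containsKey(path)) {
            if (!overwrite) {
                throw new FileExistsException(path + " already exists");
            }
            inodes.remove(path); // overwrite: delete the old file first
        }
        long id = nextId++;
        inodes.put(path, id);
        return id;
    }

    public static void main(String[] args) throws Exception {
        NamespaceSketch ns = new NamespaceSketch();
        System.out.println("first create -> inode " + ns.startFile("/input", false));
        System.out.println("overwrite    -> inode " + ns.startFile("/input", true));
    }
}
```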

private static INodesInPath addFile(
    FSDirectory fsd, INodesInPath existing, byte[] localName,
    PermissionStatus permissions, short replication, long preferredBlockSize,
    String clientName, String clientMachine, boolean shouldReplicate,
    String ecPolicyName) throws IOException {

  Preconditions.checkNotNull(existing);
  long modTime = now();
  INodesInPath newiip;
  fsd.writeLock();
  try {
    … …

    newiip = fsd.addINode(existing, newNode, permissions.getPermission());
  } finally {
    fsd.writeUnlock();
  }
  ... ...
  return newiip;
}

INodesInPath addINode(INodesInPath existing, INode child,
                      FsPermission modes)
    throws QuotaExceededException, UnresolvedLinkException {
  cacheName(child);
  writeLock();
  try {
    // Add the new INode to the directory tree
    return addLastINode(existing, child, modes, true);
  } finally {
    writeUnlock();
  }
}

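2.3 DataStreamer startup

After the NN has handled the create request, control returns to the client side, where the corresponding thread is started.

// DFSOutputStream.java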
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
  FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
  short replication, long blockSize, Progressable progress,
  DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
  throws IOException {
  ... ...
  // The client sends the create request to the NN (RPC)
  stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
    new EnumSetWritable<>(flag), createParent, replication,
    blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
  ... ...
  
  // Create the output stream
  out = new DFSOutputStream(dfsClient, src, stat,
            flag, progress, checksum, favoredNodes, true);
  // Start the streamer thread (run); DataStreamer extends Daemon extends Thread
  out.start();

  return out;
}
// Click into the DFSOutputStream constructor
protected DFSOutputStream(DFSClient dfsClient, String src,
    HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
  this(dfsClient, src, flag, progress, stat, checksum);
  this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);

  // Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte chunk + 4-byte checksum)
  computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
      bytesPerChecksum);

  if (createStreamer) {
    streamer = new DataStreamer(stat, null, dfsClient, src, progress,
        checksum, cachingStrategy, byteArrayManager, favoredNodes,
        addBlockFlags);
  }
}
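
The Block, packet and chunk sizes in the comment above determine how many chunks fit into one packet. The arithmetic can be sketched as follows. `PacketSizingSketch` is a hypothetical helper, and the amount of space reserved for the packet header is passed in as a parameter because the exact value is an assumption here (33 bytes is used in the example).

```java
class PacketSizingSketch {
    /** Number of (data + checksum) chunks that fit after reserving header space. */
    static int chunksPerPacket(int writePacketSize, int bytesPerChecksum,
                               int checksumSize, int headerReserve) {
        int bodySize = writePacketSize - headerReserve;
        int chunkSize = bytesPerChecksum + checksumSize;
        return Math.max(bodySize / chunkSize, 1);
    }

    /** Payload bytes actually carried by one full packet. */
    static int packetPayload(int writePacketSize, int bytesPerChecksum,
                             int checksumSize, int headerReserve) {
        int chunkSize = bytesPerChecksum + checksumSize;
        return chunkSize * chunksPerPacket(
            writePacketSize, bytesPerChecksum, checksumSize, headerReserve);
    }

    public static void main(String[] args) {
        int packet = 64 * 1024; // 64 KB packet
        int chunk = 512;        // data bytes per checksum
        int sum = 4;            // checksum bytes per chunk
        System.out.println("no header reserve:      " + chunksPerPacket(packet, chunk, sum, 0));
        System.out.println("33-byte header reserve: " + chunksPerPacket(packet, chunk, sum, 33));
    }
}
```

With these inputs, 65536 / 516 gives 127 chunks when no header space is reserved, and (65536 - 33) / 516 gives 126 once 33 bytes are set aside.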

Click into out.start() in the newStreamForCreate method, which is in DFSOutputStream.java.

protected synchronized void start() {
  getStreamer().start();
}

protected DataStreamer getStreamer() {
  return streamer;
}

// Click into DataStreamer to open DataStreamer.java
// Click into Daemon to open Daemon.java
// Note: out.start() actually starts a thread; open DataStreamer and search for its run method

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      … …
      try {
        // If dataQueue is empty, the thread blocks here until notified or timed out
        dataQueue.wait(timeout);
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      … …
      // The queue is not empty: read the packet at its head
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }
    … …
}

3. The write (upload) process

3.1 Writing data into the DataStreamer queue

@Test
public void testPut2() throws IOException {
    FSDataOutputStream fos = fs.create(new Path("/input"));

    fos.write("hello world".getBytes());
}

// Keep clicking into write until you reach the abstract method
public abstract void write(int b) throws IOException;
// Press Ctrl+Alt+B to view the implementations and choose FSOutputSummer.java
public synchronized void write(int b) throws IOException {
  buf[count++] = (byte)b;
  if(count == buf.length) {
    flushBuffer();
  }
}

protected synchronized void flushBuffer() throws IOException {
  flushBuffer(false, true);
}

protected synchronized int flushBuffer(boolean keep,
    boolean flushPartial) throws IOException {
  int bufLen = count;
  int partialLen = bufLen % sum.getBytesPerChecksum();
  int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;

  if (lenToFlush != 0) {
    // Write the data into the queue
    // Directory => File => Block (128 MB) => packet (64 KB) => chunk (512-byte chunk + 4-byte checksum)
    writeChecksumChunks(buf, 0, lenToFlush);

    if (!flushPartial || keep) {
      count = partialLen;
      System.arraycopy(buf, bufLen - count, buf, 0, count);
    } else {
      count = 0;
    }
  }

  // total bytes left minus unflushed bytes left
  return count - (bufLen - lenToFlush);
}
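
The length arithmetic in flushBuffer is easy to misread, so here is a small sketch that isolates it: only whole chunks are flushed unless `flushPartial` is set, and the trailing partial chunk stays in the buffer. `FlushLengthSketch` and its two helper methods are hypothetical; they mirror the branches of the method above without touching any buffer.

```java
class FlushLengthSketch {
    /** How many buffered bytes get flushed. */
    static int lenToFlush(int bufLen, int bytesPerChecksum, boolean flushPartial) {
        int partialLen = bufLen % bytesPerChecksum;
        return flushPartial ? bufLen : bufLen - partialLen;
    }

    /** How many bytes remain in the buffer afterwards (the new value of count). */
    static int remainingCount(int bufLen, int bytesPerChecksum,
                              boolean keep, boolean flushPartial) {
        int partialLen = bufLen % bytesPerChecksum;
        int flushed = lenToFlush(bufLen, bytesPerChecksum, flushPartial);
        if (flushed == 0) {
            return bufLen; // nothing was flushed, so the buffer is unchanged
        }
        return (!flushPartial || keep) ? partialLen : 0;
    }

    public static void main(String[] args) {
        // 1300 buffered bytes with 512-byte chunks: two full chunks plus 276 bytes.
        System.out.println("flushed:   " + lenToFlush(1300, 512, false));
        System.out.println("remaining: " + remainingCount(1300, 512, false, false));
    }
}
```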

private void writeChecksumChunks(byte b[], int off, int len)
    throws IOException {

  // Compute the checksum of each chunk
  sum.calculateChunkedSums(b, off, len, checksum, 0);
  TraceScope scope = createWriteTraceScope();

  // Walk through the data one chunk at a time
  try {
    for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
      int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
      int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();

      // Write the data into the queue chunk by chunk
      writeChunk(b, off + i, chunkLen, checksum, ckOffset,
          getChecksumSize());
    }
  } finally {
    if (scope != null) {
      scope.close();
    }
  }
}
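
The loop in writeChecksumChunks walks the buffer in steps of bytesPerChecksum and derives, for each chunk, its length and the offset of its checksum. The sketch below reproduces that indexing and computes one 4-byte checksum per chunk. `ChunkChecksumSketch` is hypothetical, and `java.util.zip.CRC32` is used only as a convenient stand-in for whatever checksum type the stream is configured with.

```java
import java.util.zip.CRC32;

class ChunkChecksumSketch {
    static final int CHECKSUM_SIZE = 4;

    /** Returns one 4-byte big-endian CRC32 per chunk, concatenated in chunk order. */
    static byte[] checksums(byte[] data, int off, int len, int bytesPerChecksum) {
        int chunks = (len + bytesPerChecksum - 1) / bytesPerChecksum;
        byte[] out = new byte[chunks * CHECKSUM_SIZE];
        CRC32 crc = new CRC32();
        for (int i = 0; i < len; i += bytesPerChecksum) {
            // The last chunk may be shorter than bytesPerChecksum.
            int chunkLen = Math.min(bytesPerChecksum, len - i);
            // Each chunk owns CHECKSUM_SIZE bytes in the checksum array.
            int ckOffset = i / bytesPerChecksum * CHECKSUM_SIZE;
            crc.reset();
            crc.update(data, off + i, chunkLen);
            int v = (int) crc.getValue();
            out[ckOffset] = (byte) (v >>> 24);
            out[ckOffset + 1] = (byte) (v >>> 16);
            out[ckOffset + 2] = (byte) (v >>> 8);
            out[ckOffset + 3] = (byte) v;
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] sums = checksums(new byte[1300], 0, 1300, 512);
        System.out.println("checksum bytes for 1300 data bytes: " + sums.length);
    }
}
```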

protected abstract void writeChunk(byte[] b, int bOffset, int bLen,
   byte[] checksum, int checksumOffset, int checksumLen) throws IOException;

// In the same way, find the implementation in DFSOutputStream
protected synchronized void writeChunk(byte[] b, int offset, int len,
    byte[] checksum, int ckoff, int cklen) throws IOException {
  
  writeChunkPrepare(len, ckoff, cklen);

  // Write the chunk's checksum (4 bytes) into the packet
  currentPacket.writeChecksum(checksum, ckoff, cklen);

  // Write one chunk (512 bytes) into the packet
  currentPacket.writeData(b, offset, len);

  // Count the chunks written to this packet; it is full at getMaxChunks() (roughly 127 for a 64 KB packet of 516-byte chunks)
  currentPacket.incNumChunks();
  getStreamer().incBytesCurBlock(len);

  // If packet is full, enqueue it for transmission
  if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
      getStreamer().getBytesCurBlock() == blockSize) {
    enqueueCurrentPacketFull();
  }
}


synchronized void enqueueCurrentPacketFull() throws IOException {
  LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
          + " appendChunk={}, {}", currentPacket, src, getStreamer()
          .getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
      getStreamer());

  enqueueCurrentPacket();

  adjustChunkBoundary();

  endBlock();
}

void enqueueCurrentPacket() throws IOException {
  getStreamer().waitAndQueuePacket(currentPacket);
  currentPacket = null;
}

void waitAndQueuePacket(DFSPacket packet) throws IOException {
  synchronized (dataQueue) {
    try {
      // If queue is full, then wait till we have enough space
      boolean firstWait = true;
      try {
        while (!streamerClosed && dataQueue.size() + ackQueue.size() >
            dfsClient.getConf().getWriteMaxPackets()) {
          if (firstWait) {
            Span span = Tracer.getCurrentSpan();
            if (span != null) {
              span.addTimelineAnnotation("dataQueue.wait");
            }
            firstWait = false;
          }
          try {
            dataQueue.wait();
          } catch (InterruptedException e) {
            ... ...
          }
        }
      } finally {
        Span span = Tracer.getCurrentSpan();
        if ((span != null) && (!firstWait)) {
          span.addTimelineAnnotation("end.wait");
        }
      }
      checkClosed();
      // The queue has room, so add the packet to it
      queuePacket(packet);
    } catch (ClosedChannelException ignored) {
    }
  }
}


DataStreamer.java

void queuePacket(DFSPacket packet) {
  synchronized (dataQueue) {
    if (packet == null) return;
    packet.addTraceParent(Tracer.getCurrentSpanId());

    // Append the packet to the queue
    dataQueue.addLast(packet);

    lastQueuedSeqno = packet.getSeqno();
    LOG.debug("Queued {}, {}", packet, this);

    // Wake up threads waiting on the queue: a packet has been added
    dataQueue.notifyAll();
  }
}
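
waitAndQueuePacket and queuePacket together form a classic bounded producer/consumer queue built on `wait()` and `notifyAll()`. A minimal sketch of that handshake is given here. `PacketQueueSketch` is hypothetical, integers stand in for DFSPacket, and the capacity check looks only at the data queue, whereas the real condition also counts the ack queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PacketQueueSketch {
    private final Deque<Integer> dataQueue = new ArrayDeque<>();
    private final int maxPackets;

    PacketQueueSketch(int maxPackets) {
        this.maxPackets = maxPackets;
    }

    /** Producer side: block while the queue is full, then append and wake waiters. */
    void waitAndQueue(int seqno) throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.size() >= maxPackets) {
                dataQueue.wait();
            }
            dataQueue.addLast(seqno);
            dataQueue.notifyAll();
        }
    }

    /** Consumer side: block while the queue is empty, then remove the head. */
    int take() throws InterruptedException {
        synchronized (dataQueue) {
            while (dataQueue.isEmpty()) {
                dataQueue.wait();
            }
            int seqno = dataQueue.removeFirst();
            dataQueue.notifyAll(); // a slot was freed for the producer
            return seqno;
        }
    }

    public static void main(String[] args) throws Exception {
        PacketQueueSketch queue = new PacketQueueSketch(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.waitAndQueue(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        for (int i = 0; i < 5; i++) {
            System.out.println("took packet " + queue.take());
        }
        producer.join();
    }
}
```

Both sides re-check their condition in a `while` loop after waking up, because `notifyAll()` wakes every waiter and the condition may no longer hold by the time a given thread gets the lock.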

3.2 Building the pipeline: rack awareness (block placement)

Search globally for DataStreamer, then search for its run method.

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      long now = Time.monotonicNow();
      while ((!shouldStop() && dataQueue.size() == 0 &&
        (stage != BlockConstructionStage.DATA_STREAMING ||
          now - lastPacket < halfSocketTimeout)) || doSleep) {
      long timeout = halfSocketTimeout - (now-lastPacket);
      timeout = timeout <= 0 ? 1000 : timeout;
      timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
        timeout : 1000;
      try {
        // If dataQueue is empty, the thread blocks here
        dataQueue.wait(timeout); // returns on a notify or when the timeout expires
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      if (shouldStop()) {
      continue;
      }
      // get packet to be sent.
      if (dataQueue.isEmpty()) {
      one = createHeartbeatPacket();
      } else {
      try {
        backOffIfNecessary();
      } catch (InterruptedException e) {
        LOG.warn("Caught exception", e);
      }
      // The queue is not empty: read the packet at its head
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }

    // get new block from namenode.
    if (LOG.isDebugEnabled()) {
      LOG.debug("stage=" + stage + ", " + this);
    }
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
      LOG.debug("Allocating new block: {}", this);
      // Step 1: request a block from the NameNode and build the data pipeline
      setPipeline(nextBlockOutputStream());
      // Step 2: start the ResponseProcessor, which checks whether each packet was sent successfully
      initDataStreaming();
    } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
      setupPipelineForAppendOrRecovery();
      if (streamerClosed) {
      continue;
      }
      initDataStreaming();
    }

    long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
    if (lastByteOffsetInBlock > stat.getBlockSize()) {
      throw new IOException("BlockSize " + stat.getBlockSize() +
        " < lastByteOffsetInBlock, " + this + ", " + one);
    }
    … …
    // send the packet
    SpanId spanId = SpanId.INVALID;
    synchronized (dataQueue) {

      // move packet from dataQueue to ackQueue
      if (!one.isHeartbeatPacket()) {
      if (scope != null) {
        spanId = scope.getSpanId();
        scope.detach();

        one.setTraceScope(scope);
      }
      scope = null;
      // Step 3: remove the packet about to be sent from dataQueue
      dataQueue.removeFirst();
      // Step 4: then append that packet to ackQueue
      ackQueue.addLast(one);
      packetSendTime.put(one.getSeqno(), Time.monotonicNow());
      dataQueue.notifyAll();
      }
    }

    LOG.debug("{} sending {}", this, one);

    // write out data to remote datanode
    try (TraceScope ignored = dfsClient.getTracer().
      newScope("DataStreamer#writeTo", spanId)) {
      // Write the packet out
      one.writeTo(blockStream);
      blockStream.flush();
    } catch (IOException e) {
      errorState.markFirstNodeIfNotMarked();
      throw e;
    }
    … …
}
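
Steps 3 and 4 above move a packet from dataQueue to ackQueue at the moment it is sent, so every in-flight packet stays tracked until it is acknowledged. The bookkeeping can be sketched like this. `AckTrackingSketch` is hypothetical, sequence numbers stand in for packets, and the `ack` method is an assumption about the responder side, which this part of the walkthrough does not show.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class AckTrackingSketch {
    private final Deque<Long> dataQueue = new ArrayDeque<>();
    private final Deque<Long> ackQueue = new ArrayDeque<>();

    synchronized void enqueue(long seqno) {
        dataQueue.addLast(seqno);
    }

    /** Sender: move the head of dataQueue to ackQueue and return it for transmission. */
    synchronized long sendNext() {
        long seqno = dataQueue.removeFirst();
        ackQueue.addLast(seqno);
        return seqno;
    }

    /** Responder (assumed behaviour): an ack must match the oldest in-flight packet. */
    synchronized void ack(long seqno) {
        Long head = ackQueue.peekFirst();
        if (head == null || head != seqno) {
            throw new IllegalStateException("unexpected ack for seqno " + seqno);
        }
        ackQueue.removeFirst();
    }

    synchronized int pending() {
        return dataQueue.size();
    }

    synchronized int inFlight() {
        return ackQueue.size();
    }

    public static void main(String[] args) {
        AckTrackingSketch t = new AckTrackingSketch();
        t.enqueue(1);
        t.enqueue(2);
        long sent = t.sendNext();
        System.out.println("sent " + sent + ", pending=" + t.pending() + ", inFlight=" + t.inFlight());
        t.ack(sent);
        System.out.println("acked, inFlight=" + t.inFlight());
    }
}
```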


// Click into nextBlockOutputStream
protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();

    DatanodeInfo[] excluded = getExcludedNodes();
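    // Ask the NN which DNs the block should be written to
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);
    ... ...
    nodes = lb.getLocations();
    nextStorageTypes = lb.getStorageTypes();
    nextStorageIDs = lb.getStorageIDs();

    // Create the pipeline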
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
          0L, false);
    ......
  } while (!success && --count >= 0);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}
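
The `do { ... } while (!success && --count >= 0)` loop in nextBlockOutputStream makes one initial attempt plus up to `count` retries. The sketch below isolates that loop shape so the number of attempts can be counted. `PipelineRetrySketch` and the `BooleanSupplier` standing in for pipeline creation are hypothetical.

```java
import java.util.function.BooleanSupplier;

class PipelineRetrySketch {
    /** Returns the number of attempts made; throws if every attempt fails. */
    static int attempt(BooleanSupplier createPipeline, int retries) {
        int count = retries;
        int attempts = 0;
        boolean success;
        do {
            attempts++;
            success = createPipeline.getAsBoolean();
            // --count >= 0 allows `retries` further tries after the first one.
        } while (!success && --count >= 0);

        if (!success) {
            throw new IllegalStateException("Unable to create new block.");
        }
        return attempts;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Succeeds on the second try.
        int attempts = attempt(() -> ++calls[0] == 2, 3);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

With a retry setting of 3, a pipeline that never comes up is therefore tried four times before the exception is thrown.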

private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
    ExtendedBlock oldBlock) throws IOException {
  return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
      stat.getFileId(), favoredNodes, addBlockFlags);
}

static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
      DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
      throws IOException {
    ... ...
    // Ask the NN which DNs the block should be written to
    return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
            excludedNodes, fileId, favoredNodes, allocFlags);
    ... ...
}

LocatedBlock addBlock(String src, String clientName,
      ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
      String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
      throws IOException;

Back on the NameNode side, open NameNodeRpcServer and search for addBlock in that class.

public LocatedBlock addBlock(String src, String clientName,
    ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
    String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
    throws IOException {
  checkNNStartup();
  LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
      clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
  if (locatedBlock != null) {
    metrics.incrAddBlockOps();
  }
  return locatedBlock;
}

LocatedBlock getAdditionalBlock(
    String src, long fileId, String clientName, ExtendedBlock previous,
    DatanodeInfo[] excludedNodes, String[] favoredNodes,
    EnumSet<AddBlockFlag> flags) throws IOException {
  final String operationName = "getAdditionalBlock";
  NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {}  inodeId {}" +
      " for {}", src, fileId, clientName);

  ... ...
  // Choose the storage locations for the block
  DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
      blockManager, src, excludedNodes, favoredNodes, flags, r);

  ... ...
  return lb;
}

static DatanodeStorageInfo[] chooseTargetForNewBlock(
    BlockManager bm, String src, DatanodeInfo[] excludedNodes,
    String[] favoredNodes, EnumSet<AddBlockFlag> flags,
    ValidateAddBlockResult r) throws IOException {
  ... ...
  return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
                                  excludedNodesSet, r.blockSize,
                                  favoredNodesList, r.storagePolicyID,
                                  r.blockType, r.ecPolicy, flags);
}

public DatanodeStorageInfo[] chooseTarget4NewBlock(... ...
  ) throws IOException {
  ... ...
    
  final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
      numOfReplicas, client, excludedNodes, blocksize, 
      favoredDatanodeDescriptors, storagePolicy, flags);

  ... ...
  return targets;
}

DatanodeStorageInfo[] chooseTarget(String src,
    int numOfReplicas, Node writer,
    Set<Node> excludedNodes,
    long blocksize,
    List<DatanodeDescriptor> favoredNodes,
    BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  
  return chooseTarget(src, numOfReplicas, writer, 
      new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
      excludedNodes, blocksize, storagePolicy, flags);
}

public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosen,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags);

// Find the chooseTarget implementation class, BlockPlacementPolicyDefault.java
public DatanodeStorageInfo[] chooseTarget(String srcPath,
    int numOfReplicas,
    Node writer,
    List<DatanodeStorageInfo> chosenNodes,
    boolean returnChosenNodes,
    Set<Node> excludedNodes,
    long blocksize,
    final BlockStoragePolicy storagePolicy,
    EnumSet<AddBlockFlag> flags) {
  
  return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
      excludedNodes, blocksize, storagePolicy, flags, null);
}

private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
  Node writer,
  List<DatanodeStorageInfo> chosenStorage,
  boolean returnChosenNodes,
  Set<Node> excludedNodes,
  long blocksize,
  final BlockStoragePolicy storagePolicy,
  EnumSet<AddBlockFlag> addBlockFlags,
  EnumMap<StorageType, Integer> sTypes) {
  … …
   
  int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
  numOfReplicas = result[0];
  int maxNodesPerRack = result[1];
    
  for (DatanodeStorageInfo storage : chosenStorage) {
    // add localMachine and related nodes to excludedNodes
    // Collect the DNs that must not be chosen
    addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
  }

  List<DatanodeStorageInfo> results = null;
  Node localNode = null;
  boolean avoidStaleNodes = (stats != null
      && stats.isAvoidingStaleDataNodesForWrite());
  boolean avoidLocalNode = (addBlockFlags != null
      && addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
      && writer != null
      && !excludedNodes.contains(writer));
  // Attempt to exclude local node if the client suggests so. If no enough
  // nodes can be obtained, it falls back to the default block placement
  // policy.

  // The client asked to avoid the local node (NO_LOCAL_WRITE), so first try to place the replicas elsewhere
  if (avoidLocalNode) {
    results = new ArrayList<>(chosenStorage);
    Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
    if (writer != null) {
      excludedNodeCopy.add(writer);
    }
    localNode = chooseTarget(numOfReplicas, writer,
        excludedNodeCopy, blocksize, maxNodesPerRack, results,
        avoidStaleNodes, storagePolicy,
        EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
    if (results.size() < numOfReplicas) {
      // not enough nodes; discard results and fall back
      results = null;
    }
  }
  if (results == null) {
    results = new ArrayList<>(chosenStorage);
    // This is where the DN nodes are actually chosen
    localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
        blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
        sTypes);
  }

  if (!returnChosenNodes) {  
    results.removeAll(chosenStorage);
  }
    
  // sorting nodes to form a pipeline
  return getPipeline(
      (writer != null && writer instanceof DatanodeDescriptor) ? writer
          : localNode,
      results.toArray(new DatanodeStorageInfo[results.size()]));
}

private Node chooseTarget(int numOfReplicas,
   ... ...) {
  
   writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
          maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
   ... ...
}

protected Node chooseTargetInOrder(int numOfReplicas, 
                               Node writer,
                               final Set<Node> excludedNodes,
                               final long blocksize,
                               final int maxNodesPerRack,
                               final List<DatanodeStorageInfo> results,
                               final boolean avoidStaleNodes,
                               final boolean newBlock,
                               EnumMap<StorageType, Integer> storageTypes)
                               throws NotEnoughReplicasException {
  final int numOfResults = results.size();
  if (numOfResults == 0) {
    // The first replica is stored on the current (writer's) node
    DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
        excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
        storageTypes, true);

    writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
                                   : null;

    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
  // The second replica is stored on a different rack
  if (numOfResults <= 1) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  if (numOfResults <= 2) {
    final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
    // If the first and second replicas are on the same rack, put the third on another rack
    if (clusterMap.isOnSameRack(dn0, dn1)) {
      chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else if (newBlock){
      // For a new block, store it on the same rack as the second replica
      chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    } else {
      // Otherwise (not a new block), put it on the writer's rack
      chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
          results, avoidStaleNodes, storageTypes);
    }
    if (--numOfReplicas == 0) {
      return writer;
    }
  }
  chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes);
  return writer;
}
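
The common three-replica path of chooseTargetInOrder can be sketched in a few lines: replica 1 on the writer's node, replica 2 on a different rack, replica 3 on the same rack as replica 2. The code below is a simplified illustration only. `RackAwareSketch`, `Node`, `choose`, and `pick` are hypothetical names, selection is deterministic (first match) rather than random, and the fallback branches, storage types, excluded nodes, and per-rack limits of the real policy are left out.

```java
import java.util.ArrayList;
import java.util.List;

final class RackAwareSketch {
    // Hypothetical stand-in for a DataNode with a rack label.
    static final class Node {
        final String name;
        final String rack;
        Node(String name, String rack) { this.name = name; this.rack = rack; }
    }

    // Chooses up to three nodes for a new block written by `writer`.
    static List<Node> choose(Node writer, List<Node> cluster) {
        List<Node> results = new ArrayList<>();
        // Replica 1: the writer's own node.
        results.add(writer);
        // Replica 2: a node on a different rack than replica 1.
        Node second = pick(cluster, results, writer.rack, false);
        if (second == null) {
            return results;
        }
        results.add(second);
        // Replica 3: another node on the same rack as replica 2 (new-block case).
        Node third = pick(cluster, results, second.rack, true);
        if (third != null) {
            results.add(third);
        }
        return results;
    }

    // First unused node whose rack equals `rack` (sameRack = true) or differs from it (sameRack = false).
    static Node pick(List<Node> cluster, List<Node> used, String rack, boolean sameRack) {
        for (Node n : cluster) {
            if (!used.contains(n) && n.rack.equals(rack) == sameRack) {
                return n;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Node n1 = new Node("n1", "r1");
        List<Node> cluster = List.of(n1, new Node("n2", "r1"),
                new Node("n3", "r2"), new Node("n4", "r2"));
        for (Node n : choose(n1, cluster)) {
            System.out.println(n.name + " on " + n.rack);
        }
    }
}
```

Keeping two replicas on one remote rack means the block survives the loss of the writer's rack while only one copy has to cross racks during the write.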

3.3 Building the Pipeline: Socket Send

Click into nextBlockOutputStream in DataStreamer

protected LocatedBlock nextBlockOutputStream() throws IOException {
  LocatedBlock lb;
  DatanodeInfo[] nodes;
  StorageType[] nextStorageTypes;
  String[] nextStorageIDs;
  int count = dfsClient.getConf().getNumBlockWriteRetry();
  boolean success;
  final ExtendedBlock oldBlock = block.getCurrentBlock();
  do {
    errorState.resetInternalError();
    lastException.clear();

    DatanodeInfo[] excluded = getExcludedNodes();
    // Ask the NN which DNs the next block should be written to
    lb = locateFollowingBlock(
        excluded.length > 0 ? excluded : null, oldBlock);

    // (assignment of nodes, nextStorageTypes and nextStorageIDs from lb elided)
    // Create the pipeline
    success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
          0L, false);
    … …
  } while (!success && --count >= 0);

  if (!success) {
    throw new IOException("Unable to create new block.");
  }
  return lb;
}

boolean createBlockOutputStream(DatanodeInfo[] nodes,
      StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
      long newGS, boolean recoveryFlag) {
    ... ...
    // Create a socket to the first DN in the pipeline
    s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);

    // Output stream, used to write data to the DN
    OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
    // Input stream, used to read the DN's response to the write
    InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
  
    IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
        unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
    unbufOut = saslStreams.out;
    unbufIn = saslStreams.in;
    out = new DataOutputStream(new BufferedOutputStream(unbufOut,
        DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
    blockReplyStream = new DataInputStream(unbufIn);
  
    // Send the write-block request to the first DN
    new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
            dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
            nodes.length, block.getNumBytes(), bytesSent, newGS,
            checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
            (targetPinnings != null && targetPinnings[0]), targetPinnings,
            nodeStorageIDs[0], nodeStorageIDs);
  ... ...
}

public void writeBlock(... ...) throws IOException {
  ... ...

  send(out, Op.WRITE_BLOCK, proto.build());
}

3.4 Building the Pipeline: Socket Receive

Search globally for DataXceiverServer.java and find the run method in that class

public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // Accept an incoming socket connection
      peer = peerServer.accept();

      // Make sure the xceiver count is not exceeded
      int curXceiverCount = datanode.getXceiverCount();
      if (curXceiverCount > maxXceiverCount) {
        throw new IOException("Xceiver count " + curXceiverCount
            + " exceeds the limit of concurrent xcievers: "
            + maxXceiverCount);
      }
      // For every block a client sends, start a DataXceiver to handle that block
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this))
          .start();
    } catch (SocketTimeoutException ignored) {
      ... ...
    }
  }

  ... ...
}

Click into DataXceiver (a thread) and find the run method

public void run() {
  int opsProcessed = 0;
  Op op = null;

  try {
    synchronized(this) {
      xceiver = Thread.currentThread();
    }
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut,
        socketIn, datanode.getXferAddress().getPort(),
        ... ...
      return;
    }
    
    super.initialize(new DataInputStream(input));
    
    do {
      updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));

      try {
        if (opsProcessed != 0) {
          assert dnConf.socketKeepaliveTimeout > 0;
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        // Read the operation type of this request
        op = readOp();
      } catch (InterruptedIOException ignored) {
        // Time out while we wait for client rpc
        break;
      } catch (EOFException | ClosedChannelException e) {
        // Since we optimistically expect the next op, it's quite normal to
        // get EOF here.
        LOG.debug("Cached {} closing after {} ops.  " +
            "This message is usually benign.", peer, opsProcessed);
        break;
      } catch (IOException err) {
        incrDatanodeNetworkErrors();
        throw err;
      }

      // restore normal timeout
      if (opsProcessed != 0) {
        peer.setReadTimeout(dnConf.socketTimeout);
      }

      opStartTime = monotonicNow();
      // Handle the request according to its operation type
      processOp(op);
      ++opsProcessed;
    } while ((peer != null) &&
        (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0));
  } catch (Throwable t) {
    ... ... 
  } 
}

protected final void processOp(Op op) throws IOException {
  switch(op) {
  ... ...
  case WRITE_BLOCK:
    opWriteBlock(in);
    break;
  ... ...
  default:
    throw new IOException("Unknown op " + op + " in data stream");
  }
}
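
Taken together, `send(out, Op.WRITE_BLOCK, ...)` on the sending side and `readOp()` followed by `processOp(op)` on the DN form an opcode-dispatch protocol: the sender prefixes each request with an operation code, and the receiver reads that code and switches on it. The sketch below shows the idea over in-memory byte arrays. It is an illustration under stated assumptions, not Hadoop's wire format: the class `OpFramingSketch`, the version and opcode values, and the length-prefixed string payload are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class OpFramingSketch {
    // Hypothetical values for illustration; not taken from the Hadoop source.
    static final short VERSION = 1;
    static final byte OP_WRITE_BLOCK = 1;
    static final byte OP_READ_BLOCK = 2;

    // Sender side: version, one-byte opcode, then a length-prefixed payload.
    static byte[] send(byte op, String payload) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            out.writeShort(VERSION);
            out.writeByte(op);
            byte[] body = payload.getBytes(StandardCharsets.UTF_8);
            out.writeInt(body.length);
            out.write(body);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Receiver side: check the version, read the opcode, dispatch on it.
    static String receive(byte[] frame) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(frame));
            if (in.readShort() != VERSION) {
                throw new IOException("Version mismatch");
            }
            byte op = in.readByte();
            byte[] body = new byte[in.readInt()];
            in.readFully(body);
            String payload = new String(body, StandardCharsets.UTF_8);
            switch (op) {
                case OP_WRITE_BLOCK:
                    return "writeBlock(" + payload + ")";
                case OP_READ_BLOCK:
                    return "readBlock(" + payload + ")";
                default:
                    throw new IOException("Unknown op " + op + " in data stream");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(receive(send(OP_WRITE_BLOCK, "blk_1")));
    }
}
```

Putting the opcode in front of the payload lets one listening port serve several request types, which matches the `switch(op)` structure of processOp.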

private void opWriteBlock(DataInputStream in) throws IOException {
  final OpWriteBlockProto proto = OpWriteBlockProto.parseFrom(vintPrefixed(in));
  final DatanodeInfo[] targets = PBHelperClient.convert(proto.getTargetsList());
  TraceScope traceScope = continueTraceSpan(proto.getHeader(),
      proto.getClass().getSimpleName());
  try {
    writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
        PBHelperClient.convertStorageType(proto.getStorageType()),
        PBHelperClient.convert(proto.getHeader().getBaseHeader().getToken()),
        proto.getHeader().getClientName(),
        targets,
        PBHelperClient.convertStorageTypes(proto.getTargetStorageTypesList(), targets.length),
        PBHelperClient.convert(proto.getSource()),
        fromProto(proto.getStage()),
        proto.getPipelineSize(),
        proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
        proto.getLatestGenerationStamp(),
        fromProto(proto.getRequestedChecksum()),
        (proto.hasCachingStrategy() ?
            getCachingStrategy(proto.getCachingStrategy()) :
          CachingStrategy.newDefaultStrategy()),
        (proto.hasAllowLazyPersist() ? proto.getAllowLazyPersist() : false),
        (proto.hasPinning() ? proto.getPinning(): false),
        (PBHelperClient.convertBooleanList(proto.getTargetPinningsList())),
        proto.getStorageId(),
        proto.getTargetStorageIdsList().toArray(new String[0]));
  } finally {
   if (traceScope != null) traceScope.close();
  }
}

Press Ctrl+Alt+B to find the implementation of writeBlock, in DataXceiver.java

public void writeBlock(... ...) throws IOException {
  ... ...
  try {
    final Replica replica;
    if (isDatanode || 
        stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
      // open a block receiver
      // Create a BlockReceiver
      setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
          peer.getRemoteAddressString(),
          peer.getLocalAddressString(),
          stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
          clientname, srcDataNode, datanode, requestedChecksum,
          cachingStrategy, allowLazyPersist, pinning, storageId));
      replica = blockReceiver.getReplica();
    } else {
      replica = datanode.data.recoverClose(
          block, latestGenerationStamp, minBytesRcvd);
    }
    storageUuid = replica.getStorageUuid();
    isOnTransientStorage = replica.isOnTransientStorage();

    //
    // Connect to downstream machine, if appropriate
    // Continue by connecting to the next machine downstream
    if (targets.length > 0) {
      InetSocketAddress mirrorTarget = null;
      // Connect to backup machine
      mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
      LOG.debug("Connecting to datanode {}", mirrorNode);
      mirrorTarget = NetUtils.createSocketAddr(mirrorNode);

      // Open a socket to the DN that holds the next replica
      mirrorSock = datanode.newSocket();
      try {

        ... ...
        if (targetPinnings != null && targetPinnings.length > 0) {
          // Send the write request to the downstream socket
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, targetPinnings[0], targetPinnings,
              targetStorageId, targetStorageIds);
        } else {
          new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
              blockToken, clientname, targets, targetStorageTypes,
              srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
              latestGenerationStamp, requestedChecksum, cachingStrategy,
              allowLazyPersist, false, targetPinnings,
              targetStorageId, targetStorageIds);
        }

        mirrorOut.flush();

        DataNodeFaultInjector.get().writeBlockAfterFlush();

        // read connect ack (only for clients, not for replication req)
        if (isClient) {
          BlockOpResponseProto connectAck =
            BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(mirrorIn));
          mirrorInStatus = connectAck.getStatus();
          firstBadLink = connectAck.getFirstBadLink();
          if (mirrorInStatus != SUCCESS) {
            LOG.debug("Datanode {} got response for connect" +
                "ack  from downstream datanode with firstbadlink as {}",
                targets.length, firstBadLink);
          }
        }

      … …

  //update metrics
  datanode.getMetrics().addWriteBlockOp(elapsed());
  datanode.getMetrics().incrWritesFromClient(peer.isLocal(), size);
}
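
The `if (targets.length > 0)` branch above is what turns a list of DNs into a chain: each node sets up its own receiver and then forwards the same request to `targets[0]`. The following in-memory sketch shows that chaining under the assumption that each hop passes on only the nodes after the one it connects to; where exactly the list is shortened in Hadoop is not shown in the excerpt. `PipelineForwardSketch` and its `writeBlock` method are hypothetical names, and no sockets are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class PipelineForwardSketch {
    /**
     * Simulates one node handling a write request and forwarding it downstream.
     * Returns the node names in the order in which they set up their receivers.
     */
    static List<String> writeBlock(String self, String[] targets) {
        List<String> order = new ArrayList<>();
        // This node prepares to receive the block.
        order.add(self);
        if (targets.length > 0) {
            // Connect to the next node and hand it the remaining targets.
            String mirror = targets[0];
            String[] downstream = Arrays.copyOfRange(targets, 1, targets.length);
            order.addAll(writeBlock(mirror, downstream));
        }
        return order;
    }

    public static void main(String[] args) {
        // The client talks only to dn1; dn1 and dn2 extend the pipeline.
        System.out.println(writeBlock("dn1", new String[] {"dn2", "dn3"}));
    }
}
```

The last node receives an empty target list, so it stores the block without opening a further connection.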

BlockReceiver getBlockReceiver(
    final ExtendedBlock block, final StorageType storageType,
    final DataInputStream in,
    final String inAddr, final String myAddr,
    final BlockConstructionStage stage,
    final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
    final String clientname, final DatanodeInfo srcDataNode,
    final DataNode dn, DataChecksum requestedChecksum,
    CachingStrategy cachingStrategy,
    final boolean allowLazyPersist,
    final boolean pinning,
    final String storageId) throws IOException {
  return new BlockReceiver(block, storageType, in,
      inAddr, myAddr, stage, newGs, minBytesRcvd, maxBytesRcvd,
      clientname, srcDataNode, dn, requestedChecksum,
      cachingStrategy, allowLazyPersist, pinning, storageId);
}

BlockReceiver(final ExtendedBlock block, final StorageType storageType,
  final DataInputStream in,
  final String inAddr, final String myAddr,
  final BlockConstructionStage stage, 
  final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
  final String clientname, final DatanodeInfo srcDataNode,
  final DataNode datanode, DataChecksum requestedChecksum,
  CachingStrategy cachingStrategy,
  final boolean allowLazyPersist,
  final boolean pinning,
  final String storageId) throws IOException {
  ... ...
  if (isDatanode) { //replication or move
    replicaHandler =
        datanode.data.createTemporary(storageType, storageId, block, false);
  } else {
    switch (stage) {
    case PIPELINE_SETUP_CREATE:
      // Pipeline setup for a new block: create the RBW replica
      replicaHandler = datanode.data.createRbw(storageType, storageId,
          block, allowLazyPersist);
      datanode.notifyNamenodeReceivingBlock(
          block, replicaHandler.getReplica().getStorageUuid());
      break;
    ... ...
    default: throw new IOException("Unsupported stage " + stage + 
          " while receiving block " + block + " from " + inAddr);
    }
  }
  ... ...
}

public ReplicaHandler createRbw(
    StorageType storageType, String storageId, ExtendedBlock b,
    boolean allowLazyPersist) throws IOException {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    ... ...

    if (ref == null) {
      // Pick the volume that will hold this replica
      ref = volumes.getNextVolume(storageType, storageId, b.getNumBytes());
    }

    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    // create an rbw file to hold block in the designated volume

    if (allowLazyPersist && !v.isTransientStorage()) {
      datanode.getMetrics().incrRamDiskBlocksWriteFallback();
    }

    ReplicaInPipeline newReplicaInfo;
    try {
      // Create the temporary RBW (replica being written) file that the output stream writes to
      newReplicaInfo = v.createRbw(b);
      if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW) {
        throw new IOException("CreateRBW returned a replica of state "
            + newReplicaInfo.getReplicaInfo().getState()
            + " for block " + b.getBlockId());
      }
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }

    volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
    return new ReplicaHandler(newReplicaInfo, ref);
  }
}

public ReplicaInPipeline createRbw(ExtendedBlock b) throws IOException {

  File f = createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
  LocalReplicaInPipeline newReplicaInfo = new ReplicaBuilder(ReplicaState.RBW)
      .setBlockId(b.getBlockId())
      .setGenerationStamp(b.getGenerationStamp())
      .setFsVolume(this)
      .setDirectoryToUse(f.getParentFile())
      .setBytesToReserve(b.getNumBytes())
      .buildLocalReplicaInPipeline();
  return newReplicaInfo;
}

3.5 The Client Receives the DN's Write Response

Search globally for DataStreamer and find the run method

@Override
public void run() {

  long lastPacket = Time.monotonicNow();
  TraceScope scope = null;
  while (!streamerClosed && dfsClient.clientRunning) {
    // if the Responder encountered an error, shutdown Responder
    if (errorState.hasError()) {
    closeResponder();
    }

    DFSPacket one;
    try {
    // process datanode IO errors if any
    boolean doSleep = processDatanodeOrExternalError();

    final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
    synchronized (dataQueue) {
      // wait for a packet to be sent.
      long now = Time.monotonicNow();
      while ((!shouldStop() && dataQueue.size() == 0 &&
        (stage != BlockConstructionStage.DATA_STREAMING ||
          now - lastPacket < halfSocketTimeout)) || doSleep) {
      long timeout = halfSocketTimeout - (now-lastPacket);
      timeout = timeout <= 0 ? 1000 : timeout;
      timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
        timeout : 1000;
      try {
        // If dataQueue is empty, the thread blocks here
        dataQueue.wait(timeout); // wakes up on a notify or when the timeout expires
      } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
      }
      doSleep = false;
      now = Time.monotonicNow();
      }
      if (shouldStop()) {
      continue;
      }
      // get packet to be sent.
      if (dataQueue.isEmpty()) {
      one = createHeartbeatPacket();
      } else {
      try {
        backOffIfNecessary();
      } catch (InterruptedException e) {
        LOG.warn("Caught exception", e);
      }
      // The queue is not empty: take the first packet from it
      one = dataQueue.getFirst(); // regular data packet
      SpanId[] parents = one.getTraceParents();
      if (parents.length > 0) {
        scope = dfsClient.getTracer().
          newScope("dataStreamer", parents[0]);
        scope.getSpan().setParents(parents);
      }
      }
    }

    // get new block from namenode.
    if (LOG.isDebugEnabled()) {
      LOG.debug("stage=" + stage + ", " + this);
    }
    if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
      LOG.debug("Allocating new block: {}", this);
      // Step 1: request a block from the NameNode and set up the data pipeline
      setPipeline(nextBlockOutputStream());
      // Step 2: start the ResponseProcessor, which listens for whether each packet was sent successfully
      initDataStreaming();
    } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
      LOG.debug("Append to block {}", block);
      setupPipelineForAppendOrRecovery();
      if (streamerClosed) {
      continue;
      }
      initDataStreaming();
    }

    long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
    if (lastByteOffsetInBlock > stat.getBlockSize()) {
      throw new IOException("BlockSize " + stat.getBlockSize() +
        " < lastByteOffsetInBlock, " + this + ", " + one);
    }

    if (one.isLastPacketInBlock()) {
      // wait for all data packets have been successfully acked
      synchronized (dataQueue) {
      while (!shouldStop() && ackQueue.size() != 0) {
        try {
        // wait for acks to arrive from datanodes
        dataQueue.wait(1000);
        } catch (InterruptedException  e) {
        LOG.warn("Caught exception", e);
        }
      }
      }
      if (shouldStop()) {
      continue;
      }
      stage = BlockConstructionStage.PIPELINE_CLOSE;
    }

    // send the packet
    SpanId spanId = SpanId.INVALID;
    synchronized (dataQueue) {

      // move packet from dataQueue to ackQueue
      if (!one.isHeartbeatPacket()) {
      if (scope != null) {
        spanId = scope.getSpanId();
        scope.detach();

        one.setTraceScope(scope);
      }
      scope = null;
      // Step 3: remove the packet being sent from dataQueue
      dataQueue.removeFirst();
      // Step 4: then append that packet to ackQueue
      ackQueue.addLast(one);
      packetSendTime.put(one.getSeqno(), Time.monotonicNow());
      dataQueue.notifyAll();
      }
    }

    LOG.debug("{} sending {}", this, one);

    // write out data to remote datanode
    try (TraceScope ignored = dfsClient.getTracer().
      newScope("DataStreamer#writeTo", spanId)) {
      // Write the data out
      one.writeTo(blockStream);
      blockStream.flush();
    } catch (IOException e) {
      errorState.markFirstNodeIfNotMarked();
      throw e;
    }
    lastPacket = Time.monotonicNow();

    // update bytesSent
    long tmpBytesSent = one.getLastByteOffsetBlock();
    if (bytesSent < tmpBytesSent) {
      bytesSent = tmpBytesSent;
    }

    if (shouldStop()) {
      continue;
    }

    // Is this block full?
    if (one.isLastPacketInBlock()) {
      // wait for the close packet has been acked
      synchronized (dataQueue) {
      while (!shouldStop() && ackQueue.size() != 0) {
        dataQueue.wait(1000);// wait for acks to arrive from datanodes
      }
      }
      if (shouldStop()) {
      continue;
      }

      endBlock();
    }
    if (progress != null) { progress.progress(); }

    // This is used by unit test to trigger race conditions.
    if (artificialSlowdown != 0 && dfsClient.clientRunning) {
      Thread.sleep(artificialSlowdown);
    }
    } catch (Throwable e) {
    ... ...
    } finally {
    if (scope != null) {
      scope.close();
      scope = null;
    }
    }
  }
  closeInternal();
}

private void initDataStreaming() {
  this.setName("DataStreamer for file " + src +
      " block " + block);
  ... ...
  response = new ResponseProcessor(nodes);
  response.start();
  stage = BlockConstructionStage.DATA_STREAMING;
}

Click response, then click ResponseProcessor, and press Ctrl+F to find the run method

public void run() {
    ... ...
  ackQueue.removeFirst();
  packetSendTime.remove(seqno);
  dataQueue.notifyAll();
  ... ...
}
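
The two queues cooperate as follows: the streamer moves a packet from dataQueue to ackQueue before writing it out, and the ResponseProcessor removes it from ackQueue once the ack arrives. A minimal single-object sketch of that bookkeeping is shown below. `AckQueueSketch` and its methods are hypothetical names, packets are reduced to their sequence numbers, and the sketch uses synchronized methods on one object instead of the `synchronized (dataQueue)` blocks in the excerpt.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class AckQueueSketch {
    final Deque<Long> dataQueue = new ArrayDeque<>(); // packets waiting to be sent
    final Deque<Long> ackQueue = new ArrayDeque<>();  // packets sent but not yet acked

    // Writer side: queue a packet, identified here only by its sequence number.
    synchronized void enqueue(long seqno) {
        dataQueue.addLast(seqno);
        notifyAll();
    }

    // Streamer side: move the head packet to ackQueue, then it would be written to the DN.
    synchronized long sendNext() {
        long one = dataQueue.removeFirst();
        ackQueue.addLast(one);
        notifyAll();
        return one;
    }

    // Responder side: an ack for the head of ackQueue retires that packet.
    synchronized void ack(long seqno) {
        if (ackQueue.isEmpty() || ackQueue.peekFirst() != seqno) {
            throw new IllegalStateException("Unexpected ack for seqno " + seqno);
        }
        ackQueue.removeFirst();
        notifyAll();
    }

    public static void main(String[] args) {
        AckQueueSketch q = new AckQueueSketch();
        q.enqueue(0);
        q.enqueue(1);
        q.enqueue(2);
        q.sendNext();
        q.sendNext();
        q.ack(0);
        System.out.println("waiting=" + q.dataQueue + " unacked=" + q.ackQueue);
    }
}
```

Holding sent packets in ackQueue until they are acknowledged keeps them available in case the pipeline fails before the ack arrives.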

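V. Yarn Source Code Analysis

1. Overview

[Figure: YARN working mechanism]

[Figure: Yarn source code walkthrough]

2. The Yarn Client Submits a Job to the RM

In the driver class of the wordcount program, click into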

boolean result = job.waitForCompletion(true);

public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis = 
      Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}
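
The non-verbose branch of waitForCompletion is a plain polling loop: sleep for the configured interval, then ask again whether the job is complete. A minimal sketch of that pattern follows. `CompletionPollSketch` and `waitFor` are hypothetical names, the `BooleanSupplier` stands in for `isComplete()`, and unlike the excerpt (which ignores InterruptedException) this sketch restores the interrupt flag and stops waiting.

```java
import java.util.function.BooleanSupplier;

final class CompletionPollSketch {
    /**
     * Polls isComplete until it returns true, sleeping between checks.
     * Returns how many times the loop had to sleep.
     */
    static int waitFor(BooleanSupplier isComplete, long pollIntervalMillis) {
        int sleeps = 0;
        while (!isComplete.getAsBoolean()) {
            sleeps++;
            try {
                Thread.sleep(pollIntervalMillis);
            } catch (InterruptedException ie) {
                // Design choice of this sketch: stop waiting and keep the interrupt flag set.
                Thread.currentThread().interrupt();
                return sleeps;
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        int[] checks = {0};
        // Hypothetical job that reports completion on the fourth check.
        int sleeps = waitFor(() -> ++checks[0] > 3, 10);
        System.out.println("slept " + sleeps + " times");
    }
}
```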

public void submit() 
       throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter = 
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException, 
    ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
 }
 
 JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {
  ... ...
  status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials()); 
  ... ...
}

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException;

To see how the submission context is created, press Ctrl+Alt+B to find the implementation class of submitJob, YARNRunner.java

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
  
  addHistoryToken(ts);
  // Create the submission context:
  ApplicationSubmissionContext appContext =
    createApplicationSubmissionContext(conf, jobSubmitDir, ts);

  // Submit to ResourceManager
  try {
    // Submit an application to the RM; appContext wraps the commands that start MRAppMaster and run the containers
    ApplicationId applicationId =
        resMgrDelegate.submitApplication(appContext);
    
    // Get the response to the submission
    ApplicationReport appMaster = resMgrDelegate
        .getApplicationReport(applicationId);
    
    String diagnostics =
        (appMaster == null ?
            "application report is null" : appMaster.getDiagnostics());
    if (appMaster == null
        || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
        || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
      throw new IOException("Failed to run job : " +
          diagnostics);
    }
    return clientCache.getClient(jobId).getJobStatus(jobId);
  } catch (YarnException e) {
    throw new IOException(e);
  }
}
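
The check after submission boils down to one rule: a missing report, or a report in state FAILED or KILLED, is treated as a failed submission, and the diagnostics string is put into the exception message. The sketch below isolates that rule. `SubmitCheckSketch`, `AppState`, and `checkSubmitted` are hypothetical names; the enum is a reduced stand-in for YarnApplicationState, a `null` state models a null report, and an unchecked exception replaces the IOException of the excerpt.

```java
final class SubmitCheckSketch {
    // Reduced stand-in for YarnApplicationState; only a few states are modeled.
    enum AppState { ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Returns normally if the application looks healthy; otherwise throws.
    static void checkSubmitted(AppState state, String diagnostics) {
        String diag = (state == null) ? "application report is null" : diagnostics;
        if (state == null || state == AppState.FAILED || state == AppState.KILLED) {
            throw new IllegalStateException("Failed to run job : " + diag);
        }
    }

    public static void main(String[] args) {
        checkSubmitted(AppState.RUNNING, "");
        try {
            checkSubmitted(AppState.KILLED, "killed by admin");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```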

public ApplicationSubmissionContext createApplicationSubmissionContext(
    Configuration jobConf, String jobSubmitDir, Credentials ts)
    throws IOException {
  ApplicationId applicationId = resMgrDelegate.getApplicationId();

  // Setup LocalResources
  // Wraps the paths of the local resources
  Map<String, LocalResource> localResources =
      setupLocalResources(jobConf, jobSubmitDir);

  // Setup security tokens
  DataOutputBuffer dob = new DataOutputBuffer();
  ts.writeTokenStorageToStream(dob);
  ByteBuffer securityTokens =
      ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

  // Setup ContainerLaunchContext for AM container
  // Wraps the commands that start MRAppMaster and run the containers
  List<String> vargs = setupAMCommand(jobConf);
  ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
      jobConf, localResources, securityTokens, vargs);

  ... ...

  return appContext;
}
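
The launch command for the AM is assembled as a list of string tokens (`vargs`), starting with the Java executable and ending with the main class and output redirections. The sketch below shows that general pattern only. `AmCommandSketch` and `build` are hypothetical names, the argument values and the `/stdout` and `/stderr` file names are assumptions for illustration, and joining the tokens with single spaces is this sketch's choice rather than something shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

final class AmCommandSketch {
    // Builds a launch command as a list of tokens and joins them with spaces.
    static String build(String javaHome, String tmpDir, String mainClass, String logDir) {
        List<String> vargs = new ArrayList<>();
        vargs.add(javaHome + "/bin/java");        // the Java executable comes first
        vargs.add("-Djava.io.tmpdir=" + tmpDir);  // JVM options follow
        vargs.add(mainClass);                     // fully qualified main class
        vargs.add("1>" + logDir + "/stdout");     // assumed redirection targets
        vargs.add("2>" + logDir + "/stderr");
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(build("/opt/jdk", "./tmp",
                "org.apache.hadoop.mapreduce.v2.app.MRAppMaster", "/logs"));
    }
}
```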

private List<String> setupAMCommand(Configuration jobConf) {
  List<String> vargs = new ArrayList<>(8);
  // Start of the Java process launch command
  vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
      + "/bin/java");

  Path amTmpDir =
      new Path(MRApps.crossPlatformifyMREnv(conf, Environment.PWD),
          YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
  vargs.add("-Djava.io.tmpdir=" + amTmpDir);
  MRApps.addLog4jSystemProperties(null, vargs, conf);

  // Check for Java Lib Path usage in MAP and REDUCE configs
  warnForJavaLibPath(conf.get(MRJobConfig.MAP_JAVA_OPTS, ""),
      "map",
      MRJobConfig.MAP_JAVA_OPTS,
      MRJobConfig.MAP_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, ""),
      "map",
      MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.REDUCE_JAVA_OPTS, ""),
      "reduce",
      MRJobConfig.REDUCE_JAVA_OPTS,
      MRJobConfig.REDUCE_ENV);
  warnForJavaLibPath(conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, ""),
      "reduce",
      MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,
      MRJobConfig.MAPRED_ADMIN_USER_ENV);

  // Add AM admin command opts before user command opts
  // so that it can be overridden by user
  String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterAdminOptions, "app master",
      MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV);
  vargs.add(mrAppMasterAdminOptions);

  // Add AM user command opts
  String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS,
      MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS);
  warnForJavaLibPath(mrAppMasterUserOptions, "app master",
      MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV);
  vargs.add(mrAppMasterUserOptions);

  if (jobConf.getBoolean(MRJobConfig.MR_AM_PROFILE,
      MRJobConfig.DEFAULT_MR_AM_PROFILE)) {
    final String profileParams = jobConf.get(MRJobConfig.MR_AM_PROFILE_PARAMS,
        MRJobConfig.DEFAULT_TASK_PROFILE_PARAMS);
    if (profileParams != null) {
      vargs.add(String.format(profileParams,
          ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR
              + TaskLog.LogName.PROFILE));
    }
  }

  // Fully qualified name of the MRAppMaster class to launch:
  // org.apache.hadoop.mapreduce.v2.app.MRAppMaster
  vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDOUT);
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
      Path.SEPARATOR + ApplicationConstants.STDERR);
  return vargs;
}
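
To make the result of setupAMCommand concrete, the sketch below assembles a comparable argument list and joins it into one command line. It is a standalone illustration, not Hadoop code: the class name AmCommandSketch and its helper methods are hypothetical, and the literal strings `$JAVA_HOME`, `-Xmx1024m` and `<LOG_DIR>` are assumed placeholder values standing in for what Hadoop resolves when the container is launched.

```java
import java.util.ArrayList;
import java.util.List;

public class AmCommandSketch {

    // Hypothetical helper: adds the pieces in the same order as setupAMCommand.
    public static List<String> buildArgs(String javaHome, String userOpts, String logDir) {
        List<String> vargs = new ArrayList<>(8);
        vargs.add(javaHome + "/bin/java");                            // JVM binary
        vargs.add(userOpts);                                          // AM JVM options
        vargs.add("org.apache.hadoop.mapreduce.v2.app.MRAppMaster");  // main class
        vargs.add("1>" + logDir + "/stdout");                         // redirect stdout
        vargs.add("2>" + logDir + "/stderr");                         // redirect stderr
        return vargs;
    }

    // Joins the pieces into a single command string.
    public static String toCommandLine(List<String> vargs) {
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        List<String> vargs = buildArgs("$JAVA_HOME", "-Xmx1024m", "<LOG_DIR>");
        System.out.println(toCommandLine(vargs));
    }
}
```

The point to take away is that the AM is started like any other Java program: a JVM path, options, a main class, and redirected output streams.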

To follow the submission to YARN, click submitApplication() inside the submitJob method, then press Ctrl + Alt + B to find its implementation class, YarnClientImpl.java.

public ApplicationId
    submitApplication(ApplicationSubmissionContext appContext)
        throws YarnException, IOException {
  ApplicationId applicationId = appContext.getApplicationId();
  if (applicationId == null) {
    throw new ApplicationIdNotProvidedException(
        "ApplicationId is not provided in ApplicationSubmissionContext");
  }
  // Create a submission request
  SubmitApplicationRequest request =
      Records.newRecord(SubmitApplicationRequest.class);
  request.setApplicationSubmissionContext(appContext);
  ... ...

  //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
  // Continue the submission; the implementation class is ApplicationClientProtocolPBClientImpl
  rmClient.submitApplication(request);

  int pollCount = 0;
  long startTime = System.currentTimeMillis();
  EnumSet<YarnApplicationState> waitingStates = 
                               EnumSet.of(YarnApplicationState.NEW,
                               YarnApplicationState.NEW_SAVING,
                               YarnApplicationState.SUBMITTED);
  EnumSet<YarnApplicationState> failToSubmitStates = 
                                EnumSet.of(YarnApplicationState.FAILED,
                                YarnApplicationState.KILLED);    
  while (true) {
    try {
      // Poll YARN for the state of the submitted application
      ApplicationReport appReport = getApplicationReport(applicationId);
      YarnApplicationState state = appReport.getYarnApplicationState();
      ... ...
    } catch (ApplicationNotFoundException ex) {
      // FailOver or RM restart happens before RMStateStore saves
      // ApplicationState
      LOG.info("Re-submit application " + applicationId + "with the " +
          "same ApplicationSubmissionContext");
      // The RM does not know the application yet, so submit it again
      rmClient.submitApplication(request);
    }
  }

  return applicationId;
}
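
The polling loop in submitApplication elides its body, so the following is only a sketch of the general pattern, under the assumption that the client keeps polling while the state is in waitingStates and gives up when it lands in failToSubmitStates. The State enum, the class name SubmitPollSketch and the use of an Iterator as a stand-in for repeated getApplicationReport calls are all hypothetical.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;

public class SubmitPollSketch {

    // Hypothetical stand-in for YarnApplicationState, reduced to the states used here.
    public enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    static final EnumSet<State> WAITING =
        EnumSet.of(State.NEW, State.NEW_SAVING, State.SUBMITTED);
    static final EnumSet<State> FAILED_TO_SUBMIT =
        EnumSet.of(State.FAILED, State.KILLED);

    // Consumes successive reports until the state leaves the waiting set.
    // Returns the first non-waiting state, or throws if the submission failed.
    public static State waitUntilSubmitted(Iterator<State> reports) {
        while (reports.hasNext()) {
            State state = reports.next();
            if (!WAITING.contains(state)) {
                if (FAILED_TO_SUBMIT.contains(state)) {
                    throw new IllegalStateException("Failed to submit application: " + state);
                }
                return state; // for example ACCEPTED
            }
            // Still waiting: a real client would sleep here before polling again.
        }
        throw new IllegalStateException("Ran out of reports while waiting");
    }

    public static void main(String[] args) {
        List<State> reports =
            Arrays.asList(State.NEW, State.NEW_SAVING, State.SUBMITTED, State.ACCEPTED);
        System.out.println(waitUntilSubmitted(reports.iterator()));
    }
}
```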

3、RM launches MRAppMaster

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-app</artifactId>
    <version>3.1.3</version>
</dependency>

Find MRAppMaster and search for its main method.

public static void main(String[] args) {
  try {
    ... ...

    // Parse the ID of the container that this AM runs in
    ContainerId containerId = ContainerId.fromString(containerIdStr);
    ApplicationAttemptId applicationAttemptId =
        containerId.getApplicationAttemptId();
    if (applicationAttemptId != null) {
      CallerContext.setCurrent(new CallerContext.Builder(
          "mr_appmaster_" + applicationAttemptId.toString()).build());
    }
    long appSubmitTime = Long.parseLong(appSubmitTimeStr);
    
    // Create the MRAppMaster object
    MRAppMaster appMaster =
        new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
            Integer.parseInt(nodePortString),
            Integer.parseInt(nodeHttpPortString), appSubmitTime);
    ... ...
  
    // Initialize and start the AppMaster
    initAndStartAppMaster(appMaster, conf, jobUserName);
  } catch (Throwable t) {
    LOG.error("Error starting MRAppMaster", t);
    ExitUtil.terminate(1, t);
  }
}

protected static void initAndStartAppMaster(final MRAppMaster appMaster,
    final JobConf conf, String jobUserName) throws IOException,
    InterruptedException {
  ... ...
  conf.getCredentials().addAll(credentials);
  appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
    @Override
    public Object run() throws Exception {
      // Initialize
      appMaster.init(conf);
      // Start
      appMaster.start();
      if(appMaster.errorHappenedShutDown) {
        throw new IOException("Was asked to shut down.");
      }
      return null;
    }
  });
}

public void init(Configuration conf) {
  ... ...
  synchronized (stateChangeLock) {
    if (enterState(STATE.INITED) != STATE.INITED) {
      setConfig(conf);
      try {
        // Calls serviceInit() in MRAppMaster
        serviceInit(config);
        if (isInState(STATE.INITED)) {
          //if the service ended up here during init,
          //notify the listeners
          // Initialization finished: notify the listeners
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

Press Ctrl + Alt + B to find the serviceInit implementation in MRAppMaster.java.

protected void serviceInit(final Configuration conf) throws Exception {
  ... ...
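  // Create the client service (the RPC service that job clients talk to)
  clientService = createClientService(context);
  
  // Initialize the client service
  clientService.init(conf);
  
  // Create the container allocator, which requests containers from the RM
  containerAllocator = createContainerAllocator(clientService, context);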
  ... ...
}

In MRAppMaster.java, click appMaster.start() inside the initAndStartAppMaster method.

public void start() {
  if (isInState(STATE.STARTED)) {
    return;
  }
  //enter the started state
  synchronized (stateChangeLock) {
    if (stateModel.enterState(STATE.STARTED) != STATE.STARTED) {
      try {
        startTime = System.currentTimeMillis();
        // Calls serviceStart() in MRAppMaster
        serviceStart();
        if (isInState(STATE.STARTED)) {
          //if the service started (and isn't now in a later state), notify
          LOG.debug("Service {} is started", getName());
          notifyListeners();
        }
      } catch (Exception e) {
        noteFailure(e);
        ServiceOperations.stopQuietly(LOG, this);
        throw ServiceStateException.convert(e);
      }
    }
  }
}

protected void serviceStart() throws Exception {
  ... ...
  if (initFailed) {
    JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
    jobEventDispatcher.handle(initFailedEvent);
  } else {
    // All components have started, start the job.
    // Initialization succeeded: fire the job-start event
    startJobs();
  }
}
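
The init()/serviceInit() and start()/serviceStart() pairs shown above follow the template-method pattern: the base class owns the state transitions and the subclass only fills in the hooks. The sketch below is a much-reduced illustration of that idea. MiniService, MiniAppMaster and the three-value State enum are hypothetical names, and the real AbstractService additionally handles listeners, failures and a stopped state.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    public enum State { NOTINITED, INITED, STARTED }

    // Hypothetical, much-reduced base class in the spirit of AbstractService.
    public abstract static class MiniService {
        private State state = State.NOTINITED;
        public final List<String> log = new ArrayList<>();

        public final synchronized void init() {
            if (state != State.NOTINITED) {
                return;                 // initialize only once
            }
            state = State.INITED;
            serviceInit();              // subclass hook
        }

        public final synchronized void start() {
            if (state == State.STARTED) {
                return;                 // already started
            }
            if (state != State.INITED) {
                throw new IllegalStateException("start() called before init()");
            }
            state = State.STARTED;
            serviceStart();             // subclass hook
        }

        public State getState() {
            return state;
        }

        protected abstract void serviceInit();
        protected abstract void serviceStart();
    }

    // Hypothetical subclass playing the role of MRAppMaster.
    public static class MiniAppMaster extends MiniService {
        @Override protected void serviceInit()  { log.add("serviceInit"); }
        @Override protected void serviceStart() { log.add("serviceStart"); }
    }

    public static void main(String[] args) {
        MiniAppMaster am = new MiniAppMaster();
        am.init();
        am.start();
        System.out.println(am.log + " -> " + am.getState());
    }
}
```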

protected void startJobs() {
  /** create a job-start event to get this ball rolling */
  JobEvent startJobEvent = new JobStartEvent(job.getID(),
      recoveredJobStartTime);
  /** send the job-start event. this triggers the job execution. */
  // Hand the job-start event to the dispatcher's event queue
  // dispatcher = AsyncDispatcher
  // getEventHandler() returns a GenericEventHandler
  dispatcher.getEventHandler().handle(startJobEvent);
}

Press Ctrl + Alt + B to find the handle implementation in GenericEventHandler, an inner class of AsyncDispatcher.

class GenericEventHandler implements EventHandler<Event> {
  public void handle(Event event) {
    ... ...
    try {
      // Put the event on the dispatcher's event queue
      eventQueue.put(event);
    } catch (InterruptedException e) {
      ... ...
    }
  };
}
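
The handle method above only enqueues the event; something else has to take it off the queue and act on it. The sketch below shows that producer/consumer arrangement in miniature, assuming a single consumer thread. DispatcherSketch, the String event type and the `__STOP__` marker are hypothetical simplifications, and the sketch uses add on an unbounded queue where the original uses put.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class DispatcherSketch {

    private static final String STOP = "__STOP__"; // hypothetical shutdown marker

    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();
    private final Thread eventThread;

    public DispatcherSketch() {
        // Consumer side: take events off the queue and dispatch them one by one.
        eventThread = new Thread(() -> {
            try {
                while (true) {
                    String event = eventQueue.take();
                    if (STOP.equals(event)) {
                        return;
                    }
                    handled.add("handled " + event); // stand-in for the registered handler
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        eventThread.start();
    }

    // Producer side: enqueue the event and return immediately.
    public void handle(String event) {
        eventQueue.add(event); // the queue is unbounded, so add does not block
    }

    // Lets the consumer drain the queue, stops it, and returns what was handled.
    public List<String> stopAndGetHandled() {
        eventQueue.add(STOP);
        try {
            eventThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.handle("JOB_START");
        System.out.println(dispatcher.stopAndGetHandled());
    }
}
```

Decoupling the caller from the handler this way means startJobs() returns as soon as the event is queued, and the job state machine reacts on the dispatcher thread.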

4、Scheduled task execution (YarnChild)

Find YarnChild and search for its main method.

public static void main(String[] args) throws Throwable {
  Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
  LOG.debug("Child starting");

  ... ...

  task = myTask.getTask();
  YarnChild.taskid = task.getTaskID();
  ... ...

  // Create a final reference to the task for the doAs block
  final Task taskFinal = task;
  childUGI.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        // use job-specified working directory
        setEncryptedSpillKeyIfRequired(taskFinal);
        FileSystem.get(job).setWorkingDirectory(job.getWorkingDirectory());
        // Run the task (a MapTask or a ReduceTask)
        taskFinal.run(job, umbilical); // run the task
        return null;
      }
    });
  } 
  ... ...
}

Press Ctrl + Alt + B to find the run implementation in MapTask.java.

public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, ClassNotFoundException, InterruptedException {
  this.umbilical = umbilical;

  // Check whether this is a map task
  if (isMapTask()) {
    // If there are no reducers then there won't be any sort. Hence the map 
    // phase will govern the entire attempt's progress.
    // With zero reduce tasks, the map phase accounts for 100% of the attempt's progress
    if (conf.getNumReduceTasks() == 0) {
      mapPhase = getProgress().addPhase("map", 1.0f);
    } else {
      // If there are reducers then the entire attempt's progress will be 
      // split between the map phase (67%) and the sort phase (33%).
      // With reduce tasks, the map phase accounts for 66.7% and the sort phase for 33.3%
      mapPhase = getProgress().addPhase("map", 0.667f);
      sortPhase  = getProgress().addPhase("sort", 0.333f);
    }
  }
  ... ...
  if (useNewApi) {
    // Run the map task with the new API
    runNewMapper(job, splitMetaInfo, umbilical, reporter);
  } else {
    runOldMapper(job, splitMetaInfo, umbilical, reporter);
  }
  done(umbilical, reporter);
}
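
The phase weights in MapTask.run can be read as a weighted sum: overall progress is each phase's own progress multiplied by its weight. The sketch below illustrates only that arithmetic. PhaseProgressSketch and overallProgress are hypothetical names, and Hadoop's own Progress class tracks this in a tree of phases rather than with a single formula.

```java
public class PhaseProgressSketch {

    // Overall attempt progress as the weighted sum of the per-phase progress values.
    public static float overallProgress(int numReduceTasks, float mapProgress, float sortProgress) {
        if (numReduceTasks == 0) {
            return 1.0f * mapProgress;   // map-only job: the map phase is everything
        }
        return 0.667f * mapProgress + 0.333f * sortProgress;
    }

    public static void main(String[] args) {
        // Map-only job, map phase half done.
        System.out.println(overallProgress(0, 0.5f, 0.0f));
        // Job with reducers, map phase done, sort phase not started.
        System.out.println(overallProgress(2, 1.0f, 0.0f));
    }
}
```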

void runNewMapper(final JobConf job,
                  final TaskSplitIndex splitIndex,
                  final TaskUmbilicalProtocol umbilical,
                  TaskReporter reporter
                  ) throws IOException, ClassNotFoundException,
                           InterruptedException {
  ... ...

  try {
    input.initialize(split, mapperContext);
    // Run the mapper
    mapper.run(mapperContext);
  
    mapPhase.complete();
    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);
    input.close();
    input = null;
    output.close(mapperContext);
    output = null;
  } finally {
    closeQuietly(input);
    closeQuietly(output, mapperContext);
  }
}

// Mapper.java (where the user-defined map method is invoked)
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKeyValue()) {
      map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
  } finally {
    cleanup(context);
  }
}

To start the ReduceTask, press Ctrl + Alt + B on run in the main method of YarnChild.java and pick the implementation in ReduceTask.java.

public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  throws IOException, InterruptedException, ClassNotFoundException {
  job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());

  ... ...

  if (useNewApi) {
    // Run the reduce with the new API
    runNewReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  } else {
    runOldReducer(job, umbilical, reporter, rIter, comparator, 
                  keyClass, valueClass);
  }

  shuffleConsumerPlugin.close();
  done(umbilical, reporter);
}

void runNewReducer(JobConf job,
                   final TaskUmbilicalProtocol umbilical,
                   final TaskReporter reporter,
                   RawKeyValueIterator rIter,
                   RawComparator<INKEY> comparator,
                   Class<INKEY> keyClass,
                   Class<INVALUE> valueClass
                   ) throws IOException,InterruptedException, 
                            ClassNotFoundException {
  ... ...
  try {
    // Call the Reducer's run method
    reducer.run(reducerContext);
  } finally {
    trackedRW.close(reducerContext);
  }
}

// Reducer.java
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
      // If a back up store is used, reset it
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
      }
    }
  } finally {
    cleanup(context);
  }
}
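
The while (context.nextKey()) loop in Reducer.run calls reduce once per distinct key, handing it all values of that key. The sketch below reproduces that grouping over an in-memory, key-sorted list. ReduceLoopSketch, its summing reduce function and the use of Map.Entry pairs are hypothetical and leave out the real context, iterator and backup-store machinery.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceLoopSketch {

    // Hypothetical reduce function: sums the values of one key.
    public static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Walks key-sorted (key, value) pairs and calls reduce once per distinct key.
    public static Map<String, Integer> run(List<Map.Entry<String, Integer>> sortedPairs) {
        Map<String, Integer> output = new LinkedHashMap<>();
        int i = 0;
        while (i < sortedPairs.size()) {                    // plays the role of nextKey()
            String key = sortedPairs.get(i).getKey();
            List<Integer> values = new ArrayList<>();
            while (i < sortedPairs.size() && sortedPairs.get(i).getKey().equals(key)) {
                values.add(sortedPairs.get(i).getValue());  // plays the role of getValues()
                i++;
            }
            output.put(key, reduce(key, values));
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        pairs.add(new AbstractMap.SimpleEntry<>("a", 1));
        pairs.add(new AbstractMap.SimpleEntry<>("a", 2));
        pairs.add(new AbstractMap.SimpleEntry<>("b", 5));
        System.out.println(run(pairs));
    }
}
```

The grouping only works because the input arrives sorted by key, which is what the sort phase before reduce guarantees.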

六、MapReduce source code analysis

This material was covered earlier; the outlines below only trace the call chains.

1、Job submission flow and input-split source code

// Job submission flow
waitForCompletion()

submit();

// 1 Establish the connection
connect();
  // 1) Create the proxy used to submit the job
  new Cluster(getConfiguration());
    // (1) Decide whether to run locally or on a YARN cluster
    initialize(jobTrackAddr, conf);

// 2 Submit the job
submitter.submitJobInternal(Job.this, cluster)

  // 1) Create the staging path used to hand data to the cluster
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);

  // 2) Get the job ID and create the job path
  JobID jobId = submitClient.getNewJobID();

  // 3) Copy the jar to the cluster
  copyAndConfigureFiles(job, submitJobDir);
    rUploader.uploadFiles(job, jobSubmitDir);

  // 4) Compute the splits and write the split plan files
  writeSplits(job, submitJobDir);
    maps = writeNewSplits(job, jobSubmitDir);
      input.getSplits(job);

  // 5) Write the XML configuration file to the staging path
  writeConf(conf, submitJobFile);
    conf.writeXml(out);

  // 6) Submit the job and return its status
  status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

FileInputFormat split source code (input.getSplits(job))
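
As a rough guide to what getSplits computes, the sketch below reproduces the split-size arithmetic for a single file: the split size is max(minSize, min(maxSize, blockSize)), and a file keeps being cut while the remainder exceeds 1.1 times the split size. It is a simplified, standalone illustration. SplitSizeSketch and splitLengths are hypothetical names, sizes are plain numbers (read them as MB), and block locations, unsplittable files and multi-file inputs are ignored.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSizeSketch {

    // A final chunk up to 10% larger than splitSize is kept as one split.
    static final double SPLIT_SLOP = 1.1;

    public static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the lengths of the splits cut from one file of the given length.
    public static List<Long> splitLengths(long fileLength, long splitSize) {
        List<Long> lengths = new ArrayList<>();
        long remaining = fileLength;
        while (((double) remaining) / splitSize > SPLIT_SLOP) {
            lengths.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            lengths.add(remaining);   // the tail, possibly slightly larger than splitSize
        }
        return lengths;
    }

    public static void main(String[] args) {
        long splitSize = computeSplitSize(128, 1, Long.MAX_VALUE);
        System.out.println(splitLengths(300, splitSize)); // three splits
        System.out.println(splitLengths(129, splitSize)); // one split, thanks to the slop
    }
}
```

The slop factor avoids launching a map task for a tiny tail: a 129 MB file with a 128 MB split size stays in one split instead of producing a 1 MB second split.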

2、MapTask & ReduceTask source code

// MapTask call trace
=================== MapTask ===================
context.write(k, NullWritable.get());   // write call in the user-defined map method; step in
  output.write(key, value);  
  // MapTask line 727: the collect call; step in twice
    collector.collect(key, value,partitioner.getPartition(key, value, partitions));
      HashPartitioner(); // default partitioner
    collect()  // MapTask line 1082; once the map side has written all its kv pairs, close() below runs
      close() // MapTask line 732
      collector.flush() // spill-flush method, MapTask line 735; set a breakpoint in advance and step in
        sortAndSpill() // sort and spill, MapTask line 1505; step in
          sorter.sort()   QuickSort // sort used for the spill, MapTask line 1625; step in
        mergeParts(); // merge the spill files, MapTask line 1527; step in

        collector.close(); // MapTask line 739: the collector closes, ReduceTask comes next
// ReduceTask call trace
=================== ReduceTask ===================
if (isMapOrReduce())  // ReduceTask line 324; set a breakpoint in advance
initialize()   // ReduceTask line 333; step in
init(shuffleContext);  // ReduceTask line 375; set the breakpoints below before reaching this line
        totalMaps = job.getNumMapTasks(); // ShuffleSchedulerImpl line 120; set a breakpoint in advance
         merger = createMergeManager(context); // merge setup, Shuffle line 80
          // MergeManagerImpl lines 232 and 235; set breakpoints in advance
          this.inMemoryMerger = createInMemoryMerger(); // in-memory merge
          this.onDiskMerger = new OnDiskMerger(this); // on-disk merge
        rIter = shuffleConsumerPlugin.run();
            eventFetcher.start();  // start fetching data, Shuffle line 107; set a breakpoint in advance
            eventFetcher.shutDown();  // fetching finished, Shuffle line 141; set a breakpoint in advance
            copyPhase.complete();   // copy phase complete, Shuffle line 151
            taskStatus.setPhase(TaskStatus.Phase.SORT);  // sort phase begins, Shuffle line 152
          sortPhase.complete();   // sort phase complete, reduce phase comes next; ReduceTask line 382
        reduce();  // the reduce phase calls the user-defined reduce method, which is called multiple times
          cleanup(context); // before reduce finishes, Reducer's cleanup method is called one last time
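
The trace names HashPartitioner as the default partitioner. Its rule is short enough to show in full: clear the sign bit of the key's hash code, then take the remainder by the number of reduce tasks. The sketch below restates that rule outside Hadoop. HashPartitionSketch is a hypothetical class, and the value argument of the real getPartition is dropped because it does not take part in the calculation.

```java
public class HashPartitionSketch {

    // Masking with Integer.MAX_VALUE clears the sign bit so the result is never negative.
    public static int getPartition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        System.out.println(getPartition("a", 3));                 // "a".hashCode() is 97, so 97 % 3
        System.out.println(getPartition(Integer.valueOf(-1), 4)); // negative hash code still maps to 0..3
    }
}
```

Because the partition depends only on the key, all records with the same key reach the same reduce task.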

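七、Compiling the Hadoop source

1、Environment preparation

Source download: https://hadoop.apache.org/release/3.1.3.html

See the BUILDING.txt file in the source root for details, and modify the HDFS replication setting in the source.

Back on the CentOS system, prepare the packages (Hadoop source, JDK 8, Maven, Protobuf, CMake):

  • hadoop-3.1.3-src.tar.gz
  • jdk-8u212-linux-x64.tar.gz
  • apache-maven-3.6.3-bin.tar.gz
  • protobuf-2.5.0.tar.gz (serialization framework)
  • cmake-3.17.0.tar.gz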

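2、Installing the toolchain

Note: all operations must be performed as the root user.

# Create the directories /opt/software/hadoop_source and /opt/module/hadoop_source
# Upload the packages to the chosen directory, for example /opt/software/hadoop_source
# Extract the packages into the chosen directory, for example /opt/module/hadoop_source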
tar -zxvf apache-maven-3.6.3-bin.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf cmake-3.17.0.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf hadoop-3.1.3-src.tar.gz -C  /opt/module/hadoop_source/
tar -zxvf protobuf-2.5.0.tar.gz -C  /opt/module/hadoop_source/

# Install the JDK
tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/hadoop_source/
vim /etc/profile.d/my_env.sh
# Enter the following:
#JAVA_HOME
export JAVA_HOME=/opt/module/hadoop_source/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

# Reload the environment variables and verify the JDK
source /etc/profile
java -version

# Configure the Maven environment variables and mirror, then verify
vim /etc/profile.d/my_env.sh
#MAVEN_HOME
export MAVEN_HOME=/opt/module/hadoop_source/apache-maven-3.6.3
export PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin

source /etc/profile
# Change the Maven mirror (the path is relative to the Maven installation directory)
vi conf/settings.xml
# Add the Aliyun mirror inside the mirrors element
<mirrors>
    <mirror>
        <id>nexus-aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
</mirrors>

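# Verify the Maven installation
mvn -version 

# Install the dependencies (keep this order, otherwise dependencies may not be found)
# Install gcc and make
yum install -y gcc* make
# Install the compression libraries
yum -y install snappy*  bzip2* lzo* zlib*  lz4* gzip*
# Install some basic tools
yum -y install openssl* svn ncurses* autoconf automake libtool
# Install the EPEL repository, which is required for zstd
yum -y install epel-release
# Install zstd
yum -y install *zstd*

# Install cmake manually
# In the extracted cmake directory, run ./bootstrap; this step can take about an hour
./bootstrap
# Build and install
make && make install 
cmake -version

# Install protobuf: change into the extracted protobuf directory
# and run the following commands in order; --prefix sets the install location (here the protobuf directory itself)
./configure --prefix=/opt/module/hadoop_source/protobuf-2.5.0
make && make install
# Configure the environment variables
vim /etc/profile.d/my_env.sh
# Enter the following
export PROTOC_HOME=/opt/module/hadoop_source/protobuf-2.5.0
export PATH=$PATH:$JAVA_HOME/bin:$MAVEN_HOME/bin:$PROTOC_HOME/bin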

source /etc/profile
protoc --version

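3、Compiling the source

# Change into the extracted Hadoop source directory
# Start the build
mvn clean package -DskipTests -Pdist,native -Dtar
# Note: the first build downloads many dependency jars and takes a long time (about an hour); it has succeeded when every module reports SUCCESS

# The resulting 64-bit Hadoop package is under /opt/module/hadoop_source/hadoop-3.1.3-src/hadoop-dist/target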
