【技术解析】Dolphinscheduler实现MapReduce任务的高效管理

MapReduce是一种编程模型,用于处理和生成大数据集,主要用于大规模数据集(TB级数据规模)的并行运算。本文详细介绍了Dolphinscheduler在MapReduce任务中的应用,包括GenericOptionsParser与args的区别、hadoop jar命令参数的完整解释、MapReduce实例代码,以及如何在Dolphinscheduler中配置和运行MapReduce任务。

GenericOptionsParser vs args区别

GenericOptionsParser 如下:

GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
String[] remainingArgs = optionParser.getRemainingArgs();

查看 GenericOptionsParser 源码做了什么?

1、构造方法
public GenericOptionsParser(Configuration conf, String[] args) 
      throws IOException {
    this(conf, new Options(), args); 
}

2、点击 this
public GenericOptionsParser(Configuration conf,
      Options options, String[] args) throws IOException {
    this.conf = conf;
    parseSuccessful = parseGeneralOptions(options, args);
}

3、查看 parseGeneralOptions
private boolean parseGeneralOptions(Options opts, String[] args)
      throws IOException {
    opts = buildGeneralOptions(opts);
    CommandLineParser parser = new GnuParser();
    boolean parsed = false;
    try {
      commandLine = parser.parse(opts, preProcessForWindows(args), true);
      processGeneralOptions(commandLine);
      parsed = true;
    } catch(ParseException e) {
      LOG.warn("options parsing failed: "+e.getMessage());

      HelpFormatter formatter = new HelpFormatter();
      formatter.printHelp("general options are: ", opts);
    }
    return parsed;
}

4、看 GnuParser 
package org.apache.commons.cli;

import java.util.ArrayList;
import java.util.List;

@Deprecated
public class GnuParser extends Parser {
.......
}

org.apache.commons.cli Parser,是不是有点熟悉?对,请参考 https://segmentfault.com/a/1190000045394541 这篇文章吧

5、看 processGeneralOptions 方法
private void processGeneralOptions(CommandLine line) throws IOException {
    if (line.hasOption("fs")) {
      FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
    }

    if (line.hasOption("jt")) {
      String optionValue = line.getOptionValue("jt");
      if (optionValue.equalsIgnoreCase("local")) {
        conf.set("mapreduce.framework.name", optionValue);
      }

      conf.set("yarn.resourcemanager.address", optionValue, 
          "from -jt command line option");
    }
    if (line.hasOption("conf")) {
      String[] values = line.getOptionValues("conf");
      for(String value : values) {
        conf.addResource(new Path(value));
      }
    }

    if (line.hasOption('D')) {
      String[] property = line.getOptionValues('D');
      for(String prop : property) {
        String[] keyval = prop.split("=", 2);
        if (keyval.length == 2) {
          conf.set(keyval[0], keyval[1], "from command line");
        }
      }
    }

    if (line.hasOption("libjars")) {
      // for libjars, we allow expansion of wildcards
      conf.set("tmpjars",
               validateFiles(line.getOptionValue("libjars"), true),
               "from -libjars command line option");
      //setting libjars in client classpath
      URL[] libjars = getLibJars(conf);
      if(libjars!=null && libjars.length>0) {
        conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
        Thread.currentThread().setContextClassLoader(
            new URLClassLoader(libjars, 
                Thread.currentThread().getContextClassLoader()));
      }
    }
    if (line.hasOption("files")) {
      conf.set("tmpfiles", 
               validateFiles(line.getOptionValue("files")),
               "from -files command line option");
    }
    if (line.hasOption("archives")) {
      conf.set("tmparchives", 
                validateFiles(line.getOptionValue("archives")),
                "from -archives command line option");
    }
    conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);

    // tokensFile
    if(line.hasOption("tokenCacheFile")) {
      String fileName = line.getOptionValue("tokenCacheFile");
      // check if the local file exists
      FileSystem localFs = FileSystem.getLocal(conf);
      Path p = localFs.makeQualified(new Path(fileName));
      localFs.getFileStatus(p);
      if(LOG.isDebugEnabled()) {
        LOG.debug("setting conf tokensFile: " + fileName);
      }
      UserGroupInformation.getCurrentUser().addCredentials(
          Credentials.readTokenStorageFile(p, conf));
      conf.set("mapreduce.job.credentials.binary", p.toString(),
               "from -tokenCacheFile command line option");

    }
}

原理是把 fs、jt、D、libjars、files、archives、tokenCacheFile 相关参数放入到 Hadoop的 Configuration中了,终于清楚 GenericOptionsParser是干什么的了

args呢?如果要使用args,以上这种 fs、jt、D、libjars、files、archives、tokenCacheFile 是需要自己解析的。

Hadoop jar完整参数解释

hadoop jar wordcount.jar org.myorg.WordCount \
    -fs hdfs://namenode.example.com:8020 \
    -jt resourcemanager.example.com:8032 \
    -D mapreduce.job.queuename=default \
    -libjars /path/to/dependency1.jar,/path/to/dependency2.jar \
    -files /path/to/file1.txt,/path/to/file2.txt \
    -archives /path/to/archive1.zip,/path/to/archive2.tar.gz \
    -tokenCacheFile /path/to/credential.file \
    /input /output

这条命令会:

  1. 将作业提交到 hdfs://namenode.example.com:8020 文件系统
  2. 使用 resourcemanager.example.com:8032 作为 YARN ResourceManager
  3. 提交到 default 队列
  4. 使用 /path/to/dependency1.jar 和 /path/to/dependency2.jar 作为依赖
  5. 分发本地文件 /path/to/file1.txt 和 /path/to/file2.txt,注意 : 是本地文件哦
  6. 解压并分发 /path/to/archive1.zip 和 /path/to/archive2.tar.gz
  7. 分发凭证文件 /path/to/credential.file

    MR实例

    WordCount经典示例

public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\\s+");
        for (String field : fields) {
            word.set(field);
            context.write(word, one);
        }
    }
}

public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key,
                          Iterable<IntWritable> values,
                          Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {

        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}


public class WCJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // TODO 如果要是本地访问远程的hdfs,需要指定hdfs的根路径,否则只能访问本地的文件系统
//        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        for (String arg : args) {
            System.out.println("arg :" + arg);
        }

        for (String remainingArg : remainingArgs) {
            System.out.println("remainingArg :" + remainingArg);
        }

        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // 如果目标目录存在,则删除
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRWordCount");
        job.setJarByClass(WCJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

文件分发

public class ConfigMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    private List<String> whiteList = new ArrayList<>();
    private Text text = new Text();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
        // 获取作业提交时传递的文件
        URI[] files = context.getCacheFiles();

        if (files != null && files.length > 0) {
            // 读取文件内容
            File configFile = new File("white.txt"); // 文件名要与传递的文件名保持一致
            try (BufferedReader reader = new BufferedReader(new FileReader(configFile))){
                String line = null;
                while ((line = reader.readLine()) != null) {
                    whiteList.add(line);
                }
            }
        }
    }

    @Override
    protected void map(LongWritable key,
                       Text value,
                       Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String[] datas = line.split("\\s+");

        List<String> whiteDatas = Arrays.stream(datas).filter(data -> whiteList.contains(data)).collect(Collectors.toList());

        for (String data : whiteDatas) {
            text.set(data);
            context.write(text , NullWritable.get());
        }
    }
}


public class ConfigJob {

    public static void main(String[] args) throws Exception {

        // 设置用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        Configuration conf = new Configuration();

        conf.set("fs.defaultFS", "hdfs://xx.xx.xx.xx:8020");

        GenericOptionsParser optionParser = new GenericOptionsParser(conf, args);
        String[] remainingArgs = optionParser.getRemainingArgs();

        if (remainingArgs.length < 2) {
            throw new RuntimeException("input and output path must set.");
        }

        Path outputPath = new Path(remainingArgs[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        boolean exists = fileSystem.exists(outputPath);
        // 如果目标目录存在,则删除
        if (exists) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "MRConfig");
        job.setJarByClass(ConfigJob.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapperClass(ConfigMapper.class);
        FileInputFormat.addInputPath(job, new Path(remainingArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Dolphinscheduler MR使用

Yarn test队列设置

YARN 的配置目录中找到 capacity-scheduler.xml 文件。通常位于 $HADOOP_HOME/etc/hadoop/ 目录下。

修改 capacity-scheduler.xml

<property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>default, test</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.capacity</name>
    <value>30</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.maximum-capacity</name>
    <value>50</value>
</property>

<property>
    <name>yarn.scheduler.capacity.root.test.user-limit-factor</name>
    <value>1</value>
</property>

刷新队列配置 yarn rmadmin -refreshQueues

流程定义设置

file

执行结果

file

离线任务实例

file

YARN作业展示

file

源码分析

org.apache.dolphinscheduler.plugin.task.mr.MapReduceArgsUtils#buildArgs

String others = param.getOthers();
// TODO 这里其实就是想说,没有通过 -D mapreduce.job.queuename 形式指定队列,是用页面上直接指定队列名称的,页面上 Yarn队列 输入框
if (StringUtils.isEmpty(others) || !others.contains(MR_YARN_QUEUE)) {
    String yarnQueue = param.getYarnQueue();
    if (StringUtils.isNotEmpty(yarnQueue)) {
        args.add(String.format("%s%s=%s", D, MR_YARN_QUEUE, yarnQueue));
    }
}

// TODO 这里就是页面上,选项参数 输入框
// -conf -archives -files -libjars -D
if (StringUtils.isNotEmpty(others)) {
    args.add(others);
}

转载自Journey 原文链接:https://segmentfault.com/a/1190000045403915

本文由 白鲸开源科技 提供发布支持!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/918383.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据结构哈希表-(开放地址法+二次探测法解决哈希冲突)(创建+删除+插入)+(C语言代码)

#include<stdio.h> #include<stdlib.h> #include<stdbool.h> #define M 20 #define NULLDEL -1 #define DELDEY -2typedef struct {int key;int count; }HashTable;//创建和插入 void Insert(HashTable ha[], int m, int p, int key) {int i, HO, HI;HO key…

【android USB 串口通信助手】stm32 源码demo 单片机与手机通信 Android studio 20241118

android 【OTG线】 接 下位机STM32【USB】 通过百度网盘分享的文件&#xff1a;USBToSerialPort.apk 链接&#xff1a;https://pan.baidu.com/s/122McdmBDUxEtYiEKFunFUg?pwd8888 提取码&#xff1a;8888 android 【OTG线】 接 【USB转TTL】 接 【串口(下位机 SMT32等)】 需…

大数据技术Kafka详解 ① | 消息队列(Messages Queue)

目录 1、消息队列的介绍 2、消息队列的应用场景 2.1、应用耦合 2.2、异步处理 2.3、限流削峰 2.4、消息驱动的系统 3、消息队列的两种模式 3.1、点对点模式 3.2、发布/订阅模式 4、常用的消息队列介绍 4.1、RabbitMQ 4.2、ActiveMQ 4.3、RocketMQ 4.4、Kafka 4.…

一家餐饮企业,「闯入」AI阵地

作者| 皮爷 出品|产业家 “我们需要用AI来帮助我们门店破除内卷的状态。”一位连锁餐饮品牌告诉产业家&#xff0c;“这也是我们想尽快把AI用起来的原因&#xff0c;看看能不能带来一些帮助。” 这种情况正发生在一众餐饮企业中。 与这种情况对应的一个背景是&#xff0c…

MySQL的编程语言

一、MySQL基础 使用系统的全局变量@@VERSION查看当前使用的MySQL的版本信息,SQL语句如下: select @@version; 将局部变量varl声明为char的类型,长度值为10,并为其赋值为“程菲” begin declare var1 char(10); set @var1="程菲"; end 通过局部变量查看d_eams数…

【青牛科技】电动工具直流调速专用集成电路GS069,具有电源电压范围宽、功耗小、抗干扰能力强等特性

GS069是芯谷科技推出的一款CMOS工艺、电动工具直流调速专用集成电路。具有电源电压范围宽、功耗小、抗干扰能力强等特点&#xff0c;广泛应用于各种电动工具。 产品基本参数 产品应用 1、应用图&#xff1a; 2、测试参数&#xff1a;&#xff08;VCC9V&#xff0c;RL2K&#…

PyTorch 中使用自动求导计算梯度

使用 PyTorch 进行自动求导和梯度计算 在 PyTorch 中&#xff0c;张量的 requires_grad 属性决定了是否需要计算该张量的梯度。设置为 True 的张量会在计算过程中记录操作&#xff0c;以便在调用 .backward() 方法时自动计算梯度。通过构建计算图&#xff0c;PyTorch 能够有效…

安装pytest失败ModuleNotFoundError: No module named ‘distutils‘

下载一下即可解决 pip install setuptools 下载完成后&#xff0c;再进行下载 pip install pytest

数据结构树和二叉树知识点和递归序列

二叉树知识点 一.树的概念1.1关于树的名词解释 二.二叉树的概念1. 二叉树性质&#xff1a; 三.满二叉树与完全二叉树递归前序遍历递归中序遍历递归后续遍历 一.树的概念 树是一种非线性数据结构&#xff0c;它是由n个或大于n个的结点来组成具有层次关系的一个集合&#xff08;…

【汇编语言】数据处理的两个基本问题(二) —— 解密汇编语言:数据长度与寻址方式的综合应用

文章目录 前言1. 指令要处理的数据有多长&#xff1f;1.1 通过寄存器指明数据的尺寸1.1.1 字操作1.1.2 字节操作 1.2 用操作符X ptr指明内存单元的长度1.2.1 访问字单元1.2.2 访问字节单元1.2.3 为什么要用操作符X ptr指明 1.3 其他方法 2. 寻址方式的综合应用2.1 问题背景&…

【ArcGIS微课1000例】0130:图层组详解与使用

文章目录 一、图层组概述二、创建图层组三、在图层组中管理图层四、对话框中图层组的列表一、图层组概述 图层组包含其他图层。图层组有助于对地图中相关类型的图层进行组织,并且可用于定义高级绘制选项。例如,假设在地图上有两个图层分别用于表示铁路和高速公路。您可将这些…

Cyberchef配合Wireshark提取并解析TCP/FTP流量数据包中的文件

前一篇文章中讲述了如何使用cyberchef提取HTTP/TLS数据包中的文件,详见《Cyberchef配合Wireshark提取并解析HTTP/TLS流量数据包中的文件》,链接这里,本文讲述下如何使用cyberchef提取FTP/TCP数据包中的文件。 FTP 是最为常见的文件传输协议,和HTTP协议不同的是FTP协议传输…

SpringBoot多环境配置的实现

前言 开发过程中必然使用到的多环境案例&#xff0c;通过简单的案例分析多环境配置的实现过程。 一、案例 1.1主配置文件 spring:profiles:active: prod server:port: 80801.2多环境配置文件 开发环境 blog:domain: http://localhost:8080测试环境 blog:domain: https:/…

本草纲目数字化:Spring Boot在中药实验管理中的应用

1系统概述 1.1 研究背景 随着计算机技术的发展以及计算机网络的逐渐普及&#xff0c;互联网成为人们查找信息的重要场所&#xff0c;二十一世纪是信息的时代&#xff0c;所以信息的管理显得特别重要。因此&#xff0c;使用计算机来管理中药实验管理系统的相关信息成为必然。开发…

linux文件与重定向

目录 一、共识原理 二、回顾C语言文件函数 1.fopen 2.fwrite 3.fclose 三、文件系统调用 1.open 2.write 3.访问文件的本质 4.stdin&&stdout&&stderror 5.文件的引用计数 四、重定向 1.文件描述符的分配规则 2. 输出重定向 3.重定向系统调用 4.…

【微服务】SpringBoot 整合ELK使用详解

目录 一、前言 二、为什么需要ELK 三、ELK介绍 3.1 什么是elk 3.2 elk工作原理 四、ELK搭建 4.1 搭建es环境 4.1.1 获取es镜像 4.1.2 启动es容器 4.1.3 配置es参数 4.1.4 重启es容器并访问 4.2 搭建kibana 4.2.1 拉取kibana镜像 4.2.2 启动kibana容器 4.2.3 修改…

基于YOLOv8深度学习的汽车车身车损检测系统研究与实现(PyQt5界面+数据集+训练代码)

本文研究并实现了一种基于YOLOV8深度学习模型的汽车车身车损检测系统&#xff0c;旨在解决传统车损检测中效率低、精度不高的问题。该系统利用YOLOV8的目标检测能力&#xff0c;在单张图像上实现了车身损坏区域的精确识别和分类&#xff0c;尤其是在车身凹痕、车身裂纹和车身划…

ui->tableView升序

亮点 //设置可排序ui->tableView->setSortingEnabled(true);ui->tableView->sortByColumn(0,Qt::AscendingOrder); //排序void Widget::initTable() {//设置焦点策略:ui->tableView->setFocusPolicy(Qt::NoFocus);//显示网格线:ui->tableView->se…

Android Framework AMS(16)进程管理

该系列文章总纲链接&#xff1a;专题总纲目录 Android Framework 总纲 本章关键点总结 & 说明&#xff1a; 说明&#xff1a;本章节主要解读AMS 进程方面的知识。关注思维导图中左上侧部分即可。 我们本章节主要是对Android进程管理相关知识有一个基本的了解。先来了解下L…

QT_CONFIG宏使用

时常在Qt代码中看到QT_CONFIG宏&#xff0c;之前以为和#define、DEFINES 差不多&#xff0c;看了定义才发现不是那么回事&#xff0c;定义如下&#xff1a; 看注释就知道了QT_CONFIG宏&#xff0c;其实是&#xff1a;实现了一个在编译时期安全检查&#xff0c;检查指定的Qt特性…