Hadoop3.0大数据处理学习4(案例:数据清洗、数据指标统计、任务脚本封装、Sqoop导出Mysql)

案例需求分析

直播公司每日都会产生海量的直播数据,为了更好地服务主播与用户,提高直播质量与用户粘性,往往会对大量的数据进行分析与统计,从中挖掘商业价值,我们将通过一个实战案例,来使用Hadoop技术来实现对直播数据的统计与分析。下面是简化的日志文件,详细的我会更新在Gitee hadoop_study/hadoopDemo1 · Huathy/study-all/

{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183}
{"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89}
{"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}

原始数据清洗代码

  1. 清理无效记录:由于原始数据是通过日志方式进行记录的,在使用日志采集工具采集到HDFS后,还需要对数据进行清洗过滤,丢弃缺失字段的数据,针对异常字段值进行标准化处理。
  2. 清除多余字段:由于计算时不会用到所有的字段。

编码

DataCleanMap

package dataClean;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 22:15
 * @description 实现自定义map类,在里面实现具体的清洗逻辑
 */
public class DataCleanMap extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * 1. 从原始数据中过滤出来需要的字段
     * 2. 针对核心字段进行异常值判断
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String valStr = value.toString();
        // 将json字符串数据转换成对象
        JSONObject jsonObj = JSON.parseObject(valStr);
        String uid = jsonObj.getString("uid");
        // 这里建议使用getIntValue(返回0)而不是getInt(异常)。
        int gold = jsonObj.getIntValue("gold");
        int watchnumpv = jsonObj.getIntValue("watchnumpv");
        int follower = jsonObj.getIntValue("follower");
        int length = jsonObj.getIntValue("length");
        // 过滤异常数据
        if (StringUtils.isNotBlank(valStr) && (gold * watchnumpv * follower * length) >= 0) {
            // 组装k2,v2
            Text k2 = new Text();
            k2.set(uid);
            Text v2 = new Text();
            v2.set(gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            context.write(k2, v2);
        }
    }
}

DataCleanJob

package dataClean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-22 22:02
 * @description 数据清洗作业
 * 1. 从原始数据中过滤出来需要的字段
 * uid gold watchnumpv(总观看)、follower(粉丝关注数量)、length(总时长)
 * 2. 针对以上五个字段进行判断,都不应该丢失或为空,否则任务是异常记录,丢弃。
 * 若个别字段丢失,则设置为0.
 * <p>
 * 分析:
 * 1. 由于原始数据是json格式,可以使用fastjson对原始数据进行解析,获取指定字段的内容
 * 2. 然后对获取到的数据进行判断,只保留满足条件的数据
 * 3. 由于不需要聚合过程,只是一个简单的过滤操作,所以只需要map阶段即可,不需要reduce阶段
 * 4. 其中map阶段的k1,v1的数据类型是固定的<LongWritable,Text>,k2,v2的数据类型是<Text,Text>k2存储主播ID,v2存储核心字段
 * 中间用\t制表符分隔即可
 */
public class DataCleanJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // job需要的配置参数
        Configuration configuration = new Configuration();
        // 创建job
        Job job = Job.getInstance(configuration, "wordCountJob");
        // 注意:这一行必须设置,否则在集群的时候将无法找到Job类
        job.setJarByClass(DataCleanJob.class);
        // 指定输入文件
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // 指定map相关配置
        job.setMapperClass(DataCleanMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 指定reduce 数量0,表示禁用reduce
        job.setNumReduceTasks(0);

        // 提交任务
        job.waitForCompletion(true);
    }
}

运行

## 运行命令
[root@cent7-1 hadoop-3.2.4]# hadoop jar hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar dataClean.DataCleanJob hdfs://cent7-1:9000/data/videoinfo/231022 hdfs://cent7-1:9000/data/res231022
inputPath  => hdfs://cent7-1:9000/data/videoinfo/231022
outputPath  => hdfs://cent7-1:9000/data/res231022
2023-10-22 23:16:15,845 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-22 23:16:16,856 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2023-10-22 23:16:17,041 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1697985525421_0002
2023-10-22 23:16:17,967 INFO input.FileInputFormat: Total input files to process : 1
2023-10-22 23:16:18,167 INFO mapreduce.JobSubmitter: number of splits:1
2023-10-22 23:16:18,873 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1697985525421_0002
2023-10-22 23:16:18,874 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-22 23:16:19,157 INFO conf.Configuration: resource-types.xml not found
2023-10-22 23:16:19,158 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-10-22 23:16:19,285 INFO impl.YarnClientImpl: Submitted application application_1697985525421_0002
2023-10-22 23:16:19,345 INFO mapreduce.Job: The url to track the job: http://cent7-1:8088/proxy/application_1697985525421_0002/
2023-10-22 23:16:19,346 INFO mapreduce.Job: Running job: job_1697985525421_0002
2023-10-22 23:16:31,683 INFO mapreduce.Job: Job job_1697985525421_0002 running in uber mode : false
2023-10-22 23:16:31,689 INFO mapreduce.Job:  map 0% reduce 0%
2023-10-22 23:16:40,955 INFO mapreduce.Job:  map 100% reduce 0%
2023-10-22 23:16:43,012 INFO mapreduce.Job: Job job_1697985525421_0002 completed successfully
2023-10-22 23:16:43,153 INFO mapreduce.Job: Counters: 33
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=238970
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=24410767
		HDFS: Number of bytes written=1455064
		HDFS: Number of read operations=7
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=2
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=1
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=7678
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=7678
		Total vcore-milliseconds taken by all map tasks=7678
		Total megabyte-milliseconds taken by all map tasks=7862272
	Map-Reduce Framework
		Map input records=90000
		Map output records=46990
		Input split bytes=123
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=195
		CPU time spent (ms)=5360
		Physical memory (bytes) snapshot=302153728
		Virtual memory (bytes) snapshot=2588925952
		Total committed heap usage (bytes)=214958080
		Peak Map Physical memory (bytes)=302153728
		Peak Map Virtual memory (bytes)=2588925952
	File Input Format Counters 
		Bytes Read=24410644
	File Output Format Counters 
		Bytes Written=1455064
[root@cent7-1 hadoop-3.2.4]# 

## 统计输出文件行数
[root@cent7-1 hadoop-3.2.4]# hdfs dfs -cat hdfs://cent7-1:9000/data/res231022/* | wc -l
46990
## 查看原始数据记录数
[root@cent7-1 hadoop-3.2.4]# hdfs dfs -cat hdfs://cent7-1:9000/data/videoinfo/231022/* | wc -l
90000

数据指标统计

  1. 对数据中的金币数量,总观看PV,粉丝关注数量,视频总时长等指标进行统计(涉及四个字段为了后续方便,可以自定义Writable)
  2. 统计每天开播时长最长的前10名主播以及对应的开播时长

自定义Writeable代码实现

由于原始数据涉及多个需要统计的字段,可以将这些字段统一的记录在一个自定义的数据类型中,方便使用

package videoinfo;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:32
 * @description 自定义数据类型,为了保存主播相关核心字段,方便后期维护
 */
public class VideoInfoWriteable implements Writable {
    private long gold;
    private long watchnumpv;
    private long follower;
    private long length;

    public void set(long gold, long watchnumpv, long follower, long length) {
        this.gold = gold;
        this.watchnumpv = watchnumpv;
        this.follower = follower;
        this.length = length;
    }

    public long getGold() {
        return gold;
    }

    public long getWatchnumpv() {
        return watchnumpv;
    }

    public long getFollower() {
        return follower;
    }

    public long getLength() {
        return length;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(gold);
        dataOutput.writeLong(watchnumpv);
        dataOutput.writeLong(follower);
        dataOutput.writeLong(length);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.gold = dataInput.readLong();
        this.watchnumpv = dataInput.readLong();
        this.follower = dataInput.readLong();
        this.length = dataInput.readLong();
    }

    @Override
    public String toString() {
        return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;
    }
}

基于主播维度 videoinfo

VideoInfoJob

package videoinfo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-22 23:27
 * @description 数据指标统计作业
 * 1. 基于主播进行统计,统计每个主播在当天收到的总金币数量,总观看PV,总粉丝关注量,总视频开播市场
 * 分析
 * 1. 为了方便统计主播的指标数据吗,最好是把这些字段整合到一个对象中,这样维护方便
 * 这样就需要自定义Writeable
 * 2. 由于在这里需要以主播维度进行数据的聚合,所以需要以主播ID作为KEY,进行聚合统计
 * 3. 所以Map节点的<k2,v2>是<Text,自定义Writeable>
 * 4. 由于需要聚合,所以Reduce阶段也需要
 */
public class VideoInfoJob {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // job需要的配置参数
        Configuration configuration = new Configuration();
        // 创建job
        Job job = Job.getInstance(configuration, "VideoInfoJob");
        // 注意:这一行必须设置,否则在集群的时候将无法找到Job类
        job.setJarByClass(VideoInfoJob.class);
        // 指定输入文件
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        // 指定map相关配置
        job.setMapperClass(VideoInfoMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce
        job.setReducerClass(VideoInfoReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 提交任务
        job.waitForCompletion(true);
    }
}

VideoInfoMap

package videoinfo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:31
 * @description 实现自定义Map类,在这里实现核心字段的拼接
 */
public class VideoInfoMap extends Mapper<LongWritable, Text, Text, VideoInfoWriteable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 读取清洗后的每一行数据
        String line = value.toString();
        String[] fields = line.split("\t");
        String uid = fields[0];
        long gold = Long.parseLong(fields[1]);
        long watchnumpv = Long.parseLong(fields[1]);
        long follower = Long.parseLong(fields[1]);
        long length = Long.parseLong(fields[1]);

        // 组装K2 V2
        Text k2 = new Text();
        k2.set(uid);

        VideoInfoWriteable v2 = new VideoInfoWriteable();
        v2.set(gold, watchnumpv, follower, length);
        context.write(k2, v2);
    }
}

VideoInfoReduce

package videoinfo;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-22 23:31
 * @description 实现自定义Map类,在这里实现核心字段的拼接
 */
public class VideoInfoReduce extends Reducer<Text, VideoInfoWriteable, Text, VideoInfoWriteable> {
    @Override
    protected void reduce(Text key, Iterable<VideoInfoWriteable> values, Context context) throws IOException, InterruptedException {
        // 从v2s中把相同key的value取出来,进行累加求和
        long goldSum = 0;
        long watchNumPvSum = 0;
        long followerSum = 0;
        long lengthSum = 0;
        for (VideoInfoWriteable v2 : values) {
            goldSum += v2.getGold();
            watchNumPvSum += v2.getWatchnumpv();
            followerSum += v2.getFollower();
            lengthSum += v2.getLength();
        }
        // 组装k3 v3
        VideoInfoWriteable videoInfoWriteable = new VideoInfoWriteable();
        videoInfoWriteable.set(goldSum, watchNumPvSum, followerSum, lengthSum);
        context.write(key, videoInfoWriteable);
    }
}

基于主播的TOPN计算

VideoInfoTop10Job

package top10;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author Huathy
 * @date 2023-10-23 21:27
 * @description 数据指标统计作业
 * 需求:统计每天开播时长最长的前10名主播以及时长信息
 * 分析:
 * 1. 为了统计每天开播时长最长的前10名主播信息,需要在map阶段获取数据中每个主播的ID和直播时长
 * 2. 所以map阶段的k2 v2 为Text LongWriteable
 * 3. 在reduce阶段对相同主播的时长进行累加求和,将这些数据存储到一个临时的map中
 * 4. 在reduce阶段的cleanup函数(最后执行)中,对map集合的数据进行排序处理
 * 5. 在cleanup函数中把直播时长最长的前10名主播信息写出到文件中
 * setup函数在reduce函数开始执行一次,而cleanup在结束时执行一次
 */
public class VideoInfoTop10Job {
    public static void main(String[] args) throws Exception {
        System.out.println("inputPath  => " + args[0]);
        System.out.println("outputPath  => " + args[1]);
        String path = args[0];
        String path2 = args[1];

        // job需要的配置参数
        Configuration configuration = new Configuration();
        // 从输入路径来获取日期
        String[] fields = path.split("/");
        String tmpdt = fields[fields.length - 1];
        System.out.println("日期:" + tmpdt);
        // 生命周期的配置
        configuration.set("dt", tmpdt);
        // 创建job
        Job job = Job.getInstance(configuration, "VideoInfoTop10Job");
        // 注意:这一行必须设置,否则在集群的时候将无法找到Job类
        job.setJarByClass(VideoInfoTop10Job.class);
        // 指定输入文件
        FileInputFormat.setInputPaths(job, new Path(path));
        FileOutputFormat.setOutputPath(job, new Path(path2));

        job.setMapperClass(VideoInfoTop10Map.class);
        job.setReducerClass(VideoInfoTop10Reduce.class);
        // 指定map相关配置
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // 指定reduce
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 提交任务
        job.waitForCompletion(true);
    }
}

VideoInfoTop10Map

package top10;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Huathy
 * @date 2023-10-23 21:32
 * @description 自定义map类,在这里实现核心字段的拼接
 */
public class VideoInfoTop10Map extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 读取清洗之后的每一行数据
        String line = key.toString();
        String[] fields = line.split("\t");
        String uid = fields[0];
        long length = Long.parseLong(fields[4]);
        Text k2 = new Text();
        k2.set(uid);
        LongWritable v2 = new LongWritable();
        v2.set(length);
        context.write(k2, v2);
    }
}

VideoInfoTop10Reduce

package top10;

import cn.hutool.core.collection.CollUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.*;

/**
 * @author Huathy
 * @date 2023-10-23 21:37
 * @description
 */
public class VideoInfoTop10Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
    // 保存主播ID和开播时长
    Map<String, Long> map = new HashMap<>();

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        String k2 = key.toString();
        long lengthSum = 0;
        for (LongWritable v2 : values) {
            lengthSum += v2.get();
        }
        map.put(k2, lengthSum);
    }

    /**
     * 任务初始化的时候执行一次,一般在里面做一些初始化资源连接的操作。(mysql、redis连接操作)
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        System.out.println("setup method running...");
        System.out.println("context: " + context);
        super.setup(context);
    }

    /**
     * 任务结束的时候执行一次,做关闭资源连接操作
     *
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // 获取日期
        Configuration configuration = context.getConfiguration();
        String date = configuration.get("dt");
        // 排序
        LinkedHashMap<String, Long> sortMap = CollUtil.sortByEntry(map, new Comparator<Map.Entry<String, Long>>() {
            @Override
            public int compare(Map.Entry<String, Long> o1, Map.Entry<String, Long> o2) {
                return -o1.getValue().compareTo(o2.getValue());
            }
        });
        Set<Map.Entry<String, Long>> entries = sortMap.entrySet();
        Iterator<Map.Entry<String, Long>> iterator = entries.iterator();
        // 输出
        int count = 1;
        while (count <= 10 && iterator.hasNext()) {
            Map.Entry<String, Long> entry = iterator.next();
            String key = entry.getKey();
            Long value = entry.getValue();
            // 封装K3 V3
            Text k3 = new Text(date + "\t" + key);
            LongWritable v3 = new LongWritable(value);
            // 统计的时候还应该传入日期来用来输出统计的时间,而不是获取当前时间(可能是统计历史)!
            context.write(k3, v3);
            count++;
        }
    }
}

任务定时脚本封装

任务依赖关系:数据指标统计(top10统计以及播放数据统计)依赖数据清洗作业
将任务提交命令进行封装,方便调用,便于定时任务调度

编写任务脚本,并以debug模式执行:sh -x data_clean.sh

任务执行结果监控

针对任务执行的结果进行检测,如果执行失败,则重试任务,同时发送告警信息。

#!/bin/bash
# 建议使用bin/bash形式
# 判读用户是否输入日期,如果没有则默认获取昨天日期。(需要隔几天重跑,灵活的指定日期)
if [ "x$1" = "x" ]; then
  yes_time=$(date +%y%m%d --date="1 days ago")
else
  yes_time=$1
fi

jobs_home=/home/jobs
cleanjob_input=hdfs://cent7-1:9000/data/videoinfo/${yes_time}
cleanjob_output=hdfs://cent7-1:9000/data/videoinfo_clean/${yes_time}
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://cent7-1:9000/res/videoinfoJob/${yes_time}
top10job_input=${cleanjob_output}
top10job_output=hdfs://cent7-1:9000/res/top10/${yes_time}

# 删除输出目录,为了兼容脚本重跑
hdfs dfs -rm -r ${cleanjob_output}
# 执行数据清洗任务
hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
  dataClean.DataCleanJob \
  ${cleanjob_input} ${cleanjob_output}

# 判断数据清洗任务是否成功
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# echo $? 可以获取上一个命令的执行结果0成功,否则失败
if [ "$?" = "0" ]; then
  echo "clean job execute success ...."
  # 删除输出目录,为了兼容脚本重跑
  hdfs dfs -rm -r ${videoinfojob_output}
  hdfs dfs -rm -r ${top10job_output}
  # 执行指标统计任务1
  echo " execute VideoInfoJob ...."
  hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    videoinfo.VideoInfoJob \
    ${videoinfojob_input} ${videoinfojob_output}
  hdfs dfs -ls ${videoinfojob_output}/_SUCCESS
  if [ "$?" != "0" ]
  then
    echo " VideoInfoJob execute failed .... "
  fi
  # 指定指标统计任务2
  echo " execute VideoInfoTop10Job ...."
  hadoop jar ${jobs_home}/hadoopDemo1-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
    top10.VideoInfoTop10Job \
    ${top10job_input} ${top10job_output}
  hdfs dfs -ls ${top10job_output}/_SUCCESS
  if [ "$?" != "0" ]
  then
    echo " VideoInfoJob execute failed .... "
  fi
else
  echo "clean job execute failed ... date time is ${yes_time}"
  # 给管理员发送短信、邮件
  # 可以在while进行重试
fi

使用Sqoop将计算结果导出到MySQL

Sqoop可以快速的实现hdfs-mysql的导入导出

快速安装Sqoop工具

image.png

image.png

数据导出功能开发,使用Sqoop将MapReduce计算的结果导出到Mysql中

  1. 导出命令
sqoop export \
--connect 'jdbc:mysql://192.168.56.101:3306/data?serverTimezone=UTC&useSSL=false' \
--username 'hdp' \
--password 'admin' \
--table 'top10' \
--export-dir '/res/top10/231022' \
--input-fields-terminated-by "\t"
  1. 导出日志
[root@cent7-1 sqoop-1.4.7.bin_hadoop-2.6.0]# sqoop export \
> --connect 'jdbc:mysql://192.168.56.101:3306/data?serverTimezone=UTC&useSSL=false' \
> --username 'hdp' \
> --password 'admin' \
> --table 'top10' \
> --export-dir '/res/top10/231022' \
> --input-fields-terminated-by "\t"
Warning: /home/sqoop-1.4.7.bin_hadoop-2.6.0//../hcatalog does not exist! HCatalog jobs will fail.
Please set $HCAT_HOME to the root of your HCatalog installation.
Warning: /home/sqoop-1.4.7.bin_hadoop-2.6.0//../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
2023-10-24 23:42:09,452 INFO sqoop.Sqoop: Running Sqoop version: 1.4.7
2023-10-24 23:42:09,684 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
2023-10-24 23:42:09,997 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
2023-10-24 23:42:10,022 INFO tool.CodeGenTool: Beginning code generation
Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
2023-10-24 23:42:10,921 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `top10` AS t LIMIT 1
2023-10-24 23:42:11,061 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `top10` AS t LIMIT 1
2023-10-24 23:42:11,084 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /home/hadoop-3.2.4
注: /tmp/sqoop-root/compile/6d507cd9a1a751990abfd7eef20a60c2/top10.java使用或覆盖了已过时的 API。
注: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。
2023-10-24 23:42:23,932 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-root/compile/6d507cd9a1a751990abfd7eef20a60c2/top10.jar
2023-10-24 23:42:23,972 INFO mapreduce.ExportJobBase: Beginning export of top10
2023-10-24 23:42:23,972 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2023-10-24 23:42:24,237 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
2023-10-24 23:42:27,318 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
2023-10-24 23:42:27,325 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
2023-10-24 23:42:27,326 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
2023-10-24 23:42:27,641 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
2023-10-24 23:42:29,161 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1698153196891_0015
2023-10-24 23:42:39,216 INFO input.FileInputFormat: Total input files to process : 1
2023-10-24 23:42:39,231 INFO input.FileInputFormat: Total input files to process : 1
2023-10-24 23:42:39,387 INFO mapreduce.JobSubmitter: number of splits:4
2023-10-24 23:42:39,475 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
2023-10-24 23:42:40,171 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1698153196891_0015
2023-10-24 23:42:40,173 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-10-24 23:42:40,660 INFO conf.Configuration: resource-types.xml not found
2023-10-24 23:42:40,660 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2023-10-24 23:42:41,073 INFO impl.YarnClientImpl: Submitted application application_1698153196891_0015
2023-10-24 23:42:41,163 INFO mapreduce.Job: The url to track the job: http://cent7-1:8088/proxy/application_1698153196891_0015/
2023-10-24 23:42:41,164 INFO mapreduce.Job: Running job: job_1698153196891_0015
2023-10-24 23:43:02,755 INFO mapreduce.Job: Job job_1698153196891_0015 running in uber mode : false
2023-10-24 23:43:02,760 INFO mapreduce.Job:  map 0% reduce 0%
2023-10-24 23:43:23,821 INFO mapreduce.Job:  map 25% reduce 0%
2023-10-24 23:43:25,047 INFO mapreduce.Job:  map 50% reduce 0%
2023-10-24 23:43:26,069 INFO mapreduce.Job:  map 75% reduce 0%
2023-10-24 23:43:27,088 INFO mapreduce.Job:  map 100% reduce 0%
2023-10-24 23:43:28,112 INFO mapreduce.Job: Job job_1698153196891_0015 completed successfully
2023-10-24 23:43:28,266 INFO mapreduce.Job: Counters: 33
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=993808
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=1297
		HDFS: Number of bytes written=0
		HDFS: Number of read operations=19
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=0
		HDFS: Number of bytes read erasure-coded=0
	Job Counters 
		Launched map tasks=4
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=79661
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=79661
		Total vcore-milliseconds taken by all map tasks=79661
		Total megabyte-milliseconds taken by all map tasks=81572864
	Map-Reduce Framework
		Map input records=10
		Map output records=10
		Input split bytes=586
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=3053
		CPU time spent (ms)=11530
		Physical memory (bytes) snapshot=911597568
		Virtual memory (bytes) snapshot=10326462464
		Total committed heap usage (bytes)=584056832
		Peak Map Physical memory (bytes)=238632960
		Peak Map Virtual memory (bytes)=2584969216
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=0
2023-10-24 23:43:28,282 INFO mapreduce.ExportJobBase: Transferred 1.2666 KB in 60.9011 seconds (21.2968 bytes/sec)
2023-10-24 23:43:28,291 INFO mapreduce.ExportJobBase: Exported 10 records.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/104429.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【机器学习合集】激活函数合集 ->(个人学习记录笔记)

文章目录 综述1. S激活函数(sigmoid&Tanh)2. ReLU激活函数3. ReLU激活函数的改进4. 近似ReLU激活函数5. Maxout激活函数6. 自动搜索的激活函数Swish 综述 这些都是神经网络中常用的激活函数&#xff0c;它们在非线性变换方面有不同的特点。以下是这些激活函数的主要区别&am…

Node编写获取用户信息接口

目录 前言 初始化路由模块 使用postman发送get获取用户信息请求 初始化路由处理函数模块 获取用户基本信息 前言 在前两篇文章中已经介绍了如何编写用户注册接口以及用户登录接口&#xff0c;这篇文章介绍如何获取用户信息&#xff0c;本篇文章建立在Node编写用户登录接口…

06 MIT线性代数-列空间和零空间 Column space Nullspace

1. Vector space Vector space requirements vw and c v are in the space, all combs c v d w are in the space 但是“子空间”和“子集”的概念有区别&#xff0c;所有元素都在原空间之内就可称之为子集&#xff0c;但是要满足对线性运算封闭的子集才能成为子空间 中 2 …

质数(素数)prime :只能被 1 和 它本身整除的自然数,不可再分,(三种方式求出质数)

从 2 开始&#xff0c;到这个数 减 1 结束为止&#xff0c; 都不能被这个数本身整除。例如&#xff1a;5 是否是质数 &#xff1f; 那么 2&#xff0c;3&#xff0c;4&#xff0c;都不能被 5 整除 所以 5 是 质数判断 n 是否是质数&#xff1f; 2&#xff0c;3&#xff0c;4&…

海外公司注册推广的9个实用技巧建议-华媒舍

在全球化的时代背景下&#xff0c;海外市场的开发对于企业来说是非常重要的战略决策。海外公司注册是进入海外市场的第一步&#xff0c;通过注册在海外的公司&#xff0c;企业可以获得更多的商业机会和巨大的价值。本篇文章将为您介绍海外公司注册推广的9个实用建议&#xff0c…

Elasticsearch:使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation (三)

这是继之前文章&#xff1a; Elasticsearch&#xff1a;使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation &#xff08;一&#xff09; Elasticsearch&#xff1a;使用 Open AI 和 Langchain 的 RAG - Retrieval Augmented Generation &#xff08;二&…

数字人解决方案——解决ER-NeRF/RAD-NeRF人像分割的问题

一、训练数据人像分割 训练ER-NeRF或者RAD-NeRF时&#xff0c;在数据处理时&#xff0c;其中有一步是要把人像分割出来&#xff0c;而且人像要分成三块&#xff0c;人的头部&#xff0c;人的有脖子&#xff0c;人的身体部分&#xff0c;效果如下&#xff1a; 从上面的分割的结…

通天之网:卫星互联网与跨境电商的数字化未来

在当今数字化时代&#xff0c;互联网已经成为商业的核心。跨境电商&#xff0c;作为在线商业的一部分&#xff0c;一直在寻求新的途径来拓宽其边界。近年来&#xff0c;卫星互联网技术的发展已经成为这一领域的重要驱动力&#xff0c;不仅将互联网带到了全球各个角落&#xff0…

zookeeper源码(02)源码编译启动及idea导入

本文介绍一下zookeeper-3.9.0源码下载、编译及本地启动。 下载源码 git clone https://gitee.com/apache/zookeeper.gitcd zookeeper git checkout release-3.9.0 git checkout -b release-3.9.0源码编译 README_packaging.md文件 该文件介绍了编译zookeeper需要的环境和命…

什么是React Router?它的作用是什么?

聚沙成塔每天进步一点点 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 欢迎来到前端入门之旅&#xff01;感兴趣的可以订阅本专栏哦&#xff01;这个专栏是为那些对Web开发感兴趣、刚刚踏入前端领域的朋友们量身打造的。无论你是完全的新手还是有一些基础的开发…

elasticsearch-5.6.15集群部署,如何部署x-pack并添加安全认证

目录 一、环境 1、JDK、映射、域名、三墙 2、三台服务器创建用户、并为用户授权 二、配置elasticsearch-5.6.15实例 1、官网获取elasticsearch-5.6.15.tar.gz&#xff0c;拉取到三台服务器 2、elas环境准备 3、修改elasticsearch.yml配置 4、修改软、硬件线程数 5、修改…

【springcloud-config】配置中心客户端导入依赖spring-cloud-config-server后,maven一直爆红问题解决

问题描述 配置中心客户端导入了 spring-cloud-config-server 后&#xff0c;导入依赖爆红&#xff1b; 解决办法&#xff1a; 参考官网中文文档&#xff1a;spring-cloud -config 配置中心 中文文档 补充导入 spring-config-starter-config 配置即可 <!--springcloud-c…

Linux系统之file命令的基本使用

Linux系统之file命令的基本使用 一、file命令介绍1.1 Linux简介1.2 file命令简介 二、file命令的使用帮助2.1 file命令的help帮助信息2.2 file命令的语法解释2.3 file命令的man手册 三、文件类型介绍四、file命令的基本使用4.1 查询file版本4.2 显示文件类型4.3 输出时不显示文…

MATLAB——一维离散小波的单层分解

%% 学习目标&#xff1a;一维离散小波的单层分解 %% clear all; close all; load noissin.mat; [cA,cD]dwt(noissin,sym4); %% cA是近似系数&#xff08;低频部分&#xff09;&#xff0c;cD是细节系数&#xff08;高频部分&#xff09;&#xff0c;采用的小波是sym4 f…

01. 板载硬件资源和开发环境

一、板载硬件资源 STM32F4VGT6-DISCOVERY硬件资源如下&#xff1a; (1). STM32F407VGT6微控制器有1M的FLASH存储器&#xff0c;192K的RAM&#xff0c;LQFP100封装 (2). 板上的ST-LINK_V2可以使用选择的方式把套件切换成一个独立的ST-LINK/V2来 使用&#xff08;可以使用SWD…

使用vite搭建前端项目

1、在vscode 终端那里执行创建前端工程项目&#xff0c;其中shop-admin为项目名称&#xff1a; npm init vite-app shop-admin 提示如需安装其他依赖执行npm install ....,否则忽略(第三步再讲)。 2、执行npm run dev 命令直接运行创建好的项目&#xff0c;在浏览器打开链接…

ubuntu执行普通用户或root用户执行apt-get update时报错Couldn‘t create temporary file /tmp/...

apt-get update无法更新&#xff0c;报错&#xff1a; Couldnt create temporary file /tmp/apt.conf.GSzv74 for passing config to&#xff0c;&#xff0c;&#xff0c; 这是由于/tmp目录没有权限导致的&#xff0c;解决办法&#xff1a; chmod 777 /tmp

遥感语义分割、变化检测论文小trick合集(持续更新)

目录 &#x1f497;&#x1f497;1.影像融合机制 &#x1f497;&#x1f497;2.上下文聚合模块 &#x1f497;&#x1f497;3.adapter即插即用模块 &#x1f497;&#x1f497;1.影像融合机制 参考【多源特征自适应融合网络的高分遥感影像语义分割】文章中的“多源特征自适应…

编译报错 internal compiler error: Segmentation fault 解决方法

问题描述 最近在使用虚拟机 ubuntu 20.04 编译 musl gcc 工具链时&#xff0c;遇到一个奇怪的问题&#xff0c;编译过程中异常退出&#xff0c;清理了多次重新编译&#xff0c;发现编译报错提示的信息是 internal compiler error: Segmentation fault 由于之前是可以正常编译的…

Linux shell编程学习笔记15:定义数组、获取数组元素值和长度

一、 Linux shell 脚本编程中的数组概述 数组是一种常见的数据结构。跟大多数编程语言一样&#xff0c;大多数Linux shell脚本支持数组&#xff0c;但对数组的支持程度各不相同&#xff0c;比如数组的维度&#xff0c;是支持一维数组还是多维数组&#xff1f;再如&#xff0c;…