MapReduce综合应用案例 — 电信数据清洗

文章目录

  • 第1关:数据清洗


第1关:数据清洗

测试说明
平台会对你编写的代码进行测试:

评测之前先在命令行启动hadoop:start-all.sh;

点击测评后MySQL所需的数据库和表会自动创建好。

PhoneLog:封装对象
LogMR:MapReduce操作
DBHelper:MySQL工具类

具体本关的预期输出请查看右侧测试集。

因为大数据实训消耗资源较大,且map/reduce运行比较耗时,所以评测时间较长,大概在60秒左右,请耐心等待。
在这里插入图片描述
在该箭头所指的位置进行代码文件的切换。

代码文件如下:
LogMR类

package com;
import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class LogMR {
    /********** begin **********/
    static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
        Map<String, String> userMap = new HashMap<>();
        Map<String, String> addressMap = new HashMap<>();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        PhoneLog pl = new PhoneLog();
        Text text = new Text();
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Connection connection = DBHelper.getConnection();
            try {
                Statement statement = connection.createStatement();
                String sql = "select * from userphone";
                ResultSet resultSet = statement.executeQuery(sql);
                while (resultSet.next()) {
                    String phone = resultSet.getString(2);
                    String trueName = resultSet.getString(3);
                    userMap.put(phone, trueName);
                }
                String sql2 = "select * from allregion";
                ResultSet resultSetA = statement.executeQuery(sql2);
                while (resultSetA.next()) {
                    String phone = resultSetA.getString(2);
                    String trueName = resultSetA.getString(3);
                    addressMap.put(phone, trueName);
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String str = value.toString();
            String[] split = str.split(",");
            if (split.length == 6) {
                String trueName1 = userMap.get(split[0]);
                String trueName2 = userMap.get(split[1]);
                String address1 = addressMap.get(split[4]);
                String address2 = addressMap.get(split[5]);
                long startTimestamp = Long.parseLong(split[2]);
                String startTime = sdf.format(startTimestamp * 1000);
                long endTimestamp = Long.parseLong(split[3]);
                String endTime = sdf.format(endTimestamp * 1000);
                long timeLen = endTimestamp - startTimestamp;
                pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
                        address2);
                context.write(pl, NullWritable.get());
            }
        }
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(LogMR.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(PhoneLog.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(0);
        Path inPath = new Path("/user/test/input/a.txt");
        Path out = new Path("/user/test/output");
        FileInputFormat.setInputPaths(job, inPath);
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }
    /********** end **********/
}

DBHelper类

package com;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class DBHelper {
    /********** begin **********/
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
    private static final String username = "root";// 数据库的用户名
    private static final String password = "123123";// 数据库的密码:这个是自己安装数据库的时候设置的,每个人不同。
    private static Connection conn = null; // 声明数据库连接对象
    static {
        
        try {
            Class.forName(driver);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
    public static Connection getConnection() {
        if (conn == null) {
            try {
                conn = DriverManager.getConnection(url, username, password);
            } catch (SQLException e) {
                e.printStackTrace();
            } // 连接数据库
            return conn;
        }
        return conn;
    }
    /********** end **********/
}

PhoneLog类

package com;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class PhoneLog implements WritableComparable<PhoneLog> {
    private String userA;
    private String userB;
    private String userA_Phone;
    private String userB_Phone;
    private String startTime;
    private String endTime;
    private Long timeLen;
    private String userA_Address;
    private String userB_Address;
    public PhoneLog() {
    }
    public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
            String endTime, Long timeLen, String userA_Address, String userB_Address) {
        this.userA = userA;
        this.userB = userB;
        this.userA_Phone = userA_Phone;
        this.userB_Phone = userB_Phone;
        this.startTime = startTime;
        this.endTime = endTime;
        this.timeLen = timeLen;
        this.userA_Address = userA_Address;
        this.userB_Address = userB_Address;
    }
    public String getUserA_Phone() {
        return userA_Phone;
    }
    public void setUserA_Phone(String userA_Phone) {
        this.userA_Phone = userA_Phone;
    }
    public String getUserB_Phone() {
        return userB_Phone;
    }
    public void setUserB_Phone(String userB_Phone) {
        this.userB_Phone = userB_Phone;
    }
    public String getUserA() {
        return userA;
    }
    public void setUserA(String userA) {
        this.userA = userA;
    }
    public String getUserB() {
        return userB;
    }
    public void setUserB(String userB) {
        this.userB = userB;
    }
    public String getStartTime() {
        return startTime;
    }
    public void setStartTime(String startTime) {
        this.startTime = startTime;
    }
    public String getEndTime() {
        return endTime;
    }
    public void setEndTime(String endTime) {
        this.endTime = endTime;
    }
    public Long getTimeLen() {
        return timeLen;
    }
    public void setTimeLen(Long timeLen) {
        this.timeLen = timeLen;
    }
    public String getUserA_Address() {
        return userA_Address;
    }
    public void setUserA_Address(String userA_Address) {
        this.userA_Address = userA_Address;
    }
    public String getUserB_Address() {
        return userB_Address;
    }
    public void setUserB_Address(String userB_Address) {
        this.userB_Address = userB_Address;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(userA);
        out.writeUTF(userB);
        out.writeUTF(userA_Phone);
        out.writeUTF(userB_Phone);
        out.writeUTF(startTime);
        out.writeUTF(endTime);
        out.writeLong(timeLen);
        out.writeUTF(userA_Address);
        out.writeUTF(userB_Address);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        userA = in.readUTF();
        userB = in.readUTF();
        userA_Phone = in.readUTF();
        userB_Phone = in.readUTF();
        startTime = in.readUTF();
        endTime = in.readUTF();
        timeLen = in.readLong();
        userA_Address = in.readUTF();
        userB_Address = in.readUTF();
    }
    @Override
    public String toString() {
        return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
                + timeLen + "," + userA_Address + "," + userB_Address;
    }
     @Override
     public int compareTo(PhoneLog pl) {
     if(this.hashCode() == pl.hashCode()) {
     return 0;
     }
     return -1;
     }
}

之后在命令行启动hadoop

start-all.sh

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/258036.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android定制ROM简介

Android定制ROM简介 这篇文章是为对自定义ROM、AOSP等词汇不太熟悉的技术爱好者和好奇的人写的。我希望通过向您介绍这个世界来开始博客写作。 在我们将注意力转向定制ROM之前&#xff0c;让我们先了解一些基础知识。 什么是操作系统&#xff1f; 维基百科对此的定义简洁而…

探讨二维半导体的概念、应用前景及其与传统半导体的差异

当探讨二维半导体时&#xff0c;我们置身于科技革新的前沿。这种材料以其纳米级薄度和独特电学性质区别于传统半导体&#xff0c;引发了科学界的广泛兴趣。本文将深入探讨二维半导体的概念、应用前景及其与传统半导体的差异。 什么是二维半导体&#xff1f; 二维半导体是由单…

计算机网络 网络层下 | IPv6 路由选择协议,P多播,虚拟专用网络VPN,MPLS多协议标签

文章目录 5 IPv65.1 组成5.2 IPv6地址5.3 从IPv4向IPv6过渡5.3.1 双协议栈5.3.2 隧道技术 6 因特网的路由选择协议6.1 内部网关协议RIP6.2 内部网关协议 OSPF基本特点 6.3 外部网关协议 BGP6.3.1 路由选择 6.4 路由器组成6.4.1 基本了解6.4.2 结构 7 IP多播7.1 硬件多播7.2 IP多…

0062-Java运算符

文章目录 1.运算符介绍2.算术运算符2.1 介绍2.2 细节说明 3.关系运算符(比较运算符)3.1 介绍3.2 细节说明 4.逻辑运算符4.1 介绍4.2 逻辑运算规则4.3 && 和 & 基本规则4.4 && 和 & 使用区别4.5 || 和 | 基本规则4.6 || 和 | 使用区别 5. ! 取反 基本规…

Logback简介与配置详解

在开发和维护Spring Boot应用程序时&#xff0c;一个强大而灵活的日志框架是至关重要的。Spring Boot默认集成了Logback&#xff0c;一个高性能的Java日志框架。本文将介绍如何配置Logback以满足你的日志记录需求。 Logback简介 官方网址&#xff1a;https://logback.qos.ch/ …

Kafka核心参数(带完善)

客户端 api Kafka提供了以下两套客户端API HighLevel(重点)LowLevel HighLevel API封装了kafka的运行细节&#xff0c;使用起来比较简单&#xff0c;是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节&#xff0c;Partition&#x…

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消费收发实战

文章目录 Spring Cloud Alibaba 集成 RocketMQ 最佳实践集成依赖DashBoard消息收发实战 Spring Cloud Alibaba 集成 RocketMQ 最佳实践 SpringBoot 相对于 SSM 来说已经很大程度上简化了开发&#xff0c;但是使用 SpringBoot 集成一些第三方的框架&#xff0c;还是需要花费一些…

Node.js 工作线程与子进程:应该使用哪一个

Node.js 工作线程与子进程&#xff1a;应该使用哪一个 并行处理在计算密集型应用程序中起着至关重要的作用。例如&#xff0c;考虑一个确定给定数字是否为素数的应用程序。如果我们熟悉素数&#xff0c;我们就会知道必须从 1 遍历到该数的平方根才能确定它是否是素数&#xff…

搭建知识付费平台?明理信息科技为你提供全程解决方案

明理信息科技saas知识付费平台 在当今数字化时代&#xff0c;知识付费已经成为一种趋势&#xff0c;越来越多的人愿意为有价值的知识付费。然而&#xff0c;公共知识付费平台虽然内容丰富&#xff0c;但难以满足个人或企业个性化的需求和品牌打造。同时&#xff0c;开发和维护…

Python Pandas Excel/csv文件的保存与读取(第14讲)

Python Pandas Excel/csv文件的读取于保存(第14讲)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔…

在 Kubernetes 上部署 Python 3.7、Chrome 和 Chromedriver(版本 114.0.5735.90)的完整指南

一、构建基础镜像 docker build -f /u01/isi/DockerFile . -t thinking_code.com/xhh/crawler_base_image:v1.0.2docker push thinking_code.com/xhh/crawler_base_image:v1.0.2 二、K8s运行Pod 三、DockerFile文件 # 基于镜像基础 FROM python:3.7# 设置代码文件夹工作目录…

【论文解读】Comparing VVC, HEVC and AV1 using Objective and Subjective Assessments

时间&#xff1a;2020 级别&#xff1a;IEEE 机构&#xff1a; IEEE 组织 摘要&#xff1a; 对3种最新的视频编码标准HEVC (High Efficiency video Coding)测试模型HM (High Efficiency video Coding)、amedia video 1 (AV1)和Versatile video Coding测试模型 (VTM)进行了客观和…

关于“Python”的核心知识点整理大全25

目录 10.3.4 else 代码块、 10.3.5 处理 FileNotFoundError 异常 alice.py 在这个示例中&#xff0c;try代码块引发FileNotFoundError异常&#xff0c;因此Python找出与该错误匹配的 except代码块&#xff0c;并运行其中的代码。最终的结果是显示一条友好的错误消息&#x…

抖音直播间websocket礼物和弹幕消息推送可能出现重复的情况,解决办法

在抖音直播间里&#xff0c;通过websocket收到的礼物消息数据格式如下&#xff1a; {common: {method: WebcastGiftMessage,msgId: 7283420150152942632,roomId: 7283413007005207308,createTime: 1695803662805,isShowMsg: True,describe: 莎***:送给主播 1个入团卡,priority…

速学数据结构 | 二叉树堆的实现详解篇

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏:《速学数据结构》 《C语言进阶篇》 ⛺️生活的理想&#xff0c;就是为了理想的生活! &#x1f4cb; 前言 &#x1f308;hello&#xff01; 各位宝子们大家好啊&#xff0c;二叉树的概念大家都了解了那么我们…

自动驾驶学习笔记(十九)——Planning模块

#Apollo开发者# 学习课程的传送门如下&#xff0c;当您也准备学习自动驾驶时&#xff0c;可以和我一同前往&#xff1a; 《自动驾驶新人之旅》免费课程—> 传送门 《Apollo 社区开发者圆桌会》免费报名—>传送门 文章目录 前言 Planning作用 Planning内容 Plannin…

uni-app ucharts中饼图与圆环图区别

项目情况&#xff1a; uni-app的用于移动端H5项目&#xff0c;包使用uni_modules目录存放。 图表引用ucharts中的echarts配置的组件方式 区别1 饼图与圆环图在echarts使用的配置都是pie类型。但是配置raduis使用&#xff1a; radius: [40%, 70%] 区别2 组件type指明&#xf…

保护您的Android应用程序:Android应用程序安全一览

保护您的Android应用程序&#xff1a;Android应用程序安全一览 我们都知道Android是为所有人设计的——开放、面向开发者、面向用户&#xff0c;这种开放性为今天和明天的移动技术提供了很多便利。然而&#xff0c;开放性也带来了需要妥善处理的安全风险。 安全是我们所有人都…

广州华锐互动VRAR:利用VR开展新能源汽车触电安全演练,降低培训成本和风险

随着新能源汽车行业的快速发展&#xff0c;相关的安全培训也变得越来越重要。其中&#xff0c;触电急救培训对于保障驾驶员和乘客的安全具有重要意义。传统培训方式存在一些不足&#xff0c;而利用VR技术进行培训则具有很多优势。 利用VR技术开展新能源汽车触电安全演练可以在模…

ansible模块 (7-13)

模块 7、hostname模块&#xff1a; 远程主机名管理模块 ansible 192.168.10.202 -m hostname -a nameliu 8、copy模块&#xff1a; 用于复制指定的主机文件到远程主机的模块 常用参数&#xff1a; dest: 指出要复制的文件在哪&#xff0c;必须使用绝对路径。如果源目标是…