【大数据】MapReduce实战

文章目录

    • @[toc]
      • Word Count
        • Mapper
        • Reducer
        • run.sh
        • 本地调试
      • 基于白名单的Word Count
        • Mapper
        • Reducer
        • run.sh
        • 本地调试
      • 文件分发
        • -file
          • Mapper
          • Reducer
          • run.sh
        • -cacheFile
          • Mapper
          • Reducer
          • run.sh
        • -cacheArchive
          • Mapper
          • Reducer
          • run.sh
      • 杀死MapReduce Job
      • 排序
      • 压缩文件
      • mr_ip_lib_python本地调试

因上努力

个人主页:丷从心·

系列专栏:大数据

果上随缘


Word Count

1

Mapper
import re
import sys

p = re.compile(r'\w+')

for line in sys.stdin:
    word_list = line.strip().split(' ')

    for word in word_list:
        if len(p.findall(word)) < 1:
            continue

        word = p.findall(word)[0].lower()

        print('\t'.join([word, '1']))
Reducer
import sys

cur_word = None
cur_cnt = 0

for line in sys.stdin:
    word, val = line.strip().split('\t')

    if cur_word == None:
        cur_word = word

    if cur_word != word:
        print('\t'.join([cur_word, str(cur_cnt)]))

        cur_word = word
        cur_cnt = 0

    cur_cnt += int(val)

print('\t'.join([cur_word, str(cur_cnt)]))
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_wordcount"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py" \
	-reducer "python red.py" \
	-file ./map.py \
	-file ./red.py
本地调试
cat the_man_of_property.txt | python map.py | sort -k1 | python red.py > result.txt
cat result.txt | sort -k2 -rn | head

基于白名单的Word Count

Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_white_list_word_count"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func white_list" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=3" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-file ./map.py \
	-file ./red.py \
	-file ./white_list
本地调试
cat the_man_of_property.txt | python map.py mapper_func white_list | sort -k1 | python red.py reducer_func > result.txt
cat the_man_of_property.txt | grep -o 'against' | wc -l

文件分发

-file
Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_file_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func white_list" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=3" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-file ./map.py \
	-file ./red.py \
	-file ./white_list
-cacheFile
Mapper
import sys


def get_white_list_word(white_list_file):
    white_list_file = open(white_list_file, 'r')

    white_list_word = set()
    for word in white_list_file:
        word = word.strip()

        if word != '':
            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_file):
    white_list_word = get_white_list_word(white_list_file)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_cachefile_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func WL" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=2" \
	-jobconf  "mapred.job.name=mr_cachefile_broadcast" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-cacheFile "hdfs://master:9000/white_list#WL" \
	-file "./map.py" \
	-file "./red.py"
-cacheArchive
Mapper
import os
import sys


def get_cachefile_list(white_list_dir):
    cachefile_list = []

    if os.path.isdir(white_list_dir):
        for cachefile in os.listdir(white_list_dir):
            cachefile = open(white_list_dir + '/' + cachefile)

            cachefile_list.append(cachefile)

    return cachefile_list


def get_white_list_word(white_list_dir):
    white_list_word = set()

    for cachefile in get_cachefile_list(white_list_dir):
        for word in cachefile:
            word = word.strip()

            white_list_word.add(word)

    return white_list_word


def mapper_func(white_list_dir):
    white_list_word = get_white_list_word(white_list_dir)

    for line in sys.stdin:
        word_list = line.strip().split(' ')

        for word in word_list:
            word = word.strip()

            if word != '' and word in white_list_word:
                print('\t'.join([word, '1']))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
Reducer
import sys


def reducer_func():
    cur_word = None
    cur_cnt = 0

    for line in sys.stdin:
        word, val = line.strip().split('\t')

        if cur_word == None:
            cur_word = word

        if cur_word != word:
            print('\t'.join([cur_word, str(cur_cnt)]))

            cur_word = word
            cur_cnt = 0

        cur_cnt += int(val)

    print('\t'.join([cur_word, str(cur_cnt)]))


if __name__ == '__main__':
    module = sys.modules[__name__]
    func = getattr(module, sys.argv[1])

    args = None
    if len(sys.argv) > 1:
        args = sys.argv[2:]

    func(*args)
run.sh
HADOOP_CMD="/usr/local/src/hadoop_2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop_2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_PATH="/the_man_of_property.txt"
OUTPUT_PATH="/output_mr_cachearchive_broadcast"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
	-input $INPUT_PATH \
	-output $OUTPUT_PATH \
	-mapper "python map.py mapper_func WLD" \
	-reducer "python red.py reducer_func" \
	-jobconf "mapred.reduce.tasks=2" \
	-jobconf  "mapred.job.name=mr_cachearchive_broadcast" \
	-jobconf "stream.non.zero.exit.is.failure=false" \
	-cacheArchive "hdfs://master:9000/white_list_dir.tgz#WLD" \
	-file "./map.py" \
	-file "./red.py"

杀死MapReduce Job

hadoop job -kill job_1715841477049_0001

排序

  • k e y key key的字符进行排序

压缩文件

  • 压缩文件不可切分,一个压缩文件占用一个 M a p Map Map

mr_ip_lib_python本地调试

$ head -20 ip_lib.txt | column -t
$ cat ip_lib.txt | awk '{print $1,$NF}' | head | column -t

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/639607.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

PE文件(六)新增节-添加代码作业

一.手动新增节添加代码 1.当预备条件都满足&#xff0c;节表结尾没有相关数据时&#xff1a; 现在我们将ipmsg.exe用winhex打开&#xff0c;在节的最后新增一个节用于存放我们要增加的数据 注意&#xff1a;飞鸽的文件对齐和内存对齐是一致的 先判断节表末尾到第一个节之间…

《书生·浦语大模型实战营》第一课 学习笔记:书生·浦语大模型全链路开源体系

文章大纲 1. 简介与背景智能聊天机器人与大语言模型目前的开源智能聊天机器人与云上运行模式 2. InternLM2 大模型 简介3. 视频笔记&#xff1a;书生浦语大模型全链路开源体系内容要点从模型到应用典型流程全链路开源体系 4. 论文笔记:InternLM2 Technical Report简介软硬件基础…

Flat Ads获广东电视台报道!CEO林啸:助力更多企业实现业务全球化增长

近日,在广州举行的第四届全球产品与增长展会(PAGC2024)上,Flat Ads凭借其卓越的一站式全球化营销和创新的变现方案大放异彩,不仅吸引了众多业界目光,同时也在展会上斩获了备受瞩目的“金帆奖”,展现了其在全球化营销推广领域的卓越实力和专业服务。 在大会现场,Flat Ads的CEO林…

fyne网格包裹布局

fyne网格包裹布局 与之前的网格布局一样&#xff0c;网格环绕布局以网格模式创建元素排列。但是&#xff0c;此网格没有固定数量的列&#xff0c;而是为每个单元格使用固定大小&#xff0c;然后将内容流到显示项目所需的行数。 layout.NewGridWrapLayout(size) 您可以使用其中…

如何官方查询论文分区,中科院及JCR

中科院分区 有一个小程序&#xff1a;中科院文献情报中心分区表 点2023升级版&#xff0c;输入期刊名 大类1区 JCR分区 进入官方网站 Journal Citation Reports 输入要查询的期刊名&#xff0c;点开 拼命往下拉 这就是根据影响因子的排名&#xff0c;在computer science&am…

Dijkstra算法求最短路径 c++

目录 【问题背景】 【相关知识】 【算法思想】 【算法实现】 【伪代码】 【输入输出】 【代码】 【问题背景】 出门旅游&#xff0c;有些城市之间有公路&#xff0c;有些城市之间则没有&#xff0c;如下图。为了节省经费以及方便计划旅程&#xff0c;希望在出发之前知道…

【iceberg数据一致性】iceberg如何保证高并发数据一致性

在使用iceberg写数据时&#xff0c;一直弄不清楚为什么iceberg写入快&#xff0c;并且能够保证数据的一致性。今天决定搞清楚这个问题&#xff0c;经过查询和理解&#xff0c;写下来。 文件格式 iceberg元数据的文件目前有三个&#xff1a;metadata.json&#xff0c;snap.avro…

MyBatis实用方案,如何使项目兼容多种数据库

系列文章目录 MyBatis缓存原理 Mybatis plugin 的使用及原理 MyBatisSpringboot 启动到SQL执行全流程 数据库操作不再困难&#xff0c;MyBatis动态Sql标签解析 Mybatis的CachingExecutor与二级缓存 使用MybatisPlus还是MyBaits &#xff0c;开发者应该如何选择&#xff1f; 巧…

SVN创建项目分支

目录 背景调整目录结构常规目录结构当前现状目标 调整SVN目录调整目录结构创建项目分支 效果展示 背景 当前自己本地做项目的时候发现对SVN创建项目不规范&#xff0c;没有什么目录结构&#xff0c;趁着创建目录分支的契机&#xff0c;顺便调整下SVN服务器上的目录结构 调整目…

Day36 代码随想录打卡|二叉树篇---翻转二叉树

题目&#xff08;leecode T226&#xff09;&#xff1a; 给你一棵二叉树的根节点 root &#xff0c;翻转这棵二叉树&#xff0c;并返回其根节点。 方法&#xff1a; 迭代法 翻转二叉树&#xff0c;即从根节点开始&#xff0c;一一交换每个节点的左右孩子节点&#xff0c;然后…

【Arthas】阿里的线上jvm监控诊断工具的基本使用

关于对运行中的项目做java监测的需求下&#xff0c;Arthas则是一个很好的解决方案。 我们可以用来 1.监控cpu 现成、内存、堆栈 2.排查cpu飚高 造成原因 3.接口没反应 是否死锁 4.接口慢优化 5.代码未按预期执行 是分支不对 还是没提交&#xff1f; 6.线上低级错误 能不能不重启…

伦敦金交易商压箱底的交易技法 居然是……

很多伦敦金交易商&#xff0c;也就是我们常说的伦敦金交易平台&#xff0c;或者伦敦金交易服务提供商&#xff0c;他们会和一些资深的市场分析师合作。另外&#xff0c;一般在这些伦敦金交易商内部&#xff0c;也会有一批高手&#xff0c;他们一边在交易&#xff0c;一边在平台…

【设计模式深度剖析】【3】【创建型】【抽象工厂模式】| 要和【工厂方法模式】对比加深理解

&#x1f448;️上一篇:工厂方法模式 | 下一篇:建造者模式&#x1f449;️ 目录 抽象工厂模式前言概览定义英文原话直译什么意思呢&#xff1f;&#xff08;以运动型车族工厂&#xff0c;生产汽车、摩托产品为例&#xff09; 类图4个角色抽象工厂&#xff08;Abstract Fac…

起底震网病毒的来龙去脉

2010年&#xff0c;震网病毒被发现&#xff0c;引起世界哗然&#xff0c;在后续的10年间&#xff0c;陆陆续续有更多关于该病毒的背景和细节曝光。今年&#xff0c;《以色列时报》和《荷兰日报》又披露了关于此事件的更多信息&#xff0c;基于这些信息&#xff0c;我们重新梳理…

使用 Docker 部署 Jenkins 并设置初始管理员密码

使用 Docker 部署 Jenkins 并设置初始管理员密码 每一次开始&#xff0c;我都特别的认真与胆怯&#xff0c;是因为我期待结局&#xff0c;也能够不会那么粗糙&#xff0c;不会让我失望&#xff0c;所以&#xff0c;就多了些思考&#xff0c;多了些拘束&#xff0c;所以&#xf…

软件测试:功能测试-接口测试-自动化测试-性能测试-验收测试

软件测试的主要流程 一、测试主要的四个阶段 1.测试计划设计阶段&#xff1a;产品立项之后&#xff0c;进行需求分析&#xff0c;需求评审&#xff0c;业务需求评级&#xff0c;绘制业务流程图。确定测试负责人&#xff0c;开始制定测试计划&#xff1b; 2.测试准备阶段&…

不小心丢失mfc140u.dll文件怎么办?mfc140u.dll丢失的解决办法

当您发现mfc140u.dll文件不见了或者受损&#xff0c;别担心&#xff0c;我们可以一起解决这个问题&#xff01;首先&#xff0c;您可能会注意到一个小提示&#xff0c;当您尝试打开某些程序时&#xff0c;屏幕上会跳出一个消息说“找不到mfc140u.dll”或者“mfc140u.dll文件缺失…

心识宇宙 x TapData:如何加速落地实时数仓,助力 AI 企业智慧决策

使用 TapData&#xff0c;化繁为简&#xff0c;摆脱手动搭建、维护数据管道的诸多烦扰&#xff0c;轻量代替 OGG、DSG 等同步工具&#xff0c;「CDC 流处理 数据集成」组合拳&#xff0c;加速仓内数据流转&#xff0c;帮助企业将真正具有业务价值的数据作用到实处&#xff0c…

Python的selenium爬取

1.selenium 1.1.前言 使用python的requests模块还是存在很大的局限性&#xff0c;例如&#xff1a;只发一次请求&#xff1b;针对ajax动态加载的网页则无法获取数据等等问题。特此&#xff0c;本章节将通过selenium模拟浏览器来完成更高级的爬虫抓取任务。 1.2.什么是seleniu…

学习单向链表带哨兵demo

一、定义 在计算机科学中&#xff0c;链表是数据元素的线性集合&#xff0c;其每个元素都指向下一个元素&#xff0c;元素存储上并不连续。 1.可以分三类为 单向链表&#xff0c;每个元素只知道其下一个元素是谁 双向链表&#xff0c;每个元素知道其上一个元素和下一个元素 …