【智能大数据分析 | 实验三】Storm实验:实时WordCountTopology

在这里插入图片描述

【作者主页】Francek Chen
【专栏介绍】 ⌈ ⌈ 智能大数据分析 ⌋ ⌋ 智能大数据分析是指利用先进的技术和算法对大规模数据进行深入分析和挖掘,以提取有价值的信息和洞察。它结合了大数据技术、人工智能(AI)、机器学习(ML)和数据挖掘等多种方法,旨在通过自动化的方式分析复杂数据集,发现潜在的价值和关联性,实现数据的自动化处理和分析,从而支持决策和优化业务流程。与传统的人工分析相比,智能大数据分析具有自动化、深度挖掘、实时性和可视化等特点。智能大数据分析广泛应用于各个领域,包括金融服务、医疗健康、零售、市场营销等,帮助企业做出更为精准的决策,提升竞争力。
【GitCode】专栏资源保存在我的GitCode仓库:https://gitcode.com/Morse_Chen/Intelligent_bigdata_analysis。

文章目录

    • 一、实验目的
    • 二、实验要求
    • 三、实验原理
      • (一)Topologies
      • (二)Spouts
      • (三)Bolts
    • 四、实验环境
    • 五、实验内容和步骤
      • (一)启动 Storm 集群
      • (二)导入依赖 jar 包
      • (三)编写程序
      • (四)打包上传并运行
    • 六、实验结果
    • 七、实验心得


一、实验目的

掌握如何用 Java 代码来实现 Storm 任务的拓扑,掌握一个拓扑中 Spout 和 Bolt 的关系及如何组织它们之间的关系,掌握如何将 Storm 任务提交到集群。

二、实验要求

编写一个 Storm 拓扑,一个 Spout 每个一秒钟随机生成一个单词并发射给 Bolt,Bolt 统计接收到的每个单词出现的频率并每隔一秒钟实时打印一次统计结果,最后将任务提交到集群运行,并通过日志查看任务运行结果。

三、实验原理

Storm 集群和 Hadoop 集群表面上看很类似。但是 Hadoop 上运行的是 MapReduce jobs,而在 Storm 上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个 MapReduce job 最终会结束, 而一个 topology 永远会运行(除非你手动 kill 掉)。

(一)Topologies

一个 topology 是spoutsbolts组成的图,通过 stream groupings 将图中的 spouts 和 bolts 连接起来,如图所示。

在这里插入图片描述
一个 topology 会一直运行直到你手动 kill 掉,Storm 自动重新分配执行失败的任务, 并且 Storm 可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个 topology 很简单。首先,把你所有的代码以及所依赖的 jar 打进一个 jar 包。然后运行类似下面的这个命令:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类:backtype.strom.MyTopology,参数是arg1arg2。这个类的 main 函数定义这个 topology 并且把它提交给 Nimbus。storm jar负责连接到 Nimbus 并且上传 jar 包。

Topology 的定义是一个 Thrift 结构,并且 Nimbus 就是一个 Thrift 服务, 你可以提交由任何语言创建的 topology。上面的方面是用 JVM-based 语言提交的最简单的方法。

(二)Spouts

消息源 spout 是 Storm 里面一个 topology 里面的消息生产者。一般来说消息源会从一个外部源读取数据并且向 topology 里面发出消息:tuple。Spout 可以是可靠的也可以是不可靠的。如果这个 tuple 没有被 storm 成功处理,可靠的消息源 spouts 可以重新发射一个 tuple, 但是不可靠的消息源 spouts 一旦发出一个 tuple 就不能重发了。

消息源可以发射多条消息流 stream。使用OutputFieldsDeclarer.declareStream来定义多个 stream,然后使用SpoutOutputCollector来发射指定的 stream。

Spout类里面最重要的方法是nextTuple。要么发射一个新的 tuple 到 topology 里面或者简单的返回如果已经没有新的 tuple。要注意的是 nextTuple 方法不能阻塞,因为 storm 在同一个线程上面调用所有消息源 spout 的方法。

另外两个比较重要的 spout 方法是ackfail。storm 在检测到一个 tuple 被整个 topology 成功处理的时候调用 ack,否则调用 fail。storm 只对可靠的 spout 调用 ack 和 fail。

(三)Bolts

所有的消息处理逻辑被封装在 bolts 里面。Bolts 可以做很多事情:过滤,聚合,查询数据库等等。Bolts 可以简单的做消息流的传递。复杂的消息流处理往往需要很多步骤,从而也就需要经过很多 bolts。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

Bolts 可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义 stream,使用OutputCollector.emit来选择要发射的 stream。

Bolts 的主要方法是execute,它以一个 tuple 作为输入,bolts 使用OutputCollector来发射 tuple,bolts 必须要为它处理的每一个 tuple 调用OutputCollector的 ack 方法,以通知 Storm 这个 tuple 被处理完成了,从而通知这个 tuple 的发射者 spouts。 一般的流程是: bolts 处理一个输入 tuple,发射0个或者多个 tuple,然后调用 ack 通知 storm 自己已经处理过这个 tuple 了。storm 提供了一个 IBasicBolt 会自动调用 ack。

四、实验环境

  • 云创大数据实验平台:
    在这里插入图片描述

  • Java 版本:jdk1.7.0_79

  • Hadoop 版本:hadoop-2.7.1

  • ZooKeeper 版本:zookeeper-3.4.6

  • Storm 版本:storm-0.10.0

  • Eclipse 版本:eclipse-jee-luna-SR2-win32-x86_64

五、实验内容和步骤

本实验主要演示一个完整的 Storm 拓扑编码过程,主要包含 Spout、Bolt 和构建 Topology 几个步骤。

(一)启动 Storm 集群

首先,启动 Storm 集群。

实验的准备工作是:域名映射、免密登录、JDK 配置、部署 ZooKeeper、部署 Storm 等。该实验可以点击一键搭建后能看到搭建成功,即可自动搭建好环境。

(二)导入依赖 jar 包

其次,将 Storm 安装包的 lib 目录内如下 jar 包导入到开发工具:

在这里插入图片描述

然后再在 Eclipse 中对每个 jar 执行如下操作进行添加配置:

在这里插入图片描述

出现这样即可:

在这里插入图片描述

(三)编写程序

我们在项目的 src 中首先创建一个cproc.word包。

在这里插入图片描述

然后,编写代码,实现一个完整的 Topology,内容如下:

Spout 随机发送单词,代码实现:

package cproc.word;

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class WordReaderSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
    {
        this.collector = collector;
    }
    @Override
    public void nextTuple() {
    	 //这个方法会不断被调用,为了降低它对CPU的消耗,让它sleep一下
     Utils.sleep(1000);
     final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
     Random rand = new Random();
     String word = words[rand.nextInt(words.length)];
     collector.emit(new Values(word));
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Bolt 单词计数,并每隔一秒打印一次,代码实现:

package cproc.word;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounterBolt extends BaseBasicBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private HashMap<String, Integer> counters = new HashMap<String, Integer>();
    private volatile boolean edit = false;
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        //定义一个线程1秒钟打印一次统计的信息
        new Thread(new Runnable() {
          public void run() {
             while (true) {
               if (edit) {
                   for (Entry<String, Integer> entry : counters.entrySet())
                   {
                      System.out.println(entry.getKey() + " : " + entry.getValue());
                    }
                    edit = false;
                }
                try {
                    Thread.sleep(1000);
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                  }
              }
            }
        }).start();
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        edit = true;
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }
}

构建 Topology 并提交到集群主函数,代码实现:

package cproc.word;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.AuthorizationException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;

public class WordCountTopo {
    public static void main(String[] args) throws Exception{
      //构建Topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("word-reader", new WordReaderSpout());
      builder.setBolt("word-counter", new WordCounterBolt())
      .shuffleGrouping("word-reader");
      Config conf = new Config();
      //集群方式提交
      StormSubmitter.submitTopologyWithProgressBar("wordCount", conf,
      builder.createTopology());
    }
}

(四)打包上传并运行

将 Storm 代码打成wordCount-Storm.jar (打包的时候不要包含 storm 中的 jar,不然会报错的,将无法运行,即:wordCount-Storm.jar中只包含上面三个类的代码) 上传到主节点的/usr/cstor/storm/bin目录下。

在这里插入图片描述

这里需要注意的是我们不勾选上图框选的选项,这样就不会打包项目中的jar包。

在这里插入图片描述

在主节点进入 Storm 安装目录的 bin 下面用以下命令提交任务:

cd /usr/cstor/storm/bin
./storm jar wordCount-Storm.jar cproc.word.WordCountTopo wordCount

在这里插入图片描述

因为 topology 会永远运行,需要手动 kill 掉,使用以下命令结束 storm 任务:

./storm kill wordCount

在这里插入图片描述

六、实验结果

Storm 任务执行时,可以查看 Storm 日志文件,日志里面打印了统计的单词结果,日志内容如图。注意要到 slave1 服务器上查看日志文件。

cd /usr/cstor/storm/logs
ls
cat wordCount-1-1728785733-worker-6703.log

在这里插入图片描述
……

在这里插入图片描述

七、实验心得

  在本次 Storm 实验中,我深入了解了如何使用 Apache Storm 实现一个实时 WordCountTopology。Apache Storm 是一个开源的分布式实时计算系统,用于处理大量的数据流。通过本次实验,我不仅掌握了 Storm 的基本概念,还学会了如何使用 Java 代码来实现 Storm 任务的拓扑,以及如何将 Storm 任务提交到集群中运行。

  实验的核心是创建一个能够实时统计单词频率的 Topology。这个 Topology 由一个 Spout 和多个 Bolt 组成。Spout 负责生成或接收外部数据流,并将其转换为 Storm 内部的 Tuple(消息传递的基本单元)。在这个实验中,Spout 每秒随机生成一个单词,并将其发送给 Bolt。Bolt 则负责处理接收到的 Tuple,进行单词统计,并每隔一秒打印一次统计结果。

  在实验过程中,我首先通过 Eclipse 创建了一个 StormTest 项目,并导入了所需的依赖 jar 包。然后,我创建了三个 Java 类:WordReaderSpoutWordCounterBoltWordCountTopo。WordReaderSpout 负责生成单词,WordCounterBolt 负责将单词拆分和统计单词频率。最后,在 WordCountTopo 类中定义了 Topology 的结构,并将这些组件组织起来。

  在将 Topology 提交到 Storm 集群之前,我首先在本地模式下进行了测试。通过运行storm jar命令,我成功地将 Topology 提交给 Nimbus(Storm 的主节点),并在本地机器上模拟了 Storm 集群的运行环境。测试结果显示,Topology 能够正确地生成单词、拆分单词并统计单词频率。

  接下来,我将 Topology 提交到了实际的 Storm 集群中运行。在集群模式下,我需要注意一些额外的配置,如设置 worker 的数量、executor 的数量以及 task 的数量等。这些配置对于优化 Topology 的性能至关重要。通过合理地配置并行度,我成功地提高了 Topology 的处理效率。

  总的来说,这次 Storm 实验让我对分布式实时计算系统有了更深入的了解。通过实践,我不仅掌握了 Storm 的基本概念和操作方法,还学会了如何优化 Topology 的性能和解决实际问题。我相信这些经验和知识将对我未来的学习和工作产生积极的影响。

:以上文中的数据文件及相关资源下载地址:
链接:https://pan.quark.cn/s/002139a4dc81
提取码:V7B3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/896191.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

nginx中的HTTP 负载均衡

HTTP 负载均衡&#xff1a;如何实现多台服务器的高效分发 为了让流量均匀分配到两台或多台 HTTP 服务器上&#xff0c;我们可以通过 NGINX 的 upstream 代码块实现负载均衡。 方法 在 NGINX 的 HTTP 模块内使用 upstream 代码块对 HTTP 服务器实施负载均衡&#xff1a; upstr…

小新学习Docker之Ansible 的脚本 --- playbook 剧本

一、playbook 剧本简介 playbooks 本身由以下各部分组成&#xff1a; &#xff08;1&#xff09;Tasks&#xff1a;任务&#xff0c;即通过 task 调用 ansible 的模板将多个操作组织在一个 playbook 中运行 &#xff08;2&#xff09;Variables&#xff1a;变量 &#xff08;3…

【Java后端】之 ThreadLocal 详解

想象一下&#xff0c;你有一个工具箱&#xff0c;里面放着各种工具。在多人共用这个工具箱的时候&#xff0c;很容易出现混乱&#xff0c;比如有人拿走了你的锤子&#xff0c;或者你找不到合适的螺丝刀。为了避免这种情况&#xff0c;最好的办法就是每个人都有自己独立的工具箱…

商业智能(BI)及其常见技术

简介 商业智能&#xff08;Business Intelligence, BI&#xff09;是一系列技术和方法的集合&#xff0c;旨在帮助企业从大量数据中提取有用的信息&#xff0c;支持决策制定和业务优化。商业智能系统通常包括数据收集、数据存储、数据处理、数据分析和数据可视化等多个环节&am…

数据结构-顺序栈

栈&#xff1a;是一种特殊的线性表&#xff0c;其只允许在表尾进行插入和删除元素操作。进行数据插入和删除操作的一端称为栈顶&#xff0c;另一端称为栈底。栈中的数据元素遵守后进先出LIFO&#xff08;Last In First Out&#xff09;的原则。 压栈&#xff1a;栈的插入操作叫…

nvm实现node多版本管理

在项目中是否遇到vue2和vue3项目同时要启动&#xff0c;而又得频繁去安装卸载node版本的困扰&#xff1f; Node Version Manager (nvm) 是一个用于管理多个 Node.js 版本的工具&#xff0c;它允许你在同一台机器上安装和切换不同版本的 Node.js&#xff0c;非常适合开发者在不…

Ubuntu22.04中安装英伟达驱动并部署Pytorch深度学习环境

安装英伟达驱动 本文基于windows10ubuntu22.04双系统&#xff0c;给ubuntu22.04安装英伟达驱动。 安装依赖。 sudo apt update # 获取最新的软件包信息 sudo apt upgrade # 升级软件包 sudo apt install g sudo apt install gcc sudo apt install make禁用ubuntu默认驱动Nouv…

Crawler4j在多线程网页抓取中的应用

网页爬虫作为获取网络数据的重要工具&#xff0c;其效率和性能直接影响到数据获取的速度和质量。Crawler4j作为一个强大的Java库&#xff0c;专门用于网页爬取&#xff0c;提供了丰富的功能来帮助开发者高效地抓取网页内容。本文将探讨如何利用Crawler4j进行多线程网页抓取&…

RHCE的学习(3)

第三章 远程登录服务 简介 概念 远程连接服务器通过文字或图形接口方式来远程登录系统&#xff0c;让你在远程终端前登录linux主机以取得可操作主机接口&#xff08;shell&#xff09;&#xff0c;而登录后的操作感觉就像是坐在系统前面一样 功能: 分享主机的运算能力 服务…

京存助力北京某电力研究所数据采集

北京某电力研究所已建成了一套以光纤为主&#xff0c;卫星、载波、微波等多种通信方式共存&#xff0c;分层级的电力专用的网络通信架构体系。随着用电、配电对网络的要求提高&#xff0c;以及终端通信入网的迅速发展&#xff0c;迫切地需要高效的通信管理系统来应对大规模、复…

Java项目-基于springboot框架的校园在线拍卖系统项目实战(附源码+文档)

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 开发运行环境 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/…

ubuntu 虚拟机将linux文件夹映射为windows网络位置

在使用虚拟机时可以选择将windows的文件夹设置为共享文件夹方便在虚拟机中访问windows中的文件,同理,也可以将linux的文件夹共享为一个网络文件夹,通过windows的添加一个网络位置功能,将linux的文件夹映射到windows本地,方便windows访问使用linux的文件夹 参照如下:https://blo…

ThingsBoard 规则链节点:Create Alarm节点详解

引言 用法和含义 主要功能 配置步骤 使用场景 实际项目中的应用案例 案例1&#xff1a;智能温室管理系统 案例2&#xff1a;工厂设备监控系统 总结 引言 ThingsBoard 是一个开源的物联网平台&#xff0c;它提供了设备管理、数据收集、处理和可视化等功能。规则链是 Thi…

私域电商新纪元:消费增值模式引领业绩飞跃

朋友们&#xff0c;你们好&#xff01;我是吴军&#xff0c;热衷于引领各位深入发掘私域电商领域的独特魅力及其隐藏的机遇。 今日&#xff0c;我想讲述一个激励人心的真实案例。就在过去的一个月里&#xff0c;我们的合作伙伴取得了惊人的业绩突破&#xff0c;销售额一举跨越…

雷池WAF自动化实现安全运营实操案例终极篇

免责声明 本教程仅为合法的教学目的而准备&#xff0c;严禁用于任何形式的违法犯罪活动及其他商业行为&#xff0c;在使用本教程前&#xff0c;您应确保该行为符合当地的法律法规&#xff0c;继续阅读即表示您需自行承担所有操作的后果&#xff0c;如有异议&#xff0c;请立即停…

队列(数据结构)——C语言

目录 1.概念与结构 2.队列的实现 初始化QueueInit 申请新节点BuyNode 入队QueuePush 判断队为空QueueEmpty 出队QueuePop 读取队头数据QueueFront 读取队尾数据QueueBack 元素个数QueueSize 销毁队列QueueDestroy 3.整体代码 (文章中结点和节点是同一个意思) 1.概…

15. 软件接口

文章目录 第15章 软件接口15.1接口的概念多个接口操作、事件和属性接口演进 15.2 设计接口接口的范围交互方式交换数据的表示形式和结构可扩展标记语言&#xff08;XML&#xff09;JavaScript 对象表示法&#xff08;JSON&#xff09;Protocol Buffers 错误处理 15.3 接口文档…

200元运动蓝牙耳机有哪些?爆款测评PK力荐!

在运动场景下&#xff0c;传统的入耳式和半入耳式耳机虽然占据了大部分市场&#xff0c;但并不适合所有人&#xff0c;尤其是在长时间运动中佩戴时&#xff0c;耳道的压迫感往往会导致不适。而骨传导耳机虽然通过不塞入耳道的方式改善了佩戴舒适度&#xff0c;但在音质方面与入…

[openvino]windows上配置C++openvino后测试代码

测试环境&#xff1a; vs2022 w_openvino_toolkit_windows_2024.3.0.16041.1e3b88e4e3f_x86_64.zip 代码&#xff1a; #include <iostream>#include <openvino/openvino.hpp>int main(int, char**){// -------- Get OpenVINO runtime version --------std::cou…

UG NX12.0建模入门笔记:1.2 鼠标的基本操作

文章目录 前言&#xff1a;鼠标的操作1.鼠标左键&#xff1a;单击—>单选&#xff1b;长按并滑动—>框选。2.鼠标右键&#xff1a;在不同的地方单击弹出不同的菜单。3.鼠标中键&#xff1a;滚动中键—>放大缩小【镜头拉近拉远】。4.鼠标中键&#xff1a;摁住鼠标中键&…