实验四 Spark Streaming编程初级实践

一、Flume简介

数据流 :数据流通常被视为一个随时间延续而无限增长的动态数据集合,是一组顺序、大量、快速、连续到达的数据序列。通过对流数据处理,可以进行卫星云图监测、股市走向分析、网络攻击判断、传感器实时信号分析。

二、Flume安装配置

1.下载安装包

https://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz

2.解压文件夹

通过xshell把安装包上传虚拟机

解压文件,修改文件名称

1.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt

2.sudo mv ./apache-flume-1.7.0-bin ./flume

3.sudo chown -R root:root  ./flume

3.配置环境变量

输入:

sudo vim ~/.bashrc

在文件里面添加

(根据你自己的安装路径)

export JAVA_HOME=/opt/jdk-1.8;

export FLUME_HOME=/opt/flume                   

export FLUME_CONF_DIR=$FLUME_HOME/conf

export PATH=$PATH:$FLUME_HOME/bin

4.刷新环境变量

source ~/.bashrc

5.修改配置文件

cd /opt/flume/conf

cp ./flume-env.sh.template ./flume-env.sh

修改flume-env.sh文件

在文件开头增加一行,设置JAVA_HOME

vim flume-env.sh


export JAVA_HOME=/opt/jdk-1.8;

三、查看flume版本信息

/opt/flume/bin/flume-ng version

四、使用Avro数据源测试Flume

Avro可以发送一个给定的文件给Flume,Avro 源使用AVRO RPC机制。请对Flume的相关配置文件进行设置,从而可以实现如下功能:在一个终端中新建一个文件helloworld.txt(里面包含一行文本“Hello World”),在另外一个终端中启动Flume以后,可以把helloworld.txt中的文本内容显示出来。

cd /opt/flume/conf

1. 新建文件avro.conf

vim avro.conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#定制source,绑定channel、主机以及端口
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141

#描述并配置sinks组件
a1.sinks.k1.type = logger

#描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#将sources和sink通过同一个channel连接绑定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

上面Avro Source参数说明如下:

 Avro Source的别名是avro,也可以使用完整类别名称org.apache.flume.source.AvroSource,因此,上面有一行设置是a1.sources.r1.type = avro,表示数据源的类型是avro。

  bind绑定的ip地址或主机名,使用0.0.0.0表示绑定机器所有的接口。      

a1.sources.r1.bind = 0.0.0.0,就表示绑定机器所有的接口。

port表示绑定的端口。

a1.sources.r1.port = 4141,表示绑定的端口是4141。

 a1.sinks.k1.type = logger,表示sinks的类型是logger。

2.启动flume agent a1

这个终端不要关闭

 3.新建输入文件

cd /opt/flume

echo “Hello World”>> ./helloworld.txt

4.执行命令

/opt/flume/bin/flume-ng avro-client --conf conf -H localhost -p 4141 -F /opt/flume/helloworld.txt

执行之后,我们就可以在前面不让关闭的那个终端看到Hello World了 

五、使用netcat数据源测试Flume

在一个Linux终端(这里称为“Flume终端”)中,启动Flume,在另一个终端(这里称为“Telnet终端”)中,输入命令“telnet localhost 44444”,然后,在Telnet终端中输入任何字符,让这些字符可以顺利地在Flume终端中显示出来。

 1.创建netcat的agent配置

cd /opt/flume

vim netcat.conf

 

    #/usr/local/flume/conf/netcat.conf
    # Name the components on this agent  
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source  
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
        #同上,记住该端口名

    # Describe the sink  
    a1.sinks.k1.type = logger

    # Use a channel which buffers events in memory  
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel  
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

2.启动flume agent

cd /opt/flume/conf

 不要关闭这个终端

/opt/flume/bin/flume-ng agent --conf ./conf --conf-file netcat.conf --name a1 -Dflume.root.logger=INFO,console

 3.新打开一个终端

输入:

telnet localhost 44444

或者

nc localhost 44444

 #在这个终端输入字符串就可以显示在前面那个终端里了,但是中文是不支持的,显示长度也有限

如果报出下面的错误:

解决方法:

安装telnet插件

yum install telnet

在前面的终端查看数据

 

六、使用Flume作为Spark Streaming数据源

Flume是非常流行的日志采集系统,可以作为Spark Streaming的高级数据源。请把Flume Source设置为netcat类型,从终端上不断给Flume Source发送各种消息,Flume把消息汇集到Sink,这里把Sink类型设置为avro,由Sink把消息推送给Spark Streaming,由自己编写的Spark Streaming应用程序对消息进行处理。

1.创建flume-to-spark.conf

cd /opt/flume/conf

vim flume-to-spark.conf

把以下内容输入文件 

        #flume-to-spark.conf: A single-node Flume configuration
        # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1

        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 33333

        # Describe the sink
        a1.sinks.k1.type = avro
        a1.sinks.k1.hostname = localhost
        a1.sinks.k1.port =44444

        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000000
        a1.channels.c1.transactionCapacity = 1000000

        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1

#说明:

1.Flume suorce类为netcat,绑定到localhost的33333端口,消息可以通过telnet localhost 33333 发送到flume suorce

2.Flume Sink类为avro,绑定44444端口,flume sink通过localhost 44444端口把消息发送出来。而spark streaming程序一直监听44444端口。

#注意!!先不要启动Flume agent,因为44444端口还没打开,sink的消息无处可去,44444端口由spark streaming程序打开。

2.spark准备工作

下载spark-streaming-flume_2.11-2.3.4.jar

官网:

https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume

镜像网站(下载速度较快)

https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume_2.11/

3.放置文件

把这个jar文件放到/opt/spark/jars/flume目录下

把文件拖进去就行了

cd /usr/local/spark/jars

mkdir flume

4.修改spark目录下的

vim spark-env.sh

添加以下内容 

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath):$(/opt/hbase/bin/hbase classpath):/usr/local/spark/examples/jars/*:/opt/spark/jars/kafka/*:/usr/local/kafka/libs/*:/opt/spark/jars/flume/*:/opt/flume/lib/*

七、编写spark程序使用Flume数据源

1.创建python文件

cd /opt/spark-2.4.0

mkdir test

cd test

vim FlumeEventCount.py
from __future__ import print_function
 
import sys
 
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import pyspark
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: flume_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)
 
    sc = SparkContext(appName="FlumeEventCount")
    ssc = StreamingContext(sc, 2)
 
    hostname= sys.argv[1]
    port = int(sys.argv[2])
    stream = FlumeUtils.createStream(ssc, hostname, port,pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.count().map(lambda cnt : "Recieve " + str(cnt) +" Flume events!!!!").pprint()
 
    ssc.start()
    ssc.awaitTermination()                             

注意:可能需要安装pyspark,命令为:

pip3 install pyspark

2. 测试实际效果

(1)启动Spark streaming程序
./bin/spark-submit --driver-class-path /opt/spark-2.4.0/jars/*:/opt/spark-2.4.0/jars/flume/* ./test/FlumeEventCount.py localhost 44444

 (2)启动Flume Agent

启动一个新的终端

cd /opt/flume
bin/flume-ng agent --conf ./conf --conf-file ./conf/flume-to-spark.conf --name a1 -Dflume.root.logger=INFO,console

 (3)连接33333端口

启动一个新的终端

telnet localhost 33333

现在你可以在最后这个终端里输入一些字符了。在你输入字符后可以看到第一个终端会显示如下的信息

结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/508764.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Mysql故障和优化

一、MySQL故障 二、MySQL优化 1.硬件优化&#xff1a; 2.数据库设计与规划 1.提前估计数据量&#xff0c;使用什么存储引擎 2.数据库服务器专机专用&#xff0c;避免额外的服务可能导致的性能下降和不稳定性 3.增加多台服务器&#xff0c;以达到稳定、高效的效果。主从同步、…

C++ 2024-4-1 作业

#include <iostream> using namespace std;class A { public:int a;A(int a):a(a){cout<<"A的有参构造"<<endl;} }; class B:virtual public A { public:int b;B(int a,int b):A(a),b(b){cout<<"B的有参构造"<<endl;} }; cl…

vscode通过ssh连接服务器(吐血总结)

一、通过ssh连接服务器 1、打开vscode&#xff0c;进入拓展&#xff08;CtrlShiftX&#xff09;&#xff0c;下载拓展Remote - SSH。 2、点击远程资源管理器选项卡&#xff0c;选择远程&#xff08;隧道/SSH&#xff09;类别。 3、点击SSH配置。 4、在中间上部分弹出的配置文件…

Mac反编译APK

文章目录 第一种方式: brew installapktool 使用说明dex2jar 使用说明 第二种方式: 下载安装包apktool 使用说明 (根据官方介绍没有操作成功,后续成功再更新这里)dex2jar 使用说明 安装 JD-GUI 查看jar包中的class文件JD-GUI 使用说明 第一种方式: brew install 安装过程可能很…

Excel 隔几行批量插入空白行

例如如下表格&#xff0c;每隔6行插入一行数据&#xff1a; 1&#xff09;第7个单元格输入1 2&#xff09;选中6个单元格&#xff0c;然后双击填充数据&#xff1a; 3&#xff09;F5 找到常量 Ctrlshift 复制插入的数据&#xff0c;然后选中数据 按F5&#xff0c;定位到空值

第21章-直连路由和静态路由

1. 直连路由 1&#xff09;定义&#xff1a;指路由器接口直接相连的网段的路由&#xff1b; 2&#xff09;特点&#xff1a; ① 不需要特别的配置&#xff0c;双UP(物理层数据链路层)&#xff1b; ② 在路由器的接口上配置IP地址即可&#xff1b; ③ 开机自动产生&#xff1b; …

如何做用户体验优化

本文是从用户体验优化角度谈用户体验&#xff0c;其实用户体验不是设计必须的步骤&#xff0c;而是分散在产品设计中的产品设计思想。 一、用户体验分类 用户体验是指用户在“使用”某个产品或服务过程中的全部感受&#xff0c;包括情感、信仰、喜好、认知印象、生理和心理反应…

789. 数的范围 (二分学习)

1.确定一个区间&#xff0c;使得目标值一定在区间中 2.找一个性质满足&#xff1a; &#xff08;1&#xff09;性质具有二段性 &#xff08;2&#xff09;答案是二段性的分界点 3.整数二分&#xff08;处理红色右端点和绿色左端点&#xff09; //代码1&#xff1a;右端点 int…

探讨在大数据体系中API的通信机制与工作原理

** 引言 关联阅读博客文章&#xff1a;深入解析大数据体系中的ETL工作原理及常见组件 关联阅读博客文章&#xff1a;深入理解HDFS工作原理&#xff1a;大数据存储和容错性机制解析 ** 在当今数字化时代&#xff0c;数据已经成为企业发展和决策的核心。随着数据规模的不断增长…

网络安全 | 什么是网络安全?

关注WX&#xff1a;CodingTechWork 网络安全 网络安全-介绍 网络安全是指用于防止网络攻击或减轻其影响的任何技术、措施或做法。网络安全旨在保护个人和组织的系统、应用程序、计算设备、敏感数据和金融资产&#xff0c;使其免受简单而不堪其绕的计算机病毒、复杂而代价高昂…

人工智能之深度学习笔记——每天五分钟快速掌握深度学习理论

本专栏会对深度学习以及深度学习搭建技巧做一个详尽的介绍&#xff0c;相信大家阅读完本专栏之后&#xff0c;深度学习已经不是一个遥不可及的名词&#xff0c;我们会知道它究竟是什么&#xff0c;本专栏尽可能地简单详细地介绍每一个深度学习知识&#xff0c;帮助每天只用很少…

vue3中播放flv流视频,以及组件封装超全

实现以上功能的播放&#xff0c;只需要传入一个流的地址即可&#xff0c;当然组件也只有简单的实时播放功能 下面直接上组件 里面的flvjs通过npm i flv.js直接下载 <template><div class"player" style"position: relative;"><p style&…

什么是EDM邮件推广营销?

电子邮件作为最古老的互联网沟通工具之一&#xff0c;凭借其无可比拟的直达性、个性化潜力与高投资回报率&#xff0c;始终占据着企业营销策略的核心地位。随着人工智能技术的革新应用&#xff0c;云衔科技以其前瞻视野与深厚技术底蕴&#xff0c;倾力打造了一站式智能EDM邮件营…

Excel·VBA二维数组组合函数之穷举推理题

看到一个帖子《CSDN-求助一道推理题》&#xff0c;与之前《python穷举暴力破解《2018年刑侦推理题》用python穷举的推理题很类似 那么是否可以使用《ExcelVBA二维数组组合函数、组合求和》combin_arr2d函数&#xff0c;生成结果进行穷举呢&#xff1f; Sub 穷举推理题()Dim …

搜维尔科技:Manus Prime 3 Mocap数据手套,体验极致的每指触觉!

完全适用于VR虚拟现实场景 特斯拉也在使用的量子数据 Tesla 目前正在使用 MANUS Quantum Metagloves创建一个数据集&#xff0c;帮助他们训练 Tesla 机器人。 量子数据训练QUANTUM AI 我们以类似的方式使用 Quantum Metagloves 来生成一流的手指跟踪数据集&#xff0c;并将其…

吴恩达2022机器学习专项课程(一) 4.5 线性回归的梯度下降

问题预览/关键词 本节内容梯度下降公式梯度下降公式的推导过程梯度下降在线性回归误差平方成本函数的收敛梯度下降在多曲面的收敛 笔记 1.本节内容 给线性回归模型的误差平方成本函数执行梯度下降。 2.梯度下降公式 线性回归下误差成本函数的梯度下降公式。 3.梯度下降公…

uniapp 小程序和app map地图上显示多个酷炫动态的标点,头像后端传过来,真机测试有效

展示效果 二、引入地图 如果需要搜索需要去腾讯地图官网上看文档&#xff0c;找到对应的内容 1.申请开发者密钥&#xff08;key&#xff09;&#xff1a;申请密钥 2.开通webserviceAPI服务&#xff1a;控制台 ->应用管理 -> 我的应用 ->添加key-> 勾选WebService…

OpenHarmony相机和媒体库-如何在ArkTS中调用相机拍照和录像。

介绍 此Demo展示如何在ArkTS中调用相机拍照和录像&#xff0c;以及如何使用媒体库接口进行媒体文件的增、删、改、查操作。 本示例用到了权限管理能力ohos.abilityAccessCtrl 相机模块能力接口ohos.multimedia.camera 图片处理接口ohos.multimedia.image 音视频相关媒体业…

SSM框架学习——MyBatis关联映射

MyBatis关联映射 为什么要关联映射 实际开发中&#xff0c;对数据库操作常常会涉及多张表&#xff0c;所以在OOP中就涉及对象与对象的关联关系。针对多表操作&#xff0c;MyBatis提供关联映射。 关联关系概述 一对一&#xff1a;A类中定义B类的属性b&#xff0c;B类中定义A…

华为云RDS for Mysql入门与配置

华为云RDS for MySQL支持混合SSD实例&#xff0c;它结合了华为云容器、本地SSD盘和高速云盘。 优势&#xff1a; 主备实例提供故障自动切换和手动切换&#xff0c;业务中断时间为秒级&#xff0c;以及异地灾难备份&#xff0c;最大程度上在出现故障的情况下保障整个数据库集群…