spark和kafka主要通过Scala实现,Hadoop和HBase主要基于java实现。
通过该项目,主要达到以下目的:
(1)通用的数据处理流程,入门大数据领域
(2)真实体验大数据开发工程师的工作
(3)企业级的项目,利用这个思路可以做二次拓展开发
(4)从0到有,数据抽取、数据存储、数据处理、展现
大数据平台架构图:
大数据没有事务的概念,需要不间断完整地把流程跑完,没有事务回滚的概念。
1. 项目需求
打车、叫车,出行的便捷问题等问题在一个出行平台建设中需要解决,与此同时安全出行也是重中之重,为了增加出行的便捷,提高出行的安全,对我们乘车的细节以及发生点我们迫切的需要及时知道,为此特地通过大数据的手段来处理我们海量的出行数据,做到订单的实时监控,乘车轨迹的的回放,虚拟打车站的选定等功能。
重点:乘车轨迹的的细节回放,虚拟打车站
2. 效果示意图
轨迹回放:
订单监控:指标的计算
3. 技术选型
我们的项目建设主要是依据数据的生命周期来做的技术选型,目前主要依照的是大家都在用的一些技术,具体生产中应用要考虑实际的场景。比如人员、技术、接入难度、社区、版权等等各种问题
3.1 数据的生命周期
[参考其他数据平台的建设](https://www.sohu.com/a/242008443_468661)
数据的生产(web应用)>数据的传输>数据存储>计算>应用
3.2 数据传输
数据采集:
采集框架名称 | 主要功能 | 版本 |
---|---|---|
flume | 擅长日志数据的采集和解析 | 1.9.0 |
消息中间件:
概述 | 版本 | |
---|---|---|
Kafka | LinkedIn用Scala语言实现,支持hadoop数据并行加载 | 2.6.2 |
3.3 数据存储
框架名称 | 主要用途 | 版本 |
---|---|---|
Hadoop | 分布式文件存储系统 | 3.2.2 |
Hbase | Key,value对的nosql数据库 | 2.2.7 |
3.4 计算框架
框架名称 | 基本介绍 | 版本 |
---|---|---|
Spark | 基于Spark,一站式解决批流处理问题 | 3.1.1 |
4. 日志格式
本项目会使用到两份数据,原始文件名称为 order.txt以及gps。其中order.txt数据主要用来做我们的虚拟车站功能,gps主要用来做我们的数据回放功能。 日志存放在网盘中,可以下载,在/root
目录下解压
gps数据:
字段 | 名称 | 类型 | 示例 | 备注 |
---|---|---|---|---|
DRIVERID | 司机ID | String | glox.jrrlltBMvCh8nxqktdr2dtopmlH | |
ORDERID | 订单ID | String | jkkt8kxniovIFuns9qrrlvst@iqnpkwz | |
TIME | 时间戳 | String | 1501584540 | unix时间戳,单位为秒 |
LNG | 经度 | String | 104.04392 | GCJ-02坐标系 |
LAT | 纬度 | String | 30.67518 | GCJ-02坐标系 |
订单数据:
字段ID | 字段名称 | 字段样本描述 |
---|---|---|
order_id | 订单ID | string类型且已脱敏 |
product_id | 产品线ID | 1滴滴专车, 2滴滴企业专车, 3滴滴快车, 4滴滴企业快车 |
city_id | 城市ID | 选取海口当地 |
district | 城市区号 | 海口区号 |
county | 二级区县 | 记录区县id |
type | 订单时效 | 0实时,1预约 |
combo_type | 订单类型 | 1包车,4拼车 |
traffic_type | 交通类型 | 1企业时租,2企业接机套餐,3企业送机套餐,4拼车,5接机,6送机,302跨城拼车 |
passenger_count | 乘车人数 | 拼车场景,乘客选择的乘车人数 |
driver_product_id | 司机子产品线 | 司机所属产品线 |
start_dest_distance | 乘客发单时出发地与终点的预估路面距离 | 乘客发单时,出发地与终点的预估路面距离 |
arrive_time | 司机点击‘到达’的时间 | 司机点击‘到达目的地’的时间 |
departure_time | 出发时间 | 如果是实时单,出发时间(departure_time) 与司机点击‘开始计费’的时间(begin_charge_time)含义相同;如果是预约单,是指乘客填写的出发时间 |
pre_total_fee | 预估价格 | 根据用户输入的起始点和目的地预估价格 |
normal_time | 时长 | 分钟 |
bubble_trace_id | ||
product_1level | 一级业务线 | 1专车,3快车,9豪华车 |
dest_lng | 终点经度 | 对应乘客填写的目的地对应的经度 |
dest_lat | 终点纬度 | 对应乘客填写的目的地对应的纬度 |
starting_lng | 起点经度 | 对应乘客填写的起始点对应的经度 |
starting_lat | 起点纬度 | 对应乘客填写的起始点对应的纬度 |
year | 年份 | 对应出行的年份 |
month | 月份 | 对应出行的月份 |
day | 日期 | 对应出行的日期 |
5. 项目架构
-
数据采集 flume 去采集 order gps、发往kafka
-
spark 消费kafka数据存入redis里面(实时的监控)
-
spark 消费kafka数据存入hbase里面(计算绿点)
6. 环境搭建
所有的软件安装,请先bing XX分布式环境安装,然后在结合文档看本项目是如何配置的,否则如果你本身不清楚如何安装的,看下面的安装步骤会很懵 前提:我们的集群,使用了三台机器,机器的基础配置建议4C8G+50G-MEM的配置,否则项目会很卡,大数据环境存储基于MEM,计算基于内存,存在许多IO,所以对性能的要求是较高,学习的话建议大家可以按量购买云产品使用
安装环境 centos7.3
-
所有的软件都基于root用户安装(生产环境中用普通用户),软件都安装在
/root
目录下 -
三台机器需要提前设置好免密配置 免密配置参考
-
所有的文件下载后,都需要改名称,例如
mv hadoop-3.2.2 hadoop
,其他的软件也需要改名称,目的是方便管理、升级 -
所有涉及到的脚本都可以在doc目录下找到
节点 | 角色 |
---|---|
Hadoop01 | HDFS: namenode,datanode,secondarynamenode YARN: resourcemanager, nodemanager Kafka Spark: master,worker Zookeeper: QuorumPeerMain HBase: Hmaster,regionServer |
Hadoop02 | HDFS: datanode, YARN: nodemanager Spark: worker kafka Zookeeper: QuorumPeerMain Hbase: regionServer |
Hadoop03 | HDFS: datanode YARN: nodemanager Spark: worker kafka flume redis(docker) Zookeeper: QuorumPeerMain Hbase: regionServer |
HDFS:基础数据的存储
YARN:计算调度,可以调度本地资源
kafka:流式处理
spark:计算软件,具体任务调度依赖于yarn,yarn依赖于HDFS
6.1 Java1.8安装
需要配置好环境变量
6.2 Hadoop
-
节点1,下载hadoop
wget https://mirrors.bfsu.edu.cn/apache/hadoop/common/hadoop-3.2.2/hadoop-3.2.2.tar.gz
-
修改配置文件
/root/hadoop/etc/hadoop
下 workers, mapred-site.xml ,hadoop-env.sh,yarn-site.xml,core-site.xml,hdfs-site.xml具体文件内容参考放在了doc下
-
cd etc/hadoop 然后cat 以下文件
-
workers表示分别部署在哪几台机器上:cat workers
-
mapred-site.xml配置了环境的位置
-
hadoop-env.sh配置了环境变量
-
yarn-site.xml:yarn是分布式调度软件,主要用来配置yarn的主节点在哪里
-
core-site.xml:定义了数据保存到本地的哪个位置,以及数据的副本个数(生产上一般是三个副本或以上)
-
hdfs-site.xml
-
-
-
分发到其他节点 节点2 节点3
-
启动前要格式化NameNode
-
启动整体集群:/root/hadoop/sbin/start-all.sh
-
jsp查看启动了哪些进程:NameNode、SecondaryNameNode和DataNode代表的是HDFS的进程节点;ResourceManager和NodeManager是yarn的节点。此时yarn和HDFS就启动好了。
6.3 Spark
官网:spark.apache.org
-
节点1,下载对应包
wget https://downloads.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz (注意spark和hadoop版本的对应)
-
修改
/root/spark/conf/
下 workers ,spark-env.sh 两个文件-
cd conf/ cat workers : 表示worker节点在哪些地方启动,master在本机启动,workers需要自己配置
-
-
xsync /root/spark /root/spark
分发到其他节点 -
启动/root/spark/sbin/start-all.sh
通过java -jar fileOperator.jar将日志中的数据以每秒一条的速度定向写入到dest文件夹的gps文件中,flume实时监控目标文件gsp,并把采集到的数据发送到kafka中,kafka中也是每秒一条的速度进行接收。
6.4 Flume
官网:Welcome to Apache Flume — Apache Flume
-
节点3,下载
wget https://mirrors.bfsu.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
-
先把kafka启动起来
-
在节点3启动agent
-
flume启动:nohup bin/flume-ng agent --name a1 --conf conf --conf-file conf/file-mem-kafka-all.conf -Dflume.root.logger=INFO,console &
-
jps中application代表flume进程启动起来了。
6.5 Kafka
-
节点1,下载
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
-
修改 server.properties 文件,注意brokerid,zookeeper地址修改
-
xsync /root/kafka /root/kafka
-
三台服务器启动kafka:(前提:三台服务器启动zookeeper,参考:kafka入门-CSDN博客)
-
# 三台服务器分别进到zookeeper路径下,启动zookeeper服务: [root@192 local]# cd apache-zookeeper-3.8.4-bin/ [root@192 apache-zookeeper-3.8.4-bin]# bin/zkServer.sh --config conf start
-
bin/kafka-server-start.sh -daemon config/server.properties # daemon为后台启动 不占用当前页面 # 如果启动不生效,可以使用 [root@192 kafka_2.13-3.8.0]# nohup bin/kafka-server-start.sh config/server.properties &
sh kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic chengdu
6.6 HBase
-
节点1,下载
wget https://archive.apache.org/dist/hbase/2.2.7/hbase-2.2.7-bin.tar.gz
-
修改 conf/hbase-site.xml \ regionServers文件
-
分发到其他三个节点
-
bin/start-hbase.sh
6.7 zookeeper
-
节点1,下载
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.6.3/apache-zookeeper-3.6.3-bin.tar.gz
-
修改 zoo.cfg
-
在
/tmp/zookeeper/
添加myid文件,文件内容分别为0,1,2 -
xsync /root/zookeeper /root/hadoop/zookeeper
HBase
6.8 Redis
Redis使用了docker安装,生产中使用的一般是集群,我们这里就不安装redis集群环境了
docker run --name myredis -p 6379:6379 -v /home/disk1/redis:/data -d redis redis-server --appendonly yes
7. 一致性语义
流处理的一致性语义
kafka 是一个stream消息系统,包含三种角色:
Producer:写数据 --> broker:存储数据 --> consumer:从kafka消费数据
1. 一个是Producer到broker端数据如果没有确认,则重复发送,确保数据不丢失
2. consumer消费数据如果失败,则从失败的位置开始消费,并且需要做一个去重
保证数据不丢失,并且数据是不重复的
at-most-once:数据最多发送一次。适用于不太重要的日志数据。
如果在 ack 超时或返回错误时 producer 不重试,则该消息可能最终不会写入 Kafka,因此不会传递给 consumer。在大多数情况下,这样做是为了避免重复的可能性,业务上必须接收数据传递可能的丢失。ack=0.
at-least-once:至少发送一次
如果 producer 收到来自 Kafka broker 的确认(ack)或者 acks = all,则表示该消息已经写入到 Kafka。但如果 producer ack 超时或收到错误,则可能会重试发送消息,客户端会认为该消息未写入 Kafka。如果 broker 在发送 Ack 之前失败,但在消息成功写入 Kafka 之后,此重试将导致该消息被写入两次,因此消息会被不止一次地传递给最终 consumer,这种策略可能导致重复的工作和不正确的结果。
exactly-once:精准一次
producer没有收到broker的回复时,需要重复发送数据,但是即使 producer 重试发送消息,消息也会保证最多一次地传递给最终consumer(通过consumer端的重复值校验:Redis重复id去重的性质)。该语义是最理想的,但也难以实现,因为它需要消息系统本身与生产和消费消息的应用程序进行协作。ack =-1
并且设置offset自身存储,防止consumer消费失败。
Kafka
主要失败原因
- Broker失败:Kafka 作为一个高可用、持久化系统,保证每条消息被持久化并且冗余多份(假设是 n 份),所以 Kafka 可以容忍 n-1 个 broker 故障,意味着一个分区只要至少有一个 broker 可用,分区就可用。Kafka 的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本(ISR)。
- Producer 到 Broker 的 RPC 失败:Kafka 的持久性依赖于生产者接收broker 的 ack 。没有接收成功 ack 不代表生产请求本身失败了。broker 可能在写入消息后,发送 ack 给生产者的时候挂了,甚至 broker 也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在 Kafka 分区日志中重复,进而造成消费端多次收到这条消息。
- 客户端也可能会失败:Exactly-once delivery 也必须考虑客户端失败的情况。但是如何去区分客户端是真的挂了(永久性宕机)还是说只是暂时丢失心跳?追求正确性的话,broker 应该丢弃由 zombie producer 发送的消息。 consumer 也是如此,一旦新的客户端实例已经启动,它必须能够从失败实例的任何状态中恢复,并从安全点( safe checkpoint )开始处理,这意味着消费的偏移量必须始终与生成的输出保持同步。
1. 增加业务处理
- 为每个消息增加唯一主键,生产者不做处理,由消费者根据主键去重
- producer(flume)设置ack=-1确保数据不丢失(flume的配置文件file-mem-kafka-all.conf中设置ack=-1,表示必须给回执,否则一直发送重复的消息)
- 消费者,往往是我们的流式任务,我们需要关闭自动提交 offset 的功能,业务保存offset,将保存offset与消费操作放入到一个事务当中去执行
Sparkstreming/Flink
- 支持一致性语义,只要是在任务调度过程中失败了,那么会去寻找checkpoint 拿到最新的副本数据