大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(已更完)
  • ClickHouse(已更完)
  • Kudu(已更完)
  • Druid(正在更新…)

章节内容

上节我们完成了如下的内容:

  • Apache Druid 数据存储
  • Apache Druid 数据分区
  • 索引服务
  • 压缩机制
  • 数据聚合

在这里插入图片描述

整体流程

  • Kafka 数据源: Kafka 是一个分布式流处理平台,负责接收、存储并传输数据。它支持从各类应用、日志、传感器等设备采集实时数据,将数据划分为多个主题(Topic),并将消息分发给消费者。在这个案例中,Kafka 是 Druid 的数据源。
  • Kafka Producer: 数据生产者(Producer)负责将数据发送到 Kafka 的主题中。例如,应用程序可以向 Kafka 写入日志、用户行为数据、传感器数据等。每条消息可以是 JSON、Avro 等格式的数据记录。
  • Druid Kafka Ingestion: Druid 提供了对 Kafka 的原生支持。通过 Kafka Indexing Service,Druid 可以持续从 Kafka 的某个主题中消费数据,实时地将这些数据摄取到 Druid 中。摄取过程中,Druid 会将数据拆解为小的段(Segment),并将这些段存储在 Druid 集群的深度存储中(如 HDFS、S3 等)。
  • 实时数据摄取和索引: Druid 的 Kafka 摄取任务会监听 Kafka 的分区,按照流数据的到达顺序消费数据,并在内部创建索引。这些索引结构化存储了数据,并通过分片和分区机制,保证了查询的高效性和水平扩展能力。
  • Druid 查询层: Druid 提供了非常强大的查询能力,可以通过 SQL 查询方式进行交互,也支持多维查询、聚合查询等。这些查询可以是低延迟的实时查询,也可以对历史数据进行复杂的分析。用户通过 Druid 查询接口或 BI 工具(如 Apache Superset、Tableau 等)向集群发送查询。
  • Kafka 消费者 Offset 管理: Druid 使用 Kafka 消费者模型,实时消费消息并管理 Offset(偏移量),确保数据不丢失或重复摄取。Offset 会被定期提交到 Kafka 中,保证即使任务重启,摄取进度也能从上一次的位置继续。
  • 持久化和数据存储: 数据在经过摄取和索引后,Druid 会定期将数据段(Segment)持久化到深度存储中,并对旧数据进行合并和压缩,减少存储空间的占用。Druid 的集群架构支持分布式存储和查询,并能根据数据规模进行自动扩展。

案例假设

假设我们在构建一个用户行为分析系统,通过 Kafka 采集用户点击日志,并通过 Druid 实时分析用户行为。

  • Kafka 数据生产: 电商平台的应用程序会将每次用户点击产生的日志记录(例如点击商品、页面浏览等)发送到 Kafka 中的 user-clicks 主题。每条记录都包含用户ID、商品ID、时间戳、页面信息等。
  • Druid 数据摄取: 配置 Druid 的 Kafka Indexing Service,从 user-clicks 主题消费数据。数据会实时流入 Druid 中,Druid 将数据按照时间范围切分为段,并存储到其深度存储中。
  • 实时数据查询与分析: 业务方可以通过 SQL 查询或多维查询接口,实时分析用户的点击行为。查询的例子可能是统计每个小时的页面浏览量、分析不同商品的受欢迎程度等。这些查询可以直接反映用户的当前行为,帮助业务方做出快速决策。
  • 可视化和报表: Druid 的查询结果可以通过 Apache Superset 等工具进行可视化展示,创建实时仪表盘,展示用户行为的各种关键指标。数据分析师和运营人员可以在可视化平台上直观地看到当前系统的运营状态。

需求分析

场景分析

  • 数据量大,需要在这些数据中根据业务需要灵活查询
  • 实时性要求高
  • 数据实时的推过来,要在秒级对数据进行分析并查询出结果

数据描述

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","products":
[{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"},{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}]}
  • ts 交易时间
  • orderId 订单编号
  • userId 用户id
  • orderStatusId 订单状态Id
  • orderStatus 订单状态 0-11:未支付,已支付,发货中,已发货,发货失败,已退款,已关单,订单过期,订单已失效,产品已失效,代付拒绝,支付中
  • payModelId 支付方式id
  • payMode 支付方式:0-6:微信,支付宝,信用卡,银联,货到付款,现金,其他
  • payment:支付金额
  • products:购买商品 (一个订单可能包含多个商品,这里是嵌套结构)
  • productId 商品Id
  • productName 商品名称
  • price 单价
  • productNum 购买数量
  • categoryid 商品分类Id
  • catname1 商品一级分类名称
  • catname2 商品二级分类名称
  • catname3 商品三级分类名称

以上的嵌套的json数据格式,Druid不好处理,需要对数据进行预处理,将数据拉平,处理后的数据格式:

{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"102163","productName":"贝合xxx+粉","price":18.7,"productNum":3,"categoryid":"10360","catname1":"厨卫清洁、纸制用品","catname2":"生活日用","catname3":"浴室用品"}}
{"ts":1607499629841,"orderId":"1009388","userId":"807134","orderStatusId":1,"orderStatus":"已支付","payModeId":0,"payMode":"微信","payment":"933.90","product":
{"productId":"100349","productName":"COxxx0C","price":877.8,"productNum":1,"categoryid":"10302","catname1":"母婴、玩具乐器","catname2":"西洋弦乐器","catname3":"吉他"}}

Kafka生产者

好久没用Scala了,用Scala写一个:

package icu.wzk.kafka

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

import java.util.Properties
import scala.io.BufferedSource

object KafkaProducerForDruid {
  def main(args: Array[String]): Unit = {
    val brokers = "h121.wzk.icu:9092"
    val topic = "druid2"
    val prop = new Properties()
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

    val producer = new KafkaProducer[String, String](prop);
    val source: BufferedSource = scala.io.Source.fromFile("orders1.json")
    val iter: Iterator[String] = source.getLines();
    iter.foreach {
      line => val msg = new ProducerRecord[String, String](topic, line);
        producer.send(msg)
        println(msg)
        Thread.sleep(10)
    }
    producer.close()
    source.close()
  }
}

运行结果如下图:
在这里插入图片描述

Druid导入数据

这里就不详细描述了,之前入门阶段已经走过完整的流程了:

  • JSON数据要拉平
  • 不定义 RollUp

加载数据源:
在这里插入图片描述
JSON 拉平:
在这里插入图片描述
时间戳:
在这里插入图片描述
不要进行 RollUp:
在这里插入图片描述
最终结果如下图所示:
在这里插入图片描述
计算结果如下图所示:
在这里插入图片描述
运行测试的SQL,一切正常!
在这里插入图片描述

查询计算

订单总数

-- 查询订单总数
SELECT COUNT(distinct orderId) as orderscount
FROM druid2

运行结果如下图所示:
在这里插入图片描述

用户总数

-- 查询用户总数
SELECT COUNT(distinct userId) as usercount
FROM druid2

运行结果如下图:
在这里插入图片描述

统计结果状态订单数

-- 统计各种订单状态的订单数
SELECT orderStatus, count(*)
FROM (
  SELECT orderId, orderStatus
  FROM druid2
  GROUP BY orderId, orderStatus
)
GROUP BY orderStatus

执行结果如下图所示:
在这里插入图片描述

统计各种支付方式的订单数

-- 统计各种支付方式订单数
SELECT payMode, count(1)
FROM (
  SELECT orderId, payMode
  FROM druid2
  GROUP BY orderId, payMode
)
GROUP BY payMode

执行结果如下图所示:
在这里插入图片描述

订单金额最大的前10名

-- 订单金额最大的前10名
SELECT orderId, payment, count(1) as productcount, sum("product.productNum") as products
FROM druid2
GROUP BY orderId, payment

执行结果如下图所示:
在这里插入图片描述

案例小节

  • 在配置摄入源时要设置为True从流的开始进行消费数据,否则在数据源中可能查不到数据
  • Druid的JOIN能力非常有限,分组或者聚合多的场景推荐使用
  • SQL支持能力非常受限
  • 数据的分区组织只有时间序列一种方式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/885734.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【SQL】筛选字符串与正则表达式

目录 语法 需求 示例 分析 代码 语法 SELECT column1, column2, ... FROM table_name WHERE condition; WHERE 子句用于指定过滤条件,以限制从数据库表中检索的数据。当你执行一个查询时,WHERE 子句允许你筛选出满足特定条件的记录。如果记录满…

UE学习篇ContentExample解读------Blueprints Advanced-下

文章目录 总览描述批次阅览2.1 Timeline animation2.2 Actor tracking2.3 Button Trigger using a blueprint interface2.4 Opening door with trigger2.5 Child Blueprints 概念总结致谢: 总览描述 打开关卡后,引入眼帘的就是针对关卡的总体性文字描述&…

五子棋双人对战项目(1)——WebSocket介绍

目录 一、项目介绍 如何实现实时同步对局? 二、WebSocket 1、什么是WebSocket? 2、WebSocket的报文格式 opcode payload len payload data 3、WebSocket握手过程 4、WebSocket代码的简单编写 三、WebSocket 和 HTTP的关系 1、相同点&#xf…

从 Kafka 到 WarpStream: 用 MinIO 简化数据流

虽然 Apache Kafka 长期以来一直是流数据的行业标准,但新的创新替代方案正在重塑生态系统。其中之一是 WarpStream,它最近在 Confluent 的所有权下进入了新的篇章。此次收购进一步增强了 WarpStream 提供高性能、云原生数据流的能力,巩固了其…

Iceberg 基本操作和快速入门

安装 Iceberg 是一种适用于大型分析表的高性能工具,通过spark启动并运行iceberg,文章是通过docker来进行安装并测试的 新建一个docker-compose.yml文件 文件内容 version: "3" services: spark-iceberg: image: tabulario/spark-iceberg co…

GS-SLAM论文阅读笔记--MM3DGS SLAM

前言 多传感器融合GS-SLAM的另一个IROS2024,不过这篇没有用到激光雷达,而是相机和IMU结合而实现的。今天看一下这篇。 文章目录 前言1.背景介绍2.关键内容2.1 跟踪2.2 深度监督2.3 惯性融合2.4建图2.5 总体流程 3.文章贡献4.个人思考 1.背景介绍 虽然SLAM方法使用…

计算神经学笔记01

- **The term neuromorphic is generally used to describe analog, digital, mixed-mode analog/digital VLSI, and software systems that implement several models of neural systems.** - 神经形态一词通常用于描述模拟、数字、混合模式的模拟/数字超大规模集成电路&…

记录|Modbus-TCP产品使用记录【摩通传动】

目录 前言一、摩通传动实验图1.1 配置软件 IO_Studio1.2 测试软件Modbus Poll1.2.1 读写设置测试1.2.2 AI信号的读取 1.3 对应的C#连接Modbus的测试代码如下【自制,仅供参考】1.4 最终实验图 更新时间 前言 参考文章: 自己需要了解和对比某些产品的Modbu…

C++基础---类和对象(上)

1.类的定义 C程序设计允许程序员使用类(class)定义特定程序中的数据类型。这些数据类型的实例被称为对象 ,这些实例可以包含程序员定义的成员变量、常量、成员函数,以及重载的运算符。语法上,类似C中结构体&#xff0…

3D建模软件 | Blender v4.2.2 绿色版

Blender是一款功能强大的免费开源3D创作套件,适用于创建3D可视化效果,如静态图像、3D动画、视觉特效以及视频编辑。Blender以其跨平台兼容性、高效内存管理、统一的工作流程和活跃的社区支持而受到独立艺术家和小型工作室的青睐。 它提供了从建模、渲染…

10.2 Linux_并发_进程相关函数

创建子进程 函数声明如下: pid_t fork(void); 返回值:失败返回-1,成功返回两次,子进程获得0(系统分配),父进程获得子进程的pid 注意:fork创建子进程,实际上就是将父进程复制一遍作为子进程&…

智慧防灾,科技先行:EasyCVR平台助力地质灾害视频监测系统建设

随着科技的飞速发展,视频监控技术已成为地质灾害监测与预警的重要手段之一。在众多视频监控平台中,EasyCVR视频汇聚平台凭借其强大的视频整合、实时传输、视频处理及分发等能力,在地质灾害场景中展现出显著的应用优势。 一、实时监测与远程监…

实用工具推荐---- PDF 转换

直接上链接:爱PDF |面向 PDF 爱好者的在线 PDF 工具 (ilovepdf.com) 主要功能如下: 全免费!!!!

Kali Linux语言设置成中文

要将Kali Linux设置成中国地区(简体中文),可以按照以下步骤进行操作: 一、更新软件包列表 打开Kali Linux的终端。输入以下命令来更新软件包列表: sudo apt-get update二、安装语言包 输入以下命令来安装locales包…

AI技术在爱奇艺视频搜索中的应用

当前AI技术已经全面在爱奇艺搜索引擎中落地应用。与传统搜索仅能查找片名不同,爱奇艺的AI搜索功能让用户能够在搜索阶段使用更多模糊信息获得想找的影片。首次将生成式AI技术应用于角色搜索、剧情搜索、明星搜索、奖项搜索和语义搜索五大场景。通过对模糊搜索query的…

【笔记】Dynamic Taint Analysis 动态污点分析

Dynamic Taint Analysis 动态污点分析 什么是动态污点分析?为什么要搞动态污点分析? “污点”指的是什么? DTA中的“污点”指代的是不可信的输入,比如用户输入、网络请求、文件数据等。比方说,如果把程序看作一个城市&…

2.点位管理开发(续)及设计思路——帝可得后台管理系统

目录 前言一、页面原型二、修改1、页面展示2、新增 3 、总结思路 前言 提示&#xff1a;本篇继续点位管理的改造 一、页面原型 页面展示新增 二、修改 1、页面展示 页面修改&#xff1a;修改标签换行、顺序顺序、地址过长时换行问题&#xff1b; <el-table v-loading…

四DHCP服务实验

复习 &#xff1a;DHCP基础实验&#xff1a; 1. 在server端安装dhcp yum -y install dhcp 2. 找回dhcp的配置文件&#xff1a;/etc/dhcp/dhcpd.conf cp -a /usr/share/doc/dhcp-4.25/dhcpd.conf.example /etc/dhcp/dhcpd.conf 3. 修改/etc/dhcp/dhcpd.conf配…

音视频入门基础:FLV专题(10)——Script Tag实例分析

一、引言 在《音视频入门基础&#xff1a;FLV专题&#xff08;9&#xff09;——Script Tag简介》中对FLV文件的Script Tag进行了简介。下面用一个具体的例子来对Script Tag进行分析。 二、Script Tag的Tag header实例分析 用notepad打开《音视频入门基础&#xff1a;FLV专题…

国外电商系统开发-需求记录

一、客户需求 1、商城后台需要添加产品、添加一级代理商&#xff1b; 2、一级代理商可以添加二级代理商&#xff0c;二级代理商需要添加店铺&#xff1b; 3、店铺需要购买产品(进货)、店铺也可以推广给用户(用户在用户APP里最近店铺下单、购买产品)&#xff1b; 4、需要对接当地…