二百五十九、Java——采集Kafka数据,解析成一条条数据,写入另一Kafka中(一般JSON)

一、目的

由于部分数据类型频率为1s,从而数据规模特别大,因此完整的JSON放在Hive中解析起来,尤其是在单机环境下,效率特别慢,无法满足业务需求。

而Flume的拦截器并不能很好的转换数据,因为只能采用Java方式,从Kafka的主题A中采集数据,并解析字段,然后写入到放在Kafka主题B中

二 、原始数据格式

JSON格式比较正常,对象中包含数组

{
    "deviceNo": "39",
    "sourceDeviceType": null,
    "sn": null,
    "model": null,
    "createTime": "2024-09-03 14:10:00",
    "data": {
        "cycle": 300,
        "evaluationList": [{
            "laneNo": 1,
            "laneType": null,
            "volume": 3,
            "queueLenMax": 11.43,
            "sampleNum": 0,
            "stopAvg": 0.54,
            "delayAvg": 0.0,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 2,
            "laneType": null,
            "volume": 7,
            "queueLenMax": 23.18,
            "sampleNum": 0,
            "stopAvg": 0.47,
            "delayAvg": 10.57,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 3,
            "laneType": null,
            "volume": 9,
            "queueLenMax": 11.54,
            "sampleNum": 0,
            "stopAvg": 0.18,
            "delayAvg": 9.67,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        },
        {
            "laneNo": 4,
            "laneType": null,
            "volume": 6,
            "queueLenMax": 11.36,
            "sampleNum": 0,
            "stopAvg": 0.27,
            "delayAvg": 6.83,
            "passRate": 0.0,
            "travelDist": 140.0,
            "travelTimeAvg": 0.0
        }]
    }
}

三、Java代码

package com.kgc;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaKafkaEvaluation {
    // 添加 Kafka Producer 配置
    private static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "-1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        return props;
    }

    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.70:9092");
        prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        prop.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // 每一个消费,都要定义不同的Group_ID
        prop.put(ConsumerConfig.GROUP_ID_CONFIG, "evaluation_group");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Collections.singleton("topic_internal_data_evaluation"));
        ObjectMapper mapper = new ObjectMapper();

        // 初始化 Kafka Producer
        KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps());

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    JsonNode rootNode = mapper.readTree(record.value());

                    System.out.println("原始数据"+rootNode);

                    String device_no = rootNode.get("deviceNo").asText();
                    String source_device_type = rootNode.get("sourceDeviceType").asText();
                    String sn = rootNode.get("sn").asText();
                    String model = rootNode.get("model").asText();
                    String create_time = rootNode.get("createTime").asText();
                    String cycle = rootNode.get("data").get("cycle").asText();

                    JsonNode evaluationList = rootNode.get("data").get("evaluationList");
                    for (JsonNode evaluationItem : evaluationList) {
                        String lane_no = evaluationItem.get("laneNo").asText();
                        String lane_type = evaluationItem.get("laneType").asText();
                        String volume = evaluationItem.get("volume").asText();
                        String queue_len_max = evaluationItem.get("queueLenMax").asText();
                        String sample_num = evaluationItem.get("sampleNum").asText();
                        String stop_avg = evaluationItem.get("stopAvg").asText();
                        String delay_avg = evaluationItem.get("delayAvg").asText();
                        String pass_rate = evaluationItem.get("passRate").asText();
                        String travel_dist = evaluationItem.get("travelDist").asText();
                        String travel_time_avg = evaluationItem.get("travelTimeAvg").asText();

                        String outputLine = String.format("%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s",
                                device_no, source_device_type, sn, model, create_time, cycle,lane_no, lane_type,
                                volume,queue_len_max,sample_num,stop_avg,delay_avg,pass_rate,travel_dist,travel_time_avg);

                        // 发送数据到 Kafka
                        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic_db_data_evaluation", record.key(), outputLine);
                        producer.send(producerRecord, (RecordMetadata metadata, Exception e) -> {
                            if (e != null) {
                                e.printStackTrace();
                            } else {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());
                            }
                        });
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            consumer.commitAsync();
        }
    }

}

1、服务器IP都是   192.168.0.70

2、消费Kafka主题(数据源):topic_internal_data_evaluation

3、生产Kafka主题(目标源):topic_db_data_evaluation

4、注意:字段顺序与ODS层表结构字段顺序一致!!!

四、开启Kafka主题topic_db_data_evaluation消费者

[root@localhost bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.70:9092  --topic topic_db_data_evaluation  --from-beginning

五、运行测试

1、启动项目

2、消费者输出数据

然后再用Flume采集写入HDFS就行了,不过ODS层表结构需要转变

六、ODS层新表结构

create external table  if not exists  hurys_dc_ods.ods_evaluation(
    device_no           string        COMMENT '设备编号',
    source_device_type  string        COMMENT '设备类型',
    sn                  string        COMMENT '设备序列号 ',
    model               string        COMMENT '设备型号',
    create_time         timestamp     COMMENT '创建时间',
    cycle               int           COMMENT '评价数据周期',
    lane_no             int           COMMENT '车道编号',
    lane_type           int           COMMENT '车道类型 0:渠化1:来向2:出口3:去向4:左弯待转区5:直行待行区6:右转专用道99:未定义车道',
    volume              int           COMMENT '车道内过停止线流量(辆)',
    queue_len_max       float         COMMENT '车道内最大排队长度(m)',
    sample_num          int           COMMENT '评价数据计算样本量',
    stop_avg            float         COMMENT '车道内平均停车次数(次)',
    delay_avg           float         COMMENT '车道内平均延误时间(s)',
    pass_rate           float         COMMENT '车道内一次通过率',
    travel_dist         float         COMMENT '车道内检测行程距离(m)',
    travel_time_avg     float         COMMENT '车道内平均行程时间'
)
comment '评价数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by ','
stored as SequenceFile
;

七、Flume采集配置文件

八、运行Flume任务,检查HDFS文件、以及ODS表数据

--刷新表分区
msck repair table ods_evaluation;
--查看表分区
show partitions hurys_dc_ods.ods_evaluation;
--查看表数据
select * from hurys_dc_ods.ods_evaluation
where day='2024-09-03';

搞定,这样就不需要在Hive中解析JSON数据了!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/872802.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

JVM系列(十) -垃圾收集器介绍

一、摘要 在之前的几篇文章中,我们介绍了 JVM 内部布局、对象的创建过程、运行期的相关优化手段以及垃圾对象的回收算法等相关知识。 今天通过这篇文章,结合之前的知识,我们一起来了解一下 JVM 中的垃圾收集器。 二、垃圾收集器 如果说收集算法是内存回收的方法论,那么…

集成电路学习:什么是NOR Flash Memory非易失性闪存存储器

一、NOR Flash Memory&#xff1a;非易失性闪存存储器 NOR Flash Memory&#xff0c;即非易失性闪存存储器的一种&#xff0c;是Flash存储器的一个重要分支。Flash存储器&#xff0c;又称为闪存&#xff0c;结合了ROM&#xff08;只读存储器&#xff09;和RAM&#xff08;随机存…

Windows下Python和PyCharm的应用(三)__Numpy与矩阵

1、背景介绍 矩阵运算是Python语言的基石。 而支持矩阵运算的基础语言包就是Numpy。 参考链接&#xff1a; Python中Numpy的使用_numpy在python中的用法-CSDN博客 这篇博客介绍的numpy比我的这篇博客介绍的更加的详细。本博客只是根据本人 的实际应用&#xff0c;对最关键的…

Clean Minimalist GUI Pack (简约风格UI界面)

Unity 最简洁易用的 GUI 资源包。如果你在寻找资源商店上 UI 极简主义革命的发起者,你已经找到了。 这一极干净简约的 GUI 资源包是一款适合移动设备使用的游戏 UI 资源包,其中包含许多图标和元素,可用于创建具有简洁风格的完整游戏 UI。 功能: • 包括 3 种皮肤:深色、浅…

【数据结构】详细介绍各种排序算法,包含希尔排序,堆排序,快排,归并,计数排序

目录 1. 排序 1.1 概念 1.2 常见排序算法 2. 插入排序 2.1 直接插入排序 2.1.1 基本思想 2.1.2 代码实现 2.1.3 特性 2.2 希尔排序(缩小增量排序) 2.2.1 基本思想 2.2.2 单个gap组的比较 2.2.3 多个gap组比较(一次预排序) 2.2.4 多次预排序 2.2.5 特性 3. 选择排…

AFSim 仿真系统----性能工具

什么是 WPR/WPA&#xff1f; Windows 性能记录器 (WPR) 和 Windows 性能分析器 (WPA) 是 Windows 性能工具包中提供的性能监控工具。它们是免费的工具&#xff0c;可以通过下载和安装 Windows 评估和部署工具包 (ADK) 来获得。 WPR 是一个工具&#xff0c;允许用户动态部署事…

Unity3D在2D游戏中获取触屏物体的方法

我们的需求是&#xff1a; 假如屏幕中一个棋盘&#xff0c;每个棋子是button构成的&#xff0c;我们希望手指或者鼠标在哪里&#xff0c;就显示那个位置的button信息。 网上有很多获取触屏物体信息的信息的方法如下面代码所示&#xff1a; Camera cam Camera.main; // pre-de…

一个php快速项目搭建框架源码,带一键CURD等功能

介绍&#xff1a; 框架易于功能扩展&#xff0c;代码维护&#xff0c;方便二次开发&#xff0c;帮助开发者简单高效降低二次开发成本&#xff0c;满足专注业务深度开发的需求。 百度网盘下载 图片&#xff1a;

对称密码学

1. 使用OpenSSL 命令行 在 Ubuntu Linux Distribution (发行版&#xff09;中&#xff0c; OpenSSL 通常可用。当然&#xff0c;如果不可用的话&#xff0c;也可以使用下以下命令安装 OpenSSL: $ sudo apt-get install openssl 安装完后可以使用以下命令检查 OpenSSL 版本&am…

uniapp 自定义微信小程序 tabBar 导航栏

背景 做了一个校园招聘类小程序&#xff0c;使用 uniapp vue3 uview-plus pinia 构建&#xff0c;这个小程序要实现多角色登录&#xff0c;根据权限动态切换 tab 栏文字、图标。 使用pages.json中配置tabBar无法根据角色动态配置 tabBar&#xff0c;因此自定义tabBar&…

如何构建你自己的实时人脸识别系统

引言 随着人工智能技术的发展&#xff0c;人脸识别已经成为日常生活中的常见技术&#xff0c;被广泛应用于安全验证、个性化服务等多个领域。本教程将指导你如何使用Python编程语言结合OpenCV和face_recognition库来构建一个基本的人脸识别系统。我们将从安装必要的库开始&…

C++ 设计模式——解释器模式

目录 C 设计模式——解释器模式1. 主要组成成分2. 逐步构建解释器模式步骤1: 定义抽象表达式步骤2: 实现终结符表达式步骤3: 实现非终结符表达式步骤4: 构建语法树步骤5: 实现内存管理步骤6: 创建上下文和客户端 3. 解释器模式 UML 图UML 图解析 4. 解释器模式的优点5. 解释器模…

生日贺卡录放音芯片,多段音频录音ic生产厂商,NVF04M-32minute

可以录音播放的生日贺卡与传统的纸质贺卡相比&#xff0c;它有着创意以及个性的特点&#xff0c;仅需少量的电子元器件&#xff0c;即可实现录音功能&#xff0c;搭配上文字&#xff0c;让声音存储在生日贺卡里&#xff0c;让贺卡也变得有温度&#xff0c;祝福我想亲口对TA说。…

fantastic-admin前端+django后端,初始化全流程记录

fantastic-admin前端是我目前看到最完善的前端框架&#xff0c;只需要简单的设置就可以快速开始项目。 但是我本人的能力有限&#xff0c;对前端知识一知半解&#xff0c;之前废了九牛二虎之力才跑通了前后端流程&#xff0c;由于新的项目需要&#xff0c;有了开发新后台的想法…

【有啥问啥】大模型应用中的哈希链推理任务

大模型应用中的哈希链推理任务 随着人工智能技术的快速发展&#xff0c;尤其是大模型&#xff08;如GPT、BERT、Vision Transformer等&#xff09;的广泛应用&#xff0c;确保数据处理和模型推理的透明性与安全性变得愈发重要。哈希链推理任务作为一种技术手段&#xff0c;能够…

学习计算机网络

a类0~127&#xff0c;b类128~191&#xff0c;c类192~223 网络地址&#xff1a;看子网掩码&#xff0c;分网络位和主机位&#xff0c;后面是主机位&#xff0c;主机位全部为0&#xff0c;网络地址。 直接广播地址&#xff1a;看子网掩码&#xff0c;分网络位和主机位&#xff…

Jenkins构建CI/CD

CI/CD 软件开发的连续方法基于自动执行脚本&#xff0c;以最大限度地减少在开发应用程序时引入错误的可能性。从新代码的开发到部署&#xff0c;它们需要较少的人为干预甚至根本不需要干预。 它涉及在每次小迭代中不断构建&#xff0c;测试和部署代码更改&#xff0c;从而减少…

vue2+ueditor集成秀米编辑器

一、百度富文本编辑器 1.首先下载 百度富文本编辑器 下载地址&#xff1a;GitHub - fex-team/ueditor: rich text 富文本编辑器 2.把下载好的文件整理好 放在图片目录下 3. 安装插件vue-ueditor-wrap npm install vue-ueditor-wrap 4.在你所需要展示的页面 引入vue-uedito…

判断给定的一个不限长的数字串大小变化趋势、经典面试题:猴子排成圈踢出求最后剩下大王编号以及Debian服务器php中安装IMAP扩展各种报错解决过程

一、判断给定的一个不限长的数字串大小变化趋势 自制了一道面试题&#xff1a;给定一个不限长的数字字符串&#xff0c;判断每一位数字的大小变化趋势是否是^或v趋势&#xff0c;如果是就返回true&#xff0c;如果不是就返回false。比如121即属于^&#xff0c;322129即属于v。这…

Verilog和Matlab实现RGB888互转YUV444

文章目录 一、色彩空间1.1 RGB色彩空间1.2 CMYK色彩空间1.3 YUV色彩空间 二、色彩空间转换公式2.1 RGB转CMYK2.2 CMYK转RGB2.3 RGB888转YUV4442.4 YUV444转RGB888 三、MATLAB实现RGB888转YUV4443.1 matlab代码3.2 matlab结果 四、Verilog实现RGB888转YUV444 一、色彩空间 色彩空…