读取mysql、kafka数据筛选后放入mysql

要求: 

从kafka的topic-car中读取卡口数据,将超速车辆写入mysql的t_monitor_info表
当通过卡口的车速超过该卡口限速的1.2倍 就认定为超速。

G107

image.png

image.png


1)卡口数据格式如下:

`action_time` long --摄像头拍摄时间戳,精确到秒,
`monitor_id` string --卡口号,
`camera_id` string --摄像头编号,
`car` string --车牌号码,
`speed` double --通过卡口的速度,
`road_id` string --道路id,
`area_id` string --区域id,

区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:蔡锷路。交通部门会给蔡锷路一个唯一编号。
kafka数据格式:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}
{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":80,"road_id":"01","area_id":"20"}
{"action_time":1682219447,"monitor_id":"0002","camera_id":"1","car":"豫A12345","speed":84.5,"road_id":"01","area_id":"20"}
{"action_time":1682219447,"monitor_id":"0002","camera_id":"1","car":"豫A12345","speed":150,"road_id":"01","area_id":"20"}

MySQL建表语句

DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) NOT NULL,
  `monitor_id` varchar(255) DEFAULT NULL,
  `road_id` varchar(255) DEFAULT NULL,
  `real_speed` double DEFAULT NULL,
  `limit_speed` int(11) DEFAULT NULL,
  `action_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

导入包:

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.12.0</version>
        </dependency>

        <!-- 指定mysql-connector的依赖 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.48</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.48</version>
        </dependency>

    </dependencies>

复习FastJson的使用:

image.png


假如需求再次升级,给定一个监控设备表:

DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
  `monitor_id` varchar(255) NOT NULL,  
  `road_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('0000', '02', 60, '01');
INSERT INTO `t_monitor_info` VALUES ('0001', '02', 60, '02');
INSERT INTO `t_monitor_info` VALUES ('0002', '03', 80, '01');
INSERT INTO `t_monitor_info` VALUES ('0004', '05', 100, '03');
INSERT INTO `t_monitor_info` VALUES ('0005', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0021', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0023', '05', 0, NULL);
启动集群:
1) start-dfs.sh
2) zk.sh start
3) kf.sh start
4) start-cluster.sh
5) historyserver.sh start
kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car

通过黑窗口发送消息:
kafka-console-producer.sh  --bootstrap-server bigdata01:9092 --topic topic-car

image.png

 思路整理:

        创建一个存放mysql数据的实体类,把从mysql中读取的数据存入这个类对象中(一个类对象中存储的是一条mysql中的数据),然后把这些数据放入list集合中;接着读取kafka中的json数据,解析json数据并放入另一个新建的类对象中。取出存储mysql数据的集合中的“限速”字段,把他放入存储Kafka数据的类对象中,然后对kafka中的数据进行筛选过滤。最后将符合条件的数据放入mysql数据库中。

代码实现:

package com.bigdata.Day03;

import com.alibaba.fastjson.JSON;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.htrace.fasterxml.jackson.databind.JsonNode;
import org.apache.htrace.fasterxml.jackson.databind.ObjectMapper;

import java.io.Serializable;
import java.lang.management.MonitorInfo;
import java.sql.*;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;
//创建实体类,用于存储从mysql中读取的数据
@Data
@NoArgsConstructor
@AllArgsConstructor
class monitorInfo implements Serializable{
    private String monitorId;
    private String roadId;
    private int limitSpeed;
    private String areaId;
}
//存储从kafka中读取的数据
@Data
@NoArgsConstructor
@AllArgsConstructor
// {"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}
class CarInfo implements Serializable {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 这个属性不属于这里,但是可以使用
    private int limitSpeed;
}

public class zuoye_1122 {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //连接kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "g1");
        //kafka数据源
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("car",new SimpleStringSchema(),properties);
        //获取kafka数据源
        DataStreamSource<String> dataStreamSource2 = env.addSource(kafkaSource);
        //获取mysql数据源
        //2. 注册驱动(安转驱动)  此时这句话可以省略  如果书写的话,mysql8.0 带 cj
        Class.forName("com.mysql.jdbc.Driver");
        //3. 获取数据库连接对象 Connection
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb01","root","root");
        String sql = "select * from `t_monitor_info`";
        PreparedStatement statement = (PreparedStatement) conn.prepareStatement(sql);
        // 此处的返回值是 影响的行数
        ResultSet resultSet = statement.executeQuery();
        ArrayList<monitorInfo> list = new ArrayList<>();
        while(resultSet.next()){
            // 根据列名获取列的数据
            String monitorId = resultSet.getString("monitor_id");
            String roadId = resultSet.getString("road_id");
            int speedLimit = resultSet.getInt("speed_limit");
            String areaId = resultSet.getString("area_id");
            list.add(new monitorInfo(monitorId,roadId,speedLimit,areaId));
        }
        System.out.println(list);
        //将kafka中的json字符串转换为java对象
        SingleOutputStreamOperator<CarInfo> chaosuCar = dataStreamSource2.map(new MapFunction<String, CarInfo>() {
            @Override
            public CarInfo map(String s) throws Exception {
                CarInfo carInfo = JSON.parseObject(s, CarInfo.class);
                return carInfo;
            }
        }).map(new MapFunction<CarInfo, CarInfo>() {
            @Override
            public CarInfo map(CarInfo carInfo) throws Exception {
                for (int i = 0; i < list.size(); i++) {
                    if(Objects.equals(list.get(i).getMonitorId(),carInfo.getMonitorId())){
                        carInfo.setLimitSpeed(list.get(i).getLimitSpeed());
                        System.out.println(carInfo);
                    }
                }
                return carInfo;
            }
        }).filter(new FilterFunction<CarInfo>() {
            @Override
            public boolean filter(CarInfo carInfo) throws Exception {
                return carInfo.getSpeed() > carInfo.getLimitSpeed()*1.2;
            }
        });
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/mydb01")
                .withDriverName("com.mysql.jdbc.Driver")
                .withUsername("root")
                .withPassword("root")
                .build();
        chaosuCar.addSink(JdbcSink.sink(
                "insert into t_speeding_info values(null,?,?,?,?,?,?)",
                new JdbcStatementBuilder<CarInfo>() {
                    @Override
                    public void accept(java.sql.PreparedStatement preparedStatement, CarInfo carInfo) throws SQLException {
                        preparedStatement.setString(2,carInfo.getMonitorId());
                        preparedStatement.setString(1,carInfo.getCar());
                        preparedStatement.setDouble(4,carInfo.getSpeed());
                        preparedStatement.setString(3,carInfo.getRoadId());
                        preparedStatement.setDouble(5,carInfo.getLimitSpeed());
                        preparedStatement.setLong(6,carInfo.getActionTime());
                    }
                }, JdbcExecutionOptions.builder().withBatchSize(1).build(),jdbcConnectionOptions
        ));
        //5. execute-执行
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/922539.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CVE-2022-24124

根据提示 访问api/get-organizations salmap和手工注入都不行&#xff0c;使用substring() 查库&#xff0c;查到有4个库 ?p1&pageSize10&valuee99nb&sortField&sortOrder&field (substring((select count(schema_name) from information_schema.sche…

采用python3.12 +django5.1 结合 RabbitMQ 和发送邮件功能,实现一个简单的告警系统 前后端分离 vue-element

一、开发环境搭建和配置 #mac环境 brew install python3.12 python3.12 --version python3.12 -m pip install --upgrade pip python3.12 -m pip install Django5.1 python3.12 -m django --version #用于检索系统信息和进程管理 python3.12 -m pip install psutil #集成 pika…

Python文件夹.idea的作用

每当我们创建python的时候&#xff0c;发现文件夹里面都会有.idea文件夹。 那么这个是什么东西呢&#xff1f; .idea是集成开发环境&#xff08;IDE&#xff09;创建项目时自动生成的配置目录。 .idea文件目录介绍&#xff1a; workspace.xml&#xff1a;包含项目的整体配置信…

【计算机网络】多路转接之poll

poll也是一种linux中的多路转接方案(poll也是只负责IO过程中的"等") 解决&#xff1a;1.select的fd有上限的问题&#xff1b;2.每次调用都要重新设置关心的fd 一、poll的使用 int poll(struct pollfd *fds, nfds_t nfds, int timeout); ① struct pollfd *fds&…

使用 Elastic 收集 Windows 遥测数据:ETW Filebeat 输入简介

作者&#xff1a;来自 Elastic Chema Martinez 在安全领域&#xff0c;能够使用 Windows 主机的系统遥测数据为监控、故障排除和保护 IT 环境开辟了新的可能性。意识到这一点&#xff0c;Elastic 推出了专注于 Windows 事件跟踪 (ETW) 的新功能 - 这是一种强大的 Windows 原生机…

.net core MVC入门(一)

文章目录 项目地址一、环境配置1.1 安装EF core需要包1.2 配置数据库连接二、使用EF创建表2.1 整体流程梳理2.1 建表详细流程三、添加第一个视图3.1整体流程梳理3.1 添加视图,并显示在web里四、使用EF增加Catogory数据,并且读取数据到页面4.1整体流程梳理4.2 实现五、增加Cat…

短视频矩阵矩阵,矩阵号策略

随着数字媒体的迅猛发展&#xff0c;短视频平台已经成为企业和个人品牌推广的核心渠道。在这一背景下&#xff0c;短视频矩阵营销策略应运而生&#xff0c;它通过高效整合和管理多个短视频账号&#xff0c;实现资源的最优配置和营销效果的最大化。本文旨在深入探讨短视频矩阵的…

决策回归树【原理/算例/决策回归树 VS 线性回归】

决策回归树 1. 决策回归树原理2. 决策回归树算例3. 手动计算MSE和最优划分属性4. 决策回归树 VS 线性回归 1. 决策回归树原理 决策回归树&#xff0c;虽然叫做“回归”树&#xff0c;但是它的本质还是分类算法&#xff0c;只是分的类别多一点。 1. 回归树的裂分指标 回归树种&…

基于STM32的智能鱼缸控制系统的Proteus仿真

文章目录 一、智能鱼缸控制系统1.题目要求2.思路2.1 主控2.2 传感器2.3 按键2.4 声光报警2.5 自动换水&#xff0c;喂食&#xff0c;供氧2.6 OLED显示2.7 电源部分2.8 远程终端 3.电路仿真3.1 未仿真时3.2 开始仿真&#xff0c;正常显示3.3 按下设置按键&#xff0c;进入阈值界…

【Python爬虫】Scrapy框架实战---百度首页热榜新闻

如何利用Scrapy框架实战提取百度首页热榜新闻的排名、标题和链接 一、安装Scrapy库 二、创建项目&#xff08;以BaiduSpider为例&#xff09; scrapy startproject BaiduSpider生成每个文件的功能&#xff1a; 二、 创建爬虫脚本&#xff08;爬虫名&#xff1a;news&#xff…

mysql-分析MVCC原理

一、MVCC简介 MVCC是一种用来解决读写冲读的无锁并发控制&#xff0c;也就是为事务分配单增长的时间戳&#xff0c;为每个修改保存一个版本&#xff0c;版本与事务时间戳关联&#xff0c;读操作只读该事务开始前的数据库的快照&#xff0c;所以MVCC可以为数据库解决一些问题。…

论文笔记:Retrieval-Augmented Generation for Knowledge-Intensive NLP Tasks

1. 挑战/问题&#xff08;Challenges/Issues&#xff09;&#xff1a; 这篇论文探讨了大型预训练语言模型在处理知识密集型自然语言处理&#xff08;NLP&#xff09;任务时面临的挑战。尽管这些模型在参数中存储了大量事实知识&#xff0c;并在微调后能够在下游NLP任务中取得很…

嵌入式硬件电子电路设计(六)LDO低压差线性稳压器全面详解

引言&#xff1a; LDO&#xff08;Low Dropout Regulator&#xff0c;低压差线性稳压器&#xff09;是一种常用的电源管理组件&#xff0c;用于提供稳定的输出电压&#xff0c;同时允许较小的输入电压与输出电压之间的差值。LDO广泛应用于各种电子设备中&#xff0c;特别是在对…

Spring:AOP面向切面案例讲解AOP核心概念

Spring的AOP&#xff0c;在不惊动(改动)原有设计(代码)的前提下&#xff0c;想给谁添加功能就给谁添加。这个也就是Spring的理念&#xff1a; 无入侵式/无侵入式 AOP中核心概念分别指的是什么? 连接点切入点通知通知类切面 下面以一个例子进行讲解&#xff0c;直接上代码&a…

禁止Chrome的自动升级

一、需求分析 因为用Chromeselenium做了网页自动化填写任务&#xff0c;如果Google Chrome浏览器自动升级&#xff0c;就会导致chromedriver加载失败&#xff0c;自动化任务失效&#xff0c;因此需要禁止Chrome浏览器的自动升级。 二、当前环境 三、实际配置 运行注册表编辑…

2024年wordpress、d-link等相关的多个cve漏洞poc

⚠️ 漏洞 ✅ CVE-2024-10914 在D-Link DNS-320、DNS-320LW、DNS-325和DNS-340L中发现的漏洞&#xff0c;版本直到20241028 GET /cgi-bin/account_mgr.cgi?cmdcgi_user_add&name%27;id;%27 HTTP/1.1✅ CVE-2024-11305 在Altenergy Power Control Software中发现的关键…

Spring框架特性及包下载(Java EE 学习笔记04)

1 Spring 5的新特性 Spring 5是Spring当前最新的版本&#xff0c;与历史版本对比&#xff0c;Spring 5对Spring核心框架进行了修订和更新&#xff0c;增加了很多新特性&#xff0c;如支持响应式编程等。 更新JDK基线 因为Spring 5代码库运行于JDK 8之上&#xff0c;所以Spri…

从搭建uni-app+vue3工程开始

技术栈 uni-app、vue3、typescript、vite、sass、uview-plus、pinia、axios 一、项目搭建 1、创建以 typescript 开发的工程 npx degit dcloudio/uni-preset-vue#vite-ts my-vue3-project2、安装sass npm install -D sass// 安装sass-loader&#xff0c;注意需要版本10&…

WPF中的登录界面

创建如下的目录结构&#xff1a; 2.在App.xaml.cs中设置为先登录验证之后再进入主页面 using Prism.Ioc; using System.Windows; using 校园访客系统.Views;namespace 校园访客系统 {/// <summary>/// Interaction logic for App.xaml/// </summary>public partia…

ros2学习日记_241124_ros相关链接

前言 提醒&#xff1a; 文章内容为方便作者自己后日复习与查阅而进行的书写与发布&#xff0c;其中引用内容都会使用链接表明出处&#xff08;如有侵权问题&#xff0c;请及时联系&#xff09;。 其中内容多为一次书写&#xff0c;缺少检查与订正&#xff0c;如有问题或其他拓展…