Flink 实现超速监控:从 Kafka 读取卡口数据写入 MySQL

目录

1. 背景

2. 实现需求

2.1 数据格式

2.2 超速判断规则

3. 实现步骤

3.1 创建 Kafka Topic

3.2 准备数据发送工具

3.3 Flink 实现代码

4. 代码说明

5. 项目运行验证

6. 总结


1. 背景

在智慧交通项目中,监控车辆是否超速是一个常见的需求。通过 Flink 处理流数据,可以实时监控车辆通过卡口时的速度,并将超速车辆信息写入数据库供后续分析。

本文将展示如何从 Kafka 的 topic-car 中读取车辆卡口数据,筛选出超速车辆,并将其信息写入 MySQL 数据库。


2. 实现需求

2.1 数据格式

  • Kafka 数据格式(JSON 示例):
{"action_time": 1682219447, "monitor_id": "0001", "camera_id": "1", "car": "豫A12345", "speed": 34.5, "road_id": "01", "area_id": "20"}
  • MySQL 表结构:
CREATE TABLE t_speeding_info (
    id INT AUTO_INCREMENT PRIMARY KEY,
    car VARCHAR(255) NOT NULL,
    monitor_id VARCHAR(255),
    road_id VARCHAR(255),
    real_speed DOUBLE,
    limit_speed INT,
    action_time BIGINT
);
DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
  `monitor_id` varchar(255) NOT NULL,  
  `road_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
   PRIMARY KEY (`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('0000', '02', 60, '01');
INSERT INTO `t_monitor_info` VALUES ('0001', '02', 60, '02');
INSERT INTO `t_monitor_info` VALUES ('0002', '03', 80, '01');
INSERT INTO `t_monitor_info` VALUES ('0004', '05', 100, '03');
INSERT INTO `t_monitor_info` VALUES ('0005', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0021', '04', 0, NULL);
INSERT INTO `t_monitor_info` VALUES ('0023', '05', 0, NULL);

2.2 超速判断规则

当车辆通过卡口时的 车速(speed)超过限速(limitSpeed)的 1.2 倍,即视为超速,将数据写入 t_speeding_info 表。


3. 实现步骤

3.1 创建 Kafka Topic

在 Kafka 中创建一个名为 topic-car 的主题:

kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car

3.2 准备数据发送工具

通过 Kafka Producer 向 topic-car 发送数据:

kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic-car

示例数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}

3.3 Flink 实现代码

3.3.1 定义数据模型
package com.bigdata.day05;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 限速字段,用于超速判断
    private double limitSpeed;
}
3.3.2 Flink 处理逻辑
package com.bigdata.windows;

import com.bigdata.day05.CarInfo;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import com.alibaba.fastjson.JSON;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class SpeedingMonitor {
    public static void main(String[] args) throws Exception {
        // 1. 创建 Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 配置 Kafka 消费者
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "bigdata01:9092");
        kafkaProps.setProperty("group.id", "car-monitor-group");
        
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "topic-car",
                new SimpleStringSchema(),
                kafkaProps
        );

        // 3. 从 Kafka 中读取数据流
        DataStream<String> kafkaStream = env.addSource(kafkaConsumer);

        // 4. 数据处理:解析 JSON,判断超速
        DataStream<CarInfo> filteredStream = kafkaStream
                .map(line -> {
                    // 将 JSON 数据转换为 CarInfo 对象
                    CarInfo carInfo = JSON.parseObject(line, CarInfo.class);
                    carInfo.setLimitSpeed(120); // 设置卡口限速(假定限速为 120)
                    return carInfo;
                })
                .filter(carInfo -> carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2); // 超速判断

        // 打印超速车辆信息
        filteredStream.print();

        // 5. 数据写入 MySQL
        filteredStream.addSink(JdbcSink.sink(
                "INSERT INTO t_speeding_info (car, monitor_id, road_id, real_speed, limit_speed, action_time) VALUES (?, ?, ?, ?, ?, ?)",
                (PreparedStatement ps, CarInfo carInfo) -> {
                    ps.setString(1, carInfo.getCar());
                    ps.setString(2, carInfo.getMonitorId());
                    ps.setString(3, carInfo.getRoadId());
                    ps.setDouble(4, carInfo.getSpeed());
                    ps.setInt(5, (int) carInfo.getLimitSpeed());
                    ps.setLong(6, carInfo.getActionTime());
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1) // 每次写入一条数据
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://localhost:3306/smart_traffic")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        // 6. 启动任务
        env.execute("Speeding Monitor");
    }
}

4. 代码说明

  1. Kafka 数据流处理

    • 使用 FlinkKafkaConsumer 从 Kafka 读取实时流数据。
    • 通过 FastJSON 将 JSON 数据解析为 Java 对象。
  2. 超速判断逻辑

    • 使用 .filter() 对流数据进行过滤,筛选超速车辆。
  3. MySQL Sink

    • 使用 Flink 的 JdbcSink 将超速数据写入 MySQL 表 t_speeding_info

5. 项目运行验证

5.1 启动 Flink 程序

在 IDEA 中运行 SpeedingMonitor 程序,确保 MySQL 服务正常运行。

5.2 发送测试数据

通过 Kafka Producer 向 topic-car 发送数据:

{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":145.0,"road_id":"01","area_id":"20"}

5.3 验证 MySQL 数据

查询 MySQL 表 t_speeding_info

SELECT * FROM t_speeding_info;

结果示例:

idcarmonitor_idroad_idreal_speedlimit_speedaction_time
1豫A12345000101145.01201682219447

6. 总结

通过本文,完整实现了从 Kafka 读取车辆卡口数据,筛选出超速车辆并写入 MySQL 的流程。使用 Flink 和 Kafka 的实时处理能力,可以轻松构建高效的智慧交通系统。

如果本文对你有帮助,请点赞、收藏,并分享给更多人! 😊

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/924159.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Qt】重写QComboBox下拉展示多列数据

需求 点击QComboBox时&#xff0c;下拉列表以多行多列的表格展示出来。 实现 直接上代码&#xff1a; #include <QComboBox> #include <QTableWidget> #include <QVBoxLayout> #include <QWidget> #include <QEvent> #include <QMouseEve…

NVR录像机汇聚管理EasyNVR多个NVR同时管理基于B/S架构的技术特点与能力应用

EasyNVR视频融合平台基于云边端协同设计&#xff0c;能够轻松接入并管理海量的视频数据。该平台兼容性强、拓展灵活&#xff0c;提供了视频监控直播、录像存储、云存储服务、回放检索以及平台级联等一系列功能。B/S架构使得EasyNVR实现了视频监控的多元化兼容与高效管理。 其采…

了解网络威胁情报:全面概述

网络威胁情报 CTI 是指系统地收集和分析与威胁相关的数据&#xff0c;以提供可操作的见解&#xff0c;从而增强组织的网络安全防御和决策过程。 在数字威胁不断演变的时代&#xff0c;了解网络威胁情报对于组织来说至关重要。复杂网络攻击的兴起凸显了制定强有力的策略以保护敏…

linux运行vue编译后的项目

如果你的 Vue 项目使用了 history 模式&#xff08;而非默认的 hash 模式&#xff09;&#xff0c;在纯静态服务器中会出现类似的问题。因为 Vue Router 的 history 模式要求所有未匹配的路径都重定向到 index.html&#xff0c;以便 Vue 前端处理路径。 首先在本地执行npm run…

基础入门-Web应用架构类别源码类别镜像容器建站模版编译封装前后端分离

知识点&#xff1a; 1、基础入门-Web应用-搭建架构上的技术要点 2、基础入门-Web应用-源码类别上的技术要点 一、演示案例-架构类别-模版&分离&集成&容器&镜像 1、套用模版型 csdn / cnblog / github / 建站系统等 安全测试思路上的不同&#xff1a; 一般…

使用Github Action将Docker镜像转存到阿里云私有仓库,供国内服务器使用,免费易用

文章目录 一、前言二、 工具准备&#xff1a;三、最终效果示例四、具体步骤第一大部分是配置阿里云1. 首先登录阿里云容器镜像服务 [服务地址](https://cr.console.aliyun.com/cn-hangzhou/instances)2. 选择个人版本3. 创建 命名空间4. 进入访问凭证来查看&#xff0c;用户名字…

Goland或Idea启动报错

Goland或Idea启动不了 报错如图&#xff1a; 原因&#xff1a;破解导致 解决方案 环境变量中有关Goland的全部删除

keepalived+lVS(dr)高可用集群

keepalivedlVS(dr)高可用集群 规划 服务器名称IP描述masterkeepalivedlvsVIP:192.168.238.100DIP:192.168.238.151keepalived的master节点和lvs负载均衡backupkeepalivedlvsVIP:192.168.238.100DIP:192.168.238.152keepalived的备份节点和lvs负载均衡server1VIP:192.168.238.…

探索.NET世界的无限可能——带你轻松了解.NET

前言 由于目前用到的技术栈有C#&#xff0c;而学习C#离不开.NET框架&#xff0c;正如学习Java离不开学习Spring框架一样。 .NET是微软开发的一个非常强大的框架&#xff0c;它不仅擅长桌面和移动开发&#xff0c;而且还能够支持Web开发和游戏引擎开发&#xff0c;在现在热门的…

web3.js + Ganache 模拟以太坊账户间转账

转账前&#xff1a; 转账后&#xff1a; async function interact() {const web3 new Web3(new Web3.providers.HttpProvider(http://127.0.0.1:7545))web3.eth.Contract.handleRevert trueconst accounts await web3.eth.getAccounts()console.log(accounts)let balance1, …

题解 洛谷 Luogu P1182 数列分段 Section II 二分答案 C/C++

题目传送门&#xff1a; P1182 数列分段 Section II - 洛谷 | 计算机科学教育新生态https://www.luogu.com.cn/problem/P1182思路&#xff1a; 二分答案&#xff0c;每次以区间 [l, r] 中点 m 为每段和的阈值 判断在此前提下&#xff0c;划分段数是否不大于 M 是就记录答案…

26.100ASK_T113-PRO 测试摄像头 输出信息

1.测试代码 读到摄象头参数 输出 video_test.c #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <sys/ioctl.h> #include <unistd.h> #include <stdio.h> #include <string.h> #include <linux/type…

git使用文档手册

创建一个本地代码工作空间&#xff0c;比如这里使用test目录作为工作目录 针对仓库地址 http://192.168.31.125:9557/poxiaoai-crm/project-crm.git。 1. 安装 Git 确保您的系统已经安装了 Git。如果未安装&#xff0c;请根据操作系统访问 Git 官网 下载并安装。 验证安装 …

HTML5和CSS3新增特性

HTML5的新特性 HTML5新增的语义化标签 HTML5 的新增特性主要是针对于以前的不足&#xff0c;增加了一些新的标签、新的表单和新的表单属性等。 这些新特性都有兼容性问题&#xff0c;基本是 IE9 以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量…

BUUCTF—Reverse—不一样的flag(7)

是不是做习惯了常规的逆向题目&#xff1f;试试这道题&#xff0c;看你在能不能在程序中找到真正的flag&#xff01;注意&#xff1a;flag并非是flag{XXX}形式&#xff0c;就是一个’字符串‘&#xff0c;考验眼力的时候到了&#xff01; 注意&#xff1a;得到的 flag 请包上 f…

通信与网络安全之IPSEC

IPSec&#xff08;IP Security&#xff09;是IETF制定的为保证在Internet上传送数据的安全保密性能的三层隧道加密协议。IPSec在网络层对IP报文提供安全服务。IPSec协议本身定义了如何在IP数据包中增加字段来保证IP包的完整性、 私有性和真实性&#xff0c;以及如何加密数据包。…

树莓派搭建NextCloud:给数据一个安全的家

前言 NAS有很多方案&#xff0c;常见的有 Nextcloud、Seafile、iStoreOS、Synology、ownCloud 和 OpenMediaVault &#xff0c;以下是他们的特点&#xff1a; 1. Nextcloud 优势&#xff1a; 功能全面&#xff1a;支持文件同步、共享、在线文档编辑、视频会议、日历、联系人…

AWS账户注册未完成会收费吗?

在当今云计算的时代&#xff0c;亚马逊网络服务&#xff08;AWS&#xff09;已经成为众多企业和开发者的首选平台。然而&#xff0c;对于许多刚接触云服务的人来说&#xff0c;关于AWS账户注册的费用问题常常引发疑虑&#xff1a;如果我在注册过程中未能完成操作&#xff0c;是…

在线音乐播放器 —— 测试报告

自动化脚本源代码&#xff1a;Java: 利用Java解题与实现部分功能及小项目的代码集合 - Gitee.com 目录 前言 一、项目简介 1.项目背景 2.应用技术 &#xff08;1&#xff09;后端开发 &#xff08;2&#xff09;前端开发 &#xff08;3&#xff09;数据库 二、项目功能…

TCP/IP协议攻击与防范

一、TCP/IP协议攻击介绍 1.1 Internet的结构​ LAN&#xff1a;局域网 WAN&#xff1a;广域网 WLAN&#xff1a;无线局域网 私有IP地址与公有IP地址&#xff1f; 私有地址&#xff1a;A类&#xff1a;10.0.0.0~10.255.255.255 B类&#xff1a;172.16.0.0~172.31.255.255…