基于 Flink 的车辆超速监测与数据存储的小实战

基于 Flink 的车辆超速监测与数据存储的小实战

一、实战背景与目标

在智能交通管理领域,实时监控车辆行驶速度并精准识别超速行为对于保障道路交通安全和维护交通秩序具有至关重要的意义。本项目旨在构建一个高效的数据处理系统,能够从 Kafka 的 topic-car 主题中读取卡口车辆数据,依据卡口对应的限速信息,精确判断车辆是否超速(当车速超过卡口限速的 1.2 倍时认定为超速),并将超速车辆的详细信息实时写入 MySQL 的 t_speeding_info 表中,以便后续交通管理部门进行统计分析与违规处理。

二、技术选型与环境搭建

  1. 技术选型
    • Apache Flink:作为核心的分布式流处理框架,具备强大的实时数据处理能力,能够高效地处理来自 Kafka 的海量卡口数据,进行复杂的转换和分析操作。
    • Apache Kafka:作为高性能的分布式消息队列,负责稳定地接收和传输卡口数据,确保数据的可靠性和高可用性,实现数据的解耦与缓冲。
    • MySQL:用于存储监控设备信息以及超速车辆数据,提供持久化的数据存储服务,方便数据的管理与查询。
    • FastJSON:一款快速的 JSON 处理库,用于在 Java 程序中便捷地解析和序列化 JSON 格式的卡口数据。
  2. 环境搭建
    • 首先启动 Hadoop 相关服务:执行 start-dfs.sh 启动分布式文件系统。
    • 启动 ZooKeeper 服务:zk.sh start,为 Kafka 提供分布式协调服务。
    • 启动 Kafka 服务:kf.sh start,确保消息队列正常运行。
    • 启动 Flink 集群:start-cluster.sh,创建 Flink 执行环境。
    • 启动 Flink 的历史服务器:historyserver.sh start,方便查看作业执行历史信息。
    • 创建用于接收卡口数据的 Kafka 主题 topic-carkafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic-car,其中 bigdata01 为 Kafka 服务器地址,--partitions 1 表示主题分区数为 1,--replication-factor 3 表示每个分区的副本数为 3。

三、数据格式与数据库表结构

  1. Kafka 卡口数据格式
    • action_timelong 类型,表示摄像头拍摄时间戳,精确到秒,用于记录车辆通过卡口的具体时间。
    • monitor_idstring 类型,代表卡口号,是识别车辆通过哪个卡口的关键标识。
    • camera_idstring 类型,摄像头编号,由于一个卡口可能配备多个摄像头,此编号用于区分不同摄像头拍摄的数据。
    • carstring 类型,车牌号码,用于唯一标识车辆。
    • speeddouble 类型,车辆通过卡口时的速度,是判断车辆是否超速的核心数据。
    • road_idstring 类型,道路 ID,代表城市中每条道路的唯一编号,有助于关联道路相关信息。
    • area_idstring 类型,区域 ID,标识车辆所在的城市行政区域,方便进行区域交通数据统计分析。
    • 示例数据:{"action_time":1682219447,"monitor_id":"0001","camera_id":"1","car":"豫A12345","speed":34.5,"road_id":"01","area_id":"20"}
  2. MySQL 表结构
    • t_speeding_info 表:
      • idint(11) 类型,自增长主键,用于唯一标识每条超速记录。
      • carvarchar(255) 类型,存储车辆的车牌号码。
      • monitor_idvarchar(255) 类型,记录车辆超速时经过的卡口号。
      • road_idvarchar(255) 类型,对应的道路 ID。
      • real_speeddouble 类型,车辆实际行驶速度。
      • limit_speedint(11) 类型,该卡口的限速值。
      • action_timebigint(20) 类型,车辆超速的时间戳。
    • t_monitor_info 表:
      • monitor_idvarchar(255) 类型,主键,卡口的唯一标识。
      • road_idvarchar(255) 类型,卡口所在道路的 ID。
      • speed_limitint(11) 类型,该卡口的限速信息。
      • area_idvarchar(255) 类型,卡口所在的区域 ID。

四、代码实现详解

以下是使用 Java 结合 Flink 实现的核心代码逻辑:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Properties;

/**
 * @基本功能: 从 Kafka 读取卡口数据,判断超速并写入 MySQL 超速表
 * @program:FlinkProject
 * @author: BJ
 * @create:2024-11-23 09:09:06
 **/
public class CarSpeedDemo {

    public static void main(String[] args) throws Exception {

        // 1. env - 准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source - 加载数据
        // 加载 MySQL 中的监控设备信息
        ArrayList<MonitorInfo> monitorInfos = loadMonitorInfoFromMySQL();

        // 加载 Kafka 中的车辆信息数据
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "bigdata01:9092");
        properties.setProperty("group.id", "fD1");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<String>("topic-car", new SimpleStringSchema(), properties);
        DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);

        // 3. transformation - 数据处理转换
        SingleOutputStreamOperator<CarInfo> filter = dataStreamSource.process(new ProcessFunction<String, CarInfo>() {
            @Override
            public void processElement(String msg, ProcessFunction<String, CarInfo>.Context context, Collector<CarInfo> collector) throws Exception {
                CarInfo carInfo = JSON.parseObject(msg, CarInfo.class);
                System.out.println(carInfo);
                // 根据监控设备信息设置车辆的限速
                for (MonitorInfo monitorInfo : monitorInfos) {
                    if (Objects.equals(monitorInfo.getMonitor_id(), carInfo.getMonitorId())) {
                        carInfo.setLimitSpeed(monitorInfo.getSpeed_limit());
                    }
                }
                collector.collect(carInfo);
                System.out.println(carInfo);
            }
        }).filter(new FilterFunction<CarInfo>() {
            @Override
            public boolean filter(CarInfo carInfo) throws Exception {
                // 判断车辆是否超速(速度超过限速的 1.2 倍)
                return carInfo.getSpeed() > carInfo.getLimitSpeed() * 1.2;
            }
        });
        filter.print();

        // 4. sink - 数据输出
        JdbcConnectionOptions jdbcConnectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
              .withUrl("jdbc:mysql://localhost:3306/zuoye")
              .withDriverName("com.mysql.jdbc.Driver")
              .withUsername("root")
              .withPassword("123456")
              .build();
        filter.addSink(JdbcSink.sink(
                "insert into t_speeding_info values(null,?,?,?,?,?,?)",
                new JdbcStatementBuilder<CarInfo>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, CarInfo student) throws SQLException {
                        preparedStatement.setString(1, student.getCar());
                        preparedStatement.setString(2, student.getMonitorId());
                        preparedStatement.setString(3, student.getRoadId());
                        preparedStatement.setDouble(4, student.getSpeed());
                        preparedStatement.setDouble(5, student.getLimitSpeed());
                        preparedStatement.setLong(6, student.getActionTime());
                    }
                }, JdbcExecutionOptions.builder().withBatchSize(1).build(), jdbcConnectionOptions

        ));

        // 5. execute - 执行
        env.execute();
    }

    // 从 MySQL 加载监控设备信息的方法
    private static ArrayList<MonitorInfo> loadMonitorInfoFromMySQL() throws Exception {
        ArrayList<MonitorInfo> monitorInfos = new ArrayList<>();
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/zuoye", "root", "123456");
        Statement statement = connection.createStatement();
        ResultSet resultSet = statement.executeQuery("select monitor_id, road_id, speed_limit,area_id FROM t_monitor_info");
        while (resultSet.next()) {
            String monitor_id = resultSet.getString("monitor_id");
            String road_id = resultSet.getString("road_id");
            Integer speed_limit = resultSet.getInt("speed_limit");
            String area_id = resultSet.getString("area_id");
            MonitorInfo monitorInfo = new MonitorInfo(monitor_id, road_id, speed_limit.doubleValue(), area_id);
            System.out.println(monitorInfo);
            monitorInfos.add(monitorInfo);
        }
        statement.close();
        connection.close();
        return monitorInfos;
    }
}

// 车辆信息类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class CarInfo implements Serializable {
    private long actionTime;
    private String monitorId;
    private String cameraId;
    private String car;
    private double speed;
    private String roadId;
    private String areaId;

    // 用于存储车辆对应的限速信息
    private double limitSpeed;
}

// 监控设备信息类
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class MonitorInfo implements Serializable {
    private String monitor_id;
    private String road_id;
    private Double speed_limit;
    private String area_id;
}
  1. 环境配置与数据加载
    • 首先创建 Flink 的流执行环境 StreamExecutionEnvironment,并设置运行模式为自动模式,让 Flink 根据数据特性自动选择合适的执行模式。
    • 从 MySQL 的 t_monitor_info 表中加载监控设备信息,通过 JDBC 连接数据库,执行查询语句获取监控设备的相关数据,如卡口号、道路 ID、限速信息和区域 ID,将这些信息封装成 MonitorInfo 对象并存储在 ArrayList 中,以便后续在数据处理过程中为车辆信息匹配对应的限速。
    • 配置 Kafka 消费者属性,创建 FlinkKafkaConsumer 连接到 topic-car 主题,获取卡口车辆数据的流 dataStreamSource
  2. 数据处理与转换
    • 使用 process 函数对从 Kafka 读取的车辆信息数据进行处理。在 processElement 方法中,先使用 FastJSON 将 JSON 格式的车辆数据字符串转换为 CarInfo 对象。然后遍历之前加载的监控设备信息列表,根据卡口号匹配车辆对应的监控设备,将限速信息设置到 CarInfo 对象的 limitSpeed 属性中。最后将处理后的 CarInfo 对象收集输出。
    • 通过自定义的 FilterFunction 对车辆信息进行过滤,只保留速度超过限速 1.2 倍的超速车辆信息。
  3. 数据输出
    • 配置 JDBC 连接选项,构建 JdbcSink,将超速车辆信息插入到 MySQL 的 t_speeding_info 表中。在 JdbcStatementBuilder 中指定了插入语句的参数设置,将车辆的相关信息逐个设置到 PreparedStatement 中,准备插入数据库。
  4. 任务执行
    • 最后启动 Flink 任务执行,整个系统开始运行,从数据加载、处理到输出的流程将持续进行,实时监测和处理车辆超速信息。

五、总结与展望

本项目成功地利用 Flink、Kafka 和 MySQL 构建了一个车辆超速监测与数据存储系统,实现了从 Kafka 读取卡口数据、判断车辆超速并将超速信息写入 MySQL 的完整流程。通过实时处理卡口数据,交通管理部门能够及时获取超速车辆信息,有助于加强交通监管力度,提高道路交通安全水平。

然而,在实际应用场景中,还可以对该系统进行进一步的优化与扩展。例如,可以增加数据质量监控模块,确保从 Kafka 读取的数据准确性和完整性;优化 Flink 任务的性能,根据集群资源和数据流量调整并行度等参数;还可以考虑将超速数据进行可视化展示,以便更直观地分析交通超速情况的分布与趋势。未来,随着智能交通技术的不断发展,本系统也可以与其他交通管理系统进行集成,如车辆轨迹分析系统、交通流量预测系统等,为构建更加智能、高效的交通管理平台奠定坚实的基础。

希望通过本文的详细介绍,能够帮助读者深入理解基于 Flink 的实时数据处理在智能交通领域的应用,为相关领域的开发与研究提供有益的参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/922559.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Go】-go中的锁机制

目录 一、锁的基础知识 1. 互斥量/互斥锁 2. CAS&#xff08;compare and swap&#xff09; 3. 自旋锁 4. 读写锁 5. 乐观锁 & 悲观锁 6. 死锁 二、go中锁机制 1. Mutex-互斥锁 2. RWMutex-读写锁 2.1 RWMutex流程概览 2.2 写锁饥饿问题 2.3. golang的读写锁源…

Python 使用 Selenuim进行自动化点击入门,谷歌驱动,以百度为例

一、首先要下载谷歌驱动 1.&#xff08;打开谷歌浏览器 - 设置 - 关于谷歌&#xff0c;查看谷歌浏览器版本&#xff0c;否则不对应无法调用&#xff0c;会提示&#xff1a;selenium.common.exceptions.SessionNotCreatedException: Message: session not created: This versio…

RCVS:A Unifed Registration and FusionFramework for Video Streams 译文

摘要:红外与可见光的跨模态配准与融合可以生成更全面的目标和场景信息表示。以前的框架主要关注于解决模态差异以及保留不同模态信息对不同静态图像对之间配准和融合任务性能的影响。然而&#xff0c;这些框架忽略了在现实世界设备上的实际部署&#xff0c;特别是在视频流的背景…

JDBC编程---Java

目录 一、数据库编程的前置 二、Java的数据库编程----JDBC 1.概念 2.JDBC编程的优点 三.导入MySQL驱动包 四、JDBC编程的实战 1.创造数据源&#xff0c;并设置数据库所在的位置&#xff0c;三条固定写法 2.建立和数据库服务器之间的连接&#xff0c;连接好了后&#xff…

Python 抓取笑话内容并存入 CSV

在互联网上&#xff0c;有许多有趣的内容等待我们去挖掘和收集。今天&#xff0c;我们就来深入了解一段 Python 代码&#xff0c;它能够帮助我们从指定网站抓取笑话内容&#xff0c;并将其整理保存为 CSV 文件&#xff0c;方便后续查看和分析。 结果展示&#xff08;文末附完整…

Redis-09 SpringBoot集成Redis

Jedis 和 lettuce 基本已经过时 集成RedisTemplate 单机 1.建 Modul 2.改pom <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instanc…

Linux:自定义Shell

本文旨在通过自己完成一个简单的Shell来帮助理解命令行Shell这个程序。 目录 一、输出“提示” 二、获取输入 三、切割字符串 四、执行指令 1.子进程替换 2.内建指令 一、输出“提示” 这个项目基于虚拟机Ubuntu22.04.5实现。 打开终端界面如图所示。 其中。 之前&#x…

夜天之书 #104 开源软件有断供的风险吗?

近期&#xff0c;Linux 上游因为受美国出口管制条例的影响&#xff0c;将移除部分开发者的 MAINTAINER 权限&#xff0c;引起了新一轮对开源依赖的重新评估。 关于其中开源精神和社群治理的讨论&#xff0c;卫 Sir 的两篇文章已经讨论得比较清楚&#xff08;见尾注&#xff09;…

tensorforce(dqn框架)安装

win7 64位操作系统 python版本&#xff1a;3.8.10 pip install tensorflow 默认的tensorflow的版本是2.31.0&#xff0c;安装tensorforce后自动升级到3.6.0 tensorflow:升级到3.6.0 keras&#xff1a;升级到3.6.0 tensorforce安装 pip3 install tensorforce protobuf 需要降到…

STM32抢占优先级不生效

板类型&#xff1a;STM32F103精英开发板代码背景&#xff1a; 设置了USART1中断和KEY_UP中断(使用EXTI0外部中断)两个中断的优先级分组都设为2&#xff08;2bit抢占优先级&#xff0c;2bit响应优先级)EXTI0中断抢占优先级设为3&#xff0c; 响应优先级设为3USART1抢占优先级设…

4.1_未授权漏洞

未授权漏洞 成因&#xff1a;配置错误&#xff0c;默认口令&#xff08;弱口令&#xff09;&#xff0c;接口配置不当&#xff1b;未授权漏洞 漏洞利用方式 Redis 未授权访问漏洞 Getshell方式 写入webshell&#xff1b; 连接目标redis&#xff1a;redis-cli -h 192.168.7…

快速识别模型:simple_ocr,部署教程

快速识别图片中的英文、标点符号、数学符号、Emoji, 模型会输出图片中文字行的坐标位置、最低得分、识别结果。当前服务用到的模型&#xff1a;检测模型、数字识别、英文符号识别。 一、部署流程 1.更新基础环境 apt update2.安装miniconda wget https://repo.anaconda.com/…

衡山派D133EBS 开发环境安装及SDK编译烧写镜像烧录

1.创建新文件夹&#xff0c;用来存放SDK包&#xff08;其实本质就是路径要对就ok了&#xff09;&#xff0c;右键鼠标通过Open Git Bash here来打开git 输入命令 git clone --depth1 https://gitee.com/lcsc/luban-lite.git 来拉取&#xff0c;如下所示&#xff1a;&#xff0…

蓝桥杯不知道叫什么题目

小蓝有一个整数&#xff0c;初始值为1&#xff0c;他可以花费一些代价对这个整数进行变换。 小蓝可以花贵1的代价将教数增加1。 小蓝可以花费3的代价将整数增加一个值,这个值是整数的数位中最大的那个(1到9) .小蓝可以花费10的代价将整数变为原来的2倍, 例如&#xff0c;如果整…

读取mysql、kafka数据筛选后放入mysql

要求&#xff1a; 从kafka的topic-car中读取卡口数据&#xff0c;将超速车辆写入mysql的t_monitor_info表 当通过卡口的车速超过该卡口限速的1.2倍 就认定为超速。 G107 1&#xff09;卡口数据格式如下&#xff1a; action_time long --摄像头拍摄时间戳&#xff0c;精确到秒…

CVE-2022-24124

根据提示 访问api/get-organizations salmap和手工注入都不行&#xff0c;使用substring() 查库&#xff0c;查到有4个库 ?p1&pageSize10&valuee99nb&sortField&sortOrder&field (substring((select count(schema_name) from information_schema.sche…

采用python3.12 +django5.1 结合 RabbitMQ 和发送邮件功能,实现一个简单的告警系统 前后端分离 vue-element

一、开发环境搭建和配置 #mac环境 brew install python3.12 python3.12 --version python3.12 -m pip install --upgrade pip python3.12 -m pip install Django5.1 python3.12 -m django --version #用于检索系统信息和进程管理 python3.12 -m pip install psutil #集成 pika…

Python文件夹.idea的作用

每当我们创建python的时候&#xff0c;发现文件夹里面都会有.idea文件夹。 那么这个是什么东西呢&#xff1f; .idea是集成开发环境&#xff08;IDE&#xff09;创建项目时自动生成的配置目录。 .idea文件目录介绍&#xff1a; workspace.xml&#xff1a;包含项目的整体配置信…

【计算机网络】多路转接之poll

poll也是一种linux中的多路转接方案(poll也是只负责IO过程中的"等") 解决&#xff1a;1.select的fd有上限的问题&#xff1b;2.每次调用都要重新设置关心的fd 一、poll的使用 int poll(struct pollfd *fds, nfds_t nfds, int timeout); ① struct pollfd *fds&…

使用 Elastic 收集 Windows 遥测数据:ETW Filebeat 输入简介

作者&#xff1a;来自 Elastic Chema Martinez 在安全领域&#xff0c;能够使用 Windows 主机的系统遥测数据为监控、故障排除和保护 IT 环境开辟了新的可能性。意识到这一点&#xff0c;Elastic 推出了专注于 Windows 事件跟踪 (ETW) 的新功能 - 这是一种强大的 Windows 原生机…