Flink项目实战篇 基于Flink的城市交通监控平台(下)

系列文章目录

Flink项目实战篇 基于Flink的城市交通监控平台(上)
Flink项目实战篇 基于Flink的城市交通监控平台(下)


文章目录

  • 系列文章目录
  • 4. 智能实时报警
    • 4.1 实时套牌分析
    • 4.2 实时危险驾驶分析
    • 4.3 出警分析
    • 4.4 违法车辆轨迹跟踪
  • 5. 实时车辆布控
    • 5.1 实时车辆分布情况
    • 5.2 布隆过滤器(Bloom Filter)
    • 5.3 实时外地车分布情况


4. 智能实时报警

本模块主要负责城市交通管理中,可能存在违章或者违法非常严重的行为,系统可以自动实时报警。可以实现亿级数据在线分布式计算秒级反馈。满足实战的“实时”需要,争分夺秒、聚力办案。做的真正“零”延迟的报警和出警。主要功能包括:实时套牌分析,实时危险驾驶分析等。

4.1 实时套牌分析

当某个卡口中出现一辆行驶的汽车,我们可以通过摄像头识别车牌号,然后在10秒内,另外一个卡口(或者当前卡口)也识别到了同样车牌的车辆,那么很有可能这两辆车之中有很大几率存在套牌车,因为一般情况下不可能有车辆在10秒内经过两个卡口。如果发现涉嫌套牌车,系统实时发出报警信息,同时这些存在套牌车嫌疑的车辆,写入Mysql数据库的结果表中,在后面的模块中,可以对这些违法车辆进行实时轨迹跟踪。

本需求可以使用CEP编程,也可以使用状态编程。我们采用状态编程。

完整的代码:

object RepatitionCarWarning {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    //导入scala包
    import org.apache.flink.streaming.api.scala._

    //设置并行度
    env.setParallelism(1)

    //设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test4")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    val trafficDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val trafficDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    trafficDStream.keyBy(_.car).process(new KeyedProcessFunction[String,TrafficLog,RepatitionCarInfo] {
      lazy private val valueState: ValueState[TrafficLog] = getRuntimeContext.getState(new ValueStateDescriptor[TrafficLog]("valueState",classOf[TrafficLog]))

      override def processElement(value: TrafficLog, ctx: KeyedProcessFunction[String, TrafficLog, RepatitionCarInfo]#Context, out: Collector[RepatitionCarInfo]): Unit = {
        if(valueState.value() != null){//如果状态中包含当前车辆
          val log: TrafficLog = valueState.value()
          //同一车辆数据,判断两次通过卡扣间隔时长
          var dur = (log.actionTime  - value.actionTime).abs
          if(dur < 10*1000){
            out.collect(new RepatitionCarInfo(value.car,"涉嫌套牌",System.currentTimeMillis(),
              s"该车辆连续两次经过的卡扣及对应时间为:${log.monitorId} - ${log.actionTime} , ${value.monitorId} - ${value.actionTime} "))
          }
          //更新状态数据
          if(log.actionTime < value.actionTime){
            valueState.update(value)
          }
        }else{ //状态中不包含当前车辆
          valueState.update(value)
        }
      }
    })
        .addSink(new JdbcWriteSink[RepatitionCarInfo]("RepatitionCarInfo"))

    env.execute()
  }
}

4.2 实时危险驾驶分析

在本项目中,危险驾驶是指在道路上驾驶机动车:追逐超速竞驶。我们规定:如果一辆机动车在2分钟内,超速通过卡口超过3次以上;而且每次超速的超过了规定速度的20%以上;这样的机动车涉嫌危险驾驶。系统需要实时找出这些机动车,并报警,追踪这些车辆的轨迹。注意:如果有些卡口没有设置限速值,可以设置一个城市默认限速。

这样的需求在Flink也是有两种解决思路,第一:状态编程。第二:CEP编程。但是当前的需求使用状态编程过于复杂了。所以我们采用第二种。同时还要注意:Flume在采集数据的过程中出现了数据乱序问题,一般最长延迟5秒。

涉嫌危险驾驶的车辆信息保存到Mysql数据库表(t_violation_list)中,以便后面的功能中统一追踪这些车辆的轨迹。

注意:如果要设置水位线需要设置在两个连接流连接之后。

完整的代码:

case class newTrafficLog(actionTime:Long,monitorId:String,cameraId:String,car:String,speed:Double,roadId:String,areaId:String,monitorLimitSpeed:Int)

object DangerDriveCarWarning {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //设置并行度
    env.setParallelism(1)
    //导入隐式转换
    import org.apache.flink.streaming.api.scala._
    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test5")
    props.setProperty("key.serializer",classOf[StringDeserializer].getName)
    props.setProperty("value.serializer",classOf[StringDeserializer].getName)

    //设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //主流
//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
      val arr: Array[String] = line.split(",")
      TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
    })

    //广播流,读取mysql中的数据,这里主要是读取卡扣限速的数据
    val bcDStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo"))
     .map(
      one => {
        one.asInstanceOf[MonitorLimitSpeedInfo]
      })
      .broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)

    //将日志流与广播流进行整合,将道路卡扣限速信息与每条车辆运行日志数据结合
    val trafficAllInfoDStream: DataStream[newTrafficLog] = mainDStream.connect(bcDStream).process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog] {
      //处理每个日志元素
      override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#ReadOnlyContext, out: Collector[newTrafficLog]): Unit = {
        //获取状态
        val mapState: ReadOnlyBroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
        //获取当前道路当前卡扣 对应的限速 ,如果没有就设置限速为80
        var limitSpeed = 80
        if (mapState.contains(value.roadId + "_" + value.monitorId)) {
          limitSpeed = mapState.get(value.roadId + "_" + value.monitorId).speedLimit
        }
        out.collect(new newTrafficLog(value.actionTime, value.monitorId, value.cameraId, value.car, value.speed, value.roadId, value.areaId, limitSpeed))

      }

      //处理广播元素
      override def processBroadcastElement(value: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, newTrafficLog]#Context, out: Collector[newTrafficLog]): Unit = {
        //获取状态
        val mapState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
        //更新当前道路当前卡扣的限速数据
        mapState.put(value.roadId + "_" + value.monitorId, value)
        println("广播状态准备就绪")
      }
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[newTrafficLog](Time.seconds(0)) {
      override def extractTimestamp(element: newTrafficLog): Long = element.actionTime
    })


    val keyDs: KeyedStream[newTrafficLog, String] = trafficAllInfoDStream.keyBy(_.car)//按照车辆分组


    //使用CEP编程定义模式, 在1分钟内连续3次超速20%通过道路卡扣的车辆
    val pattern:Pattern[newTrafficLog, newTrafficLog] =
        Pattern.begin[newTrafficLog]("start").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .followedBy("second").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .followedBy("third").where(nt=>{nt.speed > nt.monitorLimitSpeed*1.2})
          .within(Time.minutes(1))//注意:这里的时间指的是 各个时间之间的相差时间不超过1分钟。时间采用的是事件时间

    val patternStream: PatternStream[newTrafficLog] = CEP.pattern(keyDs,pattern)
    val result: DataStream[DangerDriveCarInfo] = patternStream.select((map: Map[String, Iterable[newTrafficLog]]) => {
      val begin: newTrafficLog = map.get("start").get.last
      val second: newTrafficLog = map.get("second").get.last
      val third: newTrafficLog = map.get("third").get.last
      val builder = s"第一次通过卡扣${begin.monitorId},当前限速:${begin.monitorLimitSpeed},通过的速度为:${begin.speed} |" +
        s"第二次通过卡扣${second.monitorId},当前限速:${second.monitorLimitSpeed},通过的速度为:${second.speed}|" +
        s"第三次通过卡扣${third.monitorId},当前限速:${third.monitorLimitSpeed},通过的速度为:${third.speed}"
      DangerDriveCarInfo(begin.car, "危险驾驶", System.currentTimeMillis(), builder.toString)
    })
//    result.print()
    result.addSink(new JdbcWriteSink[DangerDriveCarInfo]("DangerDriveCarInfo"))
    env.execute()
  }

}

4.3 出警分析

当监控到道路中有一起违法交通事故时,例如:车辆危险驾驶、车辆套牌、发生交通事故等,会有对应的交警出警处理案情。违法事故实时数据会被实时监控放入topicA,交通警察出警记录会实时上报数据被放入topicB中,这里需要对违法交通事故的出警情况进行分析并对超时未处理的警情作出对应的预警。

出警分析如下:如果在topicA中出现一条违法车辆信息,如果在5分钟内已经出警,将出警信息输出到结果库中。如果5分钟内没有出警则发出出警提示。(发出出警的提示,在侧流中发出)。
这里为了方便演示,将从socket中读取数据。

(1)使用IntervalJoin实现,这是只能输出出警信息

object PoliceAnalysis1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //设置事件时间
//    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props  = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test6")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //获取监控违法车辆信息
//    val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
    val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
      val arr: Array[String] = line.split(",")
      IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
      override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
    })

    //获取出警信息
//    val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
    val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
      val arr: Array[String] = line.split(",")
      PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
      override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
    })

    //两个流进行 intervalJoin ,相对于join ,这里不需要设置窗口,必须后面跟上between 来以时间范围大小进行join
    illegalDStream.keyBy(_.car).intervalJoin(policeDStream.keyBy(_.car))
      //这里假设 违法信息 illegalDStream 先出现,policeDStream数据流后出现
      //between(Time.seconds(10),Time.seconds(10))相当于 illegalDStream.eventTime - 10s <= policeDStream.reporTime <= illegalDStream.eventTime + 10s
      //例如 illegalDStream.eventTime 为 20:05:30  可以与 policeDStream.reporTime 为 20:05:20 - 20:05:40 范围内的数据进行匹配
      .between(Time.seconds(-10),Time.seconds(10))
      .process(new ProcessJoinFunction[IllegalCarInfo,PoliceInfo,String] {
        override def processElement(left: IllegalCarInfo, right: PoliceInfo, ctx: ProcessJoinFunction[IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
            out.collect(s"违法车辆:${left.car} 已经出警,警号:${right.policeId},事故时间:${left.eventTime},出警时间:${right.reporTime}")
        }
      }).print()

    env.execute()
  }
}

(2)使用两个流的connect,可以监测事故超时出警信息

object PoliceAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //设置并行度为1
    env.setParallelism(1)

    val props  = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","test6")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //获取监控违法车辆信息
    //    val illegalDStream: DataStream[IllegalCarInfo] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic1",new SimpleStringSchema(),props))
    val illegalDStream: DataStream[IllegalCarInfo] = env.socketTextStream("mynode5", 8888).map(line => {
      val arr: Array[String] = line.split(",")
      IllegalCarInfo(arr(0), arr(1), arr(2).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[IllegalCarInfo](Time.seconds(3)) {
      override def extractTimestamp(element: IllegalCarInfo): Long = element.eventTime
    })

    //获取出警信息
    //    val policeDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic2",new SimpleStringSchema(),props))
    val policeDStream: DataStream[PoliceInfo] = env.socketTextStream("mynode5", 9999).map(line => {
      val arr: Array[String] = line.split(",")
      PoliceInfo(arr(0), arr(1), arr(2), arr(3).toLong)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[PoliceInfo](Time.seconds(2)) {
      override def extractTimestamp(element: PoliceInfo): Long = element.reporTime
    })

    //定义侧流
    val ic = new OutputTag[IllegalCarInfo]("IllegalCarInfo")
    val pi = new OutputTag[PoliceInfo]("PoliceInfo")

    //以上违法记录信息 与 交警出警信息 进行关联
    val result: DataStream[String] = illegalDStream.keyBy(_.car).connect(policeDStream.keyBy(_.car))
      .process(new KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String] {

        //这里每个 key 都会对应一个状态
        lazy private val illegalCarInfoState: ValueState[IllegalCarInfo] = getRuntimeContext.getState(new ValueStateDescriptor[IllegalCarInfo]("illegalCarInfoState", classOf[IllegalCarInfo]))
        lazy private val policeInfoState: ValueState[PoliceInfo] = getRuntimeContext.getState(new ValueStateDescriptor[PoliceInfo]("policeInfoState", classOf[PoliceInfo]))

        //先有违法信息
        override def processElement1(value: IllegalCarInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
          //获取当前车辆的出警信息
          val policeInfo: PoliceInfo = policeInfoState.value()
          if (policeInfo != null) { //说明有对应的出警记录,说明 当前违法数据迟到了
            //输出结果
            out.collect(s"违法车辆:${value.car} 已经出警,警号:${policeInfo.policeId},事故时间:${value.eventTime},出警时间:${policeInfo.reporTime}")
            //删除出警状态
            policeInfoState.clear()
            //删除出警记录定时器
            ctx.timerService().deleteEventTimeTimer(policeInfo.reporTime + 10000)

          } else { //没有对应的出警记录
            //进来当前车辆的违法信息后,放入状态中
            illegalCarInfoState.update(value)
            //当前车辆有了违法记录就构建定时器,定时器设置当前时间时间后10s触发,除非10s内删除对应的定时器就不会触发
            ctx.timerService().registerEventTimeTimer(value.eventTime + 10000) //这里方便演示设置定时器时长为10s
          }
        }

        //后有出警状态,也有可能出警状态先到
        override def processElement2(value: PoliceInfo, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#Context, out: Collector[String]): Unit = {
          val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()
          if (illegalCarInfo != null) {
            //对应当前车辆的违法记录中有数据,说明这个车辆有了对应的出警记录
            println(s"这里打印就是测试是不是一个key有一个状态: 违法车辆中的状态car 是 ${illegalCarInfo.car} ,出警记录中的车辆是${value.car}")
            //有对应的出警记录就正常输出数据即可:
            out.collect(s"违法车辆:${illegalCarInfo.car} 已经出警,警号:${value.policeId},事故时间:${illegalCarInfo.eventTime},出警时间:${value.reporTime}")
            //清空当前车辆违法状态
            illegalCarInfoState.clear()
            //删除违法记录定时器
            ctx.timerService().deleteEventTimeTimer(illegalCarInfo.eventTime + 10000) //删除定时器
          } else { //有了出警记录,但是没有违法记录
            //这里有了出警状态,但是没有发现当前车辆违法记录,说明 出警状态数据早到了,违法记录 迟到了
            //针对这种情况,将出警记录数据放入出警状态中
            policeInfoState.update(value)

            //当前车辆有了出警就构建定时器,定时器设置当前时间时间后10s触发,除非10s内删除对应的定时器就不会触发
            ctx.timerService().registerEventTimeTimer(value.reporTime + 10000) //这里方便演示设置定时器时长为10s
          }
        }

        //触发定时器 定时器触发后会调用onTimer 方法 ,timestamp : 触发器触发时间
        override def onTimer(timestamp: Long, ctx: KeyedCoProcessFunction[String, IllegalCarInfo, PoliceInfo, String]#OnTimerContext, out: Collector[String]): Unit = {
          //获取 违法记录信息状态
          val illegalCarInfo: IllegalCarInfo = illegalCarInfoState.value()

          //获取 出警记录信息状态
          val policeInfo: PoliceInfo = policeInfoState.value()

          if (illegalCarInfo != null) {
            //没有出警记录 ,输出到侧流
            ctx.output(ic, illegalCarInfo)
          }

          if (policeInfo != null) { //没有违法信息  ,输出到侧流
            ctx.output(pi, policeInfo)
          }
          //清空以上两种状态
          illegalCarInfoState.clear()
          policeInfoState.clear()
        }
      })

    result.print("正常流")

    val illegalCarInfoDStream: DataStream[IllegalCarInfo] = result.getSideOutput(ic)
    val policeInfoDStream: DataStream[PoliceInfo] = result.getSideOutput(pi)

    illegalCarInfoDStream.print("没有出警记录,有违法记录的信息:")
    policeInfoDStream.print("有出警记录,没有违法记录车辆信息:")


    env.execute()

  }

}

4.4 违法车辆轨迹跟踪

城市交通中,有些车辆需要实时轨迹跟踪,这些需要跟踪轨迹的车辆,保存在城市违法表中:t_violation_list。系统需要实时打印这些车辆经过的卡口,并且把轨迹数据插入数据表t_track_info(Hbase数据库)中。根据前面所学的知识,我们应该使用Flink中的广播状态完成该功能。

需要在hbase中创建表 t_track_info:create ‘t_track_info’,‘cf1’
清空hbase表命令:truncate ‘t_track_info’;

完整的代码:

object RtCarTracker {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  
    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test7")

//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))
    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    val mapStateDescriptor = new MapStateDescriptor[String, IllegalCarInfo]("MapStateDescriptor", classOf[String], classOf[IllegalCarInfo])
    //获取广播流
    val bcDstream: BroadcastStream[IllegalCarInfo] = env.addSource(new JdbcReadSource("IllegalCarInfo")).map(pojo=>{
      pojo.asInstanceOf[IllegalCarInfo]
    }).broadcast(mapStateDescriptor)

    //连接两个流
    val result: DataStream[CarThroughMonitorInfo] = mainDStream.connect(bcDstream).process(new BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo] {
      override def processElement(value: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#ReadOnlyContext, out: Collector[CarThroughMonitorInfo]): Unit = {
        val bcState: ReadOnlyBroadcastState[String, IllegalCarInfo] = ctx.getBroadcastState(mapStateDescriptor)
        if (bcState.get(value.car) != null) {
          out.collect(new CarThroughMonitorInfo(value.car, value.actionTime, value.monitorId, value.roadId, value.areaId))
        }
      }

      override def processBroadcastElement(value: IllegalCarInfo, ctx: BroadcastProcessFunction[TrafficLog, IllegalCarInfo, CarThroughMonitorInfo]#Context, out: Collector[CarThroughMonitorInfo]): Unit = {
        ctx.getBroadcastState(mapStateDescriptor).put(value.car, value)
      }
    })
    result.countWindowAll(20).process(new ProcessAllWindowFunction[CarThroughMonitorInfo,util.List[Put],GlobalWindow] {
      override def process(context: Context, elements: Iterable[CarThroughMonitorInfo], out: Collector[util.List[Put]]): Unit = {
        val list = new util.ArrayList[Put]()
        for(elem<-elements){
          val put = new Put(Bytes.toBytes(elem.car + "_" + elem.date))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("area_id"),Bytes.toBytes(elem.areaID))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("road_id"),Bytes.toBytes(elem.roadID))
          put.addColumn(Bytes.toBytes("cf1"),Bytes.toBytes("monitor_id"),Bytes.toBytes(elem.monitorId))
          list.add(put)
        }
        out.collect(list)
      }
    }).addSink(new HBaseWriteSink())
    env.execute()
  }

}

HBaseSink:

class HBaseWriteSink extends RichSinkFunction[java.util.List[Put]]{
  //打开HBase连接
  var config :conf.Configuration = _
  var conn :Connection = _
  override def open(parameters: Configuration): Unit = {
    config = HBaseConfiguration.create();
    config.set("hbase.zookeeper.quorum","mynode3:2181,mynode4:2181,mynode5:2181")
    conn = ConnectionFactory.createConnection(config)
  }

  override def close(): Unit = {
    conn.close()
  }

  override def invoke(value: java.util.List[Put], context: SinkFunction.Context[_]): Unit = {
    //获取HBase表,在HBase中执行 : create 't_track_info','cf1'
    val table: Table = conn.getTable(TableName.valueOf("t_track_info"))
    table.put(value)
  }
}

从HBase中读取车辆轨迹api:

/**
  * 从Hbase中扫描 rowkey 范围 查询数据
  */
object GetDataFromHBase {
  def main(args: Array[String]): Unit = {
    //获取连接
    val conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "mynode3:2181,mynode4:2181,mynode5:2181");
    val conn = ConnectionFactory.createConnection(conf);
    //获取表
    val table = conn.getTable(TableName.valueOf("t_track_info"));


    //设置扫描 rowkey 范围
    val scan = new Scan("鲁A65552_1602381959000".getBytes(),"鲁A65552_1602382000000".getBytes())

    //查询获取结果
    val scanner: ResultScanner = table.getScanner(scan)

    //获取结果一条数据
    var result :Result = scanner.next()
    while(result != null){
      val row: Array[Byte] = result.getRow
      val cells: util.List[Cell] = result.listCells()
      import scala.collection.JavaConverters._
      for (cell <- cells.asScala) {
        val rowKey: Array[Byte] = CellUtil.cloneRow(cell)
        val family: Array[Byte] = CellUtil.cloneFamily(cell)
        val qualifier: Array[Byte] = CellUtil.cloneQualifier(cell)
        val value: Array[Byte] = CellUtil.cloneValue(cell)
        println(s"rowKey:${Bytes.toString(row)},列族名称为:${Bytes.toString(family)},列名称为:${Bytes.toString(qualifier)},列值为:${Bytes.toString(value)}")
      }
      result  = scanner.next()
    }
  }

}

5. 实时车辆布控

在交警部门的指挥中心应该实时的知道整个城市的上路车辆情况,需要知道每个区一共有多少辆车?现在是否有大量的外地车进入城市等等。本模块主要是针对整个城市整体的实时车辆情况统计。

5.1 实时车辆分布情况

实时车辆分布情况,是指在一段时间内(比如:10分钟)整个城市中每个区分布多少量车。这里要注意车辆的去重,因为在10分钟内一定会有很多的车,经过不同的卡口。这些车牌相同的车,我们只统计一次。其实就是根据车牌号去重。

代码如下:

object RTCarAnalysis1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    //设置并行度
    env.setParallelism(1)

    //设置事件时间为当前时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test_group8")

    //读取Kafka中的数据
    val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).keyBy(_.areaId)
    mainDStream.timeWindow(Time.minutes(1))
      .process(new ProcessWindowFunction[TrafficLog,String,String,TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[TrafficLog], out: Collector[String]): Unit = {
          val set = scala.collection.mutable.Set[String]()
          for(elem <- elements){
            set.add(elem.car)
          }
          out.collect(s"开始时间:${context.window.getStart} - 结束时间:${context.window.getEnd},区域ID:${key},车辆总数 = ${set.size}")
        }
      }).print()

    env.execute()

  }

}

5.2 布隆过滤器(Bloom Filter)

在上节的例子中,我们把所有数据的车牌号car都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?

把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用redis这种内存级k-v数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上千万级,亿级的卡口车辆数据呢?(假设)要去重计算。

如果放到redis中,假设有6千万车牌号(每个10-20字节左右的话)可能需要几G的空间来存储。当然放到redis中,用集群进行扩展也不是不可以,但明显代价太大了。

一个更好的想法是,其实我们不需要完整地存车辆的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个车辆的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。

布隆过滤器的原理:
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilistic data structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存在或者可能存在”。
它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是0,就是1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少。我们的目标就是,利用某种方法(一般是Hash函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是1,不存在则为0。
Bloom Filter是一种空间效率很高的随机数据结构,它利用位数组很简洁地表示一个集合,并能判断一个元素是否属于这个集合。Bloom Filter的这种高效是有一定代价的:在判断一个元素是否属于某个集合时,有可能会把不属于这个集合的元素误认为属于这个集合(false positive)。因此,Bloom Filter不适合那些“零错误”的应用场合。而在能容忍低错误率的应用场合下,Bloom Filter通过极少的错误换取了存储空间的极大节省。

简单的例子:
下面是一个简单的 Bloom filter 结构,开始时集合内没有元素:
在这里插入图片描述
当来了一个元素 a,进行判断,这里需要一个(或者多个)哈希函数然后二进制运算(模运算),计算出对应的比特位上为 0 ,即是 a 不在集合内,将 a 添加进去:
在这里插入图片描述
之后的元素,要判断是不是在集合内,也是同 a 一样的方法,只有对元素哈希后对应位置上都是 1 才认为这个元素在集合内(虽然这样可能会误判):
在这里插入图片描述
随着元素的插入,Bloom filter 中修改的值变多,出现误判的几率也随之变大,当新来一个元素时,满足其在集合内的条件,即所有对应位都是 1 ,这样就可能有两种情况,一是这个元素就在集合内,没有发生误判;还有一种情况就是发生误判,出现了哈希碰撞,这个元素本不在集合内。
在这里插入图片描述
本项目中可以采用google 提供的BoolmFilter进行位图计算和判断:
BloomFilter.create(Funnels.stringFunnel(),100000),Funnels.stringFunnel()指的是将对什么类型的数据使用布隆过滤器。这里我们使每个区域都对应一个布隆过滤器,位长度为100000,经过测试,可以对100万左右的数量进行去重判断,每个布隆过滤器可以认为相当于一个数组,大概占用空间为100K。

代码如下:

object RTCarAnalysis2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    import org.apache.flink.streaming.api.scala._

    //设置并行度
//    env.setParallelism(1)

    //设置事件时间为当前时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","test_group9")

    //读取Kafka中的数据
    val mainDStream: KeyedStream[TrafficLog, String] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).keyBy(_.areaId)

    //存储 区域 - 车辆数 map
    val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()

    mainDStream.timeWindow(Time.minutes(1))
      .aggregate(
        new AggregateFunction[TrafficLog,Long,Long] {
        override def createAccumulator(): Long = 0L

        override def add(value: TrafficLog, accumulator: Long): Long = {
          //判断前Map中是否包含 area_id
          if(map.contains(value.areaId)){
            //如果包含当前区域,获取当前key对应的数值,并判断
            // 车辆是否重复,
            val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
            if(!bool){//如果不包含,就加1
              //将当前车辆设置到布隆过滤器中
              map.get(value.areaId).get.put(value.car)
              accumulator + 1L
            }else{
              accumulator
            }
          }else{
            //如果不包含当前 area_id,就设置map
            map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
            //将当前车辆设置到布隆过滤器中
            map.get(value.areaId).get.put(value.car)
            //返回1
            accumulator+ 1L
          }
        }

        override def getResult(accumulator: Long): Long = accumulator

        override def merge(a: Long, b: Long): Long = a+b
      },
      new WindowFunction[Long,String,String,TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          out.collect(s"窗口起始时间: ${window.getStart} -${window.getEnd} ,区域:${key},车辆总数:${input.last}")

        }
      }
      ).print()

    env.execute()


  }

}

5.3 实时外地车分布情况

这个功能和前面的一样,实时统计外地车在一段时间内,整个城市的分布情况,整个城市中每个区多少分布多少量外地车,即统计每个区域实时外地车分布(每分钟统计一次)

代码如下:

object NonLocalCarAnalysis {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("group.id","grouptest10")


    //设置 BloomFilter Map
    val map = scala.collection.mutable.Map[String,BloomFilter[CharSequence]]()

    env.addSource(new FlinkKafkaConsumer[String]("traffic-topic",new SimpleStringSchema(),props).setStartFromEarliest())
      .map(line=>{
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong,arr(1),arr(2),arr(3),arr(4).toDouble,arr(5),arr(6))

      }).filter(!_.car.startsWith("京"))
      .keyBy(_.areaId)
      .timeWindow(Time.minutes(1))
      //apply 全量函数 ,process:全量函数,reduce 既有增量也有全量 ,aggregate 既有增量,也有全量
      .aggregate(new AggregateFunction[TrafficLog,Long,Long] {
        override def createAccumulator(): Long = 0L

        override def add(value: TrafficLog, accumulator: Long): Long = {
          //判断当前区域是否在map中
          if(map.contains(value.areaId)){//包含当前areaID
            val bool: Boolean = map.get(value.areaId).get.mightContain(value.car)
            if(bool){//布隆过滤器中包含当前车辆数据
              accumulator
            }else{//布隆过滤器中不包含当前车辆数据
              map.get(value.areaId).get.put(value.car)
              accumulator +1L
            }
          }else{
            map.put(value.areaId,BloomFilter.create(Funnels.stringFunnel(),100000))
            map.get(value.areaId).get.put(value.car)
            accumulator +1
          }
        }

        override def getResult(accumulator: Long): Long = accumulator

        override def merge(a: Long, b: Long): Long = a+b
      },new WindowFunction[Long,String,String,TimeWindow] {
        override def apply(key: String, window: TimeWindow, input: Iterable[Long], out: Collector[String]): Unit = {
          out.collect(s"起始时间段:${window.getStart} - ${window.getEnd},区域:${key},车辆数:${input.last}")
        }
      }).print()
    env.execute()
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/275144.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

hive在执行elect count(*) 没有数据显示为0(实际有数据)

set hive.compute.query.using.statsfalse; 是 Hive 的一个配置选项。它的含义是禁用 Hive 在执行查询时使用统计信息。 在 Hive 中&#xff0c;统计信息用于优化查询计划和执行。当该选项设置为 false 时&#xff0c;Hive 将不会使用任何统计信息来帮助决定查询的执行计划。这…

Flink1.17实战教程(第六篇:容错机制)

系列文章目录 Flink1.17实战教程&#xff08;第一篇&#xff1a;概念、部署、架构&#xff09; Flink1.17实战教程&#xff08;第二篇&#xff1a;DataStream API&#xff09; Flink1.17实战教程&#xff08;第三篇&#xff1a;时间和窗口&#xff09; Flink1.17实战教程&…

《深入理解Java虚拟机(第三版)》读书笔记:虚拟机类加载机制、虚拟机字节码执行引擎、编译与优化

下文是阅读《深入理解Java虚拟机&#xff08;第3版&#xff09;》这本书的读书笔记&#xff0c;如有侵权&#xff0c;请联系删除。 文章目录 第6章 类文件结构第7章 虚拟机类加载机制7.2 类加载的时机7.3 类加载的过程7.4 类加载器7.5 Java模块化系统 第8章 虚拟机字节码执…

Redis 核心知识总结

Redis 核心知识总结 认识 Redis 什么是 Redis&#xff1f; Redis 是一个由 C 语言开发并且基于内存的键值型数据库&#xff0c;对数据的读写操作都是在内存中完成&#xff0c;因此读写速度非常快&#xff0c;常用于缓存&#xff0c;消息队列、分布式锁等场景。 有以下几个特…

web三层架构

目录 1.什么是三层架构 2.运用三层架构的目的 2.1规范代码 2.2解耦 2.3代码的复用和劳动成本的减少 3.各个层次的任务 3.1web层&#xff08;表现层) 3.2service 层(业务逻辑层) 3.3dao 持久层(数据访问层) 4.结合mybatis简单实例演示 1.什么是三层架构 三层架构就是把…

Node.js--》node环境配置及nvm和nvm-desktop安装教程

博主最近换了台新电脑&#xff0c;环境得从零开始配置&#xff0c;所以以下是博主从一台纯净机中配置环境&#xff0c;绝对的小白教程&#xff0c;大家第一次安装完全可以参考我的过程&#xff0c;闲话少说&#xff0c;直接开始&#xff01;&#xff01;&#xff01; 接下来介绍…

Linux下安装QQ

安装步骤&#xff1a; 1.进入官网&#xff1a;QQ Linux版-轻松做自己 2.选择版本&#xff1a;X86版下载dep 3安装qq 找到qq安装包位置&#xff0c;然后右击在终端打开输入安装命令&#xff0c;然后点击回车 sudo dpkg -i linuxqq_3.2.0-16736_amd64.deb 卸载qq 使用命令…

如何使用Linux docker方式快速安装Plik并结合内网穿透实现公网访问

文章目录 1. Docker部署Plik2. 本地访问Plik3. Linux安装Cpolar4. 配置Plik公网地址5. 远程访问Plik6. 固定Plik公网地址7. 固定地址访问Plik 本文介绍如何使用Linux docker方式快速安装Plik并且结合Cpolar内网穿透工具实现远程访问&#xff0c;实现随时随地在任意设备上传或者…

ALSA学习(4)——Control设备的创建

参考博客&#xff1a; https://blog.csdn.net/DroidPhone/article/details/6409983 &#xff08;下面的内容基本是原博主的内容&#xff0c;我只是修改了一些格式之类的&#xff09; 文章目录 一、Control接口二、Controls的定义三、Control的名字四、访问标志&#xff08;ACC…

C# NLua Winform 热更新

一、概述 NLua 是一个用于 .NET 平台的 Lua 脚本绑定库。它允许在 C# 代码中嵌入 Lua 脚本&#xff0c;并允许两者之间进行交互。NLua 的主要特点包括&#xff1a; 轻量级&#xff1a;NLua 是一个轻量级的库&#xff0c;易于集成到现有的 .NET 项目中。动态类型&#xff1a;L…

2024年【裂解(裂化)工艺】考试报名及裂解(裂化)工艺考试总结

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 裂解&#xff08;裂化&#xff09;工艺考试报名考前必练&#xff01;安全生产模拟考试一点通每个月更新裂解&#xff08;裂化&#xff09;工艺考试总结题目及答案&#xff01;多做几遍&#xff0c;其实通过裂解&#…

Tuxera NTFS for Mac2024免费Mac读写软件下载教程

在日常生活中&#xff0c;我们使用Mac时经常会遇到外部设备不能正常使用的情况&#xff0c;如&#xff1a;U盘、硬盘、软盘等等一系列存储设备&#xff0c;而这些设备的格式大多为NTFS&#xff0c;Mac系统对NTFS格式分区存在一定的兼容性问题&#xff0c;不能正常读写。 那么什…

如何从 DSA 切换到 PMax 以使您的 Google 付费广告面向未来

为了在 Google Ads 不可避免的过渡期之前&#xff0c;我们将介绍如何从动态搜索广告切换到效果最大化广告 如何从 DSA 切换到 PMax 以使您的 Google 付费广告面向未来 变化是唯一不变的&#xff0c;尤其是在数字广告中——您可能听说过一些关于动态搜索广告 &#xff08;DSA&…

第27关 在K8s集群上使用Helm3部署最新版本v2.10.0的私有镜像仓库Harbor

------> 课程视频同步分享在今日头条和B站 大家好&#xff0c;我是博哥爱运维。 在前面的几十关里面&#xff0c;博哥在k8s上部署服务一直都是用的docker hub上的公有镜像&#xff0c;对于企业服务来说&#xff0c;有些我们是不想把服务镜像放在公网上面的&#xff1b; 同时…

25、Qt设备识别(简单的密钥生成器)

一、说明 在很多商业软件中&#xff0c;需要提供一些可以试运行的版本&#xff0c;这样就需要配套密钥机制来控制&#xff0c;纵观大部分的试用版软件&#xff0c;基本上采用以下几种机制来控制。 1、远程联网激活&#xff0c;每次启动都联网查看使用时间等&#xff0c;这种方…

顶配版SAM:由分割一切迈向感知一切

文章目录 0. 前言1. 论文地址1.1 项目&代码1.2 模型地址1.3 Demo 2. 模型介绍2.1 亮点2.2 方法 3. 量化结果、可视化展示Reference 0. 前言 现有的视觉分割基础模型&#xff0c;如 SAM 及其变体&#xff0c;集中优势在形状、边缘等初级定位感知&#xff0c;或依赖外部模型…

【Android】使用android studio查看内置数据库信息

背景 需要用到android db 逻辑存储用户信息等等。 使用 在 App inspection 工具中查看该 app 内的 db 数据 sql执行 在新的查询框内解析查询即可知道当前的数据信息。 官方文档-使用 Database Inspector 调试数据库

【计算机毕业设计】SSM医疗药品采购系统

项目介绍 ssm医疗药品采购系统。主要功能有&#xff1a; 用户管理&#xff1a;管理员列表&#xff1b; 采购管理&#xff1a;采购列表&#xff1b; 药品出库&#xff1a;药品出库&#xff1b; 库存管理&#xff1a;库存统计&#xff1b; 数据维护&#xff1a;药品列表、仓库…

【Unity入门】PlayerPrefs的简介与使用

目录 PlayerPrefs储存位置用例注意事项 PlayerPrefs PlayerPrefs 是Unity内置的一个静态类&#xff0c;可以用于存储一些简单的数据类型&#xff1a;int ,string ,float。 分别对应的函数为&#xff1a; SetInt()&#xff1a;保存整型数据GetInt()&#xff1a;读取整形数据Se…

MSF(Metasploit Framework)详细教程

一. 简介 Metasploit 是一个开源的渗透测试开源软件&#xff0c;也是一个逐步发展成熟的漏洞研究与渗透测试代码开发平台&#xff0c;此外也将成为支持整个渗透测试过程的安全技术集成开发与应用环境&#xff0c;2009年10月&#xff0c;Metasploit项目被一家渗透测试技术领域的…