1、Spark Streaming清洗服务,接收kafka中Topic为“task_ATC”中的数据,保存在MySQL中。
- 打开SpringBoot项目BigData-Etl-KongGuan
请认真阅读:在前面的“使用Spark清洗统计业务数据并保存到数据库中”任务阶段中应该已经完成了所有Topic的数据的清洗,如果已经完成了扇区数据的清洗工作,则此步骤可以跳过,如果前面没有完成扇区数据的清洗工作,可参照“使用Spark清洗统计业务数据并保存到数据库中”任务阶段的其他Topic数据的清洗过程,完成扇区数据的清洗。
- 代码路径:BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/spark/SparkUtil.java,扇区数据清洗的核心代码如下:
/**
* 业务处理
* @param jsonObject 扇区数据
*/
public void TaskAtc(JSONObject jsonObject) throws Exception {
ATCDao atcD = new ATCDao();
ATCEntity atc = new ATCEntity();
//设置清洗方式
String sectorName = null;
try {
sectorName = jsonObject.getString("PLAN_SECTOR_NAME");
} catch (Exception e) {
//logger.info(" ATC无扇区数据: [{}]");
System.out.println("ATC无扇区数据");
}
try {
//根据扇区,查询是否已经开始对该扇区进行统计
String ACID = jsonObject.getString("ACID");
if(sectorName!=null&&!sectorName.equals("")){
boolean bool = atcD.isExistThisAtc(ACID);
atc.setAcId(jsonObject.getString("ACID"));
atc.setAtcTime(jsonObject.getString("ATC_TIME"));
atc.setExecuteDate(jsonObject.getString("EXECUTE_DATE"));
atc.setPlanSectorName(jsonObject.getString("PLAN_SECTOR_NAME"));
if (bool) {
//存在,在原来基础上+1,修改数据库中该航迹数量
atcD.updateAnAtcMsg(atc);
} else {
//尚未进行统计 创建一个统计信息
atcD.createAnAtcMsg(atc);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
- 上面的代码中会使用到 BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/dao/entity/ATCEntity.java 类和BigData-Etl-KongGuan/src/main/java/com/qrsoft/etl/dao/ATCDao.java类。
其中ATCEntity类是用于保存扇区数据的实体类:
package com.qrsoft.etl.dao.entity;
import lombok.Data;
import java.io.Serializable;
public class ATCEntity implements Serializable {
private Integer id;
private String acId;
private String atcTime;
private String executeDate;
private String planSectorName;
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getAcId() { return acId;}
public void setAcId(String acId) { this.acId = acId; }
public String getAtcTime() { return atcTime; }
public void setAtcTime(String atcTime) { this.atcTime = atcTime; }
public String getExecuteDate() { return executeDate; }
public void setExecuteDate(String executeDate) { this.executeDate = executeDate; }
public String getPlanSectorName() { return planSectorName; }
public void setPlanSectorName(String planSectorName) { this.planSectorName = planSectorName; }
}
ATCDao类是扇区数据的数据访问类,包括扇区统计等方法:
package com.qrsoft.etl.dao;
import com.qrsoft.etl.dao.entity.ATCEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
public class ATCDao extends IBaseDao {
private final static Logger logger = LoggerFactory.getLogger(ATCDao.class);
/**
* 根据航班号查询是否该扇区有该航班的统计数据存在
* @param acid 航班号
*/
public boolean isExistThisAtc(String acid){
String sql = " SELECT COUNT(*) from atc_number where ACID='"+acid+"';";
Object[] params = {};
ResultSet comRs = this.execute(sql, params);
return getBool(comRs);
}
//修改指定航班的扇区统计信息
public void updateAnAtcMsg(ATCEntity atc) {
String sql = "update atc_number set PLAN_SECTOR_NAME='"+atc.getPlanSectorName()+"',ATC_TIME='"+atc.getAtcTime()+"',EXECUTE_DATE='"+atc.getExecuteDate()+"' where ACID='"+atc.getAcId()+"'; ";
Object[] params = {};
try {
this.update(sql, params);
} catch (SQLException e) {
logger.info("修改指定航班的扇区统计信息失败! "+atc.getAcId());
e.printStackTrace();
}
}
//创建新航班扇区的统计信息
public void createAnAtcMsg(ATCEntity atc) {
String sql = "insert into atc_number (ACID,ATC_TIME,EXECUTE_DATE,PLAN_SECTOR_NAME) values('"+atc.getAcId()+"','"+atc.getAtcTime()+"','"+atc.getExecuteDate()+"','"+atc.getPlanSectorName()+"');";
Object[] params = {};
try {
this.update(sql, params);
} catch (SQLException e) {
logger.info("创建航班的扇区的统计信息失败!"+atc.getAcId());
e.printStackTrace();
}
}
}
2、前端Vue页面设计,并完成数据绑定,展示扇区数据
- 在Vue项目kongguan_web中src/components文件夹下面新建Section.vue,添加页面div设计代码,并对数据进行循环绑定操作:
<template>
<div>
<el-row>
<el-col :xs="12" :sm="12" :lg="{span: '4'}" v-for="item in resultVal">
<div class="grid-content bg-purple">
<div class="top_div_css"></div>
<div class="centen">
<div class="to_titls">{{item.planSectorName}}扇区</div>
<div class="to_titls_two">{{item.count}}架</div>
</div>
</div>
</el-col>
</el-row>
</div>
</template>
... 接下页 ...
- 在Vue项目中kongguan_web/src/components/Section.vue页面中添加css布局设计代码:
解释
... 接上页 ...
<style >
/*.el-col-lg-4-8 {*/
/* width: 20%;*/
/*}*/
.centen{
width: 100%;
}
.to_titls{
margin-top: 15px;
text-align: center;
font-size: 20px;
color: #676767;
}
.to_titls_two{
margin-top: 15px;
text-align: center;
font-size: 25px;
color: #307be3;
}
.top_div_css{
width: 100%;
height: 10px;
background-color: #4eb9f8;
border-top-left-radius: 2em;
border-top-right-radius: 2em;
}
.el-row {
margin-bottom: 20px;
&:last-child {
margin-bottom: 0;
}
}
.el-col {
border-radius: 2em;
}
.bg-purple-dark {
background: #99a9bf;
}
.bg-purple {
background: #ffffff;
}
.bg-purple-light {
background: #e5e9f2;
}
.grid-content {
margin-top: 13px;
border-radius: 1em;
min-height: 100px;
margin-left: 15px;
margin-right: 15px;
/*width: 200px;*/
background-color: #ffffff;
border: 1px solid #ebedf2;
box-shadow: 3px 3px 3px 3px #ebedf2;
}
.row-bg {
padding: 10px 0;
background-color: #f9fafc;
}
</style>
... 接下页 ...
- 在kongguan_web/src/components/Section.vue页面中引入 api,从后台获取数据:
... 接上页 ...
<script>
import echarts from "echarts";
import {getSectionVal } from "../api/user/api.js";
export default {
name: "Home",
data() {
return {
chart: null,
OneSection:"G",
TwoSection:"K",
TwoSection:"E",
TwoSection:"P",
resultVal:{}
};
},
mounted() {
this.loadData();
},
methods: {
initChart() {},
loadData(){
getSectionVal().then(data => {
if (data.isSuccess) {
var res = data.result;
this.resultVal =res;
} else {
this.$message.error("数据获取失败");
}
});
}
}
};
</script>
在上面的页面代码中,首先引入了api/user/api.js组件,在api/user/api.js中添加如下代码,设置请求方式,请求的服务端地址和请求的参数等:
import request from '../../utils/request'
const baseUrl="/api"
//…
//… 其他 function,略。 …
//…
//获取各扇区航班数
export function getSectionVal(data){
return request({
url:baseUrl+"/atc/findSectorSortie",
method:"get",
data:data
})
}
3、后端项目 BigData-KongGuan 的处理过程
- 首先,创建一个数据实体类com.qrsoft.entity.Atc,用于操作和保存扇区数据,代码所在位置BigData-KongGuan/src/main/java/com/qrsoft/entity/Atc.java,内容如下:
package com.qrsoft.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
@Data
@AllArgsConstructor
@NoArgsConstructor
@TableName("atc_number")
public class Atc implements Serializable {
@TableId(value = "id",type = IdType.AUTO)
private Integer id;
@TableField(value = "ACID")
private String acId;
@TableField(value = "ATC_TIME")
private String atcTime;
@TableField(value = "EXECUTE_DATE")
private String executeDate;
@TableField(value = "PLAN_SECTOR_NAME")
private String planSectorName;
@TableField(exist = false)
private String count;
}
- 然后,编写扇区数据的数据访问接口BigData-KongGuan/src/main/java/com/qrsoft/mapper/AtcMapper.java,继承BaseMapper类,代码如下:
package com.qrsoft.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.qrsoft.entity.Atc;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Select;
import java.util.List;
@Mapper
public interface AtcMapper extends BaseMapper<Atc> {
@Select("select PLAN_SECTOR_NAME,COUNT(*) as count from atc_number GROUP BY PLAN_SECTOR_NAME;")
List<Atc> findSectorSortie();
@Select("select EXECUTE_DATE from atc_number group by EXECUTE_DATE order by EXECUTE_DATE desc limit 19;")
List<String> findATCTime();
@Select("select PLAN_SECTOR_NAME,count(*) as count from atc_number where EXECUTE_DATE = #{executeTime} and PLAN_SECTOR_NAME = #{sectorName}")
Atc findATCTime2(String executeTime,String sectorName);
}
- 然后,建立BigData-KongGuan/src/main/java/com/qrsoft/service/AtcService.java类,在类中使用baseMapper调用findSectorSortie方法,查询所有扇区航班架次:
package com.qrsoft.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.qrsoft.common.Result;
import com.qrsoft.common.ResultConstants;
import com.qrsoft.entity.Atc;
import com.qrsoft.entity.MultiRadar;
import com.qrsoft.mapper.AtcMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@Service
public class AtcService extends ServiceImpl<AtcMapper, Atc> {
@Autowired
private MultiRadarService multiRadarService;
/**
* 查询所有扇区航班架次
*/
public Result findSectorSortie() {
List<Atc> sectorSortie = baseMapper.findSectorSortie();
return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, sectorSortie);
}
/**
* 根据扇区号查询架次
* @param planSectorName
*/
public Result findLocusCount(String planSectorName) {
QueryWrapper<MultiRadar> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("section",planSectorName);
int count = multiRadarService.count(queryWrapper);
return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, count);
}
/**
* 扇区架次数动态统计(饼状图)
*/
public Result findATCTime() {
List<String> sectorName = new ArrayList<>();
sectorName.add("K");
sectorName.add("S");
sectorName.add("E");
sectorName.add("P");
sectorName.add("G");
List<String> executeTime = baseMapper.findATCTime();
List list = new ArrayList();
for (int i = 0; executeTime.size() > i; i++) {
ArrayList<Object> objects = new ArrayList<>();
for (int j = 0; sectorName.size() > j; j++) {
Atc atcTime2 = baseMapper.findATCTime2(executeTime.get(i), sectorName.get(j));
HashMap<String, Object> map = new HashMap<>();
if (atcTime2.getPlanSectorName() != null) {
map.put(atcTime2.getPlanSectorName(), atcTime2.getCount());
}else {
map.put(sectorName.get(j),0);
}
objects.add(map);
}
list.add(objects);
}
return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS, list);
}
}
- 上面代码中,会依赖com.qrsoft.service.MultiRadarService类,并使用其继承的ServiceImpl<T,U>中的count()方法,该方法用于根据扇区号查询架次:
package com.qrsoft.service;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.qrsoft.common.Result;
import com.qrsoft.common.ResultConstants;
import com.qrsoft.entity.MultiRadar;
import com.qrsoft.mapper.MultiRadarMapper;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
@Service
public class MultiRadarService extends ServiceImpl<MultiRadarMapper, MultiRadar> {
/**
* 查询综合航迹数据
*/
public Result findMultRadar(){
List<MultiRadar> multiRadars = baseMapper.selectList(null);
return new Result(ResultConstants.SUCCESS, ResultConstants.C_SUCCESS,multiRadars);
}
}
- 建立扇区数据访问的控制器AtcController.java,代码路径:BigData-KongGuan/src/main/java/com/qrsoft/controller/AtcController.java,内容如下:
package com.qrsoft.controller;
import com.qrsoft.common.Result;
import com.qrsoft.service.AtcService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@Api(tags = "扇区操作类")
@RestController
@RequestMapping("/api/atc")
public class AtcController {
@Autowired
private AtcService service;
/**
* 获取各扇区航班数
*/
@ApiOperation(value = "获取各扇区航班数")
@GetMapping("/findSectorSortie")
public Result findSectorSortie(){
return service.findSectorSortie();
}
/**
* 根据扇区名称获取该扇区航班数
* @param planSectorName
*/
@ApiOperation(value = "根据扇区名称获取该扇区航班数")
@GetMapping("/findLocusCount")
public Result findLocusCount(@RequestParam String planSectorName){
return service.findLocusCount(planSectorName);
}
/**
* 扇区架次数动态统计(饼状图)
*/
@ApiOperation(value = "扇区架次数动态统计(饼状图)")
@GetMapping("/findATCTime")
public Result findATCTime(){
return service.findATCTime();
}
}
4、再回到前端src/components/Section.vue页面,解释一下代码
- 首先在src/components/Section.vue页面中设置resultVal:{}接收后台数据,代码如下:
data() {
return {
chart: null,
OneSection:"G",
TwoSection:"K",
TwoSection:"E",
TwoSection:"P",
resultVal:{}
};
},
- 请求后台数据并赋值,代码如下:
loadData(){
getSectionVal().then(data => {
if (data.isSuccess) {
var res = data.result;
this.resultVal =res;
} else {
this.$message.error("数据获取失败");
}
});
}
- 循环绑定数据代码如下:
<el-col :span="4" v-for="item in resultVal">
<div class="grid-content bg-purple">
<div class="top_div_css"></div>
<div class="centen">
<div class="to_titls">{{item.planSectorName}}扇区</div>
<div class="to_titls_two">{{item.count}}架</div>
</div>
</div>
</el-col>
5、在Index.vue页面进行展示
- 在kongguan_web/src/views/Home/Index.vue页面中引入Section组件
import Section from "../../components/Section";
- 声明组件:
components: {AirLine, Section},
- 页面展示:
<el-row :gutter="30" v-show="isShow('/flight/section')">
<el-col :span="24" align="center">
<Section/>
</el-col>
</el-row>
注意:在上面代码中【 v-show="isShow('/flight/section')" 】属性的作用是判断当前登录的用户是否有权限显示当前内容,如果当前登录的用户没有权限,则不会显示当前内容,新用户的权限需要到MySQL数据库中进行设置。
这里有两种方式,可以显示当前内容:
1)去掉【 v-show="isShow('/flight/section')" 】属性,即不判断是否有权限显示。
2)需要使用有权限的用户登录才能显示,或到数据库中分配权限。
参照任务“动态航线图”进行设置。
例如我们前面使用的用户admin,该用户没有权限显示,所以使用admin用户登录系统时是不会显示当前内容的,如果要进行权限设置,可以进入MySQL安装节点(node3节点),然后进入数据库,为admin用户授权。
[root@node3 ~]# mysql -uroot -p123456
mysql> use kongguan;
先查看角色表中,“管理员”的ID:
修改sys_auth表,添加一个【/flight/section】权限:
mysql> insert into sys_auth(auth_name,auth_code,menu_url) values('show section','/flight/section','/flight/section');
修改role_auth表,将权限授权给“管理员”角色:
mysql>insert into role_auth(role_id,auth_id) values(3,195);
- Index.vue页面的完整代码如下:
<template>
<div class="index">
<el-row :gutter="30" v-show="isShow('/flight/section')">
<el-col :span="24" align="center">
<Section/>
</el-col>
</el-row>
<el-row :gutter="30" v-show="isShow('/flight/airline')">
<el-col :span=24 align="center">
<AirLine/>
</el-col>
</el-row>
</div>
</template>
<script>
import AirLine from "../../components/AirLine";
import Section from "../../components/Section";
import {hasPermission} from "../../utils/permission";
export default {
data() {
return {
};
},
mounted() {
},
components: {AirLine, Section},
methods: {
isShow(permission){
return hasPermission(permission);
}
}
};
</script>
<style scoped>
.index {
height: 100%;
overflow: auto;
padding-left: 44px;
padding-right: 44px
}
.index::-webkit-scrollbar {
display: none;
}
.caseClass {
background: url('../../assets/images/index-bg.png') no-repeat;
background-size: cover;
margin-top: 20px;
height: 284px;
}
.el-button {
background: transparent;
}
</style>
- 确保Hadoop、Spark、Kafka、Redis、MySQL等服务均已经正常启动,如果没有正常启动,请参照前面的安装部署任务,完成这些服务的启动。
例如:在node3节点上启动Redis。
例如:查看MySQL是否正常启动。
- 启动后端项目 BigData-KongGuan
- 启动前端项目 kongguan_web
- 最终页面展示效果如下: