序言:
最近项目中用到了视频AI分析。由于SDK涉及保密,不便透露,这里仅分享定时任务分析的思路,仅供参考。
1、定时任务
由于AI服务器的性能上限,只能同时对64个RTSP流分析一种算法,或者对8个RTSP流分析8种算法。因此,定时任务做如下设计。
AiHandlerTask.java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.ewaycloud.jw.ai.service.AiService;
import com.ewaycloud.jw.camera.entity.Camera;
import com.ewaycloud.jw.camera.mapper.CameraMapper;
import com.ewaycloud.jw.camera.service.CameraService;
import com.ewaycloud.jw.cases.dto.CaseDTO;
import com.ewaycloud.jw.cases.service.CaseService;
import com.ewaycloud.jw.channel.service.HikService;
import com.ewaycloud.jw.task.entity.Task;
import com.ewaycloud.jw.task.mapper.TaskMapper;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
/**
 * Scheduled handler that submits AI video-analysis jobs for pending tasks.
 *
 * <p>For every task returned by {@code taskMapper.findAiTasks}, the cameras of the task's
 * department are looked up, a playback stream URL is requested for each camera, and an
 * analysis job is created on the AI server for that stream. The job id and its current
 * status are then written back to the task table.
 *
 * @author gwh
 * @date 2024-04-14 13:59:17
 */
@Component
@EnableScheduling
public class AiHandlerTask {
    /** Task status value meaning "finished" (1 not started, 2 waiting, 3 running, 4 finished). */
    private static final String TASK_STATUS_FINISHED = "4";

    @Resource
    AiService aiService;
    // NOTE(review): TaskService has no matching import in the import list shown with this
    // class; confirm it lives in the same package or add the import.
    @Resource
    TaskService taskService;
    @Resource
    CameraService cameraService;
    @Resource
    private TaskMapper taskMapper;
    @Resource
    private CameraMapper cameraMapper;
    @Resource
    private HikService hkService;
    @Resource
    private CaseService caseService;

    /**
     * Submits one analysis job per camera for every pending AI task.
     *
     * <p>Cron format: {second} {minute} {hour} {day-of-month} {month} {day-of-week} {year (optional)};
     * one of day-of-month / day-of-week must be "?". Example: {@code 10/5 20 10 * * ?} fires every
     * 5 seconds starting at second 10 of 10:20 each day and no longer fires once 10:21 is reached.
     *
     * <p>Intended schedule: every 5 seconds. The {@code @Scheduled} annotation below is commented
     * out, so this method currently runs only when it is called explicitly.
     *
     * <p>Only algorithm 5 (abnormal people count) is submitted; the original code had the calls
     * for algorithms 1 (suspected physical conflict), 2 (phone use) and 3 (fall) disabled.
     */
    // @Scheduled(cron = "0/5 * * * * ?")
    public void startTask() {
        // Load the tasks that still have to be analysed.
        List<Task> aiTasks = taskMapper.findAiTasks(null);
        if (aiTasks == null) {
            return;
        }
        for (Task vo : aiTasks) {
            if (vo.getDeptId() == null) {
                continue;
            }
            // Cameras of the interview room (per the original comment: flag 1 = interviewer
            // close-up, flag 2 = interviewee close-up).
            List<Camera> cameraList = cameraMapper.findCamersByDeptId(vo.getDeptId());
            if (cameraList == null || cameraList.isEmpty()) {
                continue;
            }
            for (Camera camera : cameraList) {
                String cameraCode = camera.getCameraCode();
                try {
                    // Request a playback stream for the camera and time window. Per the original
                    // comment the times are ISO 8601 (yyyy-MM-dd'T'HH:mm:ss.SSSzzz) and start/end
                    // may be at most three days apart.
                    JSONObject data = hkService.playbackURLs(cameraCode, vo.getStartTime(), vo.getEndTime());
                    String rtspUrl = data == null ? null : data.getString("url");
                    if (rtspUrl != null) {
                        startAiTask(rtspUrl, 5L, vo.getStartTime(), vo.getEndTime(), vo);
                    }
                } catch (Exception e) {
                    // A failure for one camera must not stop the remaining cameras/tasks.
                    System.err.println("AI task submission failed for cameraCode: " + cameraCode);
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Creates an analysis job for one stream, reads back its status and persists the task row.
     *
     * @param rtspUrl   stream URL to analyse; nothing happens when {@code null}
     * @param aiId      algorithm id, 1-8 (see {@link #createAnalysisTask}); nothing happens when {@code null}
     * @param startTime start of the analysed time window, passed through to the AI service
     * @param endTime   end of the analysed time window, passed through to the AI service
     * @param vo        task being processed; its break name, AI id, task id and state are updated in place
     */
    public void startAiTask(String rtspUrl, Long aiId, String startTime, String endTime, Task vo) {
        if (rtspUrl == null || aiId == null) {
            return;
        }
        String aiResponse = createAnalysisTask(rtspUrl, aiId, startTime, endTime, vo);
        JSONObject aiResponseJson = JSON.parseObject(aiResponse);

        String taskId = "";
        String taskStatus = "";
        if (aiResponseJson != null && aiResponseJson.getString("taskID") != null) {
            taskId = aiResponseJson.getString("taskID");
            // Keep the task id returned by the AI server.
            vo.setTaskId(taskId);

            taskStatus = queryTaskStatus(taskId);
            if (!taskStatus.isEmpty()) {
                try {
                    vo.setTaskState(Integer.parseInt(taskStatus));
                } catch (NumberFormatException e) {
                    System.err.println("Unexpected taskStatus '" + taskStatus + "' for taskId " + taskId);
                }
            }

            if (TASK_STATUS_FINISHED.equals(taskStatus)) {
                // Flag the case as analysed once the job has finished.
                CaseDTO caseDTO = new CaseDTO();
                caseDTO.setCaseId(vo.getCaseId());
                caseDTO.setCaseState(1);
                caseService.updCaseInfo(caseDTO);
                // Release the playback stream; failure to do so is not fatal for the task.
                try {
                    hkService.clearPlayUrls(rtspUrl);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        System.out.println("AI分析定时任务返回 taskId:" + taskId +" breakName: "+ vo.getBreakName() +" taskStatus: "+ taskStatus);
        saveOrUpdateTask(vo);
    }

    /**
     * Calls the AI-service method that matches {@code aiId} and records the algorithm on {@code vo}.
     *
     * @return the raw service response, or an empty string for an unknown algorithm id
     */
    private String createAnalysisTask(String rtspUrl, Long aiId, String startTime, String endTime, Task vo) {
        long id = aiId;
        if (id < 1 || id > 8) {
            return "";
        }
        String response;
        String breakName;
        switch ((int) id) {
            case 1:
                response = aiService.indoorPhysicalConfront(rtspUrl, startTime, endTime);
                breakName = "疑似肢体冲突";
                break;
            case 2:
                response = aiService.playCellphone(rtspUrl, startTime, endTime);
                breakName = "玩手机";
                break;
            case 3:
                response = aiService.failDown(rtspUrl, startTime, endTime);
                breakName = "倒地";
                break;
            case 4:
                response = aiService.Standup(rtspUrl, startTime, endTime);
                breakName = "人员站立";
                break;
            case 5:
                response = aiService.PeopleNumChange(rtspUrl, startTime, endTime);
                breakName = "人数异常";
                break;
            case 6:
                response = aiService.audioAbnormal(rtspUrl, startTime, endTime);
                breakName = "声强突变";
                break;
            case 7:
                response = aiService.overtimeTarry(rtspUrl, startTime, endTime);
                breakName = "超时滞留";
                break;
            default:
                response = aiService.reachHeight(rtspUrl, startTime, endTime);
                breakName = "攀高";
                break;
        }
        vo.setBreakName(breakName);
        vo.setAiId(id);
        return response;
    }

    /**
     * Reads {@code status[0].taskStatus} from the status-query response.
     *
     * @return the status string, or an empty string when the response lacks that field
     */
    private String queryTaskStatus(String taskId) {
        JSONObject statusResponse = JSON.parseObject(aiService.queryTaskVideoStatus(taskId));
        if (statusResponse == null) {
            return "";
        }
        JSONArray statusArray = statusResponse.getJSONArray("status");
        if (statusArray == null || statusArray.isEmpty()) {
            return "";
        }
        JSONObject first = statusArray.getJSONObject(0);
        String status = first == null ? null : first.getString("taskStatus");
        return status == null ? "" : status;
    }

    /**
     * Updates every existing row that matches the task's case id and task id, or inserts a new
     * row when there is none.
     */
    private void saveOrUpdateTask(Task vo) {
        Task query = new Task();
        query.setCaseId(vo.getCaseId());
        query.setTaskId(vo.getTaskId());
        List<Task> existing = taskMapper.findTasks(query);
        if (existing != null && !existing.isEmpty()) {
            for (Task po : existing) {
                vo.setId(po.getId());
                vo.setUpdateTime(new Date());
                // Persist vo (carrying the fresh state); the original passed the unchanged
                // row po here, which made the update a no-op.
                taskService.updateById(vo);
            }
        } else {
            Date now = new Date();
            vo.setCreateTime(now);
            vo.setUpdateTime(now);
            taskMapper.insert(vo);
        }
    }
}
2、算法实现,由于涉密,只贴出接口
AiService.java
import com.baomidou.mybatisplus.extension.service.IService;
import com.ewaycloud.jw.ai.entity.Ai;
import com.ewaycloud.jw.task.entity.Task;
import java.util.List;
/**
* AI integration service.
*
* Each create-task method takes the stream URL and the start/end of the time window to
* analyse and returns the AI server's raw response as a String. The caller shown alongside
* this interface parses the create-task response as JSON and reads its "taskID" field.
*
* @author gwh
* @date 2024-03-13 13:49:09
*/
public interface AiService extends IService<Ai> {
// NOTE(review): no description in the original; by its name this presumably returns
// information about the AI device — confirm against the implementation.
String getAiDeviceInfo();
/**
* Creates a video-analysis task for the "suspected physical conflict" event.
*
*/
String indoorPhysicalConfront(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "phone use" detection.
*
*/
String playCellphone(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "fall" detection.
*
*/
String failDown(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "person standing up" detection.
*
*/
String Standup(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "abnormal people count" detection.
*
*/
String PeopleNumChange(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "sudden sound-intensity change" detection.
*
*/
String audioAbnormal(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "overtime loitering" detection.
*
*/
String overtimeTarry(String streamUrl, String startTime, String endTime);
/**
* Creates a video-analysis task for "climbing" detection.
*
*/
String reachHeight(String streamUrl, String startTime, String endTime);
/**
* Queries the status of a video-analysis task.
*
* The caller shown alongside this interface parses the result as JSON and reads
* status[0].taskStatus from it.
*
*/
String queryTaskVideoStatus(String taskId);
}
3、启动一个线程,Socket监听10006端口,接收ai服务器返回的结果
ListenThread.java
import com.ewaycloud.jw.ai.entity.AiResolveResult;
import com.ewaycloud.jw.ai.mapper.AiResolveResultMapper;
import com.ewaycloud.jw.task.entity.ContentTypeEnum;
import com.ewaycloud.jw.task.entity.Task;
import com.ewaycloud.jw.task.mapper.TaskMapper;
import com.mysql.cj.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Date;
import java.util.List;
/**
* @author gwhui
* @date 2024/1/18 17:38
* @desc 监听处理线程
*/
@Slf4j
@Service
public class ListenThread implements Runnable {
private final AlarmDataParser alarmDataParser = new AlarmDataParser();
private static TaskMapper taskMapper;
@Resource
public void setVerificDao(TaskMapper taskMapper) {
ListenThread.taskMapper = taskMapper;
}
private static AiResolveResultMapper aiResolveResultMapper;
@Resource
public void setVerificDao(AiResolveResultMapper aiResolveResultMapper) {
ListenThread.aiResolveResultMapper = aiResolveResultMapper;
}
@Override
public void run() {
// int listenPort = propertiesUtil.getIntegerProperty("custom.isapi.listen.port", 9999);
int listenPort =10006;
try {
ServerSocket serverSocket = new ServerSocket(listenPort);
System.out.println("启动监听, 监听端口:" + listenPort);
while (!Thread.currentThread().isInterrupted()) {
Socket accept = serverSocket.accept();
accept.setKeepAlive(true);
System.out.println("设备(客户端)信息:" + accept.getInetAddress().getHostAddress());
if (accept.isConnected()) {
handleData(accept);
}
accept.close();
}
serverSocket.close();
System.out.println("停止监听完成");
} catch (InterruptedException e) {
// 线程被中断的处理逻辑
System.out.println("停止监听完成: " + e.getMessage());
} catch (Exception e) {
System.out.println("监听创建异常: " + e.getMessage());
}
}
@Transactional(rollbackFor = Exception.class)
public synchronized void handleData(Socket accept) throws Exception {
InputStream inputData = accept.getInputStream();
OutputStream outputData = accept.getOutputStream();
// 输出数据
ByteArrayOutputStream byOutputData = new ByteArrayOutputStream();
byte[] buffer = new byte[2 * 1024 * 1024];
int length = 0;
// 持续接收处理数据直到接收完毕
String recvAlarmData = "";
while ((length = inputData.read(buffer)) > 0) {
byOutputData.write(buffer, 0, length);
String recvData = byOutputData.toString();
recvAlarmData = recvAlarmData + recvData;
// 获取boundary
String strBoundary = "boundary=";
int beginIndex = recvData.indexOf(strBoundary);
beginIndex += strBoundary.length();
int lenIndex = recvData.indexOf("\r\n", beginIndex);
String strBoundaryMark = recvData.substring(beginIndex, lenIndex);
if (recvAlarmData.contains("--" + strBoundaryMark.trim() + "--")) {
//表单结束符判断接收结束
break;
}
}
// System.out.println("==============recvAlarmData========>> "+recvAlarmData);
if(null != recvAlarmData){
String taskId = null;
int index = recvAlarmData.indexOf("<taskID>");
if(index != -1){
taskId = recvAlarmData.substring(index + 8, index + 40);
}
//获取服务器返回的图片
String bkgUrl = null;
int indexStartBkgUrl = recvAlarmData.indexOf("<bkgUrl>");
int indexEndBkgUrl = recvAlarmData.indexOf("</bkgUrl>");
if(indexStartBkgUrl != -1){
bkgUrl = recvAlarmData.substring(indexStartBkgUrl+8, indexEndBkgUrl);
bkgUrl =bkgUrl.replaceAll("&","&");
}
System.out.println("===AIrecieveData===>>taskId: "+taskId +" bkgUrl: "+ bkgUrl);
//根据taskId查询 任务信息
if(!StringUtils.isNullOrEmpty(taskId)){
Task task = taskMapper.finTaskByTaskId(taskId);
if(null != task){
AiResolveResult vo = new AiResolveResult();
vo.setCreateTime(new Date());
vo.setUpdateTime(new Date());
vo.setTaskId(taskId); //保存海康返回的 taskId
vo.setBreakName(task.getBreakName());
vo.setAiId(task.getAiId());
vo.setDeptId(task.getDeptId());
vo.setCameraId(task.getCameraId());
vo.setBreakTypeId(task.getAiId());
vo.setRiskTime(task.getTalkTime());
vo.setTalkAddress(task.getTalkAddress());
vo.setTalkAddressName(task.getTalkAddressName());
vo.setTalkUnit(task.getTalkUnit());
vo.setTalkUnitName(task.getTalkUnitName());
vo.setPhoto(bkgUrl); //保存海康返回的图片
vo.setCaseId(task.getCaseId());
vo.setCaseName(task.getCaseName());
vo.setInterviewerName(task.getInterviewerName());
//根据taskId查询任务结果表,如果有做更新操作,没有做插入操作
List<AiResolveResult> aiResolveResults = aiResolveResultMapper.findAiResults(vo);
if(null != aiResolveResults && aiResolveResults.size()>0){
for(AiResolveResult aiResolveResult:aiResolveResults){
if(null != aiResolveResult){
aiResolveResult.setPhoto(vo.getPhoto());
aiResolveResultMapper.updateById(aiResolveResult);
}
}
}else {
aiResolveResultMapper.insert(vo);
}
}
}
}
String response = "HTTP/1.1 200 OK" +
"\r\n" +
"Connection: close" +
"\r\n\r\n";
outputData.write(response.getBytes());
outputData.flush();
outputData.close();
inputData.close();
//解析数据
response = parseAlarmInfoByte(byOutputData);
System.out.println("==============response========>> "+response);
}
private String parseAlarmInfoByte(ByteArrayOutputStream byOutputData) throws Exception {
// 事件报文字节
byte[] byAlarmDataInfo = byOutputData.toByteArray();
int iDataLen = byAlarmDataInfo.length;
String szBoundaryMark = "boundary=";
String szContentTypeMark = "Content-Type: ";
int iTypeMarkLen = szContentTypeMark.getBytes("UTF-8").length;
String szContentLenMark = "Content-Length: ";
int iLenMarkLen = szContentLenMark.getBytes("UTF-8").length;
String szContentLenMark2 = "content-length: ";
int iLenMarkLen2 = szContentLenMark2.getBytes("UTF-8").length;
int iContentLen = 0;
String szEndMark = "\r\n";
int iMarkLen = szEndMark.getBytes("UTF-8").length;
String szEndMark2 = "\r\n\r\n";
int iMarkLen2 = szEndMark2.getBytes("UTF-8").length;
String szJson = "text/json";
String szJpg = "image/jpeg";
int iStartBoundary = doDataSearch(byAlarmDataInfo, szBoundaryMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);
iStartBoundary += szBoundaryMark.getBytes("UTF-8").length;
int iEndBoundary = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartBoundary, byAlarmDataInfo.length);
byte[] byBoundary = new byte[iEndBoundary - iStartBoundary];
System.arraycopy(byAlarmDataInfo, iStartBoundary, byBoundary, 0, iEndBoundary - iStartBoundary);
String szBoundaryEndMark = "--" + new String(byBoundary).trim() + "--";
int iDateEnd = doDataSearch(byAlarmDataInfo, szBoundaryEndMark.getBytes("UTF-8"), 0, byAlarmDataInfo.length);
String szBoundaryMidMark = "--" + new String(byBoundary).trim();
int iBoundaryMidLen = szBoundaryMidMark.getBytes("UTF-8").length;
int startIndex = iEndBoundary;
String szContentType = "";
int[] iBoundaryPos = new int[11]; //boundary个数,这里最大解析10个
int iBoundaryNum = 0;
for (iBoundaryNum = 0; iBoundaryNum < 10; iBoundaryNum++) {
startIndex = doDataSearch(byAlarmDataInfo, szBoundaryMidMark.getBytes("UTF-8"), startIndex, iDateEnd);
if (startIndex < 0) {
break;
}
startIndex += iBoundaryMidLen;
iBoundaryPos[iBoundaryNum] = startIndex;
}
iBoundaryPos[iBoundaryNum] = iDateEnd;//最后一个是结束符
for (int i = 0; i < iBoundaryNum; i++) {
// Content-Type
int iStartType = doDataSearch(byAlarmDataInfo, szContentTypeMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
if (iStartType > 0) {
iStartType += iTypeMarkLen;
int iEndType = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartType, iBoundaryPos[i + 1]);
if (iEndType > 0) {
byte[] byType = new byte[iEndType - iStartType];
System.arraycopy(byAlarmDataInfo, iStartType, byType, 0, iEndType - iStartType);
szContentType = new String(byType).trim();
}
}
// Content-Length
int iStartLength = doDataSearch(byAlarmDataInfo, szContentLenMark.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
if (iStartLength > 0) {
iStartLength += iLenMarkLen;
int iEndLength = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength, iBoundaryPos[i + 1]);
if (iEndLength > 0) {
byte[] byLength = new byte[iEndLength - iStartLength];
System.arraycopy(byAlarmDataInfo, iStartLength, byLength, 0, iEndLength - iStartLength);
iContentLen = Integer.parseInt(new String(byLength).trim());
}
}
// Content-Length(兼容错误大小写)
int iStartLength2 = doDataSearch(byAlarmDataInfo, szContentLenMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
if (iStartLength2 > 0) {
iStartLength2 += iLenMarkLen2;
int iEndLength2 = doDataSearch(byAlarmDataInfo, szEndMark.getBytes("UTF-8"), iStartLength2, iBoundaryPos[i + 1]);
if (iEndLength2 > 0) {
byte[] byLength2 = new byte[iEndLength2 - iStartLength2];
System.arraycopy(byAlarmDataInfo, iStartLength2, byLength2, 0, iEndLength2 - iStartLength2);
iContentLen = Integer.parseInt(new String(byLength2).trim());
}
}
// 通过\r\n\r\n判断报文数据起始位置
int iStartData = doDataSearch(byAlarmDataInfo, szEndMark2.getBytes("UTF-8"), iBoundaryPos[i], iBoundaryPos[i + 1]);
if (iStartData > 0) {
iStartData += iMarkLen2;
// 有的报文可能没有Content-Length
if (iContentLen <= 0) {
iContentLen = iBoundaryPos[i + 1] - iStartData;
}
// 截取数据内容
byte[] byData = new byte[iContentLen];
System.arraycopy(byAlarmDataInfo, iStartData, byData, 0, iContentLen);
// 根据类型处理数据
int contentType = ContentTypeEnum.getEventType(szContentType);
String storeFolder = System.getProperty("user.dir") + "\\output\\listen\\event\\";
switch (contentType) {
case ContentTypeEnum.APPLICATION_JSON:
case ContentTypeEnum.APPLICATION_XML: {
String rawContent = new String(byData).trim();
alarmDataParser.parseAlarmInfo(contentType, storeFolder, rawContent, null);
break;
}
case ContentTypeEnum.IMAGE_JPEG:
case ContentTypeEnum.IMAGE_PNG:
case ContentTypeEnum.VIDEO_MPG:
case ContentTypeEnum.VIDEO_MPEG4:
case ContentTypeEnum.APPLICATION_ZIP: {
alarmDataParser.parseAlarmInfo(contentType, storeFolder, null, byData);
break;
}
default: {
System.out.println("未匹配到可以解析的content-type, 请自行补全处理!");
}
}
}
}
// 响应报文
String response = "";
// 消费交易事件 (实际如果没有消费机设备可以不需要消费机的处理代码)
String eventType = "";
String eventConfirm = "";
if (eventType.equals("ConsumptionEvent") || eventType.equals("TransactionRecordEvent") || eventType.equals("HealthInfoSyncQuery")) {
response = "HTTP/1.1 200 OK" +
"\r\n" +
"Content-Type: application/json; charset=\"UTF-8\"" +
"\r\n" +
"Content-Length: " + eventConfirm.length() +
"\r\n\r\n" + eventConfirm +
"\r\n";
} else {
response = "HTTP/1.1 200 OK" +
"\r\n" +
"Connection: close" +
"\r\n\r\n";
}
return response;
}
private int doDataSearch(byte[] bySrcData, byte[] keyData, int startIndex, int endIndex) {
if (bySrcData == null || keyData == null || bySrcData.length <= startIndex || bySrcData.length < keyData.length) {
return -1;
}
if (endIndex > bySrcData.length) {
endIndex = bySrcData.length;
}
int iPos, jIndex;
for (iPos = startIndex; iPos < endIndex; iPos++) {
if (bySrcData.length < keyData.length + iPos) {
break;
}
for (jIndex = 0; jIndex < keyData.length; jIndex++) {
if (bySrcData[iPos + jIndex] != keyData[jIndex]) {
break;
}
}
if (jIndex == keyData.length) {
return iPos;
}
}
return -1;
}
}
4、数据库设计
瀚高国产数据库