Commit bae1aa02 authored by wangjinjing's avatar wangjinjing

去掉延迟50s

parents
package com.hzjt.controller;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.hzjt.config.MQTTSubsribe;
import com.hzjt.domain.*;
import com.hzjt.handler.FileTransferManager;
import com.hzjt.handler.WebSocket;
import com.hzjt.mapper.SbtdspsrMapper;
import com.hzjt.mapper.TraffAlarmRecordMapper;
import com.hzjt.service.*;
import com.hzjt.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.*;
import sun.net.www.protocol.ftp.FtpURLConnection;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
@Slf4j
public class TraffController {
@Autowired
WebSocket webSocket;
private String port;
private String dept;
private Integer rate = 10;
private SbtdspsrMapper sbtdspsrMapper;
@Autowired
TraffFlowService traffFlowService;
@Value("${flv.resisvalue}")
private String resisvalue;
@Value("${json.resisurl}")
private String resisjsonvalue;
@Value("${ftppath}")
private String ftppath;
@Value("${ftpServiceUrl}")
private String ftpServiceUrl;
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private TraffAlarmRecordMapper traffAlarmRecordMapper;
@Autowired
FtpService ftpService;
@Autowired
EventWriteService eventWriteService;
@Value("ftpIp")
String ftpIp;
@Value("ftpPort")
private String ftpPort;
@Value("ftpUsername")
private String ftpUsername;
@Value("ftpPassword")
private String ftpPassword;
@Value("${alarmrecord.check.enable}")
private String checkEnable;
Long ldir = new Long(180);
@Autowired
FLVCacheService flvCacheService;
Long eightHour=25200000L;
// @Autowired
// private SimpMessagingTemplate template;
@Autowired
MQTTSubsribe mQTTSubsribe;
String manualStatus="0";
private static final String TYPE = "TRAFFIC_INCIDENT_ALARM";
ScheduledExecutorService schedulepool = Executors.newScheduledThreadPool(5); //创
@Autowired
public TraffController(
@Value("${port}") String port,
@Value("${message.dept}") String dept,
@Value("${message.rate}") Integer rate, SbtdspsrMapper sbtdspsrMapper) {
this.port = port;
this.rate = rate;
this.dept = dept.substring(0, 5);
this.sbtdspsrMapper = sbtdspsrMapper;
}
/**
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
* PostConstruct在构造函数之后执行,init()方法之前执行。PreDestroy()方法在destroy()方法知性之后执行
* 这里初始化订阅一个主题
*/
@PostConstruct
public void init() {
//查询素有videowbbh
List<String> sbbhlist= sbtdspsrMapper.selectAllWbbh();
sbbhlist.add("notice/66211/videoEventConfirm");
mQTTSubsribe.subscribe(sbbhlist);
}
@PostMapping("/alarmevent")
public ResultObj rece(@RequestBody Alarm trffClientMessage) {
// log.info("alarmevent:{}",trffClientMessage.toString());
if (!TYPE.equals(trffClientMessage.getType())) {
return ResultObj.error(ResponseEnum.E_1002.getCode(), "type类型不正确");
}
if (trffClientMessage.getImg_urls().isEmpty()) {
return ResultObj.error(ResponseEnum.E_1004.getCode(), "img_urls值不能为空");
}
if (StringUtils.isBlank(trffClientMessage.getIncident_type())) {
log.info("incident_type值不能为空" + trffClientMessage.toString());
return ResultObj.error(ResponseEnum.E_1004.getCode(), "incident_type值不能为空");
}
Long current=new Date().getTime()-Long.valueOf(trffClientMessage.getTs());
if(current.longValue()>=eightHour.longValue())
{
log.info("current.longValue()>=eightHour.longValue()=====:{}",(current.longValue()>=eightHour.longValue()));
//系统时间小于8小时
Long lontime=Long.valueOf(trffClientMessage.getTs())+28800000L;
trffClientMessage.setTs(String.valueOf(lontime));
}
String videoId = trffClientMessage.getVideo_id();
if (StringUtils.isBlank(videoId)) {
log.info("video_id值异常 值为:" + videoId);
return ResultObj.error(ResponseEnum.E_1002.getCode(), "video_id值异常 值为:" + videoId);
}
String sbbh=videoId;
int tdbh=0;
List<Sbtdspsr> sbtdspsrs = sbtdspsrMapper.selectBySbbh(sbbh, tdbh);
if (sbtdspsrs.isEmpty()) {
log.info("设备为:" + sbbh + ",通道为:" + tdbh + "未录入(备案)");
return ResultObj.error(ResponseEnum.E_1002.getCode(), "设备为:" + sbbh + ",通道为:" + tdbh + "未录入(备案)");
}
String xzbh = sbtdspsrs.get(0).getXzbh();
if (xzbh.length() != 12) {
log.info("设备为:" + sbbh + ",通道为:" + tdbh + "配置的行政区划" + xzbh + "不合规");
return ResultObj.error(ResponseEnum.E_1002.getCode(), "设备为:" + sbbh + ",通道为:" + tdbh + "配置的行政区划不合规");
}
trffClientMessage.setDept(xzbh);
// //判断是否需要手动筛选
// manualStatus=traffAlarmRecordMapper.seletManualStatus();
// log.info("是否手动推送 manualStatus"+manualStatus);
// if(manualStatus.equalsIgnoreCase("0")) {
// log.info("send to gaoxin");
// //手动 推送给高信
// webSocket.GroupSending(JsonUtil.beanToString(trffClientMessage));
// }
try {
sendevent(trffClientMessage,sbtdspsrs.get(0));
} catch (Exception e) {
log.error("MessageController receive putData error:{}" ,e.toString());
}
return ResultObj.ok(trffClientMessage);
}
public void sendevent(Alarm trffClientMessage,Sbtdspsr spsr) {
String wbbh=spsr.getWbbh();
Traffalarmrecord traffAlarmRecord = new Traffalarmrecord();
traffAlarmRecord.setAreaid(Long.valueOf(trffClientMessage.getDept()));
traffAlarmRecord.setRecordtype(trffClientMessage.getIncident_type().toLowerCase());
traffAlarmRecord.setRecordtime(DateUtils.getDateString(new Date(Long.valueOf(trffClientMessage.getTs()))));
Integer channelid=0;
traffAlarmRecord.setFdid(trffClientMessage.getVideo_id());
traffAlarmRecord.setChannelid(channelid);
traffAlarmRecord.setVideourlfrom(trffClientMessage.getVideo_record_url());
traffAlarmRecord.setCreatetime(DateUtils.getDateString(new Date()));
traffAlarmRecord.setObjlable(trffClientMessage.getObjLabel());
traffAlarmRecord.setPushstatus(9);//设置为未推送
traffAlarmRecord.setProcessstatus("0");
traffAlarmRecord.setManualstatus(0);
traffAlarmRecord.setChannelname(spsr.getBz());
List<String> imgBase64List = trffClientMessage.getImg_base64();
String imgEnumHead = "IMG";
Ftp ftp = ftpService.reloadFtp();
for (int i = 0; i < imgBase64List.size(); i++) {
/* 图片上传 */
String path = trffClientMessage.getVideo_id() + "/" + DateUtils.formatCurrDayNoSign();
String fileName = UUIDUtils.createuuid() + ".jpg";
if (FTPUtil.uploadFile(ftp, path, fileName, imgBase64List.get(i))) {
TraffAlarmRecordImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, FTPUtil.getFtpUrl(ftp) + path + "/" + fileName);
}
}
List<String> imgUrls = trffClientMessage.getImg_urls();
for (int i = 0; i < imgUrls.size(); i++) {
TraffAlarmRecordFromImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, imgUrls.get(i));
}
if (StringEnum.ONE.getValue().equals(checkEnable)) {
/* 9:免审 */
traffAlarmRecord.setCheckstatus(9);
}
// int recordid = traffAlarmRecordMapper.selectmax();
// traffAlarmRecord.setRecordid((long) (recordid));
//发送给前端
Map map = new HashMap();
// if(manualStatus.equalsIgnoreCase("1")) {
// map.put("type", "recordalarm");
// traffAlarmRecord.setManualstatus(0);
//
// }else {
//
// traffAlarmRecord.setManualstatus(1);
// }
map.put("type", "alarm");
log.info(traffAlarmRecord.getRecordtime());
//为了适配mqtt返回的处理状态,将状态设置为mqtt的事件主键
traffAlarmRecord.setEventid("66211"+UUIDUtils.createuuid());
traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord);
map.put("data", trffClientMessage);
map.put("recordid", traffAlarmRecord.getRecordid());
webSocket.GroupSending(JsonUtil.beanToString(map));
try {
String basepath = "gstraff/" + traffAlarmRecord.getFdid() + (traffAlarmRecord.getChannelid() < 10 ? "0" + traffAlarmRecord.getChannelid() : traffAlarmRecord.getChannelid()) + "/" + DateUtils.formatCurrDayNoSign();
Map<String, Object> transferRecordMap = new HashMap<>();
FileTransferManager.fetchUrlsFromRecord(traffAlarmRecord, transferRecordMap);
List<TransferResult> results = FileTransferManager.transferFile(transferRecordMap, ftp, basepath);
Traffalarmrecord recordBak = FileTransferManager.traffAlarmRecordUrlUpdate(results);
traffAlarmRecord.setImg2path(recordBak.getImg2path());
traffAlarmRecord.setImg3path(recordBak.getImg3path());
traffAlarmRecord.setImg4path(recordBak.getImg4path());
traffAlarmRecord.setImg5path(recordBak.getImg5path());
String pname = DateUtils.formatCurrDayNoSign() + "_" + traffAlarmRecord.getRecordid() + "_0000_video";
String fileName = pname + ".mp4";
String ftputl = FTPUtil.getFtpUrl(ftp) + basepath + "/" + fileName;
traffAlarmRecord.setVideopath(ftputl);//先推送过去,再获得视频并上传
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
sendtomqtt(trffClientMessage, wbbh, traffAlarmRecord);
//如果监控视频为空,延时加载重新获取
if (null == recordBak.getVideopath() || "".equalsIgnoreCase(recordBak.getVideopath())) {
schedulepool.schedule(() -> {
for (String key : transferRecordMap.keySet()) {
if (key.equals("videopath")) {
final String url = transferRecordMap.get(key).toString();
try {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
//延迟连接
connection.setReadTimeout(2000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
InputStream inputStream = connection.getInputStream();
log.info("url:", url, " --- " + "key:", key);
boolean r = FTPUtil.uploadFile(ftp, basepath, fileName, inputStream);
if (r) {
log.info("upload video success");
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
log.info("send to mqtt video is not empty");
}
}
}
else {
log.info("send to mqtt video is empty");
// sendtomqtt(trffClientMessage, wbbh, traffAlarmRecord);
traffAlarmRecord.setVideopath("");
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
}
} catch (IOException e) {
log.error(e.toString());
// sendtomqtt(trffClientMessage, wbbh, traffAlarmRecord);
traffAlarmRecord.setVideopath("");
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
return 0;
}
}
}
return 1;
}, 50, TimeUnit.SECONDS); //等待50秒钟执行
}
traffAlarmRecord.setPushdesc("success");
traffAlarmRecord.setPushstatus(0);
traffAlarmRecordMapper.updatePushEvent(traffAlarmRecord);
} catch (Exception e) {
log.error("alarm fail :" + e.toString());
}
}
private void sendtomqtt(Alarm trffClientMessage, String wbbh, Traffalarmrecord traffAlarmRecord) {
if(null==wbbh || "".equals(wbbh))
{
log.info("设备为:" + traffAlarmRecord.getFdid() + ",wbbh" + " is empty");
}
else {//mqtt 推送给第三方
MqttAlarm mqttalarm = new MqttAlarm();
//查询事件类型mqtt 对应的编号
String mqttbh = traffAlarmRecordMapper.seletmqttbh(trffClientMessage.getIncident_type());
mqttalarm.setAlarmTime(traffAlarmRecord.getRecordtime());
mqttalarm.setCompanyId("66211");
//设置图片
mqttalarm.setImagePath(traffAlarmRecord.getImg1path()==null?"":ftpServiceUrl+"?location="+traffAlarmRecord.getImg1path().replace("ftp://"+ftppath+"/",""));
mqttalarm.setVideoPath(traffAlarmRecord.getVideopath()==null?"":ftpServiceUrl+"?location="+traffAlarmRecord.getVideopath().replace("ftp://"+ftppath+"/",""));
mqttalarm.setDeviceId(wbbh);
// mqttalarm.setEventid(traffAlarmRecord.getProcessstatus());
// mqttalarm.setLocation(traffAlarmRecord.getChannelname().contains("隧道")?2:1);
if (null != trffClientMessage.getObj_location()) {
mqttalarm.setX(trffClientMessage.getObj_location().get("x") == null ? null : trffClientMessage.getObj_location().get("x").toString());
mqttalarm.setY(trffClientMessage.getObj_location().get("y") == null ? null : trffClientMessage.getObj_location().get("y").toString());
mqttalarm.setWidth(trffClientMessage.getObj_location().get("width") == null ? null : trffClientMessage.getObj_location().get("width").toString());
mqttalarm.setHeight(trffClientMessage.getObj_location().get("height") == null ? null : trffClientMessage.getObj_location().get("height").toString());
}
mqttalarm.setEventTypeId(mqttbh);
log.info("mqtt data:{}"+ JSON.toJSONString(mqttalarm));
mQTTSubsribe.publishMessage("event/" + wbbh + "/videoEvent", JSON.toJSONString(mqttalarm));
}
}
//车流量推送
@GetMapping("/test")
@ResponseBody
public void test() {
String manualStatus=traffAlarmRecordMapper.seletManualStatus();
Alarm alarm=new Alarm();
alarm.setVideo_id("33_65_230_156_554_fbXdTkVe98u_ecvs_0");
alarm.setIncident_type("NO_MOTOR_BAN");
String str="iVBORw0KGgoAAAANSUhEUgAAABgAAAAYCAYAAADgdz34AAAB1UlEQVRIS5WVjTEEQRBG30WACBABGSACLgJEgAicCBABIkAEiIAMkAERUG9remtua2Z3rquuam935nvTPf0zY9y2gENgH/B5Ny3/AL6AV+A5PReVZhV9xS6Bk/RdEUVzE7qXXtwDVyVQCaDoNeC3m/T7qRxkHThPvz9gnrzqlw8Bi3RyTyyoJjzkCTJcO8ApoEed5QAF74CHLDRDofwOShA9PgYOwpMAGPN34A04Grl3T6kZ/5KFJ5vAthEIgC4pLGgsLFMAoWp8pktfCFh6MZG2LQAlPLDpvS3ALDBrNhoutRUQh54LcJNhGYt9ONYKcH1XiAJ80CVTdMpWATwB6wIsEKswALpnupZsLE1dfwsorKl3WAP0hTKgCDcFTeeSWQc5YF+APcaXLSGKSq/1sBzah6h7GCmefNMqgP6SV0nTVsBSmq5SaK2A6AxdFi1V3kSxtQDMNPtal5l5s/Oy7SF2wlo/EmBIvbOS+f4ldQVBfbNzsR3Sj7p3UYG4yZBGKuaQELeLqtVNwGG6xUzwo9PJTGgxwY/p5HpYHDghJN0TrqVxaXXWQAqfpQH1nfrZ0uyuFUw+awUJcGNsVjjC9Ts2u6cqMgowhpH/IwGE2fxK99GH9R91mncY+HKoHAAAAABJRU5ErkJggg==";
List arrlist=new ArrayList();
arrlist.add(str);
arrlist.add(str);
arrlist.add(str);
arrlist.add(str);
arrlist.add(str);
alarm.setImg_base64(arrlist);
alarm.setTs("1345677777");
alarm.setType("TRAFFIC_INCIDENT_ALARM");
Map map=new HashMap();
map.put("type", "alarm");
map.put("data", alarm);
map.put("recordid", "33448");
webSocket.GroupSending(JsonUtil.beanToString(map));
Traffalarmrecord traffAlarmRecord=new Traffalarmrecord();
traffAlarmRecord.setRecordid(Long.parseLong("33448"));
traffAlarmRecord.setChannelid(new Integer(1));
traffAlarmRecord.setFdid("20200305112042989");
traffAlarmRecord.setRecordtype("NO_MOTOR_BAN");
// traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord);
map.put("recordid", "33449");
webSocket.GroupSending(JsonUtil.beanToString(map));
Traffalarmrecord traffAlarmRecord2=new Traffalarmrecord();
traffAlarmRecord2.setRecordid(Long.parseLong("33449"));
traffAlarmRecord2.setChannelid(new Integer(1));
traffAlarmRecord2.setFdid("20200305112042989");
traffAlarmRecord2.setRecordtype("WRONG_DIRECTION");
// traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord2);
}
//事件推送
@GetMapping("/testevent")
@ResponseBody
public void testevent() {
String json="{"+
" \"type\": \"TRAFFIC_INCIDENT_ALARM\"," +
" \"id\": \"d9a2b0f0-f0cf-49b9-9b64-3da86a122afa\"," +
" \"video_id\": \"201809180950119121_0\"," +
" \"ts\": \"1544602970458\"," +
" \"incident_type\": \"WRONG_DIRECTION\"," +
" \"img_urls\": [" +
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test\",\n" +
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test2\"," +
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test3\"," +
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test4\"," +
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test5\"" +
" ]," +
" \"video_record_url\": \"http://192.168.1.3:8001/api/alg/files/1576459828-test\"," +
" \"obj_location\": {" +
" \"x\": 0.1,\n" +
" \"y\": 0.2," +
" \"width\": 0.4," +
" \"height\": 0.7" +
" }," +
" \"img_base64\": [\"d0xEMHcFAESwdwaUjWDqB6/1Qw4LapAiSICVTOFWsGT0sgt\"]\n" +
" }";
MqttAlarm alarm=new MqttAlarm();
alarm.setHeight("0.7");
alarm.setWidth("0.7");
mQTTSubsribe.publishMessage("event/1111/videoEvent", JSONUtil.toJsonStr(alarm));
}
@GetMapping("/api/alg/files")
protected void fielagent(@RequestParam("location") String location, HttpServletRequest request, HttpServletResponse response) {
long startTime = System.currentTimeMillis();
//ftp://reader:reader@33.50.1.22:21/
//ftp.host=33.65.219.103:21:hzjt:1qaz2wsx
String ftpPath="ftp://"+ftppath+"/"+location;
FileInputStream hFile = null;
OutputStream toClient = null;
InputStream inputStream = null;
BufferedInputStream bis = null;
try {
response.reset();
response.setHeader("Expires", "Sat, 10 May 2059 12:00:00 GMT");
response.setHeader("Cache-Control", "max-age=315360000");
if (StringUtils.isNotBlank(ftpPath)) {
if (ftpPath.endsWith(".jpg") || ftpPath.endsWith(".JPG") || ftpPath.endsWith(".png") || ftpPath.endsWith(".PNG") || ftpPath.endsWith(".gif") || ftpPath.endsWith(".GIF")) {
response.setContentType("image/" + ftpPath.substring(ftpPath.lastIndexOf(".") + 1) + "; charset=utf-8");
} else if (ftpPath.endsWith(".mp4") || ftpPath.endsWith(".MP4")) {
response.setContentType("video/mpeg4; charset=utf-8");
String mp4file = ftpPath.substring(ftpPath.lastIndexOf("/") + 1);
response.setHeader("Content-Disposition", "attachment;fileName=" + mp4file);
}
String destUrl = ftpPath;
destUrl = new String(destUrl.getBytes("ISO8859-1"), "GBK");
String[] arr = destUrl.split(";");
FtpURLConnection ftpUrl = null;
HttpURLConnection httpUrl = null;
for (int i = 0; i < arr.length; i++) {
try {
URL url = new URL(arr[i]);
if (arr[i].toUpperCase().indexOf("FTP") != -1) { // ftp
ftpUrl = (FtpURLConnection) url.openConnection();
ftpUrl.setConnectTimeout(30000);
ftpUrl.setReadTimeout(30000);
bis = new BufferedInputStream(ftpUrl.getInputStream());
response.setContentLength(ftpUrl.getContentLength());
} else { // http
httpUrl = (HttpURLConnection) url.openConnection();
httpUrl.setConnectTimeout(30000);
httpUrl.setReadTimeout(30000);
bis = new BufferedInputStream(httpUrl.getInputStream());
response.setContentLength(httpUrl.getContentLength());
}
toClient = response.getOutputStream();
IOUtils.copy(bis, toClient);
} catch (Exception e) {
response.setContentType("text/html;charset=GBK");
response.setCharacterEncoding("GBK");
PrintWriter out = response.getWriter();
out.write("无法打开图片!");
out.flush();
log.info("ftpagent error "+ftpUrl+e.toString());
} finally {
if (bis != null) {
bis.close();
}
if (bis != null) {
bis.close();
}
if (httpUrl != null) {
httpUrl.disconnect();
}
if (ftpUrl != null) {
ftpUrl.close();
}
if (toClient != null) {
toClient.close();
}
}
}
return;
}
} catch (Exception e) {
} finally {
IOUtils.closeQuietly(bis);
IOUtils.closeQuietly(toClient);
IOUtils.closeQuietly(hFile);
IOUtils.closeQuietly(inputStream);
}
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment