Commit 0059040c authored by 高飞's avatar 高飞

解决推送总是少推送几条数据的问题,定时抽取前一天vehicle 数据的时间问题

parent bdc1180e
......@@ -30,11 +30,11 @@ public class ScheduleTaskConfig {
@Scheduled(cron = "0 0 2 * * ? ")//每天凌晨2点执行
private void statis() {
log.info("每天凌晨2点执行前一天数据整合");
String date = DateUtils.getYesterday(-1);
// String date = DateUtils.getYesterday(-1);
//抽取前一天的车流量和事件统计数据入表
Integer result = traffFlowService.statisVehiclesByDay(date);
Integer result = traffFlowService.statisVehiclesByDay();
//抽取前一天的事件统计到新表中
Integer resultrecord = traffalarmrecordService.statisTraffalarmrecordstatByDay(date);
Integer resultrecord = traffalarmrecordService.statisTraffalarmrecordstatByDay();
//
// //删除当天的数据
// traffFlowService.deleteVehiclesByDay();
......
......@@ -4,25 +4,32 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.util.concurrent.RateLimiter;
import com.hzjt.domain.*;
import com.hzjt.handler.FileTransferManager;
import com.hzjt.handler.WebSocket;
import com.hzjt.mapper.SbtdspsrMapper;
import com.hzjt.mapper.TraffAlarmRecordMapper;
import com.hzjt.redis.RedisDao;
import com.hzjt.service.EventWriteService;
import com.hzjt.service.FtpService;
import com.hzjt.service.ImportService;
import com.hzjt.service.TraffFlowService;
import com.hzjt.util.DateUtils;
import com.hzjt.util.JsonUtil;
import com.hzjt.util.RedisEnum;
import com.hzjt.util.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@RestController
......@@ -48,8 +55,27 @@ public class TraffController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
RedisDao redisDao;
@Autowired
private TraffAlarmRecordMapper traffAlarmRecordMapper;
@Autowired
FtpService ftpService;
@Autowired
EventWriteService eventWriteService;
@Value("ftpIp")
String ftpIp;
@Value("ftpPort")
private String ftpPort;
@Value("ftpUsername")
private String ftpUsername;
@Value("ftpPassword")
private String ftpPassword;
@Value("${alarmrecord.check.enable}")
private String checkEnable;
// @Autowired
......@@ -58,6 +84,8 @@ public class TraffController {
private static final String TYPE = "TRAFFIC_INCIDENT_ALARM";
ScheduledExecutorService schedulepool = Executors.newScheduledThreadPool(5); //创
@Autowired
public TraffController(
@Value("${port}") String port,
......@@ -120,16 +148,124 @@ public class TraffController {
WebSocket.GroupSending(JsonUtil.beanToString(map));
try { // 清空redis中的部分旧数据
importService.cleanCache();
// importService.cleanCache();
// 将参数result中的部分数据存入redis中,并把格式校验成功的数据发布至对应频道中
importService.cacheAndPublish(JsonUtil.beanToString(trffClientMessage));
// importService.cacheAndPublish(JsonUtil.beanToString(trffClientMessage));
sendevent(trffClientMessage);
} catch (Exception e) {
log.error("MessageController receive putData error:" + e.toString());
}
return ResultObj.ok(trffClientMessage);
}
public void sendevent( Alarm trffClientMessage){
List<String> imgBase64List = trffClientMessage.getImg_base64();
Traffalarmrecord traffAlarmRecord = new Traffalarmrecord();
String imgEnumHead = "IMG";
Ftp ftp = ftpService.reloadFtp();
for (int i = 0; i < imgBase64List.size(); i++) {
/* 图片上传 */
String path = trffClientMessage.getVideo_id() + "/" + DateUtils.formatCurrDayNoSign();
String fileName = UUIDUtils.createuuid() + ".jpg";
if (FTPUtil.uploadFile(ftp, path, fileName, imgBase64List.get(i))) {
TraffAlarmRecordImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, FTPUtil.getFtpUrl(ftp) + path + "/" + fileName);
}
}
List<String> imgUrls = trffClientMessage.getImg_urls();
for (int i = 0; i < imgUrls.size(); i++) {
TraffAlarmRecordFromImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, imgUrls.get(i));
}
String[] videoIdArr = trffClientMessage.getVideo_id().split("_");
String fdid = videoIdArr[0];
traffAlarmRecord.setFdid(fdid);
Integer channelid = Integer.valueOf(videoIdArr[1]) + 1;
traffAlarmRecord.setChannelid(channelid);
if (StringEnum.ONE.getValue().equals(checkEnable)) {
/* 9:免审 */
traffAlarmRecord.setCheckstatus(9);
}
int recordid = traffAlarmRecordMapper.selectmax();
traffAlarmRecord.setRecordid((long)(recordid+1));
traffAlarmRecord.setProcessstatus("0");
traffAlarmRecord.setAreaid(Long.valueOf(trffClientMessage.getDept()));
traffAlarmRecord.setRecordtype(trffClientMessage.getIncident_type().toLowerCase());
traffAlarmRecord.setVideourlfrom(trffClientMessage.getVideo_record_url());
traffAlarmRecord.setRecordtime(new Date(Long.valueOf(trffClientMessage.getTs())));
traffAlarmRecord.setCreatetime(new Date());
traffAlarmRecord.setObjlable(trffClientMessage.getObjLabel());
traffAlarmRecord.setPushstatus(9);//设置为未推送
traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord);
try {
String basepath = "gstraff/" + traffAlarmRecord.getFdid() + (traffAlarmRecord.getChannelid() < 10 ? "0" + traffAlarmRecord.getChannelid() : traffAlarmRecord.getChannelid()) + "/" + DateUtils.formatCurrDayNoSign();
Map<String, Object> transferRecordMap = new HashMap<>();
FileTransferManager.fetchUrlsFromRecord(traffAlarmRecord, transferRecordMap);
List<TransferResult> results = FileTransferManager.transferFile(transferRecordMap, ftp, basepath);
log.info("缓存数据上传结果:" + results.toString());
Traffalarmrecord recordBak = FileTransferManager.traffAlarmRecordUrlUpdate(results);
traffAlarmRecord.setImg2path(recordBak.getImg2path());
traffAlarmRecord.setImg3path(recordBak.getImg3path());
traffAlarmRecord.setImg4path(recordBak.getImg4path());
traffAlarmRecord.setImg5path(recordBak.getImg5path());
traffAlarmRecord.setVideopath(recordBak.getVideopath());
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
}
//如果监控视频为空,延时加载重新获取
if(null==recordBak.getVideopath() || "".equalsIgnoreCase(recordBak.getVideopath())) {
schedulepool.schedule(()->{
for (String key : transferRecordMap.keySet()) {
if (key.equals("videopath")) {
final String url = transferRecordMap.get(key).toString();
try {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
//延迟连接
connection.setReadTimeout(2000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
String pname = DateUtils.formatCurrDayNoSign() + "_" + recordid + "_0000_" + key.replace("path", "");
String fileName = pname + ".mp4";
String ftputl = FTPUtil.getFtpUrl(ftp) + basepath + "/" + fileName;
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
InputStream inputStream = connection.getInputStream();
log.info("url:", url, " --- " + "key:", key);
boolean r = FTPUtil.uploadFile(ftp, basepath, fileName, inputStream);
if (r) traffAlarmRecord.setVideopath(ftputl);
//只更新监控
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
//推送给第三方
ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
log.info("调用接口推送给广达返回信息:" + obj.toString());
}
}
} catch (IOException e) {
System.out.println(e.toString());
log.error(e.toString());
return 0;
}
}
}
return 1;
},30, TimeUnit.SECONDS); //等待30秒钟执行
}else {
//推送给第三方
ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
log.info("调用接口推送给广达返回信息:" + obj.toString());
}
} catch (Exception e) {
log.error("解析事件告警数据异常 :" + e.getMessage());
}
}
private RateLimiter getRateLimiter(String videoId) {
RateLimiter rateLimiter;
......
......@@ -110,14 +110,14 @@ public class WebSocket {
try {
List<Map> map = new ArrayList<>();
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote()) {
log.info("name" + name);
// log.info("name" + name);
List<Map> dDayFlowmap = traffFlowService.selectFiveAndDayFlow(name);
if (null != dDayFlowmap)
map.addAll(dDayFlowmap);
List<Map> TypeDayFlow = traffFlowService.selectFiveAndTypeDayFlow(name);
if (null != dDayFlowmap)
map.addAll(TypeDayFlow);
if (!map.isEmpty()) {
if (!map.isEmpty() && null!=webSocketSet.get(name)&&null!=webSocketSet.get(name).session) {
synchronized (webSocketSet.get(name).session) {
webSocketSet.get(name).session.getBasicRemote().sendText(JSONUtil.toJsonStr(map));
}
......
......@@ -39,13 +39,13 @@ public class MyApplicationStartingEventListener implements ApplicationListener<S
ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
CacheLoadService cacheLoadService = applicationContext.getBean(CacheLoadService.class);
cacheLoadService.loadFtpCache();
}, 200, 60, TimeUnit.SECONDS);
}, 200, 120, TimeUnit.SECONDS);
//判断第三方登录是否有效
ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
QingZhiLoginCacheService qingZhiLoginCacheService = applicationContext.getBean(QingZhiLoginCacheService.class);
qingZhiLoginCacheService.keepAlive();
}, 3, 60, TimeUnit.SECONDS);
// //判断第三方登录是否有效
// ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
// QingZhiLoginCacheService qingZhiLoginCacheService = applicationContext.getBean(QingZhiLoginCacheService.class);
// qingZhiLoginCacheService.keepAlive();
// }, 3, 120, TimeUnit.SECONDS);
//查询flv值
ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
......
......@@ -21,6 +21,6 @@ public interface TraffAlarmRecordMapper extends BaseMapper<Traffalarmrecord>, Co
int selectmax();
Integer statisTraffalarmrecordstatByDay(String starttime);
Integer statisTraffalarmrecordstatByDay();
}
\ No newline at end of file
......@@ -17,9 +17,14 @@ public interface TraffFlowMapper {
List<Map> selectFiveAndDayFlow(String videoid);
List<Map> selectFiveAndTypeDayFlow(String videoid);
Integer statisVehiclesByDay(String startime);
Integer statisVehiclesByDay();
Integer deleteBeforeTwoMonthVehiclesDetails(String startime);
Integer deleteVehiclesByDay();
Integer insertOrUpdatevehicleTodaystatistic();
String selectSbtddspsrRtspByVideoid(String videoid);
}
......@@ -10,6 +10,7 @@ import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
@Slf4j
......@@ -27,11 +28,11 @@ public class CacheLoadService {
public boolean loadFtpCache() {
try {
Storageserver server = new Storageserver();
server.setServerstatus(0);//õ
server.setServerstatus(0);//�����õ�
server.setServertype("ftp");
List<Storageserver> storageServers = storageServerMapper.queryStorageServerAll(server);
if (!storageServers.isEmpty() && storageServers.size() > 0) {
stringRedisTemplate.opsForValue().set(ftplistkey, new Gson().toJson(storageServers));
stringRedisTemplate.opsForValue().set(ftplistkey, new Gson().toJson(storageServers),60*10, TimeUnit.SECONDS);
} else {
System.out.println("storageServers.isEmpty");
}
......
......@@ -43,24 +43,26 @@ public class QingZhiLoginCacheService {
log.info("config params:"+ "redis key:" + qztoken +" ,username:"+username+ " ,password:"+password+ " ,url:"+url+ " ,keepaliveurl:"+keepaliveurl);
}
public void keepAlive() {
public String keepAlive() {
String token=null;
try {
String tokencache = stringRedisTemplate.opsForValue().get(qztoken);
if (tokencache != null) {
doKeepAlive(tokencache);
token= doKeepAlive(tokencache);
} else {
tokencache = loginServer(url, username, password);
if (tokencache != null) {
doKeepAlive(tokencache);
token=doKeepAlive(tokencache);
}
}
} catch (Exception e) {
System.out.println(e.toString());
log.error(e.getMessage());
}
return token;
}
private void doKeepAlive(String token) {
private String doKeepAlive(String token) {
HttpHeaders headers = getHttpHeaders();
headers.add("token",token);
HttpEntity<String> requestEntity = new HttpEntity<>(null, headers);
......@@ -73,6 +75,7 @@ public class QingZhiLoginCacheService {
} else {
log.error("doKeepAlive response " + "empty...");
}
return newToken;
}
private String loginServer(String url, String username, String password) {
......
......@@ -32,8 +32,8 @@ public class TraffFlowService {
return traffFlowMapper.selectFiveAndTypeDayFlow(videoid);
}
public Integer statisVehiclesByDay(String starttime) {
return traffFlowMapper.statisVehiclesByDay(starttime);
public Integer statisVehiclesByDay() {
return traffFlowMapper.statisVehiclesByDay();
}
@Transactional(rollbackFor = Exception.class)
public void deleteBeforeTwoMonthVehicles(String starttime) {
......
......@@ -15,8 +15,8 @@ public class TraffalarmrecordService {
@Autowired
private TraffAlarmRecordMapper traffalarmrecordMapper;
public Integer statisTraffalarmrecordstatByDay(String starttime) {
return traffalarmrecordMapper.statisTraffalarmrecordstatByDay(starttime);
public Integer statisTraffalarmrecordstatByDay() {
return traffalarmrecordMapper.statisTraffalarmrecordstatByDay();
}
......
......@@ -50,6 +50,9 @@ public class TraffdevicewriteresultService {
@Value("${qingzhi.devicewrite.url}")
String devicewriteurl;
@Autowired
QingZhiLoginCacheService qingZhiLoginCacheService;
private static CompletionService<ResultObj> threadService = new ExecutorCompletionService<ResultObj>(ThreadPoolUtil.getPool());
public List<ResultObj> sendDevices(List<String> xhs) {
......@@ -60,6 +63,7 @@ public class TraffdevicewriteresultService {
for (SbtdspsrParams val : sbtdspsrs) {
threadService.submit(() -> {
log.info("sendtoguangda"+val.toString());
ResultObj obj = sendToGuangda(val, param);
obj.setData(val.getSbbh() + "_" + val.getTdbh());
return obj;
......@@ -96,6 +100,7 @@ public class TraffdevicewriteresultService {
WriteResultObj writeResultObj=null;
try {
//调用第三方的restful
log.error("调用第三方的restful" + item.toString());
writeResultObj = deviceWritePost(item);
}
catch (TimeoutException e) {
......@@ -238,11 +243,15 @@ public class TraffdevicewriteresultService {
private WriteResultObj deviceWritePost(DeviceWriteParam param) throws TimeoutException, InterruptedException,Exception{
HttpHeaders headers = new HttpHeaders();
String token = stringRedisTemplate.opsForValue().get(qztoken);
if (token == null) {
// String token = stringRedisTemplate.opsForValue().get(qztoken);
// if (token == null) {
//登录
log.info("login", qztoken);
String token= qingZhiLoginCacheService.keepAlive();
log.error("deviceWritePost {} : redis token empty..", qztoken);
return null;
}
// return null;
// }
log.info("qztoken send guangda without keepalive", qztoken);
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
headers.add("token", token);//
List<DeviceWriteParam> list = new ArrayList<>();
......
......@@ -94,8 +94,8 @@
ruletag,
count(*) total
from vehicle b
where CREATE_TIME>= to_date( #{starttime}|| ' 00:00:00','yyyy-mm-dd hh24:mi:ss')
and CREATE_TIME<![CDATA[ <= ]]>to_date(#{starttime}||' 23:59:59','yyyy-mm-dd hh24:mi:ss')
where CREATE_TIME>= TRUNC (SYSDATE-1)
and CREATE_TIME<![CDATA[ < ]]>TRUNC(SYSDATE)
group by to_char(CREATE_TIME, 'yyyy-mm-dd'),
type,
ruletag,
......
......@@ -122,8 +122,8 @@
sum( case when pushstatus =0 then 1 else 0 end)successpush,
sum( case when pushstatus =1 then 1 else 0 end)failpush
FROM TRAFFALARMRECORD A
where A.RECORDTIME>=to_date(#{starttime}||' 00:00:00','YYYY-MM-DD HH24:MI:SS')
and A.RECORDTIME<![CDATA[ <= ]]>to_date(#{starttime}||' 23:59:59','YYYY-MM-DD HH24:MI:SS')
where A.RECORDTIME>=TRUNC(SYSDATE-1)
and A.RECORDTIME<![CDATA[ < ]]>TRUNC(SYSDATE)
group by to_char(A.RECORDTIME,'yyyy-MM-DD'),recordtype,A.FDID,CHANNELID,areaid
</insert>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment