Commit 68f8dcfe authored by 高飞's avatar 高飞

init

parents
*.js linguist-language=Java
*.css linguist-language=Java
*.html linguist-language=Java
The MIT License (MIT)
Copyright (c) 2018 杨东川
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
\ No newline at end of file
# Excel_To_DB
一款将Excel表格中的数据导入至数据库中的小工具,SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库
详细介绍:
1. [【Excel_To_DB】SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库(一)](http://blog.csdn.net/yangdongchuan1995/article/details/79277834)
2. [【Excel_To_DB】SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库(二)](http://blog.csdn.net/YangDongChuan1995/article/details/79285341)
3. [【Excel_To_DB】SpringBoot+EasyPoi+Redis消息队列实现Excel批量异步导入数据库(三)](http://blog.csdn.net/yangdongchuan1995/article/details/79290027)
This diff is collapsed.
This diff is collapsed.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com</groupId>
<artifactId>hzjt</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>hzjt</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- easypoi -->
<dependency>
<groupId>cn.afterturn</groupId>
<artifactId>easypoi-base</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>cn.afterturn</groupId>
<artifactId>easypoi-web</artifactId>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>cn.afterturn</groupId>
<artifactId>easypoi-annotation</artifactId>
<version>3.0.3</version>
</dependency>
<!-- mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.1</version>
</dependency>
<!-- oracle -->
<!-- 数据库相关, 驱动、mybatis、分页插件 -->
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0</version>
</dependency>
<!-- 连接池druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.7</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.38</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.12</version>
</dependency>
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.13.0</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-base</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-spring-boot-starter</artifactId>
<version>2.1.5</version>
</dependency>
<dependency>
<groupId>tk.mybatis</groupId>
<artifactId>mapper-core</artifactId>
<version>1.1.5</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>4.1.21</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.9.6</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>1.4</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>jms</groupId>
<artifactId>aqapi_g</artifactId>
<version>1</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/aqapi_g.jar</systemPath>
</dependency>
<dependency>
<groupId>jms</groupId>
<artifactId>jmscommon</artifactId>
<version>1</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/jmscommon.jar</systemPath>
</dependency>
<dependency>
<groupId>jms</groupId>
<artifactId>jta</artifactId>
<version>1</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/jta.jar</systemPath>
</dependency>
<dependency>
<groupId>jms</groupId>
<artifactId>ojdbc6</artifactId>
<version>1</version>
<scope>system</scope>
<systemPath>${pom.basedir}/lib/ojdbc6.jar</systemPath>
</dependency>
<!-- <dependency>-->
<!-- <groupId>jms</groupId>-->
<!-- <artifactId>orai18n</artifactId>-->
<!-- <version>1</version>-->
<!-- <scope>system</scope>-->
<!-- <systemPath>${basedir}/lib/orai18n.jar</systemPath>-->
<!-- </dependency>-->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<includeSystemScope>true</includeSystemScope>
</configuration>
</plugin>
<!-- <plugin>-->
<!-- <artifactId>maven-assembly-plugin</artifactId>-->
<!-- <configuration>-->
<!-- <archive>-->
<!-- <manifest>-->
<!-- <mainClass>com.allen.capturewebdata.Main</mainClass>-->
<!-- </manifest>-->
<!-- </archive>-->
<!-- <descriptorRefs>-->
<!-- <descriptorRef>jar-with-dependencies</descriptorRef>-->
<!-- </descriptorRefs>-->
<!-- </configuration>-->
<!-- </plugin>-->
</plugins>
</build>
</project>
package com.hzjt;
import com.hzjt.redis.Receiver;
import com.hzjt.util.Constant;
import tk.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
@SpringBootApplication
@ServletComponentScan
@MapperScan("com.hzjt.mapper")
@PropertySource("file:${spring.profiles.path}")
public class HZJTApplication {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapterSingle, MessageListenerAdapter listenerAdapterList) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 注入多个消息监听器(receiveSingle/receiveList)
container.addMessageListener(listenerAdapterSingle, new PatternTopic(Constant.receiveSingle));
container.addMessageListener(listenerAdapterList, new PatternTopic(Constant.receiveList));
return container;
}
@Bean
MessageListenerAdapter listenerAdapterSingle(Receiver receiver) {
return new MessageListenerAdapter(receiver, Constant.singleMethodName);
}
@Bean
MessageListenerAdapter listenerAdapterList(Receiver receiver) {
return new MessageListenerAdapter(receiver, Constant.listMethodName);
}
@Bean
Receiver receiver() {
return new Receiver();
}
@Value("${ip.host}")
private String ip;
//
// @Bean
// public RestTemplate restTemplate(RestTemplateBuilder builder) {
// // Do any additional configuration here
// return builder.build();
// }
// @Bean
// public WebSocketClient webSocketClient() {
//
// List<WebSocketClient> list=new ArrayList<>();
//
// String[] ips = iptr.split(",");
// for (String ip : ips) {
// try {
// MyWebSocketClient client = new MyWebSocketClient(new URI("ws://" + ip + "8001/api/message/sub/traffic-incident/alarm"));
// client.connect();
// if (!client.getReadyState().equals(WebSocket.READYSTATE.OPEN)) {
// //连接失败,重新连接
// client.connect();
// list.add(client);
// }
//
// } catch (URISyntaxException e) {
// e.printStackTrace();
// }
//
// }
// return list;
// }
// @Bean
// public WebSocketClient webSocketClient() {
// try {
// MyWebSocketClient client = new MyWebSocketClient(new URI("ws://" + ip + ":8001/api/message/sub/traffic-incident/alarm"));
//// MyWebSocketClient client = new MyWebSocketClient(new URI("ws://local" + ip + "8001/api/message/sub/traffic-incident/alarm"));
////
// if (!client.isOpen()) {
// if (client.getReadyState().equals(WebSocket.READYSTATE.NOT_YET_CONNECTED)) {
// try {
// client.connect();
// } catch (IllegalStateException e) {
// }
// } else if (client.getReadyState().equals(WebSocket.READYSTATE.CLOSING) || client.getReadyState().equals(WebSocket.READYSTATE.CLOSED)) {
// client.reconnect();
// }
// }
// return client;
//
// } catch (URISyntaxException e) {
// e.printStackTrace();
// }
// return null;
// }
public static void main(String[] args) {
// ConfigurableApplicationContext applicationContext=SpringApplication.run(HZJTApplication.class,args);
// WebSocket.setApplicationContext(applicationContext);
SpringApplication.run(HZJTApplication.class,args);
}
}
package com.hzjt.config;
import cn.hutool.json.JSONUtil;
import com.hzjt.domain.*;
import com.hzjt.util.JsonUtil;
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import javax.jms.*;
import javax.jms.Message;
import javax.jms.Queue;
import java.util.*;
//
@Component
@Slf4j
public class JMS_Oracle implements CommandLineRunner {
@Autowired
JmsConfig config;
@Autowired
private RestTemplate restTemplate;
// @Autowired
// private RestTemplate restTemplate;
@Override
public void run(String... strings) {
try {
QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.getJdbcUrl(),
new Properties());
QueueConnection conn = queueConnectionFactory.createQueueConnection(config.getUsername(), config.getPassword());
AQjmsSession session = (AQjmsSession) conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Queue queue = (AQjmsDestination) session.getQueue(config.getUsername(), config.getQueueName());
MessageConsumer consumer = session.createConsumer(queue, null, QUEUE_MESSAGE_TYPE.getFactory(), null, false);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
QUEUE_MESSAGE_TYPE payload = (QUEUE_MESSAGE_TYPE) adtMessage.getAdtPayload();
//查询traffconfig 的相关信息推送给第三方数据
//20200604202047023009_1 rtsp://33.52.1.223:5544/1_20200604202047023009_0_0 rtmp://33.51.6.98:1935/live/room24 http://33.51.6.98/live/room?port=1935&app=live&stream=room24
//调用restful 接口直接将数据新增或者更新到python 表中
// {"name":"20200305112026668_1","rtsp_address":"rtsp://33.55.1.81:5544/1_20200305112026668_0_0","rtmp_address":"rtmp://33.51.6.98:1935/live/room954","http_flv_address":"http://33.51.6.98/live/room?port=1935&app=live&stream=room954"}
String msg = payload.getContent();
if (msg.indexOf(",") >= 0) {
String videoid = msg.split(",")[0];
String rtsp = msg.split(",")[1];
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
//查询监控是否存在
PythonQueryResult result = restTemplate.getForObject("http://33.51.6.96:" + 5000 + "/rest/video/query/" + videoid, PythonQueryResult.class);
String[][] list = result.getData();
Rtsprtmp rtmp = new Rtsprtmp();
String id = UUID.randomUUID().toString();
rtmp.setHttp_flv_address("http://33.51.6.98/live/room?port=1935&app=live&stream=room" + id);
rtmp.setName(videoid);
rtmp.setRtsp_address(rtsp);
rtmp.setRtmp_address("rtmp://33.51.6.98:1935/live/room" + id);
//重新拼接rtsp,rtmp
List<Rtsprtmp> rtmplist = new ArrayList<>();
rtmplist.add(rtmp);
HttpEntity<String> requestEntity = new HttpEntity<>(JSONUtil.toJsonStr(rtmplist), headers);
if (null != list && list.length > 0 && list[0].length > 0) {
for (String[] items : list) {
//删除已有的
PythonResult ltObj = restTemplate.getForObject("http://33.51.6.96:" + 5000 + "/rest/video/delete/" + items[0], PythonResult.class);
if (ltObj.getData() != 1 && ltObj.getError() != null) {
log.info(items[0] + "删除失败" + videoid + "rtsp=" + rtsp + "error");
}
}
PythonResult result1 = restTemplate.postForObject("http://33.51.6.96:" + 5000 + "/rest/video/add", requestEntity, PythonResult.class);
if (null != result1.getError()) {
log.info(videoid + "rtsp=" + rtsp + "error");
}
} else {
PythonResult result2 = restTemplate.postForObject("http://33.51.6.96:" + 5000 + "/rest/video/add", requestEntity, PythonResult.class);
if (null != result2.getError()) {
log.info(videoid + "rtsp=" + rtsp + "error");
}
}
}
} catch (Exception e) {
log.info("rtsp=error" + e.toString());
}
}
});
} catch (Exception e) {
log.info("rtsp=error" + e.toString());
}
}
}
package com.hzjt.config;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Component
@Data
public class JmsConfig {
@Value("${spring.datasource.username}")
private String username;
@Value("${spring.datasource.password}")
private String password;
@Value("${spring.datasource.url}")
private String jdbcUrl;
@Value("${jms.queueName}")
private String queueName;
}
\ No newline at end of file
package com.hzjt.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
@Configuration
public class RestTemplateConfig {
@Bean
public RestTemplate restTemplate(ClientHttpRequestFactory factory){
return new RestTemplate(factory);
}
@Bean
public ClientHttpRequestFactory simpleClientHttpRequestFactory(){
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setConnectTimeout(15000);
factory.setReadTimeout(5000);
return factory;
}
}
\ No newline at end of file
package com.hzjt.config;
import com.hzjt.handler.WebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.net.URI;
/**
* 定时任务
*
* @author tycoding
* @date 2019-06-17
*/
//@Slf4j
//@Component
//@Configuration
//@EnableScheduling
public class ScheduleTaskConfig {
// private static final Long MINUTE_30 = 1000 * 60 * 30L;
//
//// @Value("${ip.host}")
//// private String iptr;
//
// @Scheduled(cron = "59 * * * * ?")//每59s 触发一次
// private void reconnectWebsocket() {
//
//// WebSocket webSocket=new WebSocket();
// //根据连接的name ,群发根据videoid 查询的结果
//
//
//
//// String[] ips = iptr.split(",");
//// for (String ip : ips) {
//// try {
////// MyWebSocketClient client = new MyWebSocketClient(new URI("ws://" + ip + "8001/api/message/sub/traffic-incident/alarm"));
////// if (!client.isOpen()) {
////// if (client.getReadyState().equals(WebSocket.READYSTATE.NOT_YET_CONNECTED)) {
////// try {
////// client.connect();
////// } catch (IllegalStateException e) {
////// }
////// } else if (client.getReadyState().equals(WebSocket.READYSTATE.CLOSING) || client.getReadyState().equals(WebSocket.READYSTATE.CLOSED)) {
////// client.reconnect();
////// }
////// }
////
////
//// }catch (Exception ex){
////
////
//// }
// } private static final Long MINUTE_30 = 1000 * 60 * 30L;
//
//// @Value("${ip.host}")
//// private String iptr;
//
// @Scheduled(cron = "59 * * * * ?")//每59s 触发一次
// private void reconnectWebsocket() {
//
//// WebSocket webSocket=new WebSocket();
// //根据连接的name ,群发根据videoid 查询的结果
//
//
//
//// String[] ips = iptr.split(",");
//// for (String ip : ips) {
//// try {
////// MyWebSocketClient client = new MyWebSocketClient(new URI("ws://" + ip + "8001/api/message/sub/traffic-incident/alarm"));
////// if (!client.isOpen()) {
////// if (client.getReadyState().equals(WebSocket.READYSTATE.NOT_YET_CONNECTED)) {
////// try {
////// client.connect();
////// } catch (IllegalStateException e) {
////// }
////// } else if (client.getReadyState().equals(WebSocket.READYSTATE.CLOSING) || client.getReadyState().equals(WebSocket.READYSTATE.CLOSED)) {
////// client.reconnect();
////// }
////// }
////
////
//// }catch (Exception ex){
////
////
//// }
// }
}
package com.hzjt.config;
import org.apache.tomcat.util.threads.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class ThreadPoolConfig {
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 设置核心线程数
executor.setCorePoolSize(5);
// 设置最大线程数
executor.setMaxPoolSize(10);
// 设置队列容量
executor.setQueueCapacity(20);
// 设置线程活跃时间(秒)
executor.setKeepAliveSeconds(60);
// 设置默认线程名称
executor.setThreadNamePrefix("hello-");
// 设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任务结束后再关闭线程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
package com.hzjt.config;
import com.hzjt.handler.WebSocket;
import com.hzjt.service.TraffFlowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* ServerEndpointExporter 作用
*
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*
* @return
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
@Autowired
public void setTraffFlowService(TraffFlowService service)
{
WebSocket.traffFlowService=service;
}
}
\ No newline at end of file
package com.hzjt.controller;
import com.hzjt.domain.ResultObj;
import com.hzjt.domain.Traffalarmrecord;
import com.hzjt.service.EventWriteService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.ArrayList;
import java.util.List;
@RestController
public class EventWriteController {
@Autowired
private EventWriteService eventWriteService;
@PostMapping("/sendEvent")
public ResultObj sendEvent(Long recordid){
return eventWriteService.sendEvent(recordid);
}
@PostMapping("/sendEvents")
public List<ResultObj> sendEvents(@RequestParam("recordid") String recordid){
List<Long> longrecorid=new ArrayList<>();
for(String id:recordid.split(","))
{
longrecorid.add(Long.valueOf(id));
}
return eventWriteService.sendEvents(longrecorid);
}
@GetMapping("/sendEventsByids")
public List<ResultObj> sendEventsByids(){
List<Long> recordid=new ArrayList<>();
recordid.add(Long.valueOf(93228));
recordid.add(Long.valueOf(93834));
return eventWriteService.sendEvents(recordid);
}
}
package com.hzjt.controller;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import java.util.Set;
/**
* 路由接口控制器
*
* @author tycoding
* @date 2019-06-13
*/
@Slf4j
@Controller
public class RouterController {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 登陆页面
*
* @return
*/
@GetMapping("/")
public String index() {
return "login";
}
/**
* 首页入口
*
* @return
*/
@GetMapping("/{id}/chat")
public String index(@PathVariable("id") String id) {
return "index";
}
}
package com.hzjt.controller;
import com.google.common.util.concurrent.RateLimiter;
import com.hzjt.domain.*;
import com.hzjt.handler.WebSocket;
import com.hzjt.mapper.SbtdspsrMapper;
import com.hzjt.service.ImportService;
import com.hzjt.service.TraffFlowService;
import com.hzjt.util.DateUtils;
import com.hzjt.util.JsonUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@RestController
public class TraffController {
@Autowired
ImportService importService;
@Autowired
WebSocket webSocket;
private Map<String, RateLimiter> rateLimiterMap;
private String port;
private String dept;
private Integer rate = 10;
private SbtdspsrMapper sbtdspsrMapper;
@Autowired
TraffFlowService traffFlowService;
// @Autowired
// private SimpMessagingTemplate template;
private static final String TYPE = "TRAFFIC_INCIDENT_ALARM";
private static final Logger log = LoggerFactory.getLogger(TraffController.class);
@Autowired
public TraffController(
@Value("${port}") String port,
@Value("${message.dept}") String dept,
@Value("${message.rate}") Integer rate, SbtdspsrMapper sbtdspsrMapper) {
this.port = port;
this.rate = rate;
this.dept = dept.substring(0, 5);
this.sbtdspsrMapper = sbtdspsrMapper;
rateLimiterMap = new ConcurrentHashMap<>(16);
}
@PostMapping("/alarmevent")
public ResultObj rece(@RequestBody Alarm trffClientMessage) {
log.debug("/event/receive接收到数据:" + trffClientMessage.toString());
if (!TYPE.equals(trffClientMessage.getType())) {
log.info("type类型不正确" + trffClientMessage.toString());
return ResultObj.error(ResponseEnum.E_1002.getCode(), "type类型不正确");
}
log.debug("/event/receive data:" + trffClientMessage.toString());
if (trffClientMessage.getImg_urls().isEmpty()) {
log.info("img_urls值不能为空" + trffClientMessage.toString());
return ResultObj.error(ResponseEnum.E_1004.getCode(), "img_urls值不能为空");
}
if (StringUtils.isBlank(trffClientMessage.getIncident_type())) {
log.info("incident_type值不能为空" + trffClientMessage.toString());
return ResultObj.error(ResponseEnum.E_1004.getCode(), "incident_type值不能为空");
}
String videoId = trffClientMessage.getVideo_id();
if (StringUtils.isBlank(videoId) || !videoId.contains("_") || videoId.split("_").length != 2) {
log.info("video_id值异常 值为:" + videoId);
return ResultObj.error(ResponseEnum.E_1002.getCode(), "video_id值异常 值为:" + videoId);
}
/* 限流 */
if (!getRateLimiter(videoId).tryAcquire()) {
log.info("[事件推送]->设备" + videoId + "-推送已达到限流限制");
return ResultObj.error(ResponseEnum.E_9999.getCode(), "设备" + videoId + "请求过于频繁");
}
String[] sbAndTd = videoId.split("_");
String sbbh = sbAndTd[0];
int tdbh = Integer.valueOf(sbAndTd[1]) + 1;
List<Sbtdspsr> sbtdspsrs = sbtdspsrMapper.selectBySbbh(sbbh, tdbh);
if (sbtdspsrs.isEmpty()) {
log.info("设备为:" + sbbh + ",通道为:" + tdbh + "未录入(备案)");
return ResultObj.error(ResponseEnum.E_1002.getCode(), "设备为:" + sbbh + ",通道为:" + tdbh + "未录入(备案)");
}
String xzbh = sbtdspsrs.get(0).getXzbh();
if (xzbh.length() != 12) {
log.info("设备为:" + sbbh + ",通道为:" + tdbh + "配置的行政区划" + xzbh + "不合规");
return ResultObj.error(ResponseEnum.E_1002.getCode(), "设备为:" + sbbh + ",通道为:" + tdbh + "配置的行政区划不合规");
}
//ts 時間轉成正常時間
trffClientMessage.setDept(xzbh);
Map map = new HashMap();
map.put("type", "alarm");
map.put("data", trffClientMessage);
WebSocket.GroupSending(JsonUtil.beanToString(map));
try { // 清空redis中的部分旧数据
importService.cleanCache();
// 将参数result中的部分数据存入redis中,并把格式校验成功的数据发布至对应频道中
importService.cacheAndPublish(JsonUtil.beanToString(trffClientMessage));
} catch (Exception e) {
log.error("MessageController receive putData error:" + e.toString());
//return ResultObj.error(ResponseEnum.E_9999.getCode(), e.toString());
}
return ResultObj.ok(trffClientMessage);
}
private RateLimiter getRateLimiter(String videoId) {
RateLimiter rateLimiter;
if (rateLimiterMap.containsKey(videoId)) {
rateLimiter = rateLimiterMap.get(videoId);
} else {
RateLimiter value = RateLimiter.create(rate);
rateLimiter = rateLimiterMap.putIfAbsent(videoId, value);
if (rateLimiter == null) {
rateLimiter = value;
}
}
return rateLimiter;
}
//车流量推送
@PostMapping("/traffflow")
public ResultObj traffflow(@RequestBody Vehicles vehicles) {
if ("TRAFFIC_STATISTICS_VEHICLES".equalsIgnoreCase(vehicles.getType())) {
String time=DateUtils.formatDate(new Date(Long.valueOf(vehicles.getTs())));
vehicles.setTs(time);
String[] sbAndTd = vehicles.getVideo_id().split("_");
String sbbh = sbAndTd[0];
int tdbh = Integer.valueOf(sbAndTd[1]) + 1;
//重置videoid
vehicles.setVideo_id(sbbh+"_"+tdbh);
log.debug("/event/receive接收到数据:" + vehicles.toString());
//直接放入表中
traffFlowService.saveTraffFlow(vehicles);
List<Vehiclesdetail> vels=vehicles.getObjs() ;
for(Vehiclesdetail detail : vels){
log.info(detail.toString());
detail.setObj_id(UUID.randomUUID().toString());
detail.setVehiclesid(vehicles.getId());
traffFlowService.saveTraffFlowDetail(detail);
}
//查询近五分钟的车流量,当天车流量websocket 直接推送过去
log.info("schedule >>>>>>>>>> WebSocket");
//根据连接的name ,群发根据videoid 查询的结果
webSocket.GroupSendingByVideoid(vehicles.getVideo_id());
}
return ResultObj.ok();
}
//获取自动规则
@PostMapping("/autoRule")
public ResultObj autoRule(@RequestBody AutoRule rules) {
// log.info(rules.toString());
Map map = new HashMap();
map.put("type", "rule");
map.put("data", rules);
webSocket.AppointSending(rules.getVideo_id(),JsonUtil.beanToString(map));
return ResultObj.ok();
}
//获取自动规则
@GetMapping("/test")
public ResultObj autoRule() {
AutoRule rule=new AutoRule();
Map<String, BigDecimal> v=new HashMap<>();
v.put("x",BigDecimal.valueOf(111.2));
v.put("y",BigDecimal.valueOf(64.09232));
Map<String, BigDecimal> v1=new HashMap();
v1.put("x",BigDecimal.valueOf(111.2));
v1.put("y",BigDecimal.valueOf(64.09232));
Map<String, BigDecimal> v2=new HashMap();
v2.put("x",(BigDecimal.valueOf(111.2)));
v2.put("y",BigDecimal.valueOf(64.000923232));
List<Map<String, BigDecimal> > list=new ArrayList<>();
list.add(v);
list.add(v1);
list.add(v2);
List<Map<String, BigDecimal>> list2=new ArrayList<>();
list2.add(v);
list2.add(v1);
list2.add(v2);
List list3=new ArrayList();
list3.add(list);
list3.add(list3);
rule.setRule_area(list3);
//webSocket.AppointSending(rules.getVideo_id(),JsonUtil.beanToString(map));
return ResultObj.ok();
}
}
package com.hzjt.controller;
import com.hzjt.domain.ResultObj;
import com.hzjt.service.TraffdevicewriteresultService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletResponse;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@RestController
@RequestMapping("/traffdevicewriteresult")
public class TraffdevicewriteresultController {
@Autowired
private TraffdevicewriteresultService traffdevicewriteresultService;
/***
* 豸Ϣ
* @param xhs
* @return
*/
@PostMapping("/sendDevices")
public List<ResultObj> sendDevices(@RequestParam("xhs") String xhs){
return traffdevicewriteresultService.sendDevices( Arrays.asList(xhs.split(",")));
}
@GetMapping("/sendDevices")
public List<ResultObj> sendEventsByids(){
List<String> recordid=new ArrayList<>();
recordid.add("5988892e-1dcf-4925-85a7-918e2b101040");
recordid.add("5c606b11-aab4-441e-90c3-70bc26f10733");
return traffdevicewriteresultService.sendDevices(recordid);
}
@RequestMapping("/registall")
public ResultObj registall(HttpServletResponse response) {
return traffdevicewriteresultService.registall(response);
}
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Data
public class Alarm {
private String type;
private String id;
private String video_id;
private String ts;
private String incident_type;
private List<String> img_urls;
private String video_record_url;
private Map<String, BigDecimal> obj_location;
private List<String> img_base64;
private String dept;
private String objLabel;
}
package com.hzjt.domain;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Data
public class AutoRule {
private String video_id;
private String ts;
private List<BigDecimal> rule_area_confidence;
private List<Map<String, BigDecimal>> det_area;
private List<Map<String, List<Map<String, BigDecimal>>>> rule_area;
private BigDecimal det_area_confidence;
}
package com.hzjt.domain;
import java.util.HashMap;
import java.util.Map;
/**
* @author chenl
* @version 1.0
* @date 2020/3/15 10:47
*/
public class CodeConst {
//设备注册
public static final Integer device_regist = 1;
//设备更新
public static final Integer device_update = 2;
//设备下架
public static final Integer device_remove = 3;
//状态报告
public static final Integer device_report = 4;
private static Map<Integer, String> eventType = new HashMap<>();
private static Map<String, Integer> eventTypeQZ = new HashMap<>();
static {
eventType.put(1, "异常停车");
eventType.put(2, "行人/非机动车闯入");
eventType.put(3, "倒行/逆行");
eventType.put(4, "拥堵");
eventType.put(5, "交通事件(事故发生)");
eventType.put(6, "恶劣天气-雨");
eventType.put(7, "恶劣天气-雪");
eventType.put(8, "烟火报警");
eventType.put(9, "烟雾报警");
eventType.put(10, "能见度过低");
eventType.put(11, "异常缓驶");
eventType.put(12, "路面障碍物");
eventType.put(13, "道路施工");
eventType.put(14, "拥堵结束");
eventType.put(15, "缓行结束");
eventType.put(16, "特殊事件");
eventType.put(17, "交通事件(事故行人下车)");
eventType.put(18, "其他事件");
}
static {
eventTypeQZ.put("congestion", 4);// 'congestion', '交通拥堵检测'
eventTypeQZ.put("not_congestion", 14);// 'not_congestion', '拥堵结束'
eventTypeQZ.put("illegal_parking", 1);// 'illegal_parking', '车辆异常停驶检测'
eventTypeQZ.put("wrong_direction", 3);// 'wrong_direction', '倒车/逆行检测'
eventTypeQZ.put("no_motor_ban", 2);// 'no_motor_ban', '行人/非机动车闯入检测'
eventTypeQZ.put("slow_deive", 11);// 'slow_deive', '车辆异常缓行检测'
eventTypeQZ.put("not_slow_deive", 15);// 'not_slow_deive', '缓行结束'
eventTypeQZ.put("single_slow_drive", 11);// 'single_slow_drive', '高速公路车辆单车异常缓行检测'
eventTypeQZ.put("accident", 5);// 'accident', '高速公路交通事故检测'
eventTypeQZ.put("roadworks", 13);// 'roadworks', '高速公路道路施工检测'
eventTypeQZ.put("roadworks_out_of_line", 19);// 'roadworks_out_of_line', '高速公路施工违规检测'
eventTypeQZ.put("severe_weather", 6);// 'severe_weather', '恶劣天气(雨雪)检测'
eventTypeQZ.put("low_visibility", 10);// 'low_visibility', '交通雾天检测'
eventTypeQZ.put("abandoned_object", 12);// 'abandoned_object', '路面障碍物检测'
eventTypeQZ.put("tunnel_fire", 8);// 'tunnel_fire', '高速公路隧道火焰检测'
eventTypeQZ.put("tunnel_smoke", 9);// 'tunnel_smoke', '高速公路隧道烟体检测'
eventTypeQZ.put("outside_fire", 8);// 'outside_fire', '交通室外火焰检测'
eventTypeQZ.put("outside_smoke", 9);// 'outside_smoke', '交通室外烟体检测'
}
public static String getEventType(int type) {
if (eventType.containsKey(type)) {
return eventType.get(type);
}
return null;
}
public static Integer getEventTypeQZ(String type) {
if (eventTypeQZ.containsKey(type)) {
Integer code = eventTypeQZ.get(type);
return code == null ? 18 : code;
}
return 18;
}
}
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class DeviceChannelid {
private String deviceid;
private Integer channelid;
}
package com.hzjt.domain;
import lombok.Data;
@Data
public class DeviceWriteParam {
private String deviceID;
private String DeviceType;
private String deviceName;
private String regionID;
private Integer deviceStatus;
private String deviceSupplier;
private Integer dataType;
private Integer RoadType;
private String locationType;
private String locationX;
private String locationY;
private String locationEndX;
private String locationEndY;
private Integer DeviceDirection;
private Integer StartNumK;
private Integer StartNumM;
private String RoadNum;
private String LocationDesc;
private Integer toll;
private String locationID;
private Integer serviceArea;
private Integer rampToll;
private Integer rampServiceArea;
private String rampHubRoad;
private Integer rampHubDirection;
}
package com.hzjt.domain;
import lombok.Data;
/**
* @author chenl
* @version 1.0
* @date 2020/3/15 22:18
*/
@Data
public class EventWriteParam {
private String eventID;
private String eventSource;
private String eventType;
private String occurTime;
private Long regionID;
private String eventDeviceID;
private String eventSupplier;
private String locationType;
private String locationX;
private String locationY;
private String locationEndX;
private String locationEndY;
private String eventProof1;
private String eventProof2;
private String eventProof3;
private String eventProof4;
private String eventProof5;
private String eventvideo1;
}
package com.hzjt.domain;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Ftp {
private String ftpIp;
private Integer ftpPort;
private String ftpUsername;
private String ftpPassword;
@Override
public String toString() {
return "Ftp{" +
"ftpIp='" + ftpIp + '\'' +
", ftpPort=" + ftpPort +
", ftpUsername='" + ftpUsername + '\'' +
", ftpPassword='" + ftpPassword + '\'' +
'}';
}
}
package com.hzjt.domain;
import java.time.LocalDateTime;
import java.util.UUID;
public class Message extends MyMessage {
private String id = UUID.randomUUID().toString();
private LocalDateTime date = LocalDateTime.now();
private String sender;
private Message() {
}
private Message(String content, MessageType type, String sender) {
super(content, type);
this.sender = sender;
date = LocalDateTime.now();
}
public LocalDateTime getDate() {
return date;
}
public String getSender() {
return sender;
}
public String getId() {
return id;
}
public static class MessageBuilder {
private Message message;
private MessageBuilder(Message message) {
this.message = message;
}
public MessageBuilder withContent(String content) {
message.setContent(content);
return this;
}
public MessageBuilder withType(MessageType type) {
message.setType(type);
return this;
}
public MessageBuilder sentBy(String sender) {
message.sender = sender;
return this;
}
public static MessageBuilder empty() {
return new MessageBuilder(new Message());
}
public static MessageBuilder copyOf(Message message) {
Message newMessage = new Message();
newMessage.setContent(message.getContent());
newMessage.setType(message.getType());
newMessage.date = message.getDate();
newMessage.sender = message.getSender();
newMessage.id = message.getId();
return new MessageBuilder(newMessage);
}
public Message build() {
return message;
}
}
}
package com.hzjt.domain;
import lombok.Getter;
@Getter
public enum MessageEnum {
/* websocketmapkey */
MESSAGE_KEY("message");
private String value;
MessageEnum(String value){
this.value =value;
}
}
package com.hzjt.domain;
public enum MessageType {
INFO, WARNING, ERROR;
}
package com.hzjt.domain;
public class MyMessage {
private String content;
private MessageType type;
public MyMessage() {
}
public MyMessage(String content, MessageType type) {
this.content = content;
this.type = type;
}
protected void setContent(String content) {
this.content = content;
}
protected void setType(MessageType type) {
this.type = type;
}
public String getContent() {
return content;
}
public MessageType getType() {
return type;
}
}
This diff is collapsed.
package com.hzjt.domain;
import lombok.Data;
@Data
public class PythonQueryResult {
private String[][] data;
private String error;
}
package com.hzjt.domain;
import lombok.Data;
@Data
public class PythonResult {
private Integer data;
private String error;
}
package com.hzjt.domain;
import oracle.jdbc.driver.OracleConnection;
import oracle.jdbc.internal.OracleTypes;
import oracle.jpub.runtime.MutableStruct;
import oracle.sql.CustomDatum;
import oracle.sql.CustomDatumFactory;
import oracle.sql.Datum;
import oracle.sql.STRUCT;
import java.sql.SQLException;
/**
* @package_Name: com.scale
* @author: wangcy@xxx.qq.com
* @description: TODO
* @date: 2019-08-15 09:14
* @version: v1.0
*/
public class QUEUE_MESSAGE_TYPE implements CustomDatum, CustomDatumFactory {
public static final String _SQL_NAME = "QUEUE_MESSAGE_TYPE";
public static final int _SQL_TYPECODE = OracleTypes.STRUCT;
MutableStruct _struct;
// 12表示字符串
static int[] _sqlType = { 12 };
static CustomDatumFactory[] _factory = new CustomDatumFactory[1];
static final QUEUE_MESSAGE_TYPE _MessageFactory = new QUEUE_MESSAGE_TYPE();
public static CustomDatumFactory getFactory() {
return _MessageFactory;
}
public QUEUE_MESSAGE_TYPE() {
_struct = new MutableStruct(new Object[1], _sqlType, _factory);
}
public Datum toDatum(OracleConnection c) throws SQLException {
return _struct.toDatum(c, _SQL_NAME);
}
public CustomDatum create(Datum d, int sqlType) throws SQLException {
if (d == null)
return null;
QUEUE_MESSAGE_TYPE o = new QUEUE_MESSAGE_TYPE();
o._struct = new MutableStruct((STRUCT) d, _sqlType, _factory);
return o;
}
public String getContent() throws SQLException {
return (String) _struct.getAttribute(0);
}
}
package com.hzjt.domain;
public class ReceiveMessage {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
package com.hzjt.domain;
import lombok.Getter;
/**
* 通用返回值 枚举类
* @author cp
*/
@Getter
public enum ResponseEnum {
/* 错误信息 */
E_1000(1000, "返回值必须为PageResult"),
E_1001(1001, "必须传递分页参数"),
E_1002(1002, "参数值异常"),
E_1003(1003, "参数值转换异常"),
E_1004(1004, "参数值为空"),
/* 保存 更新 重置 删除 等 */
E_1005(1005,"更新失败"),
E_1006(1006,"无结果"),
E_1007(1007,"未登录"),
E_1008(1008,"请求超时"),
E_1009(1009,"请求下游服务异常"),
E_1010(1010,"数据保存失败"),
E_1011(1011,"数据重复"),
E_9999(9999,"系统异常"),
SUCCESS(200,"请求成功");
private int code;
private String msg;
ResponseEnum(int code, String msg) {
this.code = code;
this.msg = msg;
}
}
\ No newline at end of file
package com.hzjt.domain;
public class ResponseMessage {
private String id;
private String name;
private String content;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
public ResponseMessage(String id, String name, String content) {
super();
this.id = id;
this.name = name;
this.content = content;
}
}
package com.hzjt.domain;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
/**
* 自定义响应结构
* @author cp
*/
@Getter
@Setter
public class ResultObj {
// 定义jackson对象
private static final ObjectMapper MAPPER = new ObjectMapper();
/**
* 响应业务状态
* 200 成功
* 201 错误
* 400 参数错误
*/
private Integer status;
/**
* 响应消息
*/
private String msg;
/**
* 响应中的数据
*/
private Object data;
public static ResultObj error(Integer status, String msg, Object data) {
return new ResultObj(status, msg, data);
}
public static ResultObj ok(Object data) {
return new ResultObj(data);
}
public static ResultObj ok() {
return ok(null);
}
private ResultObj() {
}
public static ResultObj error(Integer status, String msg) {
return new ResultObj(status, msg, null);
}
private ResultObj(Integer status, String msg, Object data) {
this.status = status;
this.msg = msg;
this.data = data;
}
private ResultObj(Object data) {
this.status = 200;
this.msg = "OK";
this.data = data;
}
/**
* 将json结果集转化为SysResult对象
*
* @param jsonData json数据
* @param clazz SysResult中的object类型
* @return SysResult对象
*/
public static ResultObj formatToPojo(String jsonData, Class<?> clazz) {
try {
if (clazz == null) {
return MAPPER.readValue(jsonData, ResultObj.class);
}
JsonNode jsonNode = MAPPER.readTree(jsonData);
JsonNode data = jsonNode.get("data");
Object obj = null;
if (data.isObject()) {
obj = MAPPER.readValue(data.traverse(), clazz);
} else if (data.isTextual()) {
obj = MAPPER.readValue(data.asText(), clazz);
}
return error(jsonNode.get("status").intValue(), jsonNode.get("msg").asText(), obj);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 没有object对象的转化
*
* @param json 字符串
* @return SysResult对象
*/
public static ResultObj format(String json) {
try {
return MAPPER.readValue(json, ResultObj.class);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
/**
* Object是集合转化
*
* @param jsonData json数据
* @param clazz 集合中的类型
* @return SysResult对象
*/
public static ResultObj formatToList(String jsonData, Class<?> clazz) {
try {
JsonNode jsonNode = MAPPER.readTree(jsonData);
JsonNode data = jsonNode.get("data");
Object obj = null;
if (data.isArray() && data.size() > 0) {
obj = MAPPER.readValue(data.traverse(),
MAPPER.getTypeFactory().constructCollectionType(List.class, clazz));
}
return error(jsonNode.get("status").intValue(), jsonNode.get("msg").asText(), obj);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
@Override
public String toString() {
return "ResultObj{" +
"status=" + status +
", msg='" + msg + '\'' +
", data=" + data +
'}';
}
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import java.io.Serializable;
@Data
public class Rtsprtmp implements Serializable {
private String id;
private String name;
private String rtsp_address;
private String rtmp_address;
private String http_flv_address;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import java.io.Serializable;
@Data
public class Sbtdspsr implements Serializable {
private String xh;
private String sbbh;
private Long tdbh;
private Long tdlx;
private String wbbh;
private String tdmc;
private String tdmcpy;
private Long tdzt;
private String tdazwz;
private String jd;
private String wd;
private Long tpwzx;
private Long tpwzy;
private Long spbmgs;
private Long spxsgs;
private Long ssspsfzzf;
private Long spzt;
private Long lxzt;
private Long lxsfzzf;
private Long spzl;
private Long gjzjg;
private Long ml;
private Long spmlyxgs;
private Long sfzcptzkz;
private String zxcczbh;
private Long zxccxe;
private Long zxccyyrl;
private Long zxccsyrl;
private Long lxblts;
private Long sfqy;
private String jlbh;
private String fxms;
private String jbms;
private String xzbh;
private String ggbh;
private String znbh;
private Long lxfs;
private String jpkzdh;
private Long jdpx;
private Long sfznfxjd;
private String znfxlx;
private Long sqms;
private String squrllj;
private Long ssmlxz;
private Long lxmlxz;
private String zburllj;
private Long zbdk;
private Long dqx;
private Long xqx;
private Long kqx;
private Long pqx;
private Long lxhfqx;
private Long lxxzqx;
private Long lxdjqx;
private Long sfzlx;
private Long sfzx;
private String hb;
private String jmcj;
private String sj;
private Long lxlrcyfs;
private String khdosdxsnr;
private String khdosdxswz;
private String cjry;
private String cjrq;
private String xgry;
private String xgrq;
private String bz;
private Long kz1;
private Long kz2;
private String kz3;
private String kz4;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import java.io.Serializable;
@Data
public class SbtdspsrParam extends Sbtdspsr implements Serializable {
private String sbcsName;
private String deviceconfig;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
@Getter
@Setter
public class SbtdspsrParams extends Sbtdspsr {
/**
* 创建时间
*/
private Date createtime;
/**
* 推送状态(0成功1失败)
*/
private Long pushstatus;
/**
* 推送描述
*/
private String pushdesc;
/**
* 推送次数
*/
private Long pushcount;
/**
* 行政编号
*/
private String xzbh;
/**
* 备注
*/
private String remark;
/**
* 行政名称
*/
private String xztreename;
private String deviceconfig;
}
package com.hzjt.domain;
import lombok.Data;
import java.io.Serializable;
@Data
public class Sbxx implements Serializable {
private String sbbh;
private String wbbh;
private String sbmc;
private String sbmcpy;
private String sbip;
private Long sbdk;
private String sbdlzh;
private String sbdlmm;
private Long sbzl;
private String sbbbh;
private String sbxh;
private String cjbh;
private String cjmc;
private Long sbzt;
private String xhsj;
private String rjbbh;
private String khbh;
private String sbazwz;
private String jd;
private String wd;
private Long tpwzx;
private Long tpwzy;
private Long spsrtds;
private Long spsrdhmtds;
private Long spsctds;
private Long ypsrtds;
private Long ypsctds;
private Long bjsrtds;
private Long bjsctds;
private Long tmtds;
private Long mlczxy;
private Long zdscdk;
private String qybh;
private Long zdzls;
private String jlbh;
private Long jrms;
private String ms;
private String dlfwzbh;
private String qymc;
private String qydz;
private String xzbh;
private String jrfsbh;
private String jrfsmc;
private String lxr;
private String lxdh;
private String vpnwgbh;
private String cjry;
private String cjrq;
private String xgry;
private String xgrq;
private String bz;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.Id;
import java.io.Serializable;
import java.util.Date;
@Getter
@Setter
public class Serverstatreport implements Serializable {
@Id
private String serverid;
private String servername;
private String serverip;
private Integer serverport;
private String serverdesc;
private String network;
private String realip;
private Integer realport;
private Date firstreporttime;
private Date lastlogintime;
private Date lastreporttime;
private Integer timeoutsecond;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import javax.persistence.Id;
import java.io.Serializable;
import java.util.Date;
@Data
public class Storageserver implements Serializable {
private static final long serialVersionUID = 1L;
@Id
private Integer serverid;
private String servername;
private String servergroup;
private String servertype ;
private Integer serverstatus;
private String serveurl ;
private String serveip;
private String serverport;
private String serveruser;
private String serverpassword;
private String creator;
private Date createtime;
private String remark;
}
package com.hzjt.domain;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author chenl
* @version 1.0
* @date 2020/4/9 10:00
*/
public class TraffAlarmRecordDelay implements Delayed {
/* 触发时间*/
private long time;
Traffalarmrecord traffalarmrecord;
public TraffAlarmRecordDelay(Traffalarmrecord traffalarmrecord, long time, TimeUnit unit) {
this.traffalarmrecord = traffalarmrecord;
this.time = System.currentTimeMillis() + (time > 0? unit.toMillis(time): 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
TraffAlarmRecordDelay item = (TraffAlarmRecordDelay) o;
long diff = this.time - item.time;
if (diff <= 0) {
return -1;
}else {
return 1;
}
}
public Traffalarmrecord getTraffalarmrecord(){
return this.traffalarmrecord;
}
@Override
public String toString() {
return "TraffAlarmRecordDelay{" +
"time=" + time +
", traffalarmrecord='" + traffalarmrecord.toString() + '\'' +
'}';
}
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import javax.persistence.Id;
import java.util.Date;
@Data
public class Traffalarmrecord implements java.io.Serializable {
private static final long serialVersionUID = 1L;
@Id
private Long recordid ;// 记录编号 主键序列
private Integer algotype ;//--算法类型 默认是 0:公司 1:第三方公司
private Long areaid ;//--辖区编号
private String fdid ;//设备编号
private Integer channelid ;//--通道编号
private Integer channeletype;//通道类型 可忽略
private String channelname ;//通道名称
private Date recordtime ;//事件记录时间
private String recordtype ;//事件记录类型
private String location ;//目标定位信息
private String img1urlfrom ;//图片源地址
private String img1path ;//图片存储地址
private String img2urlfrom ;//图片源地址
private String img2path ;//图片存储地址
private String img3urlfrom ;//图片源地址
private String img3path ;//图片存储地址
private String img4urlfrom ;//图片源地址
private String img4path ;//图片存储地址
private String img5urlfrom;//图片源地址
private String img5path ;//图片存储地址
private String videourlfrom ;//视频源地址
private String videopath ;//视频存储地址
private Date retrytime ;//video重试的最近一次的读取;时间
private Integer retrycount ;//ideo重试读取总次数
private Integer recordlevel;//事件级别 --默认为0 其他类型暂时未知
private Integer checkstatus ;//0:待审核 1:审核通过 2:审核不通过 9:免审
private String creator;//创建人
private Date createtime ;//创建时间
private String updator ;//更新人
private Date updatetime ;//更新时间
private String remark ;
private Integer pushstatus;//推送状态
private String pushdesc;//推送描述
private String pushcount;//推送次数
private Date pushdate;//推送时间
private String processstatus ;//处理状态;
private String objlable;
@Override
public String toString() {
return "Traffalarmrecord{" +
"recordid=" + recordid +
", algotype=" + algotype +
", areaid=" + areaid +
", fdid='" + fdid + '\'' +
", channelid=" + channelid +
", channeletype=" + channeletype +
", channelname='" + channelname + '\'' +
", recordtime=" + recordtime +
", recordtype='" + recordtype + '\'' +
", location='" + location + '\'' +
", img1urlfrom='" + img1urlfrom + '\'' +
", img1path='" + img1path + '\'' +
", img2urlfrom='" + img2urlfrom + '\'' +
", img2path='" + img2path + '\'' +
", img3urlfrom='" + img3urlfrom + '\'' +
", img3path='" + img3path + '\'' +
", img4urlfrom='" + img4urlfrom + '\'' +
", img4path='" + img4path + '\'' +
", img5urlfrom='" + img5urlfrom + '\'' +
", img5path='" + img5path + '\'' +
", videourlfrom='" + videourlfrom + '\'' +
", videopath='" + videopath + '\'' +
", retrytime=" + retrytime +
", retrycount=" + retrycount +
", recordlevel=" + recordlevel +
", checkstatus=" + checkstatus +
", creator='" + creator + '\'' +
", createtime=" + createtime +
", updator='" + updator + '\'' +
", updatetime=" + updatetime +
", remark='" + remark + '\'' +
", pushstatus=" + pushstatus +
", pushdesc='" + pushdesc + '\'' +
", pushcount='" + pushcount + '\'' +
", pushdate=" + pushdate +
'}';
}
}
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TraffalarmrecordParams extends Traffalarmrecord {
private String starttime;
private String endtime;
private String fdids;
private String channelids;
private Integer page;
private Integer rows;
}
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TraffalarmrecordResult extends Traffalarmrecord {
private String xzmc;
private String tdmc;
private String fileagent;
private String recordname;
}
package com.hzjt.domain;
import lombok.Data;
import javax.persistence.Id;
import java.io.Serializable;
@Data
public class Traffalarmrecordstat implements Serializable {
@Id
private Long countdate;
@Id
private String areaid;
@Id
private String eventtype;
@Id
private String fdid;
@Id
private Integer channelid;
private Integer counthour;
private Short channeltype;
private String channelname;
private String areaname;
private Long totalcount;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.Id;
import java.io.Serializable;
import java.util.Date;
@Getter
@Setter
public class Traffdeviceconfig implements Serializable {
@Id
private String fdid;
@Id
private Short channelid;
private Short devicelocationtype;
private Short devicechecktype;
private Short stakekilometer;
private Short stakemeter;
private Short locationdirection;
private String creator;
private Date createtime;
private Date updatetime;
private String remark1;
private Integer remark2;
private String remark3;
private Integer remark4;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
import javax.persistence.Id;
import java.io.Serializable;
import java.util.Date;
@Getter
@Setter
public class Traffdevicewriteresult implements Serializable {
/**
* 设备编号
*/
@Id
private String sbbh;
/**
* 通道编号
*/
@Id
private Long tdbh;
/**
* 创建时间
*/
private Date createtime;
/**
* 推送状态(0成功1失败)
*/
private Long pushstatus;
/**
* 推送描述
*/
private String pushdesc;
/**
* 推送次数
*/
private Long pushcount;
/**
* 行政编号
*/
private String xzbh;
/**
* 备注
*/
private String remark;
private static final long serialVersionUID = 1L;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import javax.persistence.Id;
@Data
public class Traffuserdevice implements java.io.Serializable {
private static final long serialVersionUID = 1L;
@Id
private String userid;
@Id
private String fdid;
@Id
private Integer channelid;
private Integer channeletype;
private String channelname;
private String creator;
private String createtime;
private String updator;
private String updatetime;
private String remark;
}
package com.hzjt.domain;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@NoArgsConstructor
@AllArgsConstructor
@Data
public class TransferResult {
Long recordid;
String pathvalue;
String urlfrom;
String imgpath;
Boolean result;
}
\ No newline at end of file
package com.hzjt.domain;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Data
public class TrffClientMessage {
private String type;
private String id;
private String video_id;
private String ts;
private String incident_type;
private List<String> img_urls;
private String video_record_url;
private Map<String, BigDecimal> obj_location;
private List<String> img_base64;
private String dept;
}
package com.hzjt.domain;
import lombok.Getter;
/**
* 用户通用常量类, 单个业务的常量请单开一个类, 方便常量的分类管理
* @author cp
*/
@Getter
public enum UserConstantsEnum {
/**
* cookie中存放用户信息的key值
*/
USER_TOKEN("FACECLOUD_USER"),
FACE_USER_TOKEN("token");
private String value;
UserConstantsEnum(String value){
this.value =value;
}
}
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
import java.util.List;
@Getter
@Setter
public class Vehicles {
private String id;
private String type;
private String video_id;
private String ts;
List<Vehiclesdetail> objs;
}
package com.hzjt.domain;
import lombok.Data;
@Data
public class Vehiclesdetail {
private String obj_id;
private String vehiclesid;
private String type;
private Long direction;
private Long classification_confidence;
private String ruleTag;
}
package com.hzjt.domain;
import lombok.Data;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
@Data
public class Vertexes {
List<Map<String, BigDecimal>> vertexes;
}
package com.hzjt.domain;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class WriteResultObj {
private String data;
private String message;
private Integer status;
public WriteResultObj(){}
public WriteResultObj(Integer status, String message) {
this.message = message;
this.status = status;
}
@Override
public String toString() {
return "WriteResultObj{" +
"data=" + data +
", message='" + message + '\'' +
", status=" + status +
'}';
}
}
package com.hzjt.handler;
import lombok.Getter;
/**
* 定义线程池bean中的key名称 即ThreadPoolManager的threadPoolMap中的key
* @author cp
*/
@Getter
public enum ConsumerNameEnum {
/* ThreadPoolManager key消费者名称 */
GS_TRAFF_EVENT("gs-traff-event"),
GS_TRAFF_PUSH_QZPT_EVENT("gs_traff_push_qzpt_event");
private String name;
ConsumerNameEnum(String name) {
this.name = name;
}
}
package com.hzjt.handler;
import com.hzjt.domain.TraffAlarmRecordDelay;
import org.springframework.stereotype.Component;
import java.util.concurrent.DelayQueue;
/**
* @author chenl
* @version 1.0
* @date 2020/4/9 10:31
*/
@Component
public class DelayQueueManager {
private static final DelayQueue<TraffAlarmRecordDelay> delayQueue = new DelayQueue<>();
public boolean pushData(TraffAlarmRecordDelay traffAlarmRecordDelay){
return delayQueue.offer(traffAlarmRecordDelay);
}
public DelayQueue<TraffAlarmRecordDelay> getQueue(){
return delayQueue;
}
}
package com.hzjt.handler;
import com.hzjt.domain.Ftp;
import com.hzjt.domain.Traffalarmrecord;
import com.hzjt.domain.TransferResult;
import com.hzjt.util.DateUtils;
import com.hzjt.util.FTPUtil;
import com.hzjt.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@Slf4j
public class FileTransferManager {
private static CompletionService<TransferResult> completionService = new ExecutorCompletionService<TransferResult>(ThreadPoolUtil.getPool());
/**
* @param transferRecord
* @param ftp
* @param basePath
* @return
* @throws Exception
*/
public static List<TransferResult> transferFile(final Map<String,Object> transferRecord , final Ftp ftp, final String basePath) throws Exception {
Integer ret = 0 ;
final Long recordid = Long.parseLong(transferRecord.get("recordid").toString());
int count = 0 ;
for(String key : transferRecord.keySet()){
if(key.equals("img1path") || key.equals("img2path")
|| key.equals("img3path") || key.equals("img4path")
|| key.equals("img5path")){
final String url = transferRecord.get(key).toString() ;
count++;
completionService.submit(()->{
TransferResult result = new TransferResult(recordid,key,url,"",false);
try {
String filesuff = "videopath".equals(key) ? ".mp4": ".jpg";
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
//延迟连接
connection.setReadTimeout(2000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
/// System.out.println("connection.getResponseCode:" + connection.getResponseCode() );
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
InputStream inputStream = connection.getInputStream();
log.info("url:",url , " --- "+"key:",key);
String pname = DateUtils.formatCurrDayNoSign() + "_"+recordid+"_0000_"+key.replace("path","");
String fileName = pname +filesuff;
String ftputl = FTPUtil.getFtpUrl(ftp) + basePath + "/" + fileName;
boolean r = FTPUtil.uploadFile(ftp,basePath, fileName,inputStream);
result.setResult(r);
if(r){
result.setImgpath(ftputl);
}
log.info("callable thread:" +result.toString());
}else{
log.error("connection code: "+connection.getResponseCode() +" ," +result.toString());
}
} catch (IOException e) {
System.out.println(e.toString());
log.error(e.toString());
}
return result;
});
}
}
List<TransferResult> listret = new ArrayList<>();
for (int i = 0; i < count ; i++) {
listret.add(completionService.poll(10, TimeUnit.SECONDS).get());
}
return listret;
}
public static void fetchUrlsFromRecord(Traffalarmrecord record , Map<String,Object> transferRecord){
if(record ==null || record.getRecordid()==null){
return;
}
transferRecord.put("recordid" , record.getRecordid());
if(record.getImg1path() == null && record.getImg1urlfrom() != null){
transferRecord.put("img1path" , record.getImg1urlfrom());
}
if(record.getImg2path() == null && record.getImg2urlfrom() != null){
transferRecord.put("img2path" , record.getImg2urlfrom());
}
if(record.getImg3path() == null && record.getImg3urlfrom() != null){
transferRecord.put("img3path" , record.getImg3urlfrom());
}
if(record.getImg4path() == null && record.getImg4urlfrom() != null){
transferRecord.put("img4path" , record.getImg4urlfrom());
}
if(record.getImg5path() == null && record.getImg5urlfrom() != null){
transferRecord.put("img5path" , record.getImg5urlfrom());
}
if(record.getVideopath() ==null && record.getVideourlfrom() != null){
transferRecord.put("videopath" , record.getVideourlfrom());
}
}
public static Traffalarmrecord traffAlarmRecordUrlUpdate(List<TransferResult> results) throws Exception{
Traffalarmrecord record = new Traffalarmrecord();
for (TransferResult result : results){
record.setRecordid(result.getRecordid());
if(result.getResult()){
switch (result.getPathvalue()){
case "img1path" :
record.setImg1path(result.getImgpath());
break;
case "img2path" :
record.setImg2path(result.getImgpath());
break;
case "img3path" :
record.setImg3path(result.getImgpath());
break;
case "img4path" :
record.setImg4path(result.getImgpath());
break;
case "img5path" :
record.setImg5path(result.getImgpath());
break;
case "videopath" :
record.setVideopath(result.getImgpath());
break;
default :
log.info("unknow pathvalue"+ result.getPathvalue());
break;
}
}
}
return record;
}
// public static void main(String[] args) {
// urls.add("http://img63.ddimg.cn/2019/12/18/2019121819114913026.jpg");
// urls.add("http://img62.ddimg.cn/2019/12/18/201912181655214974.jpg");
// urls.add("http://img61.ddimg.cn/2019/12/18/2019121816294684833.jpg");
// urls.add("http://img63.ddimg.cn/2019/12/18/2019121819121717247.jpg");
// urls.add("http://img62.ddimg.cn/2019/12/18/201912181655214974.jpg");
// }
}
package com.hzjt.handler;
import cn.hutool.json.JSONUtil;
import com.hzjt.service.TraffFlowService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
@ServerEndpoint("/websocket/{name}")
public class WebSocket {
public static TraffFlowService traffFlowService;
/**
* 与某个客户端的连接对话,需要通过它来给客户端发送消息
*/
private Session session;
/**
* 标识当前连接客户端的用户名
*/
private String name;
// private static ApplicationContext applicationContext;
//
// public static void setApplicationContext(ApplicationContext context){
// applicationContext=context;
// }
/**
* 用于存所有的连接服务的客户端,这个对象存储是安全的
*/
private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
@OnOpen
public void OnOpen(Session session, @PathParam(value = "name") String name) {
this.session = session;
this.name = name;
// name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过name来区分
webSocketSet.put(name, this);
log.info("[WebSocket] 连接成功,当前连接人数为:={}", webSocketSet.size());
}
@OnClose
public void OnClose() {
webSocketSet.remove(this.name);
log.info("[WebSocket] 退出成功,当前连接人数为:={}", webSocketSet.size());
}
@OnError
public void OnError(@PathParam("name") String name, Throwable throwable, Session session) {
webSocketSet.remove(name);
log.info("[WebSocket] 退出成功,当前连接人数为:={}", webSocketSet.size());
}
@OnMessage
public void OnMessage(String message) {
log.info("[WebSocket] 收到消息:{}", message);
//判断是否需要指定发送,具体规则自定义
if (message.indexOf("videoid") >= 0) {
//连接查询近五分钟并开始每分钟推送数据
List<Map> map = new ArrayList<>();
List<Map> dDayFlowmap = traffFlowService.selectFiveAndDayFlow(name);
if (null != dDayFlowmap)
map.addAll(dDayFlowmap);
List<Map> TypeDayFlow = traffFlowService.selectFiveAndTypeDayFlow(name);
if (null != TypeDayFlow)
map.addAll(TypeDayFlow);
if (!map.isEmpty()) {
AppointSending(name, JSONUtil.toJsonStr(map));
}
}
}
/**
* 群发
*
* @param message
*/
public static void GroupSending(String message) {
for (String name : webSocketSet.keySet()) {
try {
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote())
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 根据videoid 群发
*/
public void GroupSendingByVideoid(String name) {
if (null == traffFlowService) return;
try {
List<Map> map = new ArrayList<>();
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote()) {
log.info("name" + name);
List<Map> dDayFlowmap = traffFlowService.selectFiveAndDayFlow(name);
if (null != dDayFlowmap)
map.addAll(dDayFlowmap);
List<Map> TypeDayFlow = traffFlowService.selectFiveAndTypeDayFlow(name);
if (null != dDayFlowmap)
map.addAll(TypeDayFlow);
if (!map.isEmpty()) {
synchronized (webSocketSet.get(name).session) {
webSocketSet.get(name).session.getBasicRemote().sendText(JSONUtil.toJsonStr(map));
}
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
/**
* 指定发送
*
* @param name
* @param message
*/
public void AppointSending(String name, String message) {
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote()) {
synchronized (webSocketSet.get(name).session) {
try {
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
package com.hzjt.listener;
import com.hzjt.service.CacheLoadService;
import com.hzjt.service.FtpService;
import com.hzjt.service.QingZhiLoginCacheService;
import com.hzjt.util.ThreadPoolUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.boot.context.event.SpringApplicationEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class MyApplicationStartingEventListener implements ApplicationListener<SpringApplicationEvent>, ApplicationContextAware {
private ApplicationContext applicationContext;
private boolean flag = false;
private FtpService ftpService;
@Autowired
public MyApplicationStartingEventListener() {
}
@Override
public void onApplicationEvent(SpringApplicationEvent event) {
if (event instanceof ApplicationReadyEvent) {
try {
if (!flag) {
ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
log.info("loadFtpCache schedule----------");
CacheLoadService cacheLoadService = applicationContext.getBean(CacheLoadService.class);
cacheLoadService.loadFtpCache();
}, 200, 60000, TimeUnit.MILLISECONDS);
//判断第三方登录是否有效
ThreadPoolUtil.getSchedulePool().scheduleWithFixedDelay(() -> {
log.info("QingZhiLoginCacheService schedule----------");
QingZhiLoginCacheService qingZhiLoginCacheService = applicationContext.getBean(QingZhiLoginCacheService.class);
qingZhiLoginCacheService.keepAlive();
}, 3, 60, TimeUnit.SECONDS);
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.toString());
System.exit(0);
} finally {
flag = true;
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
\ No newline at end of file
package com.hzjt.mapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
@Mapper
public interface CommonMapper {
long getSeq(@Param("seqname") String seqname);
}
package com.hzjt.mapper;
import com.hzjt.domain.Sbtdspsr;
import com.hzjt.domain.SbtdspsrParam;
import com.hzjt.domain.Traffdeviceconfig;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
import java.util.List;
public interface SbtdspsrMapper extends BaseMapper<Sbtdspsr>, ConditionMapper<Sbtdspsr>, InsertListMapper<Sbtdspsr> {
String getMinUpdateTime();
String getMaxUpdateTime();
/**
* ��ȡ����ʱ���ڵ��豸��Ϣ
* @param startTime ��ʼʱ��
* @param endTime ����ʱ��
* @return �豸��Ϣlist
*/
List<Sbtdspsr> getSbtdspsrList(@Param("startTime") String startTime, @Param("endTime") String endTime);
List<Sbtdspsr> selectAllSbbhAndTdbh(@Param("xzbh") String xzbh);
List<Sbtdspsr> selectBySbbh(@Param("sbbh") String sbbh,@Param("tdbh") Integer tdbh);
List<SbtdspsrParam> selectDeviceWrite();
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.Sbxx;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
import java.util.List;
public interface SbxxMapper extends BaseMapper<Sbxx>, ConditionMapper<Sbxx>, InsertListMapper<Sbxx> {
List<String> selectAllSbbh(@Param("ms") String ms);
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.Storageserver;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface StorageServerMapper extends tk.mybatis.mapper.common.BaseMapper<Storageserver>, tk.mybatis.mapper.common.ConditionMapper<Storageserver>, tk.mybatis.mapper.common.special.InsertListMapper<Storageserver> {
List<Storageserver> queryStorageServerAll(Storageserver storageServer);
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.DeviceChannelid;
import com.hzjt.domain.Traffalarmrecord;
import com.hzjt.domain.TraffalarmrecordResult;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
import java.util.List;
public interface TraffAlarmRecordMapper extends BaseMapper<Traffalarmrecord>, ConditionMapper<Traffalarmrecord>, InsertListMapper<Traffalarmrecord> {
int updateTraffAlarmRecordUrl(Traffalarmrecord record);
int updatePushEvent(Traffalarmrecord traffalarmrecord);
int inserTraffAlarmRecord(Traffalarmrecord traffalarmrecord);
int selectmax();
List<TraffalarmrecordResult> queryTraffalarmrecordByPage(@Param("recordtype")String recordtype, @Param("areaid")Long areaid, @Param("checkstatus")Integer checkstatus,
@Param("starttime")String starttime, @Param("endtime")String endtime,
@Param("deviceChannelids")List<DeviceChannelid> deviceChannelid,
@Param("userAccount") String userAccount, @Param("pushstatus")Integer pushstatus,
@Param("page")Integer page, @Param("rows")Integer rows);
int countQueryTraffalarmrecordByPage(@Param("recordtype")String recordtype,@Param("areaid")Long areaid, @Param("checkstatus")Integer checkstatus,
@Param("starttime")String starttime, @Param("endtime")String endtime,
@Param("deviceChannelids")List<DeviceChannelid> deviceChannelid,
@Param("userAccount") String userAccount,@Param("pushstatus")Integer pushstatus,
@Param("page")Integer page,@Param("rows")Integer rows);
List<TraffalarmrecordResult> queryTraffalarmrecordAllByPage(@Param("recordtype")String recordtype,
@Param("areaid")Long areaid,
@Param("checkstatus")Integer checkstatus);
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.Vehicles;
import com.hzjt.domain.Vehiclesdetail;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
@Mapper
public interface TraffFlowMapper {
int add(Vehicles excelModel);
int insertlist(Vehiclesdetail vehiclesdetail);
List<Map> selectFiveAndDayFlow(String videoid);
List<Map> selectFiveAndTypeDayFlow(String videoid);
}
package com.hzjt.mapper;
import com.hzjt.domain.Traffalarmrecordstat;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
public interface TraffalarmrecordstatMapper extends BaseMapper<Traffalarmrecordstat>, ConditionMapper<Traffalarmrecordstat>, InsertListMapper<Traffalarmrecordstat>
{
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.Traffdeviceconfig;
import com.hzjt.domain.Traffdevicewriteresult;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
public interface TraffdeviceconfigMapper extends BaseMapper<Traffdeviceconfig>, ConditionMapper<Traffdeviceconfig>, InsertListMapper<Traffdeviceconfig> {
}
\ No newline at end of file
package com.hzjt.mapper;
import com.hzjt.domain.SbtdspsrParams;
import com.hzjt.domain.Traffdevicewriteresult;
import org.apache.ibatis.annotations.Param;
import tk.mybatis.mapper.additional.dialect.oracle.InsertListMapper;
import tk.mybatis.mapper.common.BaseMapper;
import tk.mybatis.mapper.common.ConditionMapper;
import java.util.List;
public interface TraffdevicewriteresultMapper extends BaseMapper<Traffdevicewriteresult>, ConditionMapper<Traffdevicewriteresult>, InsertListMapper<Traffdevicewriteresult>{
List<SbtdspsrParams> querySbtdspsrByXh(@Param("xhs")List<String> xhs);
}
\ No newline at end of file
package com.hzjt.redis;
import com.hzjt.domain.*;
import com.hzjt.handler.FileTransferManager;
import com.hzjt.mapper.TraffAlarmRecordMapper;
import com.hzjt.service.EventWriteService;
import com.hzjt.service.FtpService;
import com.hzjt.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@Service
public class Receiver {
@Autowired
RedisDao redisDao;
@Autowired
private TraffAlarmRecordMapper traffAlarmRecordMapper;
@Autowired
FtpService ftpService;
@Autowired
EventWriteService eventWriteService;
ScheduledExecutorService schedulepool = Executors.newScheduledThreadPool(5); //创
@Value("ftpIp")
String ftpIp;
@Value("ftpPort")
private String ftpPort;
@Value("ftpUsername")
private String ftpUsername;
@Value("ftpPassword")
private String ftpPassword;
@Value("${alarmrecord.check.enable}")
private String checkEnable;
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
/**
* @Description: 用于接收单个对象,将对象同步至数据库,如果同步失败,则存入redis中
* @Param: [message] “fastjson”转换后的json字符串
* @Retrun: void
*/
public void receiveSingle(String Message) throws InterruptedException {
// 将json字符串转换成实体对
Alarm trffClientMessage= JsonUtil.stringToBean(Message,Alarm.class);
List<String> imgBase64List = trffClientMessage.getImg_base64();
Traffalarmrecord traffAlarmRecord = new Traffalarmrecord();
String imgEnumHead = "IMG";
Ftp ftp = ftpService.reloadFtp();
for (int i = 0; i < imgBase64List.size(); i++) {
/* 图片上传 */
String path = trffClientMessage.getVideo_id() + "/" + DateUtils.formatCurrDayNoSign();
String fileName = UUIDUtils.createuuid() + ".jpg";
if (FTPUtil.uploadFile(ftp, path, fileName, imgBase64List.get(i))) {
TraffAlarmRecordImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, FTPUtil.getFtpUrl(ftp) + path + "/" + fileName);
}
}
List<String> imgUrls = trffClientMessage.getImg_urls();
for (int i = 0; i < imgUrls.size(); i++) {
TraffAlarmRecordFromImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, imgUrls.get(i));
}
String[] videoIdArr = trffClientMessage.getVideo_id().split("_");
String fdid = videoIdArr[0];
traffAlarmRecord.setFdid(fdid);
Integer channelid = Integer.valueOf(videoIdArr[1]) + 1;
traffAlarmRecord.setChannelid(channelid);
if (StringEnum.ONE.getValue().equals(checkEnable)) {
/* 9:免审 */
traffAlarmRecord.setCheckstatus(9);
}
int recordid = traffAlarmRecordMapper.selectmax();
traffAlarmRecord.setRecordid((long)(recordid+1));
traffAlarmRecord.setProcessstatus("0");
traffAlarmRecord.setAreaid(Long.valueOf(trffClientMessage.getDept()));
traffAlarmRecord.setRecordtype(trffClientMessage.getIncident_type().toLowerCase());
traffAlarmRecord.setVideourlfrom(trffClientMessage.getVideo_record_url());
traffAlarmRecord.setRecordtime(new Date(Long.valueOf(trffClientMessage.getTs())));
traffAlarmRecord.setCreatetime(new Date());
traffAlarmRecord.setObjlable(trffClientMessage.getObjLabel());
traffAlarmRecord.setPushstatus(9);//设置为未推送
traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord);
try {
String basepath = "gstraff/" + traffAlarmRecord.getFdid() + (traffAlarmRecord.getChannelid() < 10 ? "0" + traffAlarmRecord.getChannelid() : traffAlarmRecord.getChannelid()) + "/" + DateUtils.formatCurrDayNoSign();
Map<String, Object> transferRecordMap = new HashMap<>();
FileTransferManager.fetchUrlsFromRecord(traffAlarmRecord, transferRecordMap);
List<TransferResult> results = FileTransferManager.transferFile(transferRecordMap, ftp, basepath);
log.info("缓存数据上传结果:" + results.toString());
Traffalarmrecord recordBak = FileTransferManager.traffAlarmRecordUrlUpdate(results);
traffAlarmRecord.setImg2path(recordBak.getImg2path());
traffAlarmRecord.setImg3path(recordBak.getImg3path());
traffAlarmRecord.setImg4path(recordBak.getImg4path());
traffAlarmRecord.setImg5path(recordBak.getImg5path());
traffAlarmRecord.setVideopath(recordBak.getVideopath());
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
}
//如果监控视频为空,延时加载重新获取
if(null==recordBak.getVideopath() || "".equalsIgnoreCase(recordBak.getVideopath())) {
schedulepool.schedule(()->{
for (String key : transferRecordMap.keySet()) {
if (key.equals("videopath")) {
final String url = transferRecordMap.get(key).toString();
try {
HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
//延迟连接
connection.setReadTimeout(2000);
connection.setConnectTimeout(3000);
connection.setRequestMethod("GET");
String pname = DateUtils.formatCurrDayNoSign() + "_" + recordid + "_0000_" + key.replace("path", "");
String fileName = pname + ".mp4";
String ftputl = FTPUtil.getFtpUrl(ftp) + basepath + "/" + fileName;
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
InputStream inputStream = connection.getInputStream();
log.info("url:", url, " --- " + "key:", key);
boolean r = FTPUtil.uploadFile(ftp, basepath, fileName, inputStream);
if (r) traffAlarmRecord.setVideopath(ftputl);
//只更新监控
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
//推送给第三方
ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
log.info("调用接口推送给广达返回信息:" + obj.toString());
}
}
} catch (IOException e) {
System.out.println(e.toString());
log.error(e.toString());
return 0;
}
}
}
return 1;
},30, TimeUnit.SECONDS); //等待30秒钟执行
}else {
//推送给第三方
ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
log.info("调用接口推送给广达返回信息:" + obj.toString());
}
} catch (Exception e) {
log.error("解析事件告警数据异常 :" + e.getMessage());
}
//
redisDao.incrOrDecr(Constant.succSizeTempKey, -1);
}
/**
* @Description: 用于接收对象集合,将集合遍历拆分成单个对象并进行发布
* @Param: [message] “fastjson”转换后的json字符串
* @Retrun: void
*/
public void receiveList(String message) throws InterruptedException {
}
}
package com.hzjt.redis;
import com.hzjt.domain.Alarm;
import com.hzjt.domain.Ftp;
import com.hzjt.domain.Traffalarmrecord;
import com.hzjt.handler.DelayQueueManager;
import com.hzjt.handler.FileTransferManager;
import com.hzjt.mapper.TraffAlarmRecordMapper;
import com.hzjt.service.AlarmRecordStatisticsService;
import com.hzjt.service.FtpService;
import com.hzjt.service.ImportService;
import com.hzjt.service.SeqService;
import com.hzjt.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @Description: 消息接收者,将其在ExcelToDbApplication.java中注入消息监听容器(MessageListenerAdapter)中
* @Author: 杨东川【http://blog.csdn.net/yangdongchuan1995】
* @Date: Created in 2018-2-6
*/
@Service
public class Receiverbak {
@Autowired
ImportService importService;
@Autowired
RedisDao redisDao;
@Autowired
private TraffAlarmRecordMapper traffAlarmRecordMapper;
@Autowired
FtpService ftpService;
@Autowired
AlarmRecordStatisticsService alarmRecordStatisticsService;
@Value("ftpIp")
String ftpIp;
@Value("ftpPort")
private String ftpPort;
@Value("ftpUsername")
private String ftpUsername;
@Value("ftpPassword")
private String ftpPassword;
@Value("${alarmrecord.check.enable}")
private String checkEnable;
@Autowired
private SeqService seqService;
private Long delayTime = 25L;
// @Autowired
// FtpUtil ftpUtil;
@Autowired
private DelayQueueManager delayQueueManager;
private static final Logger log = LoggerFactory.getLogger(Receiverbak.class);
/**
* @Description: 用于接收单个对象,将对象同步至数据库,如果同步失败,则存入redis中
* @Param: [message] “fastjson”转换后的json字符串
* @Retrun: void
*/
public void receiveSinglewww(String Message) throws InterruptedException {
// 将json字符串转换成实体对
Alarm trffClientMessage= JsonUtil.stringToBean(Message,Alarm.class);
List<String> imgBase64List = trffClientMessage.getImg_base64();
Traffalarmrecord traffAlarmRecord = new Traffalarmrecord();
String imgEnumHead = "IMG";
//Ftp ftp = new Ftp(ftpIp,Integer.parseInt(ftpPort),ftpUsername,ftpPassword);
Ftp ftp = ftpService.reloadFtp();
for (int i = 0; i < imgBase64List.size(); i++) {
/* 图片上传 */
String path = trffClientMessage.getVideo_id() + "/" + DateUtils.formatCurrDayNoSign();
String fileName = UUIDUtils.createuuid() + ".jpg";
if (FTPUtil.uploadFile(ftp, path, fileName, imgBase64List.get(i))) {
TraffAlarmRecordImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, FTPUtil.getFtpUrl(ftp) + path + "/" + fileName);
}
}
List<String> imgUrls = trffClientMessage.getImg_urls();
for (int i = 0; i < imgUrls.size(); i++) {
TraffAlarmRecordFromImgEnum.valueOf(imgEnumHead + i).setImg(traffAlarmRecord, imgUrls.get(i));
}
String[] videoIdArr = trffClientMessage.getVideo_id().split("_");
String fdid = videoIdArr[0];
traffAlarmRecord.setFdid(fdid);
Integer channelid = Integer.valueOf(videoIdArr[1]) + 1;
traffAlarmRecord.setChannelid(channelid);
if (StringEnum.ONE.getValue().equals(checkEnable)) {
/* 9:免审 */
traffAlarmRecord.setCheckstatus(9);
}
int recordid = traffAlarmRecordMapper.selectmax();
traffAlarmRecord.setRecordid((long)recordid);
String dept = trffClientMessage.getDept();
traffAlarmRecord.setAreaid(Long.valueOf(dept));
String recordtype = trffClientMessage.getIncident_type().toLowerCase();
traffAlarmRecord.setRecordtype(recordtype);
traffAlarmRecord.setVideourlfrom(trffClientMessage.getVideo_record_url());
traffAlarmRecord.setRecordtime(new Date(Long.valueOf(trffClientMessage.getTs())));
traffAlarmRecord.setCreatetime(new Date());
traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord);
try {
System.out.println("fetch ftp:" + ftp.toString()+"recordId:"+traffAlarmRecord.getRecordid());
String basepath = "gstraff/" + traffAlarmRecord.getFdid() + (traffAlarmRecord.getChannelid() < 10 ? "0" + traffAlarmRecord.getChannelid() : traffAlarmRecord.getChannelid()) + "/" + DateUtils.formatCurrDayNoSign();
Map<String, Object> transferRecordMap = new HashMap<>();
FileTransferManager.fetchUrlsFromRecord(traffAlarmRecord, transferRecordMap);
FileTransferManager.transferFile(transferRecordMap, ftp, basepath);
// Traffalarmrecord recordBak = FileTransferManager.traffAlarmRecordUrlUpdate(results);
// traffAlarmRecord.setImg1path(recordBak.getImg1path());
// traffAlarmRecord.setImg2path(recordBak.getImg2path());
// traffAlarmRecord.setImg3path(recordBak.getImg3path());
// traffAlarmRecord.setImg4path(recordBak.getImg4path());
// traffAlarmRecord.setImg5path(recordBak.getImg5path());
// traffAlarmRecord.setVideopath(recordBak.getVideopath());
if (traffAlarmRecord != null && traffAlarmRecord.getRecordid() != null) {
traffAlarmRecordMapper.updateTraffAlarmRecordUrl(traffAlarmRecord);
}
} catch (Exception e) {
log.error("解析事件告警数据异常 :" + e.getMessage());
}
//
redisDao.incrOrDecr(Constant.succSizeTempKey, -1);
}
}
package com.hzjt.redis;
import com.hzjt.util.JsonUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ListOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
@Repository
public class RedisDao {
@Autowired
private StringRedisTemplate template;
/**
* @Description: 操作redis中数据结构为String的数据,进行set操作
* @Param: [key, value]
* @Retrun: void
*/
public <T> void setStringKey(String key, T value) {
ValueOperations<String, String> ops = template.opsForValue();
// 将参数value转换为String类型
String str = JsonUtil.beanToString(value);
ops.set(key, str);
}
/**
* @Description: 操作redis中数据结构为String的数据,进行get操作,获取单个对象的json字符串
* @Param: [key, clazz]
* @Retrun: T
*/
public <T> T getStringValue(String key, Class<T> clazz) {
ValueOperations<String, String> ops = this.template.opsForValue();
String str = ops.get(key);
// 将json串转换成对应(clazz)的对象
return JsonUtil.stringToBean(str, clazz);
}
/**
* @Description: 操作redis中数据结构为String的数据,进行get操作,获取对象集合的json字符串
* @Param: [key, clazz]
* @Retrun: java.util.List<T>
*/
public <T> List<T> getStringListValue(String key, Class<T> clazz) {
ValueOperations<String, String> ops = this.template.opsForValue();
String str = ops.get(key);
// 将json串转换成对应(clazz)的对象集合
return JsonUtil.stringToList(str, clazz);
}
/**
* @Description: 操作redis中数据结构为List的数据,进行get操作,获取对应list中“所有”的数据
* @Param: [key, clazz]
* @Retrun: java.util.List<T>
*/
public <T> List<T> getListValue(String key, Class<T> clazz) {
ListOperations<String, String> ops = template.opsForList();
// 获取对应list中的所有的数据
List<String> list = ops.range(key, 0, -1);
// 创建大小为对应list大小(ops.size(key)的ArrayList,避免后期进行扩容操作
List<T> result = new ArrayList<T>(ops.size(key).intValue());
// 遍历从redis中获取到的list,依次将其转换为对应(clazz)的对象并添加至结果集(result)中
for (String s : list) {
result.add(JsonUtil.stringToBean(s, clazz));
}
return result;
}
/**
* @Description: 操作redis中数据结构为List的数据,进行push操作(这里默认从左left进行插入)
* @Param: [key, value]
* @Retrun: void
*/
public <T> void leftPushKey(String key, T value) {
ListOperations<String, String> ops = template.opsForList();
// 将参数value转换为String类型
String str = JsonUtil.beanToString(value);
// 将转换后的json字符串存入redis
ops.leftPush(key, str);
}
/**
* @Description: 操作redis中数据结构为List的数据,进行pop操作(这里默认从右right进行取出)
* @Param: [key, clazz]
* @Retrun: T
*/
public <T> T rightPopValue(String key, Class<T> clazz) {
ListOperations<String, String> ops = template.opsForList();
String str = ops.rightPop(key);
return JsonUtil.stringToBean(str, clazz);
}
/**
* @Description: 操作redis中数据结构为List的数据,进行size操作,获取对应的list的长度大小
* @Param: [key]
* @Retrun: java.lang.Long
*/
public Long getListSize(String key) {
ListOperations<String, String> ops = template.opsForList();
return ops.size(key);
}
/**
* @Description: 消息发布
* @Param: [channelName, value] 频道名称
* @Retrun: void
*/
public <T> void publish(String channelName, T value) {
// 将参数value转换为String类型
String str = JsonUtil.beanToString(value);
// 将消息(str)发布到指定的频道(channelName)
template.convertAndSend(channelName, str);
}
/**
* @Description: 操作redis中数据结构为String的数据,进行increment操作
* @Param: [key, num]
* @Retrun: java.lang.Long
*/
public Long incrOrDecr(String key, long num) {
ValueOperations<String, String> ops = template.opsForValue();
return ops.increment(key, num);
}
/**
* @Description: 清空参数keyList中的所有值(key)所对应的redis里的数据
* @Param: [keyList]
* @Retrun: void
*/
public void cleanCache(List<String> keyList) {
template.delete(keyList);
}
}
package com.hzjt.service;
import com.hzjt.domain.Traffalarmrecordstat;
import com.hzjt.mapper.TraffalarmrecordstatMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@Slf4j
@Service
public class AlarmRecordStatisticsService {
private Map<String, Long> statisticsMap = new HashMap<>(16);
private TraffalarmrecordstatMapper traffalarmrecordstatMapper;
@Autowired
public AlarmRecordStatisticsService(TraffalarmrecordstatMapper traffalarmrecordstatMapper) {
this.traffalarmrecordstatMapper = traffalarmrecordstatMapper;
}
public synchronized void statistics(String key) {
Long count = statisticsMap.putIfAbsent(key, 1L);
if (count != null) {
statisticsMap.put(key, ++count);
}
}
@Transactional(rollbackFor = Exception.class)
public void statisticsToDb() {
if (statisticsMap.isEmpty()) {
return;
}
Map<String, Long> statisticsDbMap;
synchronized (this) {
statisticsDbMap = statisticsMap;
statisticsMap = new HashMap<>(16);
}
Iterator<Map.Entry<String, Long>> iterator = statisticsDbMap.entrySet().iterator();
if (iterator.hasNext()) {
do {
Map.Entry<String, Long> next = iterator.next();
String key = next.getKey();
Long count = next.getValue();
String[] dataArr = key.split("&");
Traffalarmrecordstat traffalarmrecordstat = new Traffalarmrecordstat();
traffalarmrecordstat.setAreaid(dataArr[0]);
traffalarmrecordstat.setFdid(dataArr[1]);
traffalarmrecordstat.setChannelid(Integer.valueOf(dataArr[2]));
traffalarmrecordstat.setEventtype(dataArr[3]);
String countdate = dataArr[4];
traffalarmrecordstat.setCountdate(Long.valueOf(countdate));
Traffalarmrecordstat dbTraffalarmrecordstat = traffalarmrecordstatMapper.selectByPrimaryKey(traffalarmrecordstat);
if (dbTraffalarmrecordstat == null) {
traffalarmrecordstat.setTotalcount(count);
traffalarmrecordstat.setCounthour(Integer.valueOf(countdate.substring(4, 6)));
traffalarmrecordstatMapper.insertSelective(traffalarmrecordstat);
continue;
}
dbTraffalarmrecordstat.setTotalcount(dbTraffalarmrecordstat.getTotalcount() + count);
traffalarmrecordstatMapper.updateByPrimaryKeySelective(dbTraffalarmrecordstat);
} while (iterator.hasNext());
}
log.info("traffalarmrecordstat");
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
package com.hzjt.service;
import com.hzjt.domain.Alarm;
public interface ImportService {
/**
* @Description: 清空redis中的部分旧数据
* @Param: []
* @Retrun: void
*/
void cleanCache();
/**
* @Description: 将参数result中的部分数据存入redis中,并把格式校验成功的数据发布至对应频道中
* @Param: [result]
* @Retrun: void
*/
void cacheAndPublish(String result);
/**
* @Description: 根据key值,返回redis中对应的结果
* @Param: [key]
* @Retrun: long
*/
long getTempSize(String key);
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment