Commit 6bac4217 authored by wangjinjing's avatar wangjinjing

init

parent 7861b8df
/target/ .*
!.mvn/wrapper/maven-wrapper.jar !.gitignore
*.class
### STS ### # Package Files #
.apt_generated *.jar
.classpath *.war
.factorypath *.ear
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ### #package files
.idea target
*.iws *.xml.bak
*.iml *.iml
*.ipr .idea
#demo file
AlexDemo.java
### NetBeans ### release/
/nbproject/private/
/build/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
\ No newline at end of file
This diff is collapsed.
<?xml version="1.0" encoding="UTF-8"?> <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion> <modelVersion>4.0.0</modelVersion>
<groupId>com.quartz.cn</groupId>
<artifactId>VideoofCultural</artifactId> <groupId>com.littlersmall.rabbitmq-access</groupId>
<version>1.0</version> <artifactId>rabbitmq-access</artifactId>
<packaging>jar</packaging> <version>1.0-SNAPSHOT</version>
<description></description>
<properties> <properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<skipTests>true</skipTests>
</properties> </properties>
<dependencies> <dependencies>
<!-- lombok-->
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.projectlombok</groupId>
<artifactId>spring-boot-starter-web</artifactId> <artifactId>lombok</artifactId>
</dependency> <version>1.18.6</version>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.1</version>
</dependency>
<!--druid连接池-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.0.5</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.commons</groupId> <groupId>org.springframework.amqp</groupId>
<artifactId>commons-lang3</artifactId> <artifactId>spring-rabbit</artifactId>
<version>3.6</version> <version>2.0.4.RELEASE</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency> </dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.15-beta2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-joda</artifactId>
<version>2.9.6</version>
</dependency>
<!--quartz依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.6</version>
</dependency>
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.3.8</version>
</dependency>
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--kafka依赖-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- HttpClient依赖 -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId> <artifactId>log4j-to-slf4j</artifactId>
...@@ -169,35 +34,36 @@ ...@@ -169,35 +34,36 @@
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>javax.annotation</groupId>
<artifactId>jul-to-slf4j</artifactId> <artifactId>javax.annotation-api</artifactId>
<version>1.7.25</version> <version>1.3.2</version>
<scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.apache.commons</groupId>
<artifactId>spring-boot-starter-amqp</artifactId> <artifactId>commons-lang3</artifactId>
<version>3.6</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
<finalName>${project.artifactId}</finalName>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.springframework.boot</groupId> <artifactId>maven-assembly-plugin</artifactId>
<artifactId>spring-boot-maven-plugin</artifactId> <configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration> <configuration>
<failOnMissingWebXml>false</failOnMissingWebXml> <source>1.8</source>
<target>1.8</target>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
</project> </project>
\ No newline at end of file
package com.cx.cn.cxquartz;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
@SpringBootApplication
public class CXQuartzApplication extends SpringBootServletInitializer {
public static void main(String[] args) {
SpringApplication.run(CXQuartzApplication.class, args);
}
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder builder) {
return builder.sources(CXQuartzApplication.class);
}
}
package com.cx.cn.cxquartz.bean;
import java.io.Serializable;
public class QuartzTaskInformations implements Serializable {
private Long id;
private Integer version;
private String taskno;
private String taskname;
private String schedulerrule;
private String frozenstatus;
private String executorno;
private Long frozentime;
private Long unfrozentime;
private Long createtime;
private Long lastmodifytime;
private String sendtype;
private String url;
private String executeparamter;
private String timekey;
private Long objectx;
private Long objecty;
private Long objectw;
private Long objecth;
private String recordtype;
private String metatype;
private String imgsrc;
private String objectType;
private String videoid;
public String getVideoid() {
return videoid;
}
public void setVideoid(String videoid) {
this.videoid = videoid;
}
public String getImgsrc() {
return imgsrc;
}
public void setImgsrc(String imgsrc) {
this.imgsrc = imgsrc;
}
public String getObjectType() {
return objectType;
}
public void setObjectType(String objectType) {
this.objectType = objectType;
}
public String getMetatype() {
return metatype;
}
public void setMetatype(String metatype) {
this.metatype = metatype;
}
public Long getObjectx() {
return objectx;
}
public void setObjectx(Long objectx) {
this.objectx = objectx;
}
public Long getObjecty() {
return objecty;
}
public void setObjecty(Long objecty) {
this.objecty = objecty;
}
public Long getObjectw() {
return objectw;
}
public void setObjectw(Long objectw) {
this.objectw = objectw;
}
public Long getObjecth() {
return objecth;
}
public void setObjecth(Long objecth) {
this.objecth = objecth;
}
public String getRecordtype() {
return recordtype;
}
public void setRecordtype(String recordtype) {
this.recordtype = recordtype;
}
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Integer getVersion() {
return version;
}
public void setVersion(Integer version) {
this.version = version;
}
public String getTaskno() {
return taskno;
}
public void setTaskno(String taskno) {
this.taskno = taskno == null ? null : taskno.trim();
}
public String getTaskname() {
return taskname;
}
public void setTaskname(String taskname) {
this.taskname = taskname == null ? null : taskname.trim();
}
public String getSchedulerrule() {
return schedulerrule;
}
public void setSchedulerrule(String schedulerrule) {
this.schedulerrule = schedulerrule == null ? null : schedulerrule.trim();
}
public String getFrozenstatus() {
return frozenstatus;
}
public void setFrozenstatus(String frozenstatus) {
this.frozenstatus = frozenstatus == null ? null : frozenstatus.trim();
}
public String getExecutorno() {
return executorno;
}
public void setExecutorno(String executorno) {
this.executorno = executorno == null ? null : executorno.trim();
}
public Long getFrozentime() {
return frozentime;
}
public void setFrozentime(Long frozentime) {
this.frozentime = frozentime;
}
public Long getUnfrozentime() {
return unfrozentime;
}
public void setUnfrozentime(Long unfrozentime) {
this.unfrozentime = unfrozentime;
}
public Long getCreatetime() {
return createtime;
}
public void setCreatetime(Long createtime) {
this.createtime = createtime;
}
public Long getLastmodifytime() {
return lastmodifytime;
}
public void setLastmodifytime(Long lastmodifytime) {
this.lastmodifytime = lastmodifytime;
}
public String getSendtype() {
return sendtype;
}
public void setSendtype(String sendtype) {
this.sendtype = sendtype == null ? null : sendtype.trim();
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url == null ? null : url.trim();
}
public String getExecuteparamter() {
return executeparamter;
}
public void setExecuteparamter(String executeparamter) {
this.executeparamter = executeparamter == null ? null : executeparamter.trim();
}
public String getTimekey() {
return timekey;
}
public void setTimekey(String timekey) {
this.timekey = timekey == null ? null : timekey.trim();
}
}
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.service.quartz.SbtdspsrService;
import com.cx.cn.cxquartz.vo.Sbtdspsr;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.TriggerContext;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import org.springframework.scheduling.support.PeriodicTrigger;
import java.util.Date;
import java.util.List;
import java.util.UUID;
@Configuration
@EnableScheduling
public class PerformedTaskCornChange implements SchedulingConfigurer {
@Autowired
private SbtdspsrService sbtdspsrService;
@Autowired
private RabbitTemplate rabbitTemplate;
private Long timer=1000L;
public Long getTimer() {
return timer;
}
public void setTimer(Long timer) {
this.timer = timer;
}
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.addTriggerTask(new Runnable() {
@Override
public void run() {
//判断最小执行时间, // 如果小于2分钟,立即执行所有设备的获取rtsp与hls 的服务,下次执行时间为间隔2分钟
// 如果大于2分钟,下次执行时间为间隔1秒
Long time= sbtdspsrService.getPeriodicseconds();
if(time<1200){
setTimer(120000L);
List<Sbtdspsr> list= sbtdspsrService.getPerformedTasks();
if(null!=list && list.size()>0) {
for(Sbtdspsr sbtdspsr:list) {
//丢到rabbitMq中
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);
rabbitTemplate.convertAndSend(QueueConstants.QueueRTSPEnum.QUEUE_RTSP_ENUM.getExchange(),
QueueConstants.QueueRTSPEnum.QUEUE_RTSP_ENUM.getRouteKey(),
MessageHelper.objToMsg(sbtdspsr),
correlationData);
}
}
else{
//查询所有数据
}
}
else {
setTimer(1000L);
}
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
PeriodicTrigger trigger=new PeriodicTrigger(timer);
Date nextDate= trigger.nextExecutionTime(triggerContext);
return nextDate;
}
});
}
}
package com.cx.cn.cxquartz.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
package com.cx.cn.cxquartz.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
// @Configuration
@EnableCaching //开启注解
public class RedisConfig extends CachingConfigurerSupport {
/**
* retemplate相关配置
*
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);
//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(om);
// 值采用json序列化
template.setValueSerializer(jacksonSeial);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());
// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();
return template;
}
/**
* 对hash类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}
/**
* 对redis字符串类型数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}
// @Bean
// public JedisConnectionFactory redisConnectionFactory() {
// JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory();
// jedisConnectionFactory.setHostName("<server-hostname-here>");
// jedisConnectionFactory.setPort(6379);
// jedisConnectionFactory.setPassword("<server-password-here>");
// jedisConnectionFactory.afterPropertiesSet();
// return jedisConnectionFactory;
// }
/**
* 对链表类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}
/**
* 对无序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}
/**
* 对有序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.redis.Consumer;
import com.cx.cn.cxquartz.redis.OrderConsumer;
import com.cx.cn.cxquartz.redis.QueueConfiguration;
import com.cx.cn.cxquartz.redis.container.RedisMQConsumerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedisPCQueueConfig {
@Autowired
RedisMQConsumerContainer mqContainer;
// 初始化完毕后调取 init
@Bean(initMethod = "init", destroyMethod = "destroy")
public RedisMQConsumerContainer redisQueueConsumerContainer() {
for(int i=0;i<6;i++) {
Consumer orderConsumer = new OrderConsumer();
mqContainer.addConsumer(
QueueConfiguration.builder().queue("taskinfo").consumer(orderConsumer).build()
);
}
return mqContainer;
}
}
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.redis.Consumer;
import com.cx.cn.cxquartz.redis.OrderConsumer;
import com.cx.cn.cxquartz.redis.QueueConfiguration;
import com.cx.cn.cxquartz.redis.container.RedisMQConsumerContainer;
import com.cx.cn.cxquartz.service.quartz.SbtdspsrService;
import com.cx.cn.cxquartz.util.RestUtil;
import com.cx.cn.cxquartz.vo.Sbtdspsr;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import java.util.List;
@Configuration
@EnableScheduling
public class ScheduleTaskConfig {
@Value("${file.getrtspbyurl}")
private String getrtspbyurl;
@Autowired
private SbtdspsrService sbtdspsrService;
//
// @Autowired
// RedisMQConsumerContainer mqContainer;
RestUtil restUtil=new RestUtil();
// /***
// * 每隔20分钟执行一遍判断rtsp 是否变换
// */
//// @Scheduled(cron = "0 0 2 * * ? ")
// private void statis() {
// //查询所有监控设备,更新rtsp 地址
// List<Sbtdspsr> sbtdpsrList= sbtdspsrService.list();
// //调用decice 端口获得新的rtsp 地址,如果与表里的一样无需更新,不一样则立即更新
// for(Sbtdspsr sbtd:sbtdpsrList)
// {
// restUtil.rtspChangeVlue(sbtd.getSbbh(),sbtd.getSqurllj(),getrtspbyurl);
// }
// }
// @Scheduled(cron = "0/5 * * * * ?")
// private void statistoday() {
// Consumer orderConsumer = new OrderConsumer();
// mqContainer.addConsumer(
// QueueConfiguration.builder().queue("taskinfo").consumer(orderConsumer).build()
// );
//
// }
}
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 生产者申明一个direct类型(直连型)交换机,然后发送消息到这个交换机指定路由键。
* 消费者指定消费这个交换机的这个路由键,即可接收到消息,其他消费者收不到。
* 用户登录直连型交换机配置
* 1. 声明Exchange交换器;
* 2. 声明Queue队列;
* 3. 绑定BindingBuilder绑定队列到交换器,并设置路由键;
* 消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。
* * 配置上了的话,其实消费者也是生成者的身份,也能推送该消息。
*/
@Configuration
public class SendToDXConfig {
/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange sendToDXDirectExchange() {
return new DirectExchange(QueueConstants.QueueSendToDXEnum.QUEUE_SEND_TO_DX_ENUM.getExchange());
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue sendToDXDirectQueue() {
return new Queue(QueueConstants.QueueSendToDXEnum.QUEUE_SEND_TO_DX_ENUM.getQueue(), true);
}
/**
* 将队列和交换机绑定,并设置用于匹配路由键
*
* @return
*/
@Bean
public Binding BindingSendToDXDirect() {
return BindingBuilder.bind(sendToDXDirectQueue()).to(sendToDXDirectExchange()).with(QueueConstants.QueueSendToDXEnum.QUEUE_SEND_TO_DX_ENUM.getRouteKey());
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 生产者申明一个direct类型(直连型)交换机,然后发送消息到这个交换机指定路由键。
* 消费者指定消费这个交换机的这个路由键,即可接收到消息,其他消费者收不到。
* 用户登录直连型交换机配置
* 1. 声明Exchange交换器;
* 2. 声明Queue队列;
* 3. 绑定BindingBuilder绑定队列到交换器,并设置路由键;
* 消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。
* 配置上了的话,其实消费者也是生成者的身份,也能推送该消息。
*/
@Configuration
public class SendToVoiceConfig {
/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange sendToVoiceDirectExchange() {
return new DirectExchange(QueueConstants.QueueSendToVoiceEnum.QUEUE_SEND_TO_VOICE_ENUM.getExchange());
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue sendToVoiceDirectQueue() {
return new Queue(QueueConstants.QueueSendToVoiceEnum.QUEUE_SEND_TO_VOICE_ENUM.getQueue(), true);
}
/**
* 将队列和交换机绑定,并设置用于匹配路由键
*
* @return
*/
@Bean
public Binding BindingSendToVoiceDirect() {
return BindingBuilder.bind(sendToVoiceDirectQueue()).to(sendToVoiceDirectExchange()).with(QueueConstants.QueueSendToVoiceEnum.QUEUE_SEND_TO_VOICE_ENUM.getRouteKey());
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 视频图片直连型交换机配置
* 1. 声明Exchange交换器;
* 2. 声明Queue队列;
* 3. 绑定BindingBuilder绑定队列到交换器,并设置路由键;
* 消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。
* 配置上了的话,其实消费者也是生成者的身份,也能推送该消息。
*/
@Configuration
public class TaskComsumExchangeConfig {
/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange OrderCancelDirectExchange() {
return new DirectExchange(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getExchange());
}
// /**
// * 创建队列 true表示是否持久
// *
// * @return
// */
// @Bean
// public Queue OrderCancelDirectQueue() {
// return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue(), true);
// }
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model1DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_1", true);
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model2DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_2", true);
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model3DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_3", true);
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model4DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_4", true);
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model5DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_5", true);
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue Model0DirectQueue() {
return new Queue(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getQueue()+"_0", true);
}
/**
* 将队列和交换机绑定,并设置用于匹配路由键
*
* @return
*/
@Bean
public Binding BindingDirect() {
BindingBuilder.bind(Model1DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_1");
BindingBuilder.bind(Model2DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_2");
BindingBuilder.bind(Model3DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_3");
BindingBuilder.bind(Model4DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_4");
BindingBuilder.bind(Model5DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_5");
return BindingBuilder.bind(Model0DirectQueue()).to(OrderCancelDirectExchange()).with(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_0");
}
}
package com.cx.cn.cxquartz.config;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 生产者申明一个direct类型(直连型)交换机,然后发送消息到这个交换机指定路由键。
* 消费者指定消费这个交换机的这个路由键,即可接收到消息,其他消费者收不到。
* 用户登录直连型交换机配置
* 1. 声明Exchange交换器;
* 2. 声明Queue队列;
* 3. 绑定BindingBuilder绑定队列到交换器,并设置路由键;
* 消费者单纯的使用,其实可以不用添加这个配置,直接建后面的监听就好,使用注解来让监听器监听对应的队列即可。
* 配置上了的话,其实消费者也是生成者的身份,也能推送该消息。
*/
@Configuration
public class getSnapShotConfig {
/**
* 创建交换机
*
* @return
*/
@Bean
public DirectExchange sendToVoiceDirectExchange() {
return new DirectExchange(QueueConstants.QueueRTSPEnum.QUEUE_RTSP_ENUM.getExchange());
}
/**
* 创建队列 true表示是否持久
*
* @return
*/
@Bean
public Queue sendToVoiceDirectQueue() {
return new Queue(QueueConstants.QueueRTSPEnum.QUEUE_RTSP_ENUM.getQueue(), true);
}
/**
* 将队列和交换机绑定,并设置用于匹配路由键
*
* @return
*/
@Bean
public Binding BindingSendToVoiceDirect() {
return BindingBuilder.bind(sendToVoiceDirectQueue()).to(sendToVoiceDirectExchange()).with(QueueConstants.QueueRTSPEnum.QUEUE_RTSP_ENUM.getRouteKey());
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.controller;
import com.cx.cn.cxquartz.service.quartz.SbtdspsrService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
@Controller
@RequestMapping("/video")
public class SbtdspsrController {
private static final Logger logger = LoggerFactory.getLogger(SbtdspsrController.class);
@Autowired
private SbtdspsrService sbtdspsrService;
//
// @RequestMapping(value = "/getSbtdspsrbyrtsp", method = RequestMethod.GET)
// public String addTaskpage() {
// return "addtask";
// }
// @RequestMapping(value = "/list", method = RequestMethod.GET)
// @ResponseBody
// public List<Sbtdspsr> list() {
// return sbtdspsrService.list();
//
// }
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Code;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Mapper
public interface CodeMapper {
Code selectalarmNum(String keyid);
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Face;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Mapper
public interface FaceMapper {
int insertFace(Face face);
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Pedestrian;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Mapper
public interface PedestrianMapper {
int insertpedestrian(Pedestrian pedestrian);
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.PeopleRideBicyc;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Mapper
public interface PeopleridebicycMapper {
int insertPeopleRideBicyc( PeopleRideBicyc peopleridebicyc);
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.cx.cn.cxquartz.vo.Sbtdspsr;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
@Mapper
public interface SbtdspsrMapper {
List<Sbtdspsr> selectByRtsp( String rtsp);
List<Sbtdspsr> list( );
List<QuartzTaskInformations> selectRecogByRtsp(String rtsp);
int updateRecogByRtsp(Map map);
int updateRTSPorHLSParam(Sbtdspsr sbtdspsr);
Long getPeriodicseconds();
String getRtspOrHLSByDeviceCode(String deviceCode);
List<Sbtdspsr> getPerformedTasks();
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Storageserver;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
@Mapper
public interface StorageServerMapper {
List<Storageserver> queryStorageServerAll(Storageserver storageServer);
}
\ No newline at end of file
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Taskinfo;
import org.apache.ibatis.annotations.Mapper;
import java.util.List;
import java.util.Map;
@Mapper
public interface TaskinfoMapper {
List<Taskinfo> getTaskinfoByMutiParam (Map map);
int addTaskinfo(Taskinfo taskinfo);
int updateTaskinfo(Taskinfo taskinfo);
int delTaskinfoByid(String id);
}
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.TraffAlarmRecord;
import org.apache.ibatis.annotations.InsertProvider;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import java.util.List;
import java.util.Map;
@Mapper
public interface TraffAlarmRecordMapper {
int inserTraffAlarmRecord(TraffAlarmRecord traffalarmrecord);
int selectmax();
int updateTraffAlarmRecordUrl(TraffAlarmRecord traffalarmrecord);
List<TraffAlarmRecord> getTraffAlarmRecordByProgress(Map<String, Object> map);
int updateTraffAlarmRecordProcess(TraffAlarmRecord traffalarmrecord);
}
\ No newline at end of file
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.TraffpictureParam;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TraffPictureMapper {
int inserTraffpicture(TraffpictureParam traffalarmrecord);
int updateTraffpicture(TraffpictureParam traffalarmrecord);
String queryimgpath(TraffpictureParam traffpicture);
String queryimgdataByid(String id);
int updateTraffpicturePushStatus(TraffpictureParam traffpicture );
}
\ No newline at end of file
package com.cx.cn.cxquartz.dao;
import com.cx.cn.cxquartz.vo.Traffic;
import org.apache.ibatis.annotations.Mapper;
/**
* <p>
* Mapper 接口
* </p>
*
* @author wjj
* @since 2021-04-29
*/@Mapper
public interface TrafficMapper {
int insertTraffic(Traffic traffic);
}
package com.cx.cn.cxquartz.helper;
import com.cx.cn.cxquartz.util.JsonUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
/**
* RabbitMQ消息处理类
*/
public class MessageHelper {
public static Message objToMsg(Object obj) {
if (null == obj) {
return null;
}
Message message = MessageBuilder.withBody(JsonUtil.objToStr(obj).getBytes()).build();
// 消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}
public static Message msgToMsg(String mess) {
if (null == mess) {
return null;
}
Message message = MessageBuilder.withBody(mess.getBytes()).build();
// 消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setContentType(MessageProperties.CONTENT_TYPE_JSON);
return message;
}
public static <T> T msgToObj(Message message, Class<T> clazz) {
if (null == message || null == clazz) {
return null;
}
String str = new String(message.getBody());
return JsonUtil.strToObj(str, clazz);
}
}
package com.cx.cn.cxquartz.job;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
@ServerEndpoint("/websocket/{name}")
public class WebSocket {
private static final Logger logger = LoggerFactory.getLogger(WebSocket.class);
/**
* 与某个客户端的连接对话,需要通过它来给客户端发送消息
*/
private Session session;
/**
* 标识当前连接客户端的用户名
*/
private String name;
/**
* 用于存所有的连接服务的客户端,这个对象存储是安全的
*/
private static ConcurrentHashMap<String, WebSocket> webSocketSet = new ConcurrentHashMap<>();
@OnOpen
public void OnOpen(Session session, @PathParam(value = "name") String name) {
this.session = session;
this.name = name;
// name是用来表示唯一客户端,如果需要指定发送,需要指定发送通过name来区分
webSocketSet.put(name, this);
}
@OnClose
public void OnClose() {
webSocketSet.remove(this.name);
}
@OnError
public void OnError(@PathParam("name") String name, Throwable throwable, Session session) {
webSocketSet.remove(name);
}
@OnMessage
public void OnMessage(String message) {
//判断是否需要指定发送,具体规则自定义
if(message.indexOf("HEARTBEAT")>=0){
Map map=new HashMap();
map.put("type","HEARTBEAT");
map.put("ts",new Date().getTime());
try {
AppointSending(name, new ObjectMapper().writeValueAsString(map));
}catch (Exception ex)
{
logger.error("websocket->OnMessage eror:{}",ex.toString());
}
}
}
/**
* 群发
*
* @param message
*/
public static void GroupSending(String message) {
for (String name : webSocketSet.keySet()) {
try {
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote())
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
logger.error("websocket->GroupSending eror:{}",e.toString());
}
}
}
/**
* 指定发送
*
* @param name
* @param message
*/
public void AppointSending(String name, String message) {
if (null != webSocketSet.get(name) && null != webSocketSet.get(name).session && null != webSocketSet.get(name).session.getBasicRemote()) {
synchronized (webSocketSet.get(name).session) {
try {
webSocketSet.get(name).session.getBasicRemote().sendText(message);
} catch (Exception e) {
logger.error("websocket->AppointSending eror:{}",e.toString());
}
}
}
}
}
package com.cx.cn.cxquartz.rabbitmq;
/**
* 消息队列常量
*/
public class QueueConstants {
/**
* 任务队列常量
*/
public interface QueueTaskConsumer{
/**
* 交换机名称
*/
String EXCHANGE = "RabbitMQ.DirectExchange.TaskConsumer";
/**
* 队列名称
*/
String QUEUE = "RabbitMQ.DirectQueue.TaskConsumer";
/**
* 路由键
*/
String ROUTEKEY = "RabbitMQ.RouteKey.TaskConsumer";
}
/**
* 消息通知队列
*/
public enum QueueTaskEnum {
QUEUE_TASK_ENUM(QueueConstants.QueueTaskConsumer.EXCHANGE, QueueConstants.QueueTaskConsumer.QUEUE,
QueueConstants.QueueTaskConsumer.ROUTEKEY);
/**
* 交换机名称
*/
private String exchange;
/**
* 队列名称
*/
private String queue;
/**
* 路由键
*/
private String routeKey;
QueueTaskEnum(String exchange, String queue, String routeKey) {
this.exchange = exchange;
this.queue = queue;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
}
/**
* t推送第三方队列常量
*/
public interface QueueSendToDXConsumer{
/**
* 交换机名称
*/
String EXCHANGE = "RabbitMQ.DirectExchange.SendToDXConsumer";
/**
* 队列名称
*/
String QUEUE = "RabbitMQ.DirectQueue.SendToDXConsumer";
/**
* 路由键
*/
String ROUTEKEY = "RabbitMQ.RouteKey.SendToDXConsumer";
}
/**
* 消息第三方声音告警队列
*/
public enum QueueSendToDXEnum {
QUEUE_SEND_TO_DX_ENUM(QueueConstants.QueueSendToDXConsumer.EXCHANGE, QueueConstants.QueueSendToDXConsumer.QUEUE,
QueueConstants.QueueSendToDXConsumer.ROUTEKEY);
/**
* 交换机名称
*/
private String exchange;
/**
* 队列名称
*/
private String queue;
/**
* 路由键
*/
private String routeKey;
QueueSendToDXEnum(String exchange, String queue, String routeKey) {
this.exchange = exchange;
this.queue = queue;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
}
/**
* 推送第三方队列常量
*/
public interface QueueSendToVoiceConsumer{
/**
* 交换机名称
*/
String EXCHANGE = "RabbitMQ.DirectExchange.SendToVoiceConsumer";
/**
* 队列名称
*/
String QUEUE = "RabbitMQ.DirectQueue.SendToVoiceConsumer";
/**
* 路由键
*/
String ROUTEKEY = "RabbitMQ.RouteKey.SendToVoiceConsumer";
}
/**
* 消息第三方队列
*/
public enum QueueSendToVoiceEnum {
QUEUE_SEND_TO_VOICE_ENUM(QueueConstants.QueueSendToVoiceConsumer.EXCHANGE, QueueConstants.QueueSendToVoiceConsumer.QUEUE,
QueueConstants.QueueSendToVoiceConsumer.ROUTEKEY);
/**
* 交换机名称
*/
private String exchange;
/**
* 队列名称
*/
private String queue;
/**
* 路由键
*/
private String routeKey;
QueueSendToVoiceEnum(String exchange, String queue, String routeKey) {
this.exchange = exchange;
this.queue = queue;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
}
/**
* 获得rtsp 或者hls的队列
*/
public interface QueueRTSPConsumer{
/**
* 交换机名称
*/
String EXCHANGE = "RabbitMQ.DirectExchange.RTSPConsumer";
/**
* 队列名称
*/
String QUEUE = "RabbitMQ.DirectQueue.RTSPConsumer";
/**
* 路由键
*/
String ROUTEKEY = "RabbitMQ.RouteKey.RTSPConsumer";
}
/**
* 消息第三方队列
*/
public enum QueueRTSPEnum {
QUEUE_RTSP_ENUM(QueueConstants.QueueRTSPConsumer.EXCHANGE, QueueConstants.QueueRTSPConsumer.QUEUE,
QueueConstants.QueueRTSPConsumer.ROUTEKEY);
/**
* 交换机名称
*/
private String exchange;
/**
* 队列名称
*/
private String queue;
/**
* 路由键
*/
private String routeKey;
QueueRTSPEnum(String exchange, String queue, String routeKey) {
this.exchange = exchange;
this.queue = queue;
this.routeKey = routeKey;
}
public String getExchange() {
return exchange;
}
public void setExchange(String exchange) {
this.exchange = exchange;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public String getRouteKey() {
return routeKey;
}
public void setRouteKey(String routeKey) {
this.routeKey = routeKey;
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import java.io.IOException;
public interface BaseConsumer {
/**
* 消息消费入口
*
* @param message
* @param channel
* @throws IOException
*/
void consume(Message message, Channel channel) throws IOException;
}
\ No newline at end of file
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.cx.cn.cxquartz.service.quartz.TraffPictureService;
import com.cx.cn.cxquartz.service.quartz.impl.ResultService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.reflect.Proxy;
import java.util.Map;
/**
* 在Consumer中, 真正的业务逻辑其实只是保存消息到各自的数据表中, 但我们又不得不在调用consume方法之前校验消费幂等性, 发送后, 还要更新消息状态为"已消费"状态, 并手动ack。
* 实际项目中, 可能还有很多生产者-消费者的应用场景, 如记录日志, 发送短信等等, 都需要rabbitmq, 如果每次都写这些重复的公用代码, 没必要, 也难以维护。
* 所以, 我们可以将公共代码抽离出来, 让核心业务逻辑只关心自己的实现, 而不用做其他操作, 其实就是AOP。
* <p>
* 为达到这个目的, 有很多方法, 可以用spring aop, 可以用拦截器, 可以用静态代理, 也可以用动态代理, 在这里用的是动态代理。
*/
public class BaseConsumerProxy {
private static final Logger logger = LoggerFactory.getLogger(BaseConsumerProxy.class);
/**
* 代理对象
*/
private Object target;
/**
* 消息业务操作对象
*/
@Autowired
TraffPictureService traffPictureService;
@Autowired
ResultService resultService;
public BaseConsumerProxy(Object target, TraffPictureService traffPictureService) {
this.target = target;
this.traffPictureService = traffPictureService;
}
public BaseConsumerProxy(Object target) {
this.target = target;
}
/**
* 使用动态代理实现消费端幂等性验证和消费确认(ack)
*
* @return
*/
public Object getProxy() {
ClassLoader classLoader = target.getClass().getClassLoader();
Class[] interfaces = target.getClass().getInterfaces();
//Lambda表达式方式实现InvocationHandler接口
return Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
Message message = (Message) args[0];
Channel channel = (Channel) args[1];
//String correlationId = getCorrelationId(message);
// 消费幂等性, 防止消息被重复消费
// 重启服务器, 由于有一条未被ack的消息, 所以重启后监听到消息, 进行消费, 但是由于消费前会判断该消息的状态是否未被消费, 发现status=3, 即已消费,
// 所以, 直接return, 这样就保证了消费端的幂等性, 即使由于网络等原因投递成功而未触发回调, 从而多次投递, 也不会重复消费进而发生业务异常。
// if (isConsumed(correlationId)) {
// logger.info("重复消费, correlationId: {}", correlationId);
// return null;
// }
MessageProperties properties = message.getMessageProperties();
long tag = properties.getDeliveryTag();
try {
// 真正消费的业务逻辑
Object result = method.invoke(target, args);
//traffPictureService.updateStatus(correlationId, QueueConstants.MessageLogStatus.CONSUMED_SUCCESS);
// 消费确认 虽然消息确实被消费了, 但是由于是手动确认模式, 而最后又没手动确认, 所以, 消息仍被rabbitmq保存。
// 所以, 手动ack能够保证消息一定被消费, 但一定要记得basicAck。
channel.basicAck(tag, false);
return result;
} catch (Exception e) {
logger.error("getProxy error", e);
channel.basicNack(tag, false,true);
return null;
}
});
}
/**
* 获取CorrelationId
*
* @param message
* @return
*/
private String getCorrelationId(Message message) {
String correlationId = null;
MessageProperties properties = message.getMessageProperties();
Map<String, Object> headers = properties.getHeaders();
for (Map.Entry entry : headers.entrySet()) {
String key = (String) entry.getKey();
String value = (String) entry.getValue();
if (key.equals("spring_returned_message_correlation")) {
correlationId = value;
}
}
return correlationId;
}
/**
* 消息是否已被消费
*
* @param correlationId
* @return
*/
private boolean isConsumed(String correlationId) {
//查看数据是否入表
return false;
// MessageLog msgLog = msgLogService.selectByMsgId(correlationId);
// return null == msgLog || msgLog.getStatus().equals(QueueConstants.MessageLogStatus.CONSUMED_SUCCESS);
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
//@Component
public class ResendMessage {
private static final Logger logger = LoggerFactory.getLogger(ResendMessage.class);
@Autowired
private RabbitTemplate rabbitTemplate;
// 最大投递次数
private static final int MAX_TRY_COUNT = 3;
/**
* 每30s拉取投递失败的消息, 保证消息100%投递成功并被消费.
* 实际应用场景中, 可能由于网络原因, 或者消息未被持久化MQ就宕机了, 使得投递确认的回调方法ConfirmCallback没有被执行,
* 从而导致数据库该消息状态一直是投递中的状态, 此时就需要进行消息重投, 即使也许消息已经被消费了。
* 定时任务只是保证消息100%投递成功, 而多次投递的消费幂等性需要消费端自己保证。
* 我们可以将回调和消费成功后更新消息状态的代码注释掉, 开启定时任务, 查看是否重投。
*/
@Scheduled(cron = "0/30 * * * * ?")
public void resend() {
logger.info("开始执行定时任务(重新投递消息)");
// List<MessageLog> msgLogs = msgLogService.selectTimeoutMsg();
// //查询推送给失败的数据,重新投递,投递最大次数为三次
// msgLogs.forEach(msgLog -> {
// String msgId = msgLog.getMsgId();
// if (msgLog.getTryCount() >= MAX_TRY_COUNT) {
// msgLogService.updateStatus(msgId, QueueConstants.MessageLogStatus.DELIVER_FAIL);
// logger.info("超过最大重试次数, 消息投递失败, msgId: {}", msgId);
// } else {
// // 投递次数+1
// msgLogService.updateTryCount(msgId, msgLog.getNextTryTime());
//
// CorrelationData correlationData = new CorrelationData(msgId);
// // 重新投递
// rabbitTemplate.convertAndSend(msgLog.getExchange(), msgLog.getRoutingKey(),
// MessageHelper.objToMsg(msgLog.getMsg()), correlationData);
//
// logger.info("第 " + (msgLog.getTryCount() + 1) + " 次重新投递 MsgID:" + msgId + "的消息!");
// }
// });
logger.info("定时任务执行结束(重新投递消息)");
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.service.quartz.TraffAlarmRecordService;
import com.cx.cn.cxquartz.service.quartz.impl.EventWriteService;
import com.cx.cn.cxquartz.util.JsonUtil;
import com.cx.cn.cxquartz.vo.JobTjParam;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 消息处理并推送第三方
*/
@Component
public class SendToDXConsumer implements BaseConsumer {
private static final Logger logger = LoggerFactory.getLogger(SendToDXConsumer.class);
@Autowired
TraffAlarmRecordService traffAlarmRecordService;
@Autowired
EventWriteService eventWriteService;
/**
* 消息消费入口
*
* @param message
* @param channel
* @throws IOException
*/
@Override
public void consume(Message message, Channel channel) throws IOException {
logger.info("SendToDXConsumer 收到消息: {}", message.toString());
Map result = MessageHelper.msgToObj(message, Map.class);
if (null != result.get("id") && null!=result.get("traff") && null!=result.get("callback")) {
JobTjParam jobTjParam=JsonUtil.strToObj(result.get("traff").toString(),JobTjParam.class);
if(null!=jobTjParam) {
// eventWriteService.sendEventByCallUrl(Long.parseLong(result.get("id").toString())
// , jobTjParam, result.get("callback").toString());
}
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.service.quartz.TraffAlarmRecordService;
import com.cx.cn.cxquartz.service.quartz.impl.EventWriteService;
import com.cx.cn.cxquartz.util.JsonUtil;
import com.cx.cn.cxquartz.vo.JobTjParam;
import com.cx.cn.cxquartz.vo.Voice;
import com.cx.cn.cxquartz.vo.VoiceData;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* 消息处理并推送第三方
*/
@Component
public class SendToVoiceConsumer implements BaseConsumer {
private static final Logger logger = LoggerFactory.getLogger(SendToVoiceConsumer.class);
@Autowired
TraffAlarmRecordService traffAlarmRecordService;
@Autowired
EventWriteService eventWriteService;
@Value("${voice.unionId}")
private String unionId;
@Value("${voice.appKey}")
private String appKey;
@Value("${voice.corpId}")
private String corpId;
@Value("${voice.eventId}")
private Integer eventId;
/**
* 消息消费入口
*
* @param message
* @param channel
* @throws IOException
*/
@Override
public void consume(Message message, Channel channel) {
logger.info("SendToVoiceConsumer 收到消息: {}", message.toString());
Map result = MessageHelper.msgToObj(message, Map.class);
if (null != result.get("id") && null!=result.get("traff") && null!=result.get("callback")) {
JobTjParam jobTjParam=JsonUtil.strToObj(result.get("traff").toString(), JobTjParam.class);
//回调声音触发事件
if (null!=jobTjParam &&unionId.contains(jobTjParam.getDeviceId())) {
VoiceData voicedata = new VoiceData();
voicedata.setAppKey(appKey);
voicedata.setCorpId(corpId);
voicedata.setRequestData(new Voice(eventId,unionId));
// logger.info(" send to voice: {}", new ObjectMapper().writeValueAsString(voicedata));
//同步推送
// eventWriteService.sendVoice(voicedata);
}
//推送告警到前端
// webSocket.GroupSending(JsonUtil.objToStr(traffpictureParamresult));
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.service.quartz.TraffAlarmRecordService;
import com.cx.cn.cxquartz.service.quartz.impl.ResultService;
import com.cx.cn.cxquartz.service.quartz.impl.VideoRTSPorURLService;
import com.cx.cn.cxquartz.util.JsonUtil;
import com.cx.cn.cxquartz.vo.Sbtdspsr;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
@Component
public class SnapShotConsumer implements BaseConsumer{
private static final Logger logger = LoggerFactory.getLogger(TaskConsumConsumer.class);
@Autowired
VideoRTSPorURLService videoRTSPorURLService;
@Override
public void consume(Message message, Channel channel) throws IOException {
logger.info("SnapShotConsumer 收到消息: {}", message.toString());
Sbtdspsr result = MessageHelper.msgToObj(message, Sbtdspsr.class);
if (result.getTdlx().equals("1")) {
//调用rtsp 的服务
String token=videoRTSPorURLService.getRTSPAccessToekenByDeviceCode(result.getSbbh());
videoRTSPorURLService.getRTSPByDeviceCode(token,result.getSbbh());
}
else{
//调用hls 的服务
String token=videoRTSPorURLService.getHLSToekenByDeviceCode(result.getSbbh());
videoRTSPorURLService.getHLSByDeviceCode(token,result.getSbbh());
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.service.quartz.TraffAlarmRecordService;
import com.cx.cn.cxquartz.service.quartz.impl.ResultService;
import com.cx.cn.cxquartz.util.JsonUtil;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.IOException;
import java.util.Map;
/**
* 消息处理并推送第三方
*/
//@Component
public class TaskConsumConsumer implements BaseConsumer {
private static final Logger logger = LoggerFactory.getLogger(TaskConsumConsumer.class);
@Autowired
TraffAlarmRecordService traffAlarmRecordService;
@Autowired
ResultService resultService;
/**
* 消息消费入口
*
* @param message
* @param channel
* @throws IOException
*/
@Override
public void consume(Message message, Channel channel) throws IOException {
logger.info("TaskConsumConsumer 收到消息: {}", message.toString());
Map result = MessageHelper.msgToObj(message, Map.class);
if (null != result.get("task")) {
QuartzTaskInformations taskinfo = JsonUtil.strToObj( result.get("task").toString(),QuartzTaskInformations.class);
if (null != result.get("result")) {
Map objresult = JsonUtil.strToObj(result.get("result").toString(), Map.class);
//处理消息
resultService.processResult(taskinfo, objresult);
}
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer.listener;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumerProxy;
import com.cx.cn.cxquartz.rabbitmq.comsumer.TaskConsumConsumer;
import com.cx.cn.cxquartz.service.quartz.TraffPictureService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 任务消息监听接受器
*/
//@Component
public class OrderCancelReceiver {
private static final Logger logger = LoggerFactory.getLogger(OrderCancelReceiver.class);
@Autowired
private TaskConsumConsumer taskConsumConsumer;
@Autowired
private TraffPictureService traffPictureService;
@RabbitListener(queues = QueueConstants.QueueTaskConsumer.QUEUE,containerFactory="rabbitListenerContainerFactory")
public void process(Message message, Channel channel) {
try {
logger.info("consumer->OrderCancelReceiver消费者收到消息 : " + message.toString());
BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(taskConsumConsumer, traffPictureService);
BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
if (null != proxy) {
proxy.consume(message, channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer.listener;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumerProxy;
import com.cx.cn.cxquartz.rabbitmq.comsumer.SnapShotConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.TaskConsumConsumer;
import com.cx.cn.cxquartz.service.quartz.TraffAlarmRecordService;
import com.cx.cn.cxquartz.service.quartz.TraffPictureService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 任务消息监听接受器
*/
@Component
public class RTSPorHLSReceiver {
private static final Logger logger = LoggerFactory.getLogger(RTSPorHLSReceiver.class);
@Autowired
private SnapShotConsumer snapShotConsumer;
@RabbitListener(queues = QueueConstants.QueueRTSPConsumer.QUEUE,containerFactory="rabbitListenerContainerFactory")
public void process(Message message, Channel channel) {
try {
logger.info("consumer->RTSPorHLSReceiver消费者收到消息 : " + message.toString());
BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(snapShotConsumer);
BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
if (null != proxy) {
proxy.consume(message, channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer.listener;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumerProxy;
import com.cx.cn.cxquartz.rabbitmq.comsumer.SendToDXConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.TaskConsumConsumer;
import com.cx.cn.cxquartz.service.quartz.impl.ResultService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 推送给第三方队列的监听
*/
@Component
public class SendtoDXReceiver {
private static final Logger logger = LoggerFactory.getLogger(SendtoDXReceiver.class);
@Autowired
private SendToDXConsumer sendToDXConsumer;
/**
* 消息接收处理
*
* @param message
* @param channel
* @throws IOException
*/
// @RabbitListener(queues = QueueConstants.QueueSendToDXConsumer.QUEUE,containerFactory="rabbitListenerContainerFactory")
// public void consume(Message message, Channel channel) throws IOException {
// logger.info("consumer->QueueSendToDXConsumer 消费者收到消息 : " + message.toString());
// BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(sendToDXConsumer);
// BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
// if (null != proxy) {
// proxy.consume(message, channel);
// }
// }
}
package com.cx.cn.cxquartz.rabbitmq.comsumer.listener;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumerProxy;
import com.cx.cn.cxquartz.rabbitmq.comsumer.SendToVoiceConsumer;
import com.cx.cn.cxquartz.service.quartz.impl.ResultService;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* 推送给声音告警队列的监听
*/
@Component
public class SendtoVoiceAlarmReceiver {
@Autowired
private SendToVoiceConsumer sendToVoiceConsumer;
/**
* 消息接收处理
*
* @param message
* @param channel
* @throws IOException
*/
@RabbitListener(queues = QueueConstants.QueueSendToVoiceConsumer.QUEUE,containerFactory="rabbitListenerContainerFactory")
public void consume(Message message, Channel channel) throws IOException {
BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(sendToVoiceConsumer);
BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
if (null != proxy) {
proxy.consume(message, channel);
}
}
}
package com.cx.cn.cxquartz.rabbitmq.comsumer.listener;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumer;
import com.cx.cn.cxquartz.rabbitmq.comsumer.BaseConsumerProxy;
import com.cx.cn.cxquartz.rabbitmq.comsumer.SnapShotConsumer;
import com.cx.cn.cxquartz.service.quartz.TraffPictureService;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 任务消息监听接受器
*/
@Component
public class SnapShotReceiver {
private static final Logger logger = LoggerFactory.getLogger(SnapShotReceiver.class);
@Autowired
private SnapShotConsumer snapShotConsumer;
@Autowired
private TraffPictureService traffPictureService;
@RabbitListener(queues = QueueConstants.QueueRTSPConsumer.QUEUE)
public void process(Message message, Channel channel) {
try {
logger.info("consumer->OrderCancelReceiver消费者收到消息 : " + message.toString());
BaseConsumerProxy baseConsumerProxy = new BaseConsumerProxy(snapShotConsumer, traffPictureService);
BaseConsumer proxy = (BaseConsumer) baseConsumerProxy.getProxy();
if (null != proxy) {
proxy.consume(message, channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.cx.cn.cxquartz.rabbitmq.producer.callback;
import com.cx.cn.cxquartz.controller.ExtController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.nio.charset.StandardCharsets;
/**
* 消息发送确认的回调
* 实现接口:implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback
* ConfirmCallback:只确认消息是否正确到达交换机中,不管是否到达交换机,该回调都会执行;
* ReturnCallback:如果消息从交换机未正确到达队列中将会执行,正确到达则不执行;
*/
@Component
public class ConsumerConfirmAndReturnCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private static final Logger logger = LoggerFactory.getLogger(ExtController.class);
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* PostConstruct: 用于在依赖关系注入完成之后需要执行的方法上,以执行任何初始化.
*/
@PostConstruct
public void init() {
//指定 ConfirmCallback
rabbitTemplate.setConfirmCallback(this);
//指定 ReturnCallback
rabbitTemplate.setReturnCallback(this);
}
/**
* 消息从交换机成功到达队列,则returnedMessage方法不会执行;
* 消息从交换机未能成功到达队列,则returnedMessage方法会执行;
* 需要开启 return 确认机制
* spring.rabbitmq.publisher-returns=true
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("returnedMessage回调方法->" + new String(message.getBody(), StandardCharsets.UTF_8) + ",\n replyCode:" + replyCode
+ "\n replyText:" + replyText + "\n exchange:" + exchange + ",\\n routingKey:" + routingKey);
}
/**
* 消息找不到对应的Exchange会先触发此方法
* 如果消息没有到达交换机,则该方法中isSendSuccess = false,error为错误信息;
* 如果消息正确到达交换机,则该方法中isSendSuccess = true;
* 需要开启 confirm 确认机制
* spring.rabbitmq.publisher-confirms=true
*/
@Override
public void confirm(CorrelationData correlationData, boolean isSendSuccess, String error) {
if (correlationData != null) {
// logger.info("confirm回调方法->回调消息ID为: " + correlationData.getId());
if (isSendSuccess) {
logger.info("confirm回调方法->消息成功发送到交换机!");
} else {
logger.info("confirm回调方法->消息[{}]发送到交换机失败!,原因 : [{}]", correlationData, error);
}
}
}
}
package com.cx.cn.cxquartz.redis;
public interface Consumer {
void consume(Object message);
}
package com.cx.cn.cxquartz.redis;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.cx.cn.cxquartz.helper.MessageHelper;
import com.cx.cn.cxquartz.rabbitmq.QueueConstants;
import com.cx.cn.cxquartz.service.quartz.SbtdspsrService;
import com.cx.cn.cxquartz.service.quartz.impl.VideoRTSPorURLService;
import com.cx.cn.cxquartz.util.HttpClientUtil;
import com.cx.cn.cxquartz.util.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.*;
@Component
public class OrderConsumer implements Consumer {
public static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);
@Value("${local.czurl}")
private String czurl;
@Value("${local.fxurl}")
private String fxurl;
@Value("${file.recogurl}")
private String recogurl;
@Value("${file.model}")
private String model;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private VideoRTSPorURLService videoRTSPorURLService;
@Autowired
private SbtdspsrService sbtdspsrService;
private static OrderConsumer orderConsumer;
@PostConstruct
public void init() {
orderConsumer = this;
orderConsumer.rabbitTemplate = this.rabbitTemplate;
orderConsumer.czurl=this.czurl;
orderConsumer.fxurl=this.fxurl;
orderConsumer.recogurl=this.recogurl;
orderConsumer.model=this.model;
orderConsumer.videoRTSPorURLService=this.videoRTSPorURLService;
orderConsumer.sbtdspsrService=this.sbtdspsrService;
}
public OrderConsumer(){
}
@Override
public void consume(Object message) {
if(message instanceof QuartzTaskInformations){
QuartzTaskInformations msg =(QuartzTaskInformations) message;
try {
//调用抽帧服务
String devicecode=msg.getExecuteparamter();
log.info("开始消费消息{}", msg.getId());
//如果设备编号是用一次废一次的,此刻需要现场取得rtsp
if(null!=devicecode&&devicecode.startsWith("33") && devicecode.length()==18){
//调用抽帧服务
String token= orderConsumer.videoRTSPorURLService.getRTSPAccessToekenByDeviceCode(devicecode);
String rtsp=orderConsumer.videoRTSPorURLService.getRTSPByDeviceCode(token,devicecode);
}
else{
//取表里最新的rtsp 或者hls 的值
String rtsp=orderConsumer.sbtdspsrService.getRtspOrHLSByDeviceCode(devicecode);
}
//将rtsp 作为参数调用抽帧服务
String result="{\n" +
" \"ret\": 0,\n" +
" \"desc\": \"succ!\",\n" +
" \"url\": \"http://zjh189.ncpoi.cc:7080/download/202109/08/33050300001327599605/33050300001327599605_20210908_134131031.jpg\",\n" +
" \"localuri\": \"/home/ubuntu/pictures/slice/202109/08/33050300001327599605/33050300001327599605_20210908_134131031.jpg\",\n" +
" \"timestamp\": \"2021-09-08 13:41:31.031\",\n" +
" \"devicecode\": \"33050300001327599605\"\n" +
"}";
//抽帧结果放到rabbttmq 中,根据msg 的检测metatype ,分别派发到不同的queue中,方便以后10条10条的去皮皮昂分析
Map m = new HashMap();
// m.put("deviceSnapshot", JsonUtil.objToStr(msg));
// m.put("result", result);
String msgId = UUID.randomUUID().toString();
CorrelationData correlationData = new CorrelationData(msgId);
orderConsumer.rabbitTemplate.convertAndSend(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getExchange(),
QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey()+"_"+msg.getObjectType(),
MessageHelper.objToMsg(result),
correlationData);
// String roistr= URLEncoder.encode("[")+msg.getObjectx()+","+msg.getObjecty()+","+msg.getObjectw()+","+msg.getObjecth()+URLEncoder.encode("]");
// String result= CompletableFuture.supplyAsync(() -> HttpClientUtil.doGet(orderConsumer.recogurl + "?deviceCode="+msg.getExecuteparamter()+"&model="+orderConsumer.model+"&roi="+roistr)).get(2, TimeUnit.SECONDS);
// String roistr= URLEncoder.encode("[")+msg.getObjectx()+","+msg.getObjecty()+","+msg.getObjectw()+","+msg.getObjecth()+URLEncoder.encode("]");
//// String result= CompletableFuture.supplyAsync(() -> HttpClientUtil.doGet(orderConsumer.recogurl + "?deviceCode="+msg.getExecuteparamter()+"&model="+orderConsumer.model+"&roi="+roiarray)).get(2, TimeUnit.SECONDS);
// log.info(" deviceCode:{} ",msg.getExecuteparamter());
// String result = HttpClientUtil.doGet(orderConsumer.recogurl + "?deviceCode=" + msg.getExecuteparamter() + "&model=" + orderConsumer.model + "&roi=" + roistr);
// log.info(" picture result:{} ",result);
// if (null != result && result.contains("ret"))//放入rabbitmq.数据由消费者去处理
// {
// Map objresult = JsonUtil.strToObj(result, Map.class);
// //处理消息
// if (null != objresult.get("ret") && objresult.get("ret").toString().equals("0")
// && null != objresult.get("desc")
// && objresult.get("desc").toString().contains("succ")) {
// Map m = new HashMap();
// m.put("task", JsonUtil.objToStr(msg));
// m.put("result", result);
// String msgId = UUID.randomUUID().toString();
// CorrelationData correlationData = new CorrelationData(msgId);
// //根绝不同的检测事件对象metatype分发到不同的通道
//
// orderConsumer.rabbitTemplate.convertAndSend(QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getExchange(),
// QueueConstants.QueueTaskEnum.QUEUE_TASK_ENUM.getRouteKey(),
// MessageHelper.objToMsg(m),
// correlationData);
// }
// }
//添加具体的消费逻辑,修改数据库什么的
log.info("消费消息{}完成", msg.getId());
}catch (Exception ex){
log.error("消费消息{}error:{}", msg.getId(), ex.toString());
}
}
}
}
package com.cx.cn.cxquartz.redis;
public class QueueConfiguration {
private String queue;
private Consumer consumer;
public static Builder builder(){
return new Builder();
}
public static class Builder{
private QueueConfiguration configuration = new QueueConfiguration();
public Builder queue(String queue) {
configuration.queue = queue;
return this;
}
public Builder consumer(Consumer consumer) {
configuration.consumer = consumer;
return this;
}
public QueueConfiguration build() {
if (configuration.queue == null || configuration.queue.length() == 0) {
if (configuration.consumer != null) {
configuration.queue = configuration.getClass().getSimpleName();
}
}
return configuration;
}
}
public void setQueue(String queue) {
this.queue = queue;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public String getQueue() {
return queue;
}
public Consumer getConsumer() {
return consumer;
}
}
package com.cx.cn.cxquartz.redis;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
@Service
public class QueueSender {
@Autowired
private RedisTemplate redisTemplate;
public QueueSender(RedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
public void sendMsg(String queue, Object msg) {
redisTemplate.opsForList().leftPush(queue, msg);
}
}
package com.cx.cn.cxquartz.redis.container;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.cx.cn.cxquartz.redis.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;
public class QueueListener implements Runnable {
public static final Logger log = LoggerFactory.getLogger(QueueListener.class);
private RedisTemplate redisTemplate;
private String queue;
private Consumer consumer;
public QueueListener(RedisTemplate redisTemplate, String queue, Consumer consumer) {
this.redisTemplate = redisTemplate;
this.queue = queue;
this.consumer = consumer;
}
public QueueListener() {
}
/**
* 使用队列右出获取消息
* 没获取到消息则线程 sleep 一秒,减少资源浪费
* 实现了 Runnable 接口,可以作为线程任务执行
*/
@Override
public void run() {
while (RedisMQConsumerContainer.RUNNING){
Object message = redisTemplate.opsForList().rightPop(queue,500,TimeUnit.MICROSECONDS);
if( message instanceof QuartzTaskInformations){
consumer.consume(message);
}else{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.redis.container;
import com.cx.cn.cxquartz.redis.QueueConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@Component
public class RedisMQConsumerContainer {
public static final Logger log = LoggerFactory.getLogger(RedisMQConsumerContainer.class);
public static boolean RUNNING;
@Autowired
private RedisTemplate redisTemplate;
private Map<String,QueueConfiguration> consumerMap= new HashMap<>();
private ExecutorService executor;
public RedisMQConsumerContainer() {
init();
}
public void addConsumer(QueueConfiguration configuration) {
// if (consumerMap.containsKey(configuration.getQueue())) {
// log.warn("Key:{} this key already exists, and it will be replaced", configuration.getQueue());
// }
if (configuration.getConsumer() == null) {
log.warn("Key:{} consumer cannot be null, this configuration will be skipped", configuration.getQueue());
}
// consumerMap.put(configuration.getQueue(), configuration);
executor.submit(new QueueListener(redisTemplate,configuration.getQueue(),configuration.getConsumer()));
log.info("队列 {} 提交消息任务",configuration.getQueue());
}
public void destroy() {
log.info("Redis消息队列线程池关闭中");
RUNNING = false;
this.executor.shutdown();
log.info("QueueListener exiting.");
while (!this.executor.isTerminated()) {
}
log.info("QueueListener exited.");
}
public void init() {
log.info("消息队列线程池初始化");
RUNNING = true;
this.executor = Executors.newCachedThreadPool(r -> {
final AtomicInteger threadNumber = new AtomicInteger(6);
return new Thread(r, "RedisMQListener-" + threadNumber.getAndIncrement());
});
}
}
\ No newline at end of file
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.dao.StorageServerMapper;
import com.cx.cn.cxquartz.vo.Storageserver;
import com.google.gson.Gson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class CacheLoadService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${redis.cachekey.ftplist}")
private String ftplistkey;
@Autowired
private StorageServerMapper storageServerMapper;
public boolean loadFtpCache() {
try {
Storageserver server = new Storageserver();
server.setServerstatus(0);//�����õ�
server.setServertype("ftp");
List<Storageserver> storageServers = storageServerMapper.queryStorageServerAll(server);
if (!storageServers.isEmpty() && storageServers.size() > 0) {
stringRedisTemplate.opsForValue().set(ftplistkey, new Gson().toJson(storageServers),60, TimeUnit.SECONDS);
} else {
System.out.println("storageServers.isEmpty");
}
return true;
} catch (Exception e) {
System.out.println(e.toString());
return false;
}
}
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.Code;
/**
* <p>
* 服务类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
public interface CodeService {
Code selectalarmNum(String keyid);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.Face;
/**
* <p>
* 服务类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
public interface FaceService {
int insertFace(Face face);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.util.RedisEnum;
import com.cx.cn.cxquartz.vo.Ftp;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Service
public class FtpService {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(FtpService.class);
@Autowired
private StringRedisTemplate stringRedisTemplate;
private List<Ftp> ftpList;
@Autowired
CacheLoadService cacheLoadService;
@Autowired
public FtpService(StringRedisTemplate stringRedisTemplate) {
// try {
// this.ftpList = getFtpList(stringRedisTemplate.opsForValue().get(RedisEnum.FTPLIST.getValue()));
// }catch (Exception e){
// logger.error("ftpList error:"+e.toString());
// }
}
public Ftp reloadFtp() {
try {
// cacheLoadService.loadFtpCache();
if(null==stringRedisTemplate.opsForValue().get(RedisEnum.FTPLIST.getValue())){
cacheLoadService.loadFtpCache();
}
ftpList = getFtpList(stringRedisTemplate.opsForValue().get(RedisEnum.FTPLIST.getValue()));
}catch (Exception e){
logger.error("ftpListerror:"+e.toString());
}
Long count = null;
try {
count = stringRedisTemplate.opsForValue().increment(RedisEnum.FTPLIST_INDEX.getValue(), 1L);
} catch (Exception e) {
logger.error("redis error" + e.toString());
}
if (count == null) {
count = 0L;
}
long index = count % ftpList.size();
return ftpList.get((int) index);
}
private List<Ftp> getFtpList(String ftpJson) {
try {
JavaType javaType =OBJECT_MAPPER.getTypeFactory().constructParametricType(ArrayList.class, Map.class);
List<Map> jsonArr = OBJECT_MAPPER.readValue(ftpJson, javaType);
List<Ftp> ftpList = new ArrayList<>();
for (int i = 0; i < jsonArr.size(); i++) {
Ftp ftp = new Ftp();
Map jsonObject = jsonArr.get(i);
ftp.setFtpIp(jsonObject.get("serveip").toString());
ftp.setFtpPort(Integer.parseInt(jsonObject.get("serverport")==null?"21":jsonObject.get("serverport").toString()));
ftp.setFtpUsername(jsonObject.get("serveruser").toString());
ftp.setFtpPassword(jsonObject.get("serverpassword").toString());
ftpList.add(ftp);
}
}catch (Exception ex){
logger.error("ftpListerror:"+ex.toString());
}
return ftpList;
}
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.Pedestrian;
/**
* <p>
* 服务类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
public interface PedestrianService {
int insertpedestrian(Pedestrian pedestrian);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.PeopleRideBicyc;
/**
* <p>
* 服务类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
public interface PeopleridebicycService {
int insertPeopleRideBicyc( PeopleRideBicyc peopleridebicyc);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.bean.QuartzTaskInformations;
import com.cx.cn.cxquartz.vo.Sbtdspsr;
import java.util.List;
public interface SbtdspsrService {
List<Sbtdspsr> selectByRtsp(String videoid);
List<QuartzTaskInformations> selectRecogByRtsp(String rtsp);
int updateRecogByRtsp(String rtsp,String devicecode);
List<Sbtdspsr> list();
int updateRTSPorHLSParam(Sbtdspsr sbtdspsr);
List<Sbtdspsr> getPerformedTasks();
Long getPeriodicseconds();
String getRtspOrHLSByDeviceCode(String devicecode);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.Taskinfo;
import java.util.List;
import java.util.Map;
public interface TaskinfoService {
List<Taskinfo> getTaskinfoByMutiParam (Map map);
int addTaskinfo(Taskinfo taskinfo);
int updateTaskinfo(Taskinfo taskinfo);
int delTaskinfoByid(String id);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.TraffAlarmRecord;
import java.util.List;
import java.util.Map;
public interface TraffAlarmRecordService {
int inserTraffAlarmRecord(TraffAlarmRecord traffalarmrecord);
int selectmax();
int updateTraffAlarmRecordUrl(TraffAlarmRecord record);
int updateTraffAlarmRecordProcess(TraffAlarmRecord recordid);
List<TraffAlarmRecord> getTraffAlarmRecordByProgress(Map<String, Object> map);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.TraffpictureParam;
public interface TraffPictureService {
int inserTraffpicture( TraffpictureParam traffpicture);
int updateTraffpicture( TraffpictureParam traffpicture);
String queryimgpath(TraffpictureParam traffpicture);
String queryimgdataByid(String id);
}
package com.cx.cn.cxquartz.service.quartz;
import com.cx.cn.cxquartz.vo.Traffic;
/**
* <p>
* 服务类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
public interface TrafficService {
int insertTraffic(Traffic traffic);
}
package com.cx.cn.cxquartz.service.quartz.impl;
import com.cx.cn.cxquartz.dao.CodeMapper;
import com.cx.cn.cxquartz.service.quartz.CodeService;
import com.cx.cn.cxquartz.vo.Code;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Service
public class CodeServiceImpl implements CodeService {
@Autowired
private CodeMapper codeMapper;
@Override
public Code selectalarmNum(String keyid) {
return codeMapper.selectalarmNum(keyid);
}
}
package com.cx.cn.cxquartz.service.quartz.impl;
import com.cx.cn.cxquartz.dao.FaceMapper;
import com.cx.cn.cxquartz.service.quartz.FaceService;
import com.cx.cn.cxquartz.vo.Face;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 服务实现类
* </p>
*
* @author wjj
* @since 2021-04-29
*/
@Service
public class FaceServiceImpl implements FaceService {
@Autowired
private FaceMapper faceMapper;
@Override
public int insertFace(Face face) {
return faceMapper.insertFace(face);
}
}
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment