Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
H
hzjtpushdateService
Project overview
Project overview
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
建金
hzjtpushdateService
Commits
33ecd994
Commit
33ecd994
authored
Jun 22, 2021
by
yzm
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
mqtt服务
parent
e28e8853
Pipeline
#30
failed with stages
Changes
18
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
18 changed files
with
644 additions
and
109 deletions
+644
-109
pom.xml
pom.xml
+9
-0
src/main/java/com/hzjt/config/IMQTTPublisher.java
src/main/java/com/hzjt/config/IMQTTPublisher.java
+17
-0
src/main/java/com/hzjt/config/IMQTTSubscriber.java
src/main/java/com/hzjt/config/IMQTTSubscriber.java
+20
-0
src/main/java/com/hzjt/config/MQTTSubsribe.java
src/main/java/com/hzjt/config/MQTTSubsribe.java
+142
-0
src/main/java/com/hzjt/config/PushCallback.java
src/main/java/com/hzjt/config/PushCallback.java
+78
-0
src/main/java/com/hzjt/controller/EventWriteController.java
src/main/java/com/hzjt/controller/EventWriteController.java
+3
-2
src/main/java/com/hzjt/controller/TraffController.java
src/main/java/com/hzjt/controller/TraffController.java
+236
-77
src/main/java/com/hzjt/domain/MqttAlarm.java
src/main/java/com/hzjt/domain/MqttAlarm.java
+22
-0
src/main/java/com/hzjt/domain/ResponseEnum.java
src/main/java/com/hzjt/domain/ResponseEnum.java
+1
-1
src/main/java/com/hzjt/domain/WriteResultObj.java
src/main/java/com/hzjt/domain/WriteResultObj.java
+5
-1
src/main/java/com/hzjt/handler/WebSocket.java
src/main/java/com/hzjt/handler/WebSocket.java
+4
-3
src/main/java/com/hzjt/mapper/SbtdspsrMapper.java
src/main/java/com/hzjt/mapper/SbtdspsrMapper.java
+2
-0
src/main/java/com/hzjt/mapper/TraffAlarmRecordMapper.java
src/main/java/com/hzjt/mapper/TraffAlarmRecordMapper.java
+2
-0
src/main/java/com/hzjt/service/EventWriteService.java
src/main/java/com/hzjt/service/EventWriteService.java
+68
-14
src/main/java/com/hzjt/service/TraffdevicewriteresultService.java
.../java/com/hzjt/service/TraffdevicewriteresultService.java
+1
-0
src/main/resources/application.properties
src/main/resources/application.properties
+25
-10
src/main/resources/mapper/SbtdspsrMapper.xml
src/main/resources/mapper/SbtdspsrMapper.xml
+5
-1
src/main/resources/mapper/Traffalarmrecord.xml
src/main/resources/mapper/Traffalarmrecord.xml
+4
-0
No files found.
pom.xml
View file @
33ecd994
...
@@ -193,6 +193,15 @@
...
@@ -193,6 +193,15 @@
<scope>
system
</scope>
<scope>
system
</scope>
<systemPath>
${pom.basedir}/lib/ojdbc6.jar
</systemPath>
<systemPath>
${pom.basedir}/lib/ojdbc6.jar
</systemPath>
</dependency>
</dependency>
<!-- mqtt -->
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-stream
</artifactId>
</dependency>
<dependency>
<groupId>
org.springframework.integration
</groupId>
<artifactId>
spring-integration-mqtt
</artifactId>
</dependency>
<!-- <dependency>-->
<!-- <dependency>-->
<!-- <groupId>jms</groupId>-->
<!-- <groupId>jms</groupId>-->
<!-- <artifactId>orai18n</artifactId>-->
<!-- <artifactId>orai18n</artifactId>-->
...
...
src/main/java/com/hzjt/config/IMQTTPublisher.java
0 → 100644
View file @
33ecd994
package
com
.
hzjt
.
config
;
public
interface
IMQTTPublisher
{
/**
* 发布消息
*
* @param topic 主题
* @param message 消息
*/
public
void
publishMessage
(
String
topic
,
String
message
);
/**
* 断开MQTT客户端
*/
public
void
disconnect
();
}
src/main/java/com/hzjt/config/IMQTTSubscriber.java
0 → 100644
View file @
33ecd994
package
com
.
hzjt
.
config
;
/**
* <p>
* 订阅者接口
*/
public
interface
IMQTTSubscriber
{
/**
* 订阅消息
*
* @param topic
*/
public
void
subscribeMessage
(
String
topic
);
/**
* 断开MQTT客户端
*/
public
void
disconnect
();
}
src/main/java/com/hzjt/config/MQTTSubsribe.java
0 → 100644
View file @
33ecd994
package
com
.
hzjt
.
config
;
import
com.hzjt.mapper.SbtdspsrMapper
;
import
lombok.extern.slf4j.Slf4j
;
import
org.eclipse.paho.client.mqttv3.*
;
import
org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Component
;
import
java.util.List
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
@Component
@Slf4j
public
class
MQTTSubsribe
{
@Value
(
"${spring.mqtt.password}"
)
private
String
password
;
@Value
(
"${spring.mqtt.username}"
)
private
String
username
;
@Value
(
"${spring.mqtt.url}"
)
private
String
url
;
@Value
(
"${spring.mqtt.qos}"
)
private
Integer
qos
;
// 连接超时时间
@Value
(
"${spring.mqtt.completionTimeout}"
)
private
int
completionTimeout
;
@Autowired
SbtdspsrMapper
sbtdspsrMapper
;
private
String
[]
topics
;
private
int
[]
qoslist
;
private
MqttClient
client
;
private
MqttConnectOptions
mqttConnectOptions
;
@Autowired
private
PushCallback
pushCallback
;
private
ScheduledExecutorService
scheduled
;
public
void
startReconnect
()
{
this
.
scheduled
=
Executors
.
newSingleThreadScheduledExecutor
();
// 定时任务——重新连接mqtt服务器
this
.
scheduled
.
scheduleAtFixedRate
(
new
Runnable
()
{
public
void
run
()
{
if
(!
MQTTSubsribe
.
this
.
client
.
isConnected
())
{
try
{
log
.
info
(
"---mqtt reconnect ---"
);
List
<
String
>
sbbhlist
=
sbtdspsrMapper
.
selectAllWbbh
();
client
.
connect
(
mqttConnectOptions
);
subscribe
(
sbbhlist
);
// client.subscribe(topics, qoslist);
}
catch
(
MqttSecurityException
var2
)
{
var2
.
printStackTrace
();
}
catch
(
MqttException
var3
)
{
var3
.
printStackTrace
();
}
}
}
},
5000L
,
10000L
,
TimeUnit
.
MILLISECONDS
);
}
// 对mqttConnectOptions对象的常规设置
public
MqttConnectOptions
getMqttConnectOptions
()
{
this
.
mqttConnectOptions
=
new
MqttConnectOptions
();
this
.
mqttConnectOptions
.
setCleanSession
(
true
);
// this.mqttConnectOptions.setUserName(username);
// this.mqttConnectOptions.setPassword(password.toCharArray());
this
.
mqttConnectOptions
.
setServerURIs
(
new
String
[]{
url
});
this
.
mqttConnectOptions
.
setConnectionTimeout
(
completionTimeout
);
this
.
mqttConnectOptions
.
setKeepAliveInterval
(
2000
);
return
mqttConnectOptions
;
}
// 连接mqtt服务器订阅信息方法
// topic也可作为参数传入
public
void
subscribe
(
List
<
String
>
topics
)
{
if
(
null
!=
topics
&&
topics
.
size
()>
0
)
{
try
{
this
.
topics
=
topics
.
toArray
(
new
String
[
topics
.
size
()]);
this
.
client
=
new
MqttClient
(
url
,
getClientId
(),
new
MemoryPersistence
());
this
.
getMqttConnectOptions
();
this
.
client
.
setCallback
(
this
.
pushCallback
);
this
.
client
.
connect
(
this
.
mqttConnectOptions
);
//遍历topic ,获得qs
int
[]
qoslist
=
new
int
[
topics
.
size
()];
for
(
int
i
=
0
;
i
<
topics
.
size
();
i
++)
{
qoslist
[
i
]=
qos
;
}
this
.
qoslist
=
qoslist
;
// 可将订阅的一个或多个topic都存入数组中,同时订阅
// String[] topic1 = {defaultTopic};
this
.
client
.
subscribe
(
this
.
topics
,
qoslist
);
boolean
connected
=
this
.
client
.
isConnected
();
log
.
info
(
"连接状态为:"
+
connected
);
String
flag
=
connected
?
"成功"
:
"失败"
;
}
catch
(
MqttException
e
)
{
log
.
info
(
e
.
toString
());
startReconnect
();
}
}
else
{
log
.
info
(
"topic is empty"
);
}
}
// 随机生成唯一client.id方法
public
String
getClientId
()
{
String
nums
=
""
;
String
[]
codeChars
=
{
"0"
,
"1"
,
"2"
,
"3"
,
"4"
,
"5"
,
"6"
,
"7"
,
"8"
,
"9"
,
"a"
,
"b"
,
"c"
,
"d"
,
"e"
,
"f"
,
"g"
,
"h"
,
"i"
,
"j"
,
"k"
,
"l"
,
"m"
,
"n"
,
"o"
,
"p"
,
"q"
,
"r"
,
"s"
,
"t"
,
"u"
,
"v"
,
"w"
,
"x"
,
"y"
,
"z"
,
"A"
,
"B"
,
"C"
,
"D"
,
"E"
,
"F"
,
"G"
,
"H"
,
"I"
,
"J"
,
"K"
,
"L"
,
"M"
,
"N"
,
"O"
,
"P"
,
"Q"
,
"R"
,
"S"
,
"T"
,
"U"
,
"V"
,
"W"
,
"X"
,
"Y"
,
"Z"
};
for
(
int
i
=
0
;
i
<
23
;
i
++)
{
int
charNum
=
(
int
)
Math
.
floor
(
Math
.
random
()
*
codeChars
.
length
);
nums
=
nums
+
codeChars
[
charNum
];
}
return
nums
;
}
public
void
publishMessage
(
String
topic
,
String
message
)
{
try
{
MqttMessage
mqttmessage
=
new
MqttMessage
(
message
.
getBytes
());
mqttmessage
.
setQos
(
this
.
qos
);
client
.
publish
(
topic
,
mqttmessage
);
}
catch
(
MqttException
me
)
{
log
.
info
(
me
.
toString
());
}
}
}
\ No newline at end of file
src/main/java/com/hzjt/config/PushCallback.java
0 → 100644
View file @
33ecd994
package
com
.
hzjt
.
config
;
import
lombok.extern.slf4j.Slf4j
;
import
org.eclipse.paho.client.mqttv3.IMqttDeliveryToken
;
import
org.eclipse.paho.client.mqttv3.MqttCallback
;
import
org.eclipse.paho.client.mqttv3.MqttMessage
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.context.annotation.Configuration
;
import
org.springframework.data.redis.core.RedisTemplate
;
import
java.util.concurrent.ScheduledExecutorService
;
@Configuration
@Slf4j
public
class
PushCallback
implements
MqttCallback
,
IMQTTPublisher
{
@Autowired
private
MQTTSubsribe
mqttSubsribe
;
@Autowired
private
RedisTemplate
redisTemplate
;
// 定时任务——定时缓存查询的数据
private
ScheduledExecutorService
scheduled
;
@Override
public
void
connectionLost
(
Throwable
throwable
)
{
// 连接丢失后,一般在这里面进行重连
log
.
info
(
"连接断开,可以做重连"
);
this
.
mqttSubsribe
.
startReconnect
();
}
@Override
public
void
messageArrived
(
String
topic
,
MqttMessage
message
)
throws
Exception
{
log
.
info
(
"topic result"
+
topic
+
new
String
(
message
.
getPayload
()));
// subscribe后得到的消息会执行到这里面
String
[]
topics
=
topic
.
split
(
"/"
);
// this.scheduled = Executors.newSingleThreadScheduledExecutor();
// //周期定时方法,可以在里面进行定时数据存储操作,我测试时是先将数据存储到了redis中,可做实时数据来用
// this.scheduled.scheduleAtFixedRate(new Runnable() {
// @Override
// public void run() {
// String key = topics[0] + topics[1];
// String value = new String(message.getPayload());
// log.info("topic result");
//// if (redisTemplate.hasKey(key)){
//// redisTemplate.delete(key);
//// }
//// redisTemplate.opsForValue().set(key,value);
//// System.out.println("redis缓存数据"+value);
//// //下面是我对数据的一些处理,仅供参考
//// JSONObject jsonObject = JSONObject.parseObject(value);
//// Map map = jsonObject;
//// List<Map<String,Object>> list = (List<Map<String, Object>>) ((Map) map.get("lines")).get("data");
//// log.error("-----------------------------"+list);
// }
// //此处 120为每120秒执行一次
// },0,120, TimeUnit.SECONDS);
}
@Override
public
void
deliveryComplete
(
IMqttDeliveryToken
token
)
{
log
.
info
(
"deliveryComplete---------"
+
token
.
isComplete
());
}
@Override
public
void
publishMessage
(
String
topic
,
String
message
)
{
}
@Override
public
void
disconnect
()
{
log
.
info
(
"disconnect---------"
);
}
}
\ No newline at end of file
src/main/java/com/hzjt/controller/EventWriteController.java
View file @
33ecd994
...
@@ -2,6 +2,7 @@ package com.hzjt.controller;
...
@@ -2,6 +2,7 @@ package com.hzjt.controller;
import
com.hzjt.domain.ResultObj
;
import
com.hzjt.domain.ResultObj
;
import
com.hzjt.domain.Traffalarmrecord
;
import
com.hzjt.domain.Traffalarmrecord
;
import
com.hzjt.domain.WriteResultObj
;
import
com.hzjt.service.EventWriteService
;
import
com.hzjt.service.EventWriteService
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.web.bind.annotation.GetMapping
;
import
org.springframework.web.bind.annotation.GetMapping
;
...
@@ -24,7 +25,7 @@ public class EventWriteController {
...
@@ -24,7 +25,7 @@ public class EventWriteController {
}
}
@PostMapping
(
"/sendEvents"
)
@PostMapping
(
"/sendEvents"
)
public
List
<
ResultObj
>
sendEvents
(
@RequestParam
(
"recordid"
)
String
recordid
){
public
List
<
Write
ResultObj
>
sendEvents
(
@RequestParam
(
"recordid"
)
String
recordid
){
List
<
Long
>
longrecorid
=
new
ArrayList
<>();
List
<
Long
>
longrecorid
=
new
ArrayList
<>();
for
(
String
id:
recordid
.
split
(
","
))
for
(
String
id:
recordid
.
split
(
","
))
{
{
...
@@ -34,7 +35,7 @@ public class EventWriteController {
...
@@ -34,7 +35,7 @@ public class EventWriteController {
}
}
@GetMapping
(
"/sendEventsByids"
)
@GetMapping
(
"/sendEventsByids"
)
public
List
<
ResultObj
>
sendEventsByids
(){
public
List
<
Write
ResultObj
>
sendEventsByids
(){
List
<
Long
>
recordid
=
new
ArrayList
<>();
List
<
Long
>
recordid
=
new
ArrayList
<>();
recordid
.
add
(
Long
.
valueOf
(
93228
));
recordid
.
add
(
Long
.
valueOf
(
93228
));
recordid
.
add
(
Long
.
valueOf
(
93834
));
recordid
.
add
(
Long
.
valueOf
(
93834
));
...
...
src/main/java/com/hzjt/controller/TraffController.java
View file @
33ecd994
package
com
.
hzjt
.
controller
;
package
com
.
hzjt
.
controller
;
import
cn.hutool.core.codec.Base64Encoder
;
import
cn.hutool.core.codec.Base64Encoder
;
import
cn.hutool.json.JSONUtil
;
import
com.alibaba.fastjson.JSON
;
import
com.alibaba.fastjson.JSONObject
;
import
com.alibaba.fastjson.JSONObject
;
import
com.google.common.util.concurrent.RateLimiter
;
import
com.google.common.util.concurrent.RateLimiter
;
import
com.hzjt.config.MQTTSubsribe
;
import
com.hzjt.domain.*
;
import
com.hzjt.domain.*
;
import
com.hzjt.handler.FileTransferManager
;
import
com.hzjt.handler.FileTransferManager
;
import
com.hzjt.handler.WebSocket
;
import
com.hzjt.handler.WebSocket
;
...
@@ -13,14 +16,18 @@ import com.hzjt.service.*;
...
@@ -13,14 +16,18 @@ import com.hzjt.service.*;
import
com.hzjt.util.*
;
import
com.hzjt.util.*
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.tomcat.util.http.fileupload.IOUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.data.redis.core.StringRedisTemplate
;
import
org.springframework.web.bind.annotation.*
;
import
org.springframework.web.bind.annotation.*
;
import
sun.net.www.protocol.ftp.FtpURLConnection
;
import
javax.annotation.PostConstruct
;
import
javax.servlet.http.HttpServletRequest
;
import
javax.servlet.http.HttpServletResponse
;
import
javax.websocket.server.PathParam
;
import
javax.websocket.server.PathParam
;
import
java.io.IOException
;
import
java.io.*
;
import
java.io.InputStream
;
import
java.net.HttpURLConnection
;
import
java.net.HttpURLConnection
;
import
java.net.URL
;
import
java.net.URL
;
import
java.util.*
;
import
java.util.*
;
...
@@ -54,6 +61,12 @@ public class TraffController {
...
@@ -54,6 +61,12 @@ public class TraffController {
@Value
(
"${json.resisurl}"
)
@Value
(
"${json.resisurl}"
)
private
String
resisjsonvalue
;
private
String
resisjsonvalue
;
@Value
(
"${ftppath}"
)
private
String
ftppath
;
@Value
(
"${ftpServiceUrl}"
)
private
String
ftpServiceUrl
;
@Autowired
@Autowired
private
StringRedisTemplate
stringRedisTemplate
;
private
StringRedisTemplate
stringRedisTemplate
;
...
@@ -78,7 +91,6 @@ public class TraffController {
...
@@ -78,7 +91,6 @@ public class TraffController {
private
String
ftpPassword
;
private
String
ftpPassword
;
@Value
(
"${alarmrecord.check.enable}"
)
@Value
(
"${alarmrecord.check.enable}"
)
private
String
checkEnable
;
private
String
checkEnable
;
Long
ldir
=
new
Long
(
180
);
Long
ldir
=
new
Long
(
180
);
@Autowired
@Autowired
FLVCacheService
flvCacheService
;
FLVCacheService
flvCacheService
;
...
@@ -87,6 +99,10 @@ public class TraffController {
...
@@ -87,6 +99,10 @@ public class TraffController {
// private SimpMessagingTemplate template;
// private SimpMessagingTemplate template;
@Autowired
MQTTSubsribe
mQTTSubsribe
;
String
manualStatus
=
"0"
;
private
static
final
String
TYPE
=
"TRAFFIC_INCIDENT_ALARM"
;
private
static
final
String
TYPE
=
"TRAFFIC_INCIDENT_ALARM"
;
ScheduledExecutorService
schedulepool
=
Executors
.
newScheduledThreadPool
(
5
);
//创
ScheduledExecutorService
schedulepool
=
Executors
.
newScheduledThreadPool
(
5
);
//创
...
@@ -104,6 +120,20 @@ public class TraffController {
...
@@ -104,6 +120,20 @@ public class TraffController {
rateLimiterMap
=
new
ConcurrentHashMap
<>(
16
);
rateLimiterMap
=
new
ConcurrentHashMap
<>(
16
);
}
}
/**
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
* PostConstruct在构造函数之后执行,init()方法之前执行。PreDestroy()方法在destroy()方法知性之后执行
* 这里初始化订阅一个主题
*/
@PostConstruct
public
void
init
()
{
System
.
out
.
println
(
DateUtils
.
formatDate
(
new
Date
(
Long
.
valueOf
(
"1624331555000"
))));
//查询素有videowbbh
List
<
String
>
sbbhlist
=
sbtdspsrMapper
.
selectAllWbbh
();
mQTTSubsribe
.
subscribe
(
sbbhlist
);
}
@PostMapping
(
"/alarmevent"
)
@PostMapping
(
"/alarmevent"
)
public
ResultObj
rece
(
@RequestBody
Alarm
trffClientMessage
)
{
public
ResultObj
rece
(
@RequestBody
Alarm
trffClientMessage
)
{
// log.info("alarmevent--------------"+trffClientMessage.toString());
// log.info("alarmevent--------------"+trffClientMessage.toString());
...
@@ -111,7 +141,6 @@ public class TraffController {
...
@@ -111,7 +141,6 @@ public class TraffController {
return
ResultObj
.
error
(
ResponseEnum
.
E_1002
.
getCode
(),
"type类型不正确"
);
return
ResultObj
.
error
(
ResponseEnum
.
E_1002
.
getCode
(),
"type类型不正确"
);
}
}
if
(
trffClientMessage
.
getImg_urls
().
isEmpty
())
{
if
(
trffClientMessage
.
getImg_urls
().
isEmpty
())
{
return
ResultObj
.
error
(
ResponseEnum
.
E_1004
.
getCode
(),
"img_urls值不能为空"
);
return
ResultObj
.
error
(
ResponseEnum
.
E_1004
.
getCode
(),
"img_urls值不能为空"
);
}
}
...
@@ -128,15 +157,15 @@ public class TraffController {
...
@@ -128,15 +157,15 @@ public class TraffController {
String
sbbh
=
""
;
String
sbbh
=
""
;
int
tdbh
=
0
;
int
tdbh
=
0
;
if
(
videoId
.
contains
(
"_"
)
&&
videoId
.
split
(
"_"
).
length
==
2
)
{
//
if( videoId.contains("_") && videoId.split("_").length == 2) {
String
[]
sbAndTd
=
videoId
.
split
(
"_"
);
//
String[] sbAndTd = videoId.split("_");
sbbh
=
sbAndTd
[
0
];
//
sbbh = sbAndTd[0];
tdbh
=
Integer
.
valueOf
(
sbAndTd
[
1
])
+
1
;
//
tdbh = Integer.valueOf(sbAndTd[1]) + 1;
}
//
}
else
{
//
else {
sbbh
=
videoId
;
sbbh
=
videoId
;
tdbh
=
0
;
tdbh
=
0
;
}
//
}
List
<
Sbtdspsr
>
sbtdspsrs
=
sbtdspsrMapper
.
selectBySbbh
(
sbbh
,
tdbh
);
List
<
Sbtdspsr
>
sbtdspsrs
=
sbtdspsrMapper
.
selectBySbbh
(
sbbh
,
tdbh
);
if
(
sbtdspsrs
.
isEmpty
())
{
if
(
sbtdspsrs
.
isEmpty
())
{
log
.
info
(
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"未录入(备案)"
);
log
.
info
(
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"未录入(备案)"
);
...
@@ -147,62 +176,72 @@ public class TraffController {
...
@@ -147,62 +176,72 @@ public class TraffController {
log
.
info
(
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"配置的行政区划"
+
xzbh
+
"不合规"
);
log
.
info
(
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"配置的行政区划"
+
xzbh
+
"不合规"
);
return
ResultObj
.
error
(
ResponseEnum
.
E_1002
.
getCode
(),
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"配置的行政区划不合规"
);
return
ResultObj
.
error
(
ResponseEnum
.
E_1002
.
getCode
(),
"设备为:"
+
sbbh
+
",通道为:"
+
tdbh
+
"配置的行政区划不合规"
);
}
}
//ts 時間轉成正常時間
trffClientMessage
.
setDept
(
xzbh
);
trffClientMessage
.
setDept
(
xzbh
);
//判断是否需要手动筛选
manualStatus
=
traffAlarmRecordMapper
.
seletManualStatus
();
log
.
info
(
"是否手动推送 manualStatus"
+
manualStatus
);
if
(
manualStatus
.
equalsIgnoreCase
(
"0"
))
{
log
.
info
(
"send to gaoxin"
);
//手动 推送给高信
webSocket
.
GroupSending
(
JsonUtil
.
beanToString
(
trffClientMessage
));
}
try
{
// 清空redis中的部分旧数据
try
{
// 清空redis中的部分旧数据
// importService.cleanCache();
// importService.cleanCache();
// 将参数result中的部分数据存入redis中,并把格式校验成功的数据发布至对应频道中
// 将参数result中的部分数据存入redis中,并把格式校验成功的数据发布至对应频道中
// importService.cacheAndPublish(JsonUtil.beanToString(trffClientMessage));
// importService.cacheAndPublish(JsonUtil.beanToString(trffClientMessage));
sendevent
(
trffClientMessage
);
sendevent
(
trffClientMessage
,
sbtdspsrs
.
get
(
0
).
getWbbh
()
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"MessageController receive putData error:"
+
e
.
toString
());
log
.
error
(
"MessageController receive putData error:"
+
e
.
toString
());
}
}
return
ResultObj
.
ok
(
trffClientMessage
);
return
ResultObj
.
ok
(
trffClientMessage
);
}
}
public
void
sendevent
(
Alarm
trffClientMessage
)
{
public
void
sendevent
(
Alarm
trffClientMessage
,
String
wbbh
)
{
webSocket
.
GroupSending
(
JsonUtil
.
beanToString
(
trffClientMessage
));
Traffalarmrecord
traffAlarmRecord
=
new
Traffalarmrecord
();
Traffalarmrecord
traffAlarmRecord
=
new
Traffalarmrecord
();
traffAlarmRecord
.
setAreaid
(
Long
.
valueOf
(
trffClientMessage
.
getDept
()));
traffAlarmRecord
.
setAreaid
(
Long
.
valueOf
(
trffClientMessage
.
getDept
()));
traffAlarmRecord
.
setRecordtype
(
trffClientMessage
.
getIncident_type
().
toLowerCase
());
traffAlarmRecord
.
setRecordtype
(
trffClientMessage
.
getIncident_type
().
toLowerCase
());
traffAlarmRecord
.
setRecordtime
(
new
Date
(
Long
.
valueOf
(
trffClientMessage
.
getTs
())));
traffAlarmRecord
.
setRecordtime
(
new
Date
(
Long
.
valueOf
(
trffClientMessage
.
getTs
())));
Integer
channelid
=
Integer
.
valueOf
(
0
);
Integer
channelid
=
Integer
.
valueOf
(
0
);
if
(
trffClientMessage
.
getVideo_id
().
contains
(
"_"
)&&
trffClientMessage
.
getVideo_id
().
split
(
"_"
).
length
==
2
)
{
//
if(trffClientMessage.getVideo_id().contains("_")&& trffClientMessage.getVideo_id().split("_").length == 2) {
String
[]
videoIdArr
=
trffClientMessage
.
getVideo_id
().
split
(
"_"
);
//
String[] videoIdArr = trffClientMessage.getVideo_id().split("_");
traffAlarmRecord
.
setFdid
(
videoIdArr
[
0
]);
//
traffAlarmRecord.setFdid(videoIdArr[0]);
channelid
=
Integer
.
valueOf
(
videoIdArr
[
1
])
+
1
;
//
channelid = Integer.valueOf(videoIdArr[1]) + 1;
}
//
}
else
{
//
else{
traffAlarmRecord
.
setFdid
(
trffClientMessage
.
getVideo_id
()
.
replace
(
"_0"
,
""
)
);
traffAlarmRecord
.
setFdid
(
trffClientMessage
.
getVideo_id
());
channelid
=
0
;
channelid
=
0
;
}
//
}
traffAlarmRecord
.
setChannelid
(
channelid
);
traffAlarmRecord
.
setChannelid
(
channelid
);
//判断是否在正检时间内
//判断是否在正检时间内
int
exists
=
traffAlarmRecordMapper
.
selectInChecktimeTraffAlarm
(
traffAlarmRecord
);
//
int exists = traffAlarmRecordMapper.selectInChecktimeTraffAlarm(traffAlarmRecord);
// log.info("selectInChecktimeTraffAlarm" + exists);
// log.info("selectInChecktimeTraffAlarm" + exists);
if
(
exists
>
0
)
{
//
if (exists > 0) {
return
;
//
return;
}
//
}
traffAlarmRecord
.
setVideourlfrom
(
trffClientMessage
.
getVideo_record_url
());
traffAlarmRecord
.
setVideourlfrom
(
trffClientMessage
.
getVideo_record_url
());
traffAlarmRecord
.
setCreatetime
(
new
Date
());
traffAlarmRecord
.
setCreatetime
(
new
Date
());
traffAlarmRecord
.
setObjlable
(
trffClientMessage
.
getObjLabel
());
traffAlarmRecord
.
setObjlable
(
trffClientMessage
.
getObjLabel
());
traffAlarmRecord
.
setPushstatus
(
9
);
//设置为未推送
traffAlarmRecord
.
setPushstatus
(
9
);
//设置为未推送
// log.info("selectInworkRectifytimeTraffAlarm" + exists);
//
//
log.info("selectInworkRectifytimeTraffAlarm" + exists);
if
((
traffAlarmRecord
.
getRecordtype
().
equalsIgnoreCase
(
"roadworks_out_of_line"
)
||
traffAlarmRecord
.
getRecordtype
().
equalsIgnoreCase
(
"roadworks"
)))
{
//
if ((traffAlarmRecord.getRecordtype().equalsIgnoreCase("roadworks_out_of_line") || traffAlarmRecord.getRecordtype().equalsIgnoreCase("roadworks"))) {
//
//支持对某一个点位报施工事件后 倒推RECTIFYTIME 查看是否是施工事件 一段时间(时间可以配置)之前的停车事件和行人闯入事件矫正为施工事件
//
//支持对某一个点位报施工事件后 倒推RECTIFYTIME 查看是否是施工事件 一段时间(时间可以配置)之前的停车事件和行人闯入事件矫正为施工事件
exists
=
traffAlarmRecordMapper
.
updateInworkRectifytimeTraffAlarm
(
traffAlarmRecord
);
//
exists = traffAlarmRecordMapper.updateInworkRectifytimeTraffAlarm(traffAlarmRecord);
//这个事件如果为停车事件或者行人闯入事件时,设置纠偏状态为1
//
//这个事件如果为停车事件或者行人闯入事件时,设置纠偏状态为1
if
(
exists
>
0
)
{
//
if (exists > 0) {
log
.
info
(
"setRectificationtype=1"
);
//
log.info("setRectificationtype=1");
}
//
}
//
}
//
}
List
<
String
>
imgBase64List
=
trffClientMessage
.
getImg_base64
();
List
<
String
>
imgBase64List
=
trffClientMessage
.
getImg_base64
();
String
imgEnumHead
=
"IMG"
;
String
imgEnumHead
=
"IMG"
;
...
@@ -229,14 +268,12 @@ public class TraffController {
...
@@ -229,14 +268,12 @@ public class TraffController {
traffAlarmRecord
.
setRecordid
((
long
)
(
recordid
));
traffAlarmRecord
.
setRecordid
((
long
)
(
recordid
));
//发送给前端
//发送给前端
Map
map
=
new
HashMap
();
Map
map
=
new
HashMap
();
//判断是否需要手动筛选
String
manualStatus
=
traffAlarmRecordMapper
.
seletManualStatus
();
if
(
manualStatus
.
equalsIgnoreCase
(
"1"
))
{
if
(
manualStatus
.
equalsIgnoreCase
(
"1"
))
{
map
.
put
(
"type"
,
"recordalarm"
);
map
.
put
(
"type"
,
"recordalarm"
);
traffAlarmRecord
.
setManualstatus
(
0
);
traffAlarmRecord
.
setManualstatus
(
0
);
}
else
{
}
else
{
map
.
put
(
"type"
,
"alarm"
);
map
.
put
(
"type"
,
"alarm"
);
traffAlarmRecord
.
setManualstatus
(
1
);
traffAlarmRecord
.
setManualstatus
(
1
);
}
}
...
@@ -289,7 +326,8 @@ public class TraffController {
...
@@ -289,7 +326,8 @@ public class TraffController {
//只更新监控
//只更新监控
if
(
traffAlarmRecord
!=
null
&&
traffAlarmRecord
.
getRecordid
()
!=
null
)
{
if
(
traffAlarmRecord
!=
null
&&
traffAlarmRecord
.
getRecordid
()
!=
null
)
{
traffAlarmRecordMapper
.
updateTraffAlarmRecordUrl
(
traffAlarmRecord
);
traffAlarmRecordMapper
.
updateTraffAlarmRecordUrl
(
traffAlarmRecord
);
log
.
info
(
"send to mqtt video is not empty"
);
sendtomqtt
(
trffClientMessage
,
wbbh
,
traffAlarmRecord
);
// //推送给第三方
// //推送给第三方
// if(!manualStatus.equalsIgnoreCase("1")) {
// if(!manualStatus.equalsIgnoreCase("1")) {
// ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
// ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
...
@@ -297,10 +335,12 @@ public class TraffController {
...
@@ -297,10 +335,12 @@ public class TraffController {
// }
// }
}
}
}
}
// else {
else
{
// ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
// ResultObj obj = eventWriteService.updateAndAutoSendEvent(traffAlarmRecord);
// log.info("send to guangda HttpURLConnection.HTTP_Fail--->response message:" + obj.toString());
log
.
info
(
"send to mqtt video is empty"
);
// }
sendtomqtt
(
trffClientMessage
,
wbbh
,
traffAlarmRecord
);
}
}
catch
(
IOException
e
)
{
}
catch
(
IOException
e
)
{
System
.
out
.
println
(
e
.
toString
());
System
.
out
.
println
(
e
.
toString
());
log
.
error
(
e
.
toString
());
log
.
error
(
e
.
toString
());
...
@@ -322,8 +362,41 @@ public class TraffController {
...
@@ -322,8 +362,41 @@ public class TraffController {
}
}
traffAlarmRecord
.
setPushdesc
(
"推送结束"
);
traffAlarmRecord
.
setPushstatus
(
0
);
traffAlarmRecordMapper
.
updatePushEvent
(
traffAlarmRecord
);
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
log
.
error
(
"alarm fail :"
+
e
.
getMessage
());
log
.
error
(
"alarm fail :"
+
e
.
toString
());
}
}
private
void
sendtomqtt
(
Alarm
trffClientMessage
,
String
wbbh
,
Traffalarmrecord
traffAlarmRecord
)
{
if
(
null
==
wbbh
||
""
.
equals
(
wbbh
))
{
log
.
info
(
"设备为:"
+
traffAlarmRecord
.
getFdid
()
+
",wbbh"
+
" is empty"
);
}
else
{
//mqtt 推送给第三方
MqttAlarm
mqttalarm
=
new
MqttAlarm
();
//查询事件类型mqtt 对应的编号
String
mqttbh
=
traffAlarmRecordMapper
.
seletmqttbh
(
trffClientMessage
.
getIncident_type
());
mqttalarm
.
setAlarmTime
(
DateUtils
.
formatDate
(
new
Date
(
Long
.
valueOf
(
trffClientMessage
.
getTs
()))));
mqttalarm
.
setCompanyId
(
"66211"
);
//设置图片
mqttalarm
.
setImagePath
(
traffAlarmRecord
.
getImg1path
()==
null
?
""
:
ftpServiceUrl
+
"?location="
+
traffAlarmRecord
.
getImg1path
().
replace
(
"ftp://"
+
ftppath
+
"/"
,
""
));
mqttalarm
.
setVideoPath
(
traffAlarmRecord
.
getVideopath
()==
null
?
""
:
ftpServiceUrl
+
"?location="
+
traffAlarmRecord
.
getVideopath
().
replace
(
"ftp://"
+
ftppath
+
"/"
,
""
));
mqttalarm
.
setDeviceId
(
wbbh
);
if
(
null
!=
trffClientMessage
.
getObj_location
())
{
mqttalarm
.
setX
(
trffClientMessage
.
getObj_location
().
get
(
"x"
)
==
null
?
null
:
trffClientMessage
.
getObj_location
().
get
(
"x"
).
toString
());
mqttalarm
.
setY
(
trffClientMessage
.
getObj_location
().
get
(
"y"
)
==
null
?
null
:
trffClientMessage
.
getObj_location
().
get
(
"y"
).
toString
());
mqttalarm
.
setWidth
(
trffClientMessage
.
getObj_location
().
get
(
"width"
)
==
null
?
null
:
trffClientMessage
.
getObj_location
().
get
(
"width"
).
toString
());
mqttalarm
.
setHeight
(
trffClientMessage
.
getObj_location
().
get
(
"height"
)
==
null
?
null
:
trffClientMessage
.
getObj_location
().
get
(
"height"
).
toString
());
}
mqttalarm
.
setEventTypeId
(
mqttbh
);
log
.
info
(
"mqtt data========"
+
JSONUtil
.
toJsonStr
(
mqttalarm
));
mQTTSubsribe
.
publishMessage
(
"event/"
+
wbbh
+
"/videoEvent"
,
JsonUtil
.
beanToString
(
mqttalarm
));
}
}
}
}
...
@@ -565,36 +638,122 @@ public class TraffController {
...
@@ -565,36 +638,122 @@ public class TraffController {
// traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord2);
// traffAlarmRecordMapper.inserTraffAlarmRecord(traffAlarmRecord2);
}
}
//
车流量
推送
//
事件
推送
@GetMapping
(
"/test
2
"
)
@GetMapping
(
"/test
event
"
)
@ResponseBody
@ResponseBody
public
void
test2
()
{
public
void
testevent
()
{
Alarm
alarm
=
new
Alarm
();
String
json
=
"{"
+
alarm
.
setVideo_id
(
"33_65_230_156_554_fbXdTkVe98u_ecvs_0"
);
" \"type\": \"TRAFFIC_INCIDENT_ALARM\","
+
alarm
.
setIncident_type
(
"vehicle_ban"
);
" \"id\": \"d9a2b0f0-f0cf-49b9-9b64-3da86a122afa\","
+
alarm
.
setType
(
"TRAFFIC_INCIDENT_ALARM"
);
" \"video_id\": \"201809180950119121_0\","
+
Map
map
=
new
HashMap
();
" \"ts\": \"1544602970458\","
+
map
.
put
(
"type"
,
"alarm"
);
" \"incident_type\": \"WRONG_DIRECTION\","
+
map
.
put
(
"data"
,
alarm
);
" \"img_urls\": ["
+
map
.
put
(
"recordid"
,
"3344656"
);
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test\",\n"
+
webSocket
.
GroupSending
(
JsonUtil
.
beanToString
(
map
));
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test2\","
+
Traffalarmrecord
traffAlarmRecord
=
new
Traffalarmrecord
();
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test3\","
+
traffAlarmRecord
.
setRecordid
(
Long
.
parseLong
(
"33446"
));
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test4\","
+
traffAlarmRecord
.
setChannelid
(
new
Integer
(
0
));
" \"http://192.168.1.3:8001/api/alg/files/1535740000-test5\""
+
traffAlarmRecord
.
setFdid
(
"33_65_230_156_554_fbXdTkVe98u_ecvs"
);
" ],"
+
traffAlarmRecord
.
setRecordtype
(
"vehicle_ban"
);
" \"video_record_url\": \"http://192.168.1.3:8001/api/alg/files/1576459828-test\","
+
" \"obj_location\": {"
+
" \"x\": 0.1,\n"
+
" \"y\": 0.2,"
+
" \"width\": 0.4,"
+
" \"height\": 0.7"
+
" },"
+
" \"img_base64\": [\"d0xEMHcFAESwdwaUjWDqB6/1Qw4LapAiSICVTOFWsGT0sgt\"]\n"
+
" }"
;
MqttAlarm
alarm
=
new
MqttAlarm
();
alarm
.
setHeight
(
"0.7"
);
alarm
.
setWidth
(
"0.7"
);
mQTTSubsribe
.
publishMessage
(
"event/1111/videoEvent"
,
JSONUtil
.
toJsonStr
(
alarm
));
}
traffAlarmRecordMapper
.
inserTraffAlarmRecord
(
traffAlarmRecord
);
@GetMapping
(
"/api/alg/files"
)
protected
void
fielagent
(
@RequestParam
(
"location"
)
String
location
,
HttpServletRequest
request
,
HttpServletResponse
response
)
{
long
startTime
=
System
.
currentTimeMillis
();
//ftp://reader:reader@33.50.1.22:21/
//ftp.host=33.65.250.179:21:hzjt:1qaz2wsx
String
ftpPath
=
"ftp://"
+
ftppath
+
"/"
+
location
;
FileInputStream
hFile
=
null
;
OutputStream
toClient
=
null
;
InputStream
inputStream
=
null
;
BufferedInputStream
bis
=
null
;
try
{
response
.
reset
();
response
.
setHeader
(
"Expires"
,
"Sat, 10 May 2059 12:00:00 GMT"
);
response
.
setHeader
(
"Cache-Control"
,
"max-age=315360000"
);
if
(
StringUtils
.
isNotBlank
(
ftpPath
))
{
if
(
ftpPath
.
endsWith
(
".jpg"
)
||
ftpPath
.
endsWith
(
".JPG"
)
||
ftpPath
.
endsWith
(
".png"
)
||
ftpPath
.
endsWith
(
".PNG"
)
||
ftpPath
.
endsWith
(
".gif"
)
||
ftpPath
.
endsWith
(
".GIF"
))
{
response
.
setContentType
(
"image/"
+
ftpPath
.
substring
(
ftpPath
.
lastIndexOf
(
"."
)
+
1
)
+
"; charset=utf-8"
);
}
else
if
(
ftpPath
.
endsWith
(
".mp4"
)
||
ftpPath
.
endsWith
(
".MP4"
))
{
response
.
setContentType
(
"video/mpeg4; charset=utf-8"
);
String
mp4file
=
ftpPath
.
substring
(
ftpPath
.
lastIndexOf
(
"/"
)
+
1
);
response
.
setHeader
(
"Content-Disposition"
,
"attachment;fileName="
+
mp4file
);
}
String
destUrl
=
ftpPath
;
destUrl
=
new
String
(
destUrl
.
getBytes
(
"ISO8859-1"
),
"GBK"
);
String
[]
arr
=
destUrl
.
split
(
";"
);
FtpURLConnection
ftpUrl
=
null
;
HttpURLConnection
httpUrl
=
null
;
for
(
int
i
=
0
;
i
<
arr
.
length
;
i
++)
{
try
{
URL
url
=
new
URL
(
arr
[
i
]);
if
(
arr
[
i
].
toUpperCase
().
indexOf
(
"FTP"
)
!=
-
1
)
{
// ftp
ftpUrl
=
(
FtpURLConnection
)
url
.
openConnection
();
ftpUrl
.
setConnectTimeout
(
30000
);
ftpUrl
.
setReadTimeout
(
30000
);
bis
=
new
BufferedInputStream
(
ftpUrl
.
getInputStream
());
response
.
setContentLength
(
ftpUrl
.
getContentLength
());
}
else
{
// http
httpUrl
=
(
HttpURLConnection
)
url
.
openConnection
();
httpUrl
.
setConnectTimeout
(
30000
);
httpUrl
.
setReadTimeout
(
30000
);
bis
=
new
BufferedInputStream
(
httpUrl
.
getInputStream
());
response
.
setContentLength
(
httpUrl
.
getContentLength
());
}
toClient
=
response
.
getOutputStream
();
IOUtils
.
copy
(
bis
,
toClient
);
}
catch
(
Exception
e
)
{
response
.
setContentType
(
"text/html;charset=GBK"
);
response
.
setCharacterEncoding
(
"GBK"
);
PrintWriter
out
=
response
.
getWriter
();
out
.
write
(
"无法打开图片!"
);
out
.
flush
();
log
.
info
(
"ftpagent error "
+
ftpUrl
+
e
.
toString
());
}
finally
{
if
(
bis
!=
null
)
{
bis
.
close
();
}
if
(
bis
!=
null
)
{
bis
.
close
();
}
if
(
httpUrl
!=
null
)
{
httpUrl
.
disconnect
();
}
if
(
ftpUrl
!=
null
)
{
ftpUrl
.
close
();
}
if
(
toClient
!=
null
)
{
toClient
.
close
();
}
}
}
return
;
}
map
.
put
(
"recordid"
,
"3344766"
);
}
catch
(
Exception
e
)
{
webSocket
.
GroupSending
(
JsonUtil
.
beanToString
(
map
));
}
finally
{
Traffalarmrecord
traffAlarmRecord2
=
new
Traffalarmrecord
(
);
IOUtils
.
closeQuietly
(
bis
);
traffAlarmRecord2
.
setRecordid
(
Long
.
parseLong
(
"33447"
)
);
IOUtils
.
closeQuietly
(
toClient
);
traffAlarmRecord2
.
setChannelid
(
new
Integer
(
0
)
);
IOUtils
.
closeQuietly
(
hFile
);
traffAlarmRecord2
.
setFdid
(
"33_65_230_157_554_pKf3kS8Vo0N_ecvs"
);
IOUtils
.
closeQuietly
(
inputStream
);
traffAlarmRecord2
.
setRecordtype
(
"vehicle_ban"
);
}
traffAlarmRecordMapper
.
inserTraffAlarmRecord
(
traffAlarmRecord2
);
}
}
}
}
src/main/java/com/hzjt/domain/MqttAlarm.java
0 → 100644
View file @
33ecd994
package
com
.
hzjt
.
domain
;
import
lombok.Data
;
import
java.math.BigDecimal
;
import
java.util.List
;
import
java.util.Map
;
@Data
public
class
MqttAlarm
{
private
String
deviceId
;
private
String
eventTypeId
;
private
String
companyId
;
private
String
alarmTime
;
private
String
x
;
private
String
y
;
private
String
height
;
private
String
width
;
private
String
imagePath
;
private
String
videoPath
;
private
String
content
;
}
src/main/java/com/hzjt/domain/ResponseEnum.java
View file @
33ecd994
...
@@ -25,7 +25,7 @@ public enum ResponseEnum {
...
@@ -25,7 +25,7 @@ public enum ResponseEnum {
E_1010
(
1010
,
"数据保存失败"
),
E_1010
(
1010
,
"数据保存失败"
),
E_1011
(
1011
,
"数据重复"
),
E_1011
(
1011
,
"数据重复"
),
E_9999
(
9999
,
"系统异常"
),
E_9999
(
9999
,
"系统异常"
),
E_9991
(
9991
,
"发送异常"
),
SUCCESS
(
200
,
"请求成功"
);
SUCCESS
(
200
,
"请求成功"
);
private
int
code
;
private
int
code
;
...
...
src/main/java/com/hzjt/domain/WriteResultObj.java
View file @
33ecd994
...
@@ -16,7 +16,11 @@ public class WriteResultObj {
...
@@ -16,7 +16,11 @@ public class WriteResultObj {
this
.
message
=
message
;
this
.
message
=
message
;
this
.
status
=
status
;
this
.
status
=
status
;
}
}
public
WriteResultObj
(
Integer
status
,
String
message
,
String
data
)
{
this
.
message
=
message
;
this
.
status
=
status
;
this
.
data
=
data
;
}
@Override
@Override
public
String
toString
()
{
public
String
toString
()
{
return
"WriteResultObj{"
+
return
"WriteResultObj{"
+
...
...
src/main/java/com/hzjt/handler/WebSocket.java
View file @
33ecd994
...
@@ -81,8 +81,8 @@ public class WebSocket {
...
@@ -81,8 +81,8 @@ public class WebSocket {
*
*
* @param message
* @param message
*/
*/
public
void
GroupSending
(
String
message
)
{
public
boolean
GroupSending
(
String
message
)
{
boolean
send
=
true
;
for
(
String
name
:
webSocketSet
.
keySet
())
{
for
(
String
name
:
webSocketSet
.
keySet
())
{
try
{
try
{
...
@@ -91,9 +91,10 @@ public class WebSocket {
...
@@ -91,9 +91,10 @@ public class WebSocket {
webSocketSet
.
get
(
name
).
session
.
getBasicRemote
().
sendText
(
message
);
webSocketSet
.
get
(
name
).
session
.
getBasicRemote
().
sendText
(
message
);
}
}
}
catch
(
Exception
e
)
{
}
catch
(
Exception
e
)
{
e
.
printStackTrace
()
;
send
=
false
;
}
}
}
}
return
send
;
}
}
/**
/**
...
...
src/main/java/com/hzjt/mapper/SbtdspsrMapper.java
View file @
33ecd994
...
@@ -30,4 +30,6 @@ public interface SbtdspsrMapper extends BaseMapper<Sbtdspsr>, ConditionMapper<Sb
...
@@ -30,4 +30,6 @@ public interface SbtdspsrMapper extends BaseMapper<Sbtdspsr>, ConditionMapper<Sb
List
<
Sbtdspsr
>
selectBySbbh
(
@Param
(
"sbbh"
)
String
sbbh
,
@Param
(
"tdbh"
)
Integer
tdbh
);
List
<
Sbtdspsr
>
selectBySbbh
(
@Param
(
"sbbh"
)
String
sbbh
,
@Param
(
"tdbh"
)
Integer
tdbh
);
List
<
SbtdspsrParam
>
selectDeviceWrite
();
List
<
SbtdspsrParam
>
selectDeviceWrite
();
List
<
String
>
selectAllWbbh
();
}
}
\ No newline at end of file
src/main/java/com/hzjt/mapper/TraffAlarmRecordMapper.java
View file @
33ecd994
...
@@ -33,4 +33,6 @@ public interface TraffAlarmRecordMapper extends BaseMapper<Traffalarmrecord>, Co
...
@@ -33,4 +33,6 @@ public interface TraffAlarmRecordMapper extends BaseMapper<Traffalarmrecord>, Co
String
seletManualStatus
();
String
seletManualStatus
();
String
seletmqttbh
(
@Param
(
"recordtype"
)
String
recordtype
);
}
}
\ No newline at end of file
src/main/java/com/hzjt/service/EventWriteService.java
View file @
33ecd994
package
com
.
hzjt
.
service
;
package
com
.
hzjt
.
service
;
import
com.hzjt.domain.*
;
import
com.hzjt.domain.*
;
import
com.hzjt.handler.WebSocket
;
import
com.hzjt.mapper.SbtdspsrMapper
;
import
com.hzjt.mapper.SbtdspsrMapper
;
import
com.hzjt.mapper.TraffAlarmRecordMapper
;
import
com.hzjt.mapper.TraffAlarmRecordMapper
;
import
com.hzjt.util.DateUtils
;
import
com.hzjt.util.*
;
import
com.hzjt.util.FTPUtil
;
import
com.hzjt.util.StringEnum
;
import
com.hzjt.util.ThreadPoolUtil
;
import
lombok.extern.slf4j.Slf4j
;
import
lombok.extern.slf4j.Slf4j
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.beans.factory.annotation.Autowired
;
...
@@ -67,7 +65,11 @@ public class EventWriteService {
...
@@ -67,7 +65,11 @@ public class EventWriteService {
@Autowired
@Autowired
private
TraffAlarmRecordMapper
traffalarmrecordMapper
;
private
TraffAlarmRecordMapper
traffalarmrecordMapper
;
@Autowired
WebSocket
webSocket
;
BASE64Encoder
base64Encoder
=
new
BASE64Encoder
();
private
static
CompletionService
<
ResultObj
>
completionService
=
new
ExecutorCompletionService
<
ResultObj
>(
ThreadPoolUtil
.
getPool
());
private
static
CompletionService
<
ResultObj
>
completionService
=
new
ExecutorCompletionService
<
ResultObj
>(
ThreadPoolUtil
.
getPool
());
public
ResultObj
sendEvent
(
Long
recordid
)
{
public
ResultObj
sendEvent
(
Long
recordid
)
{
Traffalarmrecord
traffalarmrecord
=
traffalarmrecordMapper
.
selectByPrimaryKey
(
recordid
);
Traffalarmrecord
traffalarmrecord
=
traffalarmrecordMapper
.
selectByPrimaryKey
(
recordid
);
...
@@ -84,21 +86,73 @@ public class EventWriteService {
...
@@ -84,21 +86,73 @@ public class EventWriteService {
return
resultObj
;
return
resultObj
;
}
}
public
List
<
ResultObj
>
sendEvents
(
List
<
Long
>
recordid
)
{
public
List
<
Write
ResultObj
>
sendEvents
(
List
<
Long
>
recordid
)
{
Condition
condition
=
new
Condition
(
Traffalarmrecord
.
class
);
Condition
condition
=
new
Condition
(
Traffalarmrecord
.
class
);
Example
.
Criteria
criteria
=
condition
.
createCriteria
();
Example
.
Criteria
criteria
=
condition
.
createCriteria
();
criteria
.
andIn
(
"recordid"
,
recordid
);
criteria
.
andIn
(
"recordid"
,
recordid
);
List
<
Traffalarmrecord
>
traffalarmrecordList
=
traffalarmrecordMapper
.
selectByCondition
(
condition
);
List
<
Traffalarmrecord
>
traffalarmrecordList
=
traffalarmrecordMapper
.
selectByCondition
(
condition
);
List
<
ResultObj
>
resultObj
=
new
ArrayList
<>();
List
<
WriteResultObj
>
resultObj
=
new
ArrayList
<>();
try
{
//将返回结果转化为Alarm格式
resultObj
=
sendAllMessage
(
traffalarmrecordList
);
List
<
Alarm
>
alarmList
=
new
ArrayList
<>();
for
(
Traffalarmrecord
record
:
traffalarmrecordList
){
Alarm
alarm
=
new
Alarm
();
alarm
.
setDept
(
record
.
getAreaid
().
toString
());
alarm
.
setIncident_type
(
record
.
getRecordtype
());
alarm
.
setType
(
"TRAFFIC_INCIDENT_ALARM"
);
alarm
.
setTs
(
String
.
valueOf
(
record
.
getRecordtime
().
getTime
()));
alarm
.
setVideo_id
(
record
.
getFdid
());
List
<
String
>
img_urls
=
new
ArrayList
<>();
img_urls
.
add
(
record
.
getImg1urlfrom
());
img_urls
.
add
(
record
.
getImg2urlfrom
());
img_urls
.
add
(
record
.
getImg3urlfrom
());
img_urls
.
add
(
record
.
getImg4urlfrom
());
img_urls
.
add
(
record
.
getImg5urlfrom
());
alarm
.
setImg_urls
(
img_urls
);
alarm
.
setId
(
record
.
getRecordid
().
toString
());
alarm
.
setObjLabel
(
record
.
getObjlable
());
// alarm.setObj_location(record.getLocation());
List
<
String
>
img_base64
=
new
ArrayList
<>();
if
(
record
.
getImg1path
()
!=
null
){
byte
[]
Img
=
FTPUtil
.
getFtpPicBytes
(
record
.
getImg1path
());
img_base64
.
add
(
Img
!=
null
?
base64Encoder
.
encode
(
Img
)
:
null
);
}
if
(
record
.
getImg2path
()
!=
null
){
byte
[]
Img
=
FTPUtil
.
getFtpPicBytes
(
record
.
getImg2path
());
img_base64
.
add
(
Img
!=
null
?
base64Encoder
.
encode
(
Img
)
:
null
);
}
if
(
record
.
getImg3path
()
!=
null
)
{
byte
[]
Img
=
FTPUtil
.
getFtpPicBytes
(
record
.
getImg3path
());
img_base64
.
add
(
Img
!=
null
?
base64Encoder
.
encode
(
Img
)
:
null
);
}
catch
(
TimeoutException
e
)
{
}
log
.
error
(
"eventwrite - sendEvents 请求超时:"
+
e
.
toString
());
if
(
record
.
getImg4path
()
!=
null
)
{
resultObj
.
add
(
ResultObj
.
error
(
ResponseEnum
.
E_1008
.
getCode
(),
ResponseEnum
.
E_1008
.
getMsg
()));
byte
[]
Img
=
FTPUtil
.
getFtpPicBytes
(
record
.
getImg4path
());
}
catch
(
Exception
e
)
{
img_base64
.
add
(
Img
!=
null
?
base64Encoder
.
encode
(
Img
)
:
null
);
log
.
error
(
"eventwrite - sendEvents 异常:"
+
e
.
toString
());
}
resultObj
.
add
(
ResultObj
.
error
(
ResponseEnum
.
E_9999
.
getCode
(),
e
.
toString
()));
if
(
record
.
getImg5path
()
!=
null
)
{
byte
[]
Img
=
FTPUtil
.
getFtpPicBytes
(
record
.
getImg5path
());
img_base64
.
add
(
Img
!=
null
?
base64Encoder
.
encode
(
Img
)
:
null
);
}
alarm
.
setImg_base64
(
img_base64
);
if
(
record
.
getVideopath
()
!=
null
)
{
alarm
.
setVideo_record_url
(
record
.
getVideopath
());
}
//推送给第三方
boolean
send
=
webSocket
.
GroupSending
(
JsonUtil
.
beanToString
(
alarm
));
if
(
send
){
//更新推送状态
record
.
setPushstatus
(
0
);
record
.
setPushdesc
(
"发送成功"
);
traffAlarmRecordMapper
.
updatePushEvent
(
record
);
resultObj
.
add
(
new
WriteResultObj
(
200
,
"推送成功"
,
record
.
getRecordid
().
toString
()));
}
else
{
//推送失败
record
.
setPushstatus
(
1
);
record
.
setPushdesc
(
"发送失败"
);
traffAlarmRecordMapper
.
updatePushEvent
(
record
);
resultObj
.
add
(
new
WriteResultObj
(
ResponseEnum
.
E_9991
.
getCode
(),
"发送失败"
,
record
.
getRecordid
().
toString
()));
}
}
}
return
resultObj
;
return
resultObj
;
}
}
...
...
src/main/java/com/hzjt/service/TraffdevicewriteresultService.java
View file @
33ecd994
...
@@ -44,6 +44,7 @@ public class TraffdevicewriteresultService {
...
@@ -44,6 +44,7 @@ public class TraffdevicewriteresultService {
@Value
(
"${qingzhi.devicewritesupplier.name}"
)
@Value
(
"${qingzhi.devicewritesupplier.name}"
)
String
devicesupplier
;
String
devicesupplier
;
@Value
(
"${qingzhi.redis.token}"
)
@Value
(
"${qingzhi.redis.token}"
)
String
qztoken
;
String
qztoken
;
...
...
src/main/resources/application.properties
View file @
33ecd994
...
@@ -13,16 +13,16 @@ mybatis.configuration.map-underscore-to-camel-case=true
...
@@ -13,16 +13,16 @@ mybatis.configuration.map-underscore-to-camel-case=true
mybatis.configuration.default-fetch-size
=
100
mybatis.configuration.default-fetch-size
=
100
mybatis.configuration.default-statement-timeout
=
3000
mybatis.configuration.default-statement-timeout
=
3000
#mybatis.mapperLocations = classpath:xxx.xml
#mybatis.mapperLocations = classpath:xxx.xml
#
logging.level.com.hzjt=debug
logging.level.com.hzjt
=
debug
# Mysql���ݿ�-����Դ����
# Mysql���ݿ�-����Դ����
#spring.datasource.username=hzjt
#spring.datasource.password=hzjt
#spring.datasource.url=jdbc:oracle:thin:@192.168.168.212:1523:helowin
spring.datasource.username
=
hzjt
spring.datasource.username
=
hzjt
spring.datasource.password
=
hzjt
spring.datasource.password
=
hzjt
spring.datasource.url
=
jdbc:oracle:thin:@33.65.250.179:1521:helowin
spring.datasource.url
=
jdbc:oracle:thin:@192.168.168.212:1521:helowin
#spring.datasource.username=hzjt
#spring.datasource.password=hzjt
#spring.datasource.url=jdbc:oracle:thin:@33.65.219.103:1521:helowin
spring.datasource.driverClassName
=
oracle.jdbc.OracleDriver
spring.datasource.driverClassName
=
oracle.jdbc.OracleDriver
# druid
# druid
spring.datasource.type
=
com.alibaba.druid.pool.DruidDataSource
spring.datasource.type
=
com.alibaba.druid.pool.DruidDataSource
...
@@ -43,8 +43,8 @@ spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowS
...
@@ -43,8 +43,8 @@ spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowS
spring.datasource.useGlobalDataSourceStat
=
true
spring.datasource.useGlobalDataSourceStat
=
true
#redis
#redis
spring.redis.host
=
33.65.250.179
#spring.redis.host=33.65.219.103
#
spring.redis.host=127.0.0.1
spring.redis.host
=
127.0.0.1
spring.redis.port
=
6379
spring.redis.port
=
6379
#spring.redis.password=123456
#spring.redis.password=123456
spring.redis.database
=
0
spring.redis.database
=
0
...
@@ -54,12 +54,14 @@ spring.redis.pool.max-idle=8
...
@@ -54,12 +54,14 @@ spring.redis.pool.max-idle=8
spring.redis.pool.min-idle
=
0
spring.redis.pool.min-idle
=
0
spring.redis.timeout
=
2000
spring.redis.timeout
=
2000
spring.session.store-type
=
none
spring.session.store-type
=
none
ip.host
=
33.65.2
50.179
ip.host
=
33.65.2
19.103
port
=
21
port
=
21
message.dept
=
33030
message.dept
=
33030
message.rate
=
10
message.rate
=
10
ftp.host
=
33.65.250.179:21:hzjt:1qaz2wsx
ftp.host
=
33.65.219.40:21:hzjt:1qaz2wsx
ftppath
=
hzjt:1qaz2wsx@33.65.219.40:21
ftpServiceUrl
=
http://33.65.219.103:8089/api/alg/files
alarm.subscribe.data.key
=
gs:traff:alarmlist
alarm.subscribe.data.key
=
gs:traff:alarmlist
...
@@ -89,4 +91,17 @@ flv.url=http://33.50.1.21:57080/ecvs/get_play_list
...
@@ -89,4 +91,17 @@ flv.url=http://33.50.1.21:57080/ecvs/get_play_list
qingzhi.login.username
=
zksy
qingzhi.login.username
=
zksy
qingzhi.login.password
=
zksy@123
qingzhi.login.password
=
zksy@123
qingzhi.login.url
=
http://33.50.1.213:38080/api/auth/login
qingzhi.login.url
=
http://33.50.1.213:38080/api/auth/login
qingzhi.login.keepaliveurl
=
http://33.50.1.213:38080/api/auth/token/keepalive
qingzhi.login.keepaliveurl
=
http://33.50.1.213:38080/api/auth/token/keepalive
\ No newline at end of file
#mqtt配置 - start
#用户名
spring.mqtt.username
=
#密码
spring.mqtt.password
=
#服务器连接地址
spring.mqtt.url
=
tcp://12.1.97.11:1883
#连接超时
spring.mqtt.completionTimeout
=
3000
spring.mqtt.qos
=
2
#mqtt配置 - end
src/main/resources/mapper/SbtdspsrMapper.xml
View file @
33ecd994
...
@@ -83,7 +83,7 @@
...
@@ -83,7 +83,7 @@
</select>
</select>
<select
id=
"selectBySbbh"
resultType=
"com.hzjt.domain.Sbtdspsr"
>
<select
id=
"selectBySbbh"
resultType=
"com.hzjt.domain.Sbtdspsr"
>
SELECT A.* FROM SBTDSPSR A where sbbh=#{sbbh} and tdbh=#{tdbh}
SELECT A.*
FROM SBTDSPSR A where sbbh=#{sbbh} and tdbh=#{tdbh}
</select>
</select>
<select
id=
"selectDeviceWrite"
resultType=
"com.hzjt.domain.SbtdspsrParam"
>
<select
id=
"selectDeviceWrite"
resultType=
"com.hzjt.domain.SbtdspsrParam"
>
...
@@ -92,4 +92,8 @@
...
@@ -92,4 +92,8 @@
LEFT JOIN TRAFFDEVICECONFIG C ON C.FDID = A.SBBH AND C.CHANNELID = A.TDBH
LEFT JOIN TRAFFDEVICECONFIG C ON C.FDID = A.SBBH AND C.CHANNELID = A.TDBH
WHERE B.PUSHSTATUS != 0 OR B.PUSHDESC IS NULL
WHERE B.PUSHSTATUS != 0 OR B.PUSHDESC IS NULL
</select>
</select>
<select
id=
"selectAllWbbh"
resultType=
"java.lang.String"
>
select concat(concat('event/',wbbh),'/videoEvent') from sbtdspsr where wbbh is not null
</select>
</mapper>
</mapper>
\ No newline at end of file
src/main/resources/mapper/Traffalarmrecord.xml
View file @
33ecd994
...
@@ -179,4 +179,8 @@
...
@@ -179,4 +179,8 @@
<select
id=
"seletManualStatus"
resultType=
"java.lang.String"
>
<select
id=
"seletManualStatus"
resultType=
"java.lang.String"
>
select name from t_code where type=4
select name from t_code where type=4
</select>
</select>
<select
id=
"seletmqttbh"
resultType=
"java.lang.String"
>
select mqttbh from t_code where type=1 and key=#{recordtype}
</select>
</mapper>
</mapper>
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment