提交 bc0bd368 编写于 作者: W wanghoubang

Merge branch 'release/1.0.0' of https://github.com/mqttsnet/thinglinks into release/1.0.0

......@@ -62,6 +62,9 @@ com.mqttsnet.thinglinks
│ └── thinglinks-visual-collection // 服务器监控采集服务 [19401]
├──pom.xml // 公共依赖
~~~
## 平台总体架构
![](doc/imgs/overallArchitecture.png)
## 设备集成LINK架构
......
......@@ -115,7 +115,7 @@ smqtt:
username: ${spring.datasource.dynamic.datasource.master.username}
password: ${spring.datasource.dynamic.datasource.master.password}
http: # http相关配置 端口固定60000
enable: false # 开关
enable: true # 开关
accessLog: false # http访问日志
ssl: # ssl配置
enable: false
......@@ -168,7 +168,7 @@ smqtt:
sourceName: rocket_mq
sourceAttributes:
topic: thinglinks-link-mqttMsg
tags: thinglinks-link
tags: thinglinks
namesrvAddr: 127.0.0.1:19876
instanceName: broker-a
producerGroup: thinglinks
......
此差异已折叠。
......@@ -26,8 +26,12 @@ public interface RemotePublishActorService {
/**
* 通知ThingLins MQTT Broker推送消息
* @param params
* topic topic String 是
* qos 服务等级 Integer 是
* retain 保留消息 Boolean 是
* message 消息 String 是
* @return
*/
@PostMapping("/sendMessage")
public R sendMessage(@RequestBody Map<String, String> params);
@PostMapping("/publish/sendMessage")
public R sendMessage(@RequestBody Map<String, Object> params);
}
......@@ -37,7 +37,7 @@ public class RemotePublishActorFallbackFactory implements FallbackFactory<Remote
* @return
*/
@Override
public R sendMessage(Map<String, String> params) {
public R sendMessage(Map<String, Object> params) {
return R.fail("通知ThingLins MQTT Broker推送消息失败:" + throwable.getMessage());
}
};
......
......@@ -16,11 +16,16 @@ import java.util.List;
@Data
public class TableDto extends BaseEntity{
/**
* 超级表普通列字段的值
* 值需要与创建超级表时普通列字段的数据类型对应上
*/
private List<Fields> schemaFieldValues;
/**
* 超级表标签字段的值
* 值需要与创建超级表时标签字段的数据类型对应上
*/
@NotEmpty(message = "invalid operation: fieldValues can not be empty")
private List<Fields> tagsFieldValues;
/**
......
......@@ -1784,4 +1784,14 @@ public class DateUtils extends org.apache.commons.lang3.time.DateUtils
return zoneDateTime1.format(formatter);
}
/**
* 取得当前时间戳(精确到毫秒秒)Long 类型
*
* @return
*/
public static Long millisecondStampL() {
long ts = System.currentTimeMillis();
long thirtySec = 1000 * 30;
return ts + (thirtySec);
}
}
package com.mqttsnet.thinglinks.common.core.utils;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.mqttsnet.thinglinks.common.core.constant.Constants;
import com.mqttsnet.thinglinks.common.core.text.StrFormatter;
import org.springframework.util.AntPathMatcher;
......@@ -508,4 +510,18 @@ public class StringUtils extends org.apache.commons.lang3.StringUtils
}
return source;
}
/**
*
* 方法描述:jsonToMap
* @param jsonStr json字符串
* @return Map<String, Object> map对象
*/
@SuppressWarnings({ "unchecked", "rawtypes"})
public static Map<String, Object> jsonToMap(String jsonStr) {
Map<String, Object> paramMap = new HashMap<>();
if (jsonStr != null && !"".equals(jsonStr)) {
paramMap = (Map) JSON.parse(jsonStr);
}
return paramMap;
}
}
\ No newline at end of file
package com.mqttsnet.thinglinks.broker.Actors;
import io.github.quickmsg.common.annotation.AllowCors;
import io.github.quickmsg.common.annotation.Header;
import io.github.quickmsg.common.annotation.Router;
import io.github.quickmsg.common.config.Configuration;
import io.github.quickmsg.common.enums.HttpType;
import io.github.quickmsg.common.message.HttpPublishMessage;
import io.github.quickmsg.core.http.AbstractHttpActor;
import cn.hutool.http.HttpRequest;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.common.core.domain.R;
import com.mqttsnet.thinglinks.common.core.web.domain.AjaxResult;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
* @Description: Broker推送设备消息
......@@ -26,20 +22,28 @@ import java.nio.charset.StandardCharsets;
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
@Router(value = "/publish", type = HttpType.POST)
@Slf4j
@Header(key = "Content-Type", value = "application/json")
@AllowCors
public class PublishActor extends AbstractHttpActor {
@RestController
@RequestMapping("/publish")
public class PublishActor{
@Override
public Publisher<Void> doRequest(HttpServerRequest request, HttpServerResponse response, Configuration configuration) {
return request
.receive()
.asString()
.map(this.toJson(HttpPublishMessage.class))
.doOnNext(message -> {
//处理request
}).then(response.sendString(Mono.just("success")).then());
/**
* MQTT推送消息接口
* @param params
* @return
*/
@PostMapping("/sendMessage")
public R sendMessage(@RequestBody Map<String, Object> params) {
log.info("MQTT Broker publish {}", params.toString());
JSONObject param = new JSONObject();
param.put("topic", params.get("topic"));
param.put("qos", Integer.valueOf(params.get("qos").toString()));
param.put("retain", Boolean.valueOf(params.get("retain").toString()));
param.put("message", String.valueOf(params.get("message")));
String result = HttpRequest.post("http://127.0.0.1:60000/smqtt/publish")
.header("Content-Type", "application/json;charset=UTF-8")
.body(param.toString())
.execute().body();
return R.ok();
}
}
......@@ -127,6 +127,13 @@
<version>${thinglinks.version}</version>
</dependency>
<!-- thinglinks Api broker -->
<dependency>
<groupId>com.mqttsnet</groupId>
<artifactId>thinglinks-api-broker</artifactId>
<version>${thinglinks.version}</version>
</dependency>
</dependencies>
<build>
......
......@@ -67,4 +67,12 @@ public interface DeviceInfoMapper {
int updateBatch(List<DeviceInfo> list);
int batchInsert(@Param("list") List<DeviceInfo> list);
int deleteByDeviceId(@Param("deviceId")String deviceId);
DeviceInfo findOneByDeviceId(@Param("deviceId")String deviceId);
}
\ No newline at end of file
......@@ -59,7 +59,7 @@ public interface DeviceDatasService {
* @param deviceIdentification 设备标识
* @param msg 数据
*/
void processingTopoAddTopic(String deviceIdentification,String msg) throws Exception;
String processingTopoAddTopic(String deviceIdentification,String msg) throws Exception;
/**
* 处理/topo/delete Topic边设备删除子设备
......@@ -67,7 +67,7 @@ public interface DeviceDatasService {
* @param deviceIdentification 设备标识
* @param msg 数据
*/
void processingTopoDeleteTopic(String deviceIdentification,String msg) throws Exception;
String processingTopoDeleteTopic(String deviceIdentification,String msg) throws Exception;
/**
* 处理/topo/update Topic边设备更新子设备状态
......@@ -75,7 +75,7 @@ public interface DeviceDatasService {
* @param deviceIdentification 设备标识
* @param msg 数据
*/
void processingTopoUpdateTopic(String deviceIdentification,String msg) throws Exception;
String processingTopoUpdateTopic(String deviceIdentification,String msg) throws Exception;
/**
* 处理datas Topic数据上报
......
......@@ -37,6 +37,16 @@ public interface DeviceInfoService {
int batchInsert(List<DeviceInfo> list);
int deleteByDeviceId(String deviceId);
DeviceInfo findOneByDeviceId(String deviceId);
}
......@@ -74,6 +74,22 @@ public class DeviceInfoServiceImpl implements DeviceInfoService {
return deviceInfoMapper.batchInsert(list);
}
@Override
public int deleteByDeviceId(String deviceId){
return deviceInfoMapper.deleteByDeviceId(deviceId);
}
@Override
public DeviceInfo findOneByDeviceId(String deviceId){
return deviceInfoMapper.findOneByDeviceId(deviceId);
}
}
......@@ -376,15 +376,20 @@ public class ProductServiceImpl implements ProductService{
JSONArray properties = service.getJSONArray("properties");
//如果服务下属性值为空,没必要为该服务创建超级表,跳过该循环,进入下个服务
if (properties.isEmpty()) {
continue loop;
continue;
}
//构建超级表的表结构字段列表
List<Fields> schemaFields = new ArrayList<>();
//超级表第一个字段数据类型必须为时间戳
Fields firstColumn = new Fields();
firstColumn.setFieldName("ts");
firstColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(firstColumn);
//超级表第一个字段数据类型必须为时间戳,默认Ts为当前系统时间
Fields tsColumn = new Fields();
tsColumn.setFieldName("ts");
tsColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(tsColumn);
//超级表第二个字段为事件发生时间数据类型必须为时间戳
Fields eventTimeColumn = new Fields();
eventTimeColumn.setFieldName("event_time");
eventTimeColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(eventTimeColumn);
//根据属性对象列表循环构建超级表表结构
for (int j = 0; j < properties.size(); j++) {
JSONObject propertie = properties.getJSONObject(j);
......@@ -403,7 +408,7 @@ public class ProductServiceImpl implements ProductService{
// 1:设备标识:deviceIdentification
List<Fields> tagsFields = new ArrayList<>();
Fields tags = new Fields();
tags.setFieldName("deviceIdentification");
tags.setFieldName("device_identification");
tags.setDataType(DataTypeEnum.BINARY);
tags.setSize(64);
tagsFields.add(tags);
......@@ -424,7 +429,7 @@ public class ProductServiceImpl implements ProductService{
redisService.deleteObject(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName);
}
//在redis里存入新的超级表对的表结构信息
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto));
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, superTableDto);
log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto));
}
}catch (Exception e){
......@@ -581,11 +586,16 @@ public class ProductServiceImpl implements ProductService{
}
//构建超级表的表结构字段列表
List<Fields> schemaFields = new ArrayList<>();
//超级表第一个字段数据类型必须为时间戳
Fields firstColumn = new Fields();
firstColumn.setFieldName("ts");
firstColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(firstColumn);
//超级表第一个字段数据类型必须为时间戳,默认Ts为当前系统时间
Fields tsColumn = new Fields();
tsColumn.setFieldName("ts");
tsColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(tsColumn);
//超级表第二个字段为事件发生时间数据类型必须为时间戳
Fields eventTimeColumn = new Fields();
eventTimeColumn.setFieldName("event_time");
eventTimeColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(eventTimeColumn);
//根据属性对象列表循环构建超级表表结构
for (ProductProperties productProperties : allByServiceId) {
//获取字段名称
......@@ -603,7 +613,7 @@ public class ProductServiceImpl implements ProductService{
// 1:设备标识:deviceIdentification
List<Fields> tagsFields = new ArrayList<>();
Fields tags = new Fields();
tags.setFieldName("deviceIdentification");
tags.setFieldName("device_identification");
tags.setDataType(DataTypeEnum.BINARY);
tags.setSize(64);
tagsFields.add(tags);
......@@ -617,7 +627,7 @@ public class ProductServiceImpl implements ProductService{
redisService.deleteObject(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName);
}
//在redis里存入新的超级表对的表结构信息
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto));
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, superTableDto);
log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto));
superTableDtoList.add(superTableDto);
if (InitializeOrNot){
......
......@@ -604,4 +604,26 @@
</if>
</trim>
</insert>
<!--Author By ShiHuan Sun E-mail: 13733918655@163.com on 2022-05-05-->
<delete id="deleteByDeviceId">
delete from device_info
<where>
<if test="deviceId != null">
and device_id=#{deviceId,jdbcType=VARCHAR}
</if>
</where>
</delete>
<!--Author By ShiHuan Sun E-mail: 13733918655@163.com on 2022-05-05-->
<select id="findOneByDeviceId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from device_info
<where>
<if test="deviceId != null">
and device_id=#{deviceId,jdbcType=VARCHAR}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
......@@ -100,7 +100,7 @@ public class TdEngineServiceImpl implements TdEngineService {
Integer count = tdEngineMapper.checkTableExists(dataBaseName, tableName);
return count == 1;
} catch (Exception e) {
log.error("检查数据库表是否存在{}", e.getMessage());
log.error("数据库表不否存在");
return false;
}
}
......
package com.mqttsnet.thinglinks.tdengine.util;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @Description 时间工具类
* @author mqtts.net
* @Email 13733918655@163.com
* @Date 2019/12/9 20:45
* @Version 1.0
*/
public class DateUntils {
public static Date getCurrentDate() {
return new Date();
}
public static String getDateString(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getDateStrings(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getDateStampString(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static Date getDateString(String date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
Date date1 = null;
try {
date1 = dateFormat.parse(date);
} catch (ParseException e) {
e.printStackTrace();
}
return date1;
}
public static String strToDateFormat(String date) throws ParseException {
SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
formatter.setLenient(false);
Date newDate = formatter.parse(date);
formatter = new SimpleDateFormat("yyyy-MM-dd");
return formatter.format(newDate);
}
public static String getDateFormatString(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getDateFormatTostr(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getCurrentDateWithZero(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getCurrentDateWithNight(Date date) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 23:59:59");
String previousDate = dateFormat.format(date);
return previousDate;
}
public static String getPreviousDateWithZero(Date date) {
Date prevDate = new Date(date.getTime() - (24 * 3600000));
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 00:00:00");
String previousDate = dateFormat.format(prevDate);
return previousDate;
}
public static Date getFomatDate(String str) {
//创建SimpleDateFormat对象实例并定义好转换格式
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = null;
try {
// 注意格式需要与上面一致,不然会出现异常
date = sdf.parse(str);
} catch (ParseException e) {
e.printStackTrace();
}
return date;
}
/**
* 将时间戳转换为日期
*
* @param stamp 时间戳
* @return 时间,返回格式为 yyyy-MM-dd-HH-mm-ss
*/
public static String Stamp2Date(Long stamp) {
String result = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(stamp);
return result;
}
/**
* 取得当前时间戳(精确到秒)
*
* @return
*/
public static String timeStamp() {
long time = System.currentTimeMillis();
String t = String.valueOf(time / 1000);
return t;
}
/**
* 取得当前时间戳(精确到秒)Long 类型
*
* @return
*/
public static Long timeStampL() {
long time = System.currentTimeMillis();
return time / 1000;
}
/**
* 获取指定url中的某个参数
*
* @param url
* @param name
* @return
*/
public static String getParamByUrl(String url, String name) {
url += "&";
String pattern = "(\\?|&){1}#{0,1}" + name + "=[a-zA-Z0-9]*(&{1})";
Pattern r = Pattern.compile(pattern);
Matcher m = r.matcher(url);
if (m.find()) {
System.out.println(m.group(0));
return m.group(0).split("=")[1].replace("&", "");
} else {
return null;
}
}
public static void main(String[] args) throws ParseException {
String sTime = "2019-05-17 00:00:00";
String eTime = "2019-05-19 16:00:00";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date d1 = getFomatDate(sTime);
Date d2 = getFomatDate(eTime);
List<String> times = getTimes(d1,d2,3600000L);
for (String str:times) {
System.out.println("得到的时间集合=:"+str);
}
List<String> times1 = getTimes(getFomatDate("2021-09-22 00:00:00"), getFomatDate("2021-09-30 00:00:00"), 86400000);
System.out.println(times1);
}
//获取时间段内,时间间隔的所以时间点集合
public static List<String> getTimes(Date startTime, Date endTime,long ll) throws ParseException {
List<String> times = new ArrayList<>();
Calendar calendar = Calendar.getInstance();
calendar.setTime(startTime);
long startTimeStamp = calendar.getTimeInMillis();//起始的毫秒数
calendar.setTime(endTime);
long endTimeStamp = calendar.getTimeInMillis();//截止时间毫秒数
while (true){
long temp = startTimeStamp + ll;
if(temp <= endTimeStamp){
startTimeStamp = temp;
Calendar _calendar = Calendar.getInstance();
_calendar.setTimeInMillis(temp);
int year = _calendar.get(Calendar.YEAR);
int month = _calendar.get(Calendar.MONTH);
int day = _calendar.get(Calendar.DAY_OF_MONTH);
int hour = _calendar.get(Calendar.HOUR_OF_DAY);//24小时制
//int hour = calendar.get(Calendar.HOUR);//12小时制
int minute = _calendar.get(Calendar.MINUTE);
int second = _calendar.get(Calendar.SECOND);
String time = year + "-" + (month + 1) + "-" + day + " " + hour + ":" + minute + ":" + second;
//时间格式处理
String res;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(time);
long ts = date.getTime();
long lt = new Long(String.valueOf(ts));
Date date1 = new Date(lt);
res = simpleDateFormat.format(date1);
times.add(res);
}else {
break;
}
}
return times;
}
/*
* 将时间转换为时间戳
*/
public static String dateToStamp(String s) throws ParseException {
String res;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(s);
long ts = date.getTime();
res = String.valueOf(ts);
return res;
}
/*
* 将时间戳转换为时间
*/
public static String stampToDate(String s){
String res;
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long lt = new Long(s);
Date date = new Date(lt);
res = simpleDateFormat.format(date);
return res;
}
}
......@@ -117,15 +117,21 @@
<insert id="insertData">
insert into #{dataBaseName}.#{tableName}
<foreach item="item" collection="tagsFieldValues" separator=","
<foreach item="item" collection="schemaFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldName}
</foreach>
values
using #{dataBaseName}.#{superTableName}
tags
<foreach item="item" collection="tagsFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldValue}
</foreach>
values
<foreach item="item" collection="schemaFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldValue}
</foreach>
</insert>
<select id="selectByTimestamp" parameterType="com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto"
......
import request from '@/utils/request'
import { praseStrEmpty } from "@/utils/thinglinks";
export class proOptions {
export function proOptions() {
return request({
url: '/api/tdengine/shadow/proOptions',
method: 'get',
})
}
export class dataList {
export function dataList(query) {
return request({
url: '/api/tdengine/shadow/dataList',
method: 'get',
params: query
})
}
export class dataCharts {
export function dataCharts() {
return request({
url: '/api/tdengine/shadow/dataCharts',
method: 'get',
})
}
......@@ -39,7 +39,13 @@
</p>
</div>
</div>
<el-form :model="queryParams" ref="queryForm" :inline="true" v-show="showSearch" label-width="100px">
<el-form
:model="queryParams"
ref="queryForm"
:inline="true"
v-show="showSearch"
label-width="100px"
>
<el-form-item label="客户端标识" prop="clientId">
<el-input v-model="queryParams.clientId" placeholder="请输入客户端标识" clearable size="small"
@keyup.enter.native="handleQuery" />
......
<template>
<div :class="$style['app-content']">
<div>
<span style=" font-weight: bold">业务数据采集项</span>
<span style=" font-weight: bold">设备影子数据采集</span>
<el-tabs
v-model="activeName"
@tab-click="handleClick"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册