提交 7f89b74d 编写于 作者: xiaonannet's avatar xiaonannet

优化

上级 9f8b0716
......@@ -13,12 +13,13 @@ import javax.validation.constraints.NotBlank;
*/
@Data
public class BaseEntity {
private static final long serialVersionUID = 1L;
/**
* 数据库名称
*/
@NotBlank(message = "invalid operation: databaseName can not be empty")
private String databaseName;
private String dataBaseName;
/**
* 超级表名称
......
......@@ -12,6 +12,7 @@ import lombok.Data;
*/
@Data
public class Fields {
private static final long serialVersionUID = 1L;
/**
* 字段名称
......@@ -55,8 +56,8 @@ public class Fields {
this.dataType = DataTypeEnum.DOUBLE;
break;
case ("datetime"):
if ("eventTime_td".equals(fieldName))
this.fieldName = "ptEventTime_td";
if ("eventTime".equals(fieldName))
this.fieldName = "eventTime";
this.dataType = DataTypeEnum.TIMESTAMP;
break;
}
......
......@@ -16,6 +16,8 @@ import java.util.List;
*/
@Data
public class FieldsVo {
private static final long serialVersionUID = 1L;
/**
* 字段名称
*/
......
......@@ -5,6 +5,8 @@ import com.fasterxml.jackson.annotation.JsonFormat;
import java.sql.Timestamp;
public class IotSequential extends BaseEntity {
private static final long serialVersionUID = 1L;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss.SSS", timezone = "GMT+8")
private Timestamp statetime;
......
......@@ -140,6 +140,6 @@ public class Constants
/**
* TDengine superTableFields cache key
*/
public static final String TDENGINE_SUPERTABLEFILELDS = "TDengine_superTableFields:";
public static final String TDENGINE_SUPERTABLEFILELDS = "TDengine_SuperTableFields:";
}
package com.mqttsnet.thinglinks.common.core.constant;
/**
* @ClassDescription: 时序性数据库常用常量
* @ClassName: TdEngineConstant
* @Author: thinglinks
* @Date: 2021-12-31 14:42:58
* @Version 1.0
*/
public class TdEngineConstant {
/**
* 字段名后缀
*/
public static final String FIELD_NAME_SUFFIX = "_td";
}
......@@ -479,4 +479,33 @@ public class StringUtils extends org.apache.commons.lang3.StringUtils
{
return (T) obj;
}
/**
* 去除字符串首尾出现的某个字符.
*
* @param source 源字符串.
* @param element 需要去除的字符.
* @return String.
*/
public static String trimFirstAndLastChar(String source, String element) {
if(source==null){
return "";
}
source = source.trim(); // 循环去掉字符串首的beTrim字符
if(source.isEmpty()){
return "";
}
String beginChar = source.substring(0, 1);
if (beginChar.equalsIgnoreCase(element)) {
source = source.substring(1, source.length());
}
// 循环去掉字符串尾的beTrim字符
String endChar = source.substring(source.length() - 1, source.length());
if (endChar.equalsIgnoreCase(element) && source.length()>1) {
source = source.substring(0, source.length() - 1);
}else{
source = "";
}
return source;
}
}
\ No newline at end of file
package com.mqttsnet.thinglinks.common.core.utils;
/**
* @Description: 字符串工具类-智能截取
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @CreateDate: 2021/11/15$ 19:03$
* @UpdateUser: ShiHuan Sun
* @UpdateDate: 2021/11/15$ 19:03$
* @UpdateRemark: 修改内容
* @Version: 1.0
*/
public class SubStringUtil {
/**
* 从头开始截取
*
* @param str 字符串
* @param end 结束位置
* @return
*/
public static String subStrStart(String str, int end){
return subStr(str, 0, end);
}
/**
* 从尾开始截取
*
* @param str 字符串
* @param start 开始位置
* @return
*/
public static String subStrEnd(String str, int start){
return subStr(str, str.length()-start, str.length());
}
/**
* 截取字符串 (支持正向、反向截取)<br/>
*
* @param str 待截取的字符串
* @param length 长度 ,>=0时,从头开始向后截取length长度的字符串;<0时,从尾开始向前截取length长度的字符串
* @return 返回截取的字符串
* @throws RuntimeException
*/
public static String subStr(String str, int length) throws RuntimeException{
if(str==null){
throw new NullPointerException("字符串为null");
}
int len = str.length();
if(len<Math.abs(length)){
throw new StringIndexOutOfBoundsException("最大长度为"+len+",索引超出范围为:"+(len-Math.abs(length)));
}
if(length>=0){
return subStr(str, 0,length);
}else{
return subStr(str, len-Math.abs(length), len);
}
}
/**
* 截取字符串 (支持正向、反向选择)<br/>
*
* @param str 待截取的字符串
* @param start 起始索引 ,>=0时,从start开始截取;<0时,从length-|start|开始截取
* @param end 结束索引 ,>=0时,从end结束截取;<0时,从length-|end|结束截取
* @return 返回截取的字符串
* @throws RuntimeException
*/
public static String subStr(String str, int start, int end) throws RuntimeException{
if(str==null){
throw new NullPointerException("");
}
int len = str.length();
int s = 0;//记录起始索引
int e = 0;//记录结尾索引
if(len<Math.abs(start)){
throw new StringIndexOutOfBoundsException("最大长度为"+len+",索引超出范围为:"+(len-Math.abs(start)));
}else if(start<0){
s = len - Math.abs(start);
}else if(start<0){
s=0;
}else{//>=0
s = start;
}
if(len<Math.abs(end)){
throw new StringIndexOutOfBoundsException("最大长度为"+len+",索引超出范围为:"+(len-Math.abs(end)));
}else if (end <0){
e = len - Math.abs(end);
}else if (end==0){
e = len;
}else{//>=0
e = end;
}
if(e<s){
throw new StringIndexOutOfBoundsException("截至索引小于起始索引:"+(e-s));
}
return str.substring(s, e);
}
/**
* 用指定字符串数组相连接,并返回
*
* @param strs 字符串数组
* @param splitStr 连接数组的字符串
* @return
*/
public static String join(String[] strs, String splitStr){
if(strs!=null){
if(strs.length==1){
return strs[0];
}
StringBuffer sb = new StringBuffer();
for (String str : strs) {
sb.append(str).append(splitStr);
}
if(sb.length()>0){
sb.delete(sb.length()-splitStr.length(), sb.length());
}
return sb.toString();
}
return null;
}
}
......@@ -4,10 +4,12 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.compress.utils.Lists;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.DataType;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
/**
......@@ -1619,4 +1621,40 @@ public class RedisService
public Cursor<ZSetOperations.TypedTuple<String>> zScan(String key, ScanOptions options) {
return redisTemplate.opsForZSet().scan(key, options);
}
/**
* 模糊匹配key
*
* @param match
* @param count
* @return
*/
public Cursor<String> scan(String match, int count) {
ScanOptions scanOptions = ScanOptions.scanOptions().match(match).count(count).build();
RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
Cursor cursor = (Cursor) redisTemplate.executeWithStickyConnection((RedisCallback) redisConnection ->
new ConvertingCursor<>(redisConnection.scan(scanOptions), redisSerializer::deserialize));
return cursor;
}
/**
* scan模糊匹配删除
*
* @param match
* @param count
*/
public void scanDelete(String match, int count) {
try {
List<String> keys = Lists.newArrayList();
Cursor<String> cursor = scan(match, count);
while (cursor.hasNext()) {
//找到一次就添加一次
keys.add(cursor.next());
}
cursor.close();
final long deleteObjectCount = deleteObject(keys);
} catch (Exception e) {
log.error("scanDelete error {}", e.getMessage());
}
}
}
......@@ -40,9 +40,8 @@ public class NoRepeatSubmitAop {
String key = SecurityUtils.getToken() + "-" + request.getServletPath();
log.info("newToken:{}", key);
if (!redisService.hasKey(Constants.RESUBMIT_URL_KEY+key)) {// 如果缓存中有这个url视为重复提交
Object o = pjp.proceed();//当使用环绕通知时,这个方法必须调用,否则拦截到的方法就不会再执行了
redisService.setCacheObject(Constants.RESUBMIT_URL_KEY+key, o, 2L, TimeUnit.SECONDS);
return o;
redisService.setCacheObject(Constants.RESUBMIT_URL_KEY+key, pjp.toString(), 3000L, TimeUnit.MILLISECONDS);
return pjp.proceed();
} else {
log.error("请勿重复提交");
return AjaxResult.error("请勿重复提交");
......
......@@ -41,11 +41,11 @@ public class DeviceActionMessageConsumer implements RocketMQListener {
* ${topic} 其他为业务数据自行处理
*/
if("$event/connect".equals(thinglinksMessage.get("topic"))){
deviceActionService.connectEvent(String.valueOf(thinglinksMessage.get("msg")));
deviceActionService.connectEvent(String.valueOf(thinglinksMessage.getString("msg")));
}else if("$event/close".equals(thinglinksMessage.get("topic"))){
deviceActionService.closeEvent(String.valueOf(thinglinksMessage.get("msg")));
deviceActionService.closeEvent(String.valueOf(thinglinksMessage.getString("msg")));
}else {
deviceDatasService.insertBaseDatas(String.valueOf(thinglinksMessage.get("msg")));
deviceDatasService.insertBaseDatas(thinglinksMessage);
}
}
}
......@@ -75,7 +75,11 @@ public class DeviceController extends BaseController {
@PostMapping
public AjaxResult add(@RequestBody Device device)
{
return toAjax(deviceService.insertDevice(device));
try {
return toAjax(deviceService.insertDevice(device));
}catch (Exception e){
return AjaxResult.error(e.getMessage());
}
}
/**
......
......@@ -12,6 +12,7 @@ import com.mqttsnet.thinglinks.common.security.annotation.PreAuthorize;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product;
import com.mqttsnet.thinglinks.link.service.product.ProductService;
import com.mqttsnet.thinglinks.system.api.RemoteFileService;
import com.mqttsnet.thinglinks.tdengine.api.domain.SuperTableDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
......@@ -179,4 +180,20 @@ public class ProductController extends BaseController {
}
return AjaxResult.error("产品名称已存在");
}
/**
* 获取超级表模型
* @return
*/
@GetMapping(value = "/findCreateSuperTableDataModel")
public AjaxResult findCreateSuperTableDataModel()
{
try {
final List<SuperTableDto> superTableDataModel = productService.createSuperTableDataModel();
return AjaxResult.success(superTableDataModel);
}catch (Exception e){
log.error(e.getMessage());
}
return AjaxResult.error("产品数据异常,请联系管理员");
}
}
......@@ -133,6 +133,11 @@ public interface ProductMapper {
Product findOneByManufacturerIdAndModelAndDeviceType(@Param("manufacturerId")String manufacturerId,@Param("model")String model,@Param("deviceType")String deviceType);
List<Product> findAllByStatus(@Param("status")String status);
Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(@Param("manufacturerId")String manufacturerId,@Param("model")String model,@Param("protocolType")String protocolType,@Param("status")String status);
}
\ No newline at end of file
......@@ -71,4 +71,8 @@ public interface ProductPropertiesMapper {
int updateBatchSelective(List<ProductProperties> list);
int batchInsert(@Param("list") List<ProductProperties> list);
List<ProductProperties> findAllByServiceId(@Param("serviceId")Long serviceId);
}
\ No newline at end of file
......@@ -72,7 +72,9 @@ public interface ProductServicesMapper {
int batchInsert(@Param("list") List<ProductServices> list);
ProductServices findOneByProductId(@Param("productId")Long productId);
List<ProductServices> findByProductIds(@Param("productIds") List<Long> productIds);
List<ProductServices> findAllByProductIdAndStatus(@Param("productId")Long productId,@Param("status")String status);
}
\ No newline at end of file
package com.mqttsnet.thinglinks.link.service.device;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.mqttsnet.thinglinks.link.api.domain.device.entity.DeviceDatas;
/**
......@@ -48,7 +50,16 @@ public interface DeviceDatasService {
*
* @param thinglinksMessage
*/
void insertBaseDatas(String thinglinksMessage);
void insertBaseDatas(JSONObject thinglinksMessage);
/**
* 处理datas Topic数据上报
*
* @param deviceIdentification 设备标识
* @param msg 数据
*/
void processingDatasTopic(String deviceIdentification,String msg) throws Exception;
}
......
......@@ -71,7 +71,7 @@ public interface DeviceService {
* @param device 设备管理
* @return 结果
*/
public int insertDevice(Device device);
public int insertDevice(Device device)throws Exception;
/**
* 修改设备管理
......
package com.mqttsnet.thinglinks.link.service.device.impl;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.Gson;
import com.mqttsnet.thinglinks.common.core.utils.SubStringUtil;
import com.mqttsnet.thinglinks.common.redis.service.RedisService;
import com.mqttsnet.thinglinks.link.api.domain.device.entity.Device;
import com.mqttsnet.thinglinks.link.api.domain.device.entity.DeviceDatas;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product;
import com.mqttsnet.thinglinks.link.mapper.device.DeviceDatasMapper;
import com.mqttsnet.thinglinks.link.service.device.DeviceDatasService;
import com.mqttsnet.thinglinks.link.service.device.DeviceService;
import com.mqttsnet.thinglinks.link.service.product.ProductService;
import com.mqttsnet.thinglinks.link.service.product.ProductServicesService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import com.mqttsnet.thinglinks.link.mapper.device.DeviceDatasMapper;
import com.mqttsnet.thinglinks.link.api.domain.device.entity.DeviceDatas;
import com.mqttsnet.thinglinks.link.service.device.DeviceDatasService;
/**
* @Description: java类作用描述
* @Author: ShiHuan Sun
......@@ -27,10 +35,19 @@ import com.mqttsnet.thinglinks.link.service.device.DeviceDatasService;
*/
@Service
@Slf4j
@Transactional(isolation = Isolation.DEFAULT, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class DeviceDatasServiceImpl implements DeviceDatasService {
@Resource
private DeviceDatasMapper deviceDatasMapper;
@Autowired
private DeviceService deviceService;
@Autowired
private ProductService productService;
@Autowired
private ProductServicesService productServicesService;
@Autowired
private RedisService redisService;
@Override
public int deleteByPrimaryKey(Long id) {
......@@ -103,21 +120,44 @@ public class DeviceDatasServiceImpl implements DeviceDatasService {
* @param thinglinksMessage
*/
@Override
public void insertBaseDatas(String thinglinksMessage) {
JSONObject mqttMessage = JSONObject.parseObject(thinglinksMessage);
String topic = mqttMessage.getString("topic");
String msg = mqttMessage.getString("msg");
public void insertBaseDatas(JSONObject thinglinksMessage) {
String topic = thinglinksMessage.getString("topic");
String msg = thinglinksMessage.getString("msg");
if (Objects.equals(msg, "{}")) {
log.error("topic:{},报文体为空已忽略处理", topic);
log.error("Topic:{},The entry is empty and ignored", topic);
return;
}
//边设备上报数据处理
if (topic.startsWith("/v1/devices/") && topic.endsWith("/datas")) {
log.info("边设备上报数据处理,Topic:{},Msg:{}", topic, msg);
log.info("Side equipment report data processing,Topic:{},Msg:{}", topic, msg);
final String deviceIdentification = SubStringUtil.subStr(topic,12,-6);
}
}
/**
* 处理datas Topic数据上报
*
* @param deviceIdentification 设备标识
* @param msg 数据
*/
@Override
public void processingDatasTopic(String deviceIdentification, String msg) throws Exception{
final Device oneByDeviceIdentification = deviceService.findOneByDeviceIdentification(deviceIdentification);
if (Objects.isNull(oneByDeviceIdentification)) {
log.error("The side device reports data processing, but the device does not exist,DeviceIdentification:{},Msg:{}", deviceIdentification, msg);
return;
}
final Product oneByManufacturerIdAndModelAndDeviceType = productService.findOneByManufacturerIdAndModelAndDeviceType(oneByDeviceIdentification.getManufacturerId(), oneByDeviceIdentification.getProductId(), oneByDeviceIdentification.getProtocolType());
if (Objects.isNull(oneByManufacturerIdAndModelAndDeviceType)) {
log.error("The side device reports data processing, but the product does not exist,DeviceIdentification:{},Msg:{}", deviceIdentification, msg);
return;
}
}
}
package com.mqttsnet.thinglinks.link.service.device.impl;
import com.alibaba.fastjson.JSON;
import com.mqttsnet.thinglinks.common.core.domain.R;
import com.mqttsnet.thinglinks.common.core.enums.DeviceConnectStatus;
import com.mqttsnet.thinglinks.common.core.utils.DateUtils;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.common.log.annotation.Log;
import com.mqttsnet.thinglinks.common.security.service.TokenService;
import com.mqttsnet.thinglinks.link.api.domain.device.entity.Device;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.ProductServices;
import com.mqttsnet.thinglinks.link.mapper.device.DeviceMapper;
import com.mqttsnet.thinglinks.link.service.device.DeviceService;
import com.mqttsnet.thinglinks.link.service.product.ProductService;
import com.mqttsnet.thinglinks.link.service.product.ProductServicesService;
import com.mqttsnet.thinglinks.system.api.domain.SysUser;
import com.mqttsnet.thinglinks.system.api.model.LoginUser;
import com.mqttsnet.thinglinks.tdengine.api.RemoteTdEngineService;
import com.mqttsnet.thinglinks.tdengine.api.domain.Fields;
import com.mqttsnet.thinglinks.tdengine.api.domain.TableDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
/**
* @Description: 设备管理业务层接口实现类
......@@ -30,12 +45,24 @@ import java.util.List;
*/
@Service
@Slf4j
@Transactional(isolation = Isolation.DEFAULT, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class DeviceServiceImpl implements DeviceService {
@Resource
private DeviceMapper deviceMapper;
@Autowired
private TokenService tokenService;
@Resource
private RemoteTdEngineService remoteTdEngineService;
@Autowired
private ProductService productService;
@Autowired
private ProductServicesService productServicesService;
/**
* 数据库名称
*/
@Value("${spring.datasource.dynamic.datasource.master.dbName:thinglinks}")
private String dataBaseName;
@Override
public int deleteByPrimaryKey(Long id) {
......@@ -178,8 +205,8 @@ public class DeviceServiceImpl implements DeviceService {
* @return 结果
*/
@Override
public int insertDevice(Device device)
{
@Transactional(rollbackFor = Exception.class)
public int insertDevice(Device device)throws Exception {
Device oneByClientIdAndDeviceIdentification = deviceMapper.findOneByClientIdOrDeviceIdentification(device.getClientId(), device.getDeviceIdentification());
if(StringUtils.isNotNull(oneByClientIdAndDeviceIdentification)){
return 0;
......@@ -189,7 +216,35 @@ public class DeviceServiceImpl implements DeviceService {
SysUser sysUser = loginUser.getSysUser();
device.setCreateBy(sysUser.getUserName());
device.setCreateTime(DateUtils.getNowDate());
return deviceMapper.insertDevice(device);
final int insertDeviceCount = deviceMapper.insertDevice(device);
Product product = productService.findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(device.getManufacturerId(), device.getProductId(), device.getProtocolType(),"0");
if (StringUtils.isNull(product)) {
new Throwable("The side device reports data processing, but the product does not exist,DeviceIdentification:" + device.getDeviceIdentification() + ",Msg:" + JSON.toJSONString(device));
}
// 新增设备管理成功后,创建TD普通表
List<ProductServices> allByProductIdAndStatus = productServicesService.findAllByProductIdAndStatus(product.getId(), "0");
TableDto tableDto;
for (ProductServices productServices : allByProductIdAndStatus) {
tableDto = new TableDto();
tableDto.setDataBaseName(dataBaseName);
//超级表名称命名规则:产品类型_产品标识_服务名称_设备标识(非ClientId)
String superTableName = product.getProductType()+"_"+product.getProductIdentification()+"_"+productServices.getServiceName();
tableDto.setSuperTableName(superTableName);
tableDto.setTableName(superTableName+"_"+device.getDeviceIdentification());
//Tag的处理
List<Fields> tagsFieldValues = new ArrayList<>();
Fields fields = new Fields();
fields.setFieldValue(device.getDeviceIdentification());
tagsFieldValues.add(fields);
tableDto.setTagsFieldValues(tagsFieldValues);
final R<?> ctResult = remoteTdEngineService.createTable(tableDto);
if (ctResult.getCode() != 200) {
log.error("Create SuperTable Exception: " + ctResult.getMsg());
}else {
log.info("Create SuperTable Success: " + ctResult.getMsg());
}
}
return insertDeviceCount;
}
/**
......
......@@ -40,4 +40,9 @@ public interface ProductPropertiesService{
int batchInsert(List<ProductProperties> list);
List<ProductProperties> findAllByServiceId(Long serviceId);
}
package com.mqttsnet.thinglinks.link.service.product;
import com.alibaba.fastjson.JSONArray;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.ProductServices;
import com.mqttsnet.thinglinks.common.core.web.domain.AjaxResult;
import com.mqttsnet.thinglinks.link.api.domain.product.entity.Product;
import com.mqttsnet.thinglinks.tdengine.api.domain.SuperTableDto;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
/**
* @Description: java类作用描述
* @Description: 产品服务接口
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
......@@ -120,10 +122,29 @@ public interface ProductService{
Product findOneByManufacturerIdAndModelAndDeviceType(String manufacturerId,String model,String deviceType);
/**
* 根据产品模型创建超级表
* @param product
* @param services
* @return
* @throws Exception
*/
AjaxResult createSuperTable(Product product, JSONArray services) throws Exception;
List<Product> findAllByStatus(String status);
/**
* 生成超级表模型
* @return List<SuperTableDto>
* @throws Exception
*/
List<SuperTableDto> createSuperTableDataModel()throws Exception;
ProductServices findOneByProductId(Long productId);
Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(String manufacturerId,String model,String protocolType,String status);
}
......@@ -4,7 +4,7 @@ import com.mqttsnet.thinglinks.link.api.domain.product.entity.ProductServices;
import java.util.List;
/**
* @Description: java类作用描述
* @Description: 产品服务
* @Author: ShiHuan Sun
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
......@@ -40,4 +40,11 @@ public interface ProductServicesService{
int batchInsert(List<ProductServices> list);
List<ProductServices> findByProductIds(List<Long> productIds);
List<ProductServices> findAllByProductIdAndStatus(Long productId,String status);
}
......@@ -80,4 +80,12 @@ public class ProductPropertiesServiceImpl implements ProductPropertiesService{
return productPropertiesMapper.batchInsert(list);
}
@Override
public List<ProductProperties> findAllByServiceId(Long serviceId){
return productPropertiesMapper.findAllByServiceId(serviceId);
}
}
package com.mqttsnet.thinglinks.link.service.product.impl;
import java.util.List;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
......@@ -8,8 +10,10 @@ import com.mqttsnet.thinglinks.common.core.constant.Constants;
import com.mqttsnet.thinglinks.common.core.domain.R;
import com.mqttsnet.thinglinks.common.core.enums.DataTypeEnum;
import com.mqttsnet.thinglinks.common.core.text.CharsetKit;
import com.mqttsnet.thinglinks.common.core.text.Convert;
import com.mqttsnet.thinglinks.common.core.text.UUID;
import com.mqttsnet.thinglinks.common.core.utils.DateUtils;
import com.mqttsnet.thinglinks.common.core.utils.SpringUtils;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.common.core.web.domain.AjaxResult;
import com.mqttsnet.thinglinks.common.redis.service.RedisService;
......@@ -23,7 +27,10 @@ import com.mqttsnet.thinglinks.link.api.domain.product.model.Services;
import com.mqttsnet.thinglinks.link.mapper.product.ProductMapper;
import com.mqttsnet.thinglinks.link.mapper.product.ProductPropertiesMapper;
import com.mqttsnet.thinglinks.link.mapper.product.ProductServicesMapper;
import com.mqttsnet.thinglinks.link.service.product.ProductPropertiesService;
import com.mqttsnet.thinglinks.link.service.product.ProductService;
import com.mqttsnet.thinglinks.link.service.product.ProductServicesService;
import com.mqttsnet.thinglinks.system.api.domain.SysDictData;
import com.mqttsnet.thinglinks.system.api.domain.SysUser;
import com.mqttsnet.thinglinks.system.api.model.LoginUser;
import com.mqttsnet.thinglinks.tdengine.api.RemoteTdEngineService;
......@@ -35,6 +42,7 @@ import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
......@@ -78,9 +86,9 @@ public class ProductServiceImpl implements ProductService{
@Autowired
private TokenService tokenService;
@Autowired
private ProductServicesMapper productServicesMapper;
private ProductServicesService productServicesService;
@Autowired
private ProductPropertiesMapper productPropertiesMapper;
private ProductPropertiesService productPropertiesService;
@Resource
private RemoteTdEngineService remoteTdEngineService;
@Autowired
......@@ -90,7 +98,7 @@ public class ProductServiceImpl implements ProductService{
* 数据库名称
*/
@Value("${spring.datasource.dynamic.datasource.master.dbName:thinglinks}")
private String databaseName;
private String dataBaseName;
@Override
public int deleteByPrimaryKey(Long id) {
......@@ -327,7 +335,7 @@ public class ProductServiceImpl implements ProductService{
productServices.setDescription(service.getString("description"));
productServices.setCreateBy(sysUser.getUserName());
productServices.setCreateTime(DateUtils.getNowDate());
final int insertSelective = productServicesMapper.insertSelective(productServices);
final int insertSelective = productServicesService.insertSelective(productServices);
if (insertSelective==0) {
throw new RuntimeException("Service capability Data storage fails");
}
......@@ -340,7 +348,7 @@ public class ProductServiceImpl implements ProductService{
productProperties.setServiceId(productServices.getId());
productProperties.setCreateBy(sysUser.getUserName());
productProperties.setCreateTime(DateUtils.getNowDate());
final int batchInsert = productPropertiesMapper.insertSelective(productProperties);
final int batchInsert = productPropertiesService.insertSelective(productProperties);
}
}
//解析入库成功创建TD超级表及子表
......@@ -353,12 +361,13 @@ public class ProductServiceImpl implements ProductService{
}
/**
* 创建TD超级表
* 根据产品模型创建超级表
* @param product
* @param services
* @return
* @throws Exception
*/
@Override
@Transactional(rollbackFor = Exception.class)
public AjaxResult createSuperTable(Product product,JSONArray services) throws Exception{
//构建超级表入参对象
......@@ -370,7 +379,7 @@ public class ProductServiceImpl implements ProductService{
//超级表名称命名规则:产品类型_产品标识_服务名称
String superTableName = product.getProductType()+"_"+product.getProductIdentification()+"_"+service.getString("serviceId");
//设置数据库名称和超级表名称
superTableDto.setDatabaseName(databaseName);
superTableDto.setDataBaseName(dataBaseName);
superTableDto.setSuperTableName(superTableName);
//构建超级表的表结构字段列表
JSONArray properties = service.getJSONArray("properties");
......@@ -424,8 +433,8 @@ public class ProductServiceImpl implements ProductService{
redisService.deleteObject(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName);
}
//在redis里存入新的超级表对的表结构信息
redisService.setCacheList(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName, schemaFields);
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto));
log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto));
}
}catch (Exception e){
log.error(e.getMessage());
......@@ -534,15 +543,94 @@ public class ProductServiceImpl implements ProductService{
}
@Override
public ProductServices findOneByProductId(Long productId){
return productServicesMapper.findOneByProductId(productId);
public List<Product> findAllByStatus(String status){
return productMapper.findAllByStatus(status);
}
/**
* 生成超级表模型
* @return List<SuperTableDto>
* @throws Exception
*/
@Async
@Override
public List<SuperTableDto> createSuperTableDataModel()throws Exception{
List<SuperTableDto> superTableDtoList = new ArrayList<>();
List<Product> allByStatus = this.findAllByStatus("0");
SuperTableDto superTableDto;
loop:
for (Product product : allByStatus) {
List<ProductServices> allByProductIdAndStatus = productServicesService.findAllByProductIdAndStatus(product.getId(), "0");
if(StringUtils.isEmpty(allByProductIdAndStatus)){
continue loop;
}
for (ProductServices productServices : allByProductIdAndStatus) {
superTableDto = new SuperTableDto();
if(StringUtils.isNull(productServices)){
continue loop;
}
//超级表名称命名规则:产品类型_产品标识_服务名称
String superTableName = product.getProductType()+"_"+product.getProductIdentification()+"_"+productServices.getServiceName();
//设置数据库名称和超级表名称
superTableDto.setDataBaseName(dataBaseName);
superTableDto.setSuperTableName(superTableName);
//构建超级表的表结构字段列表
List<ProductProperties> allByServiceId = productPropertiesService.findAllByServiceId(productServices.getId());
//如果服务下属性值为空,没必要为该服务创建超级表,跳过该循环,进入下个服务
if(StringUtils.isNull(allByServiceId)){
continue loop;
}
//构建超级表的表结构字段列表
List<Fields> schemaFields = new ArrayList<>();
//超级表第一个字段数据类型必须为时间戳
Fields firstColumn = new Fields();
firstColumn.setFieldName("ts");
firstColumn.setDataType(DataTypeEnum.TIMESTAMP);
schemaFields.add(firstColumn);
//根据属性对象列表循环构建超级表表结构
for (ProductProperties productProperties : allByServiceId) {
//获取字段名称
String filedName = productProperties.getName();
//获取该属性数据类型
String datatype = productProperties.getDatatype();
//获取该属性的数据大小
Integer size = productProperties.getMaxlength();
//添加超级表表结构字段
Fields fields = new Fields(filedName, datatype, size);
schemaFields.add(fields);
}
//构建超级表标签字段列表
//根据业务逻辑,将超级表的标签字段定为
// 1:设备标识:deviceIdentification
List<Fields> tagsFields = new ArrayList<>();
Fields tags = new Fields();
tags.setFieldName("deviceIdentification");
tags.setDataType(DataTypeEnum.BINARY);
tags.setSize(64);
tagsFields.add(tags);
//设置超级表表结构列表
superTableDto.setSchemaFields(schemaFields);
//设置超级表标签字段列表
superTableDto.setTagsFields(tagsFields);
//将之前存在redis里的同样的名称的超级表的表结构信息删除
if (redisService.hasKey(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName)) {
redisService.deleteObject(Constants.TDENGINE_SUPERTABLEFILELDS+superTableName);
}
//在redis里存入新的超级表对的表结构信息
redisService.setCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS + superTableName, JSON.toJSONString(superTableDto));
log.info("缓存超级表数据模型:{}",JSON.toJSONString(superTableDto));
superTableDtoList.add(superTableDto);
}
}
return superTableDtoList;
}
@Override
public Product findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(String manufacturerId,String model,String protocolType,String status){
return productMapper.findOneByManufacturerIdAndModelAndProtocolTypeAndStatus(manufacturerId,model,protocolType,status);
}
......
......@@ -80,4 +80,19 @@ public class ProductServicesServiceImpl implements ProductServicesService{
return productServicesMapper.batchInsert(list);
}
@Override
public List<ProductServices> findByProductIds(List<Long> productIds){
return productServicesMapper.findByProductIds(productIds);
}
@Override
public List<ProductServices> findAllByProductIdAndStatus(Long productId,String status){
return productServicesMapper.findAllByProductIdAndStatus(productId,status);
}
}
......@@ -877,4 +877,37 @@
</if>
</where>
</select>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-04-13-->
<select id="findAllByStatus" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product
<where>
<if test="status != null">
and `status`=#{status,jdbcType=VARCHAR}
</if>
</where>
</select>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-04-14-->
<select id="findOneByManufacturerIdAndModelAndProtocolTypeAndStatus" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product
<where>
<if test="manufacturerId != null">
and manufacturer_id=#{manufacturerId,jdbcType=VARCHAR}
</if>
<if test="model != null">
and model=#{model,jdbcType=VARCHAR}
</if>
<if test="protocolType != null">
and protocol_type=#{protocolType,jdbcType=VARCHAR}
</if>
<if test="status != null">
and `status`=#{status,jdbcType=VARCHAR}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
......@@ -696,4 +696,16 @@
</if>
</trim>
</insert>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-04-13-->
<select id="findAllByServiceId" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product_properties
<where>
<if test="serviceId != null">
and service_id=#{serviceId,jdbcType=BIGINT}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
......@@ -445,8 +445,22 @@
</trim>
</insert>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-03-25-->
<select id="findOneByProductId" resultMap="BaseResultMap">
<select id="findByProductIds" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product_services
<where>
<if test="productIds != null">
and product_id in
<foreach collection="productIds" item="item" open="(" separator="," close=")">
#{item,jdbcType=BIGINT}
</foreach>
</if>
</where>
</select>
<!--auto generated by ShiHuan Sun E-mail: 13733918655@163.com on 2022-04-13-->
<select id="findAllByProductIdAndStatus" resultMap="BaseResultMap">
select
<include refid="Base_Column_List"/>
from product_services
......@@ -454,6 +468,9 @@
<if test="productId != null">
and product_id=#{productId,jdbcType=BIGINT}
</if>
<if test="status != null">
and `status`=#{status,jdbcType=VARCHAR}
</if>
</where>
</select>
</mapper>
\ No newline at end of file
//package com.mqttsnet.thinglinks.tdengine.common;
//
//import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
//import lombok.extern.slf4j.Slf4j;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.cloud.context.config.annotation.RefreshScope;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//
///**
// * @Description: 初始化数据库
// * @Author: ShiHuan SUN
// * @E-mail: 13733918655@163.com
// * @Website: http://thinglinks.mqttsnet.com
// * @CreateDate: 2022/3/28$ 16:12$
// * @UpdateUser: ShiHuan SUN
// * @UpdateDate: 2022/3/28$ 16:12$
// * @UpdateRemark: 修改内容
// * @Version: V1.0
// */
//@Component
//@Slf4j
//@RefreshScope
//public class InitDataBase {
// private static InitDataBase InitDataBase;
//
// @Autowired
// private TdEngineService tdEngineService;
//
// /**
// * 数据库名称
// */
// @Value("${spring.datasource.dynamic.datasource.master.dbName:thinglinks}")
// private String databaseName;
//
// @PostConstruct
// public void init() {
// /*InitDataBase = this;
// InitDataBase.tdEngineService=this.tdEngineService;
// //创建数据库
// this.tdEngineService.createDateBase(databaseName);
// log.info("初始化数据库:{}成功!",databaseName);*/
// }
//
//}
package com.mqttsnet.thinglinks.tdengine.common;
import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* @Description: 初始化数据库
* @Author: ShiHuan SUN
* @E-mail: 13733918655@163.com
* @Website: http://thinglinks.mqttsnet.com
* @CreateDate: 2022/3/28$ 16:12$
* @UpdateUser: ShiHuan SUN
* @UpdateDate: 2022/3/28$ 16:12$
* @UpdateRemark: 修改内容
* @Version: V1.0
*/
@Component
@Slf4j
@RefreshScope
public class InitDataBase {
private static InitDataBase InitDataBase;
@Autowired
private TdEngineService tdEngineService;
/**
* 数据库名称
*/
@Value("${spring.datasource.dynamic.datasource.master.dbName:thinglinks}")
private String dataBaseName;
@PostConstruct
public void init() throws Exception {
InitDataBase = this;
InitDataBase.tdEngineService=this.tdEngineService;
StopWatch watch = new StopWatch();
watch.start();
//创建数据库
this.tdEngineService.createDateBase(dataBaseName);
//初始化超级表结构
this.tdEngineService.initSTableFrame();
watch.stop();
log.info("初始化数据库及超级表:{} 成功 ! Time Elapsed (millisecond): {}",dataBaseName,watch.getTime());
}
}
//package com.mqttsnet.thinglinks.tdengine.common;
//
//import com.mqttsnet.thinglinks.common.core.domain.R;
//import com.mqttsnet.thinglinks.tdengine.api.RemoteTdEngineService;
//import com.mqttsnet.thinglinks.tdengine.api.domain.Fields;
//import com.mqttsnet.thinglinks.tdengine.api.domain.SuperTableDto;
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.beans.factory.annotation.Value;
//import org.springframework.stereotype.Component;
//
//import java.util.ArrayList;
//import java.util.List;
//
///**
// * @ClassDescription: 初始化表结构
// * @ClassName: InitTableFrame
// * @Author: thinglinks
// * @Date: 2021-12-31 10:52:18
// * @Version 1.0
// */
//@Component
//public class InitTableFrame {
//
// private static final Logger log = LoggerFactory.getLogger(InitTableFrame.class);
//
// /**
// * 时序性数据库服务
// */
// @Autowired
// private RemoteTdEngineService tdEngineService;
//
// /**
// * 服务信息mapper
// */
// @Autowired
// private IotThingServiceInfoMapper thingServiceInfoMapper;
//
// /**
// * 服务属性信息mapper
// */
// @Autowired
// private IotThingPropertyInfoMapper thingPropertyInfoMapper;
//
// /**
// * 设备信息mapper
// */
// @Autowired
// private IotDeviceInfoMapper deviceInfoMapper;
//
// @Autowired
// private RedisService redisService;
//
// /**
// * 数据库名称
// */
// @Value("${tdEngine.databaseName}")
// private String databaseName;
//
// /**
// *@MethodDescription 初始化数据库,超级表和子表
// *@author thinglinks
// *@Date 2022/1/10 9:42
// */
// public void initTableFrame() throws Exception {
//
// //创建数据库
// R<?> cdbResult = this.tdEngineService.createDataBase(databaseName);
//
// //创建数据库报错,打印报错信息,并结束方法
// if (cdbResult.getCode() != 200) {
// log.error("Create Database Exception: " + cdbResult.getMsg());
// return;
// }
//
// //获取服务id和设备模型id都不为空的服务对象列表
// List<IotThingServiceInfo> serviceInfos
// = this.thingServiceInfoMapper.getServiceExistProduct();
//
// //根据服务对象列表循环创建超级表,子表
// loop:
// for (IotThingServiceInfo serviceInfo : serviceInfos) {
// //构建超级表入参对象
// SuperTableDto superTableDto = new SuperTableDto();
//
// /*超级表名称命名规则(常量前缀加上设备模型id加上服务名称):
// "常量类的超级表名前缀" + “_” + “productId” + “_” + “serviceName"*/
// String superTableName = TdEngineConstant.SUPER_TABLE_NAME_PREFIX +
// "_" + serviceInfo.getProductId() +
// "_" + serviceInfo.getServiceName();
//
// //设置数据库名称和超级表名称
// superTableDto.setDatabaseName(databaseName);
// superTableDto.setSuperTableName(superTableName);
//
// //根据服务id获取该服务下所有属性对象列表
// List<IotThingPropertyInfo> propertyInfos
// = this.thingPropertyInfoMapper.getPropertyListByServiceId(serviceInfo.getServiceId());
//
// //如果服务下属性值为空,没必要为该服务创建超级表,跳过该循环,进入下个服务
// if (propertyInfos.isEmpty()) {
// continue loop;
// }
//
// //构建超级表的表结构字段列表
// List<Fields> schemaFields = new ArrayList<>();
// //超级表第一个字段数据类型必须为时间戳
// Fields firstColumn = new Fields();
// firstColumn.setFieldName("eventTime" + TdEngineConstant.FIELD_NAME_SUFFIX);
// firstColumn.setDataType(DataTypeEnum.TIMESTAMP);
// schemaFields.add(firstColumn);
//
// //根据属性对象列表循环构建超级表表结构
// for (IotThingPropertyInfo propertyInfo : propertyInfos) {
// //获取字段名称
// String filedName = propertyInfo.getName() + TdEngineConstant.FIELD_NAME_SUFFIX;
// //获取该属性数据类型
// String datatype = propertyInfo.getDatatype();
// //获取该属性的数据大小
// Integer size = propertyInfo.getMaxlength();
// Fields fields = new Fields(filedName, datatype, size);
// //添加超级表表结构字段
// schemaFields.add(fields);
// }
//
// //构建超级表标签字段列表
// //根据业务逻辑,将超级表的标签字段定为设备的客户端Id:clientId
// List<Fields> tagsFields = new ArrayList<>();
// Fields tags = new Fields();
// tags.setFieldName("clientId");
// tags.setDataType(DataTypeEnum.BINARY);
// tags.setSize(64);
// tagsFields.add(tags);
//
// //设置超级表表结构列表
// superTableDto.setSchemaFields(schemaFields);
// //设置超级表标签字段列表
// superTableDto.setTagsFields(tagsFields);
// //调用方法创建超级表
// R<?> cstResult = this.tdEngineService.createSuperTable(superTableDto);
// //创建超级表报错,打印报错信息,并跳过该循环,继续为下个服务创建表
// if (cstResult.getCode() != 200) {
// log.error("Create SuperTable Exception: " + cstResult.getMsg());
// continue loop;
// }
// //将之前存在redis里的同样的名称的超级表的表结构信息删除
// if (redisService.hasKey(superTableName)) {
// redisService.deleteObject(superTableName);
// }
// //在redis里存入新的超级表对的表结构信息
// redisService.setCacheList(superTableName, schemaFields);
//
// //根据设备模型id查询该设备模型下所有设备对象列表
// List<IotDeviceInfo> devices
// = this.deviceInfoMapper.getDeviceListByProductId((long) serviceInfo.getProductId());
// //根据设备对象列表创建子表
// for (IotDeviceInfo device : devices) {
// TableDto tableDto = new TableDto();
// //为设备的客户端id加上服务名称
// //子表名称命名规则(客户端id加上服务名称):“clientId” + “serviceName"
// tableDto.setTableName(device.getClientId() + serviceInfo.getServiceName());
// //设置数据库名称
// tableDto.setDatabaseName(databaseName);
// //设置超级表名称
// tableDto.setSuperTableName(superTableName);
// List<Fields> tagsFieldValues = new ArrayList<>();
// Fields fieldValue = new Fields();
// //设置标签字段的值
// fieldValue.setFieldValue(device.getClientId());
// tagsFieldValues.add(fieldValue);
// tableDto.setTagsFieldValues(tagsFieldValues);
// //创建子表报错,打印报错信息,不做其他操作
// R<?> ctResult = this.tdEngineService.createTable(tableDto);
// if (ctResult.getCode() != 200) {
// log.error("Create Table Exception: " + ctResult.getMsg());
// }
// }
// }
// }
//}
......@@ -37,17 +37,17 @@ public class TdEngineController {
private static final Logger log = LoggerFactory.getLogger(TdEngineController.class);
/**
* @param databaseName 数据库名称
* @param dataBaseName 数据库名称
* @return R
* @MethodDescription 创建tdEngine数据库
* @author thinglinks
* @Date 2021/12/27 16:26
*/
@PostMapping("/createDb")
public R createDataBase(@RequestBody() String databaseName) {
public R createDataBase(@RequestBody() String dataBaseName) {
//调用创建数据库方法
this.tdEngineService.createDateBase(databaseName);
log.info("successful operation: created database '" + databaseName + "' success");
this.tdEngineService.createDateBase(dataBaseName);
log.info("successful operation: created database '" + dataBaseName + "' success");
return R.ok();
}
......@@ -65,7 +65,7 @@ public class TdEngineController {
//从入参对象获取标签字段对象集合
List<Fields> tagsFields = superTableDto.getTagsFields();
//从入参获取数据库名称
String databaseName = superTableDto.getDatabaseName();
String dataBaseName = superTableDto.getDataBaseName();
//从入参获取超级表名称
String superTableName = superTableDto.getSuperTableName();
//获取列字段对象集合的第一个对象的字段数据类型
......@@ -81,7 +81,7 @@ public class TdEngineController {
List<FieldsVo> schemaFieldsVoList = FieldsVo.fieldsTranscoding(schemaFields);
List<FieldsVo> tagsFieldsVoList = FieldsVo.fieldsTranscoding(tagsFields);
//创建超级表
this.tdEngineService.createSuperTable(schemaFieldsVoList, tagsFieldsVoList, databaseName, superTableName);
this.tdEngineService.createSuperTable(schemaFieldsVoList, tagsFieldsVoList, dataBaseName, superTableName);
log.info("successful operation: created superTable '" + superTableName + "' success");
return R.ok();
} catch (UncategorizedSQLException e) {
......@@ -97,6 +97,11 @@ public class TdEngineController {
}
}
/**
* 添加列字段
* @param superTableDto
* @return
*/
@PostMapping("/addColumnInStb")
public R addColumnForSuperTable(@RequestBody SuperTableDto superTableDto) {
......@@ -140,7 +145,7 @@ public class TdEngineController {
try {
this.tdEngineService.createTable(tableDto);
log.info("successful operation: create table success");
return R.ok();
return R.ok("successful operation: create table success");
} catch (UncategorizedSQLException e) {
String message = e.getCause().getMessage();
try {
......@@ -192,4 +197,9 @@ public class TdEngineController {
public R selectByTimesTamp(@Validated @RequestBody SelectDto selectDto) {
return R.ok(this.tdEngineService.selectByTimesTamp(selectDto));
}
@PostMapping("/getCountByTimestamp")
public R getCountByTimestamp(@Validated @RequestBody SelectDto selectDto) {
return R.ok(this.tdEngineService.getCountByTimesTamp(selectDto));
}
}
......@@ -19,11 +19,11 @@ import java.util.Map;
@Mapper
public interface TdEngineMapper {
void createDatabase(@Param("databaseName") String databaseName);
void createDatabase(@Param("dataBaseName") String dataBaseName);
void createSuperTable(@Param("schemaFields") List<FieldsVo> schemaFields,
@Param("tagsFields") List<FieldsVo> tagsFields,
@Param("databaseName") String databaseName,
@Param("dataBaseName") String dataBaseName,
@Param("superTableName") String superTableName);
void createTable(TableDto tableDto);
......@@ -34,4 +34,15 @@ public interface TdEngineMapper {
void addColumnForSuperTable(@Param("superTableName") String superTableName,
@Param("fieldsVo") FieldsVo fieldsVo);
void dropColumnForSuperTable(@Param("superTableName") String superTableName,
@Param("fieldsVo") FieldsVo fieldsVo);
void addTagForSuperTable(@Param("superTableName") String superTableName,
@Param("fieldsVo") FieldsVo fieldsVo);
void dropTagForSuperTable(@Param("superTableName") String superTableName,
@Param("fieldsVo") FieldsVo fieldsVo);
Map<String, Long> getCountByTimestamp(SelectDto selectDto);
}
......@@ -17,9 +17,9 @@ import java.util.Map;
* @Version 1.0
*/
public interface TdEngineService {
void createDateBase(String databaseName);
void createDateBase(String dataBaseName);
void createSuperTable(List<FieldsVo> schemaFields, List<FieldsVo> tagsFields, String databaseName, String superTableName);
void createSuperTable(List<FieldsVo> schemaFields, List<FieldsVo> tagsFields, String dataBaseName, String superTableName);
void createTable(TableDto tableDto);
......@@ -28,4 +28,8 @@ public interface TdEngineService {
List<Map<String, Object>> selectByTimesTamp(SelectDto selectDto);
void addColumnForSuperTable(String superTableName, FieldsVo fieldsVo);
Long getCountByTimesTamp(SelectDto selectDto);
void initSTableFrame() throws Exception;
}
package com.mqttsnet.thinglinks.tdengine.service.impl;
import com.mqttsnet.thinglinks.common.core.constant.Constants;
import com.mqttsnet.thinglinks.common.core.utils.StringUtils;
import com.mqttsnet.thinglinks.common.redis.service.RedisService;
import com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto;
import com.mqttsnet.thinglinks.tdengine.api.domain.TableDto;
import com.mqttsnet.thinglinks.tdengine.api.domain.FieldsVo;
import com.mqttsnet.thinglinks.tdengine.mapper.TdEngineMapper;
import com.mqttsnet.thinglinks.tdengine.service.TdEngineService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
/**
......@@ -20,19 +28,23 @@ import java.util.stream.Collectors;
* @Version 1.0
*/
@Service
@Slf4j
@Transactional(isolation = Isolation.DEFAULT, propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
public class TdEngineServiceImpl implements TdEngineService {
@Autowired
private TdEngineMapper tdEngineMapper;
@Autowired
private RedisService redisService;
@Override
public void createDateBase(String databaseName) {
this.tdEngineMapper.createDatabase(databaseName);
public void createDateBase(String dataBaseName) {
this.tdEngineMapper.createDatabase(dataBaseName);
}
@Override
public void createSuperTable(List<FieldsVo> schemaFields, List<FieldsVo> tagsFields, String databaseName, String superTableName) {
this.tdEngineMapper.createSuperTable(schemaFields, tagsFields, databaseName, superTableName);
public void createSuperTable(List<FieldsVo> schemaFields, List<FieldsVo> tagsFields, String dataBaseName, String superTableName) {
this.tdEngineMapper.createSuperTable(schemaFields, tagsFields, dataBaseName, superTableName);
}
@Override
......@@ -61,4 +73,25 @@ public class TdEngineServiceImpl implements TdEngineService {
public void addColumnForSuperTable(String superTableName, FieldsVo fieldsVo) {
this.tdEngineMapper.addColumnForSuperTable(superTableName, fieldsVo);
}
@Override
public Long getCountByTimesTamp(SelectDto selectDto) {
Map<String, Long> countMap = this.tdEngineMapper.getCountByTimestamp(selectDto);
if (countMap == null) {
return 0L;
}
Long count = countMap.get("count");
return count;
}
@Override
public void initSTableFrame() throws Exception {
final Object cacheObject = redisService.getCacheObject(Constants.TDENGINE_SUPERTABLEFILELDS);
if (StringUtils.isNull(cacheObject)) {
log.info("The production model cache is empty");
}
List<Optional> optionalList = StringUtils.cast(cacheObject);
}
}
......@@ -2,13 +2,12 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.mqttsnet.thinglinks.tdengine.mapper.TdEngineMapper">
<update id="createDatabase" parameterType="String">
create database if not exists #{databaseName}
create database if not exists #{dataBaseName}
</update>
<update id="createSuperTable">
create table if not exists #{databaseName}.#{superTableName}
create table if not exists #{dataBaseName}.#{superTableName}
<foreach item="item" collection="schemaFields" separator=","
open="(" close=")" index="">
<if test="item.fieldName != null || item.fieldName != ''">
......@@ -107,9 +106,9 @@
<update id="createTable">
create table
if not exists #{databaseName}.#{tableName}
using #{databaseName}.#{superTableName}
tags
if not exists #{dataBaseName}.#{tableName}
using #{dataBaseName}.#{superTableName}
tags
<foreach item="item" collection="tagsFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldValue}
......@@ -117,7 +116,7 @@
</update>
<insert id="insertData">
insert into #{databaseName}.#{tableName}
insert into #{dataBaseName}.#{tableName}
<foreach item="item" collection="tagsFieldValues" separator=","
open="(" close=")" index="">
#{item.fieldName}
......@@ -129,7 +128,8 @@
</foreach>
</insert>
<select id="selectByTimestamp" parameterType="com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto" resultType="Map">
<select id="selectByTimestamp" parameterType="com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto"
resultType="Map">
select * from #{dataBaseName}.#{tableName}
<!--查询这里不能使用#{}占位符的方式,使用这种方式,tdEngine不识别为列名,只能使用${}占位的方式-->
<!--因为tdEngine引擎一次只执行一条sql,所以有效预防了sql的注入,且该服务该接口为内部调用,所以可以使用${}-->
......@@ -138,13 +138,14 @@
</select>
<update id="addColumnForSuperTable">
ALTER STABLE #{superTableName} ADD COLUMN
ALTER
STABLE
#{superTableName}
ADD
COLUMN
<if test="fieldsVo.fieldName != null || fieldsVo.fieldName != ''">
#{fieldsVo.fieldName}
</if>
<!--<if test="fieldsVo.dataType != null || fieldsVo.dataType != ''">
#{fieldsVo.dataType}
</if>-->
<if test="fieldsVo.dataType != null || fieldsVo.dataType != ''">
<choose>
<when test="fieldsVo.dataType == 'timestamp'">
......@@ -183,8 +184,90 @@
</choose>
</if>
<if test="fieldsVo.size != null">
(#{fieldsVo.size})
(
#{fieldsVo.size}
)
</if>
</update>
<update id="dropColumnForSuperTable">
ALTER
STABLE
#{superTableName}
DROP
COLUMN
<if test="fieldsVo.fieldName != null || fieldsVo.fieldName != ''">
#{fieldsVo.fieldName}
</if>
</update>
<update id="addTagForSuperTable">
ALTER
STABLE
#{superTableName}
ADD
TAG
<if test="fieldsVo.fieldName != null || fieldsVo.fieldName != ''">
#{fieldsVo.fieldName}
</if>
<if test="fieldsVo.dataType != null || fieldsVo.dataType != ''">
<choose>
<when test="fieldsVo.dataType == 'timestamp'">
timestamp
</when>
<when test="fieldsVo.dataType == 'tinyint'">
tinyint
</when>
<when test="fieldsVo.dataType == 'smallint'">
smallint
</when>
<when test="fieldsVo.dataType == 'int'">
int
</when>
<when test="fieldsVo.dataType == 'bigint'">
bigint
</when>
<when test="fieldsVo.dataType == 'float'">
float
</when>
<when test="fieldsVo.dataType == 'double'">
double
</when>
<when test="fieldsVo.dataType == 'binary'">
binary
</when>
<when test="fieldsVo.dataType == 'nchar'">
nchar
</when>
<when test="fieldsVo.dataType == 'bool'">
bool
</when>
<when test="fieldsVo.dataType == 'json'">
json
</when>
</choose>
</if>
<if test="fieldsVo.size != null">
(
#{fieldsVo.size}
)
</if>
</update>
<update id="dropTagForSuperTable">
ALTER
STABLE
#{superTableName}
DROP
TAG
<if test="fieldsVo.fieldName != null || fieldsVo.fieldName != ''">
#{fieldsVo.fieldName}
</if>
</update>
<select id="getCountByTimestamp" parameterType="com.mqttsnet.thinglinks.tdengine.api.domain.SelectDto"
resultType="java.util.Map">
SELECT count(0) AS count
FROM #{dataBaseName}.#{tableName} WHERE ${fieldName} BETWEEN #{startTime} AND #{endTime}
</select>
</mapper>
\ No newline at end of file
......@@ -420,7 +420,7 @@
<div class="el-upload__tip text-center" slot="tip">
<div class="el-upload__tip" slot="tip">
<el-checkbox v-model="upload.updateSupport"/>
是否更新已经存在的用户数据
是否更新已经存在的产品模型数据
</div>
<span>仅允许导入xlsxlsx格式文件</span>
<el-link type="primary" :underline="false" style="font-size:12px;vertical-align: baseline;"
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册