Commit faaf3e82 authored by 彭勇升 pengys, committed by wu-sheng

Separated the TTL from a single minute-based setting into minute, hour, day, and month settings. (#1268)

* #1263

Separated the TTL from a single minute-based setting into minute, hour, day, and month settings.

* 1. Changed the minute metric data TTL to 90, because the 1-hour duration option in the UI covers 60 minutes,
and I prefer to set the value at least 50% above that.
2. Updated some descriptions and logging.

#1263

* Added a config item named traceDataTTL for the trace data TTL.

#1263

* Fixed an English sentence mistake in the logger messages.
Parent af5eedeb
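For readers skimming the diff, here is a minimal, hypothetical sketch (not part of this commit) of how the new per-granularity TTL settings can be turned into the "delete everything at or before this time bucket" values that the revised `deleteHistory(Long timeBucketBefore)` contract expects. It assumes Joda-Time, which the new test in this commit also uses; the helper class and method names below are made up for illustration.

```java
import org.joda.time.DateTime;

// Hypothetical helper: derives the "delete at or before" time buckets from the
// per-granularity TTL settings introduced in this change. The bucket formats
// (yyyyMMddHHmm, yyyyMMddHH, yyyyMMdd, yyyyMM) match the collector's time buckets.
public class TimeBucketBeforeCalculator {

    public long minuteBucketBefore(DateTime now, int minuteMetricDataTTL) {
        return Long.parseLong(now.minusMinutes(minuteMetricDataTTL).toString("yyyyMMddHHmm"));
    }

    public long hourBucketBefore(DateTime now, int hourMetricDataTTL) {
        return Long.parseLong(now.minusHours(hourMetricDataTTL).toString("yyyyMMddHH"));
    }

    public long dayBucketBefore(DateTime now, int dayMetricDataTTL) {
        return Long.parseLong(now.minusDays(dayMetricDataTTL).toString("yyyyMMdd"));
    }

    public long monthBucketBefore(DateTime now, int monthMetricDataTTL) {
        return Long.parseLong(now.minusMonths(monthMetricDataTTL).toString("yyyyMM"));
    }
}
```

With the defaults from `application.yml` (90 minutes, 36 hours, 45 days, 18 months) and a fixed current time of 2018-05-26 15:05, these calculations line up with the constants asserted in `DataTTLKeeperTimerTestCase` further down in this diff.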
......@@ -64,7 +64,7 @@ public class GlobalTraceSpanListener implements GlobalTraceIdsListener {
for (String globalTraceId : globalTraceIds) {
GlobalTrace globalTrace = new GlobalTrace();
globalTrace.setId(segmentCoreInfo.getSegmentId() + Const.ID_SPLIT + globalTraceId);
globalTrace.setGlobalTraceId(globalTraceId);
globalTrace.setTraceId(globalTraceId);
globalTrace.setSegmentId(segmentCoreInfo.getSegmentId());
globalTrace.setTimeBucket(segmentCoreInfo.getMinuteTimeBucket());
graph.start(globalTrace);
......
......@@ -68,7 +68,12 @@ storage:
indexShardsNumber: 2
indexReplicasNumber: 0
highPerformanceMode: true
ttl: 7
# Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
traceDataTTL: 90 # Unit is minute
minuteMetricDataTTL: 90 # Unit is minute
hourMetricDataTTL: 36 # Unit is hour
dayMetricDataTTL: 45 # Unit is day
monthMetricDataTTL: 18 # Unit is month
#storage:
# h2:
# url: jdbc:h2:~/memorydb
......
......@@ -31,5 +31,5 @@ public interface IPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends StreamData>
UPDATE prepareBatchUpdate(STREAM_DATA data);
void deleteHistory(Long startTimestamp, Long endTimestamp);
void deleteHistory(Long timeBucketBefore);
}
......@@ -73,11 +73,11 @@ public class GlobalTrace extends StreamData {
setDataString(1, segmentId);
}
public String getGlobalTraceId() {
public String getTraceId() {
return getDataString(2);
}
public void setGlobalTraceId(String globalTraceId) {
public void setTraceId(String globalTraceId) {
setDataString(2, globalTraceId);
}
......
......@@ -27,8 +27,12 @@ class StorageModuleEsConfig extends ElasticSearchClientConfig {
private int indexShardsNumber;
private int indexReplicasNumber;
private int ttl;
private boolean highPerformanceMode;
private int traceDataTTL;
private int minuteMetricDataTTL;
private int hourMetricDataTTL;
private int dayMetricDataTTL;
private int monthMetricDataTTL;
int getIndexShardsNumber() {
return indexShardsNumber;
......@@ -46,14 +50,6 @@ class StorageModuleEsConfig extends ElasticSearchClientConfig {
this.indexReplicasNumber = indexReplicasNumber;
}
int getTtl() {
return ttl;
}
void setTtl(int ttl) {
this.ttl = ttl;
}
boolean isHighPerformanceMode() {
return highPerformanceMode;
}
......@@ -61,4 +57,44 @@ class StorageModuleEsConfig extends ElasticSearchClientConfig {
void setHighPerformanceMode(boolean highPerformanceMode) {
this.highPerformanceMode = highPerformanceMode;
}
int getTraceDataTTL() {
return traceDataTTL;
}
void setTraceDataTTL(int traceDataTTL) {
this.traceDataTTL = traceDataTTL;
}
int getMinuteMetricDataTTL() {
return minuteMetricDataTTL;
}
void setMinuteMetricDataTTL(int minuteMetricDataTTL) {
this.minuteMetricDataTTL = minuteMetricDataTTL;
}
int getHourMetricDataTTL() {
return hourMetricDataTTL;
}
void setHourMetricDataTTL(int hourMetricDataTTL) {
this.hourMetricDataTTL = hourMetricDataTTL;
}
int getDayMetricDataTTL() {
return dayMetricDataTTL;
}
void setDayMetricDataTTL(int dayMetricDataTTL) {
this.dayMetricDataTTL = dayMetricDataTTL;
}
int getMonthMetricDataTTL() {
return monthMetricDataTTL;
}
void setMonthMetricDataTTL(int monthMetricDataTTL) {
this.monthMetricDataTTL = monthMetricDataTTL;
}
}
......@@ -27,8 +27,7 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -74,9 +73,9 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
protected abstract String timeBucketColumnNameForDelete();
@Override
public final void deleteHistory(Long startTimeBucket, Long endTimeBucket) {
public final void deleteHistory(Long timeBucketBefore) {
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).gte(startTimeBucket).lte(endTimeBucket),
QueryBuilders.rangeQuery(timeBucketColumnNameForDelete()).lte(timeBucketBefore),
tableName())
.get();
......
......@@ -18,64 +18,50 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.global.GlobalTrace;
import org.apache.skywalking.apm.collector.storage.table.global.GlobalTraceTable;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.global.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GlobalTraceEsPersistenceDAO extends EsDAO implements IGlobalTracePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GlobalTrace> {
private final Logger logger = LoggerFactory.getLogger(GlobalTraceEsPersistenceDAO.class);
public class GlobalTraceEsPersistenceDAO extends AbstractPersistenceEsDAO<GlobalTrace> implements IGlobalTracePersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, GlobalTrace> {
public GlobalTraceEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override
public GlobalTrace get(String id) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
@Override protected String tableName() {
return GlobalTraceTable.TABLE;
}
@Override
public UpdateRequestBuilder prepareBatchUpdate(GlobalTrace data) {
throw new UnexpectedException("There is no need to merge stream data with database data.");
@Override protected GlobalTrace esDataToStreamData(Map<String, Object> source) {
GlobalTrace globalTrace = new GlobalTrace();
globalTrace.setSegmentId((String)source.get(GlobalTraceTable.SEGMENT_ID.getName()));
globalTrace.setTraceId((String)source.get(GlobalTraceTable.TRACE_ID.getName()));
globalTrace.setTimeBucket(((Number)source.get(GlobalTraceTable.TIME_BUCKET.getName())).longValue());
return globalTrace;
}
@Override
public IndexRequestBuilder prepareBatchInsert(GlobalTrace data) {
@Override protected Map<String, Object> esStreamDataToEsData(GlobalTrace streamData) {
Map<String, Object> target = new HashMap<>();
target.put(GlobalTraceTable.SEGMENT_ID.getName(), data.getSegmentId());
target.put(GlobalTraceTable.TRACE_ID.getName(), data.getGlobalTraceId());
target.put(GlobalTraceTable.TIME_BUCKET.getName(), data.getTimeBucket());
logger.debug("global trace source: {}", target.toString());
return getClient().prepareIndex(GlobalTraceTable.TABLE, data.getId()).setSource(target);
target.put(GlobalTraceTable.SEGMENT_ID.getName(), streamData.getSegmentId());
target.put(GlobalTraceTable.TRACE_ID.getName(), streamData.getTraceId());
target.put(GlobalTraceTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
return target;
}
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(GlobalTraceTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket),
GlobalTraceTable.TABLE)
.get();
@Override protected String timeBucketColumnNameForDelete() {
return GlobalTraceTable.TIME_BUCKET.getName();
}
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, GlobalTraceTable.TABLE);
@GraphComputingMetric(name = "/persistence/get/" + GlobalTraceTable.TABLE)
@Override public GlobalTrace get(String id) {
return super.get(id);
}
}
......@@ -18,20 +18,17 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -54,10 +51,10 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
instance.setId(id);
instance.setInstanceId(((Number)source.get(InstanceTable.INSTANCE_ID.getName())).intValue());
instance.setHeartBeatTime(((Number)source.get(InstanceTable.HEARTBEAT_TIME.getName())).longValue());
logger.debug("getApplicationId: {} is exists", id);
logger.debug("instance id: {} exists", id);
return instance;
} else {
logger.debug("getApplicationId: {} is not exists", id);
logger.debug("instance id: {} not exists", id);
return null;
}
}
......@@ -72,6 +69,6 @@ public class InstanceHeartBeatEsPersistenceDAO extends EsDAO implements IInstanc
return getClient().prepareUpdate(InstanceTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -18,21 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentDuration;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentDurationTable;
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -57,7 +52,6 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
@Override
public IndexRequestBuilder prepareBatchInsert(SegmentDuration data) {
logger.debug("segment cost prepareBatchInsert, getApplicationId: {}", data.getId());
Map<String, Object> target = new HashMap<>();
target.put(SegmentDurationTable.SEGMENT_ID.getName(), data.getSegmentId());
target.put(SegmentDurationTable.APPLICATION_ID.getName(), data.getApplicationId());
......@@ -67,18 +61,14 @@ public class SegmentDurationEsPersistenceDAO extends EsDAO implements ISegmentDu
target.put(SegmentDurationTable.END_TIME.getName(), data.getEndTime());
target.put(SegmentDurationTable.IS_ERROR.getName(), data.getIsError());
target.put(SegmentDurationTable.TIME_BUCKET.getName(), data.getTimeBucket());
logger.debug("segment cost source: {}", target.toString());
return getClient().prepareIndex(SegmentDurationTable.TABLE, data.getId()).setSource(target);
}
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
@Override public void deleteHistory(Long timeBucketBefore) {
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(SegmentDurationTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket),
SegmentDurationTable.TABLE)
.get();
QueryBuilders.rangeQuery(SegmentDurationTable.TIME_BUCKET.getName()).lte(timeBucketBefore * 100),
SegmentDurationTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentDurationTable.TABLE);
......
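A note on the `timeBucketBefore * 100` in the query above: the segment duration table stores its time bucket at second precision (yyyyMMddHHmmss), while the keeper timer passes a minute bucket (yyyyMMddHHmm), so multiplying by 100 appends two zero digits for the seconds. A tiny illustrative snippet (the values are hypothetical):

```java
// Minute bucket passed by the TTL keeper (yyyyMMddHHmm).
long minuteTimeBucketBefore = 201805261335L;
// Appending "00" for the seconds turns it into the second-precision bucket
// (yyyyMMddHHmmss) used by the segment duration index.
long secondTimeBucketBefore = minuteTimeBucketBefore * 100; // 20180526133500L
```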
......@@ -18,62 +18,48 @@
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentTable;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class SegmentEsPersistenceDAO extends EsDAO implements ISegmentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Segment> {
private static final Logger logger = LoggerFactory.getLogger(SegmentEsPersistenceDAO.class);
public class SegmentEsPersistenceDAO extends AbstractPersistenceEsDAO<Segment> implements ISegmentPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, Segment> {
public SegmentEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override
public Segment get(String id) {
return null;
@Override protected String tableName() {
return SegmentTable.TABLE;
}
@Override
public UpdateRequestBuilder prepareBatchUpdate(Segment data) {
return null;
@Override protected Segment esDataToStreamData(Map<String, Object> source) {
Segment segment = new Segment();
segment.setDataBinary(Base64.getDecoder().decode((String)source.get(SegmentTable.DATA_BINARY.getName())));
segment.setTimeBucket(((Number)source.get(SegmentTable.TIME_BUCKET.getName())).longValue());
return segment;
}
@Override
public IndexRequestBuilder prepareBatchInsert(Segment data) {
@Override protected Map<String, Object> esStreamDataToEsData(Segment streamData) {
Map<String, Object> target = new HashMap<>();
target.put(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(data.getDataBinary())));
target.put(SegmentTable.TIME_BUCKET.getName(), data.getTimeBucket());
logger.debug("segment source: {}", target.toString());
return getClient().prepareIndex(SegmentTable.TABLE, data.getId()).setSource(target);
target.put(SegmentTable.DATA_BINARY.getName(), new String(Base64.getEncoder().encode(streamData.getDataBinary())));
target.put(SegmentTable.TIME_BUCKET.getName(), streamData.getTimeBucket());
return target;
}
@Override
public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete(
QueryBuilders.rangeQuery(SegmentTable.TIME_BUCKET.getName()).gte(startTimeBucket).lte(endTimeBucket),
SegmentTable.TABLE)
.get();
@Override protected String timeBucketColumnNameForDelete() {
return SegmentTable.TIME_BUCKET.getName();
}
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, SegmentTable.TABLE);
@GraphComputingMetric(name = "/persistence/get/" + SegmentTable.TABLE)
@Override public Segment get(String id) {
return super.get(id);
}
}
......@@ -20,23 +20,13 @@ package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.List;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.BooleanUtils;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentDurationTable;
import org.apache.skywalking.apm.collector.storage.ui.trace.BasicTrace;
import org.apache.skywalking.apm.collector.storage.ui.trace.QueryOrder;
import org.apache.skywalking.apm.collector.storage.ui.trace.TraceBrief;
import org.apache.skywalking.apm.collector.storage.ui.trace.TraceState;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.apache.skywalking.apm.collector.storage.ui.trace.*;
import org.elasticsearch.action.search.*;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
......@@ -51,7 +41,8 @@ public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUID
@Override
public TraceBrief loadTop(long startSecondTimeBucket, long endSecondTimeBucket, long minDuration, long maxDuration,
String operationName, int applicationId, int limit, int from, TraceState traceState, QueryOrder queryOrder, String... segmentIds) {
String operationName, int applicationId, int limit, int from, TraceState traceState, QueryOrder queryOrder,
String... segmentIds) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(SegmentDurationTable.TABLE);
searchRequestBuilder.setTypes(SegmentDurationTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
......@@ -60,7 +51,6 @@ public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUID
List<QueryBuilder> mustQueryList = boolQueryBuilder.must();
if (startSecondTimeBucket != 0 && endSecondTimeBucket != 0) {
//TODO second
mustQueryList.add(QueryBuilders.rangeQuery(SegmentDurationTable.TIME_BUCKET.getName()).gte(startSecondTimeBucket).lte(endSecondTimeBucket));
}
......@@ -95,8 +85,8 @@ public class SegmentDurationEsUIDAO extends EsDAO implements ISegmentDurationUID
case BY_START_TIME:
searchRequestBuilder.addSort(SegmentDurationTable.START_TIME.getName(), SortOrder.DESC);
break;
case
BY_DURATION:searchRequestBuilder.addSort(SegmentDurationTable.DURATION.getName(), SortOrder.DESC);
case BY_DURATION:
searchRequestBuilder.addSort(SegmentDurationTable.DURATION.getName(), SortOrder.DESC);
break;
}
searchRequestBuilder.setSize(limit);
......
......@@ -18,68 +18,35 @@
package org.apache.skywalking.apm.collector.storage.es;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.joda.time.DateTime;
import org.junit.*;
import org.powermock.reflect.Whitebox;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
/**
* @author peng-yongsheng
*/
public class DataTTLKeeperTimerTestCase {
@Test
public void testConvertTimeBucket() throws ParseException {
DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null, 8);
DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket();
long minuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(System.currentTimeMillis());
long dayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(minuteTimeBucket);
Date dayTimeBucketSource = new SimpleDateFormat("yyyyMMdd").parse(String.valueOf(dayTimeBucket));
Calendar calendar = Calendar.getInstance();
calendar.setTime(dayTimeBucketSource);
calendar.add(Calendar.DAY_OF_MONTH, -8);
calendar.set(Calendar.HOUR_OF_DAY, 0);
calendar.set(Calendar.MINUTE, 0);
calendar.set(Calendar.SECOND, 0);
long newMinuteTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(calendar.getTimeInMillis());
long newDayTimeBucket = TimeBucketUtils.INSTANCE.minuteToDay(newMinuteTimeBucket);
long startSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "startSecondTimeBucket");
Assert.assertEquals(newDayTimeBucket * 1000000, startSecondTimeBucket);
long endSecondTimeBucket = Whitebox.getInternalState(timeBuckets, "endSecondTimeBucket");
Assert.assertEquals(newDayTimeBucket * 1000000 + 235959, endSecondTimeBucket);
long startMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "startMinuteTimeBucket");
Assert.assertEquals(newDayTimeBucket * 10000, startMinuteTimeBucket);
long endMinuteTimeBucket = Whitebox.getInternalState(timeBuckets, "endMinuteTimeBucket");
Assert.assertEquals(newDayTimeBucket * 10000 + 2359, endMinuteTimeBucket);
public void testConvertTimeBucket() {
DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null);
long startHourTimeBucket = Whitebox.getInternalState(timeBuckets, "startHourTimeBucket");
Assert.assertEquals(newDayTimeBucket * 100, startHourTimeBucket);
DateTime currentTime = new DateTime(2018, 5, 26, 15, 5);
DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket(currentTime);
long endHourTimeBucket = Whitebox.getInternalState(timeBuckets, "endHourTimeBucket");
Assert.assertEquals(newDayTimeBucket * 100 + 23, endHourTimeBucket);
long traceDataBefore = Whitebox.getInternalState(timeBuckets, "traceDataBefore");
Assert.assertEquals(201805261335L, traceDataBefore);
long startDayTimeBucket = Whitebox.getInternalState(timeBuckets, "startDayTimeBucket");
Assert.assertEquals(newDayTimeBucket, startDayTimeBucket);
long minuteTimeBucketBefore = Whitebox.getInternalState(timeBuckets, "minuteTimeBucketBefore");
Assert.assertEquals(201805261335L, minuteTimeBucketBefore);
long endDayTimeBucket = Whitebox.getInternalState(timeBuckets, "endDayTimeBucket");
Assert.assertEquals(newDayTimeBucket, endDayTimeBucket);
long hourTimeBucketBefore = Whitebox.getInternalState(timeBuckets, "hourTimeBucketBefore");
Assert.assertEquals(2018052503, hourTimeBucketBefore);
long startMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "startMonthTimeBucket");
Assert.assertEquals(newDayTimeBucket / 100, startMonthTimeBucket);
long dayTimeBucketBefore = Whitebox.getInternalState(timeBuckets, "dayTimeBucketBefore");
Assert.assertEquals(20180411, dayTimeBucketBefore);
long endMonthTimeBucket = Whitebox.getInternalState(timeBuckets, "endMonthTimeBucket");
Assert.assertEquals(newDayTimeBucket / 100, endMonthTimeBucket);
long monthTimeBucketBefore = Whitebox.getInternalState(timeBuckets, "monthTimeBucketBefore");
Assert.assertEquals(201611, monthTimeBucketBefore);
}
}
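As a quick sanity check on the expected constants in this test (a standalone sketch, not part of the commit), subtracting each TTL default from the fixed time 2018-05-26 15:05 with Joda-Time reproduces the asserted bucket values:

```java
import org.joda.time.DateTime;

public class TimeBucketExpectationCheck {
    public static void main(String[] args) {
        DateTime current = new DateTime(2018, 5, 26, 15, 5);

        // trace/minute TTL = 90 minutes -> 2018-05-26 13:35
        System.out.println(current.minusMinutes(90).toString("yyyyMMddHHmm")); // 201805261335
        // hour TTL = 36 hours -> 2018-05-25 03:05
        System.out.println(current.minusHours(36).toString("yyyyMMddHH"));     // 2018052503
        // day TTL = 45 days -> 2018-04-11
        System.out.println(current.minusDays(45).toString("yyyyMMdd"));        // 20180411
        // month TTL = 18 months -> 2016-11
        System.out.println(current.minusMonths(18).toString("yyyyMM"));        // 201611
    }
}
```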
......@@ -90,6 +90,6 @@ public abstract class AbstractPersistenceH2DAO<STREAM_DATA extends StreamData> e
return entity;
}
@Override public final void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -55,7 +55,7 @@ public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePe
H2SqlEntity entity = new H2SqlEntity();
target.put(GlobalTraceTable.ID.getName(), data.getId());
target.put(GlobalTraceTable.SEGMENT_ID.getName(), data.getSegmentId());
target.put(GlobalTraceTable.TRACE_ID.getName(), data.getGlobalTraceId());
target.put(GlobalTraceTable.TRACE_ID.getName(), data.getTraceId());
target.put(GlobalTraceTable.TIME_BUCKET.getName(), data.getTimeBucket());
logger.debug("global trace source: {}", target.toString());
......@@ -65,6 +65,6 @@ public class GlobalTraceH2PersistenceDAO extends H2DAO implements IGlobalTracePe
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -18,23 +18,16 @@
package org.apache.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.apm.collector.client.h2.*;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.table.register.InstanceTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.slf4j.*;
/**
* @author peng-yongsheng, clevertension
......@@ -83,6 +76,6 @@ public class InstanceHeartBeatH2PersistenceDAO extends H2DAO implements IInstanc
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -18,17 +18,14 @@
package org.apache.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentDuration;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentDurationTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.slf4j.*;
/**
* @author peng-yongsheng, clevertension
......@@ -70,6 +67,6 @@ public class SegmentDurationH2PersistenceDAO extends H2DAO implements ISegmentDu
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -18,17 +18,14 @@
package org.apache.skywalking.apm.collector.storage.h2.dao;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.apache.skywalking.apm.collector.storage.table.segment.Segment;
import org.apache.skywalking.apm.collector.storage.table.segment.SegmentTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.collector.storage.table.segment.*;
import org.slf4j.*;
/**
* @author peng-yongsheng, clevertension
......@@ -63,6 +60,6 @@ public class SegmentH2PersistenceDAO extends H2DAO implements ISegmentPersistenc
return null;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -20,19 +20,13 @@ package org.apache.skywalking.apm.collector.ui.query;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.ui.trace.*;
import org.apache.skywalking.apm.collector.ui.graphql.Query;
import org.apache.skywalking.apm.collector.ui.service.SegmentTopService;
import org.apache.skywalking.apm.collector.ui.service.TraceStackService;
import org.apache.skywalking.apm.collector.ui.utils.DurationUtils;
import org.apache.skywalking.apm.collector.ui.utils.PaginationUtils;
import org.apache.skywalking.apm.collector.ui.service.*;
import org.apache.skywalking.apm.collector.ui.utils.*;
import java.text.ParseException;
import static java.util.Objects.isNull;
import static java.util.Objects.nonNull;
import static java.util.Objects.*;
/**
* @author peng-yongsheng
......@@ -61,7 +55,7 @@ public class TraceQuery implements Query {
return traceStackService;
}
public TraceBrief queryBasicTraces(TraceQueryCondition condition) throws ParseException {
public TraceBrief queryBasicTraces(TraceQueryCondition condition) {
long startSecondTimeBucket = 0;
long endSecondTimeBucket = 0;
String traceId = Const.EMPTY_STRING;
......@@ -83,7 +77,7 @@ public class TraceQuery implements Query {
QueryOrder queryOrder = condition.getQueryOrder();
PaginationUtils.Page page = PaginationUtils.INSTANCE.exchange(condition.getPaging());
return getSegmentTopService().loadTop(startSecondTimeBucket, endSecondTimeBucket, minDuration, maxDuration, operationName, traceId, applicationId, page.getLimit(), page.getFrom(),traceState,queryOrder);
return getSegmentTopService().loadTop(startSecondTimeBucket, endSecondTimeBucket, minDuration, maxDuration, operationName, traceId, applicationId, page.getLimit(), page.getFrom(), traceState, queryOrder);
}
public Trace queryTrace(String traceId) {
......
......@@ -20,16 +20,11 @@ package org.apache.skywalking.apm.collector.ui.service;
import java.util.List;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGlobalTraceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
import org.apache.skywalking.apm.collector.storage.ui.trace.QueryOrder;
import org.apache.skywalking.apm.collector.storage.ui.trace.TraceBrief;
import org.apache.skywalking.apm.collector.storage.ui.trace.TraceState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.collector.storage.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.ui.trace.*;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -47,12 +42,12 @@ public class SegmentTopService {
}
public TraceBrief loadTop(long startSecondTimeBucket, long endSecondTimeBucket, long minDuration, long maxDuration,
String operationName,
String traceId, int applicationId, int limit, int from, TraceState traceState, QueryOrder queryOrder) {
String operationName,
String traceId, int applicationId, int limit, int from, TraceState traceState, QueryOrder queryOrder) {
logger.debug("startSecondTimeBucket: {}, endSecondTimeBucket: {}, minDuration: {}, " +
"maxDuration: {}, operationName: {}, traceId: {}, applicationId: {}, limit: {}, from: {}, traceState: {}, queryOrder: {}",
startSecondTimeBucket, endSecondTimeBucket, minDuration,
maxDuration, operationName, traceId, applicationId, limit, from,traceState,queryOrder);
maxDuration, operationName, traceId, applicationId, limit, from, traceState, queryOrder);
TraceBrief traceBrief;
if (StringUtils.isNotEmpty(traceId)) {
......
......@@ -38,10 +38,10 @@ cluster:
sessionTimeout: 100000
naming:
# Host and port used by the agent
jetty:
jetty:
host: localhost
port: 10800
context_path: /
contextPath: /
remote:
gRPC:
host: localhost
......@@ -54,30 +54,50 @@ agent_jetty:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
analysis_register:
default:
analysis_jvm:
default:
analysis_segment_parser:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
bufferFilePath: ../buffer/
bufferOffsetMaxFileSize: 10M
bufferSegmentMaxFileSize: 500M
ui:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
# Elasticsearch cluster connection settings
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
ttl: 7
clusterName: CollectorDBCluster
clusterTransportSniffer: true
clusterNodes: localhost:9300
indexShardsNumber: 2
indexReplicasNumber: 0
highPerformanceMode: true
# Set the TTL on metric data. When metric data expires, the system deletes it automatically.
traceDataTTL: 90 # Unit is minute
minuteMetricDataTTL: 90 # Unit is minute
hourMetricDataTTL: 36 # Unit is hour
dayMetricDataTTL: 45 # Unit is day
monthMetricDataTTL: 18 # Unit is month
configuration:
default:
# namespace: xxxxx
# Alarm thresholds
applicationApdexThreshold: 2000
serviceErrorRateThreshold: 10.00
serviceAverageResponseTimeThreshold: 2000
instanceErrorRateThreshold: 10.00
instanceAverageResponseTimeThreshold: 2000
applicationErrorRateThreshold: 10.00
applicationAverageResponseTimeThreshold: 2000
# Heat map (thermodynamic) settings. After changing them, delete the thermodynamic metric table so the system can rebuild it.
thermodynamicResponseTimeStep: 50
thermodynamicCountOfResponseTimeSteps: 40
```
......
......@@ -37,10 +37,10 @@ cluster:
sessionTimeout: 100000
naming:
# Host and port used for agent config
jetty:
host: localhost
port: 10800
contextPath: /
jetty:
host: localhost
port: 10800
contextPath: /
remote:
gRPC:
host: localhost
......@@ -76,10 +76,17 @@ storage:
clusterNodes: localhost:9300
indexShardsNumber: 2
indexReplicasNumber: 0
ttl: 7
highPerformanceMode: true
# Set a timeout on metric data. After the timeout has expired, the metric data will automatically be deleted.
traceDataTTL: 90 # Unit is minute
minuteMetricDataTTL: 45 # Unit is minute
hourMetricDataTTL: 36 # Unit is hour
dayMetricDataTTL: 45 # Unit is day
monthMetricDataTTL: 18 # Unit is month
configuration:
default:
# namespace: xxxxx
# alarm threshold
applicationApdexThreshold: 2000
serviceErrorRateThreshold: 10.00
serviceAverageResponseTimeThreshold: 2000
......@@ -87,6 +94,9 @@ configuration:
instanceAverageResponseTimeThreshold: 2000
applicationErrorRateThreshold: 10.00
applicationAverageResponseTimeThreshold: 2000
# thermodynamic
thermodynamicResponseTimeStep: 50
thermodynamicCountOfResponseTimeSteps: 40
```
3. Run `bin/collectorService.sh`
......