未验证 提交 f32d3d07 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Enhance the storage session mechanism (#7221)

上级 da5af095
...@@ -63,6 +63,9 @@ Release Notes. ...@@ -63,6 +63,9 @@ Release Notes.
metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min. metrics. The timeout of the cache for minute and hour level metrics has been prolonged to ~5 min.
* Performance: Add L1 aggregation flush period, which reduces the CPU load and helps young GC. * Performance: Add L1 aggregation flush period, which reduces the CPU load and helps young GC.
* Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages. * Support connectTimeout and socketTimeout settings for ElasticSearch6 and ElasticSearch7 storages.
* Re-implement the storage session mechanism: cached metrics are now removed only according to their last access timestamp,
rather than the time they were first cached. This makes sure hot data never gets removed unexpectedly.
* Support configuring the session expiration threshold.
#### UI #### UI
......
...@@ -24,6 +24,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode ...@@ -24,6 +24,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3| | - | - | recordDataTTL|The lifecycle of record data. Record data includes traces, top n sampled records, and logs. Unit is day. Minimal value is 2.|SW_CORE_RECORD_DATA_TTL|3|
| - | - | metricsDataTTL|The lifecycle of metrics data, including the metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value is 2.| SW_CORE_METRICS_DATA_TTL|7| | - | - | metricsDataTTL|The lifecycle of metrics data, including the metadata. Unit is day. Recommend metricsDataTTL >= recordDataTTL. Minimal value is 2.| SW_CORE_METRICS_DATA_TTL|7|
| - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation. Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 | | - | - | l1FlushPeriod| The period of L1 aggregation flush to L2 aggregation. Unit is ms. | SW_CORE_L1_AGGREGATION_FLUSH_PERIOD | 500 |
| - | - | storageSessionTimeout| The threshold of session time. Unit is ms. Default value is 70s. | SW_CORE_STORAGE_SESSION_TIMEOUT | 70000 |
| - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, the metrics may not be accurate within that minute.|SW_CORE_ENABLE_DATABASE_SESSION|true| | - | - | enableDatabaseSession|Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, the metrics may not be accurate within that minute.|SW_CORE_ENABLE_DATABASE_SESSION|true|
| - | - | topNReportPeriod|The execution period of top N sampler, which saves sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10| | - | - | topNReportPeriod|The execution period of top N sampler, which saves sampled data into the storage. Unit is minute|SW_CORE_TOPN_REPORT_PERIOD|10|
| - | - | activeExtraModelColumns|Append the names of entity, such as service name, into the metrics storage entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false| | - | - | activeExtraModelColumns|Append the names of entity, such as service name, into the metrics storage entities.|SW_CORE_ACTIVE_EXTRA_MODEL_COLUMNS|false|
......
...@@ -87,6 +87,8 @@ core: ...@@ -87,6 +87,8 @@ core:
metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day metricsDataTTL: ${SW_CORE_METRICS_DATA_TTL:7} # Unit is day
# The period of L1 aggregation flush to L2 aggregation. Unit is ms. # The period of L1 aggregation flush to L2 aggregation. Unit is ms.
l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500} l1FlushPeriod: ${SW_CORE_L1_AGGREGATION_FLUSH_PERIOD:500}
# The threshold of session time. Unit is ms. Default value is 70s.
storageSessionTimeout: ${SW_CORE_STORAGE_SESSION_TIMEOUT:70000}
# Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute, # Cache metrics data for 1 minute to reduce database queries, and if the OAP cluster changes within that minute,
# the metrics may not be accurate within that minute. # the metrics may not be accurate within that minute.
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true} enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
......
...@@ -55,6 +55,10 @@ public class CoreModuleConfig extends ModuleConfig { ...@@ -55,6 +55,10 @@ public class CoreModuleConfig extends ModuleConfig {
* Enable database flush session. * Enable database flush session.
*/ */
private boolean enableDatabaseSession; private boolean enableDatabaseSession;
/**
* The threshold of session time. Unit is ms. Default value is 70s.
*/
private long storageSessionTimeout = 70_000;
private final List<String> downsampling; private final List<String> downsampling;
/** /**
* The period of doing data persistence. Unit is second. * The period of doing data persistence. Unit is second.
......
...@@ -288,8 +288,10 @@ public class CoreModuleProvider extends ModuleProvider { ...@@ -288,8 +288,10 @@ public class CoreModuleProvider extends ModuleProvider {
this.registerServiceImplementation( this.registerServiceImplementation(
UITemplateManagementService.class, new UITemplateManagementService(getManager())); UITemplateManagementService.class, new UITemplateManagementService(getManager()));
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession()); final MetricsStreamProcessor metricsStreamProcessor = MetricsStreamProcessor.getInstance();
MetricsStreamProcessor.getInstance().setL1FlushPeriod(moduleConfig.getL1FlushPeriod()); metricsStreamProcessor.setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
metricsStreamProcessor.setL1FlushPeriod(moduleConfig.getL1FlushPeriod());
metricsStreamProcessor.setStorageSessionTimeout(moduleConfig.getStorageSessionTimeout());
TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod()); TopNStreamProcessor.getInstance().setTopNWorkerReportCycle(moduleConfig.getTopNReportPeriod());
apdexThresholdConfig = new ApdexThresholdConfig(this); apdexThresholdConfig = new ApdexThresholdConfig(this);
ApdexMetrics.setDICT(apdexThresholdConfig); ApdexMetrics.setDICT(apdexThresholdConfig);
......
...@@ -50,13 +50,14 @@ public abstract class Metrics extends StreamData implements StorageData { ...@@ -50,13 +50,14 @@ public abstract class Metrics extends StreamData implements StorageData {
* Time in the cache, only work when MetricsPersistentWorker#enableDatabaseSession == true. * Time in the cache, only work when MetricsPersistentWorker#enableDatabaseSession == true.
*/ */
@Getter @Getter
private long survivalTime = 0L; private long lastUpdateTimestamp = 0L;
/** /**
* Merge the given metrics instance, these two must be the same metrics type. * Merge the given metrics instance, these two must be the same metrics type.
* *
* @param metrics to be merged * @param metrics to be merged
* @return {@code true} if the combined metrics should be continuously processed. {@code false} means it should be abandoned, and the implementation needs to keep the data unaltered in this case. * @return {@code true} if the combined metrics should be continuously processed. {@code false} means it should be
* abandoned, and the implementation needs to keep the data unaltered in this case.
*/ */
public abstract boolean combine(Metrics metrics); public abstract boolean combine(Metrics metrics);
...@@ -80,12 +81,21 @@ public abstract class Metrics extends StreamData implements StorageData { ...@@ -80,12 +81,21 @@ public abstract class Metrics extends StreamData implements StorageData {
public abstract Metrics toDay(); public abstract Metrics toDay();
/** /**
* Extend the {@link #survivalTime} * Set the last update timestamp
* *
* @param value to extend * @param timestamp last update timestamp
*/ */
public void extendSurvivalTime(long value) { public void setLastUpdateTimestamp(long timestamp) {
survivalTime += value; lastUpdateTimestamp = timestamp;
}
/**
* @param timestamp of current time
* @param expiredThreshold represents the duration between last update time and the time point removing from cache.
* @return true means this metrics should be removed from cache.
*/
public boolean isExpired(long timestamp, long expiredThreshold) {
return timestamp - lastUpdateTimestamp > expiredThreshold;
} }
public long toTimeBucketInHour() { public long toTimeBucketInHour() {
......
...@@ -65,12 +65,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -65,12 +65,13 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
private final Optional<MetricsTransWorker> transWorker; private final Optional<MetricsTransWorker> transWorker;
private final boolean enableDatabaseSession; private final boolean enableDatabaseSession;
private final boolean supportUpdate; private final boolean supportUpdate;
private long sessionTimeout;
private CounterMetrics aggregationCounter; private CounterMetrics aggregationCounter;
private long sessionTimeout = 70_000; // Unit, ms. 70,000ms means more than one minute.
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker, AbstractWorker<Metrics> nextAlarmWorker, AbstractWorker<ExportEvent> nextExportWorker,
MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate) { MetricsTransWorker transWorker, boolean enableDatabaseSession, boolean supportUpdate,
long storageSessionTimeout) {
super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData())); super(moduleDefineHolder, new ReadWriteSafeCache<>(new MergableBufferedData(), new MergableBufferedData()));
this.model = model; this.model = model;
this.context = new HashMap<>(100); this.context = new HashMap<>(100);
...@@ -80,6 +81,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -80,6 +81,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
this.nextExportWorker = Optional.ofNullable(nextExportWorker); this.nextExportWorker = Optional.ofNullable(nextExportWorker);
this.transWorker = Optional.ofNullable(transWorker); this.transWorker = Optional.ofNullable(transWorker);
this.supportUpdate = supportUpdate; this.supportUpdate = supportUpdate;
this.sessionTimeout = storageSessionTimeout;
String name = "METRICS_L2_AGGREGATION"; String name = "METRICS_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8; int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
...@@ -111,15 +113,15 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -111,15 +113,15 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
* Create the leaf and down-sampling MetricsPersistentWorker, no next step. * Create the leaf and down-sampling MetricsPersistentWorker, no next step.
*/ */
MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO, MetricsPersistentWorker(ModuleDefineHolder moduleDefineHolder, Model model, IMetricsDAO metricsDAO,
boolean enableDatabaseSession, boolean supportUpdate) { boolean enableDatabaseSession, boolean supportUpdate, long storageSessionTimeout) {
this(moduleDefineHolder, model, metricsDAO, this(moduleDefineHolder, model, metricsDAO,
null, null, null, null, null, null,
enableDatabaseSession, supportUpdate enableDatabaseSession, supportUpdate, storageSessionTimeout
); );
// For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes. // For a down-sampling metrics, we prolong the session timeout for 4 times, nearly 5 minutes.
// And add offset according to worker creation sequence, to avoid context clear overlap, // And add offset according to worker creation sequence, to avoid context clear overlap,
// eventually optimize load of IDs reading. // eventually optimize load of IDs reading.
this.sessionTimeout = sessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200; this.sessionTimeout = this.sessionTimeout * 4 + SESSION_TIMEOUT_OFFSITE_COUNTER * 200;
} }
/** /**
...@@ -171,6 +173,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -171,6 +173,7 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
try { try {
loadFromStorage(metricsList); loadFromStorage(metricsList);
long timestamp = System.currentTimeMillis();
for (Metrics metrics : metricsList) { for (Metrics metrics : metricsList) {
Metrics cachedMetrics = context.get(metrics); Metrics cachedMetrics = context.get(metrics);
if (cachedMetrics != null) { if (cachedMetrics != null) {
...@@ -191,10 +194,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -191,10 +194,12 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
cachedMetrics.calculate(); cachedMetrics.calculate();
prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics)); prepareRequests.add(metricsDAO.prepareBatchUpdate(model, cachedMetrics));
nextWorker(cachedMetrics); nextWorker(cachedMetrics);
cachedMetrics.setLastUpdateTimestamp(timestamp);
} else { } else {
metrics.calculate(); metrics.calculate();
prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics)); prepareRequests.add(metricsDAO.prepareBatchInsert(model, metrics));
nextWorker(metrics); nextWorker(metrics);
metrics.setLastUpdateTimestamp(timestamp);
} }
/* /*
...@@ -221,14 +226,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -221,14 +226,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
*/ */
private void loadFromStorage(List<Metrics> metrics) { private void loadFromStorage(List<Metrics> metrics) {
try { try {
List<Metrics> noInCacheMetrics = metrics.stream() List<Metrics> notInCacheMetrics = metrics.stream()
.filter(m -> !context.containsKey(m) || !enableDatabaseSession) .filter(m -> !context.containsKey(m) || !enableDatabaseSession)
.collect(Collectors.toList()); .collect(Collectors.toList());
if (noInCacheMetrics.isEmpty()) { if (notInCacheMetrics.isEmpty()) {
return; return;
} }
final List<Metrics> dbMetrics = metricsDAO.multiGet(model, noInCacheMetrics); final List<Metrics> dbMetrics = metricsDAO.multiGet(model, notInCacheMetrics);
if (!enableDatabaseSession) { if (!enableDatabaseSession) {
// Clear the cache only after results from DB are returned successfully. // Clear the cache only after results from DB are returned successfully.
context.clear(); context.clear();
...@@ -240,14 +245,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> { ...@@ -240,14 +245,14 @@ public class MetricsPersistentWorker extends PersistenceWorker<Metrics> {
} }
@Override @Override
public void endOfRound(long tookTime) { public void endOfRound() {
if (enableDatabaseSession) { if (enableDatabaseSession) {
Iterator<Metrics> iterator = context.values().iterator(); Iterator<Metrics> iterator = context.values().iterator();
long timestamp = System.currentTimeMillis();
while (iterator.hasNext()) { while (iterator.hasNext()) {
Metrics metrics = iterator.next(); Metrics metrics = iterator.next();
metrics.extendSurvivalTime(tookTime);
if (metrics.getSurvivalTime() > sessionTimeout) { if (metrics.isExpired(timestamp, sessionTimeout)) {
iterator.remove(); iterator.remove();
} }
} }
......
...@@ -82,6 +82,11 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -82,6 +82,11 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
@Setter @Setter
@Getter @Getter
private boolean enableDatabaseSession; private boolean enableDatabaseSession;
/**
* The threshold of session time. Unit is ms. Default value is 70s.
*/
@Setter
private long storageSessionTimeout = 70_000;
public static MetricsStreamProcessor getInstance() { public static MetricsStreamProcessor getInstance() {
return PROCESSOR; return PROCESSOR;
...@@ -191,7 +196,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -191,7 +196,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker( MetricsPersistentWorker minutePersistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession, moduleDefineHolder, model, metricsDAO, alarmNotifyWorker, exportWorker, transWorker, enableDatabaseSession,
supportUpdate supportUpdate, storageSessionTimeout
); );
persistentWorkers.add(minutePersistentWorker); persistentWorkers.add(minutePersistentWorker);
...@@ -203,7 +208,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> { ...@@ -203,7 +208,7 @@ public class MetricsStreamProcessor implements StreamProcessor<Metrics> {
Model model, Model model,
boolean supportUpdate) { boolean supportUpdate) {
MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker( MetricsPersistentWorker persistentWorker = new MetricsPersistentWorker(
moduleDefineHolder, model, metricsDAO, enableDatabaseSession, supportUpdate); moduleDefineHolder, model, metricsDAO, enableDatabaseSession, supportUpdate, storageSessionTimeout);
persistentWorkers.add(persistentWorker); persistentWorkers.add(persistentWorker);
return persistentWorker; return persistentWorker;
......
...@@ -55,10 +55,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr ...@@ -55,10 +55,8 @@ public abstract class PersistenceWorker<INPUT extends StorageData> extends Abstr
/** /**
* The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}. * The persistence process is driven by the {@link org.apache.skywalking.oap.server.core.storage.PersistenceTimer}.
* This is a notification method for the worker when every round finished. * This is a notification method for the worker when every round finished.
*
* @param tookTime The time costs in this round.
*/ */
public abstract void endOfRound(long tookTime); public abstract void endOfRound();
/** /**
* Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage * Prepare the batch persistence, transfer all prepared data to the executable data format based on the storage
......
...@@ -86,7 +86,7 @@ public class TopNWorker extends PersistenceWorker<TopN> { ...@@ -86,7 +86,7 @@ public class TopNWorker extends PersistenceWorker<TopN> {
* This method used to clear the expired cache, but TopN is not following it. * This method used to clear the expired cache, but TopN is not following it.
*/ */
@Override @Override
public void endOfRound(long tookTime) { public void endOfRound() {
} }
@Override @Override
......
...@@ -54,9 +54,8 @@ public enum PersistenceTimer { ...@@ -54,9 +54,8 @@ public enum PersistenceTimer {
private HistogramMetrics prepareLatency; private HistogramMetrics prepareLatency;
private HistogramMetrics executeLatency; private HistogramMetrics executeLatency;
private HistogramMetrics allLatency; private HistogramMetrics allLatency;
private long lastTime = System.currentTimeMillis();
private int syncOperationThreadsNum; private int syncOperationThreadsNum;
private int maxSyncoperationNum; private int maxSyncOperationNum;
private ExecutorService executorService; private ExecutorService executorService;
private ExecutorService prepareExecutorService; private ExecutorService prepareExecutorService;
...@@ -89,7 +88,7 @@ public enum PersistenceTimer { ...@@ -89,7 +88,7 @@ public enum PersistenceTimer {
); );
syncOperationThreadsNum = moduleConfig.getSyncThreads(); syncOperationThreadsNum = moduleConfig.getSyncThreads();
maxSyncoperationNum = moduleConfig.getMaxSyncOperationNum(); maxSyncOperationNum = moduleConfig.getMaxSyncOperationNum();
executorService = Executors.newFixedThreadPool(syncOperationThreadsNum); executorService = Executors.newFixedThreadPool(syncOperationThreadsNum);
prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads()); prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
if (!isStarted) { if (!isStarted) {
...@@ -116,7 +115,7 @@ public enum PersistenceTimer { ...@@ -116,7 +115,7 @@ public enum PersistenceTimer {
AtomicBoolean stop = new AtomicBoolean(false); AtomicBoolean stop = new AtomicBoolean(false);
DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue( DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue(
this.maxSyncoperationNum); this.maxSyncOperationNum);
try { try {
List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>(); List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers()); persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
...@@ -142,7 +141,7 @@ public enum PersistenceTimer { ...@@ -142,7 +141,7 @@ public enum PersistenceTimer {
// Push the prepared requests into DefaultBlockingBatchQueue, // Push the prepared requests into DefaultBlockingBatchQueue,
// the executorService consumes from it when it reaches the size of batch. // the executorService consumes from it when it reaches the size of batch.
prepareQueue.offer(innerPrepareRequests); prepareQueue.offer(innerPrepareRequests);
worker.endOfRound(System.currentTimeMillis() - lastTime); worker.endOfRound();
} finally { } finally {
timer.finish(); timer.finish();
prepareStageCountDownLatch.countDown(); prepareStageCountDownLatch.countDown();
...@@ -194,7 +193,6 @@ public enum PersistenceTimer { ...@@ -194,7 +193,6 @@ public enum PersistenceTimer {
stop.set(true); stop.set(true);
allTimer.finish(); allTimer.finish();
lastTime = System.currentTimeMillis();
} }
if (debug) { if (debug) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册