Unverified commit a966eea3, authored by Alvin, committed by GitHub

Support prepare and save metrics concurrency (#7153)

Parent de1d0461
......@@ -42,6 +42,7 @@ Release Notes.
* Upgrade commons-lang3 to avoid potential NPE in some JDK versions.
* OAL supports generating metrics from events.
* Support endpoint name grouping by OpenAPI definitions.
* Concurrently create PrepareRequests when persisting metrics.
* Fix CounterWindow increase computing issue.
* Performance: optimize Envoy ALS analyzer performance in high traffic load scenario (reduce ~1cpu in ~10k RPS).
* Performance: trim useless metadata fields in Envoy ALS metadata to improve performance.
......
......@@ -41,6 +41,7 @@ core|default|role|Option values, `Mixed/Receiver/Aggregator`. **Receiver** mode
| - | - | maxPageSizeOfQueryProfileSnapshot|The max size in every OAP query for snapshot analysis| - | 500 |
| - | - | maxSizeOfAnalyzeProfileSnapshot|The max number of snapshots analyzed by OAP| - | 12000 |
| - | - | syncThreads|The number of threads used to synchronously refresh the metrics data to the storage.| SW_CORE_SYNC_THREADS | 2 |
| - | - | prepareThreads|The number of threads used to prepare metrics data for the storage.| SW_CORE_PREPARE_THREADS | 2 |
| - | - | maxSyncOperationNum|The maximum number of processes supported for each synchronous storage operation. When the amount of flushed data is greater than this value, it will be assigned to multiple cores for execution.| SW_CORE_MAX_SYNC_OPERATION_NUM | 50000 |
| - | - | enableEndpointNameGroupingByOpenapi |Turn this on to automatically group endpoints by the given OpenAPI definitions.| SW_CORE_ENABLE_ENDPOINT_NAME_GROUPING_BY_OPAENAPI | true |
|cluster|standalone| - | standalone is not suitable for one node running, no available configuration.| - | - |
......
......@@ -106,6 +106,8 @@ core:
searchableAlarmTags: ${SW_SEARCHABLE_ALARM_TAG_KEYS:level}
# The number of threads used to synchronously refresh the metrics data to the storage.
syncThreads: ${SW_CORE_SYNC_THREADS:2}
# The number of threads used to prepare metrics data for the storage.
prepareThreads: ${SW_CORE_PREPARE_THREADS:2}
# The maximum number of processes supported for each synchronous storage operation. When the amount of flushed data is greater than this value, it will be assigned to multiple cores for execution.
maxSyncOperationNum: ${SW_CORE_MAX_SYNC_OPERATION_NUM:50000}
# Turn this on to automatically group endpoints by the given OpenAPI definitions.
......
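For illustration, here is a minimal sketch, not part of this commit, of how these settings drive the persistence stage: `prepareThreads` threads build the batch requests while `syncThreads` threads flush them to the storage in chunks of up to `maxSyncOperationNum`. The class name `PersistencePoolsSketch` and the standalone `main` are assumptions for the example only.

```java
package org.apache.skywalking.oap.server.core.storage;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.skywalking.oap.server.core.CoreModuleConfig;

// Illustrative sketch only: mirrors how PersistenceTimer sizes its pools
// from the configuration above (defaults are 2 prepare and 2 sync threads).
public class PersistencePoolsSketch {
    public static void main(String[] args) {
        CoreModuleConfig config = new CoreModuleConfig();

        // Producers: build PrepareRequests concurrently.
        ExecutorService prepareExecutor = Executors.newFixedThreadPool(config.getPrepareThreads());
        // Consumers: flush prepared batches to the storage.
        ExecutorService syncExecutor = Executors.newFixedThreadPool(config.getSyncThreads());

        System.out.println("prepareThreads=" + config.getPrepareThreads()
            + ", syncThreads=" + config.getSyncThreads()
            + ", batchSize=" + config.getMaxSyncOperationNum());

        prepareExecutor.shutdown();
        syncExecutor.shutdown();
    }
}
```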
......@@ -52,7 +52,7 @@ public class CoreModuleConfig extends ModuleConfig {
/**
* The period of doing data persistence. Unit is second.
*/
@Setter
private long persistentPeriod = 3;
private boolean enableDataKeeperExecutor = true;
......@@ -150,6 +150,15 @@ public class CoreModuleConfig extends ModuleConfig {
@Getter
private int syncThreads = 2;
/**
* The number of threads used to prepare metrics data for the storage.
*
* @since 8.7.0
*/
@Setter
@Getter
private int prepareThreads = 2;
/**
* The maximum number of processes supported for each synchronous storage operation. When the amount of flushed
* data is greater than this value, it will be assigned to multiple cores for execution.
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.List;
/**
* A blocking batch queue for the persistence process.
* The poll method returns only when the batch size threshold is reached or no further appending has been declared.
*/
interface BlockingBatchQueue<E> {
List<E> poll() throws InterruptedException;
void offer(List<E> elements);
void noFurtherAppending();
void furtherAppending();
int size();
}
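A minimal, single-threaded sketch of this contract follows; it is not part of the commit. It uses the `DefaultBlockingBatchQueue` implementation added to `PersistenceTimer` below and assumes the snippet lives in the same package, so the package-private types are visible; the class name and the tiny batch size are chosen only for the example.

```java
package org.apache.skywalking.oap.server.core.storage;

import java.util.Arrays;
import java.util.List;

public class BlockingBatchQueueContractSketch {
    public static void main(String[] args) throws InterruptedException {
        // maxBatchSize of 3, purely for demonstration.
        BlockingBatchQueue<Integer> queue = new PersistenceTimer.DefaultBlockingBatchQueue<>(3);

        queue.offer(Arrays.asList(1, 2)); // below the threshold, poll() would keep waiting
        queue.offer(Arrays.asList(3, 4)); // threshold reached, a full batch becomes available

        List<Integer> batch = queue.poll();   // [1, 2, 3]

        queue.noFurtherAppending();           // declare the end of the appending phase
        List<Integer> tail = queue.poll();    // [4] -- the remainder is drained without waiting
        List<Integer> drained = queue.poll(); // []  -- an empty list signals the queue is exhausted

        System.out.println(batch + " " + tail + " drained=" + drained.isEmpty());
    }
}
```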
......@@ -18,13 +18,18 @@
package org.apache.skywalking.oap.server.core.storage;
import com.google.common.collect.Lists;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
......@@ -43,16 +48,18 @@ import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
@Slf4j
public enum PersistenceTimer {
INSTANCE;
private Boolean isStarted = false;
@VisibleForTesting
boolean isStarted = false;
private final Boolean debug;
private CounterMetrics errorCounter;
private HistogramMetrics prepareLatency;
private HistogramMetrics executeLatency;
private HistogramMetrics allLatency;
private long lastTime = System.currentTimeMillis();
private final List<PrepareRequest> prepareRequests = new ArrayList<>(50000);
private int syncOperationThreadsNum;
private int maxSyncoperationNum;
private ExecutorService executorService;
private ExecutorService prepareExecutorService;
PersistenceTimer() {
this.debug = System.getProperty("debug") != null;
......@@ -77,9 +84,15 @@ public enum PersistenceTimer {
"persistence_timer_bulk_execute_latency", "Latency of the execute stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
allLatency = metricsCreator.createHistogramMetric(
"persistence_timer_bulk_all_latency", "Latency of the all stage in persistence timer",
MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE
);
syncOperationThreadsNum = moduleConfig.getSyncThreads();
maxSyncoperationNum = moduleConfig.getMaxSyncOperationNum();
executorService = Executors.newFixedThreadPool(syncOperationThreadsNum);
prepareExecutorService = Executors.newFixedThreadPool(moduleConfig.getPrepareThreads());
if (!isStarted) {
Executors.newSingleThreadScheduledExecutor()
.scheduleWithFixedDelay(
......@@ -93,43 +106,61 @@ public enum PersistenceTimer {
}
private void extractDataAndSave(IBatchDAO batchDAO) {
if (log.isDebugEnabled()) {
log.debug("Extract data and save");
}
long startTime = System.currentTimeMillis();
HistogramMetrics.Timer allTimer = allLatency.createTimer();
// Use `stop` as a control signal to fail fast in the persistence process.
AtomicBoolean stop = new AtomicBoolean(false);
DefaultBlockingBatchQueue<PrepareRequest> prepareQueue = new DefaultBlockingBatchQueue<>(
this.maxSyncoperationNum);
try {
HistogramMetrics.Timer timer = prepareLatency.createTimer();
List<PersistenceWorker<? extends StorageData>> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
try {
List<PersistenceWorker> persistenceWorkers = new ArrayList<>();
persistenceWorkers.addAll(TopNStreamProcessor.getInstance().getPersistentWorkers());
persistenceWorkers.addAll(MetricsStreamProcessor.getInstance().getPersistentWorkers());
// The CountDownLatch makes sure all prepare threads finish eventually.
CountDownLatch prepareStageCountDownLatch = new CountDownLatch(persistenceWorkers.size());
persistenceWorkers.forEach(worker -> {
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
persistenceWorkers.forEach(worker -> {
prepareExecutorService.submit(() -> {
if (stop.get()) {
prepareStageCountDownLatch.countDown();
return;
}
worker.buildBatchRequests(prepareRequests);
worker.endOfRound(System.currentTimeMillis() - lastTime);
HistogramMetrics.Timer timer = prepareLatency.createTimer();
try {
if (log.isDebugEnabled()) {
log.debug("extract {} worker data and save", worker.getClass().getName());
}
List<PrepareRequest> innerPrepareRequests = new ArrayList<>(5000);
worker.buildBatchRequests(innerPrepareRequests);
// Push the prepared requests into the DefaultBlockingBatchQueue;
// the executorService consumes from it once the batch size is reached.
prepareQueue.offer(innerPrepareRequests);
worker.endOfRound(System.currentTimeMillis() - lastTime);
} finally {
timer.finish();
prepareStageCountDownLatch.countDown();
}
});
});
if (debug) {
log.info("build batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
} finally {
timer.finish();
}
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
List<List<PrepareRequest>> partitions = Lists.partition(prepareRequests, maxSyncoperationNum);
CountDownLatch countDownLatch = new CountDownLatch(partitions.size());
for (final List<PrepareRequest> partition : partitions) {
executorService.submit(() -> {
List<Future<?>> batchFutures = new ArrayList<>();
for (int i = 0; i < syncOperationThreadsNum; i++) {
Future<?> batchFuture = executorService.submit(() -> {
// consume the metrics
while (!stop.get()) {
List<PrepareRequest> partition = prepareQueue.poll();
if (partition.isEmpty()) {
break;
}
HistogramMetrics.Timer executeLatencyTimer = executeLatency.createTimer();
try {
if (CollectionUtils.isNotEmpty(partition)) {
batchDAO.synchronous(partition);
......@@ -137,23 +168,33 @@ public enum PersistenceTimer {
} catch (Throwable e) {
log.error(e.getMessage(), e);
} finally {
countDownLatch.countDown();
executeLatencyTimer.finish();
}
});
}
countDownLatch.await();
} finally {
executeLatencyTimer.finish();
}
return null;
});
batchFutures.add(batchFuture);
}
// Wait until the prepare stage is done.
prepareStageCountDownLatch.await();
prepareQueue.noFurtherAppending();
// Wait until the batch stage is done.
for (Future<?> result : batchFutures) {
result.get();
}
} catch (Throwable e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
if (log.isDebugEnabled()) {
log.debug("Persistence data save finish");
}
prepareRequests.clear();
stop.set(true);
allTimer.finish();
lastTime = System.currentTimeMillis();
}
......@@ -161,4 +202,70 @@ public enum PersistenceTimer {
log.info("Batch persistence duration: {} ms", System.currentTimeMillis() - startTime);
}
}
@RequiredArgsConstructor
static class DefaultBlockingBatchQueue<E> implements BlockingBatchQueue<E> {
@Getter
private final int maxBatchSize;
@Getter
private boolean inAppendingMode = true;
private final List<E> elementData = new ArrayList<>(50000 * 3);
@Override
public void offer(List<E> elements) {
synchronized (elementData) {
if (!inAppendingMode) {
throw new IllegalStateException();
}
elementData.addAll(elements);
if (elementData.size() >= maxBatchSize) {
elementData.notifyAll();
}
}
}
@Override
public List<E> poll() throws InterruptedException {
synchronized (elementData) {
while (this.elementData.size() < maxBatchSize && inAppendingMode) {
elementData.wait(1000);
}
if (CollectionUtils.isEmpty(elementData)) {
return Collections.emptyList();
}
List<E> sublist = this.elementData.subList(
0, Math.min(maxBatchSize, this.elementData.size()));
List<E> partition = new ArrayList<>(sublist);
sublist.clear();
return partition;
}
}
@Override
public void noFurtherAppending() {
synchronized (elementData) {
inAppendingMode = false;
elementData.notifyAll();
}
}
@Override
public void furtherAppending() {
synchronized (elementData) {
inAppendingMode = true;
elementData.notifyAll();
}
}
@Override
public int size() {
synchronized (elementData) {
return elementData.size();
}
}
}
}
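To summarize the flow above, here is a condensed, self-contained sketch, not part of the commit, of how `extractDataAndSave` orchestrates the two stages: prepare threads offer their batches into the queue, `syncThreads` consumer futures drain it chunk by chunk, and `noFurtherAppending()` is called once the prepare CountDownLatch has been released. Worker and DAO interactions are replaced with placeholders, and the class name and sizes are assumptions.

```java
package org.apache.skywalking.oap.server.core.storage;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PersistencePipelineSketch {
    public static void main(String[] args) throws Exception {
        int prepareThreads = 2, syncThreads = 2, maxBatchSize = 1000, workers = 8;
        ExecutorService prepareExecutor = Executors.newFixedThreadPool(prepareThreads);
        ExecutorService syncExecutor = Executors.newFixedThreadPool(syncThreads);
        BlockingBatchQueue<Integer> queue = new PersistenceTimer.DefaultBlockingBatchQueue<>(maxBatchSize);

        // Prepare stage: each "worker" builds its requests and offers them as one batch.
        CountDownLatch prepareStage = new CountDownLatch(workers);
        for (int w = 0; w < workers; w++) {
            prepareExecutor.submit(() -> {
                try {
                    List<Integer> requests = IntStream.range(0, 2500).boxed().collect(Collectors.toList());
                    queue.offer(requests);
                } finally {
                    prepareStage.countDown();
                }
            });
        }

        // Execute stage: syncThreads consumers poll batches until the queue is drained.
        List<Future<?>> flushFutures = new ArrayList<>();
        for (int i = 0; i < syncThreads; i++) {
            flushFutures.add(syncExecutor.submit(() -> {
                while (true) {
                    List<Integer> batch = queue.poll();
                    if (batch.isEmpty()) {
                        break; // no further appending and nothing left to flush
                    }
                    // batchDAO.synchronous(batch) would run here in the real timer.
                }
                return null;
            }));
        }

        prepareStage.await();       // all prepare tasks finished
        queue.noFurtherAppending(); // let consumers drain the remainder and exit
        for (Future<?> f : flushFutures) {
            f.get();
        }
        prepareExecutor.shutdown();
        syncExecutor.shutdown();
        System.out.println("remaining=" + queue.size());
    }
}
```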
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@Fork(2)
public class BlockingBatchQueueBenchmark {
@State(Scope.Benchmark)
public static class MyState {
int count = 10_000_000;
PersistenceTimer.DefaultBlockingBatchQueue blockingBatchQueueWithSynchronized = new PersistenceTimer.DefaultBlockingBatchQueue(
50000);
BlockingBatchQueueWithLinkedBlockingQueue blockingBatchQueueWithLinkedBlockingQueue = new BlockingBatchQueueWithLinkedBlockingQueue(
50000);
BlockingBatchQueueWithReentrantLock blockingBatchQueueWithReentrantLock = new BlockingBatchQueueWithReentrantLock(
50000);
List<Integer> willAdd = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
int producerCount = 10;
int consumerCount = 2;
int producerLength = count / producerCount / 1000;
ExecutorService producer;
ExecutorService consumer;
@Setup(Level.Invocation)
public void before() {
producer = Executors.newFixedThreadPool(producerCount);
consumer = Executors.newFixedThreadPool(consumerCount);
}
@TearDown(Level.Invocation)
public void after() {
producer.shutdown();
consumer.shutdown();
}
}
@Benchmark
public void testSynchronized(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithSynchronized);
}
@Benchmark
public void testReentrantLock(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithReentrantLock);
}
@Benchmark
public void testLinkedBlockingQueue(MyState myState) throws InterruptedException, ExecutionException {
testProductAndConsume(myState, myState.blockingBatchQueueWithLinkedBlockingQueue);
}
private void testProductAndConsume(final MyState myState,
BlockingBatchQueue queue) throws InterruptedException, ExecutionException {
queue.furtherAppending();
CountDownLatch latch = new CountDownLatch(myState.producerCount);
for (int i = 0; i < myState.producerCount; i++) {
myState.producer.submit(() -> {
for (int j = 0; j < myState.producerLength; j++) {
queue.offer(myState.willAdd);
}
latch.countDown();
return null;
});
}
List<Future<?>> futures = new ArrayList<>();
for (int i = 0; i < myState.consumerCount; i++) {
Future<?> submit = myState.consumer.submit(() -> {
while (!queue.poll().isEmpty()) {
}
return null;
});
futures.add(submit);
}
latch.await();
queue.noFurtherAppending();
for (Future<?> future : futures) {
future.get();
}
}
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(BlockingBatchQueueBenchmark.class.getSimpleName())
.forks(2)
.build();
new Runner(opt).run();
}
/**
* # JMH version: 1.21
* # VM version: JDK 1.8.0_172, Java HotSpot(TM) 64-Bit Server VM, 25.172-b11
* # VM invoker: /Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home/jre/bin/java
* # VM options: -javaagent:/Users/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/lib/idea_rt.jar=50386:/Users/alvin/Library/Application Support/JetBrains/Toolbox/apps/IDEA-U/ch-0/211.7442.40/IntelliJ IDEA.app/Contents/bin -Dfile.encoding=UTF-8
* # Warmup: 5 iterations, 10 s each
* # Measurement: 5 iterations, 10 s each
* # Timeout: 10 min per iteration
* # Threads: 1 thread, will synchronize iterations
* # Benchmark mode: Throughput, ops/time
*
* Benchmark Mode Cnt Score Error Units
* BlockingBatchQueueBenchmark.testLinkedBlockingQueue thrpt 10 0.317 ± 0.032 ops/s
* BlockingBatchQueueBenchmark.testReentrantLock thrpt 10 16.018 ± 1.553 ops/s
* BlockingBatchQueueBenchmark.testSynchronized thrpt 10 16.769 ± 0.533 ops/s
*
*/
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@RequiredArgsConstructor
public class BlockingBatchQueueWithLinkedBlockingQueue<E> implements BlockingBatchQueue<E> {
@Getter
private final int maxBatchSize;
@Getter
private volatile boolean inAppendingMode = true;
private final LinkedBlockingQueue<E> elementData = new LinkedBlockingQueue<>();
public void offer(List<E> elements) {
elementData.addAll(elements);
}
public List<E> poll() throws InterruptedException {
List<E> result = new ArrayList<>();
do {
E take = elementData.poll(1000, TimeUnit.MILLISECONDS);
if (take != null) {
result.add(take);
}
if (result.size() >= maxBatchSize) {
return result;
}
if (!inAppendingMode && this.elementData.isEmpty()) {
return result;
}
}
while (!this.elementData.isEmpty());
return result;
}
public void noFurtherAppending() {
inAppendingMode = false;
}
public void furtherAppending() {
inAppendingMode = true;
}
@Override
public int size() {
return this.elementData.size();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
@RequiredArgsConstructor
public class BlockingBatchQueueWithReentrantLock<E> implements BlockingBatchQueue<E> {
@Getter
private final int maxBatchSize;
@Getter
private volatile boolean inAppendingMode = true;
private final List<E> elementData = new ArrayList<>(50000);
private ReentrantLock reentrantLock = new ReentrantLock();
private Condition condition = this.reentrantLock.newCondition();
public void offer(List<E> elements) {
reentrantLock.lock();
try {
elementData.addAll(elements);
if (elementData.size() >= maxBatchSize) {
condition.signalAll();
}
} finally {
reentrantLock.unlock();
}
}
public List<E> poll() throws InterruptedException {
reentrantLock.lock();
try {
while (this.elementData.size() < maxBatchSize && inAppendingMode) {
condition.await(1000, TimeUnit.MILLISECONDS);
}
if (CollectionUtils.isEmpty(elementData)) {
return Collections.emptyList();
}
List<E> sublist = this.elementData.subList(
0, Math.min(maxBatchSize, this.elementData.size()));
List<E> partition = new ArrayList<>(sublist);
sublist.clear();
return partition;
} finally {
reentrantLock.unlock();
}
}
public void noFurtherAppending() {
reentrantLock.lock();
try {
inAppendingMode = false;
condition.signalAll();
} finally {
reentrantLock.unlock();
}
}
public void furtherAppending() {
reentrantLock.lock();
try {
inAppendingMode = true;
condition.signalAll();
} finally {
reentrantLock.unlock();
}
}
@Override
public int size() {
return elementData.size();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.storage;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import lombok.Data;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsPersistentWorker;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNStreamProcessor;
import org.apache.skywalking.oap.server.core.analysis.worker.TopNWorker;
import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.none.MetricsCreatorNoop;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
public class PersistenceTimerTest {
@Test
public void testExtractDataAndSave() throws Exception {
Set<PrepareRequest> result = new HashSet<>();
int count = 101;
int workCount = 10;
CoreModuleConfig moduleConfig = new CoreModuleConfig();
moduleConfig.setMaxSyncOperationNum(5);
moduleConfig.setPersistentPeriod(Integer.MAX_VALUE);
IBatchDAO iBatchDAO = new IBatchDAO() {
@Override
public void asynchronous(InsertRequest insertRequest) {
}
@Override
public void synchronous(List<PrepareRequest> prepareRequests) {
synchronized (result) {
result.addAll(prepareRequests);
}
}
};
for (int i = 0; i < workCount; i++) {
MetricsStreamProcessor.getInstance().getPersistentWorkers().add(genWorkers(i, count));
TopNStreamProcessor.getInstance().getPersistentWorkers().add(genTopNWorkers(i, count));
}
ModuleManager moduleManager = mock(ModuleManager.class);
ModuleServiceHolder moduleServiceHolder = mock(ModuleServiceHolder.class);
doReturn((ModuleProviderHolder) () -> moduleServiceHolder).when(moduleManager).find(anyString());
doReturn(new MetricsCreatorNoop()).when(moduleServiceHolder).getService(MetricsCreator.class);
doReturn(iBatchDAO).when(moduleServiceHolder).getService(IBatchDAO.class);
PersistenceTimer.INSTANCE.isStarted = true;
PersistenceTimer.INSTANCE.start(moduleManager, moduleConfig);
Whitebox.invokeMethod(PersistenceTimer.INSTANCE, "extractDataAndSave", iBatchDAO);
Assert.assertEquals(count * workCount * 2, result.size());
}
private MetricsPersistentWorker genWorkers(int num, int count) {
MetricsPersistentWorker persistenceWorker = mock(MetricsPersistentWorker.class);
doAnswer(invocation -> {
List argument = invocation.getArgument(0, List.class);
for (int i = 0; i < count; i++) {
argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
}
return Void.class;
}).when(persistenceWorker).buildBatchRequests(anyList());
return persistenceWorker;
}
private TopNWorker genTopNWorkers(int num, int count) {
TopNWorker persistenceWorker = mock(TopNWorker.class);
doAnswer(invocation -> {
List argument = invocation.getArgument(0, List.class);
for (int i = 0; i < count; i++) {
argument.add(new MockStorageData(num + " " + UUID.randomUUID()));
}
return Void.class;
}).when(persistenceWorker).buildBatchRequests(anyList());
return persistenceWorker;
}
@Data
static class MockStorageData implements StorageData {
private final String id;
@Override
public String id() {
return id;
}
}
}