未验证 提交 605ca038 编写于 作者: 静夜思朝颜's avatar 静夜思朝颜 提交者: GitHub

support negative infinity bucket in meter system (#7930)

上级 d73c51a4
......@@ -40,6 +40,7 @@ Release Notes.
* Support `!= null` in OAL engine.
* Add `Message Queue Consuming Count` metric for MQ consuming service and endpoint.
* Add `Message Queue Avg Consuming Latency` metric for MQ consuming service and endpoint.
* Support `-Inf` as bucket in the meter system.
#### UI
......
Subproject commit e626ee04850703c220f64b642d2893fa65572943
Subproject commit e6742be211302cf7eb93db83bdf1da2a8e600d17
......@@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.apm.network.language.agent.v3.Label;
import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterHistogram;
import org.apache.skywalking.apm.network.language.agent.v3.MeterSingleValue;
......@@ -96,7 +97,7 @@ public class MeterProcessor {
.name(histogram.getName())
.labels(ImmutableMap.<String, String>builder()
.putAll(baseLabel)
.put("le", String.valueOf(v.getBucket())).build())
.put("le", parseHistogramBucket(v)).build())
.value(v.getCount()).build()
).collect(Collectors.toList()));
break;
......@@ -143,4 +144,12 @@ public class MeterProcessor {
}
}
private String parseHistogramBucket(MeterBucketValue bucketValue) {
if (bucketValue.getIsNegativeInfinity()) {
return String.valueOf(Long.MIN_VALUE);
} else {
return String.valueOf(bucketValue.getBucket());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.analyzer.provider.meter.process;
import org.apache.skywalking.apm.network.language.agent.v3.MeterBucketValue;
import org.apache.skywalking.apm.network.language.agent.v3.MeterData;
import org.apache.skywalking.apm.network.language.agent.v3.MeterHistogram;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfig;
import org.apache.skywalking.oap.server.analyzer.provider.meter.config.MeterConfigs;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.StreamDefinition;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterEntity;
import org.apache.skywalking.oap.server.core.analysis.meter.MeterSystem;
import org.apache.skywalking.oap.server.core.analysis.meter.function.avg.AvgHistogramFunction;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.worker.MetricsStreamProcessor;
import org.apache.skywalking.oap.server.core.config.NamingControl;
import org.apache.skywalking.oap.server.core.config.group.EndpointNameGrouping;
import org.apache.skywalking.oap.server.core.query.type.Bucket;
import org.apache.skywalking.oap.server.core.storage.StorageException;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.ModuleProviderHolder;
import org.apache.skywalking.oap.server.library.module.ModuleServiceHolder;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.powermock.reflect.Whitebox;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class MeterProcessorTest {
@Mock
private ModuleManager moduleManager;
private MeterSystem meterSystem;
private MeterProcessor processor;
private String service = "test-service";
private String serviceInstance = "test-service-instance";
@BeforeClass
public static void init() {
MeterEntity.setNamingControl(
new NamingControl(512, 512, 512, new EndpointNameGrouping()));
}
@Before
public void setup() throws StorageException, ModuleStartException {
meterSystem = spy(new MeterSystem(moduleManager));
when(moduleManager.find(anyString())).thenReturn(mock(ModuleProviderHolder.class));
when(moduleManager.find(CoreModule.NAME).provider()).thenReturn(mock(ModuleServiceHolder.class));
when(moduleManager.find(CoreModule.NAME).provider().getService(MeterSystem.class)).thenReturn(meterSystem);
Whitebox.setInternalState(MetricsStreamProcessor.class, "PROCESSOR",
Mockito.spy(MetricsStreamProcessor.getInstance())
);
doNothing().when(MetricsStreamProcessor.getInstance()).create(any(), (StreamDefinition) any(), any());
final MeterProcessService processService = new MeterProcessService(moduleManager);
List<MeterConfig> config = MeterConfigs.loadConfig("meter-analyzer-config", Arrays.asList("config"));
processService.start(config);
processor = new MeterProcessor(processService);
}
@Test
public void testProcess() throws ModuleStartException {
AtomicReference<AvgHistogramFunction> data = new AtomicReference<>();
doAnswer(invocationOnMock -> {
if (AvgHistogramFunction.class.isAssignableFrom(invocationOnMock.getArgument(0).getClass())) {
data.set(invocationOnMock.getArgument(0));
}
return null;
}).when(meterSystem).doStreamingCalculation(any());
processor.read(MeterData.newBuilder()
.setService(service)
.setServiceInstance(serviceInstance)
.setTimestamp(System.currentTimeMillis())
.setHistogram(MeterHistogram.newBuilder()
.setName("test_histogram")
.addValues(MeterBucketValue.newBuilder().setIsNegativeInfinity(true).setCount(10).build())
.addValues(MeterBucketValue.newBuilder().setBucket(0).setCount(20).build())
.addValues(MeterBucketValue.newBuilder().setBucket(10).setCount(10).build())
.build())
.build());
processor.process();
// verify data
final AvgHistogramFunction func = data.get();
final DataTable summation = new DataTable();
summation.put(Bucket.INFINITE_NEGATIVE, 10L);
summation.put("0", 20L);
summation.put("10", 10L);
Assert.assertEquals(summation, func.getSummation());
final DataTable count = new DataTable();
count.put(Bucket.INFINITE_NEGATIVE, 1L);
count.put("0", 1L);
count.put("10", 1L);
Assert.assertEquals(count, func.getCount());
}
}
......@@ -148,7 +148,12 @@ public class Analyzer {
long[] vv = new long[bb.length];
for (int i = 0; i < subSs.size(); i++) {
Sample s = subSs.get(i);
bb[i] = Long.parseLong(s.getLabels().get("le"));
final double leVal = Double.parseDouble(s.getLabels().get("le"));
if (leVal == Double.NEGATIVE_INFINITY) {
bb[i] = Long.MIN_VALUE;
} else {
bb[i] = (long) leVal;
}
vv[i] = getValue(s);
}
BucketedValues bv = new BucketedValues(bb, vv);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册