提交 a3a85e52 编写于 作者: wu-sheng's avatar wu-sheng 提交者: 彭勇升 pengys

Support Exporter in core (#2368)

* Rename ALarmSupported to WithMetadata

* Finish the base of exporter.

* Fix class comment.
上级 3eafda28
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>oap-server</artifactId>
<groupId>org.apache.skywalking</groupId>
<version>6.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>exporter</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>server-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
......@@ -28,8 +28,6 @@ import org.apache.skywalking.oap.server.core.Const;
<#break>
</#if>
</#list>
import org.apache.skywalking.oap.server.core.alarm.AlarmMeta;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
......@@ -45,7 +43,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@IndicatorType
@StreamData
@StorageEntity(name = "${tableName}", builder = ${metricName}Indicator.Builder.class, sourceScopeId = ${sourceScopeId})
public class ${metricName}Indicator extends ${indicatorClassName} implements AlarmSupported {
public class ${metricName}Indicator extends ${indicatorClassName} implements WithMetadata {
<#list fieldsFromSource as sourceField>
@Setter @Getter @Column(columnName = "${sourceField.columnName}") <#if sourceField.isID()>@IDColumn</#if> private ${sourceField.typeName} ${sourceField.fieldName};
......@@ -170,8 +168,8 @@ public class ${metricName}Indicator extends ${indicatorClassName} implements Ala
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}</#if></#list></#if>);
@Override public IndicatorMetaInfo getMeta() {
return new IndicatorMetaInfo("${varName}", ${sourceScopeId}<#if (fieldsFromSource?size>0) ><#list fieldsFromSource as field><#if field.isID()>, ${field.fieldName}</#if></#list></#if>);
}
@Override
......
......@@ -21,8 +21,6 @@ package org.apache.skywalking.oap.server.core.analysis.generated.service.service
import java.util.*;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.alarm.AlarmMeta;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorType;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamData;
......@@ -38,7 +36,7 @@ import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
@IndicatorType
@StreamData
@StorageEntity(name = "service_avg", builder = ServiceAvgIndicator.Builder.class, sourceScopeId = 1)
public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSupported {
public class ServiceAvgIndicator extends LongAvgIndicator implements WithMetadata {
@Setter @Getter @Column(columnName = "entity_id") @IDColumn private java.lang.String entityId;
......@@ -108,8 +106,8 @@ public class ServiceAvgIndicator extends LongAvgIndicator implements AlarmSuppor
}
@Override public AlarmMeta getAlarmMeta() {
return new AlarmMeta("generate_indicator", 1, entityId);
@Override public IndicatorMetaInfo getMeta() {
return new IndicatorMetaInfo("generate_indicator", 1, entityId);
}
@Override
......
......@@ -41,6 +41,7 @@
<module>generate-tool</module>
<module>server-telemetry</module>
<module>generate-tool-grammar</module>
<module>exporter</module>
</modules>
<properties>
......
......@@ -20,7 +20,7 @@ package org.apache.skywalking.oap.server.core.alarm;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.register.*;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -50,33 +50,33 @@ public class AlarmEntrance {
init();
AlarmMeta alarmMeta = ((AlarmSupported)indicator).getAlarmMeta();
IndicatorMetaInfo indicatorMetaInfo = ((WithMetadata)indicator).getMeta();
MetaInAlarm metaInAlarm;
switch (alarmMeta.getScope()) {
switch (indicatorMetaInfo.getScope()) {
case SERVICE:
int serviceId = Integer.parseInt(alarmMeta.getId());
int serviceId = Integer.parseInt(indicatorMetaInfo.getId());
ServiceInventory serviceInventory = serviceInventoryCache.get(serviceId);
ServiceMetaInAlarm serviceMetaInAlarm = new ServiceMetaInAlarm();
serviceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
serviceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
serviceMetaInAlarm.setId(serviceId);
serviceMetaInAlarm.setName(serviceInventory.getName());
metaInAlarm = serviceMetaInAlarm;
break;
case SERVICE_INSTANCE:
int serviceInstanceId = Integer.parseInt(alarmMeta.getId());
int serviceInstanceId = Integer.parseInt(indicatorMetaInfo.getId());
ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(serviceInstanceId);
ServiceInstanceMetaInAlarm instanceMetaInAlarm = new ServiceInstanceMetaInAlarm();
instanceMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
instanceMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
instanceMetaInAlarm.setId(serviceInstanceId);
instanceMetaInAlarm.setName(serviceInstanceInventory.getName());
metaInAlarm = instanceMetaInAlarm;
break;
case ENDPOINT:
int endpointId = Integer.parseInt(alarmMeta.getId());
int endpointId = Integer.parseInt(indicatorMetaInfo.getId());
EndpointInventory endpointInventory = endpointInventoryCache.get(endpointId);
EndpointMetaInAlarm endpointMetaInAlarm = new EndpointMetaInAlarm();
endpointMetaInAlarm.setIndicatorName(alarmMeta.getIndicatorName());
endpointMetaInAlarm.setIndicatorName(indicatorMetaInfo.getIndicatorName());
endpointMetaInAlarm.setId(endpointId);
serviceId = endpointInventory.getServiceId();
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.oap.server.core.alarm;
package org.apache.skywalking.oap.server.core.analysis.indicator;
import lombok.*;
import org.apache.skywalking.oap.server.core.Const;
......@@ -24,18 +24,18 @@ import org.apache.skywalking.oap.server.core.Const;
/**
* @author wusheng
*/
public class AlarmMeta {
public class IndicatorMetaInfo {
@Setter @Getter private String indicatorName;
@Setter @Getter private int scope;
@Setter @Getter private String id;
public AlarmMeta(String indicatorName, int scope) {
public IndicatorMetaInfo(String indicatorName, int scope) {
this.indicatorName = indicatorName;
this.scope = scope;
this.id = Const.EMPTY_STRING;
}
public AlarmMeta(String indicatorName, int scope, String id) {
public IndicatorMetaInfo(String indicatorName, int scope, String id) {
this.indicatorName = indicatorName;
this.scope = scope;
this.id = id;
......@@ -48,4 +48,12 @@ public class AlarmMeta {
public void setId(String id) {
this.id = id;
}
@Override public String toString() {
return "IndicatorMetaInfo{" +
"indicatorName='" + indicatorName + '\'' +
", scope=" + scope +
", id='" + id + '\'' +
'}';
}
}
......@@ -16,13 +16,13 @@
*
*/
package org.apache.skywalking.oap.server.core.alarm;
package org.apache.skywalking.oap.server.core.analysis.indicator;
/**
* Alarm supported interface implementor could return the {@link AlarmMeta}
* Indicator, which implement this interface, could provide {@link IndicatorMetaInfo}.
*
* @author wusheng
*/
public interface AlarmSupported {
AlarmMeta getAlarmMeta();
public interface WithMetadata {
IndicatorMetaInfo getMeta();
}
......@@ -18,9 +18,9 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.alarm.AlarmEntrance;
import org.apache.skywalking.oap.server.core.alarm.AlarmSupported;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.alarm.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.WithMetadata;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
......@@ -40,7 +40,7 @@ public class AlarmNotifyWorker extends AbstractWorker<Indicator> {
}
@Override public void in(Indicator indicator) {
if (indicator instanceof AlarmSupported) {
if (indicator instanceof WithMetadata) {
entrance.forward(indicator);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.analysis.worker;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.core.exporter.*;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
/**
* @author wusheng
*/
public class ExportWorker extends AbstractWorker<Indicator> {
private ModuleManager moduleManager;
private MetricValuesExportService exportService;
public ExportWorker(int workerId, ModuleManager moduleManager) {
super(workerId);
this.moduleManager = moduleManager;
}
@Override public void in(Indicator indicator) {
if (exportService != null || moduleManager.has(ExporterModule.NAME)) {
if (indicator instanceof WithMetadata) {
if (exportService == null) {
exportService = moduleManager.find(ExporterModule.NAME).provider().getService(MetricValuesExportService.class);
}
exportService.export(((WithMetadata)indicator).getMeta(), indicator);
}
}
}
}
......@@ -18,21 +18,16 @@
package org.apache.skywalking.oap.server.core.analysis.worker;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.*;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.EndOfBatchContext;
import org.apache.skywalking.oap.server.core.analysis.data.MergeDataCache;
import org.apache.skywalking.oap.server.core.analysis.data.*;
import org.apache.skywalking.oap.server.core.analysis.indicator.Indicator;
import org.apache.skywalking.oap.server.core.storage.IIndicatorDAO;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
import static java.util.Objects.nonNull;
......@@ -46,16 +41,19 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
private final String modelName;
private final MergeDataCache<Indicator> mergeDataCache;
private final IIndicatorDAO indicatorDAO;
private final AbstractWorker<Indicator> nextWorker;
private final AbstractWorker<Indicator> nextAlarmWorker;
private final AbstractWorker<Indicator> nextExportWorker;
private final DataCarrier<Indicator> dataCarrier;
IndicatorPersistentWorker(int workerId, String modelName, int batchSize, ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextWorker) {
IIndicatorDAO indicatorDAO, AbstractWorker<Indicator> nextAlarmWorker,
AbstractWorker<Indicator> nextExportWorker) {
super(moduleManager, workerId, batchSize);
this.modelName = modelName;
this.mergeDataCache = new MergeDataCache<>();
this.indicatorDAO = indicatorDAO;
this.nextWorker = nextWorker;
this.nextAlarmWorker = nextAlarmWorker;
this.nextExportWorker = nextExportWorker;
String name = "INDICATOR_L2_AGGREGATION";
int size = BulkConsumePool.Creator.recommendMaxSize() / 8;
......@@ -117,8 +115,11 @@ public class IndicatorPersistentWorker extends PersistenceWorker<Indicator, Merg
batchCollection.add(indicatorDAO.prepareBatchInsert(modelName, data));
}
if (Objects.nonNull(nextWorker)) {
nextWorker.in(data);
if (Objects.nonNull(nextAlarmWorker)) {
nextAlarmWorker.in(data);
}
if (Objects.nonNull(nextExportWorker)) {
nextExportWorker.in(data);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
......
......@@ -74,8 +74,11 @@ public enum IndicatorProcess {
AlarmNotifyWorker alarmNotifyWorker = new AlarmNotifyWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(alarmNotifyWorker.getWorkerId(), alarmNotifyWorker);
ExportWorker exportWorker = new ExportWorker(WorkerIdGenerator.INSTANCES.generate(), moduleManager);
WorkerInstances.INSTANCES.put(exportWorker.getWorkerId(), exportWorker);
IndicatorPersistentWorker minutePersistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
1000, moduleManager, indicatorDAO, alarmNotifyWorker);
1000, moduleManager, indicatorDAO, alarmNotifyWorker, exportWorker);
WorkerInstances.INSTANCES.put(minutePersistentWorker.getWorkerId(), minutePersistentWorker);
persistentWorkers.add(minutePersistentWorker);
......@@ -85,7 +88,7 @@ public enum IndicatorProcess {
private IndicatorPersistentWorker worker(ModuleManager moduleManager,
IIndicatorDAO indicatorDAO, String modelName) {
IndicatorPersistentWorker persistentWorker = new IndicatorPersistentWorker(WorkerIdGenerator.INSTANCES.generate(), modelName,
1000, moduleManager, indicatorDAO, null);
1000, moduleManager, indicatorDAO, null, null);
WorkerInstances.INSTANCES.put(persistentWorker.getWorkerId(), persistentWorker);
persistentWorkers.add(persistentWorker);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
/**
* @author wusheng
*/
public class ExporterModule extends ModuleDefine {
public static final String NAME = "exporter";
public ExporterModule() {
super(NAME);
}
@Override public Class[] services() {
return new Class[] {MetricValuesExportService.class};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.core.exporter;
import org.apache.skywalking.oap.server.core.analysis.indicator.*;
import org.apache.skywalking.oap.server.library.module.Service;
/**
* Export the metric value from indicators through this service, if provider exists.
*
* @author wusheng
*/
public interface MetricValuesExportService extends Service {
void export(IndicatorMetaInfo meta, Indicator indicator);
}
......@@ -20,4 +20,5 @@ org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
org.apache.skywalking.oap.server.core.query.QueryModule
org.apache.skywalking.oap.server.core.alarm.AlarmModule
\ No newline at end of file
org.apache.skywalking.oap.server.core.alarm.AlarmModule
org.apache.skywalking.oap.server.core.exporter.ExporterModule
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册