提交 9f2c8ac0 编写于 作者: P peng-yongsheng

Service reference metric pyramid aggregate.

上级 c4387a3e
......@@ -64,7 +64,7 @@ public class ServiceReferenceMetricAlarmGraph {
private void link(Graph<ServiceReferenceMetric> graph) {
GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID, ServiceReferenceMetric.class)
.addNext(new NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric>() {
@Override public int id() {
return AlarmWorkerIdDefine.SERVICE_REFERENCE_METRIC_ALARM_GRAPH_BRIDGE_WORKER_ID;
......
......@@ -22,9 +22,15 @@ package org.apache.skywalking.apm.collector.analysis.metric.define.graph;
* @author peng-yongsheng
*/
public class MetricWorkerIdDefine {
public static final int SERVICE_REFERENCE_METRIC_AGGREGATION_WORKER_ID = 400;
public static final int SERVICE_REFERENCE_METRIC_REMOTE_WORKER_ID = 401;
public static final int SERVICE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID = 402;
public static final int SERVICE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4100;
public static final int SERVICE_REFERENCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4101;
public static final int SERVICE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4102;
public static final int SERVICE_REFERENCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4103;
public static final int SERVICE_REFERENCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4104;
public static final int SERVICE_REFERENCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4105;
public static final int SERVICE_REFERENCE_DAY_METRIC_TRANSFORM_NODE_ID = 4106;
public static final int SERVICE_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4107;
public static final int SERVICE_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4108;
public static final int INSTANCE_REFERENCE_METRIC_AGGREGATION_WORKER_ID = 403;
public static final int INSTANCE_REFERENCE_METRIC_REMOTE_WORKER_ID = 404;
......
......@@ -37,8 +37,8 @@ import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segme
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceEntryGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceEntrySpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.ServiceReferenceMetricSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricSpanListener;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
......
......@@ -57,7 +57,7 @@ public class InstanceReferenceMetricGraph {
private void link(Graph<ServiceReferenceMetric> graph) {
GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_WORKER_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID, ServiceReferenceMetric.class)
.addNext(new NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric>() {
@Override public int id() {
return MetricWorkerIdDefine.INSTANCE_REFERENCE_GRAPH_BRIDGE_WORKER_ID;
......
......@@ -57,7 +57,7 @@ public class ServiceMetricGraph {
private void link(Graph<ServiceReferenceMetric> graph) {
GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_WORKER_ID, ServiceReferenceMetric.class)
.toFinder().findNode(MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID, ServiceReferenceMetric.class)
.addNext(new NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric>() {
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID;
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
......@@ -24,20 +24,20 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.impl.Persistenc
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricPersistenceWorker extends PersistenceWorker<ServiceReferenceMetric> {
public class ServiceReferenceDayMetricPersistenceWorker extends PersistenceWorker<ServiceReferenceMetric> {
public ServiceReferenceMetricPersistenceWorker(ModuleManager moduleManager) {
public ServiceReferenceDayMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_PERSISTENCE_WORKER_ID;
return MetricWorkerIdDefine.SERVICE_REFERENCE_DAY_METRIC_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
......@@ -46,17 +46,17 @@ public class ServiceReferenceMetricPersistenceWorker extends PersistenceWorker<S
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, ServiceReferenceMetric> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferenceMetricPersistenceDAO.class);
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferenceDayMetricPersistenceDAO.class);
}
public static class Factory extends PersistenceWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetricPersistenceWorker> {
public static class Factory extends PersistenceWorkerProvider<ServiceReferenceMetric, ServiceReferenceDayMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceReferenceMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMetricPersistenceWorker(moduleManager);
@Override public ServiceReferenceDayMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceDayMetricPersistenceWorker(moduleManager);
}
@Override
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.core.graph.Next;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceDayMetricTransformNode implements NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric> {
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_DAY_METRIC_TRANSFORM_NODE_ID;
}
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToDay(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceHourMetricPersistenceWorker extends PersistenceWorker<ServiceReferenceMetric> {
public ServiceReferenceHourMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_HOUR_METRIC_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
return true;
}
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, ServiceReferenceMetric> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferenceHourMetricPersistenceDAO.class);
}
public static class Factory extends PersistenceWorkerProvider<ServiceReferenceMetric, ServiceReferenceHourMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceReferenceHourMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceHourMetricPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.core.graph.Next;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceHourMetricTransformNode implements NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric> {
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_HOUR_METRIC_TRANSFORM_NODE_ID;
}
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToHour(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
......@@ -34,7 +34,7 @@ public class ServiceReferenceMetricAggregationWorker extends AggregationWorker<S
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_AGGREGATION_WORKER_ID;
return MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_AGGREGATION_WORKER_ID;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceReferenceMetric, ServiceReferenceMetricAggregationWorker> {
......
......@@ -16,11 +16,12 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.core.graph.GraphManager;
import org.apache.skywalking.apm.collector.core.graph.Node;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
......@@ -42,9 +43,19 @@ public class ServiceReferenceMetricGraph {
public void create() {
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
Node<ServiceReferenceMetric, ServiceReferenceMetric> remoteNode = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID, ServiceReferenceMetric.class)
.addNode(new ServiceReferenceMetricAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceReferenceMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
.addNext(new ServiceReferenceMetricRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_REFERENCE_METRIC_GRAPH_ID).create(workerCreateListener));
remoteNode.addNext(new ServiceReferenceMinuteMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
remoteNode.addNext(new ServiceReferenceHourMetricTransformNode())
.addNext(new ServiceReferenceHourMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
remoteNode.addNext(new ServiceReferenceDayMetricTransformNode())
.addNext(new ServiceReferenceDayMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
remoteNode.addNext(new ServiceReferenceMonthMetricTransformNode())
.addNext(new ServiceReferenceMonthMetricPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
}
}
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
......@@ -37,7 +37,7 @@ public class ServiceReferenceMetricRemoteWorker extends AbstractRemoteWorker<Ser
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_METRIC_REMOTE_WORKER_ID;
return MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_REMOTE_WORKER_ID;
}
@Override protected void onWork(ServiceReferenceMetric serviceReferenceMetric) throws WorkerException {
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service;
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import java.util.LinkedList;
import java.util.List;
......@@ -57,7 +57,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
private SpanDecorator entrySpanDecorator;
private long timeBucket;
public ServiceReferenceMetricSpanListener(ModuleManager moduleManager) {
ServiceReferenceMetricSpanListener(ModuleManager moduleManager) {
this.entryReferenceMetric = new LinkedList<>();
this.exitReferenceMetric = new LinkedList<>();
this.instanceCacheService = moduleManager.find(CacheModule.NAME).getService(InstanceCacheService.class);
......@@ -76,7 +76,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
if (spanDecorator.getRefsCount() > 0) {
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ReferenceDecorator reference = spanDecorator.getRefs(i);
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric();
serviceReferenceMetric.setFrontServiceId(reference.getParentServiceId());
serviceReferenceMetric.setFrontInstanceId(reference.getParentApplicationInstanceId());
serviceReferenceMetric.setFrontApplicationId(instanceCacheService.getApplicationId(reference.getParentApplicationInstanceId()));
......@@ -88,7 +88,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
entryReferenceMetric.add(serviceReferenceMetric);
}
} else {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric();
serviceReferenceMetric.setFrontServiceId(Const.NONE_SERVICE_ID);
serviceReferenceMetric.setFrontInstanceId(Const.NONE_INSTANCE_ID);
serviceReferenceMetric.setFrontApplicationId(Const.NONE_APPLICATION_ID);
......@@ -104,7 +104,7 @@ public class ServiceReferenceMetricSpanListener implements FirstSpanListener, En
}
@Override public void parseExit(SpanDecorator spanDecorator, int applicationId, int instanceId, String segmentId) {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(Const.EMPTY_STRING);
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric();
int peerId = spanDecorator.getPeerId();
int behindApplicationId = applicationCacheService.getApplicationIdByAddressId(peerId);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMinuteMetricPersistenceWorker extends PersistenceWorker<ServiceReferenceMetric> {
public ServiceReferenceMinuteMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
return true;
}
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, ServiceReferenceMetric> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferenceMinuteMetricPersistenceDAO.class);
}
public static class Factory extends PersistenceWorkerProvider<ServiceReferenceMetric, ServiceReferenceMinuteMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceReferenceMinuteMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMinuteMetricPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.PersistenceWorkerProvider;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMonthMetricPersistenceWorker extends PersistenceWorker<ServiceReferenceMetric> {
public ServiceReferenceMonthMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
return true;
}
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, ServiceReferenceMetric> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceReferenceMonthMetricPersistenceDAO.class);
}
public static class Factory extends PersistenceWorkerProvider<ServiceReferenceMetric, ServiceReferenceMonthMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceReferenceMonthMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceReferenceMonthMetricPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.core.graph.Next;
import org.apache.skywalking.apm.collector.core.graph.NodeProcessor;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMonthMetricTransformNode implements NodeProcessor<ServiceReferenceMetric, ServiceReferenceMetric> {
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID;
}
@Override public void process(ServiceReferenceMetric serviceReferenceMetric, Next<ServiceReferenceMetric> next) {
long timeBucket = TimeBucketUtils.INSTANCE.minuteToMonth(serviceReferenceMetric.getTimeBucket());
serviceReferenceMetric.setId(String.valueOf(timeBucket) + Const.ID_SPLIT + serviceReferenceMetric.getMetricId());
serviceReferenceMetric.setTimeBucket(timeBucket);
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.data;
import java.util.LinkedList;
......@@ -36,6 +35,8 @@ public abstract class TableDefine {
public abstract void initialize();
public abstract boolean isPyramidTable();
public final void addColumn(ColumnDefine columnDefine) {
columnDefines.add(columnDefine);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.storage;
/**
* @author peng-yongsheng
*/
public enum TimePyramid {
Minute(0, "minute"), Hour(1, "hour"), Day(2, "day"), Month(3, "month");
private final int value;
private final String name;
TimePyramid(int value, String name) {
this.value = value;
this.name = name;
}
public int getValue() {
return value;
}
public String getName() {
return name;
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.util;
import java.text.SimpleDateFormat;
......@@ -63,6 +62,18 @@ public enum TimeBucketUtils {
return Long.valueOf(timeStr);
}
public long minuteToHour(long minuteBucket) {
return minuteBucket / 100;
}
public long minuteToDay(long minuteBucket) {
return minuteBucket / 100 / 100;
}
public long minuteToMonth(long minuteBucket) {
return minuteBucket / 100 / 100 / 100;
}
public long changeTimeBucket2TimeStamp(String timeBucketType, long timeBucket) {
if (TimeBucketType.SECOND.name().toLowerCase().equals(timeBucketType.toLowerCase())) {
Calendar calendar = Calendar.getInstance();
......
......@@ -72,7 +72,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO;
/**
......@@ -128,7 +128,7 @@ public class StorageModule extends Module {
classes.add(ISegmentPersistenceDAO.class);
classes.add(IServiceEntryPersistenceDAO.class);
classes.add(IServiceMetricPersistenceDAO.class);
classes.add(IServiceReferenceMetricPersistenceDAO.class);
classes.add(IServiceReferenceMinuteMetricPersistenceDAO.class);
classes.add(IInstanceMetricPersistenceDAO.class);
classes.add(IInstanceReferenceMetricPersistenceDAO.class);
......
......@@ -24,6 +24,7 @@ import org.apache.skywalking.apm.collector.core.data.StreamData;
* @author peng-yongsheng
*/
public interface IPersistenceDAO<Insert, Update, STREAM_DATA extends StreamData> extends DAO {
STREAM_DATA get(String id);
Insert prepareBatchInsert(STREAM_DATA data);
......
......@@ -16,7 +16,7 @@
*
*/
package org.apache.skywalking.apm.collector.storage.dao;
package org.apache.skywalking.apm.collector.storage.dao.srmp;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
......@@ -24,5 +24,5 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc
/**
* @author peng-yongsheng
*/
public interface IServiceReferenceMetricPersistenceDAO<Insert, Update, DataImpl extends ServiceReferenceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
public interface IServiceReferenceDayMetricPersistenceDAO<Insert, Update, DataImpl extends ServiceReferenceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.dao.srmp;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public interface IServiceReferenceHourMetricPersistenceDAO<Insert, Update, DataImpl extends ServiceReferenceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.dao.srmp;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public interface IServiceReferenceMinuteMetricPersistenceDAO<Insert, Update, DataImpl extends ServiceReferenceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.dao.srmp;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
/**
* @author peng-yongsheng
*/
public interface IServiceReferenceMonthMetricPersistenceDAO<Insert, Update, DataImpl extends ServiceReferenceMetric> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
......@@ -19,7 +19,7 @@
package org.apache.skywalking.apm.collector.storage.table.service;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.AbstractData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.AddOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonOperation;
import org.apache.skywalking.apm.collector.storage.table.Metric;
......@@ -27,10 +27,11 @@ import org.apache.skywalking.apm.collector.storage.table.Metric;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetric extends AbstractData implements Metric {
public class ServiceReferenceMetric extends StreamData implements Metric {
private static final Column[] STRING_COLUMNS = {
new Column(ServiceReferenceMetricTable.COLUMN_ID, new NonOperation()),
new Column(ServiceReferenceMetricTable.COLUMN_METRIC_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
......@@ -69,8 +70,24 @@ public class ServiceReferenceMetric extends AbstractData implements Metric {
private static final Column[] BYTE_COLUMNS = {};
public ServiceReferenceMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
public ServiceReferenceMetric() {
super(STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
@Override public String getId() {
return getDataString(0);
}
@Override public void setId(String id) {
setDataString(0, id);
}
@Override public String getMetricId() {
return getDataString(1);
}
@Override public void setMetricId(String metricId) {
setDataString(1, metricId);
}
@Override
......
......@@ -26,6 +26,7 @@ import org.apache.skywalking.apm.collector.storage.dao.*;
import java.util.Calendar;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
/**
* @author peng-yongsheng
......@@ -104,7 +105,7 @@ public class DataTTLKeeperTimer {
ISegmentPersistenceDAO segmentPersistenceDAO = moduleManager.find(StorageModule.NAME).getService(ISegmentPersistenceDAO.class);
segmentPersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
IServiceReferenceMetricPersistenceDAO serviceReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMetricPersistenceDAO.class);
IServiceReferenceMinuteMetricPersistenceDAO serviceReferencePersistenceDAO = moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMinuteMetricPersistenceDAO.class);
serviceReferencePersistenceDAO.deleteHistory(startTimestamp, endTimestamp);
}
}
......@@ -81,7 +81,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.BatchEsDAO;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchStorageInstaller;
......@@ -136,7 +136,7 @@ import org.apache.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterD
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceAlarmEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceAlarmListEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsUIDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.ServiceReferenceMetricEsPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.ServiceReferenceMinuteMetricEsPersistenceDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -241,7 +241,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMinuteMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricEsPersistenceDAO(elasticSearchClient));
......
......@@ -42,10 +42,10 @@ public abstract class AbstractPersistenceEsDAO<STREAM_DATA extends StreamData> e
super(client);
}
protected abstract String tableName();
protected abstract STREAM_DATA esDataToStreamData(Map<String, Object> source);
protected abstract String tableName();
@Override public final STREAM_DATA get(String id) {
GetResponse getResponse = getClient().prepareGet(tableName(), id).get();
if (getResponse.isExists()) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements IServiceReferenceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricEsPersistenceDAO.class);
public ServiceReferenceMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public ServiceReferenceMetric get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceReferenceMetricTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric(id);
Map<String, Object> source = getResponse.getSource();
serviceReferenceMetric.setFrontApplicationId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceMetric.setBehindApplicationId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceMetric.setFrontInstanceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceMetric.setBehindInstanceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceMetric.setFrontServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceMetric.setBehindServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceMetric.setSourceValue(((Number)source.get(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceMetric.setTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setBusinessTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setBusinessTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setBusinessTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setMqTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setMqTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setMqTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setMqTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setTimeBucket(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceMetric;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceReferenceMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, data.getFrontInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, data.getBehindInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(ServiceReferenceMetricTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceReferenceMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, data.getFrontApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, data.getBehindApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, data.getFrontInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, data.getBehindInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, data.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, data.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, data.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, data.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, data.getTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, data.getTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, data.getTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, data.getBusinessTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, data.getBusinessTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, data.getBusinessTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, data.getBusinessTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, data.getMqTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, data.getMqTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, data.getMqTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, data.getMqTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(ServiceReferenceMetricTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceReferenceMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceReferenceMetricTable.TABLE);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.es.base.dao.AbstractPersistenceEsDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
/**
* @author peng-yongsheng
*/
public abstract class AbstractServiceReferenceMetricEsPersistenceDAO extends AbstractPersistenceEsDAO<ServiceReferenceMetric> {
AbstractServiceReferenceMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override protected final String timeBucketColumnNameForDelete() {
return ServiceReferenceMetricTable.COLUMN_TIME_BUCKET;
}
@Override protected final ServiceReferenceMetric esDataToStreamData(Map<String, Object> source) {
ServiceReferenceMetric serviceReferenceMetric = new ServiceReferenceMetric();
serviceReferenceMetric.setId((String)source.get(ServiceReferenceMetricTable.COLUMN_ID));
serviceReferenceMetric.setMetricId((String)source.get(ServiceReferenceMetricTable.COLUMN_METRIC_ID));
serviceReferenceMetric.setFrontApplicationId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID)).intValue());
serviceReferenceMetric.setBehindApplicationId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID)).intValue());
serviceReferenceMetric.setFrontInstanceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID)).intValue());
serviceReferenceMetric.setBehindInstanceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID)).intValue());
serviceReferenceMetric.setFrontServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID)).intValue());
serviceReferenceMetric.setBehindServiceId(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID)).intValue());
serviceReferenceMetric.setSourceValue(((Number)source.get(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE)).intValue());
serviceReferenceMetric.setTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setBusinessTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setBusinessTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setBusinessTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setBusinessTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setMqTransactionCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS)).longValue());
serviceReferenceMetric.setMqTransactionErrorCalls(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS)).longValue());
serviceReferenceMetric.setMqTransactionDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM)).longValue());
serviceReferenceMetric.setMqTransactionErrorDurationSum(((Number)source.get(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM)).longValue());
serviceReferenceMetric.setTimeBucket(((Number)source.get(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET)).longValue());
return serviceReferenceMetric;
}
@Override protected final Map<String, Object> esStreamDataToEsData(ServiceReferenceMetric streamData) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceReferenceMetricTable.COLUMN_ID, streamData.getId());
source.put(ServiceReferenceMetricTable.COLUMN_METRIC_ID, streamData.getMetricId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_APPLICATION_ID, streamData.getFrontApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_APPLICATION_ID, streamData.getBehindApplicationId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_INSTANCE_ID, streamData.getFrontInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_INSTANCE_ID, streamData.getBehindInstanceId());
source.put(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, streamData.getFrontServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, streamData.getBehindServiceId());
source.put(ServiceReferenceMetricTable.COLUMN_SOURCE_VALUE, streamData.getSourceValue());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_CALLS, streamData.getTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_CALLS, streamData.getTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_DURATION_SUM, streamData.getTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TRANSACTION_ERROR_DURATION_SUM, streamData.getTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_CALLS, streamData.getBusinessTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_CALLS, streamData.getBusinessTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_DURATION_SUM, streamData.getBusinessTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_BUSINESS_TRANSACTION_ERROR_DURATION_SUM, streamData.getBusinessTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_CALLS, streamData.getMqTransactionCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_CALLS, streamData.getMqTransactionErrorCalls());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_DURATION_SUM, streamData.getMqTransactionDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_MQ_TRANSACTION_ERROR_DURATION_SUM, streamData.getMqTransactionErrorDurationSum());
source.put(ServiceReferenceMetricTable.COLUMN_TIME_BUCKET, streamData.getTimeBucket());
return source;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceDayMetricEsPersistenceDAO extends AbstractServiceReferenceMetricEsPersistenceDAO implements IServiceReferenceDayMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
public ServiceReferenceDayMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public String tableName() {
return ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Day.getName();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceHourMetricEsPersistenceDAO extends AbstractServiceReferenceMetricEsPersistenceDAO implements IServiceReferenceHourMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
public ServiceReferenceHourMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public String tableName() {
return ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Hour.getName();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMinuteMetricEsPersistenceDAO extends AbstractServiceReferenceMetricEsPersistenceDAO implements IServiceReferenceMinuteMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
public ServiceReferenceMinuteMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public String tableName() {
return ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Minute.getName();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao.srmp;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.storage.TimePyramid;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetricTable;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
/**
* @author peng-yongsheng
*/
public class ServiceReferenceMonthMetricEsPersistenceDAO extends AbstractServiceReferenceMetricEsPersistenceDAO implements IServiceReferenceMonthMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
public ServiceReferenceMonthMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public String tableName() {
return ServiceReferenceMetricTable.TABLE + Const.ID_SPLIT + TimePyramid.Month.getName();
}
}
......@@ -77,7 +77,7 @@ import org.apache.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceUIDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.BatchH2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2StorageInstaller;
......@@ -219,7 +219,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferenceMinuteMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceMetricPersistenceDAO.class, new InstanceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceReferenceMetricPersistenceDAO.class, new InstanceReferenceMetricH2PersistenceDAO(h2Client));
......
......@@ -27,7 +27,7 @@ import java.util.Map;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
......@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements IServiceReferenceMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReferenceMetric> {
public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements IServiceReferenceMinuteMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册