未验证 提交 c2a425f5 编写于 作者: 彭勇升 pengys 提交者: GitHub

Filter non-active service name when UI query. (#1262)

* #1124

Add heart beat time into service name index to filter non-active service name.

* Filter service name by heart beat time when query service name count.

* #1124
1. Add application id to be a query condition of searchService method.
2. Filter service name by given time which calculated by the minute metric TTL setting. ( CurrentTimeStamp - minuteMetricTTL )

* Fixed the auto test failure.

#1124
上级 a6d9a605
......@@ -19,28 +19,18 @@
package org.apache.skywalking.apm.collector.analysis.alarm.provider;
import org.apache.skywalking.apm.collector.analysis.alarm.define.AnalysisAlarmModule;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationReferenceMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.InstanceMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.InstanceReferenceMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.ServiceMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.ServiceReferenceMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.*;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.instance.*;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.service.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarm;
import org.apache.skywalking.apm.collector.storage.table.alarm.ApplicationAlarmList;
import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarm;
import org.apache.skywalking.apm.collector.storage.table.alarm.InstanceAlarmList;
import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarm;
import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList;
import org.apache.skywalking.apm.collector.storage.table.alarm.*;
/**
* @author peng-yongsheng
......@@ -72,23 +62,12 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider {
@Override public void start() {
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
ServiceMetricAlarmGraph serviceMetricAlarmGraph = new ServiceMetricAlarmGraph(getManager(), workerCreateListener);
serviceMetricAlarmGraph.create();
InstanceMetricAlarmGraph instanceMetricAlarmGraph = new InstanceMetricAlarmGraph(getManager(), workerCreateListener);
instanceMetricAlarmGraph.create();
ApplicationMetricAlarmGraph applicationMetricAlarmGraph = new ApplicationMetricAlarmGraph(getManager(), workerCreateListener);
applicationMetricAlarmGraph.create();
ServiceReferenceMetricAlarmGraph serviceReferenceMetricAlarmGraph = new ServiceReferenceMetricAlarmGraph(getManager(), workerCreateListener);
serviceReferenceMetricAlarmGraph.create();
InstanceReferenceMetricAlarmGraph instanceReferenceMetricAlarmGraph = new InstanceReferenceMetricAlarmGraph(getManager(), workerCreateListener);
instanceReferenceMetricAlarmGraph.create();
ApplicationReferenceMetricAlarmGraph applicationReferenceMetricAlarmGraph = new ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener);
applicationReferenceMetricAlarmGraph.create();
new ServiceMetricAlarmGraph(getManager(), workerCreateListener).create();
new InstanceMetricAlarmGraph(getManager(), workerCreateListener).create();
new ApplicationMetricAlarmGraph(getManager(), workerCreateListener).create();
new ServiceReferenceMetricAlarmGraph(getManager(), workerCreateListener).create();
new InstanceReferenceMetricAlarmGraph(getManager(), workerCreateListener).create();
new ApplicationReferenceMetricAlarmGraph(getManager(), workerCreateListener).create();
registerRemoteData();
......@@ -110,6 +89,5 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider {
remoteDataRegisterService.register(InstanceAlarmList.class, new InstanceAlarmList.InstanceCreator());
remoteDataRegisterService.register(ServiceAlarm.class, new ServiceAlarm.InstanceCreator());
remoteDataRegisterService.register(ServiceAlarmList.class, new ServiceAlarmList.InstanceCreator());
}
}
......@@ -19,14 +19,8 @@
package org.apache.skywalking.apm.collector.analysis.jvm.provider;
import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IMemoryPoolMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.CpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.GCMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.MemoryPoolMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.*;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.service.*;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.cpu.CpuMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.gc.GCMetricPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memory.MemoryMetricPersistenceGraph;
......@@ -34,7 +28,6 @@ import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memorypo
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -86,16 +79,9 @@ public class AnalysisJVMModuleProvider extends ModuleProvider {
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
CpuMetricPersistenceGraph cpuMetricPersistenceGraph = new CpuMetricPersistenceGraph(getManager(), workerCreateListener);
cpuMetricPersistenceGraph.create();
GCMetricPersistenceGraph gcMetricPersistenceGraph = new GCMetricPersistenceGraph(getManager(), workerCreateListener);
gcMetricPersistenceGraph.create();
MemoryMetricPersistenceGraph memoryMetricPersistenceGraph = new MemoryMetricPersistenceGraph(getManager(), workerCreateListener);
memoryMetricPersistenceGraph.create();
MemoryPoolMetricPersistenceGraph memoryPoolMetricPersistenceGraph = new MemoryPoolMetricPersistenceGraph(getManager(), workerCreateListener);
memoryPoolMetricPersistenceGraph.create();
new CpuMetricPersistenceGraph(getManager(), workerCreateListener).create();
new GCMetricPersistenceGraph(getManager(), workerCreateListener).create();
new MemoryMetricPersistenceGraph(getManager(), workerCreateListener).create();
new MemoryPoolMetricPersistenceGraph(getManager(), workerCreateListener).create();
}
}
......@@ -38,4 +38,5 @@ public class MetricGraphIdDefine {
public static final int INSTANCE_MAPPING_GRAPH_ID = 411;
public static final int INSTANCE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 412;
public static final int SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID = 413;
}
......@@ -52,84 +52,88 @@ public class MetricWorkerIdDefine {
public static final int APPLICATION_REFERENCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4037;
public static final int APPLICATION_REFERENCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4038;
public static final int SERVICE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4400;
public static final int SERVICE_MINUTE_METRIC_REMOTE_WORKER_ID = 4401;
public static final int SERVICE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4402;
public static final int SERVICE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4403;
public static final int SERVICE_HOUR_METRIC_TRANSFORM_NODE_ID = 4404;
public static final int SERVICE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4405;
public static final int SERVICE_DAY_METRIC_TRANSFORM_NODE_ID = 4406;
public static final int SERVICE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4407;
public static final int SERVICE_MONTH_METRIC_TRANSFORM_NODE_ID = 4408;
public static final int SERVICE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4040;
public static final int SERVICE_MINUTE_METRIC_REMOTE_WORKER_ID = 4041;
public static final int SERVICE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4042;
public static final int SERVICE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4043;
public static final int SERVICE_HOUR_METRIC_TRANSFORM_NODE_ID = 4044;
public static final int SERVICE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4045;
public static final int SERVICE_DAY_METRIC_TRANSFORM_NODE_ID = 4046;
public static final int SERVICE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4047;
public static final int SERVICE_MONTH_METRIC_TRANSFORM_NODE_ID = 4048;
public static final int INSTANCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4500;
public static final int INSTANCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4501;
public static final int INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4502;
public static final int INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4503;
public static final int INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4504;
public static final int INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4505;
public static final int INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID = 4506;
public static final int INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4507;
public static final int INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4508;
public static final int INSTANCE_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4050;
public static final int INSTANCE_MINUTE_METRIC_REMOTE_WORKER_ID = 4051;
public static final int INSTANCE_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4052;
public static final int INSTANCE_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4053;
public static final int INSTANCE_HOUR_METRIC_TRANSFORM_NODE_ID = 4054;
public static final int INSTANCE_DAY_METRIC_PERSISTENCE_WORKER_ID = 4055;
public static final int INSTANCE_DAY_METRIC_TRANSFORM_NODE_ID = 4056;
public static final int INSTANCE_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4057;
public static final int INSTANCE_MONTH_METRIC_TRANSFORM_NODE_ID = 4058;
public static final int APPLICATION_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4600;
public static final int APPLICATION_MINUTE_METRIC_REMOTE_WORKER_ID = 4601;
public static final int APPLICATION_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4602;
public static final int APPLICATION_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4603;
public static final int APPLICATION_HOUR_METRIC_TRANSFORM_NODE_ID = 4604;
public static final int APPLICATION_DAY_METRIC_PERSISTENCE_WORKER_ID = 4605;
public static final int APPLICATION_DAY_METRIC_TRANSFORM_NODE_ID = 4606;
public static final int APPLICATION_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4607;
public static final int APPLICATION_MONTH_METRIC_TRANSFORM_NODE_ID = 4608;
public static final int APPLICATION_MINUTE_METRIC_AGGREGATION_WORKER_ID = 4060;
public static final int APPLICATION_MINUTE_METRIC_REMOTE_WORKER_ID = 4061;
public static final int APPLICATION_MINUTE_METRIC_PERSISTENCE_WORKER_ID = 4062;
public static final int APPLICATION_HOUR_METRIC_PERSISTENCE_WORKER_ID = 4063;
public static final int APPLICATION_HOUR_METRIC_TRANSFORM_NODE_ID = 4064;
public static final int APPLICATION_DAY_METRIC_PERSISTENCE_WORKER_ID = 4065;
public static final int APPLICATION_DAY_METRIC_TRANSFORM_NODE_ID = 4066;
public static final int APPLICATION_MONTH_METRIC_PERSISTENCE_WORKER_ID = 4067;
public static final int APPLICATION_MONTH_METRIC_TRANSFORM_NODE_ID = 4068;
public static final int INSTANCE_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4700;
public static final int INSTANCE_MAPPING_MINUTE_REMOTE_WORKER_ID = 4701;
public static final int INSTANCE_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4702;
public static final int INSTANCE_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4703;
public static final int INSTANCE_MAPPING_HOUR_TRANSFORM_NODE_ID = 4704;
public static final int INSTANCE_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4705;
public static final int INSTANCE_MAPPING_DAY_TRANSFORM_NODE_ID = 4706;
public static final int INSTANCE_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4707;
public static final int INSTANCE_MAPPING_MONTH_TRANSFORM_NODE_ID = 4708;
public static final int INSTANCE_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4070;
public static final int INSTANCE_MAPPING_MINUTE_REMOTE_WORKER_ID = 4071;
public static final int INSTANCE_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4072;
public static final int INSTANCE_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4073;
public static final int INSTANCE_MAPPING_HOUR_TRANSFORM_NODE_ID = 4074;
public static final int INSTANCE_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4075;
public static final int INSTANCE_MAPPING_DAY_TRANSFORM_NODE_ID = 4076;
public static final int INSTANCE_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4077;
public static final int INSTANCE_MAPPING_MONTH_TRANSFORM_NODE_ID = 4078;
public static final int APPLICATION_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4800;
public static final int APPLICATION_MAPPING_MINUTE_REMOTE_WORKER_ID = 4801;
public static final int APPLICATION_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4802;
public static final int APPLICATION_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4803;
public static final int APPLICATION_MAPPING_HOUR_TRANSFORM_NODE_ID = 4804;
public static final int APPLICATION_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4805;
public static final int APPLICATION_MAPPING_DAY_TRANSFORM_NODE_ID = 4806;
public static final int APPLICATION_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4807;
public static final int APPLICATION_MAPPING_MONTH_TRANSFORM_NODE_ID = 4808;
public static final int APPLICATION_MAPPING_MINUTE_AGGREGATION_WORKER_ID = 4080;
public static final int APPLICATION_MAPPING_MINUTE_REMOTE_WORKER_ID = 4081;
public static final int APPLICATION_MAPPING_MINUTE_PERSISTENCE_WORKER_ID = 4082;
public static final int APPLICATION_MAPPING_HOUR_PERSISTENCE_WORKER_ID = 4083;
public static final int APPLICATION_MAPPING_HOUR_TRANSFORM_NODE_ID = 4084;
public static final int APPLICATION_MAPPING_DAY_PERSISTENCE_WORKER_ID = 4085;
public static final int APPLICATION_MAPPING_DAY_TRANSFORM_NODE_ID = 4086;
public static final int APPLICATION_MAPPING_MONTH_PERSISTENCE_WORKER_ID = 4087;
public static final int APPLICATION_MAPPING_MONTH_TRANSFORM_NODE_ID = 4088;
public static final int APPLICATION_COMPONENT_MINUTE_AGGREGATION_WORKER_ID = 4900;
public static final int APPLICATION_COMPONENT_MINUTE_REMOTE_WORKER_ID = 4901;
public static final int APPLICATION_COMPONENT_MINUTE_PERSISTENCE_WORKER_ID = 4902;
public static final int APPLICATION_COMPONENT_HOUR_PERSISTENCE_WORKER_ID = 4903;
public static final int APPLICATION_COMPONENT_HOUR_TRANSFORM_NODE_ID = 4904;
public static final int APPLICATION_COMPONENT_DAY_PERSISTENCE_WORKER_ID = 4905;
public static final int APPLICATION_COMPONENT_DAY_TRANSFORM_NODE_ID = 4906;
public static final int APPLICATION_COMPONENT_MONTH_PERSISTENCE_WORKER_ID = 4907;
public static final int APPLICATION_COMPONENT_MONTH_TRANSFORM_NODE_ID = 4908;
public static final int APPLICATION_COMPONENT_MINUTE_AGGREGATION_WORKER_ID = 4090;
public static final int APPLICATION_COMPONENT_MINUTE_REMOTE_WORKER_ID = 4091;
public static final int APPLICATION_COMPONENT_MINUTE_PERSISTENCE_WORKER_ID = 4092;
public static final int APPLICATION_COMPONENT_HOUR_PERSISTENCE_WORKER_ID = 4093;
public static final int APPLICATION_COMPONENT_HOUR_TRANSFORM_NODE_ID = 4094;
public static final int APPLICATION_COMPONENT_DAY_PERSISTENCE_WORKER_ID = 4095;
public static final int APPLICATION_COMPONENT_DAY_TRANSFORM_NODE_ID = 4096;
public static final int APPLICATION_COMPONENT_MONTH_PERSISTENCE_WORKER_ID = 4097;
public static final int APPLICATION_COMPONENT_MONTH_TRANSFORM_NODE_ID = 4098;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_AGGREGATION_WORKER_ID = 4040;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_REMOTE_WORKER_ID = 4041;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_PERSISTENCE_WORKER_ID = 4042;
public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_PERSISTENCE_WORKER_ID = 4043;
public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_TRANSFORM_NODE_ID = 4044;
public static final int RESPONSE_TIME_DISTRIBUTION_DAY_PERSISTENCE_WORKER_ID = 4045;
public static final int RESPONSE_TIME_DISTRIBUTION_DAY_TRANSFORM_NODE_ID = 4046;
public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_PERSISTENCE_WORKER_ID = 4047;
public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_TRANSFORM_NODE_ID = 4048;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_AGGREGATION_WORKER_ID = 4100;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_REMOTE_WORKER_ID = 4101;
public static final int RESPONSE_TIME_DISTRIBUTION_MINUTE_PERSISTENCE_WORKER_ID = 4102;
public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_PERSISTENCE_WORKER_ID = 4103;
public static final int RESPONSE_TIME_DISTRIBUTION_HOUR_TRANSFORM_NODE_ID = 4104;
public static final int RESPONSE_TIME_DISTRIBUTION_DAY_PERSISTENCE_WORKER_ID = 4105;
public static final int RESPONSE_TIME_DISTRIBUTION_DAY_TRANSFORM_NODE_ID = 4106;
public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_PERSISTENCE_WORKER_ID = 4107;
public static final int RESPONSE_TIME_DISTRIBUTION_MONTH_TRANSFORM_NODE_ID = 4108;
public static final int GLOBAL_TRACE_PERSISTENCE_WORKER_ID = 427;
public static final int SEGMENT_DURATION_PERSISTENCE_WORKER_ID = 428;
public static final int GLOBAL_TRACE_PERSISTENCE_WORKER_ID = 4110;
public static final int SEGMENT_DURATION_PERSISTENCE_WORKER_ID = 4120;
public static final int INSTANCE_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 429;
public static final int APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 430;
public static final int SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID = 431;
public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 432;
public static final int APPLICATION_METRIC_GRAPH_BRIDGE_WORKER_ID = 433;
public static final int INSTANCE_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 4130;
public static final int APPLICATION_REFERENCE_GRAPH_BRIDGE_WORKER_ID = 4140;
public static final int SERVICE_METRIC_GRAPH_BRIDGE_WORKER_ID = 4150;
public static final int INSTANCE_METRIC_GRAPH_BRIDGE_WORKER_ID = 4160;
public static final int APPLICATION_METRIC_GRAPH_BRIDGE_WORKER_ID = 4170;
public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 400;
public static final int INST_HEART_BEAT_PERSISTENCE_WORKER_ID = 4180;
public static final int SERVICE_NAME_HEART_BEAT_AGGREGATION_WORKER_ID = 4190;
public static final int SERVICE_NAME_HEART_BEAT_REMOTE_WORKER_ID = 4191;
public static final int SERVICE_NAME_HEART_BEAT_PERSISTENCE_WORKER_ID = 4192;
}
......@@ -21,49 +21,34 @@ package org.apache.skywalking.apm.collector.analysis.metric.provider;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.provider.service.InstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.ApplicationComponentSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.ApplicationMappingGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.ApplicationMappingSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.component.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.mapping.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.metric.ApplicationMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.application.refmetric.ApplicationReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.GlobalTraceSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.ResponseTimeDistributionGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.ResponseTimeDistributionSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.global.std.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.heartbeat.InstanceHeartBeatPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.InstanceMappingSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.mapping.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.metric.InstanceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.instance.refmetric.InstanceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentDurationGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.SegmentDurationSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.segment.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat.*;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.metric.ServiceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricGraph;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.ServiceReferenceMetricSpanListener;
import org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.refmetric.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.application.*;
import org.apache.skywalking.apm.collector.storage.table.global.ResponseTimeDistribution;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMapping;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceMetric;
import org.apache.skywalking.apm.collector.storage.table.instance.InstanceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.apache.skywalking.apm.collector.storage.table.instance.*;
import org.apache.skywalking.apm.collector.storage.table.service.*;
/**
* @author peng-yongsheng
......@@ -122,47 +107,27 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
segmentParserListenerRegister.register(new GlobalTraceSpanListener.Factory());
segmentParserListenerRegister.register(new SegmentDurationSpanListener.Factory());
segmentParserListenerRegister.register(new ResponseTimeDistributionSpanListener.Factory());
segmentParserListenerRegister.register(new ServiceNameSpanListener.Factory());
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
ServiceReferenceMetricGraph serviceReferenceMetricGraph = new ServiceReferenceMetricGraph(getManager(), workerCreateListener);
serviceReferenceMetricGraph.create();
InstanceReferenceMetricGraph instanceReferenceMetricGraph = new InstanceReferenceMetricGraph(getManager(), workerCreateListener);
instanceReferenceMetricGraph.create();
ApplicationReferenceMetricGraph applicationReferenceMetricGraph = new ApplicationReferenceMetricGraph(getManager(), workerCreateListener);
applicationReferenceMetricGraph.create();
ServiceMetricGraph serviceMetricGraph = new ServiceMetricGraph(getManager(), workerCreateListener);
serviceMetricGraph.create();
InstanceMetricGraph instanceMetricGraph = new InstanceMetricGraph(getManager(), workerCreateListener);
instanceMetricGraph.create();
ApplicationMetricGraph applicationMetricGraph = new ApplicationMetricGraph(getManager(), workerCreateListener);
applicationMetricGraph.create();
ApplicationComponentGraph applicationComponentGraph = new ApplicationComponentGraph(getManager(), workerCreateListener);
applicationComponentGraph.create();
ApplicationMappingGraph applicationMappingGraph = new ApplicationMappingGraph(getManager(), workerCreateListener);
applicationMappingGraph.create();
InstanceMappingGraph instanceMappingGraph = new InstanceMappingGraph(getManager(), workerCreateListener);
instanceMappingGraph.create();
GlobalTraceGraph globalTraceGraph = new GlobalTraceGraph(getManager(), workerCreateListener);
globalTraceGraph.create();
ResponseTimeDistributionGraph responseTimeDistributionGraph = new ResponseTimeDistributionGraph(getManager(), workerCreateListener);
responseTimeDistributionGraph.create();
SegmentDurationGraph segmentDurationGraph = new SegmentDurationGraph(getManager(), workerCreateListener);
segmentDurationGraph.create();
InstanceHeartBeatPersistenceGraph instanceHeartBeatPersistenceGraph = new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener);
instanceHeartBeatPersistenceGraph.create();
new ServiceReferenceMetricGraph(getManager(), workerCreateListener).create();
new ServiceMetricGraph(getManager(), workerCreateListener).create();
new ServiceNameHeartBeatGraph(getManager(), workerCreateListener).create();
new InstanceHeartBeatPersistenceGraph(getManager(), workerCreateListener).create();
new InstanceMappingGraph(getManager(), workerCreateListener).create();
new InstanceReferenceMetricGraph(getManager(), workerCreateListener).create();
new InstanceMetricGraph(getManager(), workerCreateListener).create();
new ApplicationComponentGraph(getManager(), workerCreateListener).create();
new ApplicationMappingGraph(getManager(), workerCreateListener).create();
new ApplicationReferenceMetricGraph(getManager(), workerCreateListener).create();
new ApplicationMetricGraph(getManager(), workerCreateListener).create();
new GlobalTraceGraph(getManager(), workerCreateListener).create();
new ResponseTimeDistributionGraph(getManager(), workerCreateListener).create();
new SegmentDurationGraph(getManager(), workerCreateListener).create();
}
private void registerRemoteData() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.AggregationWorker;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.table.register.*;
/**
* @author peng-yongsheng
*/
public class ServiceNameAggregationWorker extends AggregationWorker<ServiceName, ServiceName> {
private ServiceNameAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_AGGREGATION_WORKER_ID;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceName, ServiceName, ServiceNameAggregationWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceNameAggregationWorker workerInstance(ModuleManager moduleManager) {
return new ServiceNameAggregationWorker(moduleManager);
}
@Override public int queueSize() {
return 256;
}
}
@GraphComputingMetric(name = "/aggregate/onWork/" + ServiceNameTable.TABLE)
@Override protected void onWork(ServiceName message) throws WorkerException {
super.onWork(message);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public class ServiceNameHeartBeatGraph {
private final ModuleManager moduleManager;
private final WorkerCreateListener workerCreateListener;
public ServiceNameHeartBeatGraph(ModuleManager moduleManager, WorkerCreateListener workerCreateListener) {
this.moduleManager = moduleManager;
this.workerCreateListener = workerCreateListener;
}
public void create() {
RemoteSenderService remoteSenderService = moduleManager.find(RemoteModule.NAME).getService(RemoteSenderService.class);
Graph<ServiceName> graph = GraphManager.INSTANCE.createIfAbsent(MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID, ServiceName.class);
graph.addNode(new ServiceNameAggregationWorker.Factory(moduleManager).create(workerCreateListener))
.addNext(new ServiceNameHeartBeatRemoteWorker.Factory(moduleManager, remoteSenderService, MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID).create(workerCreateListener))
.addNext(new ServiceNameHeartBeatPersistenceWorker.Factory(moduleManager).create(workerCreateListener));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public class ServiceNameHeartBeatPersistenceWorker extends MergePersistenceWorker<ServiceName> {
private ServiceNameHeartBeatPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_PERSISTENCE_WORKER_ID;
}
@Override protected boolean needMergeDBData() {
return true;
}
@SuppressWarnings("unchecked")
@Override protected IPersistenceDAO<?, ?, ServiceName> persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceNameHeartBeatPersistenceDAO.class);
}
public static class Factory extends MergePersistenceWorkerProvider<ServiceName, ServiceNameHeartBeatPersistenceWorker> {
public Factory(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public ServiceNameHeartBeatPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceNameHeartBeatPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
@GraphComputingMetric(name = "/persistence/onWork/serviceName/heartbeat")
@Override protected void onWork(ServiceName input) {
super.onWork(input);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.service.*;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public class ServiceNameHeartBeatRemoteWorker extends AbstractRemoteWorker<ServiceName, ServiceName> {
private ServiceNameHeartBeatRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return MetricWorkerIdDefine.SERVICE_NAME_HEART_BEAT_REMOTE_WORKER_ID;
}
@Override protected void onWork(ServiceName serviceName) {
onNext(serviceName);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceName, ServiceName, ServiceNameHeartBeatRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public ServiceNameHeartBeatRemoteWorker workerInstance(ModuleManager moduleManager) {
return new ServiceNameHeartBeatRemoteWorker(moduleManager);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.heartbeat;
import java.util.*;
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricGraphIdDefine;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.decorator.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.listener.*;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.core.graph.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public class ServiceNameSpanListener implements EntrySpanListener, ExitSpanListener, LocalSpanListener {
private List<ServiceName> serviceNames;
private ServiceNameSpanListener() {
this.serviceNames = new LinkedList<>();
}
@Override public boolean containsPoint(Point point) {
return Point.Entry.equals(point) || Point.Exit.equals(point) || Point.Local.equals(point);
}
@Override public void parseEntry(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
ServiceName serviceName = new ServiceName();
serviceName.setId(String.valueOf(spanDecorator.getOperationNameId()));
serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime());
serviceNames.add(serviceName);
for (int i = 0; i < spanDecorator.getRefsCount(); i++) {
ReferenceDecorator referenceDecorator = spanDecorator.getRefs(i);
ServiceName entryServiceName = new ServiceName();
entryServiceName.setId(String.valueOf(referenceDecorator.getEntryServiceId()));
entryServiceName.setHeartBeatTime(segmentCoreInfo.getStartTime());
serviceNames.add(entryServiceName);
ServiceName parentServiceName = new ServiceName();
parentServiceName.setId(String.valueOf(referenceDecorator.getParentServiceId()));
parentServiceName.setHeartBeatTime(segmentCoreInfo.getStartTime());
serviceNames.add(parentServiceName);
}
}
@Override public void parseExit(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
ServiceName serviceName = new ServiceName();
serviceName.setId(String.valueOf(spanDecorator.getOperationNameId()));
serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime());
serviceNames.add(serviceName);
}
@Override public void parseLocal(SpanDecorator spanDecorator, SegmentCoreInfo segmentCoreInfo) {
ServiceName serviceName = new ServiceName();
serviceName.setId(String.valueOf(spanDecorator.getOperationNameId()));
serviceName.setHeartBeatTime(segmentCoreInfo.getStartTime());
serviceNames.add(serviceName);
}
@Override public void build() {
Graph<ServiceName> graph = GraphManager.INSTANCE.findGraph(MetricGraphIdDefine.SERVICE_HEART_BEAT_PERSISTENCE_GRAPH_ID, ServiceName.class);
serviceNames.forEach(graph::start);
}
public static class Factory implements SpanListenerFactory {
@GraphComputingMetric(name = "/segment/parse/createSpanListeners/serviceNameSpanListener")
@Override public SpanListener create(ModuleManager moduleManager) {
return new ServiceNameSpanListener();
}
}
}
......@@ -19,30 +19,17 @@
package org.apache.skywalking.apm.collector.analysis.register.provider;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.INetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IServiceNameService;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.ApplicationRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.InstanceRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.NetworkAddressRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.ServiceNameRegisterGraph;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.ApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.InstanceIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.NetworkAddressIDService;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.ServiceNameService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.*;
import org.apache.skywalking.apm.collector.analysis.register.provider.register.*;
import org.apache.skywalking.apm.collector.analysis.register.provider.service.*;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.table.register.Application;
import org.apache.skywalking.apm.collector.storage.table.register.Instance;
import org.apache.skywalking.apm.collector.storage.table.register.NetworkAddress;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.table.register.*;
/**
* @author peng-yongsheng
......@@ -94,17 +81,10 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider {
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
ApplicationRegisterGraph applicationRegisterGraph = new ApplicationRegisterGraph(getManager(), workerCreateListener);
applicationRegisterGraph.create();
InstanceRegisterGraph instanceRegisterGraph = new InstanceRegisterGraph(getManager(), workerCreateListener);
instanceRegisterGraph.create();
ServiceNameRegisterGraph serviceNameRegisterGraph = new ServiceNameRegisterGraph(getManager(), workerCreateListener);
serviceNameRegisterGraph.create();
NetworkAddressRegisterGraph networkAddressRegisterGraph = new NetworkAddressRegisterGraph(getManager(), workerCreateListener);
networkAddressRegisterGraph.create();
new ApplicationRegisterGraph(getManager(), workerCreateListener).create();
new InstanceRegisterGraph(getManager(), workerCreateListener).create();
new ServiceNameRegisterGraph(getManager(), workerCreateListener).create();
new NetworkAddressRegisterGraph(getManager(), workerCreateListener).create();
}
private void registerRemoteData() {
......
......@@ -19,23 +19,16 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractRemoteWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.remote.service.Selector;
import org.apache.skywalking.apm.collector.remote.service.*;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker<ServiceName, ServiceName> {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameRegisterRemoteWorker.class);
private ServiceNameRegisterRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
......@@ -44,7 +37,7 @@ public class ServiceNameRegisterRemoteWorker extends AbstractRemoteWorker<Servic
return WorkerIdDefine.SERVICE_NAME_REGISTER_REMOTE_WORKER;
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
@Override protected void onWork(ServiceName serviceName) {
onNext(serviceName);
}
......
......@@ -19,9 +19,7 @@
package org.apache.skywalking.apm.collector.analysis.register.provider.register;
import org.apache.skywalking.apm.collector.analysis.register.define.graph.WorkerIdDefine;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorker;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.*;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
......@@ -29,8 +27,7 @@ import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -52,10 +49,11 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
return WorkerIdDefine.SERVICE_NAME_REGISTER_SERIAL_WORKER;
}
@Override protected void onWork(ServiceName serviceName) throws WorkerException {
@Override protected void onWork(ServiceName serviceName) {
logger.debug("register service name: {}, application id: {}", serviceName.getServiceName(), serviceName.getApplicationId());
int serviceId = serviceIdCacheService.get(serviceName.getApplicationId(), serviceName.getSrcSpanType(), serviceName.getServiceName());
if (serviceId == 0) {
long now = System.currentTimeMillis();
ServiceName newServiceName;
int min = serviceNameRegisterDAO.getMinServiceId();
......@@ -66,6 +64,8 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
noneServiceName.setServiceId(Const.NONE_SERVICE_ID);
noneServiceName.setServiceName(Const.NONE_SERVICE_NAME);
noneServiceName.setSrcSpanType(Const.SPAN_TYPE_VIRTUAL);
noneServiceName.setRegisterTime(now);
noneServiceName.setHeartBeatTime(now);
serviceNameRegisterDAO.save(noneServiceName);
newServiceName = new ServiceName();
......@@ -74,6 +74,8 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
newServiceName.setServiceId(-1);
newServiceName.setSrcSpanType(serviceName.getSrcSpanType());
newServiceName.setServiceName(serviceName.getServiceName());
newServiceName.setRegisterTime(now);
newServiceName.setHeartBeatTime(now);
} else {
int max = serviceNameRegisterDAO.getMaxServiceId();
serviceId = IdAutoIncrement.INSTANCE.increment(min, max);
......@@ -84,6 +86,8 @@ public class ServiceNameRegisterSerialWorker extends AbstractLocalAsyncWorker<Se
newServiceName.setServiceId(serviceId);
newServiceName.setSrcSpanType(serviceName.getSrcSpanType());
newServiceName.setServiceName(serviceName.getServiceName());
newServiceName.setRegisterTime(now);
newServiceName.setHeartBeatTime(now);
}
serviceNameRegisterDAO.save(newServiceName);
}
......
......@@ -20,21 +20,16 @@ package org.apache.skywalking.apm.collector.analysis.segment.parser.provider;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.BufferFileConfig;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.SegmentBufferReader;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentParserListenerManager;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.SegmentPersistenceGraph;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.*;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.parser.standardization.SegmentStandardizationGraph;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParseService;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.SegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.service.*;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.core.module.*;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.storage.StorageModule;
/**
......@@ -90,10 +85,7 @@ public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
SegmentPersistenceGraph segmentPersistenceGraph = new SegmentPersistenceGraph(getManager(), workerCreateListener);
segmentPersistenceGraph.create();
SegmentStandardizationGraph segmentStandardizationGraph = new SegmentStandardizationGraph(getManager(), workerCreateListener);
segmentStandardizationGraph.create();
new SegmentPersistenceGraph(getManager(), workerCreateListener).create();
new SegmentStandardizationGraph(getManager(), workerCreateListener).create();
}
}
......@@ -39,7 +39,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<INPUT extends QueueData,
workerCreateListener.addWorker(localAsyncWorker);
LocalAsyncWorkerRef<INPUT, OUTPUT> localAsyncWorkerRef = new LocalAsyncWorkerRef<>(localAsyncWorker);
DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, 8192 * 2);
DataCarrier<INPUT> dataCarrier = new DataCarrier<>(1, queueSize());
localAsyncWorkerRef.setQueueEventHandler(dataCarrier);
dataCarrier.consume(localAsyncWorkerRef, 1);
return localAsyncWorkerRef;
......
......@@ -18,114 +18,29 @@
package org.apache.skywalking.apm.collector.storage;
import java.util.ArrayList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.core.module.ModuleDefine;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentDurationPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.acp.IApplicationComponentMonthPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmListMonthPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IApplicationReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IInstanceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmListPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.alarm.IServiceReferenceAlarmPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.amp.IApplicationMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ampp.IApplicationMappingMonthPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.armp.IApplicationReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.IApplicationCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.IInstanceCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.INetworkAddressCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cache.IServiceNameCacheDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.cpu.ICpuMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gc.IGCDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gc.IGCHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.gc.IGCMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.imp.IInstanceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.impp.IInstanceMappingMonthPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.irmp.IInstanceReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.memory.IMemoryMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.mpool.IMemoryPoolMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IApplicationRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IInstanceRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.INetworkAddressRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionDayPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionHourPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionMinutePersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.rtd.IResponseTimeDistributionMonthPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.smp.IServiceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceDayMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceHourMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMinuteMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.srmp.IServiceReferenceMonthMetricPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmListUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationAlarmUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationComponentUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMappingUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IApplicationReferenceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.ICpuMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGCMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IGlobalTraceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceAlarmUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IInstanceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IMemoryMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.INetworkAddressUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IResponseTimeDistributionUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentDurationUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.ISegmentUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceAlarmUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceReferenceMetricUIDAO;
import org.apache.skywalking.apm.collector.storage.dao.*;
import org.apache.skywalking.apm.collector.storage.dao.acp.*;
import org.apache.skywalking.apm.collector.storage.dao.alarm.*;
import org.apache.skywalking.apm.collector.storage.dao.amp.*;
import org.apache.skywalking.apm.collector.storage.dao.ampp.*;
import org.apache.skywalking.apm.collector.storage.dao.armp.*;
import org.apache.skywalking.apm.collector.storage.dao.cache.*;
import org.apache.skywalking.apm.collector.storage.dao.cpu.*;
import org.apache.skywalking.apm.collector.storage.dao.gc.*;
import org.apache.skywalking.apm.collector.storage.dao.imp.*;
import org.apache.skywalking.apm.collector.storage.dao.impp.*;
import org.apache.skywalking.apm.collector.storage.dao.irmp.*;
import org.apache.skywalking.apm.collector.storage.dao.memory.*;
import org.apache.skywalking.apm.collector.storage.dao.mpool.*;
import org.apache.skywalking.apm.collector.storage.dao.register.*;
import org.apache.skywalking.apm.collector.storage.dao.rtd.*;
import org.apache.skywalking.apm.collector.storage.dao.smp.*;
import org.apache.skywalking.apm.collector.storage.dao.srmp.*;
import org.apache.skywalking.apm.collector.storage.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService;
/**
* @author peng-yongsheng
......@@ -142,6 +57,8 @@ public class StorageModule extends ModuleDefine {
List<Class> classes = new ArrayList<>();
classes.add(IBatchDAO.class);
classes.add(ITTLConfigService.class);
addCacheDAO(classes);
addRegisterDAO(classes);
addPersistenceDAO(classes);
......@@ -205,6 +122,7 @@ public class StorageModule extends ModuleDefine {
classes.add(ISegmentDurationPersistenceDAO.class);
classes.add(ISegmentPersistenceDAO.class);
classes.add(IInstanceHeartBeatPersistenceDAO.class);
classes.add(IServiceNameHeartBeatPersistenceDAO.class);
classes.add(IResponseTimeDistributionMinutePersistenceDAO.class);
classes.add(IResponseTimeDistributionHourPersistenceDAO.class);
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.dao;
import org.apache.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
/**
* @author peng-yongsheng
*/
public interface IServiceNameHeartBeatPersistenceDAO<INSERT, UPDATE, STREAM_DATA extends ServiceName> extends IPersistenceDAO<INSERT, UPDATE, STREAM_DATA> {
}
......@@ -37,21 +37,27 @@ public interface IServiceNameServiceUIDAO extends DAO {
*
* <p>SQL as: select count(SERVICE_NAME) from SERVICE_NAME
* where SRC_SPAN_TYPE = SpanType.Entry_VALUE
* and HEARTBEAT_TIME ge ${startTimeMillis}
*
* @param startTimeMillis service heart beat time after given data
* @return count of service names
*/
int getCount();
int getCount(long startTimeMillis);
/**
* <p>SQL as: select SERVICE_ID, SERVICE_NAME from SERVICE_NAME
* where SRC_SPAN_TYPE = SpanType.Entry_VALUE
* and SERVICE_NAME like '%{keyword}%'
* and APPLICATION_ID = ${applicationId}
* and HEARTBEAT_TIME ge ${startTimeMillis}
*
* <p> Note: keyword might not given
*
* @param keyword fuzzy query condition
* @param applicationId the owner id of this service
* @param startTimeMillis service heart beat time after given data
* @param topN how many rows should return
* @return not nullable result list
*/
List<ServiceInfo> searchService(String keyword, int topN);
List<ServiceInfo> searchService(String keyword, int applicationId, long startTimeMillis, int topN);
}
......@@ -18,11 +18,8 @@
package org.apache.skywalking.apm.collector.storage.table.register;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.RemoteData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
import org.apache.skywalking.apm.collector.core.data.*;
import org.apache.skywalking.apm.collector.core.data.operator.*;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
/**
......@@ -39,7 +36,7 @@ public class Instance extends StreamData {
private static final Column[] LONG_COLUMNS = {
new Column(InstanceTable.REGISTER_TIME, new CoverMergeOperation()),
new Column(InstanceTable.HEARTBEAT_TIME, new CoverMergeOperation()),
new Column(InstanceTable.HEARTBEAT_TIME, new MaxMergeOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
......
......@@ -31,10 +31,6 @@ public interface InstanceTable extends CommonTable, RegisterColumns {
ColumnName AGENT_UUID = new ColumnName("agent_uuid", "iau");
ColumnName REGISTER_TIME = new ColumnName("register_time", "irt");
ColumnName HEARTBEAT_TIME = new ColumnName("heartbeat_time", "iht");
ColumnName OS_INFO = new ColumnName("os_info", "ioi");
ColumnName IS_ADDRESS = new ColumnName("is_address", "iia");
......
......@@ -31,4 +31,8 @@ public interface RegisterColumns {
ColumnName SERVICE_ID = new ColumnName("service_id", "si");
ColumnName ADDRESS_ID = new ColumnName("address_id", "ni");
ColumnName REGISTER_TIME = new ColumnName("register_time", "rt");
ColumnName HEARTBEAT_TIME = new ColumnName("heartbeat_time", "ht");
}
......@@ -18,11 +18,8 @@
package org.apache.skywalking.apm.collector.storage.table.register;
import org.apache.skywalking.apm.collector.core.data.Column;
import org.apache.skywalking.apm.collector.core.data.RemoteData;
import org.apache.skywalking.apm.collector.core.data.StreamData;
import org.apache.skywalking.apm.collector.core.data.operator.CoverMergeOperation;
import org.apache.skywalking.apm.collector.core.data.operator.NonMergeOperation;
import org.apache.skywalking.apm.collector.core.data.*;
import org.apache.skywalking.apm.collector.core.data.operator.*;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
/**
......@@ -35,7 +32,10 @@ public class ServiceName extends StreamData {
new Column(ServiceNameTable.SERVICE_NAME, new CoverMergeOperation()),
};
private static final Column[] LONG_COLUMNS = {};
private static final Column[] LONG_COLUMNS = {
new Column(ServiceNameTable.REGISTER_TIME, new NonMergeOperation()),
new Column(ServiceNameTable.HEARTBEAT_TIME, new MaxMergeOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
......@@ -99,6 +99,22 @@ public class ServiceName extends StreamData {
setDataInteger(2, srcSpanType);
}
public long getRegisterTime() {
return getDataLong(0);
}
public void setRegisterTime(long registerTime) {
setDataLong(0, registerTime);
}
public long getHeartBeatTime() {
return getDataLong(1);
}
public void setHeartBeatTime(long heartBeatTime) {
setDataLong(1, heartBeatTime);
}
public static class InstanceCreator implements RemoteDataRegisterService.RemoteDataInstanceCreator {
@Override public RemoteData createInstance() {
return new ServiceName();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.ttl;
import org.apache.skywalking.apm.collector.core.module.Service;
/**
* @author peng-yongsheng
*/
public interface ITTLConfigService extends Service {
int traceDataTTL();
int minuteMetricDataTTL();
int hourMetricDataTTL();
int dayMetricDataTTL();
int monthMetricDataTTL();
}
......@@ -50,17 +50,14 @@ class DataTTLKeeperTimer {
private final ModuleManager moduleManager;
private final StorageModuleEsNamingListener namingListener;
private final String selfAddress;
private int traceDataTTL = 90;
private int minuteMetricDataTTL = 90;
private int hourMetricDataTTL = 36;
private int dayMetricDataTTL = 45;
private int monthMetricDataTTL = 18;
private final StorageModuleEsConfig config;
DataTTLKeeperTimer(ModuleManager moduleManager,
StorageModuleEsNamingListener namingListener, String selfAddress) {
StorageModuleEsNamingListener namingListener, String selfAddress, StorageModuleEsConfig config) {
this.moduleManager = moduleManager;
this.namingListener = namingListener;
this.selfAddress = selfAddress;
this.config = config;
}
void start() {
......@@ -91,11 +88,11 @@ class DataTTLKeeperTimer {
TimeBuckets convertTimeBucket(DateTime currentTime) {
TimeBuckets timeBuckets = new TimeBuckets();
timeBuckets.traceDataBefore = Long.valueOf(currentTime.plusMinutes(0 - traceDataTTL).toString("yyyyMMddHHmm"));
timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - minuteMetricDataTTL).toString("yyyyMMddHHmm"));
timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - hourMetricDataTTL).toString("yyyyMMddHH"));
timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - dayMetricDataTTL).toString("yyyyMMdd"));
timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - monthMetricDataTTL).toString("yyyyMM"));
timeBuckets.traceDataBefore = Long.valueOf(currentTime.plusMinutes(0 - config.getTraceDataTTL()).toString("yyyyMMddHHmm"));
timeBuckets.minuteTimeBucketBefore = Long.valueOf(currentTime.plusMinutes(0 - config.getMinuteMetricDataTTL()).toString("yyyyMMddHHmm"));
timeBuckets.hourTimeBucketBefore = Long.valueOf(currentTime.plusHours(0 - config.getHourMetricDataTTL()).toString("yyyyMMddHH"));
timeBuckets.dayTimeBucketBefore = Long.valueOf(currentTime.plusDays(0 - config.getDayMetricDataTTL()).toString("yyyyMMdd"));
timeBuckets.monthTimeBucketBefore = Long.valueOf(currentTime.plusMonths(0 - config.getMonthMetricDataTTL()).toString("yyyyMM"));
return timeBuckets;
}
......@@ -207,26 +204,6 @@ class DataTTLKeeperTimer {
moduleManager.find(StorageModule.NAME).getService(IServiceReferenceMonthMetricPersistenceDAO.class).deleteHistory(timeBuckets.monthTimeBucketBefore);
}
void setTraceDataTTL(int traceDataTTL) {
this.traceDataTTL = traceDataTTL == 0 ? 90 : traceDataTTL;
}
void setMinuteMetricDataTTL(int minuteMetricDataTTL) {
this.minuteMetricDataTTL = minuteMetricDataTTL == 0 ? 90 : minuteMetricDataTTL;
}
void setHourMetricDataTTL(int hourMetricDataTTL) {
this.hourMetricDataTTL = hourMetricDataTTL == 0 ? 36 : hourMetricDataTTL;
}
void setDayMetricDataTTL(int dayMetricDataTTL) {
this.dayMetricDataTTL = dayMetricDataTTL == 0 ? 45 : dayMetricDataTTL;
}
void setMonthMetricDataTTL(int monthMetricDataTTL) {
this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL;
}
class TimeBuckets {
private long traceDataBefore;
private long minuteTimeBucketBefore;
......
......@@ -23,16 +23,16 @@ import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchCli
/**
* @author peng-yongsheng
*/
class StorageModuleEsConfig extends ElasticSearchClientConfig {
public class StorageModuleEsConfig extends ElasticSearchClientConfig {
private int indexShardsNumber;
private int indexReplicasNumber;
private boolean highPerformanceMode;
private int traceDataTTL;
private int minuteMetricDataTTL;
private int hourMetricDataTTL;
private int dayMetricDataTTL;
private int monthMetricDataTTL;
private int traceDataTTL = 90;
private int minuteMetricDataTTL = 90;
private int hourMetricDataTTL = 36;
private int dayMetricDataTTL = 45;
private int monthMetricDataTTL = 18;
int getIndexShardsNumber() {
return indexShardsNumber;
......@@ -58,43 +58,43 @@ class StorageModuleEsConfig extends ElasticSearchClientConfig {
this.highPerformanceMode = highPerformanceMode;
}
int getTraceDataTTL() {
public int getTraceDataTTL() {
return traceDataTTL;
}
void setTraceDataTTL(int traceDataTTL) {
this.traceDataTTL = traceDataTTL;
this.traceDataTTL = traceDataTTL == 0 ? 90 : traceDataTTL;
}
int getMinuteMetricDataTTL() {
public int getMinuteMetricDataTTL() {
return minuteMetricDataTTL;
}
void setMinuteMetricDataTTL(int minuteMetricDataTTL) {
this.minuteMetricDataTTL = minuteMetricDataTTL;
this.minuteMetricDataTTL = minuteMetricDataTTL == 0 ? 90 : minuteMetricDataTTL;
}
int getHourMetricDataTTL() {
public int getHourMetricDataTTL() {
return hourMetricDataTTL;
}
void setHourMetricDataTTL(int hourMetricDataTTL) {
this.hourMetricDataTTL = hourMetricDataTTL;
this.hourMetricDataTTL = hourMetricDataTTL == 0 ? 36 : hourMetricDataTTL;
}
int getDayMetricDataTTL() {
public int getDayMetricDataTTL() {
return dayMetricDataTTL;
}
void setDayMetricDataTTL(int dayMetricDataTTL) {
this.dayMetricDataTTL = dayMetricDataTTL;
this.dayMetricDataTTL = dayMetricDataTTL == 0 ? 45 : dayMetricDataTTL;
}
int getMonthMetricDataTTL() {
public int getMonthMetricDataTTL() {
return monthMetricDataTTL;
}
void setMonthMetricDataTTL(int monthMetricDataTTL) {
this.monthMetricDataTTL = monthMetricDataTTL;
this.monthMetricDataTTL = monthMetricDataTTL == 0 ? 18 : monthMetricDataTTL;
}
}
......@@ -69,6 +69,8 @@ import org.apache.skywalking.apm.collector.storage.es.dao.rtd.*;
import org.apache.skywalking.apm.collector.storage.es.dao.smp.*;
import org.apache.skywalking.apm.collector.storage.es.dao.srmp.*;
import org.apache.skywalking.apm.collector.storage.es.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.es.ttl.TTLConfigService;
import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService;
/**
* @author peng-yongsheng
......@@ -102,7 +104,9 @@ public class StorageModuleEsProvider extends ModuleProvider {
@Override public void prepare() throws ServiceNotProvidedException {
elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes(), nameSpace);
this.registerServiceImplementation(ITTLConfigService.class, new TTLConfigService(config));
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
registerRegisterDAO();
registerPersistenceDAO();
......@@ -132,12 +136,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, esRegistration.buildValue().getHostPort());
deleteTimer.setTraceDataTTL(config.getTraceDataTTL());
deleteTimer.setMinuteMetricDataTTL(config.getMinuteMetricDataTTL());
deleteTimer.setHourMetricDataTTL(config.getHourMetricDataTTL());
deleteTimer.setDayMetricDataTTL(config.getDayMetricDataTTL());
deleteTimer.setMonthMetricDataTTL(config.getMonthMetricDataTTL());
deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, esRegistration.buildValue().getHostPort(), config);
}
@Override
......@@ -241,6 +240,7 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(IInstanceReferenceMonthMetricPersistenceDAO.class, new InstanceReferenceMonthMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceNameHeartBeatPersistenceDAO.class, new ServiceNameHeartBeatEsPersistenceDAO(elasticSearchClient));
}
private void registerUiDAO() throws ServiceNotProvidedException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.dao;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
import org.apache.skywalking.apm.collector.storage.dao.IServiceNameHeartBeatPersistenceDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.slf4j.*;
/**
* @author peng-yongsheng
*/
public class ServiceNameHeartBeatEsPersistenceDAO extends EsDAO implements IServiceNameHeartBeatPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceName> {
private static final Logger logger = LoggerFactory.getLogger(ServiceNameHeartBeatEsPersistenceDAO.class);
public ServiceNameHeartBeatEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@GraphComputingMetric(name = "/persistence/get/" + ServiceNameTable.TABLE + "/heartbeat")
@Override public ServiceName get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceNameTable.TABLE, id).get();
if (getResponse.isExists()) {
Map<String, Object> source = getResponse.getSource();
ServiceName serviceName = new ServiceName();
serviceName.setId(id);
serviceName.setServiceId(((Number)source.get(ServiceNameTable.SERVICE_ID.getName())).intValue());
serviceName.setHeartBeatTime(((Number)source.get(ServiceNameTable.HEARTBEAT_TIME.getName())).longValue());
logger.debug("service id: {} is exists", id);
return serviceName;
} else {
logger.debug("service id: {} is not exists", id);
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceName data) {
throw new UnexpectedException("Received an service name heart beat message under service id= " + data.getId() + " , which doesn't exist.");
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceName data) {
logger.info("service name heart beat, service id: {}, heart beat time: {}", data.getId(), data.getHeartBeatTime());
Map<String, Object> source = new HashMap<>();
source.put(ServiceNameTable.HEARTBEAT_TIME.getName(), data.getHeartBeatTime());
return getClient().prepareUpdate(ServiceNameTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long timeBucketBefore) {
}
}
......@@ -18,17 +18,14 @@
package org.apache.skywalking.apm.collector.storage.es.dao.register;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -58,6 +55,8 @@ public class ServiceNameRegisterEsDAO extends EsDAO implements IServiceNameRegis
target.put(ServiceNameTable.SERVICE_NAME.getName(), serviceName.getServiceName());
target.put(ServiceNameTable.SERVICE_NAME_KEYWORD.getName(), serviceName.getServiceName());
target.put(ServiceNameTable.SRC_SPAN_TYPE.getName(), serviceName.getSrcSpanType());
target.put(ServiceNameTable.REGISTER_TIME.getName(), serviceName.getRegisterTime());
target.put(ServiceNameTable.HEARTBEAT_TIME.getName(), serviceName.getHeartBeatTime());
IndexResponse response = client.prepareIndex(ServiceNameTable.TABLE, serviceName.getId()).setSource(target).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save service name register info, application getApplicationId: {}, service name: {}, status: {}", serviceName.getId(), serviceName.getServiceName(), response.status().name());
......
......@@ -18,20 +18,16 @@
package org.apache.skywalking.apm.collector.storage.es.dao.ui;
import java.util.LinkedList;
import java.util.List;
import java.util.*;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.core.util.*;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
import org.apache.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.action.search.*;
import org.elasticsearch.index.query.*;
import org.elasticsearch.search.SearchHit;
/**
......@@ -43,31 +39,41 @@ public class ServiceNameServiceEsUIDAO extends EsDAO implements IServiceNameServ
super(client);
}
@Override public int getCount() {
@Override public int getCount(long startTimeMillis) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE));
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceNameTable.HEARTBEAT_TIME.getName()).gte(startTimeMillis));
searchRequestBuilder.setQuery(boolQuery);
searchRequestBuilder.setSize(0);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
return (int)searchResponse.getHits().getTotalHits();
}
@Override public List<ServiceInfo> searchService(String keyword, int topN) {
@Override
public List<ServiceInfo> searchService(String keyword, int applicationId, long startTimeMillis, int topN) {
SearchRequestBuilder searchRequestBuilder = getClient().prepareSearch(ServiceNameTable.TABLE);
searchRequestBuilder.setTypes(ServiceNameTable.TABLE_TYPE);
searchRequestBuilder.setSearchType(SearchType.DFS_QUERY_THEN_FETCH);
searchRequestBuilder.setSize(topN);
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE));
boolQuery.must().add(QueryBuilders.rangeQuery(ServiceNameTable.HEARTBEAT_TIME.getName()).gte(startTimeMillis));
if (applicationId != Const.NONE) {
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.APPLICATION_ID.getName(), applicationId));
}
if (StringUtils.isNotEmpty(keyword)) {
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.must().add(QueryBuilders.matchQuery(ServiceNameTable.SERVICE_NAME.getName(), keyword));
boolQuery.must().add(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE));
searchRequestBuilder.setQuery(boolQuery);
} else {
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ServiceNameTable.SRC_SPAN_TYPE.getName(), SpanType.Entry_VALUE));
}
searchRequestBuilder.setQuery(boolQuery);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
SearchHit[] searchHits = searchResponse.getHits().getHits();
......
......@@ -18,8 +18,7 @@
package org.apache.skywalking.apm.collector.storage.es.define.register;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine;
import org.apache.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine;
import org.apache.skywalking.apm.collector.storage.es.base.define.*;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
/**
......@@ -41,5 +40,7 @@ public class ServiceNameEsTableDefine extends ElasticSearchTableDefine {
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SERVICE_NAME_KEYWORD, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.SRC_SPAN_TYPE, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceNameTable.HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es.ttl;
import org.apache.skywalking.apm.collector.storage.es.StorageModuleEsConfig;
import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService;
/**
* @author peng-yongsheng
*/
public class TTLConfigService implements ITTLConfigService {
private StorageModuleEsConfig config;
public TTLConfigService(StorageModuleEsConfig config) {
this.config = config;
}
@Override public int traceDataTTL() {
return config.getTraceDataTTL();
}
@Override public int minuteMetricDataTTL() {
return config.getMinuteMetricDataTTL();
}
@Override public int hourMetricDataTTL() {
return config.getHourMetricDataTTL();
}
@Override public int dayMetricDataTTL() {
return config.getDayMetricDataTTL();
}
@Override public int monthMetricDataTTL() {
return config.getMonthMetricDataTTL();
}
}
......@@ -29,7 +29,7 @@ public class DataTTLKeeperTimerTestCase {
@Test
public void testConvertTimeBucket() {
DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null);
DataTTLKeeperTimer timer = new DataTTLKeeperTimer(null, null, null, new StorageModuleEsConfig());
DateTime currentTime = new DateTime(2018, 5, 26, 15, 5);
DataTTLKeeperTimer.TimeBuckets timeBuckets = timer.convertTimeBucket(currentTime);
......
......@@ -18,17 +18,13 @@
package org.apache.skywalking.apm.collector.storage.h2.dao.register;
import java.util.HashMap;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import java.util.*;
import org.apache.skywalking.apm.collector.client.h2.*;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.register.IServiceNameRegisterDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.apm.collector.storage.table.register.*;
import org.slf4j.*;
/**
* @author peng-yongsheng, clevertension
......@@ -61,6 +57,8 @@ public class ServiceNameRegisterH2DAO extends H2DAO implements IServiceNameRegis
target.put(ServiceNameTable.APPLICATION_ID.getName(), serviceName.getApplicationId());
target.put(ServiceNameTable.SERVICE_NAME.getName(), serviceName.getServiceName());
target.put(ServiceNameTable.SRC_SPAN_TYPE.getName(), serviceName.getSrcSpanType());
target.put(ServiceNameTable.REGISTER_TIME.getName(), serviceName.getRegisterTime());
target.put(ServiceNameTable.HEARTBEAT_TIME.getName(), serviceName.getHeartBeatTime());
String sql = SqlBuilder.buildBatchInsertSql(ServiceNameTable.TABLE, target.keySet());
Object[] params = target.values().toArray(new Object[0]);
......
......@@ -18,20 +18,16 @@
package org.apache.skywalking.apm.collector.storage.h2.dao.ui;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import java.sql.*;
import java.util.*;
import org.apache.skywalking.apm.collector.client.h2.*;
import org.apache.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.apache.skywalking.apm.collector.storage.dao.ui.IServiceNameServiceUIDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo;
import org.apache.skywalking.apm.network.proto.SpanType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;
/**
* @author peng-yongsheng
......@@ -44,10 +40,10 @@ public class ServiceNameServiceH2UIDAO extends H2DAO implements IServiceNameServ
super(client);
}
@Override public int getCount() {
String dynamicSql = "select count({0}) as cnt from {1} where {2} = ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.TABLE, ServiceNameTable.SRC_SPAN_TYPE.getName());
Object[] params = new Object[] {SpanType.Entry_VALUE};
@Override public int getCount(long startTimeMillis) {
String dynamicSql = "select count({0}) as cnt from {1} where {2} = ? and {3} >= ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.TABLE, ServiceNameTable.SRC_SPAN_TYPE.getName(), ServiceNameTable.HEARTBEAT_TIME.getName());
Object[] params = new Object[] {SpanType.Entry_VALUE, startTimeMillis};
try (ResultSet rs = getClient().executeQuery(sql, params)) {
if (rs.next()) {
......@@ -59,10 +55,11 @@ public class ServiceNameServiceH2UIDAO extends H2DAO implements IServiceNameServ
return 0;
}
@Override public List<ServiceInfo> searchService(String keyword, int topN) {
String dynamicSql = "select {0},{1} from {2} where {3} like ? and {4} = ? limit ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.TABLE, ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.SRC_SPAN_TYPE.getName());
Object[] params = new Object[] {keyword, SpanType.Entry_VALUE, topN};
@Override
public List<ServiceInfo> searchService(String keyword, int applicationId, long startTimeMillis, int topN) {
String dynamicSql = "select {0},{1} from {2} where {3} like ? and {4} = ? and {5} = ? and {6} >= ? limit ?";
String sql = SqlBuilder.buildSql(dynamicSql, ServiceNameTable.SERVICE_ID.getName(), ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.TABLE, ServiceNameTable.SERVICE_NAME.getName(), ServiceNameTable.SRC_SPAN_TYPE.getName(), ServiceNameTable.APPLICATION_ID.getName(), ServiceNameTable.HEARTBEAT_TIME.getName());
Object[] params = new Object[] {keyword, SpanType.Entry_VALUE, applicationId, startTimeMillis, topN};
List<ServiceInfo> serviceInfos = new LinkedList<>();
try (ResultSet rs = getClient().executeQuery(sql, params)) {
......
......@@ -18,8 +18,7 @@
package org.apache.skywalking.apm.collector.storage.h2.define.register;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2ColumnDefine;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2TableDefine;
import org.apache.skywalking.apm.collector.storage.h2.base.define.*;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceNameTable;
/**
......@@ -37,5 +36,7 @@ public class ServiceNameH2TableDefine extends H2TableDefine {
addColumn(new H2ColumnDefine(ServiceNameTable.SERVICE_NAME, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.SRC_SPAN_TYPE, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceNameTable.HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
}
}
......@@ -18,17 +18,15 @@
package org.apache.skywalking.apm.collector.ui.query;
import java.text.ParseException;
import java.util.List;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.storage.ui.common.*;
import org.apache.skywalking.apm.collector.storage.ui.service.ServiceInfo;
import org.apache.skywalking.apm.collector.ui.graphql.Query;
import org.apache.skywalking.apm.collector.ui.service.ServiceNameService;
import org.apache.skywalking.apm.collector.ui.service.ServiceTopologyService;
import org.apache.skywalking.apm.collector.ui.service.*;
import org.apache.skywalking.apm.collector.ui.utils.DurationUtils;
import java.text.ParseException;
import java.util.List;
import static java.util.Objects.isNull;
/**
......@@ -58,8 +56,8 @@ public class ServiceQuery implements Query {
return serviceTopologyService;
}
public List<ServiceInfo> searchService(String keyword, int topN) throws ParseException {
return getServiceNameService().searchService(keyword, topN);
public List<ServiceInfo> searchService(String keyword, int applicationId, int topN) {
return getServiceNameService().searchService(keyword, applicationId, topN);
}
public ResponseTimeTrend getServiceResponseTimeTrend(int serviceId, Duration duration) throws ParseException {
......@@ -81,7 +79,7 @@ public class ServiceQuery implements Query {
return getServiceNameService().getServiceSLATrend(serviceId, duration.getStep(), startTimeBucket, endTimeBucket);
}
public Topology getServiceTopology(int serviceId, Duration duration) throws ParseException {
public Topology getServiceTopology(int serviceId, Duration duration) {
long startTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getStart());
long endTimeBucket = DurationUtils.INSTANCE.exchangeToTimeBucket(duration.getEnd());
......
......@@ -27,6 +27,7 @@ import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.dao.ui.*;
import org.apache.skywalking.apm.collector.storage.table.MetricSource;
import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
import org.apache.skywalking.apm.collector.storage.ttl.ITTLConfigService;
import org.apache.skywalking.apm.collector.storage.ui.common.*;
import org.apache.skywalking.apm.collector.storage.ui.service.*;
import org.apache.skywalking.apm.collector.storage.utils.DurationPoint;
......@@ -44,20 +45,27 @@ public class ServiceNameService {
private final IServiceMetricUIDAO serviceMetricUIDAO;
private final ServiceNameCacheService serviceNameCacheService;
private final DateBetweenService dateBetweenService;
private final ITTLConfigService configService;
public ServiceNameService(ModuleManager moduleManager) {
this.serviceNameServiceUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceNameServiceUIDAO.class);
this.serviceMetricUIDAO = moduleManager.find(StorageModule.NAME).getService(IServiceMetricUIDAO.class);
this.serviceNameCacheService = moduleManager.find(CacheModule.NAME).getService(ServiceNameCacheService.class);
this.configService = moduleManager.find(StorageModule.NAME).getService(ITTLConfigService.class);
this.dateBetweenService = new DateBetweenService(moduleManager);
}
public int getCount() {
return serviceNameServiceUIDAO.getCount();
return serviceNameServiceUIDAO.getCount(startTimeMillis());
}
public List<ServiceInfo> searchService(String keyword, int topN) {
return serviceNameServiceUIDAO.searchService(keyword, topN);
public List<ServiceInfo> searchService(String keyword, int applicationId, int topN) {
return serviceNameServiceUIDAO.searchService(keyword, applicationId, startTimeMillis(), topN);
}
private long startTimeMillis() {
int minuteMetricDataTTL = configService.minuteMetricDataTTL();
return System.currentTimeMillis() - minuteMetricDataTTL * 60 * 60 * 100;
}
public ThroughputTrend getServiceThroughputTrend(int serviceId, Step step, long startTimeBucket,
......
......@@ -18,18 +18,13 @@
package org.apache.skywalking.apm.collector.ui.query;
import org.apache.skywalking.apm.collector.storage.ui.common.Duration;
import org.apache.skywalking.apm.collector.storage.ui.common.Step;
import org.apache.skywalking.apm.collector.ui.service.ServiceNameService;
import org.apache.skywalking.apm.collector.ui.service.ServiceTopologyService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.text.ParseException;
import org.apache.skywalking.apm.collector.storage.ui.common.*;
import org.apache.skywalking.apm.collector.ui.service.*;
import org.junit.*;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import java.text.ParseException;
/**
* @author lican
*/
......@@ -50,14 +45,14 @@ public class ServiceQueryTest {
@Test
public void searchService() throws ParseException {
serviceQuery.searchService("keyword", -1);
serviceQuery.searchService("keyword", 0, -1);
}
@Test
public void getServiceResponseTimeTrend() throws ParseException {
Mockito.when(serviceNameService.getServiceResponseTimeTrend(
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
).then(invocation -> {
Object[] arguments = invocation.getArguments();
Assert.assertEquals(201701L, arguments[2]);
......@@ -74,8 +69,8 @@ public class ServiceQueryTest {
@Test
public void getServiceThroughputTrend() throws ParseException {
Mockito.when(serviceNameService.getServiceThroughputTrend(
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
).then(invocation -> {
Object[] arguments = invocation.getArguments();
Assert.assertEquals(201701L, arguments[2]);
......@@ -92,8 +87,8 @@ public class ServiceQueryTest {
@Test
public void getServiceSLATrend() throws ParseException {
Mockito.when(serviceNameService.getServiceSLATrend(
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
Mockito.anyInt(), Mockito.anyObject(),
Mockito.anyLong(), Mockito.anyLong())
).then(invocation -> {
Object[] arguments = invocation.getArguments();
Assert.assertEquals(201701L, arguments[2]);
......@@ -110,9 +105,9 @@ public class ServiceQueryTest {
@Test
public void getServiceTopology() throws ParseException {
Mockito.when(serviceTopologyService.getServiceTopology(
Mockito.anyObject(), Mockito.anyInt(),
Mockito.anyLong(), Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyLong())
Mockito.anyObject(), Mockito.anyInt(),
Mockito.anyLong(), Mockito.anyLong(),
Mockito.anyLong(), Mockito.anyLong())
).then(invocation -> {
Object[] arguments = invocation.getArguments();
Assert.assertEquals(201701L, arguments[2]);
......
......@@ -79,7 +79,7 @@ public class ServiceNameServiceTest {
@Test
public void searchService() {
List<ServiceInfo> serviceInfos = serverNameService.searchService("keyword", 10);
List<ServiceInfo> serviceInfos = serverNameService.searchService("keyword", 0, 10);
Assert.assertTrue(serviceInfos.size() == 0);
}
......
......@@ -47,7 +47,7 @@ type TraceItem {
}
extend type Query {
searchService(keyword: String!, topN: Int!): [ServiceInfo!]!
searchService(keyword: String!, applicationId: ID!, topN: Int!): [ServiceInfo!]!
getServiceResponseTimeTrend(serviceId: ID!, duration: Duration!): ResponseTimeTrend
getServiceThroughputTrend(serviceId: ID!, duration: Duration!): ThroughputTrend
getServiceSLATrend(serviceId: ID!, duration: Duration!): SLATrend
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册