提交 9880f8c5 编写于 作者: P peng-yongsheng

Add service metric storage and stream aggregate.

上级 691f8810
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.core.util.Const;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
/**
* @author peng-yongsheng
*/
public class ServiceMetricAggregationWorker extends AggregationWorker<ServiceReferenceMetric, ServiceMetric> {
public ServiceMetricAggregationWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return ServiceMetricAggregationWorker.class.hashCode();
}
@Override protected ServiceMetric transform(ServiceReferenceMetric serviceReferenceMetric) {
Integer serviceId = serviceReferenceMetric.getBehindServiceId();
Long timeBucket = serviceReferenceMetric.getTimeBucket();
ServiceMetric serviceMetric = new ServiceMetric(String.valueOf(timeBucket) + Const.ID_SPLIT + String.valueOf(serviceId));
serviceMetric.setServiceId(serviceId);
serviceMetric.setCalls(serviceReferenceMetric.getCalls());
serviceMetric.setErrorCalls(serviceReferenceMetric.getErrorCalls());
serviceMetric.setDurationSum(serviceReferenceMetric.getDurationSum());
serviceMetric.setErrorDurationSum(serviceReferenceMetric.getErrorDurationSum());
serviceMetric.setTimeBucket(timeBucket);
return serviceMetric;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceMetric, ServiceMetricAggregationWorker> {
public Factory(ModuleManager moduleManager,
QueueCreatorService<ServiceReferenceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService);
}
@Override public ServiceMetricAggregationWorker workerInstance(ModuleManager moduleManager) {
return new ServiceMetricAggregationWorker(moduleManager);
}
@Override public int queueSize() {
return 256;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
import org.skywalking.apm.collector.storage.StorageModule;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.impl.PersistenceWorker;
/**
* @author peng-yongsheng
*/
public class ServiceMetricPersistenceWorker extends PersistenceWorker<ServiceMetric, ServiceMetric> {
public ServiceMetricPersistenceWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return ServiceMetricPersistenceWorker.class.hashCode();
}
@Override protected IPersistenceDAO persistenceDAO() {
return getModuleManager().find(StorageModule.NAME).getService(IServiceMetricPersistenceDAO.class);
}
@Override protected boolean needMergeDBData() {
return true;
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceMetric, ServiceMetric, ServiceMetricPersistenceWorker> {
public Factory(ModuleManager moduleManager, QueueCreatorService<ServiceMetric> queueCreatorService) {
super(moduleManager, queueCreatorService);
}
@Override public ServiceMetricPersistenceWorker workerInstance(ModuleManager moduleManager) {
return new ServiceMetricPersistenceWorker(moduleManager);
}
@Override
public int queueSize() {
return 1024;
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.agent.stream.worker.trace.service;
import org.skywalking.apm.collector.core.module.ModuleManager;
import org.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.skywalking.apm.collector.remote.service.Selector;
import org.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.base.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.base.WorkerException;
/**
* @author peng-yongsheng
*/
public class ServiceMetricRemoteWorker extends AbstractRemoteWorker<ServiceMetric, ServiceMetric> {
public ServiceMetricRemoteWorker(ModuleManager moduleManager) {
super(moduleManager);
}
@Override public int id() {
return ServiceMetricRemoteWorker.class.hashCode();
}
@Override protected void onWork(ServiceMetric serviceMetric) throws WorkerException {
onNext(serviceMetric);
}
@Override public Selector selector() {
return Selector.HashCode;
}
public static class Factory extends AbstractRemoteWorkerProvider<ServiceMetric, ServiceMetric, ServiceMetricRemoteWorker> {
public Factory(ModuleManager moduleManager, RemoteSenderService remoteSenderService, int graphId) {
super(moduleManager, remoteSenderService, graphId);
}
@Override public ServiceMetricRemoteWorker workerInstance(ModuleManager moduleManager) {
return new ServiceMetricRemoteWorker(moduleManager);
}
}
}
...@@ -37,10 +37,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO; ...@@ -37,10 +37,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO; import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO; import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
...@@ -53,6 +53,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO; ...@@ -53,6 +53,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO; import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
...@@ -108,6 +109,7 @@ public class StorageModule extends Module { ...@@ -108,6 +109,7 @@ public class StorageModule extends Module {
classes.add(ISegmentCostPersistenceDAO.class); classes.add(ISegmentCostPersistenceDAO.class);
classes.add(ISegmentPersistenceDAO.class); classes.add(ISegmentPersistenceDAO.class);
classes.add(IServiceEntryPersistenceDAO.class); classes.add(IServiceEntryPersistenceDAO.class);
classes.add(IServiceMetricPersistenceDAO.class);
classes.add(IServiceReferenceMetricPersistenceDAO.class); classes.add(IServiceReferenceMetricPersistenceDAO.class);
classes.add(IInstanceHeartBeatPersistenceDAO.class); classes.add(IInstanceHeartBeatPersistenceDAO.class);
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.storage.base.dao.IPersistenceDAO;
/**
* @author peng-yongsheng
*/
public interface IServiceMetricPersistenceDAO<Insert, Update, DataImpl extends Data> extends IPersistenceDAO<Insert, Update, DataImpl> {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.service;
import org.skywalking.apm.collector.core.data.Column;
import org.skywalking.apm.collector.core.data.Data;
import org.skywalking.apm.collector.core.data.operator.AddOperation;
import org.skywalking.apm.collector.core.data.operator.CoverOperation;
import org.skywalking.apm.collector.core.data.operator.NonOperation;
/**
* @author peng-yongsheng
*/
public class ServiceMetric extends Data {
private static final Column[] STRING_COLUMNS = {
new Column(ServiceMetricTable.COLUMN_ID, new NonOperation()),
};
private static final Column[] LONG_COLUMNS = {
new Column(ServiceMetricTable.COLUMN_CALLS, new AddOperation()),
new Column(ServiceMetricTable.COLUMN_ERROR_CALLS, new AddOperation()),
new Column(ServiceMetricTable.COLUMN_DURATION_SUM, new AddOperation()),
new Column(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, new AddOperation()),
new Column(ServiceMetricTable.COLUMN_TIME_BUCKET, new CoverOperation()),
};
private static final Column[] DOUBLE_COLUMNS = {};
private static final Column[] INTEGER_COLUMNS = {
new Column(ServiceMetricTable.COLUMN_SERVICE_ID, new NonOperation()),
};
private static final Column[] BOOLEAN_COLUMNS = {};
private static final Column[] BYTE_COLUMNS = {};
public ServiceMetric(String id) {
super(id, STRING_COLUMNS, LONG_COLUMNS, DOUBLE_COLUMNS, INTEGER_COLUMNS, BOOLEAN_COLUMNS, BYTE_COLUMNS);
}
public Integer getServiceId() {
return getDataInteger(0);
}
public void setServiceId(Integer serviceId) {
setDataInteger(0, serviceId);
}
public long getCalls() {
return getDataLong(0);
}
public void setCalls(long calls) {
setDataLong(0, calls);
}
public long getErrorCalls() {
return getDataLong(1);
}
public void setErrorCalls(long errorCalls) {
setDataLong(1, errorCalls);
}
public long getDurationSum() {
return getDataLong(2);
}
public void setDurationSum(long durationSum) {
setDataLong(2, durationSum);
}
public long getErrorDurationSum() {
return getDataLong(3);
}
public void setErrorDurationSum(long errorDurationSum) {
setDataLong(3, errorDurationSum);
}
public Long getTimeBucket() {
return getDataLong(4);
}
public void setTimeBucket(Long timeBucket) {
setDataLong(4, timeBucket);
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.table.service;
import org.skywalking.apm.collector.core.data.CommonTable;
/**
* @author peng-yongsheng
*/
public class ServiceMetricTable extends CommonTable {
public static final String TABLE = "service_metric";
public static final String COLUMN_SERVICE_ID = "service_id";
public static final String COLUMN_CALLS = "calls";
public static final String COLUMN_ERROR_CALLS = "error_calls";
public static final String COLUMN_DURATION_SUM = "duration_sum";
public static final String COLUMN_ERROR_DURATION_SUM = "error_duration_sum";
}
...@@ -46,10 +46,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO; ...@@ -46,10 +46,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO; import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO; import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
...@@ -62,6 +62,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO; ...@@ -62,6 +62,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO; import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
...@@ -83,12 +84,12 @@ import org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO; ...@@ -83,12 +84,12 @@ import org.skywalking.apm.collector.storage.es.dao.GCMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.GCMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.GlobalTraceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO; import org.skywalking.apm.collector.storage.es.dao.InstanceEsCacheDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO; import org.skywalking.apm.collector.storage.es.dao.InstanceEsRegisterDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.InstanceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.InstanceHeartBeatEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.InstanceMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.InstanceMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.MemoryMetricEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.MemoryPoolMetricEsPersistenceDAO;
...@@ -99,10 +100,11 @@ import org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO; ...@@ -99,10 +100,11 @@ import org.skywalking.apm.collector.storage.es.dao.SegmentEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.SegmentEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceEntryEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceMetricEsPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsCacheDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceNameEsRegisterDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsUIDAO; import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceEsUIDAO;
import org.skywalking.apm.collector.storage.es.dao.ServiceReferenceMetricEsPersistenceDAO;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -204,7 +206,8 @@ public class StorageModuleEsProvider extends ModuleProvider { ...@@ -204,7 +206,8 @@ public class StorageModuleEsProvider extends ModuleProvider {
this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceEsMetricPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricEsPersistenceDAO(elasticSearchClient));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatEsPersistenceDAO(elasticSearchClient));
} }
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.util.TimeBucketUtils;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.es.base.dao.EsDAO;
import org.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class ServiceMetricEsPersistenceDAO extends EsDAO implements IServiceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceMetricEsPersistenceDAO.class);
public ServiceMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client);
}
@Override public ServiceMetric get(String id) {
GetResponse getResponse = getClient().prepareGet(ServiceMetricTable.TABLE, id).get();
if (getResponse.isExists()) {
ServiceMetric serviceMetric = new ServiceMetric(id);
Map<String, Object> source = getResponse.getSource();
serviceMetric.setServiceId(((Number)source.get(ServiceMetricTable.COLUMN_SERVICE_ID)).intValue());
serviceMetric.setCalls(((Number)source.get(ServiceMetricTable.COLUMN_CALLS)).longValue());
serviceMetric.setErrorCalls(((Number)source.get(ServiceMetricTable.COLUMN_ERROR_CALLS)).longValue());
serviceMetric.setDurationSum(((Number)source.get(ServiceMetricTable.COLUMN_DURATION_SUM)).longValue());
serviceMetric.setErrorDurationSum(((Number)source.get(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM)).longValue());
serviceMetric.setTimeBucket(((Number)source.get(ServiceMetricTable.COLUMN_TIME_BUCKET)).longValue());
return serviceMetric;
} else {
return null;
}
}
@Override public IndexRequestBuilder prepareBatchInsert(ServiceMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceMetricTable.COLUMN_SERVICE_ID, data.getServiceId());
source.put(ServiceMetricTable.COLUMN_CALLS, data.getCalls());
source.put(ServiceMetricTable.COLUMN_ERROR_CALLS, data.getErrorCalls());
source.put(ServiceMetricTable.COLUMN_DURATION_SUM, data.getDurationSum());
source.put(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, data.getErrorDurationSum());
source.put(ServiceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareIndex(ServiceMetricTable.TABLE, data.getId()).setSource(source);
}
@Override public UpdateRequestBuilder prepareBatchUpdate(ServiceMetric data) {
Map<String, Object> source = new HashMap<>();
source.put(ServiceMetricTable.COLUMN_SERVICE_ID, data.getServiceId());
source.put(ServiceMetricTable.COLUMN_CALLS, data.getCalls());
source.put(ServiceMetricTable.COLUMN_ERROR_CALLS, data.getErrorCalls());
source.put(ServiceMetricTable.COLUMN_DURATION_SUM, data.getDurationSum());
source.put(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, data.getErrorDurationSum());
source.put(ServiceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
return getClient().prepareUpdate(ServiceMetricTable.TABLE, data.getId()).setDoc(source);
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
long startTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(startTimestamp);
long endTimeBucket = TimeBucketUtils.INSTANCE.getMinuteTimeBucket(endTimestamp);
BulkByScrollResponse response = getClient().prepareDelete()
.filter(QueryBuilders.rangeQuery(ServiceMetricTable.COLUMN_TIME_BUCKET).gte(startTimeBucket).lte(endTimeBucket))
.source(ServiceMetricTable.TABLE)
.get();
long deleted = response.getDeleted();
logger.info("Delete {} rows history from {} index.", deleted, ServiceMetricTable.TABLE);
}
}
...@@ -37,11 +37,11 @@ import org.slf4j.LoggerFactory; ...@@ -37,11 +37,11 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ServiceReferenceEsMetricPersistenceDAO extends EsDAO implements IServiceReferenceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> { public class ServiceReferenceMetricEsPersistenceDAO extends EsDAO implements IServiceReferenceMetricPersistenceDAO<IndexRequestBuilder, UpdateRequestBuilder, ServiceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceEsMetricPersistenceDAO.class); private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricEsPersistenceDAO.class);
public ServiceReferenceEsMetricPersistenceDAO(ElasticSearchClient client) { public ServiceReferenceMetricEsPersistenceDAO(ElasticSearchClient client) {
super(client); super(client);
} }
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.es.define;
import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.es.base.define.ElasticSearchTableDefine;
import org.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceMetricEsTableDefine extends ElasticSearchTableDefine {
public ServiceMetricEsTableDefine() {
super(ServiceMetricTable.TABLE);
}
@Override public int refreshInterval() {
return 2;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_ERROR_CALLS, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_DURATION_SUM, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(ServiceMetricTable.COLUMN_TIME_BUCKET, ElasticSearchColumnDefine.Type.Long.name()));
}
}
...@@ -36,7 +36,6 @@ public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefin ...@@ -36,7 +36,6 @@ public class ServiceReferenceMetricEsTableDefine extends ElasticSearchTableDefin
} }
@Override public void initialize() { @Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_AGG, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_ENTRY_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_FRONT_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name())); addColumn(new ElasticSearchColumnDefine(ServiceReferenceMetricTable.COLUMN_BEHIND_SERVICE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
......
...@@ -13,4 +13,5 @@ org.skywalking.apm.collector.storage.es.define.ApplicationReferenceMetricEsTable ...@@ -13,4 +13,5 @@ org.skywalking.apm.collector.storage.es.define.ApplicationReferenceMetricEsTable
org.skywalking.apm.collector.storage.es.define.SegmentCostEsTableDefine org.skywalking.apm.collector.storage.es.define.SegmentCostEsTableDefine
org.skywalking.apm.collector.storage.es.define.SegmentEsTableDefine org.skywalking.apm.collector.storage.es.define.SegmentEsTableDefine
org.skywalking.apm.collector.storage.es.define.ServiceEntryEsTableDefine org.skywalking.apm.collector.storage.es.define.ServiceEntryEsTableDefine
org.skywalking.apm.collector.storage.es.define.ServiceMetricEsTableDefine
org.skywalking.apm.collector.storage.es.define.ServiceReferenceMetricEsTableDefine org.skywalking.apm.collector.storage.es.define.ServiceReferenceMetricEsTableDefine
\ No newline at end of file
...@@ -41,10 +41,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO; ...@@ -41,10 +41,10 @@ import org.skywalking.apm.collector.storage.dao.IGCMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO; import org.skywalking.apm.collector.storage.dao.IGCMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTracePersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO; import org.skywalking.apm.collector.storage.dao.IGlobalTraceUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO; import org.skywalking.apm.collector.storage.dao.IInstanceCacheDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceHeartBeatPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IInstanceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceMetricUIDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO; import org.skywalking.apm.collector.storage.dao.IInstanceRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO; import org.skywalking.apm.collector.storage.dao.IInstanceUIDAO;
import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IMemoryMetricPersistenceDAO;
...@@ -57,6 +57,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO; ...@@ -57,6 +57,7 @@ import org.skywalking.apm.collector.storage.dao.ISegmentPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO; import org.skywalking.apm.collector.storage.dao.ISegmentUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO; import org.skywalking.apm.collector.storage.dao.IServiceEntryUIDAO;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameCacheDAO;
import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO; import org.skywalking.apm.collector.storage.dao.IServiceNameRegisterDAO;
import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO; import org.skywalking.apm.collector.storage.dao.IServiceReferenceMetricPersistenceDAO;
...@@ -78,12 +79,12 @@ import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2PersistenceDAO; ...@@ -78,12 +79,12 @@ import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.GCMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.GlobalTraceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2CacheDAO; import org.skywalking.apm.collector.storage.h2.dao.InstanceH2CacheDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2RegisterDAO; import org.skywalking.apm.collector.storage.h2.dao.InstanceH2RegisterDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.InstanceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.InstanceHeartBeatH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.InstanceMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.MemoryMetricH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.MemoryPoolMetricH2PersistenceDAO;
...@@ -94,10 +95,11 @@ import org.skywalking.apm.collector.storage.h2.dao.SegmentH2PersistenceDAO; ...@@ -94,10 +95,11 @@ import org.skywalking.apm.collector.storage.h2.dao.SegmentH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.SegmentH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.SegmentH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2PersistenceDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceEntryH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceMetricH2PersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2CacheDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2RegisterDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceNameH2RegisterDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2MetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2UIDAO; import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceH2UIDAO;
import org.skywalking.apm.collector.storage.h2.dao.ServiceReferenceMetricH2PersistenceDAO;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -181,7 +183,8 @@ public class StorageModuleH2Provider extends ModuleProvider { ...@@ -181,7 +183,8 @@ public class StorageModuleH2Provider extends ModuleProvider {
this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostH2PersistenceDAO(h2Client)); this.registerServiceImplementation(ISegmentCostPersistenceDAO.class, new SegmentCostH2PersistenceDAO(h2Client));
this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentH2PersistenceDAO(h2Client)); this.registerServiceImplementation(ISegmentPersistenceDAO.class, new SegmentH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IServiceEntryPersistenceDAO.class, new ServiceEntryH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceH2MetricPersistenceDAO(h2Client)); this.registerServiceImplementation(IServiceMetricPersistenceDAO.class, new ServiceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IServiceReferenceMetricPersistenceDAO.class, new ServiceReferenceMetricH2PersistenceDAO(h2Client));
this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client)); this.registerServiceImplementation(IInstanceHeartBeatPersistenceDAO.class, new InstanceHeartBeatH2PersistenceDAO(h2Client));
} }
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.dao;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.client.h2.H2ClientException;
import org.skywalking.apm.collector.storage.base.sql.SqlBuilder;
import org.skywalking.apm.collector.storage.dao.IServiceMetricPersistenceDAO;
import org.skywalking.apm.collector.storage.h2.base.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.skywalking.apm.collector.storage.table.service.ServiceMetric;
import org.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng, clevertension
*/
public class ServiceMetricH2PersistenceDAO extends H2DAO implements IServiceMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceMetricH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?";
public ServiceMetricH2PersistenceDAO(H2Client client) {
super(client);
}
@Override
public ServiceMetric get(String id) {
H2Client client = getClient();
String sql = SqlBuilder.buildSql(GET_SQL, ServiceMetricTable.TABLE, ServiceMetricTable.COLUMN_ID);
Object[] params = new Object[] {id};
try (ResultSet rs = client.executeQuery(sql, params)) {
if (rs.next()) {
ServiceMetric serviceMetric = new ServiceMetric(id);
serviceMetric.setServiceId(rs.getInt(ServiceMetricTable.COLUMN_SERVICE_ID));
serviceMetric.setCalls(rs.getLong(ServiceMetricTable.COLUMN_CALLS));
serviceMetric.setErrorCalls(rs.getLong(ServiceMetricTable.COLUMN_ERROR_CALLS));
serviceMetric.setDurationSum(rs.getLong(ServiceMetricTable.COLUMN_DURATION_SUM));
serviceMetric.setErrorDurationSum(rs.getLong(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM));
serviceMetric.setTimeBucket(rs.getLong(ServiceMetricTable.COLUMN_TIME_BUCKET));
return serviceMetric;
}
} catch (SQLException | H2ClientException e) {
logger.error(e.getMessage(), e);
}
return null;
}
@Override
public H2SqlEntity prepareBatchInsert(ServiceMetric data) {
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
source.put(ServiceMetricTable.COLUMN_ID, data.getId());
source.put(ServiceMetricTable.COLUMN_SERVICE_ID, data.getServiceId());
source.put(ServiceMetricTable.COLUMN_CALLS, data.getCalls());
source.put(ServiceMetricTable.COLUMN_ERROR_CALLS, data.getErrorCalls());
source.put(ServiceMetricTable.COLUMN_DURATION_SUM, data.getDurationSum());
source.put(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, data.getErrorDurationSum());
source.put(ServiceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
String sql = SqlBuilder.buildBatchInsertSql(ServiceMetricTable.TABLE, source.keySet());
entity.setSql(sql);
entity.setParams(source.values().toArray(new Object[0]));
return entity;
}
@Override
public H2SqlEntity prepareBatchUpdate(ServiceMetric data) {
H2SqlEntity entity = new H2SqlEntity();
Map<String, Object> source = new HashMap<>();
source.put(ServiceMetricTable.COLUMN_SERVICE_ID, data.getServiceId());
source.put(ServiceMetricTable.COLUMN_CALLS, data.getCalls());
source.put(ServiceMetricTable.COLUMN_ERROR_CALLS, data.getErrorCalls());
source.put(ServiceMetricTable.COLUMN_DURATION_SUM, data.getDurationSum());
source.put(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, data.getErrorDurationSum());
source.put(ServiceMetricTable.COLUMN_TIME_BUCKET, data.getTimeBucket());
String sql = SqlBuilder.buildBatchUpdateSql(ServiceMetricTable.TABLE, source.keySet(), ServiceMetricTable.COLUMN_ID);
entity.setSql(sql);
List<Object> values = new ArrayList<>(source.values());
values.add(data.getId());
entity.setParams(values.toArray(new Object[0]));
return entity;
}
@Override public void deleteHistory(Long startTimestamp, Long endTimestamp) {
}
}
...@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory; ...@@ -38,12 +38,12 @@ import org.slf4j.LoggerFactory;
/** /**
* @author peng-yongsheng, clevertension * @author peng-yongsheng, clevertension
*/ */
public class ServiceReferenceH2MetricPersistenceDAO extends H2DAO implements IServiceReferenceMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReferenceMetric> { public class ServiceReferenceMetricH2PersistenceDAO extends H2DAO implements IServiceReferenceMetricPersistenceDAO<H2SqlEntity, H2SqlEntity, ServiceReferenceMetric> {
private final Logger logger = LoggerFactory.getLogger(ServiceReferenceH2MetricPersistenceDAO.class); private final Logger logger = LoggerFactory.getLogger(ServiceReferenceMetricH2PersistenceDAO.class);
private static final String GET_SQL = "select * from {0} where {1} = ?"; private static final String GET_SQL = "select * from {0} where {1} = ?";
public ServiceReferenceH2MetricPersistenceDAO(H2Client client) { public ServiceReferenceMetricH2PersistenceDAO(H2Client client) {
super(client); super(client);
} }
......
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.storage.h2.define;
import org.skywalking.apm.collector.storage.h2.base.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.base.define.H2TableDefine;
import org.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
/**
* @author peng-yongsheng
*/
public class ServiceMetricH2TableDefine extends H2TableDefine {
public ServiceMetricH2TableDefine() {
super(ServiceMetricTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_ID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_SERVICE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_CALLS, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_ERROR_CALLS, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_DURATION_SUM, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_ERROR_DURATION_SUM, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(ServiceMetricTable.COLUMN_TIME_BUCKET, H2ColumnDefine.Type.Bigint.name()));
}
}
...@@ -13,4 +13,5 @@ org.skywalking.apm.collector.storage.h2.define.ApplicationReferenceMetricH2Table ...@@ -13,4 +13,5 @@ org.skywalking.apm.collector.storage.h2.define.ApplicationReferenceMetricH2Table
org.skywalking.apm.collector.storage.h2.define.SegmentCostH2TableDefine org.skywalking.apm.collector.storage.h2.define.SegmentCostH2TableDefine
org.skywalking.apm.collector.storage.h2.define.SegmentH2TableDefine org.skywalking.apm.collector.storage.h2.define.SegmentH2TableDefine
org.skywalking.apm.collector.storage.h2.define.ServiceEntryH2TableDefine org.skywalking.apm.collector.storage.h2.define.ServiceEntryH2TableDefine
org.skywalking.apm.collector.storage.h2.define.ServiceMetricH2TableDefine
org.skywalking.apm.collector.storage.h2.define.ServiceReferenceMetricH2TableDefine org.skywalking.apm.collector.storage.h2.define.ServiceReferenceMetricH2TableDefine
\ No newline at end of file
...@@ -53,9 +53,6 @@ ...@@ -53,9 +53,6 @@
<module>apm-collector-configuration</module> <module>apm-collector-configuration</module>
</modules> </modules>
<properties>
</properties>
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.slf4j</groupId> <groupId>org.slf4j</groupId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册