ServiceMetricMinuteAggregationWorker.java 5.0 KB
Newer Older
1
/*
wu-sheng's avatar
wu-sheng 已提交
2 3 4 5 6 7
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
8 9 10 11 12 13 14 15 16 17 18
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

19
package org.apache.skywalking.apm.collector.analysis.metric.provider.worker.service.metric;
20

P
peng-yongsheng 已提交
21
import org.apache.skywalking.apm.collector.analysis.metric.define.graph.MetricWorkerIdDefine;
P
peng-yongsheng 已提交
22
import org.apache.skywalking.apm.collector.analysis.worker.model.base.AbstractLocalAsyncWorkerProvider;
23
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerException;
P
peng-yongsheng 已提交
24
import org.apache.skywalking.apm.collector.analysis.worker.model.impl.AggregationWorker;
25
import org.apache.skywalking.apm.collector.core.annotations.trace.GraphComputingMetric;
26
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
P
peng-yongsheng 已提交
27
import org.apache.skywalking.apm.collector.core.util.Const;
28
import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetric;
29
import org.apache.skywalking.apm.collector.storage.table.service.ServiceMetricTable;
30
import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenceMetric;
31 32 33 34

/**
 * @author peng-yongsheng
 */
35
public class ServiceMetricMinuteAggregationWorker extends AggregationWorker<ServiceReferenceMetric, ServiceMetric> {
36

37
    private ServiceMetricMinuteAggregationWorker(ModuleManager moduleManager) {
38 39 40 41
        super(moduleManager);
    }

    @Override public int id() {
42
        return MetricWorkerIdDefine.SERVICE_MINUTE_METRIC_AGGREGATION_WORKER_ID;
43 44 45 46 47
    }

    @Override protected ServiceMetric transform(ServiceReferenceMetric serviceReferenceMetric) {
        Integer serviceId = serviceReferenceMetric.getBehindServiceId();
        Long timeBucket = serviceReferenceMetric.getTimeBucket();
P
peng-yongsheng 已提交
48 49
        Integer sourceValue = serviceReferenceMetric.getSourceValue();

50 51 52 53 54 55
        String metricId = String.valueOf(serviceId) + Const.ID_SPLIT + String.valueOf(sourceValue);
        String id = String.valueOf(timeBucket) + Const.ID_SPLIT + metricId;

        ServiceMetric serviceMetric = new ServiceMetric();
        serviceMetric.setId(id);
        serviceMetric.setMetricId(metricId);
P
peng-yongsheng 已提交
56 57 58

        serviceMetric.setApplicationId(serviceReferenceMetric.getBehindApplicationId());
        serviceMetric.setInstanceId(serviceReferenceMetric.getBehindInstanceId());
59
        serviceMetric.setServiceId(serviceId);
P
peng-yongsheng 已提交
60
        serviceMetric.setSourceValue(sourceValue);
P
peng-yongsheng 已提交
61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76

        serviceMetric.setTransactionCalls(serviceReferenceMetric.getTransactionCalls());
        serviceMetric.setTransactionDurationSum(serviceReferenceMetric.getTransactionDurationSum());
        serviceMetric.setTransactionErrorCalls(serviceReferenceMetric.getTransactionErrorCalls());
        serviceMetric.setTransactionErrorDurationSum(serviceReferenceMetric.getTransactionErrorDurationSum());

        serviceMetric.setBusinessTransactionCalls(serviceReferenceMetric.getBusinessTransactionCalls());
        serviceMetric.setBusinessTransactionDurationSum(serviceReferenceMetric.getBusinessTransactionDurationSum());
        serviceMetric.setBusinessTransactionErrorCalls(serviceReferenceMetric.getBusinessTransactionErrorCalls());
        serviceMetric.setBusinessTransactionErrorDurationSum(serviceReferenceMetric.getBusinessTransactionErrorDurationSum());

        serviceMetric.setMqTransactionCalls(serviceReferenceMetric.getMqTransactionCalls());
        serviceMetric.setMqTransactionDurationSum(serviceReferenceMetric.getMqTransactionDurationSum());
        serviceMetric.setMqTransactionErrorCalls(serviceReferenceMetric.getMqTransactionErrorCalls());
        serviceMetric.setMqTransactionErrorDurationSum(serviceReferenceMetric.getMqTransactionErrorDurationSum());

77 78 79 80 81
        serviceMetric.setTimeBucket(timeBucket);

        return serviceMetric;
    }

82
    public static class Factory extends AbstractLocalAsyncWorkerProvider<ServiceReferenceMetric, ServiceMetric, ServiceMetricMinuteAggregationWorker> {
83

P
peng-yongsheng 已提交
84 85
        public Factory(ModuleManager moduleManager) {
            super(moduleManager);
86 87
        }

88 89
        @Override public ServiceMetricMinuteAggregationWorker workerInstance(ModuleManager moduleManager) {
            return new ServiceMetricMinuteAggregationWorker(moduleManager);
90 91 92 93 94 95
        }

        @Override public int queueSize() {
            return 256;
        }
    }
96 97 98 99 100

    @GraphComputingMetric(name = "/aggregate/onWork/" + ServiceMetricTable.TABLE)
    @Override protected void onWork(ServiceReferenceMetric message) throws WorkerException {
        super.onWork(message);
    }
101
}