SpanForward.java 4.6 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.receiver.zipkin.trace;

import java.util.List;
import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.library.util.*;
import org.apache.skywalking.oap.server.receiver.zipkin.*;
import org.apache.skywalking.oap.server.receiver.zipkin.handler.SpanEncode;
import org.apache.skywalking.oap.server.storage.plugin.zipkin.ZipkinSpan;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;

/**
 * Converts incoming Zipkin {@link Span}s into {@link ZipkinSpan} sources and hands them
 * to the {@link SourceReceiver}.
 *
 * <p>Service and endpoint ids are looked up in the inventory caches. When an id is not
 * cached yet, a registration is triggered but not awaited, so the span that triggered it
 * is forwarded without that id.
 *
 * @author wusheng
 */
public class SpanForward {
    private final ZipkinReceiverConfig config;
    private final SourceReceiver receiver;
    private final ServiceInventoryCache serviceInventoryCache;
    private final EndpointInventoryCache endpointInventoryCache;
    /**
     * Encode type supplied by the caller.
     * NOTE(review): this value is stored but never read in this class; spans are always
     * encoded and tagged as PROTO3 below. Confirm whether that is intended.
     */
    private final int encode;

    /**
     * @param config                 receiver configuration, consulted for endpoint registration
     * @param receiver               destination of the converted spans
     * @param serviceInventoryCache  cache used to resolve service name to service id
     * @param endpointInventoryCache cache used to resolve span name to endpoint id
     * @param encode                 encode type of the incoming spans
     */
    public SpanForward(ZipkinReceiverConfig config, SourceReceiver receiver,
        ServiceInventoryCache serviceInventoryCache,
        EndpointInventoryCache endpointInventoryCache, int encode) {
        this.config = config;
        this.receiver = receiver;
        this.serviceInventoryCache = serviceInventoryCache;
        this.endpointInventoryCache = endpointInventoryCache;
        this.encode = encode;
    }

    /**
     * Converts every span of the list and forwards it to the receiver, in list order.
     *
     * @param spanList Zipkin spans to forward
     */
    public void send(List<Span> spanList) {
        spanList.forEach(this::forward);
    }

    /**
     * Builds the {@link ZipkinSpan} for a single Zipkin span and passes it to the receiver.
     */
    private void forward(Span span) {
        ZipkinSpan zipkinSpan = new ZipkinSpan();
        zipkinSpan.setTraceId(span.traceId());
        zipkinSpan.setSpanId(span.id());

        int serviceId = resolveServiceId(span.localServiceName());
        if (serviceId != Const.NONE) {
            zipkinSpan.setServiceId(serviceId);
        }

        String spanName = span.name();
        boolean hasSpanName = !StringUtil.isEmpty(spanName);

        // The span kind is optional in Zipkin. Compare with == instead of switching on it,
        // because a switch over a null enum value throws NullPointerException and would
        // abort the whole batch.
        Span.Kind kind = span.kind();
        boolean isEntrySpan = kind == Span.Kind.SERVER || kind == Span.Kind.CONSUMER;
        if (isEntrySpan && hasSpanName && serviceId != Const.NONE) {
            int endpointId = resolveEndpoint(serviceId, spanName);
            if (endpointId != Const.NONE) {
                zipkinSpan.setEndpointId(endpointId);
            }
        }
        if (hasSpanName) {
            zipkinSpan.setEndpointName(spanName);
        }

        // Zipkin reports timestamp and duration in microseconds; the values stored here
        // are divided by 1000.
        long startTime = span.timestampAsLong() / 1000;
        zipkinSpan.setStartTime(startTime);
        if (startTime != 0) {
            long timeBucket = TimeBucketUtils.INSTANCE.getSecondTimeBucket(zipkinSpan.getStartTime());
            zipkinSpan.setTimeBucket(timeBucket);
        }

        long latency = span.durationAsLong() / 1000;

        zipkinSpan.setEndTime(startTime + latency);
        zipkinSpan.setIsError(BooleanUtils.booleanToValue(false));
        zipkinSpan.setEncode(SpanEncode.PROTO3);
        zipkinSpan.setLatency((int)latency);
        zipkinSpan.setDataBinary(SpanBytesEncoder.PROTO3.encode(span));

        receiver.receive(zipkinSpan);
    }

    /**
     * Looks up the service id for the given service name.
     *
     * @return the cached service id, or {@link Const#NONE} when the name is empty or the
     * service is not cached yet
     */
    private int resolveServiceId(String serviceName) {
        if (StringUtil.isEmpty(serviceName)) {
            return Const.NONE;
        }
        int serviceId = serviceInventoryCache.getServiceId(serviceName);
        if (serviceId == Const.NONE) {
            // Only register, but don't wait.
            // For this span, service id will be missed.
            CoreRegisterLinker.getServiceInventoryRegister().getOrCreate(serviceName, null);
        }
        return serviceId;
    }

    /**
     * Looks up the server-side endpoint id for the given service and span name.
     * When it is not cached and endpoint registration is enabled in the config,
     * a registration is triggered without waiting for its result.
     *
     * @return the cached endpoint id, or {@link Const#NONE} when it is not cached yet
     */
    private int resolveEndpoint(int serviceId, String spanName) {
        int endpointId = endpointInventoryCache.getEndpointId(serviceId, spanName,
            DetectPoint.SERVER.ordinal());
        if (endpointId == Const.NONE && config.isRegisterZipkinEndpoint()) {
            CoreRegisterLinker.getEndpointInventoryRegister().getOrCreate(serviceId, spanName, DetectPoint.SERVER);
        }
        return endpointId;
    }
}