segment.go 5.1 KB
Newer Older
1 2 3 4 5 6
// Licensed to SkyAPM org under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. SkyAPM org licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
I
Ignasi Barrera 已提交
7 8 9 10
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
11 12 13 14 15 16
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
I
Ignasi Barrera 已提交
17

G
Gao Hongtao 已提交
18 19
package go2sky

G
Gao Hongtao 已提交
20 21
import (
	"sync/atomic"
I
Ignasi Barrera 已提交
22

23 24 25 26 27
	"github.com/SkyAPM/go2sky/internal/idgen"
	"github.com/SkyAPM/go2sky/internal/tool"
	"github.com/SkyAPM/go2sky/propagation"
	"github.com/SkyAPM/go2sky/reporter/grpc/common"
	v2 "github.com/SkyAPM/go2sky/reporter/grpc/language-agent-v2"
G
Gao Hongtao 已提交
28
)
G
Gao Hongtao 已提交
29

30 31 32
func newSegmentSpan(defaultSpan *defaultSpan, parentSpan segmentSpan) (s segmentSpan) {
	ssi := &segmentSpanImpl{
		defaultSpan: *defaultSpan,
G
Gao Hongtao 已提交
33
	}
34 35 36 37 38 39 40
	ssi.createSegmentContext(parentSpan)
	if parentSpan == nil || !parentSpan.segmentRegister() {
		rs := newSegmentRoot(ssi)
		rs.createRootSegmentContext(parentSpan)
		s = rs
	} else {
		s = ssi
G
Gao Hongtao 已提交
41
	}
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
	return
}

// SegmentContext is the context in a segment
type SegmentContext struct {
	TraceID         []int64
	SegmentID       []int64
	SpanID          int32
	ParentSpanID    int32
	ParentSegmentID []int64
	collect         chan<- ReportedSpan
	refNum          *int32
	spanIDGenerator *int32
}

// ReportedSpan is accessed by Reporter to load reported data
type ReportedSpan interface {
	Context() *SegmentContext
	Refs() []*propagation.SpanContext
	StartTime() int64
	EndTime() int64
	OperationName() string
	Peer() string
	SpanType() common.SpanType
	SpanLayer() common.SpanLayer
	IsError() bool
	Tags() []*common.KeyStringValuePair
	Logs() []*v2.Log
70
	ComponentID() int32
G
Gao Hongtao 已提交
71 72 73
}

type segmentSpan interface {
74 75
	Span
	context() SegmentContext
G
Gao Hongtao 已提交
76 77 78 79 80
	segmentRegister() bool
}

type segmentSpanImpl struct {
	defaultSpan
81 82 83 84 85 86 87 88 89 90
	SegmentContext
}

// For Span

func (s *segmentSpanImpl) End() {
	s.defaultSpan.End()
	go func() {
		s.Context().collect <- s
	}()
G
Gao Hongtao 已提交
91 92
}

93
// For Reported Span
94 95 96

func (s *segmentSpanImpl) Context() *SegmentContext {
	return &s.SegmentContext
G
Gao Hongtao 已提交
97 98
}

99 100 101 102 103
func (s *segmentSpanImpl) Refs() []*propagation.SpanContext {
	return s.defaultSpan.Refs
}

func (s *segmentSpanImpl) StartTime() int64 {
G
Gao Hongtao 已提交
104
	return tool.Millisecond(s.defaultSpan.StartTime)
105 106 107
}

func (s *segmentSpanImpl) EndTime() int64 {
G
Gao Hongtao 已提交
108
	return tool.Millisecond(s.defaultSpan.EndTime)
109 110 111
}

func (s *segmentSpanImpl) OperationName() string {
112
	return s.defaultSpan.OperationName
113 114 115
}

func (s *segmentSpanImpl) Peer() string {
116
	return s.defaultSpan.Peer
117 118 119
}

func (s *segmentSpanImpl) SpanType() common.SpanType {
120
	return common.SpanType(s.defaultSpan.SpanType)
121 122 123
}

func (s *segmentSpanImpl) SpanLayer() common.SpanLayer {
124
	return s.defaultSpan.Layer
125 126 127
}

func (s *segmentSpanImpl) IsError() bool {
128
	return s.defaultSpan.IsError
129 130 131
}

func (s *segmentSpanImpl) Tags() []*common.KeyStringValuePair {
132
	return s.defaultSpan.Tags
133 134 135
}

func (s *segmentSpanImpl) Logs() []*v2.Log {
136 137 138 139 140
	return s.defaultSpan.Logs
}

func (s *segmentSpanImpl) ComponentID() int32 {
	return s.defaultSpan.ComponentID
141 142 143 144
}

func (s *segmentSpanImpl) context() SegmentContext {
	return s.SegmentContext
G
Gao Hongtao 已提交
145 146 147 148
}

func (s *segmentSpanImpl) segmentRegister() bool {
	for {
149
		o := atomic.LoadInt32(s.Context().refNum)
G
Gao Hongtao 已提交
150 151 152
		if o < 0 {
			return false
		}
153
		if atomic.CompareAndSwapInt32(s.Context().refNum, o, o+1) {
G
Gao Hongtao 已提交
154 155 156 157 158
			return true
		}
	}
}

159 160 161 162 163 164
func (s *segmentSpanImpl) createSegmentContext(parent segmentSpan) {
	if parent == nil {
		s.SegmentContext = SegmentContext{}
		if len(s.defaultSpan.Refs) > 0 {
			s.TraceID = s.defaultSpan.Refs[0].TraceID
		} else {
G
Gao Hongtao 已提交
165
			s.TraceID = idgen.GenerateGlobalID()
166 167 168 169 170 171 172
		}
	} else {
		s.SegmentContext = parent.context()
		s.ParentSegmentID = s.SegmentID
		s.ParentSpanID = s.SpanID
		s.SpanID = atomic.AddInt32(s.Context().spanIDGenerator, 1)
	}
G
Gao Hongtao 已提交
173 174 175 176
}

type rootSegmentSpan struct {
	*segmentSpanImpl
G
Gao Hongtao 已提交
177 178
	notify  <-chan ReportedSpan
	segment []ReportedSpan
G
Gao Hongtao 已提交
179 180 181 182
	doneCh  chan int32
}

func (rs *rootSegmentSpan) End() {
G
Gao Hongtao 已提交
183
	rs.defaultSpan.End()
G
Gao Hongtao 已提交
184
	go func() {
185
		rs.doneCh <- atomic.SwapInt32(rs.Context().refNum, -1)
G
Gao Hongtao 已提交
186 187 188
	}()
}

189
func (rs *rootSegmentSpan) createRootSegmentContext(parent segmentSpan) {
G
Gao Hongtao 已提交
190
	rs.SegmentID = idgen.GenerateScopedGlobalID(int64(rs.tracer.instanceID))
191 192 193 194 195 196
	i := int32(0)
	rs.spanIDGenerator = &i
	rs.SpanID = i
	rs.ParentSpanID = -1
}

G
Gao Hongtao 已提交
197 198 199 200 201 202
func newSegmentRoot(segmentSpan *segmentSpanImpl) *rootSegmentSpan {
	s := &rootSegmentSpan{
		segmentSpanImpl: segmentSpan,
	}
	var init int32
	s.refNum = &init
G
Gao Hongtao 已提交
203
	ch := make(chan ReportedSpan)
G
Gao Hongtao 已提交
204 205
	s.collect = ch
	s.notify = ch
G
Gao Hongtao 已提交
206
	s.segment = make([]ReportedSpan, 0, 10)
G
Gao Hongtao 已提交
207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226
	s.doneCh = make(chan int32)
	go func() {
		total := -1
		defer close(ch)
		defer close(s.doneCh)
		for {
			select {
			case span := <-s.notify:
				s.segment = append(s.segment, span)
			case n := <-s.doneCh:
				total = int(n)
			}
			if total == len(s.segment) {
				break
			}
		}
		s.tracer.reporter.Send(append(s.segment, s))
	}()
	return s
}