244 lines
5.4 KiB
Go
244 lines
5.4 KiB
Go
|
// Licensed to SkyAPM org under one or more contributor
|
||
|
// license agreements. See the NOTICE file distributed with
|
||
|
// this work for additional information regarding copyright
|
||
|
// ownership. SkyAPM org licenses this file to you under
|
||
|
// the Apache License, Version 2.0 (the "License"); you may
|
||
|
// not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing,
|
||
|
// software distributed under the License is distributed on an
|
||
|
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
|
// KIND, either express or implied. See the License for the
|
||
|
// specific language governing permissions and limitations
|
||
|
// under the License.
|
||
|
|
||
|
package go2sky
|
||
|
|
||
|
import (
|
||
|
"sync/atomic"
|
||
|
|
||
|
"github.com/SkyAPM/go2sky/internal/idgen"
|
||
|
"github.com/SkyAPM/go2sky/internal/tool"
|
||
|
"github.com/SkyAPM/go2sky/propagation"
|
||
|
"github.com/SkyAPM/go2sky/reporter/grpc/common"
|
||
|
v3 "github.com/SkyAPM/go2sky/reporter/grpc/language-agent"
|
||
|
)
|
||
|
|
||
|
func newSegmentSpan(defaultSpan *defaultSpan, parentSpan segmentSpan) (s segmentSpan, err error) {
|
||
|
ssi := &segmentSpanImpl{
|
||
|
defaultSpan: *defaultSpan,
|
||
|
}
|
||
|
err = ssi.createSegmentContext(parentSpan)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
if parentSpan == nil || !parentSpan.segmentRegister() {
|
||
|
rs := newSegmentRoot(ssi)
|
||
|
err = rs.createRootSegmentContext(parentSpan)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
s = rs
|
||
|
} else {
|
||
|
s = ssi
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// SegmentContext is the context in a segment
|
||
|
type SegmentContext struct {
|
||
|
TraceID string
|
||
|
SegmentID string
|
||
|
SpanID int32
|
||
|
ParentSpanID int32
|
||
|
ParentSegmentID string
|
||
|
collect chan<- ReportedSpan
|
||
|
refNum *int32
|
||
|
spanIDGenerator *int32
|
||
|
FirstSpan Span `json:"-"`
|
||
|
}
|
||
|
|
||
|
// ReportedSpan is accessed by Reporter to load reported data
|
||
|
type ReportedSpan interface {
|
||
|
Context() *SegmentContext
|
||
|
Refs() []*propagation.SpanContext
|
||
|
StartTime() int64
|
||
|
EndTime() int64
|
||
|
OperationName() string
|
||
|
Peer() string
|
||
|
SpanType() v3.SpanType
|
||
|
SpanLayer() v3.SpanLayer
|
||
|
IsError() bool
|
||
|
Tags() []*common.KeyStringValuePair
|
||
|
Logs() []*v3.Log
|
||
|
ComponentID() int32
|
||
|
}
|
||
|
|
||
|
type segmentSpan interface {
|
||
|
Span
|
||
|
context() SegmentContext
|
||
|
segmentRegister() bool
|
||
|
}
|
||
|
|
||
|
type segmentSpanImpl struct {
|
||
|
defaultSpan
|
||
|
SegmentContext
|
||
|
}
|
||
|
|
||
|
// For Span
|
||
|
func (s *segmentSpanImpl) End() {
|
||
|
s.defaultSpan.End()
|
||
|
go func() {
|
||
|
s.Context().collect <- s
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// For Reported Span
|
||
|
|
||
|
func (s *segmentSpanImpl) Context() *SegmentContext {
|
||
|
return &s.SegmentContext
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) Refs() []*propagation.SpanContext {
|
||
|
return s.defaultSpan.Refs
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) StartTime() int64 {
|
||
|
return tool.Millisecond(s.defaultSpan.StartTime)
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) EndTime() int64 {
|
||
|
return tool.Millisecond(s.defaultSpan.EndTime)
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) OperationName() string {
|
||
|
return s.defaultSpan.OperationName
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) Peer() string {
|
||
|
return s.defaultSpan.Peer
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) SpanType() v3.SpanType {
|
||
|
return v3.SpanType(s.defaultSpan.SpanType)
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) SpanLayer() v3.SpanLayer {
|
||
|
return s.defaultSpan.Layer
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) IsError() bool {
|
||
|
return s.defaultSpan.IsError
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) Tags() []*common.KeyStringValuePair {
|
||
|
return s.defaultSpan.Tags
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) Logs() []*v3.Log {
|
||
|
return s.defaultSpan.Logs
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) ComponentID() int32 {
|
||
|
return s.defaultSpan.ComponentID
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) context() SegmentContext {
|
||
|
return s.SegmentContext
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) segmentRegister() bool {
|
||
|
for {
|
||
|
o := atomic.LoadInt32(s.Context().refNum)
|
||
|
if o < 0 {
|
||
|
return false
|
||
|
}
|
||
|
if atomic.CompareAndSwapInt32(s.Context().refNum, o, o+1) {
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *segmentSpanImpl) createSegmentContext(parent segmentSpan) (err error) {
|
||
|
if parent == nil {
|
||
|
s.SegmentContext = SegmentContext{}
|
||
|
if len(s.defaultSpan.Refs) > 0 {
|
||
|
s.TraceID = s.defaultSpan.Refs[0].TraceID
|
||
|
} else {
|
||
|
s.TraceID, err = idgen.GenerateGlobalID()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
s.SegmentContext = parent.context()
|
||
|
s.ParentSegmentID = s.SegmentID
|
||
|
s.ParentSpanID = s.SpanID
|
||
|
s.SpanID = atomic.AddInt32(s.Context().spanIDGenerator, 1)
|
||
|
}
|
||
|
if s.SegmentContext.FirstSpan == nil {
|
||
|
s.SegmentContext.FirstSpan = s
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
type rootSegmentSpan struct {
|
||
|
*segmentSpanImpl
|
||
|
notify <-chan ReportedSpan
|
||
|
segment []ReportedSpan
|
||
|
doneCh chan int32
|
||
|
}
|
||
|
|
||
|
func (rs *rootSegmentSpan) End() {
|
||
|
rs.defaultSpan.End()
|
||
|
go func() {
|
||
|
rs.doneCh <- atomic.SwapInt32(rs.Context().refNum, -1)
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
func (rs *rootSegmentSpan) createRootSegmentContext(parent segmentSpan) (err error) {
|
||
|
rs.SegmentID, err = idgen.GenerateGlobalID()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
i := int32(0)
|
||
|
rs.spanIDGenerator = &i
|
||
|
rs.SpanID = i
|
||
|
rs.ParentSpanID = -1
|
||
|
return
|
||
|
}
|
||
|
|
||
|
func newSegmentRoot(segmentSpan *segmentSpanImpl) *rootSegmentSpan {
|
||
|
s := &rootSegmentSpan{
|
||
|
segmentSpanImpl: segmentSpan,
|
||
|
}
|
||
|
var init int32
|
||
|
s.refNum = &init
|
||
|
ch := make(chan ReportedSpan)
|
||
|
s.collect = ch
|
||
|
s.notify = ch
|
||
|
s.segment = make([]ReportedSpan, 0, 10)
|
||
|
s.doneCh = make(chan int32)
|
||
|
go func() {
|
||
|
total := -1
|
||
|
defer close(ch)
|
||
|
defer close(s.doneCh)
|
||
|
for {
|
||
|
select {
|
||
|
case span := <-s.notify:
|
||
|
s.segment = append(s.segment, span)
|
||
|
case n := <-s.doneCh:
|
||
|
total = int(n)
|
||
|
}
|
||
|
if total == len(s.segment) {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
s.tracer.reporter.Send(append(s.segment, s))
|
||
|
}()
|
||
|
return s
|
||
|
}
|