377 lines
9.7 KiB
Go
377 lines
9.7 KiB
Go
package apm
|
|
|
|
import (
|
|
"context"
|
|
"hash/fnv"
|
|
"log"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/SkyAPM/go2sky"
|
|
"github.com/SkyAPM/go2sky/reporter/grpc/common"
|
|
agentv3 "github.com/SkyAPM/go2sky/reporter/grpc/language-agent"
|
|
managementv3 "github.com/SkyAPM/go2sky/reporter/grpc/management"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
// Defaults and fixed keys used by the gRPC reporter.
const (
	// authKey is the metadata key under which WithAuthentication stores
	// the caller-supplied token.
	authKey = "Authentication"

	// defaultLogPrefix is the prefix of the stderr logger installed by
	// NewGRPCReporter.
	defaultLogPrefix = "go2sky-gRPC"

	// defaultCheckInterval is the initial value of the reporter's check
	// interval.
	defaultCheckInterval = 20 * time.Second

	// defaultSamplePartitions of 1 means every trace is kept.
	defaultSamplePartitions uint32 = 1

	// maxSendQueueSize is the default capacity of the segment send queue.
	maxSendQueueSize int32 = 30000
)
|
|
|
|
// NewGRPCReporter create a new reporter to send data to gRPC oap server. Only one backend address is allowed.
|
|
func NewGRPCReporter(serverAddr string, opts ...GRPCReporterOption) (go2sky.Reporter, error) {
|
|
r := &gRPCReporter{
|
|
logger: log.New(os.Stderr, defaultLogPrefix, log.LstdFlags),
|
|
sendCh: make(chan *agentv3.SegmentObject, maxSendQueueSize),
|
|
checkInterval: defaultCheckInterval,
|
|
samplePartitions: defaultSamplePartitions,
|
|
}
|
|
for _, o := range opts {
|
|
o(r)
|
|
}
|
|
|
|
var credsDialOption grpc.DialOption
|
|
if r.creds != nil {
|
|
// use tls
|
|
credsDialOption = grpc.WithTransportCredentials(r.creds)
|
|
} else {
|
|
credsDialOption = grpc.WithInsecure()
|
|
}
|
|
|
|
conn, err := grpc.Dial(serverAddr, credsDialOption)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
r.conn = conn
|
|
r.traceClient = agentv3.NewTraceSegmentReportServiceClient(r.conn)
|
|
r.managementClient = managementv3.NewManagementServiceClient(r.conn)
|
|
return r, nil
|
|
}
|
|
|
|
// GRPCReporterOption allows for functional options to adjust behaviour
|
|
// of a gRPC reporter to be created by NewGRPCReporter
|
|
type GRPCReporterOption func(r *gRPCReporter)
|
|
|
|
// WithLogger setup logger for gRPC reporter
|
|
func WithLogger(logger *log.Logger) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.logger = logger
|
|
}
|
|
}
|
|
|
|
// WithCheckInterval setup service and endpoint registry check interval
|
|
func WithCheckInterval(interval time.Duration) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.checkInterval = interval
|
|
}
|
|
}
|
|
|
|
// WithSamplePartitions setup sample partitions. For example,
|
|
// when partitions is 1, then the sample rate is 1;
|
|
// when partitions is 10, then the sample rate is 0.1;
|
|
// when partitions is 100, then the sample rate is 0.01;
|
|
func WithSamplePartitions(partitions uint32) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
if partitions <= 0 {
|
|
partitions = 1
|
|
}
|
|
r.samplePartitions = partitions
|
|
}
|
|
}
|
|
|
|
// WithMaxSendQueueSize setup send span queue buffer length
|
|
func WithMaxSendQueueSize(maxSendQueueSize int) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.sendCh = make(chan *agentv3.SegmentObject, maxSendQueueSize)
|
|
}
|
|
}
|
|
|
|
// WithInstanceProps setup service instance properties eg: org=SkyAPM
|
|
func WithInstanceProps(props map[string]string) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.instanceProps = props
|
|
}
|
|
}
|
|
|
|
// WithTransportCredentials setup transport layer security
|
|
func WithTransportCredentials(creds credentials.TransportCredentials) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.creds = creds
|
|
}
|
|
}
|
|
|
|
// WithAuthentication used Authentication for gRPC
|
|
func WithAuthentication(auth string) GRPCReporterOption {
|
|
return func(r *gRPCReporter) {
|
|
r.md = metadata.New(map[string]string{authKey: auth})
|
|
}
|
|
}
|
|
|
|
type gRPCReporter struct {
|
|
service string
|
|
serviceInstance string
|
|
instanceProps map[string]string
|
|
logger *log.Logger
|
|
sendCh chan *agentv3.SegmentObject
|
|
conn *grpc.ClientConn
|
|
traceClient agentv3.TraceSegmentReportServiceClient
|
|
managementClient managementv3.ManagementServiceClient
|
|
checkInterval time.Duration
|
|
samplePartitions uint32
|
|
|
|
md metadata.MD
|
|
creds credentials.TransportCredentials
|
|
}
|
|
|
|
func (r *gRPCReporter) Boot(service string, serviceInstance string) {
|
|
r.service = service
|
|
r.serviceInstance = serviceInstance
|
|
r.initSendPipeline()
|
|
r.check()
|
|
}
|
|
|
|
func (r *gRPCReporter) calcStringHashCode(str string) uint32 {
|
|
hash := fnv.New32a()
|
|
_, _ = hash.Write([]byte(str))
|
|
return hash.Sum32()
|
|
}
|
|
|
|
func (r *gRPCReporter) sampling(spans []go2sky.ReportedSpan) []go2sky.ReportedSpan {
|
|
if spans == nil {
|
|
return nil
|
|
}
|
|
samplePartitions := r.samplePartitions
|
|
if samplePartitions <= 0 {
|
|
samplePartitions = 1
|
|
}
|
|
sampled := make([]go2sky.ReportedSpan, 0, len(spans))
|
|
for _, span := range spans {
|
|
idx := r.calcStringHashCode(span.Context().TraceID) % samplePartitions
|
|
if idx == 0 {
|
|
sampled = append(sampled, span)
|
|
}
|
|
}
|
|
return sampled
|
|
}
|
|
|
|
func (r *gRPCReporter) Send(spans []go2sky.ReportedSpan) {
|
|
spans = r.sampling(spans)
|
|
spanSize := len(spans)
|
|
if spanSize < 1 {
|
|
return
|
|
}
|
|
rootSpan := spans[spanSize-1]
|
|
rootCtx := rootSpan.Context()
|
|
segmentObject := &agentv3.SegmentObject{
|
|
TraceId: rootCtx.TraceID,
|
|
TraceSegmentId: rootCtx.SegmentID,
|
|
Spans: make([]*agentv3.SpanObject, spanSize),
|
|
Service: r.service,
|
|
ServiceInstance: r.serviceInstance,
|
|
}
|
|
for i, s := range spans {
|
|
spanCtx := s.Context()
|
|
segmentObject.Spans[i] = &agentv3.SpanObject{
|
|
SpanId: spanCtx.SpanID,
|
|
ParentSpanId: spanCtx.ParentSpanID,
|
|
StartTime: s.StartTime(),
|
|
EndTime: s.EndTime(),
|
|
OperationName: s.OperationName(),
|
|
Peer: s.Peer(),
|
|
SpanType: s.SpanType(),
|
|
SpanLayer: s.SpanLayer(),
|
|
ComponentId: s.ComponentID(),
|
|
IsError: s.IsError(),
|
|
Tags: s.Tags(),
|
|
Logs: s.Logs(),
|
|
}
|
|
srr := make([]*agentv3.SegmentReference, 0)
|
|
if i == (spanSize-1) && spanCtx.ParentSpanID > -1 {
|
|
srr = append(srr, &agentv3.SegmentReference{
|
|
RefType: agentv3.RefType_CrossThread,
|
|
TraceId: spanCtx.TraceID,
|
|
ParentTraceSegmentId: spanCtx.ParentSegmentID,
|
|
ParentSpanId: spanCtx.ParentSpanID,
|
|
ParentService: r.service,
|
|
ParentServiceInstance: r.serviceInstance,
|
|
})
|
|
}
|
|
if len(s.Refs()) > 0 {
|
|
for _, tc := range s.Refs() {
|
|
srr = append(srr, &agentv3.SegmentReference{
|
|
RefType: agentv3.RefType_CrossProcess,
|
|
TraceId: spanCtx.TraceID,
|
|
ParentTraceSegmentId: tc.ParentSegmentID,
|
|
ParentSpanId: tc.ParentSpanID,
|
|
ParentService: tc.ParentService,
|
|
ParentServiceInstance: tc.ParentServiceInstance,
|
|
ParentEndpoint: tc.ParentEndpoint,
|
|
NetworkAddressUsedAtPeer: tc.AddressUsedAtClient,
|
|
})
|
|
}
|
|
}
|
|
segmentObject.Spans[i].Refs = srr
|
|
}
|
|
defer func() {
|
|
// recover the panic caused by skyClose sendCh
|
|
if err := recover(); err != nil {
|
|
r.logger.Printf("reporter segment err %v", err)
|
|
}
|
|
}()
|
|
select {
|
|
case r.sendCh <- segmentObject:
|
|
default:
|
|
r.logger.Printf("reach max send buffer")
|
|
}
|
|
}
|
|
|
|
func (r *gRPCReporter) Close() {
|
|
if r.sendCh != nil {
|
|
close(r.sendCh)
|
|
}
|
|
r.closeGRPCConn()
|
|
}
|
|
|
|
func (r *gRPCReporter) closeGRPCConn() {
|
|
if r.conn != nil {
|
|
if err := r.conn.Close(); err != nil {
|
|
r.logger.Print(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *gRPCReporter) initSendPipeline() {
|
|
if r.traceClient == nil {
|
|
return
|
|
}
|
|
go func() {
|
|
StreamLoop:
|
|
for {
|
|
stream, err := r.traceClient.Collect(metadata.NewOutgoingContext(context.Background(), r.md))
|
|
if err != nil {
|
|
r.logger.Printf("open stream error %v", err)
|
|
time.Sleep(5 * time.Second)
|
|
continue StreamLoop
|
|
}
|
|
for s := range r.sendCh {
|
|
err = stream.Send(s)
|
|
if err != nil {
|
|
r.logger.Printf("send segment error %v", err)
|
|
r.closeStream(stream)
|
|
continue StreamLoop
|
|
}
|
|
}
|
|
r.closeStream(stream)
|
|
r.closeGRPCConn()
|
|
break
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (r *gRPCReporter) closeStream(stream agentv3.TraceSegmentReportService_CollectClient) {
|
|
err := stream.CloseSend()
|
|
if err != nil {
|
|
r.logger.Printf("send closing error %v", err)
|
|
}
|
|
}
|
|
|
|
func (r *gRPCReporter) reportInstanceProperties() (err error) {
|
|
props := buildOSInfo()
|
|
if r.instanceProps != nil {
|
|
for k, v := range r.instanceProps {
|
|
props = append(props, &common.KeyStringValuePair{
|
|
Key: k,
|
|
Value: v,
|
|
})
|
|
}
|
|
}
|
|
_, err = r.managementClient.ReportInstanceProperties(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstanceProperties{
|
|
Service: r.service,
|
|
ServiceInstance: r.serviceInstance,
|
|
Properties: props,
|
|
})
|
|
return err
|
|
}
|
|
|
|
func (r *gRPCReporter) check() {
|
|
if r.checkInterval < 0 || r.conn == nil || r.managementClient == nil {
|
|
return
|
|
}
|
|
go func() {
|
|
instancePropertiesSubmitted := false
|
|
for {
|
|
if r.conn.GetState() == connectivity.Shutdown {
|
|
break
|
|
}
|
|
|
|
if !instancePropertiesSubmitted {
|
|
err := r.reportInstanceProperties()
|
|
if err != nil {
|
|
r.logger.Printf("report serviceInstance properties error %v", err)
|
|
time.Sleep(r.checkInterval)
|
|
continue
|
|
}
|
|
instancePropertiesSubmitted = true
|
|
}
|
|
|
|
_, err := r.managementClient.KeepAlive(metadata.NewOutgoingContext(context.Background(), r.md), &managementv3.InstancePingPkg{
|
|
Service: r.service,
|
|
ServiceInstance: r.serviceInstance,
|
|
})
|
|
|
|
if err != nil {
|
|
r.logger.Printf("send keep alive signal error %v", err)
|
|
}
|
|
time.Sleep(r.checkInterval)
|
|
}
|
|
}()
|
|
}
|
|
|
|
func buildOSInfo() (props []*common.KeyStringValuePair) {
|
|
processNo := ProcessNo()
|
|
if processNo != "" {
|
|
kv := &common.KeyStringValuePair{
|
|
Key: "Process No.",
|
|
Value: processNo,
|
|
}
|
|
props = append(props, kv)
|
|
}
|
|
|
|
hostname := &common.KeyStringValuePair{
|
|
Key: "hostname",
|
|
Value: HostName(),
|
|
}
|
|
props = append(props, hostname)
|
|
|
|
language := &common.KeyStringValuePair{
|
|
Key: "language",
|
|
Value: "go",
|
|
}
|
|
props = append(props, language)
|
|
|
|
osName := &common.KeyStringValuePair{
|
|
Key: "OS Name",
|
|
Value: OSName(),
|
|
}
|
|
props = append(props, osName)
|
|
|
|
ipv4s := AllIPV4()
|
|
if len(ipv4s) > 0 {
|
|
for _, ipv4 := range ipv4s {
|
|
kv := &common.KeyStringValuePair{
|
|
Key: "ipv4",
|
|
Value: ipv4,
|
|
}
|
|
props = append(props, kv)
|
|
}
|
|
}
|
|
return
|
|
}
|