未验证 提交 47fb6347 编写于 作者: 何延龙 提交者: GitHub

Merge pull request #134 from SkyAPM/develop

New Agent
...@@ -51,8 +51,8 @@ skywalking.enable=1 ...@@ -51,8 +51,8 @@ skywalking.enable=1
skywalking.version=6 skywalking.version=6
; app_code代码,不要含特殊字符,请使用数字、字母、下换线。(默认为:hello_skywalking) ; app_code代码,不要含特殊字符,请使用数字、字母、下换线。(默认为:hello_skywalking)
skywalking.app_code=hello_skywalking skywalking.app_code=hello_skywalking
; sock文件路径(默认值为/tmp/sky_agent.sock) ; sock文件路径(默认值为/tmp/sky-agent.sock)
skywalking.sock_path=/tmp/sky_agent.sock skywalking.sock_path=/tmp/sky-agent.sock
``` ```
......
...@@ -70,6 +70,7 @@ ZEND_DECLARE_MODULE_GLOBALS(skywalking) ...@@ -70,6 +70,7 @@ ZEND_DECLARE_MODULE_GLOBALS(skywalking)
static int le_skywalking; static int le_skywalking;
static int application_instance = 0; static int application_instance = 0;
static int application_id = 0; static int application_id = 0;
static char application_uuid[37] = {0};
static int sky_increment_id = 0; static int sky_increment_id = 0;
static int cli_debug = 0; static int cli_debug = 0;
...@@ -85,7 +86,7 @@ PHP_INI_BEGIN() ...@@ -85,7 +86,7 @@ PHP_INI_BEGIN()
STD_PHP_INI_BOOLEAN("skywalking.enable", "0", PHP_INI_ALL, OnUpdateBool, enable, zend_skywalking_globals, skywalking_globals) STD_PHP_INI_BOOLEAN("skywalking.enable", "0", PHP_INI_ALL, OnUpdateBool, enable, zend_skywalking_globals, skywalking_globals)
STD_PHP_INI_ENTRY("skywalking.version", "6", PHP_INI_ALL, OnUpdateLong, version, zend_skywalking_globals, skywalking_globals) STD_PHP_INI_ENTRY("skywalking.version", "6", PHP_INI_ALL, OnUpdateLong, version, zend_skywalking_globals, skywalking_globals)
STD_PHP_INI_ENTRY("skywalking.app_code", "hello_skywalking", PHP_INI_ALL, OnUpdateString, app_code, zend_skywalking_globals, skywalking_globals) STD_PHP_INI_ENTRY("skywalking.app_code", "hello_skywalking", PHP_INI_ALL, OnUpdateString, app_code, zend_skywalking_globals, skywalking_globals)
STD_PHP_INI_ENTRY("skywalking.sock_path", "/tmp/sky_agent.sock", PHP_INI_ALL, OnUpdateString, sock_path, zend_skywalking_globals, skywalking_globals) STD_PHP_INI_ENTRY("skywalking.sock_path", "/tmp/sky-agent.sock", PHP_INI_ALL, OnUpdateString, sock_path, zend_skywalking_globals, skywalking_globals)
PHP_INI_END() PHP_INI_END()
/* }}} */ /* }}} */
...@@ -1361,6 +1362,7 @@ static void request_init() { ...@@ -1361,6 +1362,7 @@ static void request_init() {
generate_context(); generate_context();
add_assoc_long(&SKYWALKING_G(UpstreamSegment), "application_instance", application_instance); add_assoc_long(&SKYWALKING_G(UpstreamSegment), "application_instance", application_instance);
add_assoc_stringl(&SKYWALKING_G(UpstreamSegment), "uuid", application_uuid, strlen(application_uuid));
add_assoc_long(&SKYWALKING_G(UpstreamSegment), "pid", getppid()); add_assoc_long(&SKYWALKING_G(UpstreamSegment), "pid", getppid());
add_assoc_long(&SKYWALKING_G(UpstreamSegment), "application_id", application_id); add_assoc_long(&SKYWALKING_G(UpstreamSegment), "application_id", application_id);
add_assoc_long(&SKYWALKING_G(UpstreamSegment), "version", SKYWALKING_G(version)); add_assoc_long(&SKYWALKING_G(UpstreamSegment), "version", SKYWALKING_G(version));
...@@ -1533,9 +1535,10 @@ static int sky_register() { ...@@ -1533,9 +1535,10 @@ static int sky_register() {
p = strtok(NULL, ","); p = strtok(NULL, ",");
} }
if (ids[0] != NULL && ids[1] != NULL) { if (ids[0] != NULL && ids[1] != NULL && ids[2] != NULL) {
application_id = atoi(ids[0]); application_id = atoi(ids[0]);
application_instance = atoi(ids[1]); application_instance = atoi(ids[1]);
strncpy(application_uuid, ids[2], sizeof application_uuid - 1);
} }
} }
......
package main
import (
"agent/agent/logger"
"agent/agent/service"
"github.com/urfave/cli"
"os"
)
var log = logger.Log
func main() {
defer func() {
if err := recover(); err != nil {
log.Error(err)
}
}()
app := cli.NewApp()
app.Name = "sky_php_agent"
app.Usage = "the skywalking trace sending agent"
app.Flags = []cli.Flag{
cli.StringFlag{Name: "grpc", Usage: "SkyWalking collector grpc address", Value: "127.0.0.1:11800"},
cli.StringFlag{Name: "socket", Usage: "Pipeline for communicating with PHP", Value: "/tmp/sky-agent.sock"},
cli.IntFlag{Name: "send-rate", Usage: "Send trace 1 second by default", Value: 1},
}
app.Action = func(c *cli.Context) error {
a := service.NewAgent(c)
a.Run()
return nil
}
err := app.Run(os.Args)
if err != nil {
log.Errorln(err)
}
}
package logger
import (
"github.com/sirupsen/logrus"
"os"
)
var Log *logrus.Logger
func init() {
if Log == nil {
Log = logrus.New()
}
Log.SetOutput(os.Stdout)
Log.SetFormatter(&logrus.TextFormatter{
FullTimestamp: true,
TimestampFormat: "2006-01-02 15:04:05",
})
}
package service
import (
"agent/agent/logger"
"agent/agent/pb/agent"
"agent/agent/pb/agent2"
"agent/agent/pb/register2"
"container/list"
"fmt"
"github.com/urfave/cli"
"google.golang.org/grpc"
"net"
"os"
"sync"
"time"
)
var log = logger.Log
type register struct {
c net.Conn
body string
}
type grpcClient struct {
segmentClientV5 agent.TraceSegmentServiceClient
segmentClientV6 agent2.TraceSegmentReportServiceClient
pingClient5 agent.InstanceDiscoveryServiceClient
pintClient6 register2.ServiceInstancePingClient
}
type Agent struct {
flag *cli.Context
grpcConn *grpc.ClientConn
grpcClient grpcClient
socket string
socketListener net.Listener
register chan *register
registerCache sync.Map
registerCacheLock sync.Mutex
trace chan string
queue *list.List
}
func NewAgent(cli *cli.Context) *Agent {
var agent = &Agent{
flag: cli,
socket: cli.String("socket"),
register: make(chan *register),
trace: make(chan string),
queue: list.New(),
}
go agent.sub()
return agent
}
func (t *Agent) Run() {
log.Info("hello skywalking")
t.connGRPC()
t.listenSocket()
defer func() {
var err error
err = t.socketListener.Close()
if err != nil {
log.Errorln(err)
}
err = t.grpcConn.Close()
if err != nil {
log.Errorln(err)
}
}()
}
func (t *Agent) connGRPC() {
var err error
grpcAdd := t.flag.String("grpc")
t.grpcConn, err = grpc.Dial(grpcAdd, grpc.WithInsecure())
if err != nil {
log.Panic(err)
}
log.Infof("connection %s...", grpcAdd)
t.grpcClient.segmentClientV5 = agent.NewTraceSegmentServiceClient(t.grpcConn)
t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn)
t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn)
}
func (t *Agent) listenSocket() {
var err error
if err = os.RemoveAll(t.socket); err != nil {
log.Panic(err)
}
t.socketListener, err = net.Listen("unix", t.socket)
if err != nil {
log.Panic(err)
}
err = os.Chmod(t.socket, os.ModeSocket|0777)
if err != nil {
log.Warningln(err)
}
for {
c, err := t.socketListener.Accept()
if err != nil {
log.Errorln(err)
break
}
// start a new goroutine to handle
// the new connection.
conn := NewConn(t, c)
go conn.Handle()
}
}
func (t *Agent) sub() {
heartbeatTicker := time.NewTicker(time.Second * 40)
defer heartbeatTicker.Stop()
traceSendTicker := time.NewTicker(time.Second * time.Duration(t.flag.Int("send-rate")))
defer traceSendTicker.Stop()
for {
select {
case <-traceSendTicker.C:
len := t.queue.Len()
if len > 0 {
var segments []*upstreamSegment
for i := 0; i < len; i++ {
// front top 100
e := t.queue.Front()
st := format(fmt.Sprintf("%v", e.Value))
if st != nil {
segments = append(segments, st)
}
t.queue.Remove(e)
}
go t.send(segments)
}
case <-heartbeatTicker.C:
go t.heartbeat()
case register := <-t.register:
go t.doRegister(register)
case trace := <-t.trace:
t.queue.PushBack(trace)
go t.recoverRegister(trace)
}
}
}
package service
import (
"io"
"net"
"strings"
)
type Conn struct {
agent *Agent
c net.Conn
}
func NewConn(a *Agent, c net.Conn) *Conn {
var conn = &Conn{
agent: a,
c: c,
}
return conn
}
func (c *Conn) Handle() {
defer func() {
c.c.Close()
}()
buf := make([]byte, 4096)
var json string
var endIndex int
for {
n, err := c.c.Read(buf)
if err != nil {
if err != io.EOF {
log.Warn("conn read error:", err)
}
return
}
json += string(buf[0:n])
for {
endIndex = strings.IndexAny(json, "\n")
if endIndex >= 0 {
body := json[0:endIndex]
if body[:1] == "0" {
c.agent.register <- &register{
c: c.c,
body: body[1:],
}
} else if body[:1] == "1" {
c.agent.trace <- body[1:]
}
json = json[endIndex+1:]
} else {
break
}
}
}
}
package service
import (
"agent/agent/pb/agent"
"agent/agent/pb/register2"
"context"
"time"
)
func (t *Agent) heartbeat() {
t.registerCache.Range(func(key, value interface{}) bool {
log.Infoln("heartbeat")
bind := value.(registerCache)
if bind.Version == 5 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := t.grpcClient.pingClient5.Heartbeat(ctx, &agent.ApplicationInstanceHeartbeat{
ApplicationInstanceId: bind.InstanceId,
HeartbeatTime: time.Now().UnixNano() / 1000000,
})
if err != nil {
log.Error("heartbeat:", err)
} else {
log.Infof("heartbeat appId %d appInsId %d", bind.AppId, bind.InstanceId)
}
} else if bind.Version == 6 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := t.grpcClient.pintClient6.DoPing(ctx, &register2.ServiceInstancePingPkg{
ServiceInstanceId: bind.InstanceId,
Time: time.Now().UnixNano() / 1000000,
ServiceInstanceUUID: bind.Uuid,
})
if err != nil {
log.Error("heartbeat:", err)
} else {
log.Infof("heartbeat appId %d appInsId %d", bind.AppId, bind.InstanceId)
}
}
return true
})
}
package main package service
import ( import (
"agent/agent/pb/agent" "agent/agent/pb/agent"
"agent/agent/pb/common" "agent/agent/pb/common"
"agent/agent/pb/register2" "agent/agent/pb/register2"
"agent/agent/service"
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"github.com/google/uuid" "github.com/google/uuid"
"google.golang.org/grpc"
"io"
"net" "net"
"os" "os"
"runtime" "runtime"
"strconv" "strconv"
"strings"
"sync"
"time" "time"
) )
type PHPSkyBind struct { type registerCache struct {
Version int Version int
AppId int32 AppId int32
InstanceId int32 InstanceId int32
Uuid string Uuid string
} }
type Register struct { type registerReq struct {
AppCode string `json:"app_code"` AppCode string `json:"app_code"`
Pid int `json:"pid"` Pid int `json:"pid"`
Version int `json:"version"` Version int `json:"version"`
} }
var registerMapLock = new(sync.Mutex)
var registerMap sync.Map
var grpcConn *grpc.ClientConn
func ip4s() []string { func ip4s() []string {
ipv4s, addErr := net.InterfaceAddrs() ipv4s, addErr := net.InterfaceAddrs()
var ips []string var ips []string
...@@ -52,38 +42,47 @@ func ip4s() []string { ...@@ -52,38 +42,47 @@ func ip4s() []string {
return ips return ips
} }
func register(c net.Conn, j string) { func (t *Agent) recoverRegister(r string) {
defer func() { var info trace
err := recover() err := json.Unmarshal([]byte(r), &info)
if err != nil { if err == nil {
fmt.Println("System error[register]:", err) if _, ok := t.registerCache.Load(info.Pid); !ok {
t.registerCache.Store(info.Pid, registerCache{
Version: info.Version,
AppId: info.ApplicationId,
InstanceId: info.ApplicationInstance,
Uuid: info.Uuid,
})
} }
}() }
}
info := Register{} func (t *Agent) doRegister(r *register) {
err := json.Unmarshal([]byte(j), &info)
info := registerReq{}
err := json.Unmarshal([]byte(r.body), &info)
if err != nil { if err != nil {
fmt.Println("register => ", err) log.Error("register json decode error", err)
c.Write([]byte("")) r.c.Write([]byte(""))
return return
} }
pid := info.Pid pid := info.Pid
if value, ok := registerMap.Load(pid); ok { if value, ok := t.registerCache.Load(pid); ok {
bind := value.(PHPSkyBind) bind := value.(registerCache)
fmt.Printf("register => pid %d appid %d insId %d\n", pid, bind.AppId, bind.InstanceId) log.Infof("register => pid %d appid %d insId %d", pid, bind.AppId, bind.InstanceId)
c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10))) r.c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10) + "," + bind.Uuid))
return return
} else { } else {
c.Write([]byte("")) r.c.Write([]byte(""))
} }
registerMapLock.Lock() t.registerCacheLock.Lock()
defer registerMapLock.Unlock() defer t.registerCacheLock.Unlock()
// if map not found pid.. start register // if map not found pid.. start register
if _, ok := registerMap.Load(pid); !ok { if _, ok := t.registerCache.Load(pid); !ok {
fmt.Println("register => Start register...") log.Infof("start register pid %d used SkyWalking v%d", pid, info.Version)
var regAppStatus = false var regAppStatus = false
var appId int32 = 0 var appId int32 = 0
var appInsId int32 = 0 var appInsId int32 = 0
...@@ -91,7 +90,7 @@ func register(c net.Conn, j string) { ...@@ -91,7 +90,7 @@ func register(c net.Conn, j string) {
agentUUID := uuid.New().String() agentUUID := uuid.New().String()
if info.Version == 5 { if info.Version == 5 {
c := agent.NewApplicationRegisterServiceClient(grpcConn) c := agent.NewApplicationRegisterServiceClient(t.grpcConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
...@@ -103,7 +102,7 @@ func register(c net.Conn, j string) { ...@@ -103,7 +102,7 @@ func register(c net.Conn, j string) {
ApplicationCode: info.AppCode, ApplicationCode: info.AppCode,
}) })
if regErr != nil { if regErr != nil {
fmt.Println("register error", regErr) log.Error("register error:", regErr)
break break
} }
if regResp.GetApplication() != nil { if regResp.GetApplication() != nil {
...@@ -114,7 +113,7 @@ func register(c net.Conn, j string) { ...@@ -114,7 +113,7 @@ func register(c net.Conn, j string) {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} else if info.Version == 6 { } else if info.Version == 6 {
c := register2.NewRegisterClient(grpcConn) c := register2.NewRegisterClient(t.grpcConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3) ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel() defer cancel()
...@@ -129,7 +128,7 @@ func register(c net.Conn, j string) { ...@@ -129,7 +128,7 @@ func register(c net.Conn, j string) {
Services: services, Services: services,
}) })
if regErr != nil { if regErr != nil {
fmt.Println("register error", regErr) log.Error("register error:", regErr)
break break
} }
...@@ -153,7 +152,7 @@ func register(c net.Conn, j string) { ...@@ -153,7 +152,7 @@ func register(c net.Conn, j string) {
if regAppStatus { if regAppStatus {
// start reg instance // start reg instance
if info.Version == 5 { if info.Version == 5 {
instanceClient := agent.NewInstanceDiscoveryServiceClient(grpcConn) instanceClient := agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3) instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3)
defer instanceCancel() defer instanceCancel()
...@@ -175,7 +174,7 @@ func register(c net.Conn, j string) { ...@@ -175,7 +174,7 @@ func register(c net.Conn, j string) {
for { for {
instanceResp, instanceErr = instanceClient.RegisterInstance(instanceCtx, instanceReq) instanceResp, instanceErr = instanceClient.RegisterInstance(instanceCtx, instanceReq)
if instanceErr != nil { if instanceErr != nil {
fmt.Println("register error", instanceErr) log.Error("register error:", instanceErr)
break break
} }
if instanceResp.GetApplicationInstanceId() != 0 { if instanceResp.GetApplicationInstanceId() != 0 {
...@@ -185,7 +184,7 @@ func register(c net.Conn, j string) { ...@@ -185,7 +184,7 @@ func register(c net.Conn, j string) {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
} else if info.Version == 6 { } else if info.Version == 6 {
instanceClient := register2.NewRegisterClient(grpcConn) instanceClient := register2.NewRegisterClient(t.grpcConn)
instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3) instanceCtx, instanceCancel := context.WithTimeout(context.Background(), time.Second*3)
defer instanceCancel() defer instanceCancel()
...@@ -236,7 +235,7 @@ func register(c net.Conn, j string) { ...@@ -236,7 +235,7 @@ func register(c net.Conn, j string) {
for { for {
instanceResp, instanceErr = instanceClient.DoServiceInstanceRegister(instanceCtx, instanceReq) instanceResp, instanceErr = instanceClient.DoServiceInstanceRegister(instanceCtx, instanceReq)
if instanceErr != nil { if instanceErr != nil {
fmt.Println("register error", instanceErr) log.Error(instanceErr)
break break
} }
if instanceResp.GetServiceInstances() != nil { if instanceResp.GetServiceInstances() != nil {
...@@ -255,175 +254,16 @@ func register(c net.Conn, j string) { ...@@ -255,175 +254,16 @@ func register(c net.Conn, j string) {
} }
if appInsId != 0 { if appInsId != 0 {
registerMap.Store(pid, PHPSkyBind{ t.registerCache.Store(pid, registerCache{
Version: info.Version, Version: info.Version,
AppId: appId, AppId: appId,
InstanceId: appInsId, InstanceId: appInsId,
Uuid: agentUUID, Uuid: agentUUID,
}) })
fmt.Println("register => Start register end...") log.Infof("register pid %d appid %d insId %d", pid, appId, appInsId)
} }
} else { } else {
fmt.Println("register => ", err) log.Error("register error:", err)
fmt.Println("register => Start register error...")
}
}
}
func handleConn(c net.Conn) {
defer func() {
err := recover()
if err != nil {
fmt.Println("System error[register]:", err)
}
}()
defer func() {
fmt.Println("Close conn..")
c.Close()
}()
buf := make([]byte, 4096)
var json string
var endIndex int
for {
n, err := c.Read(buf)
if err != nil {
if err != io.EOF {
fmt.Println("conn read error:", err)
}
return
}
json += string(buf[0:n])
for {
endIndex = strings.IndexAny(json, "\n")
if endIndex >= 0 {
body := json[0:endIndex]
if body[:1] == "0" {
fmt.Println("Service register protocol")
go register(c, body[1:])
} else if body[:1] == "1" {
fmt.Println("Service send trace protocol")
go service.SendTrace(grpcConn, body[1:])
}
json = json[endIndex+1:]
} else {
break
}
}
}
}
func heartbeat() {
defer func() {
err := recover()
if err != nil {
fmt.Println("System error[heartbeat]:", err)
go heartbeat()
}
}()
for {
registerMap.Range(func(key, value interface{}) bool {
fmt.Println("heartbeat => ...")
bind := value.(PHPSkyBind)
if bind.Version == 5 {
c := agent.NewInstanceDiscoveryServiceClient(grpcConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := c.Heartbeat(ctx, &agent.ApplicationInstanceHeartbeat{
ApplicationInstanceId: bind.InstanceId,
HeartbeatTime: time.Now().UnixNano() / 1000000,
})
if err != nil {
fmt.Println("heartbeat =>", err)
} else {
fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
}
} else if bind.Version == 6 {
c := register2.NewServiceInstancePingClient(grpcConn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
_, err := c.DoPing(ctx, &register2.ServiceInstancePingPkg{
ServiceInstanceId: bind.InstanceId,
Time: time.Now().UnixNano() / 1000000,
ServiceInstanceUUID: bind.Uuid,
})
if err != nil {
fmt.Println("heartbeat =>", err)
} else {
fmt.Printf("heartbeat => %d %d\n", bind.AppId, bind.InstanceId)
}
}
return true
})
time.Sleep(time.Second * 40)
}
}
func main() {
args := os.Args
if len(args) <= 1 || (len(args) == 2 && (args[1] == "-h" || args[1] == "--help")) {
fmt.Println("The skywalking PHP agent")
fmt.Println("Use sky_php_agent_linux_x64 grpc_address [sock_path]")
fmt.Println("e.g. sky_php_agent_linux_x64 127.0.0.1:11800")
os.Exit(0)
}
grpcHost := args[1]
sockPath := "/tmp/sky_agent.sock"
if len(args) >= 3 {
sockPath = args[2]
}
// connection to sky server
fmt.Println("hello skywalking")
fmt.Println("GRPC Host: ", grpcHost)
fmt.Println("Sock Path: ", sockPath)
var err error
grpcConn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
if err != nil {
fmt.Println(err)
}
defer grpcConn.Close()
if err := os.RemoveAll(sockPath); err != nil {
fmt.Println(err)
}
l, err := net.Listen("unix", sockPath)
if err != nil {
fmt.Println("listen error:", err)
return
}
defer l.Close()
// change sock file type and mode
err = os.Chmod(sockPath, os.ModeSocket|0666)
if err != nil {
fmt.Println("sock file change mod error:", err)
return
}
go heartbeat()
fmt.Println("listen:", sockPath)
for {
c, err := l.Accept()
if err != nil {
fmt.Println("accept error:", err)
break
} }
// start a new goroutine to handle
// the new connection.
go handleConn(c)
} }
} }
...@@ -7,17 +7,22 @@ import ( ...@@ -7,17 +7,22 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"google.golang.org/grpc" "io"
"log"
"strconv" "strconv"
"strings" "strings"
"time" "time"
) )
type upstreamSegment struct {
Version int
segment *agent.UpstreamSegment
}
type trace struct { type trace struct {
ApplicationInstance int32 `json:"application_instance"` ApplicationInstance int32 `json:"application_instance"`
Pid int `json:"pid"` Pid int `json:"pid"`
ApplicationId int32 `json:"application_id"` ApplicationId int32 `json:"application_id"`
Uuid string `json:"uuid"`
Version int `json:"version"` Version int `json:"version"`
Segment segment `json:"segment"` Segment segment `json:"segment"`
GlobalTraceIds []string `json:"globalTraceIds"` GlobalTraceIds []string `json:"globalTraceIds"`
...@@ -56,26 +61,76 @@ type ref struct { ...@@ -56,26 +61,76 @@ type ref struct {
ParentServiceName string `json:"parentServiceName"` ParentServiceName string `json:"parentServiceName"`
} }
func SendTrace(conn *grpc.ClientConn, j string) { func (t *Agent) send(segments []*upstreamSegment) {
info := trace{} var err error
err := json.Unmarshal([]byte(j), &info)
log.Infof("start sending trace..., count %d", len(segments))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
streamV5, err := t.grpcClient.segmentClientV5.Collect(ctx)
if err != nil { if err != nil {
log.Println("trace => ", err) log.Warningln(err)
return
} }
if info.Version == 5 {
log.Println("trace => Start trace...")
c := agent.NewTraceSegmentServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
client, err := c.Collect(ctx) ctx6, cancel6 := context.WithTimeout(context.Background(), time.Second*3)
if err != nil { defer cancel6()
log.Println("trace => ", err) streamV6, err := t.grpcClient.segmentClientV6.Collect(ctx6)
return if err != nil {
log.Warningln(err)
}
if streamV5 == nil && streamV6 == nil {
log.Error("no stream available")
}
for _, segment := range segments {
if segment.Version == 5 {
if streamV5 != nil {
if err = streamV5.Send(segment.segment); err != nil {
if err == io.EOF {
log.Warn(err)
} else {
log.Error(err)
}
}
} else {
log.Warn("stream not open, sending fail")
}
} else if segment.Version == 6 {
if streamV6 != nil {
if err = streamV6.Send(segment.segment); err != nil {
if err == io.EOF {
log.Warn(err)
} else {
log.Error(err)
}
}
} else {
log.Warn("stream not open, sending fail")
}
} }
}
if streamV5 != nil {
streamV5.CloseAndRecv()
}
if streamV6 == nil {
streamV6.CloseAndRecv()
}
log.Info("sending success...")
}
func format(j string) *upstreamSegment {
info := trace{}
err := json.Unmarshal([]byte(j), &info)
if err != nil {
log.Error("trace json decode:", err)
return nil
}
if info.Version == 5 {
var globalTrace []*agent.UniqueId var globalTrace []*agent.UniqueId
for _, v := range info.GlobalTraceIds { for _, v := range info.GlobalTraceIds {
...@@ -131,39 +186,21 @@ func SendTrace(conn *grpc.ClientConn, j string) { ...@@ -131,39 +186,21 @@ func SendTrace(conn *grpc.ClientConn, j string) {
// EnumsAsInts: true, // EnumsAsInts: true,
//} //}
seg, err := proto.Marshal(segmentObject) seg, err := proto.Marshal(segmentObject)
//fmt.Println(seg) //log.Info(seg)
if err != nil { if err != nil {
log.Println("trace => ", err) log.Error("trace json encode:", err)
return return nil
} }
segment := &agent.UpstreamSegment{ segment := &agent.UpstreamSegment{
GlobalTraceIds: globalTrace, GlobalTraceIds: globalTrace,
Segment: seg, Segment: seg,
} }
return &upstreamSegment{
err = client.Send(segment) Version: info.Version,
if err != nil { segment: segment,
log.Println("trace => ", err)
return
}
_, err = client.CloseAndRecv()
if err != nil {
log.Println("trace =>", err)
} }
log.Println("trace => send ok")
} else if info.Version == 6 { } else if info.Version == 6 {
log.Println("trace => Start trace...")
c := agent2.NewTraceSegmentReportServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
client, err := c.Collect(ctx)
if err != nil {
log.Println("trace => ", err)
return
}
var globalTrace []*agent.UniqueId var globalTrace []*agent.UniqueId
...@@ -220,10 +257,10 @@ func SendTrace(conn *grpc.ClientConn, j string) { ...@@ -220,10 +257,10 @@ func SendTrace(conn *grpc.ClientConn, j string) {
// EnumsAsInts: true, // EnumsAsInts: true,
//} //}
seg, err := proto.Marshal(segmentObject) seg, err := proto.Marshal(segmentObject)
//fmt.Println(seg) //log.Info(seg)
if err != nil { if err != nil {
log.Println("trace => ", err) log.Error("trace proto encode:", err)
return return nil
} }
segment := &agent.UpstreamSegment{ segment := &agent.UpstreamSegment{
...@@ -231,18 +268,12 @@ func SendTrace(conn *grpc.ClientConn, j string) { ...@@ -231,18 +268,12 @@ func SendTrace(conn *grpc.ClientConn, j string) {
Segment: seg, Segment: seg,
} }
err = client.Send(segment) return &upstreamSegment{
if err != nil { Version: info.Version,
log.Println("trace => ", err) segment: segment,
return
}
_, err = client.CloseAndRecv()
if err != nil {
log.Println("trace =>", err)
} }
log.Println("trace => send ok")
} }
return nil
} }
func buildRefs(span *agent.SpanObject, refs []ref) { func buildRefs(span *agent.SpanObject, refs []ref) {
......
...@@ -3,6 +3,9 @@ module agent ...@@ -3,6 +3,9 @@ module agent
require ( require (
github.com/golang/protobuf v1.3.1 github.com/golang/protobuf v1.3.1
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/sirupsen/logrus v1.4.2
github.com/urfave/cli v1.22.1
google.golang.org/grpc v1.21.1 google.golang.org/grpc v1.21.1
) )
go 1.13
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
...@@ -10,6 +13,18 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y ...@@ -10,6 +13,18 @@ github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5y
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628=
...@@ -18,6 +33,8 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG ...@@ -18,6 +33,8 @@ golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAG
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
...@@ -26,4 +43,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0 ...@@ -26,4 +43,6 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8= google.golang.org/grpc v1.21.1 h1:j6XxA85m/6txkUCHvzlV5f+HBNl/1r5cZ2A/3IEFOO8=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册