未验证 提交 5456881c 编写于 作者: 何延龙 提交者: GitHub

Merge pull request #202 from SkyAPM/develop

Add Dockerfile support to e2e test
......@@ -2,7 +2,7 @@ name: CI
on:
push:
branches: [ master ]
branches: [ master, develop ]
pull_request:
branches: [ master ]
......@@ -17,8 +17,6 @@ jobs:
steps:
- name: Checkout
uses: actions/checkout@v2
with:
path: skywalking-php
- name: Checkout agent test tool
uses: actions/checkout@v2
......@@ -51,27 +49,25 @@ jobs:
- name: Build SkyWalking-PHP
run: |
cd skywalking-php
phpize
./configure
make
sudo make install
cd ..
- name: Build SkyWalking-PHP agent
run: |
cd skywalking-php
bash -c './build-sky-php-agent.sh'
cd ..
- name: Start apache SkyWalking mock collector
- name: Build SkyWalking mock collector
run: |
cd skywalking-agent-test-tool
mvn package -DskipTests
tar zxvf ./dist/skywalking-mock-collector.tar.gz -C ./mock-collector
cd ./mock-collector/skywalking-mock-collector
nohup bash -c './bin/collector-startup.sh' &
cd ../..
bash -c 'response=0; while [ $response -ne 200 ]; do sleep 1; response=$(curl --write-out %{http_code} --silent --output /dev/null http://localhost:12800/receiveData);echo $response;done'
- name: Check mock collector
run: bash -c 'response=0; while [ $response -ne 200 ]; do sleep 1; response=$(curl --write-out %{http_code} --silent --output /dev/null http://localhost:12800/receiveData);echo $response;done'
- name: Publish Dokcer image
if: matrix.php-versions == '7.4'
uses: elgohr/Publish-Docker-Github-Action@2.13
with:
name: skyapm/skywalking-php
username: ${{ secrets.DOCKER_USERNAME }}
password: ${{ secrets.DOCKER_PASSWORD }}
FROM php:7.4-fpm
ARG SKYWALKING=3.2.7
RUN set -ex \
&& apt-get update && apt-get install -y curl libcurl4-openssl-dev golang git \
&& mkdir -p /tmp/skywalking && cd /tmp/skywalking \
&& curl -L -o skywalking.tar.gz https://github.com/SkyAPM/SkyAPM-php-sdk/archive/${SKYWALKING}.tar.gz \
&& tar zxvf skywalking.tar.gz && cd SkyAPM-php-sdk-${SKYWALKING} \
&& apt-get update -y && apt-get install --no-install-recommends -y \
curl \
libcurl4-openssl-dev \
golang \
git \
procps \
nginx \
&& rm -rf /var/lib/apt/lists/*
ADD . /tmp/skywalking
RUN set -ex \
&& cd /tmp/skywalking \
&& phpize && ./configure && make && make install \
&& cp php.ini /usr/local/etc/php/conf.d/ext-skywalking.ini \
&& ./build-sky-php-agent.sh \
&& cp sky-php-agent-* /usr/local/bin/
\ No newline at end of file
&& cp sky-php-agent-* /usr/local/bin/ \
&& cp php.ini /usr/local/etc/php/conf.d/ext-skywalking.ini \
&& cp service.sh /opt/ \
&& cp nginx.conf /etc/nginx/nginx.conf \
&& cd /var/www/html \
&& rm -fr /tmp/skywalking
ENTRYPOINT ["/opt/service.sh"]
\ No newline at end of file
......@@ -20,7 +20,7 @@ func main() {
app := cli.NewApp()
app.Name = "sky_php_agent"
app.Usage = "the skywalking trace sending agent"
app.Version = "3.2.6"
app.Version = "3.2.7"
app.Flags = []cli.Flag{
&cli.StringSliceFlag{Name: "grpc", Usage: "SkyWalking collector grpc address", Value: cli.NewStringSlice("127.0.0.1:11800")},
&cli.StringFlag{Name: "socket", Usage: "Pipeline for communicating with PHP", Value: "/var/run/sky-agent.sock"},
......
......@@ -5,8 +5,6 @@ import (
"agent/agent/pb/agent"
"agent/agent/pb/agent2"
"agent/agent/pb/register2"
"container/list"
"fmt"
cli "github.com/urfave/cli/v2"
"google.golang.org/grpc"
"math/rand"
......@@ -38,19 +36,20 @@ type Agent struct {
socket string
socketListener net.Listener
register chan *register
registerCache sync.Map
registerCacheLock sync.Mutex
registerCache map[int]registerCache
registerCacheLock sync.RWMutex
trace chan string
queue *list.List
queue []string
queueLock sync.Mutex
}
func NewAgent(cli *cli.Context) *Agent {
var agent = &Agent{
flag: cli,
socket: cli.String("socket"),
register: make(chan *register),
trace: make(chan string),
queue: list.New(),
flag: cli,
socket: cli.String("socket"),
register: make(chan *register),
trace: make(chan string),
registerCache: make(map[int]registerCache),
}
go agent.sub()
......@@ -60,22 +59,25 @@ func NewAgent(cli *cli.Context) *Agent {
func (t *Agent) Run() {
log.Info("hello skywalking")
t.connGRPC()
t.listenSocket()
log.Info("🍺 skywalking php agent started successfully, enjoy yourself")
defer func() {
var err error
err = t.socketListener.Close()
if err != nil {
log.Errorln(err)
if t.socketListener != nil {
err = t.socketListener.Close()
if err != nil {
log.Errorln(err)
}
}
err = t.grpcConn.Close()
if err != nil {
log.Errorln(err)
if t.grpcConn != nil {
err = t.grpcConn.Close()
if err != nil {
log.Errorln(err)
}
}
}()
t.connGRPC()
t.listenSocket()
}
func (t *Agent) connGRPC() {
......@@ -99,6 +101,7 @@ func (t *Agent) connGRPC() {
t.grpcClient.segmentClientV6 = agent2.NewTraceSegmentReportServiceClient(t.grpcConn)
t.grpcClient.pingClient5 = agent.NewInstanceDiscoveryServiceClient(t.grpcConn)
t.grpcClient.pintClient6 = register2.NewServiceInstancePingClient(t.grpcConn)
log.Info("🍺 skywalking php agent started successfully, enjoy yourself")
}
func (t *Agent) listenSocket() {
......@@ -144,17 +147,21 @@ func (t *Agent) sub() {
for {
select {
case <-traceSendTicker.C:
len := t.queue.Len()
len := len(t.queue)
if len > 0 {
var segments []*upstreamSegment
for i := 0; i < len; i++ {
// front top 100
e := t.queue.Front()
st := format(fmt.Sprintf("%v", e.Value))
t.queueLock.Lock()
list := t.queue[:]
t.queue = []string{}
t.queueLock.Unlock()
for _, trace := range list {
info, st := format(trace)
if st != nil {
t.recoverRegister(info)
segments = append(segments, st)
}
t.queue.Remove(e)
}
go t.send(segments)
}
......@@ -163,8 +170,9 @@ func (t *Agent) sub() {
case register := <-t.register:
go t.doRegister(register)
case trace := <-t.trace:
t.queue.PushBack(trace)
go t.recoverRegister(trace)
t.queueLock.Lock()
t.queue = append(t.queue, trace)
t.queueLock.Unlock()
}
}
}
......@@ -9,10 +9,15 @@ import (
func (t *Agent) heartbeat() {
t.registerCache.Range(func(key, value interface{}) bool {
var heartList []registerCache
t.registerCacheLock.RLock()
for _, bind := range t.registerCache {
heartList = append(heartList, bind)
}
t.registerCacheLock.RUnlock()
for _, bind := range heartList {
log.Infoln("heartbeat")
bind := value.(registerCache)
if bind.Version == 5 {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
......@@ -43,6 +48,5 @@ func (t *Agent) heartbeat() {
log.Infof("heartbeat appId %d appInsId %d", bind.AppId, bind.InstanceId)
}
}
return true
})
}
}
......@@ -42,18 +42,21 @@ func ip4s() []string {
return ips
}
func (t *Agent) recoverRegister(r string) {
var info trace
err := json.Unmarshal([]byte(r), &info)
if err == nil {
if _, ok := t.registerCache.Load(info.Pid); !ok {
t.registerCache.Store(info.Pid, registerCache{
Version: info.Version,
AppId: info.ApplicationId,
InstanceId: info.ApplicationInstance,
Uuid: info.Uuid,
})
func (t *Agent) recoverRegister(info trace) {
t.registerCacheLock.RLock()
_, ok := t.registerCache[info.Pid]
t.registerCacheLock.RUnlock()
if !ok {
t.registerCacheLock.Lock()
t.registerCache[info.Pid] = registerCache{
Version: info.Version,
AppId: info.ApplicationId,
InstanceId: info.ApplicationInstance,
Uuid: info.Uuid,
}
t.registerCacheLock.Unlock()
}
}
......@@ -68,8 +71,10 @@ func (t *Agent) doRegister(r *register) {
}
pid := info.Pid
if value, ok := t.registerCache.Load(pid); ok {
bind := value.(registerCache)
t.registerCacheLock.RLock()
bind, ok := t.registerCache[pid]
t.registerCacheLock.RUnlock()
if ok {
log.Infof("register => pid %d appid %d insId %d", pid, bind.AppId, bind.InstanceId)
r.c.Write([]byte(strconv.FormatInt(int64(bind.AppId), 10) + "," + strconv.FormatInt(int64(bind.InstanceId), 10) + "," + bind.Uuid))
return
......@@ -79,9 +84,8 @@ func (t *Agent) doRegister(r *register) {
t.registerCacheLock.Lock()
defer t.registerCacheLock.Unlock()
// if map not found pid.. start register
if _, ok := t.registerCache.Load(pid); !ok {
if _, ok := t.registerCache[pid]; !ok {
log.Infof("start register pid %d used SkyWalking v%d", pid, info.Version)
var regAppStatus = false
var appId int32 = 0
......@@ -254,12 +258,12 @@ func (t *Agent) doRegister(r *register) {
}
if appInsId != 0 {
t.registerCache.Store(pid, registerCache{
t.registerCache[pid] = registerCache{
Version: info.Version,
AppId: appId,
InstanceId: appInsId,
Uuid: agentUUID,
})
}
log.Infof("register pid %d appid %d insId %d", pid, appId, appInsId)
}
} else {
......
......@@ -123,13 +123,13 @@ func (t *Agent) send(segments []*upstreamSegment) {
log.Info("sending success...")
}
func format(j string) *upstreamSegment {
func format(j string) (trace, *upstreamSegment) {
info := trace{}
err := json.Unmarshal([]byte(j), &info)
if err != nil {
log.Error("trace json decode:", err)
return nil
return info, nil
}
if info.Version == 5 {
var globalTrace []*agent.UniqueId
......@@ -190,14 +190,14 @@ func format(j string) *upstreamSegment {
//log.Info(seg)
if err != nil {
log.Error("trace json encode:", err)
return nil
return info, nil
}
segment := &agent.UpstreamSegment{
GlobalTraceIds: globalTrace,
Segment: seg,
}
return &upstreamSegment{
return info, &upstreamSegment{
Version: info.Version,
segment: segment,
}
......@@ -261,7 +261,7 @@ func format(j string) *upstreamSegment {
//log.Info(seg)
if err != nil {
log.Error("trace proto encode:", err)
return nil
return info, nil
}
segment := &agent.UpstreamSegment{
......@@ -269,12 +269,12 @@ func format(j string) *upstreamSegment {
Segment: seg,
}
return &upstreamSegment{
return info, &upstreamSegment{
Version: info.Version,
segment: segment,
}
}
return nil
return info, nil
}
func buildRefs(span *agent.SpanObject, refs []ref) {
......
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
user nginx;
worker_processes 1;
error_log /var/log/nginx/error.log warn;
pid /var/run/nginx.pid;
events {
worker_connections 1024;
}
http {
include /etc/nginx/mime.types;
default_type application/octet-stream;
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
access_log /var/log/nginx/access.log main;
sendfile on;
#tcp_nopush on;
keepalive_timeout 65;
#gzip on;
server {
listen 8080;
root /opt/www;
index index.php;
location / {
try_files $uri $uri/ /index.php?$query_string;
}
location ~ \.php$ {
fastcgi_pass 127.0.0.1:9000;
fastcgi_index index.php;
fastcgi_param SCRIPT_FILENAME $document_root$fastcgi_script_name;
include fastcgi_params;
}
}
}
\ No newline at end of file
......@@ -11,4 +11,5 @@
extension=skywalking.so
skywalking.app_code = hello_skywalking
skywalking.enable = 1
skywalking.version = 6
\ No newline at end of file
skywalking.version = 6
skywalking.sock_path = /tmp/sky-agent.sock
\ No newline at end of file
......@@ -25,7 +25,7 @@ extern zend_module_entry skywalking_module_entry;
#define phpext_skywalking_ptr &skywalking_module_entry
#define SKY_DEBUG 0
#define PHP_SKYWALKING_VERSION "3.2.6" /* Replace with version number for your extension */
#define PHP_SKYWALKING_VERSION "3.2.7" /* Replace with version number for your extension */
#ifdef PHP_WIN32
# define PHP_SKYWALKING_API __declspec(dllexport)
......
#!/usr/bin/env bash
nginx
php-fpm > /var/log/php-fpm.log 2>&1 &
sky-php-agent-linux-x64 --grpc ${SW_AGENT_COLLECTOR_BACKEND_SERVICES} --socket /tmp/sky-agent.sock > /var/log/sky-php.log 2>&1 &
while [[ true ]]; do
sleep 1
done
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册