未验证 提交 c7339854 编写于 作者: H huawei 提交者: GitHub

[Feature] Process propagation (#67)

Co-authored-by: Nkezhenxu94 <kezhenxu94@163.com>
上级 21011a9e
......@@ -88,6 +88,7 @@ from time import sleep
from skywalking import Component
from skywalking.decorators import trace, runnable
from skywalking.trace.context import SpanContext, get_context
from skywalking.trace.ipc.process import SwProcess
@trace() # the operation name is the method name('some_other_method') by default
def some_other_method():
......@@ -103,10 +104,16 @@ def some_method():
def some_method():
some_other_method()
from threading import Thread
from threading import Thread
t = Thread(target=some_method)
t.start()
# When another process is started, agents will also be started in other processes,
# supporting only the process mode of spawn.
p1 = SwProcess(target=some_method)
p1.start()
p1.join()
context: SpanContext = get_context()
with context.new_entry_span(op=str('https://github.com/apache/skywalking')) as span:
......
......@@ -82,6 +82,10 @@ def stop():
__finished.set()
def started():
return __started
def connected():
return __protocol.connected()
......
......@@ -14,10 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import os
import uuid
from typing import List
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import List
service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str
service_instance = os.getenv('SW_AGENT_INSTANCE') or str(uuid.uuid1()).replace('-', '') # type: str
......@@ -35,17 +38,17 @@ pymongo_parameters_max_length = int(os.getenv('SW_PYMONGO_PARAMETERS_MAX_LENGTH'
ignore_suffix = os.getenv('SW_IGNORE_SUFFIX') or '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,' \
'.mp4,.html,.svg ' # type: str
flask_collect_http_params = True if os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') and \
os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
http_params_length_threshold = int(os.getenv('SW_HTTP_PARAMS_LENGTH_THRESHOLD') or '1024') # type: int
django_collect_http_params = True if os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') and \
os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool
correlation_element_max_number = int(os.getenv('SW_CORRELATION_ELEMENT_MAX_NUMBER') or '3') # type: int
correlation_value_max_length = int(os.getenv('SW_CORRELATION_VALUE_MAX_LENGTH') or '128') # type: int
trace_ignore = True if os.getenv('SW_TRACE_IGNORE') and \
os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool
os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool
trace_ignore_path = (os.getenv('SW_TRACE_IGNORE_PATH') or '').split(',') # type: List[str]
elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \
os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
def init(
......@@ -69,3 +72,23 @@ def init(
global authentication
authentication = token or authentication
def serialize():
from skywalking import config
return {
key: value for key, value in config.__dict__.items() if not (
key.startswith('_') or key == 'TYPE_CHECKING'
or inspect.isfunction(value)
or inspect.ismodule(value)
or inspect.isbuiltin(value)
or inspect.isclass(value)
)
}
def deserialize(data):
from skywalking import config
for key, value in data.items():
if key in config.__dict__:
config.__dict__[key] = value
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from multiprocessing import Process
from skywalking import config, agent
class SwProcess(Process):
def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *,
daemon=None):
super(SwProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon)
self._sw_config = config.serialize()
def run(self):
if agent.started() is False:
config.deserialize(self._sw_config)
agent.start()
super(SwProcess, self).run()
......@@ -19,7 +19,7 @@ FROM openjdk:8
WORKDIR /tests
ARG COMMIT_HASH=3c9d7099f05dc4a4b937c8a47506e56c130b6221
ARG COMMIT_HASH=8a48c49b4420df5c9576d2aea178b2ebcb7ecd09
ADD https://github.com/apache/skywalking-agent-test-tool/archive/${COMMIT_HASH}.tar.gz .
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2.1'
services:
collector:
extends:
service: collector
file: ../docker/docker-compose.base.yml
provider:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9091:9091
volumes:
- ./services/provider.py:/app/provider.py
command: ['bash', '-c', 'pip install flask && python3 /app/provider.py']
depends_on:
collector:
condition: service_healthy
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"]
interval: 5s
timeout: 60s
retries: 120
consumer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9090:9090
volumes:
- ./services/consumer.py:/app/consumer.py
command: ['bash', '-c', 'pip install flask && python3 /app/consumer.py']
depends_on:
collector:
condition: service_healthy
provider:
condition: service_healthy
networks:
beyond:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
segmentItems:
- serviceName: provider
segmentSize: 2
segments:
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: POST
- key: url
value: http://provider:9091/users
- key: status.code
value: '200'
refs:
- parentEndpoint: /users
networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 0
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: POST
- key: url
value: http://provider:9091/users
- key: status.code
value: '200'
refs:
- parentEndpoint: /users
networkAddress: 'provider:9091'
refType: CrossProcess
parentSpanId: 1
parentTraceSegmentId: not null
parentServiceInstance: not null
parentService: consumer
traceId: not null
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
- serviceName: consumer
segmentSize: 2
segments:
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
isError: false
spanType: Exit
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: POST
- key: url
value: 'http://provider:9091/users'
- key: status.code
value: '200'
- segmentId: not null
spans:
- operationName: /users
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7002
isError: false
spanType: Exit
peer: provider:9091
skipAnalysis: false
tags:
- key: http.method
value: POST
- key: url
value: 'http://provider:9091/users'
- key: status.code
value: '200'
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
startTime: gt 0
endTime: gt 0
componentId: 7001
isError: false
spanType: Entry
peer: not null
skipAnalysis: false
tags:
- key: http.method
value: GET
- key: url
value: 'http://0.0.0.0:9090/users'
- key: status.code
value: '200'
\ No newline at end of file
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
import requests
from skywalking import agent, config
from skywalking.trace.ipc.process import SwProcess
import multiprocessing
def post():
requests.post("http://provider:9091/users")
time.sleep(3)
if __name__ == '__main__':
multiprocessing.set_start_method('spawn')
config.service_name = 'consumer'
config.logging_level = 'DEBUG'
config.flask_collect_http_params = True
agent.start()
from flask import Flask, jsonify
app = Flask(__name__)
@app.route("/users", methods=["POST", "GET"])
def application():
p1 = SwProcess(target=post)
p1.start()
p1.join()
res = requests.post("http://provider:9091/users")
return jsonify(res.json())
PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=False)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import time
from skywalking import agent, config
if __name__ == '__main__':
config.service_name = 'provider'
config.logging_level = 'DEBUG'
agent.start()
from flask import Flask, jsonify
app = Flask(__name__)
@app.route("/users", methods=["POST", "GET"])
def application():
time.sleep(0.5)
return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
PORT = 9091
app.run(host='0.0.0.0', port=PORT, debug=False)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Callable
import pytest
import requests
from tests.plugin.base import TestPluginBase
@pytest.fixture
def prepare():
# type: () -> Callable
return lambda *_: requests.get('http://0.0.0.0:9090/users')
class TestPlugin(TestPluginBase):
def test_plugin(self, docker_compose, version):
self.validate()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册