From c73398544694ff7c64b083a5ee0d6d82578971cf Mon Sep 17 00:00:00 2001 From: huawei Date: Sun, 23 Aug 2020 08:26:39 +0800 Subject: [PATCH] [Feature] Process propagation (#67) Co-authored-by: kezhenxu94 --- README.md | 9 +- skywalking/agent/__init__.py | 4 + skywalking/config.py | 35 ++++- skywalking/trace/ipc/__init__.py | 16 +++ skywalking/trace/ipc/process.py | 34 +++++ tests/plugin/docker/Dockerfile.tool | 2 +- tests/plugin/sw_process/__init__.py | 16 +++ tests/plugin/sw_process/docker-compose.yml | 60 ++++++++ tests/plugin/sw_process/expected.data.yml | 143 +++++++++++++++++++ tests/plugin/sw_process/services/__init__.py | 16 +++ tests/plugin/sw_process/services/consumer.py | 52 +++++++ tests/plugin/sw_process/services/provider.py | 37 +++++ tests/plugin/sw_process/test_process.py | 32 +++++ 13 files changed, 448 insertions(+), 8 deletions(-) create mode 100644 skywalking/trace/ipc/__init__.py create mode 100644 skywalking/trace/ipc/process.py create mode 100644 tests/plugin/sw_process/__init__.py create mode 100644 tests/plugin/sw_process/docker-compose.yml create mode 100644 tests/plugin/sw_process/expected.data.yml create mode 100644 tests/plugin/sw_process/services/__init__.py create mode 100644 tests/plugin/sw_process/services/consumer.py create mode 100644 tests/plugin/sw_process/services/provider.py create mode 100644 tests/plugin/sw_process/test_process.py diff --git a/README.md b/README.md index 649b3c4..1900fb1 100755 --- a/README.md +++ b/README.md @@ -88,6 +88,7 @@ from time import sleep from skywalking import Component from skywalking.decorators import trace, runnable from skywalking.trace.context import SpanContext, get_context +from skywalking.trace.ipc.process import SwProcess @trace() # the operation name is the method name('some_other_method') by default def some_other_method(): @@ -103,10 +104,16 @@ def some_method(): def some_method(): some_other_method() -from threading import Thread +from threading import Thread t = Thread(target=some_method) t.start() +# When another process is started, agents will also be started in other processes, +# supporting only the process mode of spawn. +p1 = SwProcess(target=some_method) +p1.start() +p1.join() + context: SpanContext = get_context() with context.new_entry_span(op=str('https://github.com/apache/skywalking')) as span: diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py index fe181a5..8ed9759 100644 --- a/skywalking/agent/__init__.py +++ b/skywalking/agent/__init__.py @@ -82,6 +82,10 @@ def stop(): __finished.set() +def started(): + return __started + + def connected(): return __protocol.connected() diff --git a/skywalking/config.py b/skywalking/config.py index b2def35..9ea2478 100644 --- a/skywalking/config.py +++ b/skywalking/config.py @@ -14,10 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. # - +import inspect import os import uuid -from typing import List +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from typing import List service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str service_instance = os.getenv('SW_AGENT_INSTANCE') or str(uuid.uuid1()).replace('-', '') # type: str @@ -35,17 +38,17 @@ pymongo_parameters_max_length = int(os.getenv('SW_PYMONGO_PARAMETERS_MAX_LENGTH' ignore_suffix = os.getenv('SW_IGNORE_SUFFIX') or '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,' \ '.mp4,.html,.svg ' # type: str flask_collect_http_params = True if os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') and \ - os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool + os.getenv('SW_FLASK_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool http_params_length_threshold = int(os.getenv('SW_HTTP_PARAMS_LENGTH_THRESHOLD') or '1024') # type: int django_collect_http_params = True if os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') and \ - os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool + os.getenv('SW_DJANGO_COLLECT_HTTP_PARAMS') == 'True' else False # type: bool correlation_element_max_number = int(os.getenv('SW_CORRELATION_ELEMENT_MAX_NUMBER') or '3') # type: int correlation_value_max_length = int(os.getenv('SW_CORRELATION_VALUE_MAX_LENGTH') or '128') # type: int trace_ignore = True if os.getenv('SW_TRACE_IGNORE') and \ - os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool + os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool trace_ignore_path = (os.getenv('SW_TRACE_IGNORE_PATH') or '').split(',') # type: List[str] elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \ - os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool + os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool def init( @@ -69,3 +72,23 @@ def init( global authentication authentication = token or authentication + + +def serialize(): + from skywalking import config + return { + key: value for key, value in config.__dict__.items() if not ( + key.startswith('_') or key == 'TYPE_CHECKING' + or inspect.isfunction(value) + or inspect.ismodule(value) + or inspect.isbuiltin(value) + or inspect.isclass(value) + ) + } + + +def deserialize(data): + from skywalking import config + for key, value in data.items(): + if key in config.__dict__: + config.__dict__[key] = value diff --git a/skywalking/trace/ipc/__init__.py b/skywalking/trace/ipc/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/skywalking/trace/ipc/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/skywalking/trace/ipc/process.py b/skywalking/trace/ipc/process.py new file mode 100644 index 0000000..6799e0e --- /dev/null +++ b/skywalking/trace/ipc/process.py @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from multiprocessing import Process + +from skywalking import config, agent + + +class SwProcess(Process): + + def __init__(self, group=None, target=None, name=None, args=(), kwargs={}, *, + daemon=None): + super(SwProcess, self).__init__(group=group, target=target, name=name, args=args, kwargs=kwargs, daemon=daemon) + self._sw_config = config.serialize() + + def run(self): + if agent.started() is False: + config.deserialize(self._sw_config) + agent.start() + super(SwProcess, self).run() diff --git a/tests/plugin/docker/Dockerfile.tool b/tests/plugin/docker/Dockerfile.tool index f4283f4..e88d0ca 100644 --- a/tests/plugin/docker/Dockerfile.tool +++ b/tests/plugin/docker/Dockerfile.tool @@ -19,7 +19,7 @@ FROM openjdk:8 WORKDIR /tests -ARG COMMIT_HASH=3c9d7099f05dc4a4b937c8a47506e56c130b6221 +ARG COMMIT_HASH=8a48c49b4420df5c9576d2aea178b2ebcb7ecd09 ADD https://github.com/apache/skywalking-agent-test-tool/archive/${COMMIT_HASH}.tar.gz . diff --git a/tests/plugin/sw_process/__init__.py b/tests/plugin/sw_process/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/sw_process/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/sw_process/docker-compose.yml b/tests/plugin/sw_process/docker-compose.yml new file mode 100644 index 0000000..61f3be7 --- /dev/null +++ b/tests/plugin/sw_process/docker-compose.yml @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '2.1' + +services: + collector: + extends: + service: collector + file: ../docker/docker-compose.base.yml + + provider: + extends: + service: agent + file: ../docker/docker-compose.base.yml + ports: + - 9091:9091 + volumes: + - ./services/provider.py:/app/provider.py + command: ['bash', '-c', 'pip install flask && python3 /app/provider.py'] + depends_on: + collector: + condition: service_healthy + healthcheck: + test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9091"] + interval: 5s + timeout: 60s + retries: 120 + + consumer: + extends: + service: agent + file: ../docker/docker-compose.base.yml + ports: + - 9090:9090 + volumes: + - ./services/consumer.py:/app/consumer.py + command: ['bash', '-c', 'pip install flask && python3 /app/consumer.py'] + depends_on: + collector: + condition: service_healthy + provider: + condition: service_healthy + +networks: + beyond: diff --git a/tests/plugin/sw_process/expected.data.yml b/tests/plugin/sw_process/expected.data.yml new file mode 100644 index 0000000..c295a84 --- /dev/null +++ b/tests/plugin/sw_process/expected.data.yml @@ -0,0 +1,143 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +segmentItems: + - serviceName: provider + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: url + value: http://provider:9091/users + - key: status.code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 0 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + tags: + - key: http.method + value: POST + - key: url + value: http://provider:9091/users + - key: status.code + value: '200' + refs: + - parentEndpoint: /users + networkAddress: 'provider:9091' + refType: CrossProcess + parentSpanId: 1 + parentTraceSegmentId: not null + parentServiceInstance: not null + parentService: consumer + traceId: not null + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + spanType: Entry + peer: not null + skipAnalysis: false + - serviceName: consumer + segmentSize: 2 + segments: + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: not null + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: url + value: 'http://provider:9091/users' + - key: status.code + value: '200' + - segmentId: not null + spans: + - operationName: /users + operationId: 0 + parentSpanId: 0 + spanId: 1 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7002 + isError: false + spanType: Exit + peer: provider:9091 + skipAnalysis: false + tags: + - key: http.method + value: POST + - key: url + value: 'http://provider:9091/users' + - key: status.code + value: '200' + - operationName: /users + operationId: 0 + parentSpanId: -1 + spanId: 0 + spanLayer: Http + startTime: gt 0 + endTime: gt 0 + componentId: 7001 + isError: false + spanType: Entry + peer: not null + skipAnalysis: false + tags: + - key: http.method + value: GET + - key: url + value: 'http://0.0.0.0:9090/users' + - key: status.code + value: '200' \ No newline at end of file diff --git a/tests/plugin/sw_process/services/__init__.py b/tests/plugin/sw_process/services/__init__.py new file mode 100644 index 0000000..b1312a0 --- /dev/null +++ b/tests/plugin/sw_process/services/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# diff --git a/tests/plugin/sw_process/services/consumer.py b/tests/plugin/sw_process/services/consumer.py new file mode 100644 index 0000000..d7d3fe6 --- /dev/null +++ b/tests/plugin/sw_process/services/consumer.py @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time +import requests +from skywalking import agent, config +from skywalking.trace.ipc.process import SwProcess +import multiprocessing + + +def post(): + requests.post("http://provider:9091/users") + time.sleep(3) + + +if __name__ == '__main__': + multiprocessing.set_start_method('spawn') + config.service_name = 'consumer' + config.logging_level = 'DEBUG' + config.flask_collect_http_params = True + agent.start() + + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route("/users", methods=["POST", "GET"]) + def application(): + p1 = SwProcess(target=post) + p1.start() + p1.join() + + res = requests.post("http://provider:9091/users") + + return jsonify(res.json()) + + PORT = 9090 + app.run(host='0.0.0.0', port=PORT, debug=False) diff --git a/tests/plugin/sw_process/services/provider.py b/tests/plugin/sw_process/services/provider.py new file mode 100644 index 0000000..d043141 --- /dev/null +++ b/tests/plugin/sw_process/services/provider.py @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import time + +from skywalking import agent, config + +if __name__ == '__main__': + config.service_name = 'provider' + config.logging_level = 'DEBUG' + agent.start() + + from flask import Flask, jsonify + + app = Flask(__name__) + + @app.route("/users", methods=["POST", "GET"]) + def application(): + time.sleep(0.5) + return jsonify({"song": "Despacito", "artist": "Luis Fonsi"}) + + PORT = 9091 + app.run(host='0.0.0.0', port=PORT, debug=False) diff --git a/tests/plugin/sw_process/test_process.py b/tests/plugin/sw_process/test_process.py new file mode 100644 index 0000000..bfafb7a --- /dev/null +++ b/tests/plugin/sw_process/test_process.py @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Callable +import pytest +import requests +from tests.plugin.base import TestPluginBase + + +@pytest.fixture +def prepare(): + # type: () -> Callable + return lambda *_: requests.get('http://0.0.0.0:9090/users') + + +class TestPlugin(TestPluginBase): + def test_plugin(self, docker_compose, version): + self.validate() -- GitLab