提交 ecefb9b5 编写于 作者: K kezhenxu94

Complement exception handler and reorganize package structure

上级 fff26c22
......@@ -59,8 +59,10 @@ class Kind(Enum):
return self == Kind.Exit
LogItem = namedtuple('LogItem', 'key val')
class Log(object):
LogItem = namedtuple('LogItem', 'key val')
def __init__(self, timestamp: time = time.time(), items: List[LogItem] = None):
self.timestamp = timestamp
......
......@@ -16,27 +16,16 @@
#
import logging
from abc import ABC
from queue import Queue
from threading import Thread, Event
from skywalking import config
from skywalking.agent.protocol import Protocol
from skywalking.trace.context import Segment
logger = logging.getLogger(__name__)
class Protocol(ABC):
def connected(self):
raise NotImplementedError()
def heartbeat(self):
raise NotImplementedError()
def report(self, queue: Queue):
raise NotImplementedError()
def __heartbeat():
while not __finished.is_set():
if connected():
......@@ -63,7 +52,7 @@ __protocol: Protocol
def init():
if config.protocol == 'grpc':
from skywalking.agent.grpc import GrpcProtocol
from skywalking.agent.protocol.grpc import GrpcProtocol
global __protocol
__protocol = GrpcProtocol()
elif config.protocol == 'http':
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABC
from queue import Queue
class Protocol(ABC):
def connected(self):
raise NotImplementedError()
def heartbeat(self):
raise NotImplementedError()
def report(self, queue: Queue):
raise NotImplementedError()
......@@ -14,12 +14,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from queue import Queue
import grpc
from language_agent.Tracing_pb2 import SegmentObject, SpanObject
from common.Common_pb2 import KeyStringValuePair
from language_agent.Tracing_pb2 import SegmentObject, SpanObject, Log
from skywalking import config
from skywalking.agent import Protocol
from skywalking.client.grpc import GrpcServiceManagementClient, GrpcTraceSegmentReportService
......@@ -69,6 +71,11 @@ class GrpcProtocol(Protocol):
spanType=span.kind.name,
spanLayer=span.layer.name,
componentId=span.component.value,
isError=span.error_occurred,
logs=[Log(
time=int(log.timestamp * 1000),
data=[KeyStringValuePair(key=item.key, value=item.val) for item in log.items],
) for log in span.logs]
) for span in segment.spans],
)
......
......@@ -15,9 +15,10 @@
# limitations under the License.
#
import traceback
from functools import wraps
from skywalking import Layer, Component
from skywalking import Layer, Component, Log, LogItem
from skywalking.trace.context import get_context
......@@ -34,7 +35,16 @@ def trace(
with context.new_local_span(op=_op) as span:
span.layer = layer
span.component = component
return func(*args, **kwargs)
# noinspection PyBroadException
try:
result = func(*args, **kwargs)
return result
except:
span.error_occurred = True
span.logs = [Log(items=[
LogItem(key='Traceback', val=traceback.format_exc()),
])]
raise
return wrapper
......
......@@ -21,5 +21,5 @@ import logging
def init():
logging.basicConfig(
level=logging.DEBUG,
format='%(name)-24s [%(threadName)-15s] [%(levelname)-8s] %(message)s',
format='%(name)-32s [%(threadName)-15s] [%(levelname)-8s] %(message)s',
)
......@@ -122,12 +122,6 @@ _thread_local.context = None
def get_context() -> SpanContext:
if _thread_local.context is not None:
return _thread_local.context
if not agent.connected():
_thread_local.context = NoopContext()
else:
_thread_local.context = SpanContext()
_thread_local.context = _thread_local.context or (SpanContext() if agent.connected() else NoopContext())
return _thread_local.context
......@@ -35,6 +35,5 @@ class Segment(object):
self.spans.append(span)
class SegmentRef(object):
pass
......@@ -77,7 +77,9 @@ class Span(ABC):
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
return self
if exc_tb is not None:
return False
return True
@tostring
......
......@@ -14,6 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from time import sleep
from skywalking import agent, config, Component, Layer
......@@ -24,16 +25,16 @@ if __name__ == '__main__':
config.init(collector="127.0.0.1:11800", service='Python Service 1')
agent.init()
agent.start()
sleep(3)
@trace()
def test_decorator():
sleep(1)
@trace()
def test_nested_decorator():
test_decorator()
sleep(3)
for _ in range(1, 20):
......@@ -46,21 +47,21 @@ if __name__ == '__main__':
s2.component = Component.Http
s2.layer = Layer.Http
print(s2)
sleep(1)
sleep(0.5)
with context.new_exit_span(op='https://github.com/3', peer='127.0.0.1:80') as s3:
s3.component = Component.Http
s3.layer = Layer.Http
print(s3)
sleep(1)
sleep(0.5)
with context.new_entry_span(op='https://github.com/4') as s4:
s4.component = Component.Http
s4.layer = Layer.Http
print(s4)
sleep(1)
sleep(0.5)
with context.new_exit_span(op='https://github.com/5', peer='127.0.0.1:80') as s5:
s5.component = Component.Http
s5.layer = Layer.Http
print(s5)
sleep(1)
sleep(1)
sleep(0.5)
sleep(0.5)
print()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册