未验证 提交 0308fbf8 编写于 作者: H huawei 提交者: GitHub

Add elasticsearch plugin (#64)

* Add elasticsearch plugin

* fix style

* fix style
Co-authored-by: Nhuawei <huawei@bit-s.cn>
上级 1c26c13e
......@@ -21,3 +21,4 @@ Environment Variable | Description | Default
| `SW_CORRELATION_VALUE_MAX_LENGTH`| Max value length of correlation context element.| `128` |
| `SW_TRACE_IGNORE`| This config item controls that whether the trace should be ignore | `false` |
| `SW_TRACE_IGNORE_PATH`| You can setup multiple URL path patterns, The endpoints match these patterns wouldn't be traced. the current matching rules follow Ant Path match style , like /path/*, /path/**, /path/?.| `''` |
| `SW_ELASTICSEARCH_TRACE_DSL`| If true, trace all the DSL(Domain Specific Language) in ElasticSearch access, default is false | `false` |
......@@ -13,3 +13,4 @@ Library | Plugin Name
| [tornado](https://www.tornadoweb.org/en/stable/) | `sw_tornado` |
| [pika](https://pika.readthedocs.io/en/stable/) | `sw_rabbitmq` |
| [pymongo](https://pymongo.readthedocs.io/en/stable/) | `sw_pymongo` |
| [elasticsearch](https://github.com/elastic/elasticsearch-py) | `sw_elasticsearch` |
......@@ -34,3 +34,4 @@ urllib3==1.25.10
websocket-client==0.57.0
Werkzeug==1.0.1
wrapt==1.12.1
elasticsearch==7.8.0
......@@ -53,6 +53,7 @@ setup(
"tornado",
"pika",
"pymongo",
"elasticsearch",
],
},
classifiers=[
......
......@@ -35,6 +35,7 @@ class Component(Enum):
KafkaConsumer = 41
RabbitmqProducer = 52
RabbitmqConsumer = 53
Elasticsearch = 47
class Layer(Enum):
......
......@@ -44,6 +44,8 @@ correlation_value_max_length = int(os.getenv('SW_CORRELATION_VALUE_MAX_LENGTH')
trace_ignore = True if os.getenv('SW_TRACE_IGNORE') and \
os.getenv('SW_TRACE_IGNORE') == 'True' else False # type: bool
trace_ignore_path = (os.getenv('SW_TRACE_IGNORE_PATH') or '').split(',') # type: List[str]
elasticsearch_trace_dsl = True if os.getenv('SW_ELASTICSEARCH_TRACE_DSL') and \
os.getenv('SW_ELASTICSEARCH_TRACE_DSL') == 'True' else False # type: bool
def init(
......
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
from skywalking import Layer, Component, config
from skywalking.trace import tags
from skywalking.trace.context import get_context
from skywalking.trace.tags import Tag
logger = logging.getLogger(__name__)
def install():
# noinspection PyBroadException
try:
from elasticsearch import Transport
_perform_request = Transport.perform_request
def _sw_perform_request(this: Transport, method, url, headers=None, params=None, body=None):
context = get_context()
peer = ",".join([host["host"] + ":" + str(host["port"]) for host in this.hosts])
with context.new_exit_span(op="Elasticsearch/" + method + url, peer=peer) as span:
span.layer = Layer.Database
span.component = Component.Elasticsearch
try:
res = _perform_request(this, method, url, headers=headers, params=params, body=body)
span.tag(Tag(key=tags.DbType, val="Elasticsearch"))
if config.elasticsearch_trace_dsl:
span.tag(Tag(key=tags.DbStatement, val="" if body is None else body))
except BaseException as e:
span.raised()
raise e
return res
Transport.perform_request = _sw_perform_request
except Exception:
logger.warning('failed to install plugin %s', __name__)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
version: '2.1'
services:
collector:
extends:
service: collector
file: ../docker/docker-compose.base.yml
elasticsearch:
image: elasticsearch:6.8.11
hostname: elasticsearch
expose:
- 9200
environment:
- cluster.name=docker-node
- xpack.security.enabled=false
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms256m -Xmx256m"
- discovery.type=single-node
healthcheck:
test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9200"]
interval: 5s
timeout: 60s
retries: 120
networks:
- beyond
consumer:
extends:
service: agent
file: ../docker/docker-compose.base.yml
ports:
- 9090:9090
volumes:
- ./services/consumer.py:/app/consumer.py
command: ['bash', '-c', 'pip install flask && pip install elasticsearch && python3 /app/consumer.py']
depends_on:
collector:
condition: service_healthy
elasticsearch:
condition: service_healthy
networks:
beyond:
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
segmentItems:
- serviceName: consumer
segmentSize: 1
segments:
- segmentId: not null
spans:
- operationName: Elasticsearch/PUT/test
operationId: 0
parentSpanId: 0
spanId: 1
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 47
isError: false
spanType: Exit
peer: elasticsearch:9200
skipAnalysis: false
tags:
- key: db.type
value: Elasticsearch
- key: db.statement
value: ''
- operationName: Elasticsearch/PUT/test/test/1
operationId: 0
parentSpanId: 0
spanId: 2
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 47
isError: false
spanType: Exit
peer: elasticsearch:9200
skipAnalysis: false
tags:
- key: db.type
value: Elasticsearch
- key: db.statement
value: '{''song'': ''Despacito'', ''artist'': ''Luis Fonsi''}'
- operationName: Elasticsearch/GET/test/_doc/1
operationId: 0
parentSpanId: 0
spanId: 3
spanLayer: Database
startTime: gt 0
endTime: gt 0
componentId: 47
isError: false
spanType: Exit
peer: elasticsearch:9200
skipAnalysis: false
tags:
- key: db.type
value: Elasticsearch
- key: db.statement
value: ''
- operationName: /users
operationId: 0
parentSpanId: -1
spanId: 0
spanLayer: Http
tags:
- key: http.method
value: GET
- key: url
value: http://0.0.0.0:9090/users
- key: status.code
value: '200'
startTime: gt 0
endTime: gt 0
componentId: 7001
spanType: Entry
peer: not null
skipAnalysis: false
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from elasticsearch import Elasticsearch
from skywalking import agent, config
if __name__ == '__main__':
config.service_name = 'consumer'
config.logging_level = 'DEBUG'
config.elasticsearch_trace_dsl = True
agent.start()
from flask import Flask, jsonify
app = Flask(__name__)
client = Elasticsearch('http://elasticsearch:9200/')
index_name = "test"
def create_index():
client.indices.create(index=index_name, ignore=400)
def save_index():
data = {"song": "Despacito", "artist": "Luis Fonsi"}
client.index(index=index_name, doc_type="test", id=1, body=data)
def search():
client.get(index=index_name, id=1)
@app.route("/users", methods=["POST", "GET"])
def application():
create_index()
save_index()
search()
return jsonify({"song": "Despacito", "artist": "Luis Fonsi"})
PORT = 9090
app.run(host='0.0.0.0', port=PORT, debug=True)
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import inspect
import time
import unittest
from os.path import dirname
from testcontainers.compose import DockerCompose
from tests.plugin import BasePluginTest
class TestPlugin(BasePluginTest):
@classmethod
def setUpClass(cls):
cls.compose = DockerCompose(filepath=dirname(inspect.getfile(cls)))
cls.compose.start()
cls.compose.wait_for(cls.url(('consumer', '9090'), 'users'))
def test_plugin(self):
time.sleep(10)
self.validate()
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册