From 9ecb9fd87ca8625aad9fd52217c9b087c9a4c374 Mon Sep 17 00:00:00 2001 From: Daming Date: Wed, 4 Aug 2021 15:53:48 +0800 Subject: [PATCH] add event http receiver (#7406) --- CHANGES.md | 1 + docs/en/protocols/Log-Data-Protocol.md | 2 +- docs/en/protocols/README.md | 20 ++++ .../receiver/event/EventModuleProvider.java | 6 ++ .../event/rest/EventRestServiceHandler.java | 91 +++++++++++++++++++ 5 files changed, 119 insertions(+), 1 deletion(-) create mode 100644 oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/rest/EventRestServiceHandler.java diff --git a/CHANGES.md b/CHANGES.md index 40a3772d91..983b22fe71 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -16,6 +16,7 @@ Release Notes. * Fix CVE-2021-35515, CVE-2021-35516, CVE-2021-35517, CVE-2021-36090. Upgrade org.apache.commons:commons-compress to 1.21. * kubernetes java client upgrade from 12.0.1 to 13.0.0 +* Add `event` http receiver #### UI diff --git a/docs/en/protocols/Log-Data-Protocol.md b/docs/en/protocols/Log-Data-Protocol.md index cb5be1d392..10ca59310d 100644 --- a/docs/en/protocols/Log-Data-Protocol.md +++ b/docs/en/protocols/Log-Data-Protocol.md @@ -45,7 +45,7 @@ Json log record example: ## HTTP API -Report `json` format logs via HTTP API, the endpoint is http://:12800/v3/logs. +Report `json` format logs via HTTP API, the endpoint is `http://:12800/v3/logs`. Json log record example: diff --git a/docs/en/protocols/README.md b/docs/en/protocols/README.md index 8dfd87d0c9..49d9757e23 100644 --- a/docs/en/protocols/README.md +++ b/docs/en/protocols/README.md @@ -50,6 +50,26 @@ the following key information: The protocol is used to report events to the backend. The [doc](../concepts-and-designs/event.md) introduces the definition of an event, and [the protocol repository](https://github.com/apache/skywalking-data-collect-protocol/blob/master/event) defines gRPC services and message formats of events. +Report `JSON` format events via HTTP API, the endpoint is `http://:12800/v3/events`. +JSON event record example: +```json +[ + { + "uuid": "f498b3c0-8bca-438d-a5b0-3701826ae21c", + "source": { + "service": "SERVICE-A", + "instance": "INSTANCE-1" + }, + "name": "Reboot", + "type": "Normal", + "message": "App reboot.", + "parameters": {}, + "startTime": 1628044330000, + "endTime": 1628044331000 + } +] +``` + ### 3rd-party instrument protocol 3rd-party instrument protocols are not defined by SkyWalking. They are just protocols/formats with which SkyWalking is compatible, and SkyWalking could receive them from their existing libraries. SkyWalking starts with supporting Zipkin v1, v2 data formats. diff --git a/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/EventModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/EventModuleProvider.java index 5ba2a2fde9..bad5bbc8ba 100755 --- a/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/EventModuleProvider.java +++ b/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/EventModuleProvider.java @@ -21,11 +21,13 @@ package org.apache.skywalking.oap.server.receiver.event; import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerModule; import org.apache.skywalking.oap.server.core.CoreModule; import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; import org.apache.skywalking.oap.server.library.module.ModuleConfig; import org.apache.skywalking.oap.server.library.module.ModuleDefine; import org.apache.skywalking.oap.server.library.module.ModuleProvider; import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.receiver.event.grpc.EventGrpcServiceHandler; +import org.apache.skywalking.oap.server.receiver.event.rest.EventRestServiceHandler; import org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule; public class EventModuleProvider extends ModuleProvider { @@ -56,6 +58,10 @@ public class EventModuleProvider extends ModuleProvider { .getService(GRPCHandlerRegister.class); final EventGrpcServiceHandler eventGRPCServiceHandler = new EventGrpcServiceHandler(getManager()); grpcHandlerRegister.addHandler(eventGRPCServiceHandler); + JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME) + .provider() + .getService(JettyHandlerRegister.class); + jettyHandlerRegister.addHandler(new EventRestServiceHandler(getManager())); } @Override diff --git a/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/rest/EventRestServiceHandler.java b/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/rest/EventRestServiceHandler.java new file mode 100644 index 0000000000..6f8da24827 --- /dev/null +++ b/oap-server/server-receiver-plugin/skywalking-event-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/event/rest/EventRestServiceHandler.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.skywalking.oap.server.receiver.event.rest; + +import com.google.common.collect.Lists; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.apm.network.event.v3.Event; +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerModule; +import org.apache.skywalking.oap.server.analyzer.event.EventAnalyzerService; +import org.apache.skywalking.oap.server.library.module.ModuleManager; +import org.apache.skywalking.oap.server.library.server.jetty.JettyHandler; +import org.apache.skywalking.oap.server.library.util.ProtoBufJsonUtils; +import org.apache.skywalking.oap.server.telemetry.TelemetryModule; +import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics; +import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics; +import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator; +import org.apache.skywalking.oap.server.telemetry.api.MetricsTag; + +@Slf4j +public class EventRestServiceHandler extends JettyHandler { + private final HistogramMetrics histogram; + + private final CounterMetrics errorCounter; + + private final EventAnalyzerService eventAnalyzerService; + + private final Gson gson = new Gson(); + + public EventRestServiceHandler(final ModuleManager manager) { + final MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME) + .provider() + .getService(MetricsCreator.class); + + eventAnalyzerService = manager.find(EventAnalyzerModule.NAME) + .provider() + .getService(EventAnalyzerService.class); + + histogram = metricsCreator.createHistogramMetric( + "event_in_latency", "The process latency of event data", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("http") + ); + errorCounter = metricsCreator.createCounter( + "event_error_count", "The error number of event analysis", + new MetricsTag.Keys("protocol"), new MetricsTag.Values("http") + ); + } + + @Override + protected void doPost(final HttpServletRequest req, final HttpServletResponse resp) { + try (HistogramMetrics.Timer ignored = histogram.createTimer()) { + List events = Lists.newArrayList(); + JsonArray array = gson.fromJson(req.getReader(), JsonArray.class); + for (JsonElement element : array) { + Event.Builder builder = Event.newBuilder(); + ProtoBufJsonUtils.fromJSON(element.toString(), builder); + events.add(builder.build()); + } + + events.forEach(eventAnalyzerService::analyze); + } catch (Exception e) { + errorCounter.inc(); + log.error(e.getMessage(), e); + } + } + + @Override + public String pathSpec() { + return "/v3/events"; + } +} -- GitLab