From 7560c990cfb75034f6c2aaa5b09f53c3037013fa Mon Sep 17 00:00:00 2001 From: wankai123 Date: Tue, 14 Sep 2021 17:06:18 +0800 Subject: [PATCH] Support gRPC sync grouped dynamic configurations. (#7709) --- CHANGES.md | 1 + .../setup/backend/dynamic-config-service.md | 53 ++++++ .../grpc-configuration-sync/pom.xml | 5 + .../grpc/GRPCConfigWatcherRegister.java | 44 ++++- .../main/proto/configuration-service.proto | 19 ++ .../grpc/GRPCConfigurationTest.java | 177 ++++++++++++++++++ .../grpc/MockGRPCConfigService.java | 141 ++++++++++++++ .../src/test/resources/log4j2.xml | 31 +++ 8 files changed, 463 insertions(+), 8 deletions(-) create mode 100644 oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationTest.java create mode 100644 oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/MockGRPCConfigService.java create mode 100644 oap-server/server-configuration/grpc-configuration-sync/src/test/resources/log4j2.xml diff --git a/CHANGES.md b/CHANGES.md index 8197b452cb..e4af5694bc 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -63,6 +63,7 @@ Release Notes. * Switch JRE base image for dev images. * Support apollo grouped dynamic configurations. * Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function +* Support gRPC sync grouped dynamic configurations. #### UI diff --git a/docs/en/setup/backend/dynamic-config-service.md b/docs/en/setup/backend/dynamic-config-service.md index 8db1572a8a..6824cc4f1a 100755 --- a/docs/en/setup/backend/dynamic-config-service.md +++ b/docs/en/setup/backend/dynamic-config-service.md @@ -12,3 +12,56 @@ configuration: clusterName: ${SW_DCS_CLUSTER_NAME:SkyWalking} period: ${SW_DCS_PERIOD:20} ``` + +## Config Server Response +`uuid`: To identify whether the config data changed, if `uuid` is the same not required to respond the config data. +### Single Config +Implement: +``` +rpc call (ConfigurationRequest) returns (ConfigurationResponse) { } +``` + +e.g. The config is: +``` +{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50} +``` +The response `configTable` is: +``` +configTable { + name: "agent-analyzer.default.slowDBAccessThreshold" + value: "default:200,mongodb:50" +} +``` + +### Group Config +Implement: +``` +rpc callGroup (ConfigurationRequest) returns (GroupConfigurationResponse) {} +``` +Respond config data `GroupConfigItems groupConfigTable` + +e.g. The config is: +``` +{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1} + |{productAPI-v1}:{value of productAPI-v1} + |{productAPI-v2}:{value of productAPI-v2} +``` +The response `groupConfigTable` is: + +``` +groupConfigTable { + groupName: "core.default.endpoint-name-grouping-openapi" + items { + name: "customerAPI-v1" + value: "value of customerAPI-v1" + } + items { + name: "productAPI-v1" + value: "value of productAPI-v1" + } + items { + name: "productAPI-v2" + value: "value of productAPI-v2" + } +} +``` \ No newline at end of file diff --git a/oap-server/server-configuration/grpc-configuration-sync/pom.xml b/oap-server/server-configuration/grpc-configuration-sync/pom.xml index fccb53ae72..1750daa854 100644 --- a/oap-server/server-configuration/grpc-configuration-sync/pom.xml +++ b/oap-server/server-configuration/grpc-configuration-sync/pom.xml @@ -54,6 +54,11 @@ io.grpc grpc-stub + + io.grpc + grpc-testing + test + diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java index e3ad840f05..b10594877a 100644 --- a/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java +++ b/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java @@ -22,21 +22,21 @@ import io.grpc.netty.NettyChannelBuilder; import java.util.Objects; import java.util.Optional; import java.util.Set; +import lombok.extern.slf4j.Slf4j; import org.apache.skywalking.oap.server.configuration.api.ConfigTable; import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister; import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable; import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest; import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse; import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.skywalking.oap.server.configuration.service.GroupConfigurationResponse; +@Slf4j public class GRPCConfigWatcherRegister extends ConfigWatcherRegister { - private static final Logger LOGGER = LoggerFactory.getLogger(GRPCConfigWatcherRegister.class); - private RemoteEndpointSettings settings; private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub; private String uuid = null; + private String groupUuid = null; public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) { super(settings.getPeriod()); @@ -58,7 +58,7 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister { } ConfigurationResponse response = stub.call(builder.build()); String responseUuid = response.getUuid(); - if (responseUuid != null && Objects.equals(uuid, responseUuid)) { + if (Objects.equals(uuid, responseUuid)) { // If UUID matched, the config table is expected as empty. return Optional.empty(); } @@ -70,14 +70,42 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister { }); this.uuid = responseUuid; } catch (Exception e) { - LOGGER.error("Remote config center [" + settings + "] is not available.", e); + log.error("Remote config center [{}] is not available.", settings, e); } return Optional.of(table); } @Override public Optional readGroupConfig(final Set keys) { - // TODO: implement readGroupConfig - return Optional.empty(); + GroupConfigTable groupConfigTable = new GroupConfigTable(); + try { + ConfigurationRequest.Builder builder = ConfigurationRequest.newBuilder() + .setClusterName(settings.getClusterName()); + if (groupUuid != null) { + builder.setUuid(groupUuid); + } + GroupConfigurationResponse response = stub.callGroup(builder.build()); + String responseUuid = response.getUuid(); + if (Objects.equals(groupUuid, responseUuid)) { + // If UUID matched, the config table is expected as empty. + return Optional.empty(); + } + + response.getGroupConfigTableList().forEach(rspGroupConfigItems -> { + String groupName = rspGroupConfigItems.getGroupName(); + if (keys.contains(groupName)) { + GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems( + groupName); + groupConfigTable.addGroupConfigItems(groupConfigItems); + rspGroupConfigItems.getItemsList().forEach(item -> { + groupConfigItems.add(new ConfigTable.ConfigItem(item.getName(), item.getValue())); + }); + } + }); + this.groupUuid = responseUuid; + } catch (Exception e) { + log.error("Remote config center [{}] is not available.", settings, e); + } + return Optional.of(groupConfigTable); } } diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto b/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto index 8c0b4ef9d5..4bc57ba23c 100644 --- a/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto +++ b/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto @@ -24,6 +24,9 @@ option java_package = "org.apache.skywalking.oap.server.configuration.service"; service ConfigurationService { rpc call (ConfigurationRequest) returns (ConfigurationResponse) { } + + rpc callGroup (ConfigurationRequest) returns (GroupConfigurationResponse) { + } } message ConfigurationRequest { @@ -49,6 +52,22 @@ message ConfigurationResponse { string uuid = 2; } +message GroupConfigurationResponse { + // Include all groupConfig items. + // All groupConfigTable.groupName should be not empty, + // Each watcher implementor provides this, and it will be notified when the groupConfigTable changed. + repeated GroupConfigItems groupConfigTable = 1; + // UUID is literal string represents the content of the config table. + // If groupConfigTable is unchanged, then could response the same uuid, and groupConfigTable is not required. + string uuid = 2; +} + +message GroupConfigItems { + // The name is composed by "module name"."provider name"."groupName". + string groupName = 1; + repeated Config items = 2; +} + message Config { string name = 1; string value = 2; diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationTest.java b/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationTest.java new file mode 100644 index 0000000000..7d3c0df119 --- /dev/null +++ b/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationTest.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.configuration.grpc; + +import io.grpc.testing.GrpcServerRule; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher; +import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher; +import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.powermock.reflect.Whitebox; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +@Slf4j +public class GRPCConfigurationTest { + private GRPCConfigurationProvider provider; + private GRPCConfigWatcherRegister register; + private ConfigChangeWatcher singleWatcher; + private GroupConfigChangeWatcher groupWatcher; + + @Rule + public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor(); + + @Before + public void before() { + //for create register + RemoteEndpointSettings settings = new RemoteEndpointSettings(); + settings.setHost("localhost"); + settings.setPort(5678); + settings.setPeriod(1); + provider = new GRPCConfigurationProvider(); + register = new GRPCConfigWatcherRegister(settings); + ConfigurationServiceGrpc.ConfigurationServiceBlockingStub blockingStub = ConfigurationServiceGrpc.newBlockingStub( + grpcServerRule.getChannel()); + Whitebox.setInternalState(register, "stub", blockingStub); + initWatcher(); + assertNotNull(provider); + } + + @Test(timeout = 20000) + public void shouldReadUpdated() throws Exception { + AtomicInteger dataFlag = new AtomicInteger(0); + grpcServerRule.getServiceRegistry().addService(new MockGRPCConfigService(dataFlag)); + assertNull(singleWatcher.value()); + register.registerConfigChangeWatcher(singleWatcher); + register.start(); + + for (String v = singleWatcher.value(); v == null; v = singleWatcher.value()) { + } + assertEquals("100", singleWatcher.value()); + //change + dataFlag.set(1); + TimeUnit.SECONDS.sleep(1); + for (String v = singleWatcher.value(); v.equals("100"); v = singleWatcher.value()) { + } + assertEquals("300", singleWatcher.value()); + //no change + dataFlag.set(2); + TimeUnit.SECONDS.sleep(3); + for (String v = singleWatcher.value(); !v.equals("300"); v = singleWatcher.value()) { + } + assertEquals("300", singleWatcher.value()); + //delete + dataFlag.set(3); + TimeUnit.SECONDS.sleep(1); + for (String v = singleWatcher.value(); v.equals("300"); v = singleWatcher.value()) { + } + assertEquals("", singleWatcher.value()); + } + + @Test(timeout = 20000) + public void shouldReadUpdated4Group() throws Exception { + AtomicInteger dataFlag = new AtomicInteger(0); + grpcServerRule.getServiceRegistry().addService(new MockGRPCConfigService(dataFlag)); + assertEquals("{}", groupWatcher.groupItems().toString()); + register.registerConfigChangeWatcher(groupWatcher); + register.start(); + + for (String v = groupWatcher.groupItems().get("item1"); + v == null; + v = groupWatcher.groupItems().get("item1")) { + } + assertEquals("100", groupWatcher.groupItems().get("item1")); + for (String v = groupWatcher.groupItems().get("item2"); + v == null; + v = groupWatcher.groupItems().get("item2")) { + } + assertEquals("200", groupWatcher.groupItems().get("item2")); + //change item2 + dataFlag.set(1); + TimeUnit.SECONDS.sleep(1); + for (String v = groupWatcher.groupItems().get("item2"); + v.equals("200"); + v = groupWatcher.groupItems().get("item2")) { + } + assertEquals("2000", groupWatcher.groupItems().get("item2")); + //no change + dataFlag.set(2); + TimeUnit.SECONDS.sleep(3); + assertEquals("100", groupWatcher.groupItems().get("item1")); + assertEquals("2000", groupWatcher.groupItems().get("item2")); + //delete item1 + dataFlag.set(3); + TimeUnit.SECONDS.sleep(1); + for (String v = groupWatcher.groupItems().get("item1"); + v != null; + v = groupWatcher.groupItems().get("item1")) { + } + assertNull(groupWatcher.groupItems().get("item1")); + } + + private void initWatcher() { + singleWatcher = new ConfigChangeWatcher("test-module", provider, "testKey") { + private volatile String testValue; + + @Override + public void notify(ConfigChangeEvent value) { + log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value); + if (EventType.DELETE.equals(value.getEventType())) { + testValue = null; + } else { + testValue = value.getNewValue(); + } + } + + @Override + public String value() { + return testValue; + } + }; + + groupWatcher = new GroupConfigChangeWatcher("test-module", provider, "testKeyGroup") { + private final Map config = new ConcurrentHashMap<>(); + + @Override + public void notifyGroup(Map groupItems) { + log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems); + groupItems.forEach((groupItemName, event) -> { + if (EventType.DELETE.equals(event.getEventType())) { + config.remove(groupItemName); + } else { + config.put(groupItemName, event.getNewValue()); + } + }); + } + + @Override + public Map groupItems() { + return config; + } + }; + } +} diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/MockGRPCConfigService.java b/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/MockGRPCConfigService.java new file mode 100644 index 0000000000..87096912c9 --- /dev/null +++ b/oap-server/server-configuration/grpc-configuration-sync/src/test/java/org/apache/skywalking/oap/server/configuration/grpc/MockGRPCConfigService.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.skywalking.oap.server.configuration.grpc; + +import io.grpc.stub.StreamObserver; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.skywalking.oap.server.configuration.service.Config; +import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest; +import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse; +import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc; +import org.apache.skywalking.oap.server.configuration.service.GroupConfigItems; +import org.apache.skywalking.oap.server.configuration.service.GroupConfigurationResponse; + +public class MockGRPCConfigService extends ConfigurationServiceGrpc.ConfigurationServiceImplBase { + private final AtomicInteger dataFlag; + + /** + * @param dataFlag 0:init, 1:change, 2:no change, 3:delete + */ + public MockGRPCConfigService(AtomicInteger dataFlag) { + this.dataFlag = dataFlag; + } + + @Override + public void call(ConfigurationRequest request, + StreamObserver responseObserver) { + ConfigurationResponse response; + String uuid = request.getUuid(); + switch (this.dataFlag.get()) { + case 1: + response = ConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addConfigTable(Config + .newBuilder() + .setName("test-module.grpc.testKey") + .setValue("300") + .build()).build(); + responseObserver.onNext(response); + break; + case 2: + response = ConfigurationResponse.newBuilder().setUuid(uuid).build(); + responseObserver.onNext(response); + break; + case 3: + response = ConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addConfigTable(Config + .newBuilder() + .setName("test-module.grpc.testKey") + .build()).build(); + responseObserver.onNext(response); + break; + default: + response = ConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addConfigTable(Config + .newBuilder() + .setName("test-module.grpc.testKey") + .setValue("100") + .build()).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } + + @Override + public void callGroup(ConfigurationRequest request, + StreamObserver responseObserver) { + GroupConfigurationResponse response; + String uuid = request.getUuid(); + switch (this.dataFlag.get()) { + case 1: + response = GroupConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addGroupConfigTable(GroupConfigItems + .newBuilder().setGroupName("test-module.grpc.testKeyGroup") + .addItems(Config + .newBuilder() + .setName("item1") + .setValue("100") + .build()) + .addItems(Config + .newBuilder() + .setName("item2") + .setValue("2000") + .build()).build()).build(); + responseObserver.onNext(response); + break; + case 2: + response = GroupConfigurationResponse.newBuilder().setUuid(uuid).build(); + responseObserver.onNext(response); + break; + case 3: + response = GroupConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addGroupConfigTable(GroupConfigItems + .newBuilder().setGroupName("test-module.grpc.testKeyGroup") + .addItems(Config + .newBuilder() + .setName("item2") + .setValue("2000") + .build()).build()).build(); + responseObserver.onNext(response); + break; + default: + response = GroupConfigurationResponse + .newBuilder().setUuid(UUID.randomUUID().toString()) + .addGroupConfigTable(GroupConfigItems + .newBuilder().setGroupName("test-module.grpc.testKeyGroup") + .addItems(Config + .newBuilder() + .setName("item1") + .setValue("100") + .build()) + .addItems(Config + .newBuilder() + .setName("item2") + .setValue("200") + .build()).build()).build(); + responseObserver.onNext(response); + } + responseObserver.onCompleted(); + } +} diff --git a/oap-server/server-configuration/grpc-configuration-sync/src/test/resources/log4j2.xml b/oap-server/server-configuration/grpc-configuration-sync/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..cd672826bb --- /dev/null +++ b/oap-server/server-configuration/grpc-configuration-sync/src/test/resources/log4j2.xml @@ -0,0 +1,31 @@ + + + + + + + + + + + + + + + -- GitLab