未验证 提交 7560c990 编写于 作者: W wankai123 提交者: GitHub

Support gRPC sync grouped dynamic configurations. (#7709)

上级 c5ac46a8
......@@ -63,6 +63,7 @@ Release Notes.
* Switch JRE base image for dev images.
* Support apollo grouped dynamic configurations.
* Fix `ProfileThreadSnapshotQuery.queryProfiledSegments` adopts a wrong sort function
* Support gRPC sync grouped dynamic configurations.
#### UI
......
......@@ -12,3 +12,56 @@ configuration:
clusterName: ${SW_DCS_CLUSTER_NAME:SkyWalking}
period: ${SW_DCS_PERIOD:20}
```
## Config Server Response
`uuid`: To identify whether the config data changed, if `uuid` is the same not required to respond the config data.
### Single Config
Implement:
```
rpc call (ConfigurationRequest) returns (ConfigurationResponse) { }
```
e.g. The config is:
```
{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
```
The response `configTable` is:
```
configTable {
name: "agent-analyzer.default.slowDBAccessThreshold"
value: "default:200,mongodb:50"
}
```
### Group Config
Implement:
```
rpc callGroup (ConfigurationRequest) returns (GroupConfigurationResponse) {}
```
Respond config data `GroupConfigItems groupConfigTable`
e.g. The config is:
```
{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
|{productAPI-v1}:{value of productAPI-v1}
|{productAPI-v2}:{value of productAPI-v2}
```
The response `groupConfigTable` is:
```
groupConfigTable {
groupName: "core.default.endpoint-name-grouping-openapi"
items {
name: "customerAPI-v1"
value: "value of customerAPI-v1"
}
items {
name: "productAPI-v1"
value: "value of productAPI-v1"
}
items {
name: "productAPI-v2"
value: "value of productAPI-v2"
}
}
```
\ No newline at end of file
......@@ -54,6 +54,11 @@
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
......
......@@ -22,21 +22,21 @@ import io.grpc.netty.NettyChannelBuilder;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.skywalking.oap.server.configuration.service.GroupConfigurationResponse;
@Slf4j
public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
private static final Logger LOGGER = LoggerFactory.getLogger(GRPCConfigWatcherRegister.class);
private RemoteEndpointSettings settings;
private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub;
private String uuid = null;
private String groupUuid = null;
public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) {
super(settings.getPeriod());
......@@ -58,7 +58,7 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
}
ConfigurationResponse response = stub.call(builder.build());
String responseUuid = response.getUuid();
if (responseUuid != null && Objects.equals(uuid, responseUuid)) {
if (Objects.equals(uuid, responseUuid)) {
// If UUID matched, the config table is expected as empty.
return Optional.empty();
}
......@@ -70,14 +70,42 @@ public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
});
this.uuid = responseUuid;
} catch (Exception e) {
LOGGER.error("Remote config center [" + settings + "] is not available.", e);
log.error("Remote config center [{}] is not available.", settings, e);
}
return Optional.of(table);
}
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
GroupConfigTable groupConfigTable = new GroupConfigTable();
try {
ConfigurationRequest.Builder builder = ConfigurationRequest.newBuilder()
.setClusterName(settings.getClusterName());
if (groupUuid != null) {
builder.setUuid(groupUuid);
}
GroupConfigurationResponse response = stub.callGroup(builder.build());
String responseUuid = response.getUuid();
if (Objects.equals(groupUuid, responseUuid)) {
// If UUID matched, the config table is expected as empty.
return Optional.empty();
}
response.getGroupConfigTableList().forEach(rspGroupConfigItems -> {
String groupName = rspGroupConfigItems.getGroupName();
if (keys.contains(groupName)) {
GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(
groupName);
groupConfigTable.addGroupConfigItems(groupConfigItems);
rspGroupConfigItems.getItemsList().forEach(item -> {
groupConfigItems.add(new ConfigTable.ConfigItem(item.getName(), item.getValue()));
});
}
});
this.groupUuid = responseUuid;
} catch (Exception e) {
log.error("Remote config center [{}] is not available.", settings, e);
}
return Optional.of(groupConfigTable);
}
}
......@@ -24,6 +24,9 @@ option java_package = "org.apache.skywalking.oap.server.configuration.service";
service ConfigurationService {
rpc call (ConfigurationRequest) returns (ConfigurationResponse) {
}
rpc callGroup (ConfigurationRequest) returns (GroupConfigurationResponse) {
}
}
message ConfigurationRequest {
......@@ -49,6 +52,22 @@ message ConfigurationResponse {
string uuid = 2;
}
message GroupConfigurationResponse {
// Include all groupConfig items.
// All groupConfigTable.groupName should be not empty,
// Each watcher implementor provides this, and it will be notified when the groupConfigTable changed.
repeated GroupConfigItems groupConfigTable = 1;
// UUID is literal string represents the content of the config table.
// If groupConfigTable is unchanged, then could response the same uuid, and groupConfigTable is not required.
string uuid = 2;
}
message GroupConfigItems {
// The name is composed by "module name"."provider name"."groupName".
string groupName = 1;
repeated Config items = 2;
}
message Config {
string name = 1;
string value = 2;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.grpc;
import io.grpc.testing.GrpcServerRule;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.powermock.reflect.Whitebox;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@Slf4j
public class GRPCConfigurationTest {
private GRPCConfigurationProvider provider;
private GRPCConfigWatcherRegister register;
private ConfigChangeWatcher singleWatcher;
private GroupConfigChangeWatcher groupWatcher;
@Rule
public final GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Before
public void before() {
//for create register
RemoteEndpointSettings settings = new RemoteEndpointSettings();
settings.setHost("localhost");
settings.setPort(5678);
settings.setPeriod(1);
provider = new GRPCConfigurationProvider();
register = new GRPCConfigWatcherRegister(settings);
ConfigurationServiceGrpc.ConfigurationServiceBlockingStub blockingStub = ConfigurationServiceGrpc.newBlockingStub(
grpcServerRule.getChannel());
Whitebox.setInternalState(register, "stub", blockingStub);
initWatcher();
assertNotNull(provider);
}
@Test(timeout = 20000)
public void shouldReadUpdated() throws Exception {
AtomicInteger dataFlag = new AtomicInteger(0);
grpcServerRule.getServiceRegistry().addService(new MockGRPCConfigService(dataFlag));
assertNull(singleWatcher.value());
register.registerConfigChangeWatcher(singleWatcher);
register.start();
for (String v = singleWatcher.value(); v == null; v = singleWatcher.value()) {
}
assertEquals("100", singleWatcher.value());
//change
dataFlag.set(1);
TimeUnit.SECONDS.sleep(1);
for (String v = singleWatcher.value(); v.equals("100"); v = singleWatcher.value()) {
}
assertEquals("300", singleWatcher.value());
//no change
dataFlag.set(2);
TimeUnit.SECONDS.sleep(3);
for (String v = singleWatcher.value(); !v.equals("300"); v = singleWatcher.value()) {
}
assertEquals("300", singleWatcher.value());
//delete
dataFlag.set(3);
TimeUnit.SECONDS.sleep(1);
for (String v = singleWatcher.value(); v.equals("300"); v = singleWatcher.value()) {
}
assertEquals("", singleWatcher.value());
}
@Test(timeout = 20000)
public void shouldReadUpdated4Group() throws Exception {
AtomicInteger dataFlag = new AtomicInteger(0);
grpcServerRule.getServiceRegistry().addService(new MockGRPCConfigService(dataFlag));
assertEquals("{}", groupWatcher.groupItems().toString());
register.registerConfigChangeWatcher(groupWatcher);
register.start();
for (String v = groupWatcher.groupItems().get("item1");
v == null;
v = groupWatcher.groupItems().get("item1")) {
}
assertEquals("100", groupWatcher.groupItems().get("item1"));
for (String v = groupWatcher.groupItems().get("item2");
v == null;
v = groupWatcher.groupItems().get("item2")) {
}
assertEquals("200", groupWatcher.groupItems().get("item2"));
//change item2
dataFlag.set(1);
TimeUnit.SECONDS.sleep(1);
for (String v = groupWatcher.groupItems().get("item2");
v.equals("200");
v = groupWatcher.groupItems().get("item2")) {
}
assertEquals("2000", groupWatcher.groupItems().get("item2"));
//no change
dataFlag.set(2);
TimeUnit.SECONDS.sleep(3);
assertEquals("100", groupWatcher.groupItems().get("item1"));
assertEquals("2000", groupWatcher.groupItems().get("item2"));
//delete item1
dataFlag.set(3);
TimeUnit.SECONDS.sleep(1);
for (String v = groupWatcher.groupItems().get("item1");
v != null;
v = groupWatcher.groupItems().get("item1")) {
}
assertNull(groupWatcher.groupItems().get("item1"));
}
private void initWatcher() {
singleWatcher = new ConfigChangeWatcher("test-module", provider, "testKey") {
private volatile String testValue;
@Override
public void notify(ConfigChangeEvent value) {
log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
testValue = value.getNewValue();
}
}
@Override
public String value() {
return testValue;
}
};
groupWatcher = new GroupConfigChangeWatcher("test-module", provider, "testKeyGroup") {
private final Map<String, String> config = new ConcurrentHashMap<>();
@Override
public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
groupItems.forEach((groupItemName, event) -> {
if (EventType.DELETE.equals(event.getEventType())) {
config.remove(groupItemName);
} else {
config.put(groupItemName, event.getNewValue());
}
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
};
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.grpc;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.oap.server.configuration.service.Config;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationRequest;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationResponse;
import org.apache.skywalking.oap.server.configuration.service.ConfigurationServiceGrpc;
import org.apache.skywalking.oap.server.configuration.service.GroupConfigItems;
import org.apache.skywalking.oap.server.configuration.service.GroupConfigurationResponse;
public class MockGRPCConfigService extends ConfigurationServiceGrpc.ConfigurationServiceImplBase {
private final AtomicInteger dataFlag;
/**
* @param dataFlag 0:init, 1:change, 2:no change, 3:delete
*/
public MockGRPCConfigService(AtomicInteger dataFlag) {
this.dataFlag = dataFlag;
}
@Override
public void call(ConfigurationRequest request,
StreamObserver<ConfigurationResponse> responseObserver) {
ConfigurationResponse response;
String uuid = request.getUuid();
switch (this.dataFlag.get()) {
case 1:
response = ConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addConfigTable(Config
.newBuilder()
.setName("test-module.grpc.testKey")
.setValue("300")
.build()).build();
responseObserver.onNext(response);
break;
case 2:
response = ConfigurationResponse.newBuilder().setUuid(uuid).build();
responseObserver.onNext(response);
break;
case 3:
response = ConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addConfigTable(Config
.newBuilder()
.setName("test-module.grpc.testKey")
.build()).build();
responseObserver.onNext(response);
break;
default:
response = ConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addConfigTable(Config
.newBuilder()
.setName("test-module.grpc.testKey")
.setValue("100")
.build()).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
}
@Override
public void callGroup(ConfigurationRequest request,
StreamObserver<GroupConfigurationResponse> responseObserver) {
GroupConfigurationResponse response;
String uuid = request.getUuid();
switch (this.dataFlag.get()) {
case 1:
response = GroupConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addGroupConfigTable(GroupConfigItems
.newBuilder().setGroupName("test-module.grpc.testKeyGroup")
.addItems(Config
.newBuilder()
.setName("item1")
.setValue("100")
.build())
.addItems(Config
.newBuilder()
.setName("item2")
.setValue("2000")
.build()).build()).build();
responseObserver.onNext(response);
break;
case 2:
response = GroupConfigurationResponse.newBuilder().setUuid(uuid).build();
responseObserver.onNext(response);
break;
case 3:
response = GroupConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addGroupConfigTable(GroupConfigItems
.newBuilder().setGroupName("test-module.grpc.testKeyGroup")
.addItems(Config
.newBuilder()
.setName("item2")
.setValue("2000")
.build()).build()).build();
responseObserver.onNext(response);
break;
default:
response = GroupConfigurationResponse
.newBuilder().setUuid(UUID.randomUUID().toString())
.addGroupConfigTable(GroupConfigItems
.newBuilder().setGroupName("test-module.grpc.testKeyGroup")
.addItems(Config
.newBuilder()
.setName("item1")
.setValue("100")
.build())
.addItems(Config
.newBuilder()
.setName("item2")
.setValue("200")
.build()).build()).build();
responseObserver.onNext(response);
}
responseObserver.onCompleted();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<Configuration status="INFO">
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout charset="UTF-8" pattern="%d - %c -%-4r [%t] %-5p %x - %m%n"/>
</Console>
</Appenders>
<Loggers>
<Root level="INFO">
<AppenderRef ref="Console"/>
</Root>
</Loggers>
</Configuration>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册