未验证 提交 0fae36ed 编写于 作者: W wankai123 提交者: GitHub

Support consul grouped dynamic configurations (#7647)

上级 e5a5e05f
......@@ -53,6 +53,7 @@ Release Notes.
* Support for filter function filtering of int type values.
* Support mTLS for gRPC channel.
* Add yaml file suffix limit when reading ui templates.
* Support consul grouped dynamic configurations.
#### UI
......
......@@ -12,4 +12,54 @@ configuration:
period: ${SW_CONFIG_CONSUL_PERIOD:1}
# Consul aclToken
aclToken: ${SW_CONFIG_CONSUL_ACL_TOKEN:""}
```
\ No newline at end of file
```
## Config Storage
### Single Config
Single configs in Consul are key/value pairs:
| Key | Value |
|-----|-----|
| configKey | configVaule |
e.g. The config is:
```
{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
```
The config in Consul is:
| Key | Value |
|-----|-----|
| agent-analyzer.default.slowDBAccessThreshold | default:200,mongodb:50 |
| ... | ... |
### Group Config
Group config in Consul are key/value pairs as well, but according to the level keys organized by `/`, see: https://www.consul.io/docs/dynamic-app-config/kv#using-consul-kv
| Key | Value |
|-----|-----|
| configKey/subItemkey1 | subItemValue1 |
| configKey/subItemkey2 | subItemValue2 |
| ... | ... |
If use Consul UI we can see keys organized like folder:
```
configKey
-- subItemkey1
-- subItemkey2
...
```
e.g. The config is:
```
{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
|{productAPI-v1}:{value of productAPI-v1}
|{productAPI-v2}:{value of productAPI-v2}
```
The config in Consul is:
| Key | Value |
|-----|-----|
| core.default.endpoint-name-grouping-openapi/customerAPI-v1 | value of customerAPI-v1 |
| core.default.endpoint-name-grouping-openapi/productAPI-v1 | value of productAPI-v1 |
| core.default.endpoint-name-grouping-openapi/productAPI-v2 | value of productAPI-v2 |
......@@ -19,19 +19,17 @@
package org.apache.skywalking.oap.server.configuration.consul;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.AbstractConfigurationProvider;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Get configuration from Consul.
*/
@Slf4j
public class ConsulConfigurationProvider extends AbstractConfigurationProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationProvider.class);
private final ConsulConfigurationCenterSettings settings;
public ConsulConfigurationProvider() {
......@@ -50,7 +48,7 @@ public class ConsulConfigurationProvider extends AbstractConfigurationProvider {
@Override
protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
LOGGER.info("consul settings: {}", settings);
log.info("consul settings: {}", settings);
if (Strings.isNullOrEmpty(settings.getHostAndPorts())) {
throw new ModuleStartException("Consul hostAndPorts cannot be null or empty");
......
......@@ -31,17 +31,15 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.skywalking.oap.server.configuration.api.ConfigTable;
import org.apache.skywalking.oap.server.configuration.api.ConfigWatcherRegister;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings("UnstableApiUsage")
@Slf4j
public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationWatcherRegister.class);
private static final int DEFAULT_PORT = 8500;
private final KeyValueClient consul;
......@@ -97,8 +95,22 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
GroupConfigTable groupConfigTable = new GroupConfigTable();
keys.forEach(key -> {
GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
groupConfigTable.addGroupConfigItems(groupConfigItems);
String groupKey = key + "/";
List<String> groupItemKeys = this.consul.getKeys(groupKey);
if (groupItemKeys != null) {
groupItemKeys.stream().filter(it -> !groupKey.equals(it)).forEach(groupItemKey -> {
Optional<String> itemValue = this.consul.getValueAsString(groupItemKey);
String itemName = groupItemKey.substring(groupKey.length());
groupConfigItems.add(
new ConfigTable.ConfigItem(itemName, itemValue.orElse(null)));
});
}
});
return Optional.of(groupConfigTable);
}
private void registerKeyListeners(final Set<String> keys) {
......@@ -133,8 +145,8 @@ public class ConsulConfigurationWatcherRegister extends ConfigWatcherRegister {
}
private void onKeyValueChanged(String key, String value) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Consul config changed: {}: {}", key, value);
if (log.isInfoEnabled()) {
log.info("Consul config changed: {}: {}", key, value);
}
configItemKeyedByName.put(key, Optional.ofNullable(value));
......
......@@ -18,21 +18,23 @@
package org.apache.skywalking.oap.server.configuration.consul;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class ConsulConfigurationTestProvider extends ModuleProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsulConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
GroupConfigChangeWatcher groupWatcher;
@Override
public String name() {
......@@ -57,7 +59,7 @@ public class ConsulConfigurationTestProvider extends ModuleProvider {
@Override
public void notify(ConfigChangeWatcher.ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
......@@ -70,6 +72,27 @@ public class ConsulConfigurationTestProvider extends ModuleProvider {
return testValue;
}
};
groupWatcher = new GroupConfigChangeWatcher(ConsulConfigurationTestModule.NAME, this, "testKeyGroup") {
private Map<String, String> config = new ConcurrentHashMap<>();
@Override
public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
groupItems.forEach((groupItemName, event) -> {
if (EventType.DELETE.equals(event.getEventType())) {
config.remove(groupItemName);
} else {
config.put(groupItemName, event.getNewValue());
}
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
};
}
@Override
......@@ -78,6 +101,11 @@ public class ConsulConfigurationTestProvider extends ModuleProvider {
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(groupWatcher);
}
@Override
......
......@@ -83,6 +83,43 @@ public class ITConsulConfigurationTest {
assertNull(provider.watcher.value());
}
@Test(timeout = 30000)
public void shouldReadUpdated4Group() {
assertEquals("{}", provider.groupWatcher.groupItems().toString());
String hostAndPort = System.getProperty("consul.address", "127.0.0.1:8500");
Consul consul = Consul.builder()
.withHostAndPort(HostAndPort.fromString(hostAndPort))
.withConnectTimeoutMillis(5000)
.build();
KeyValueClient client = consul.keyValueClient();
assertTrue(client.putValue("test-module.default.testKeyGroup/item1", "100"));
assertTrue(client.putValue("test-module.default.testKeyGroup/item2", "200"));
for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
}
for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
}
assertEquals("100", provider.groupWatcher.groupItems().get("item1"));
assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
//test remove item1
client.deleteKey("test-module.default.testKeyGroup/item1");
for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
}
assertNull(provider.groupWatcher.groupItems().get("item1"));
//test modify item2
client.putValue("test-module.default.testKeyGroup/item2", "300");
for (String v = provider.groupWatcher.groupItems().get("item2"); v.equals("200"); v = provider.groupWatcher.groupItems().get("item2")) {
}
assertEquals("300", provider.groupWatcher.groupItems().get("item2"));
//chean
client.deleteKey("test-module.default.testKeyGroup/item2");
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
......
......@@ -88,7 +88,7 @@ public class ITNacosConfigurationTest {
@Test(timeout = 20000)
public void shouldReadUpdatedGroup() throws NacosException {
assertNull(provider.watcher.value());
assertEquals("{}", provider.groupWatcher.groupItems().toString());
final Properties properties = new Properties();
final String nacosHost = System.getProperty("nacos.host");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册