未验证 提交 80f999f9 编写于 作者: W wankai123 提交者: GitHub

Support apollo grouped dynamic configurations (#7685)

上级 f61dff9a
......@@ -61,6 +61,7 @@ Release Notes.
* Support etcd grouped dynamic configurations.
* Unified the config word `namespace` in the project.
* Switch JRE base image for dev images.
* Support apollo grouped dynamic configurations.
#### UI
......
......@@ -6,9 +6,52 @@
configuration:
selector: ${SW_CONFIGURATION:apollo}
apollo:
apolloMeta: ${SW_CONFIG_APOLLO:http://106.12.25.204:8080}
apolloMeta: ${SW_CONFIG_APOLLO:http://localhost:8080}
apolloCluster: ${SW_CONFIG_APOLLO_CLUSTER:default}
apolloEnv: ${SW_CONFIG_APOLLO_ENV:""}
appId: ${SW_CONFIG_APOLLO_APP_ID:skywalking}
period: ${SW_CONFIG_APOLLO_PERIOD:5}
```
\ No newline at end of file
period: ${SW_CONFIG_APOLLO_PERIOD:60}
```
## Config Storage
### Single Config
Single configs in apollo are key/value pairs:
| Key | Value |
|-----|-----|
| configKey | configVaule |
e.g. The config is:
```
{agent-analyzer.default.slowDBAccessThreshold}:{default:200,mongodb:50}
```
The config in apollo is:
| Key | Value |
|-----|-----|
| agent-analyzer.default.slowDBAccessThreshold | default:200,mongodb:50 |
| ... | ... |
### Group Config
Group config in apollo are key/value pairs as well, and the key is composited by configKey and subItemKey with `.`.
| Key | Value |
|-----|-----|
| configKey.subItemkey1 | subItemValue1 |
| configKey.subItemkey2 | subItemValue2 |
| ... | ... |
e.g. The config is:
```
{core.default.endpoint-name-grouping-openapi}:|{customerAPI-v1}:{value of customerAPI-v1}
|{productAPI-v1}:{value of productAPI-v1}
|{productAPI-v2}:{value of productAPI-v2}
```
The config in apollo is:
| Key | Value |
|-----|-----|
| core.default.endpoint-name-grouping-openapi.customerAPI-v1 | value of customerAPI-v1 |
| core.default.endpoint-name-grouping-openapi.productAPI-v1 | value of productAPI-v1 |
| core.default.endpoint-name-grouping-openapi.productAPI-v2 | value of productAPI-v2 |
\ No newline at end of file
......@@ -68,7 +68,21 @@ public class ApolloConfigWatcherRegister extends ConfigWatcherRegister {
@Override
public Optional<GroupConfigTable> readGroupConfig(final Set<String> keys) {
// TODO: implement readGroupConfig
return Optional.empty();
GroupConfigTable groupConfigTable = new GroupConfigTable();
Set<String> allKeys = this.configReader.getPropertyNames();
keys.forEach(key -> {
GroupConfigTable.GroupConfigItems groupConfigItems = new GroupConfigTable.GroupConfigItems(key);
groupConfigTable.addGroupConfigItems(groupConfigItems);
String groupKey = key + ".";
if (allKeys != null) {
allKeys.stream().filter(it -> it.startsWith(groupKey)).forEach(groupItemKey -> {
String itemValue = this.configReader.getProperty(groupItemKey, null);
String itemName = groupItemKey.substring(groupKey.length());
groupConfigItems.add(new ConfigTable.ConfigItem(itemName, itemValue));
});
}
});
return Optional.of(groupConfigTable);
}
}
......@@ -18,21 +18,23 @@
package org.apache.skywalking.oap.server.configuration.apollo;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.configuration.api.ConfigChangeWatcher;
import org.apache.skywalking.oap.server.configuration.api.ConfigurationModule;
import org.apache.skywalking.oap.server.configuration.api.DynamicConfigurationService;
import org.apache.skywalking.oap.server.configuration.api.GroupConfigChangeWatcher;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Slf4j
public class ApolloConfigurationTestProvider extends ModuleProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(ApolloConfigurationTestProvider.class);
ConfigChangeWatcher watcher;
GroupConfigChangeWatcher groupWatcher;
@Override
public String name() {
......@@ -57,7 +59,7 @@ public class ApolloConfigurationTestProvider extends ModuleProvider {
@Override
public void notify(ConfigChangeWatcher.ConfigChangeEvent value) {
LOGGER.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
log.info("ConfigChangeWatcher.ConfigChangeEvent: {}", value);
if (EventType.DELETE.equals(value.getEventType())) {
testValue = null;
} else {
......@@ -70,18 +72,44 @@ public class ApolloConfigurationTestProvider extends ModuleProvider {
return testValue;
}
};
groupWatcher = new GroupConfigChangeWatcher(ApolloConfigurationTestModule.NAME, this, "testKeyGroup") {
private Map<String, String> config = new ConcurrentHashMap<>();
@Override
public void notifyGroup(Map<String, ConfigChangeEvent> groupItems) {
log.info("GroupConfigChangeWatcher.ConfigChangeEvents: {}", groupItems);
groupItems.forEach((groupItemName, event) -> {
if (EventType.DELETE.equals(event.getEventType())) {
config.remove(groupItemName);
} else {
config.put(groupItemName, event.getNewValue());
}
});
}
@Override
public Map<String, String> groupItems() {
return config;
}
};
}
@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
public void start() throws ServiceNotProvidedException {
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(watcher);
getManager().find(ConfigurationModule.NAME)
.provider()
.getService(DynamicConfigurationService.class)
.registerConfigChangeWatcher(groupWatcher);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.configuration.apollo;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Reader;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.skywalking.apm.util.PropertyPlaceholderHelper;
import org.apache.skywalking.oap.server.library.module.ApplicationConfiguration;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.testcontainers.containers.DockerComposeContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.yaml.snakeyaml.Yaml;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
@Slf4j
public class ITApolloConfigurationTest {
private final Yaml yaml = new Yaml();
private final String token = "f71f002a4ff9845639ef655ee7019759e31449de";
private final CloseableHttpClient httpClient = HttpClients.createDefault();
private final ResponseHandler responseHandler = new BasicResponseHandler();
private String baseUrl;
private ApolloConfigurationTestProvider provider;
@ClassRule
public final static DockerComposeContainer<?> ENVIRONMENT =
new DockerComposeContainer<>(new File(ITApolloConfigurationTest.class
.getClassLoader()
.getResource("docker/docker-compose.yaml").getPath()))
.withExposedService("apollo-config-and-portal", 8080,
Wait.forLogMessage(".*Config service started.*", 1))
.withExposedService("apollo-config-and-portal", 8070,
Wait.forLogMessage(".*Portal started. You can visit.*", 1)
.withStartupTimeout(Duration.ofSeconds(100))
);
@Before
public void setUp() throws Exception {
String metaHost = ENVIRONMENT.getServiceHost("apollo-config-and-portal", 8080);
String metaPort = ENVIRONMENT.getServicePort("apollo-config-and-portal", 8080).toString();
System.setProperty("apollo.configService", "http://" + metaHost + ":" + metaPort);
System.setProperty("apollo.meta.port", metaPort);
System.setProperty("apollo.meta.host", metaHost);
log.info("apollo.configService: {}", System.getProperty("apollo.configService"));
String host = ENVIRONMENT.getServiceHost("apollo-config-and-portal", 8070);
String port = ENVIRONMENT.getServicePort("apollo-config-and-portal", 8070).toString();
baseUrl = "http://" + host + ":" + port;
log.info("baseUrl: {}", baseUrl);
final ApplicationConfiguration applicationConfiguration = new ApplicationConfiguration();
loadConfig(applicationConfiguration);
final ModuleManager moduleManager = new ModuleManager();
moduleManager.init(applicationConfiguration);
provider = (ApolloConfigurationTestProvider) moduleManager.find(ApolloConfigurationTestModule.NAME).provider();
assertNotNull(provider);
}
@SuppressWarnings("StatementWithEmptyBody")
@Test(timeout = 100000)
public void shouldReadUpdated() {
try {
assertNull(provider.watcher.value());
final HttpPost createConfigPost = new HttpPost(baseUrl + "/openapi/v1/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application" + "/items");
createConfigPost.setHeader("Authorization", token);
createConfigPost.setHeader("Content-Type", "application/json;charset=UTF-8");
final StringEntity entity = new StringEntity("{\n" + " \"key\":\"test-module.default.testKey\",\n" + " \"value\":\"3000\",\n" + " \"comment\":\"test key\",\n" + " \"dataChangeCreatedBy\":\"apollo\"\n" + "}");
createConfigPost.setEntity(entity);
String createResponse = null;
//retry to wait apollo adminserver registered
for (int r = 1; r <= 10 && createResponse == null; r++) {
TimeUnit.SECONDS.sleep(5);
log.info("try createItem, times...: {}", r);
createResponse = this.httpExec(createConfigPost, responseHandler);
log.info("createResponse: {}", createResponse);
}
final HttpPost releaseConfigRequest = new HttpPost(baseUrl + "/openapi/v1/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application/releases");
releaseConfigRequest.setEntity(new StringEntity("{\n" + " \"releaseTitle\":\"2019-06-07\",\n" + " \"releaseComment\":\"test\",\n" + " \"releasedBy\":\"apollo\"\n" + "}"));
releaseConfigRequest.setHeader("Authorization", token);
releaseConfigRequest.setHeader("Content-Type", "application/json;charset=UTF-8");
final String releaseCreateResponse = (String) httpClient.execute(releaseConfigRequest, responseHandler);
log.info("releaseCreateResponse: {}", releaseCreateResponse);
for (String v = provider.watcher.value(); v == null; v = provider.watcher.value()) {
}
assertEquals("3000", provider.watcher.value());
final HttpDelete deleteConfigRequest = new HttpDelete(baseUrl + "/openapi/v1" + "/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application" + "/items/test-module.default.testKey" + "?operator=apollo");
deleteConfigRequest.setHeader("Authorization", token);
deleteConfigRequest.setHeader("Content-Type", "application/json;charset=UTF-8");
httpClient.execute(deleteConfigRequest);
final String releaseDeleteResponse = (String) httpClient.execute(releaseConfigRequest, responseHandler);
log.info("releaseDeleteResponse: {}", releaseDeleteResponse);
for (String v = provider.watcher.value(); v != null; v = provider.watcher.value()) {
}
assertNull(provider.watcher.value());
} catch (IOException | InterruptedException e) {
log.error(e.getMessage(), e);
fail(e.getMessage());
}
}
@SuppressWarnings("StatementWithEmptyBody")
@Test(timeout = 100000)
public void shouldReadUpdated4Group() {
try {
assertEquals("{}", provider.groupWatcher.groupItems().toString());
final HttpPost createConfigPost = new HttpPost(baseUrl + "/openapi/v1/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application" + "/items");
createConfigPost.setHeader("Authorization", token);
createConfigPost.setHeader("Content-Type", "application/json;charset=UTF-8");
final StringEntity entityItem1 = new StringEntity("{\n" + " \"key\":\"test-module.default.testKeyGroup.item1\",\n" + " \"value\":\"100\",\n" + " \"comment\":\"test key\",\n" + " \"dataChangeCreatedBy\":\"apollo\"\n" + "}");
createConfigPost.setEntity(entityItem1);
String createResponseItem1 = null;
//retry to wait apollo adminserver registered
for (int r = 1; r <= 10 && createResponseItem1 == null; r++) {
TimeUnit.SECONDS.sleep(5);
log.info("try createItem, times...: {}", r);
createResponseItem1 = this.httpExec(createConfigPost, responseHandler);
log.info("createResponse: {}", createResponseItem1);
}
final StringEntity entityItem2 = new StringEntity("{\n" + " \"key\":\"test-module.default.testKeyGroup.item2\",\n" + " \"value\":\"200\",\n" + " \"comment\":\"test key\",\n" + " \"dataChangeCreatedBy\":\"apollo\"\n" + "}");
createConfigPost.setEntity(entityItem2);
final String createResponseItem2 = (String) httpClient.execute(createConfigPost, responseHandler);
log.info("createResponseItem2: {}", createResponseItem2);
final HttpPost releaseConfigRequest = new HttpPost(baseUrl + "/openapi/v1/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application/releases");
releaseConfigRequest.setEntity(new StringEntity("{\n" + " \"releaseTitle\":\"2019-06-07\",\n" + " \"releaseComment\":\"test\",\n" + " \"releasedBy\":\"apollo\"\n" + "}"));
releaseConfigRequest.setHeader("Authorization", token);
releaseConfigRequest.setHeader("Content-Type", "application/json;charset=UTF-8");
final String releaseCreateResponse = (String) httpClient.execute(releaseConfigRequest, responseHandler);
log.info("releaseCreateResponse: {}", releaseCreateResponse);
for (String v = provider.groupWatcher.groupItems().get("item1"); v == null; v = provider.groupWatcher.groupItems().get("item1")) {
}
for (String v = provider.groupWatcher.groupItems().get("item2"); v == null; v = provider.groupWatcher.groupItems().get("item2")) {
}
assertEquals("100", provider.groupWatcher.groupItems().get("item1"));
assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
//test remove item1
final HttpDelete deleteConfigRequest = new HttpDelete(baseUrl + "/openapi/v1" + "/envs/DEV" + "/apps/SampleApp" + "/clusters/default" + "/namespaces/application" + "/items/test-module.default.testKeyGroup.item1" + "?operator=apollo");
deleteConfigRequest.setHeader("Authorization", token);
deleteConfigRequest.setHeader("Content-Type", "application/json;charset=UTF-8");
httpClient.execute(deleteConfigRequest);
final String releaseDeleteResponse = (String) httpClient.execute(releaseConfigRequest, responseHandler);
log.info("releaseDeleteResponse: {}", releaseDeleteResponse);
for (String v = provider.groupWatcher.groupItems().get("item1"); v != null; v = provider.groupWatcher.groupItems().get("item1")) {
}
assertNull(provider.groupWatcher.groupItems().get("item1"));
assertEquals("200", provider.groupWatcher.groupItems().get("item2"));
} catch (IOException | InterruptedException e) {
log.error(e.getMessage(), e);
fail(e.getMessage());
}
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws FileNotFoundException {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
final Object replaceValue = yaml.load(PropertyPlaceholderHelper.INSTANCE.replacePlaceholders(value + "", properties));
if (replaceValue != null) {
properties.replace(key, replaceValue);
}
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
}
//for retry
private String httpExec(HttpUriRequest request, ResponseHandler responseHandler) {
try {
return (String) this.httpClient.execute(request, responseHandler);
} catch (IOException ignored) {
return null;
}
}
}
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
version: '2.1'
services:
apollo-config-and-portal:
image: kezhenxu94/apollo:1.2
depends_on:
- apollo-db
links:
- apollo-db
apollo-db:
image: mysql:5.7
environment:
TZ: Asia/Shanghai
MYSQL_ALLOW_EMPTY_PASSWORD: 'yes'
depends_on:
- apollo-dbdata
volumes:
- ./docker-entrypoint-initdb.d:/docker-entrypoint-initdb.d
volumes_from:
- apollo-dbdata
apollo-dbdata:
image: alpine:3.13.6
volumes:
- /var/lib/mysql
\ No newline at end of file
......@@ -422,7 +422,7 @@ configuration:
apolloCluster: ${SW_CONFIG_APOLLO_CLUSTER:default}
apolloEnv: ${SW_CONFIG_APOLLO_ENV:""}
appId: ${SW_CONFIG_APOLLO_APP_ID:skywalking}
period: ${SW_CONFIG_APOLLO_PERIOD:5}
period: ${SW_CONFIG_APOLLO_PERIOD:60}
zookeeper:
period: ${SW_CONFIG_ZK_PERIOD:60} # Unit seconds, sync period. Default fetch every 60 seconds.
namespace: ${SW_CONFIG_ZK_NAMESPACE:/default}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册