未验证 提交 41f41d12 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support zk namespace (#955)

* Support zk namespace, #935

* Revert default changes.
上级 5bf79f3a
...@@ -26,8 +26,6 @@ import org.apache.skywalking.apm.collector.client.Client; ...@@ -26,8 +26,6 @@ import org.apache.skywalking.apm.collector.client.Client;
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public interface DataMonitor { public interface DataMonitor {
String BASE_CATALOG = "/skywalking";
void setClient(Client client); void setClient(Client client);
void addListener(ClusterModuleListener listener) throws ClientException; void addListener(ClusterModuleListener listener) throws ClientException;
...@@ -39,4 +37,6 @@ public interface DataMonitor { ...@@ -39,4 +37,6 @@ public interface DataMonitor {
void createPath(String path) throws ClientException; void createPath(String path) throws ClientException;
void setData(String path, String value) throws ClientException; void setData(String path, String value) throws ClientException;
String getBaseCatalog();
} }
...@@ -55,18 +55,18 @@ public class ClusterStandaloneDataMonitor implements DataMonitor { ...@@ -55,18 +55,18 @@ public class ClusterStandaloneDataMonitor implements DataMonitor {
@Override @Override
public void addListener(ClusterModuleListener listener) { public void addListener(ClusterModuleListener listener) {
String path = BASE_CATALOG + listener.path(); String path = getBaseCatalog() + listener.path();
logger.info("listener path: {}", path); logger.info("listener path: {}", path);
listeners.put(path, listener); listeners.put(path, listener);
} }
@Override public ClusterModuleListener getListener(String path) { @Override public ClusterModuleListener getListener(String path) {
path = BASE_CATALOG + path; path = getBaseCatalog() + path;
return listeners.get(path); return listeners.get(path);
} }
@Override public void register(String path, ModuleRegistration registration) { @Override public void register(String path, ModuleRegistration registration) {
registrations.put(BASE_CATALOG + path, registration); registrations.put(getBaseCatalog() + path, registration);
} }
@Override public void createPath(String path) throws ClientException { @Override public void createPath(String path) throws ClientException {
...@@ -80,6 +80,10 @@ public class ClusterStandaloneDataMonitor implements DataMonitor { ...@@ -80,6 +80,10 @@ public class ClusterStandaloneDataMonitor implements DataMonitor {
} }
} }
@Override public String getBaseCatalog() {
return "/skywalking";
}
public void start() throws CollectorException { public void start() throws CollectorException {
Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator(); Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator();
while (entryIterator.hasNext()) { while (entryIterator.hasNext()) {
......
...@@ -34,5 +34,10 @@ ...@@ -34,5 +34,10 @@
<artifactId>collector-cluster-define</artifactId> <artifactId>collector-cluster-define</artifactId>
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>collector-configuration-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies> </dependencies>
</project> </project>
...@@ -16,22 +16,23 @@ ...@@ -16,22 +16,23 @@
* *
*/ */
package org.apache.skywalking.apm.collector.cluster.zookeeper; package org.apache.skywalking.apm.collector.cluster.zookeeper;
import java.util.Properties; import java.util.Properties;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClientException; import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.apache.skywalking.apm.collector.cluster.ClusterModule; import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService; import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleListenerService; import org.apache.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleRegisterService; import org.apache.skywalking.apm.collector.cluster.zookeeper.service.ZookeeperModuleRegisterService;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
import org.apache.skywalking.apm.collector.core.CollectorException; import org.apache.skywalking.apm.collector.core.CollectorException;
import org.apache.skywalking.apm.collector.core.UnexpectedException; import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider; import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException; import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -69,6 +70,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { ...@@ -69,6 +70,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
} }
@Override public void start(Properties config) throws ServiceNotProvidedException { @Override public void start(Properties config) throws ServiceNotProvidedException {
dataMonitor.setNamespace(getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace());
try { try {
zookeeperClient.initialize(); zookeeperClient.initialize();
} catch (ZookeeperClientException e) { } catch (ZookeeperClientException e) {
...@@ -85,7 +87,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider { ...@@ -85,7 +87,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
} }
@Override public String[] requiredModules() { @Override public String[] requiredModules() {
return new String[0]; return new String[] {ConfigurationModule.NAME};
} }
} }
...@@ -16,7 +16,6 @@ ...@@ -16,7 +16,6 @@
* *
*/ */
package org.apache.skywalking.apm.collector.cluster.zookeeper; package org.apache.skywalking.apm.collector.cluster.zookeeper;
import java.util.HashSet; import java.util.HashSet;
...@@ -25,11 +24,6 @@ import java.util.LinkedHashMap; ...@@ -25,11 +24,6 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.skywalking.apm.collector.client.Client; import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.ClientException; import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClient; import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
...@@ -40,6 +34,11 @@ import org.apache.skywalking.apm.collector.cluster.DataMonitor; ...@@ -40,6 +34,11 @@ import org.apache.skywalking.apm.collector.cluster.DataMonitor;
import org.apache.skywalking.apm.collector.cluster.ModuleRegistration; import org.apache.skywalking.apm.collector.cluster.ModuleRegistration;
import org.apache.skywalking.apm.collector.core.CollectorException; import org.apache.skywalking.apm.collector.core.CollectorException;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils; import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
...@@ -54,6 +53,7 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -54,6 +53,7 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
private Map<String, ClusterModuleListener> listeners; private Map<String, ClusterModuleListener> listeners;
private Map<String, ModuleRegistration> registrations; private Map<String, ModuleRegistration> registrations;
private String namespace;
public ClusterZKDataMonitor() { public ClusterZKDataMonitor() {
listeners = new LinkedHashMap<>(); listeners = new LinkedHashMap<>();
...@@ -130,17 +130,17 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -130,17 +130,17 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
} }
@Override public void addListener(ClusterModuleListener listener) { @Override public void addListener(ClusterModuleListener listener) {
String path = BASE_CATALOG + listener.path(); String path = getBaseCatalog() + listener.path();
logger.info("listener path: {}", path); logger.info("listener path: {}", path);
listeners.put(path, listener); listeners.put(path, listener);
} }
@Override public void register(String path, ModuleRegistration registration) { @Override public void register(String path, ModuleRegistration registration) {
registrations.put(BASE_CATALOG + path, registration); registrations.put(getBaseCatalog() + path, registration);
} }
@Override public ClusterModuleListener getListener(String path) { @Override public ClusterModuleListener getListener(String path) {
path = BASE_CATALOG + path; path = getBaseCatalog() + path;
return listeners.get(path); return listeners.get(path);
} }
...@@ -163,4 +163,12 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher { ...@@ -163,4 +163,12 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
client.setData(path, value.getBytes(), -1); client.setData(path, value.getBytes(), -1);
} }
} }
@Override public String getBaseCatalog() {
return "/" + namespace + "/skywalking";
}
void setNamespace(String namespace) {
this.namespace = namespace;
}
} }
...@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.configuration; ...@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.configuration;
import org.apache.skywalking.apm.collector.configuration.service.IApdexThresholdService; import org.apache.skywalking.apm.collector.configuration.service.IApdexThresholdService;
import org.apache.skywalking.apm.collector.configuration.service.IApplicationAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IApplicationAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IApplicationReferenceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IApplicationReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
import org.apache.skywalking.apm.collector.configuration.service.IInstanceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IInstanceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IInstanceReferenceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IInstanceReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IServiceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IServiceAlarmRuleConfig;
...@@ -40,6 +41,7 @@ public class ConfigurationModule extends Module { ...@@ -40,6 +41,7 @@ public class ConfigurationModule extends Module {
@Override public Class[] services() { @Override public Class[] services() {
return new Class[] { return new Class[] {
ICollectorConfig.class,
IApdexThresholdService.class, IApdexThresholdService.class,
IServiceAlarmRuleConfig.class, IInstanceAlarmRuleConfig.class, IApplicationAlarmRuleConfig.class, IServiceAlarmRuleConfig.class, IInstanceAlarmRuleConfig.class, IApplicationAlarmRuleConfig.class,
IServiceReferenceAlarmRuleConfig.class, IInstanceReferenceAlarmRuleConfig.class, IApplicationReferenceAlarmRuleConfig.class}; IServiceReferenceAlarmRuleConfig.class, IInstanceReferenceAlarmRuleConfig.class, IApplicationReferenceAlarmRuleConfig.class};
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.configuration.service;
import org.apache.skywalking.apm.collector.core.module.Service;
/**
* @author wu-sheng
*/
public interface ICollectorConfig extends Service {
/**
* @return the namespace of Collector, empty String if no custom namespace
*/
String getNamespace();
}
...@@ -22,9 +22,11 @@ import java.util.Properties; ...@@ -22,9 +22,11 @@ import java.util.Properties;
import org.apache.skywalking.apm.collector.configuration.service.ApdexThresholdService; import org.apache.skywalking.apm.collector.configuration.service.ApdexThresholdService;
import org.apache.skywalking.apm.collector.configuration.service.ApplicationAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.ApplicationAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.ApplicationReferenceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.ApplicationReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.CollectorConfigService;
import org.apache.skywalking.apm.collector.configuration.service.IApdexThresholdService; import org.apache.skywalking.apm.collector.configuration.service.IApdexThresholdService;
import org.apache.skywalking.apm.collector.configuration.service.IApplicationAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IApplicationAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IApplicationReferenceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IApplicationReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
import org.apache.skywalking.apm.collector.configuration.service.IInstanceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IInstanceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IInstanceReferenceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IInstanceReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.IServiceAlarmRuleConfig; import org.apache.skywalking.apm.collector.configuration.service.IServiceAlarmRuleConfig;
...@@ -41,7 +43,7 @@ import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedExcepti ...@@ -41,7 +43,7 @@ import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedExcepti
* @author peng-yongsheng * @author peng-yongsheng
*/ */
public class ConfigurationModuleProvider extends ModuleProvider { public class ConfigurationModuleProvider extends ModuleProvider {
private static final String NAMESPACE = "namespace";
private static final String APPLICATION_APDEX_THRESHOLD = "application_apdex_threshold"; private static final String APPLICATION_APDEX_THRESHOLD = "application_apdex_threshold";
private static final String SERVICE_ERROR_RATE_THRESHOLD = "service_error_rate_threshold"; private static final String SERVICE_ERROR_RATE_THRESHOLD = "service_error_rate_threshold";
private static final String SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD = "service_average_response_time_threshold"; private static final String SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD = "service_average_response_time_threshold";
...@@ -59,6 +61,7 @@ public class ConfigurationModuleProvider extends ModuleProvider { ...@@ -59,6 +61,7 @@ public class ConfigurationModuleProvider extends ModuleProvider {
} }
@Override public void prepare(Properties config) throws ServiceNotProvidedException { @Override public void prepare(Properties config) throws ServiceNotProvidedException {
String namespace = (String)config.getOrDefault(NAMESPACE, "");
Integer applicationApdexThreshold = (Integer)config.getOrDefault(APPLICATION_APDEX_THRESHOLD, 2000); Integer applicationApdexThreshold = (Integer)config.getOrDefault(APPLICATION_APDEX_THRESHOLD, 2000);
Double serviceErrorRateThreshold = (Double)config.getOrDefault(SERVICE_ERROR_RATE_THRESHOLD, 10.00); Double serviceErrorRateThreshold = (Double)config.getOrDefault(SERVICE_ERROR_RATE_THRESHOLD, 10.00);
Integer serviceAverageResponseTimeThreshold = (Integer)config.getOrDefault(SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000); Integer serviceAverageResponseTimeThreshold = (Integer)config.getOrDefault(SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000);
...@@ -67,6 +70,7 @@ public class ConfigurationModuleProvider extends ModuleProvider { ...@@ -67,6 +70,7 @@ public class ConfigurationModuleProvider extends ModuleProvider {
Double applicationErrorRateThreshold = (Double)config.getOrDefault(APPLICATION_ERROR_RATE_THRESHOLD, 10.00); Double applicationErrorRateThreshold = (Double)config.getOrDefault(APPLICATION_ERROR_RATE_THRESHOLD, 10.00);
Integer applicationAverageResponseTimeThreshold = (Integer)config.getOrDefault(APPLICATION_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000); Integer applicationAverageResponseTimeThreshold = (Integer)config.getOrDefault(APPLICATION_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000);
this.registerServiceImplementation(ICollectorConfig.class, new CollectorConfigService(namespace));
this.registerServiceImplementation(IApdexThresholdService.class, new ApdexThresholdService(applicationApdexThreshold)); this.registerServiceImplementation(IApdexThresholdService.class, new ApdexThresholdService(applicationApdexThreshold));
this.registerServiceImplementation(IServiceAlarmRuleConfig.class, new ServiceAlarmRuleConfig(serviceErrorRateThreshold, serviceAverageResponseTimeThreshold)); this.registerServiceImplementation(IServiceAlarmRuleConfig.class, new ServiceAlarmRuleConfig(serviceErrorRateThreshold, serviceAverageResponseTimeThreshold));
this.registerServiceImplementation(IInstanceAlarmRuleConfig.class, new InstanceAlarmRuleConfig(instanceErrorRateThreshold, instanceAverageResponseTimeThreshold)); this.registerServiceImplementation(IInstanceAlarmRuleConfig.class, new InstanceAlarmRuleConfig(instanceErrorRateThreshold, instanceAverageResponseTimeThreshold));
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.configuration.service;
/**
* @author wu-sheng
*/
public class CollectorConfigService implements ICollectorConfig {
private String namespace;
public CollectorConfigService(String namespace) {
this.namespace = namespace == null ? "" : namespace;
}
@Override public String getNamespace() {
return namespace;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册