提交 479b421a 编写于 作者: P pengys5

no message

上级 f187931e
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentJVMModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override protected void initializeOtherContext() {
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentjvm;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMModuleInstaller implements ModuleInstaller {
public class AgentJVMModuleInstaller extends MultipleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent jvm module install");
AgentJVMModuleContext context = new AgentJVMModuleContext(AgentJVMModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
}
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public Context moduleContext() {
return new AgentJVMModuleContext(groupName());
}
}
package org.skywalking.apm.collector.agentregister;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentRegisterModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleDefine.class);
@Override protected void initializeOtherContext() {
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentRegisterModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
......@@ -46,6 +21,4 @@ public abstract class AgentRegisterModuleDefine extends ModuleDefine implements
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentregister;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class AgentRegisterModuleInstaller implements ModuleInstaller {
public class AgentRegisterModuleInstaller extends MultipleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentRegisterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent register module install");
AgentRegisterModuleContext context = new AgentRegisterModuleContext(AgentRegisterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return AgentRegisterModuleGroupDefine.GROUP_NAME;
}
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public Context moduleContext() {
return new AgentRegisterModuleContext(groupName());
}
}
package org.skywalking.apm.collector.agentserver;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentServerModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleDefine.class);
@Override protected void initializeOtherContext() {
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentServerModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentserver;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class AgentServerModuleInstaller implements ModuleInstaller {
public class AgentServerModuleInstaller extends MultipleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentServerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent server module install");
AgentServerModuleContext context = new AgentServerModuleContext(AgentServerModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return AgentServerModuleGroupDefine.GROUP_NAME;
}
logger.info("could not configure agent server module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public Context moduleContext() {
return new AgentServerModuleContext(groupName());
}
}
package org.skywalking.apm.collector.agentstream;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentStreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentStreamModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
......@@ -47,5 +18,7 @@ public abstract class AgentStreamModuleDefine extends ModuleDefine implements Cl
return true;
}
public abstract List<Handler> handlerList();
@Override protected void initializeOtherContext() {
}
}
package org.skywalking.apm.collector.agentstream;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.agentstream.worker.storage.PersistenceTimer;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class AgentStreamModuleInstaller implements ModuleInstaller {
public class AgentStreamModuleInstaller extends MultipleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentStreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent stream module install");
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return AgentStreamModuleGroupDefine.GROUP_NAME;
}
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public Context moduleContext() {
return new AgentStreamModuleContext(groupName());
}
@Override public void install() throws DefineException, ConfigException, ServerException {
super.install();
new PersistenceTimer().start();
}
}
package org.skywalking.apm.collector.boot;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
......@@ -34,11 +33,13 @@ public class CollectorStarter implements Starter {
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ServerHolder serverHolder = new ServerHolder();
moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder);
moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME);
// moduleGroupDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME).moduleInstaller().install(configuration.get(ClusterModuleGroupDefine.GROUP_NAME), moduleDefineMap.get(ClusterModuleGroupDefine.GROUP_NAME), serverHolder);
// moduleGroupDefineMap.remove(ClusterModuleGroupDefine.GROUP_NAME);
for (ModuleGroupDefine moduleGroupDefine : moduleGroupDefineMap.values()) {
moduleGroupDefine.moduleInstaller().install(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()), serverHolder);
moduleGroupDefine.moduleInstaller().injectConfiguration(configuration.get(moduleGroupDefine.name()), moduleDefineMap.get(moduleGroupDefine.name()));
moduleGroupDefine.moduleInstaller().injectServerHolder(serverHolder);
moduleGroupDefine.moduleInstaller().install();
}
serverHolder.getServers().forEach(server -> {
......
package org.skywalking.apm.collector.cluster;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
......@@ -22,22 +20,20 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public static final String BASE_CATALOG = "skywalking";
private Client client;
private DataMonitor dataMonitor;
@Override public final void initialize(Map config, ServerHolder serverHolder) throws ClusterModuleException {
@Override protected void initializeOtherContext() {
try {
configParser().parse(config);
DataMonitor dataMonitor = dataMonitor();
dataMonitor = dataMonitor();
client = createClient(dataMonitor);
client.initialize();
dataMonitor.setClient(client);
ClusterModuleRegistrationReader reader = registrationReader(dataMonitor);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setDataMonitor(dataMonitor);
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setReader(reader);
} catch (ConfigParseException | ClientException e) {
throw new ClusterModuleException(e.getMessage(), e);
CollectorContextHelper.INSTANCE.getClusterModuleContext().setDataMonitor(dataMonitor);
CollectorContextHelper.INSTANCE.getClusterModuleContext().setReader(reader);
} catch (ClientException e) {
throw new UnexpectedException(e.getMessage());
}
}
......@@ -49,6 +45,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
throw new UnsupportedOperationException("");
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("Cluster module do not need module registration.");
}
......@@ -56,4 +56,5 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
public abstract DataMonitor dataMonitor();
public abstract ClusterModuleRegistrationReader registrationReader(DataMonitor dataMonitor);
}
package org.skywalking.apm.collector.cluster;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning cluster module install");
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return ClusterModuleGroupDefine.GROUP_NAME;
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
@Override public Context moduleContext() {
ClusterModuleContext clusterModuleContext = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putClusterContext(clusterModuleContext);
return clusterModuleContext;
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
......@@ -12,6 +13,7 @@ import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.skywalking.apm.collector.client.zookeeper.util.PathUtils;
import org.skywalking.apm.collector.cluster.ClusterNodeExistException;
import org.skywalking.apm.collector.core.CollectorException;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
......@@ -31,9 +33,11 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
private ZookeeperClient client;
private Map<String, ClusterDataListener> listeners;
private Map<String, ModuleRegistration> registrations;
public ClusterZKDataMonitor() {
listeners = new LinkedHashMap<>();
registrations = new LinkedHashMap<>();
}
@Override public void process(WatchedEvent event) {
......@@ -65,24 +69,32 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
this.client = (ZookeeperClient)client;
}
@Override public void start() throws CollectorException {
Iterator<Map.Entry<String, ModuleRegistration>> entryIterator = registrations.entrySet().iterator();
while (entryIterator.hasNext()) {
Map.Entry<String, ModuleRegistration> next = entryIterator.next();
createPath(next.getKey());
ModuleRegistration.Value value = next.getValue().buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(next.getKey(), true);
String serverPath = next.getKey() + "/" + value.getHostPort();
if (client.exists(serverPath, false) == null) {
setData(serverPath, contextPath);
} else {
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
}
}
@Override
public void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException {
String path = PathUtils.convertKey2Path(listener.path());
logger.info("listener path: {}", path);
listeners.put(path, listener);
createPath(path);
ModuleRegistration.Value value = registration.buildValue();
String contextPath = value.getContextPath() == null ? "" : value.getContextPath();
client.getChildren(path, true);
String serverPath = path + "/" + value.getHostPort();
if (client.exists(serverPath, false) == null) {
setData(serverPath, contextPath);
} else {
throw new ClusterNodeExistException("current address: " + value.getHostPort() + " has been registered, check the host and port configuration or wait a moment.");
}
registrations.put(path, registration);
}
@Override public ClusterDataListener getListener(String path) {
......
package org.skywalking.apm.collector.core.client;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Starter;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public interface DataMonitor {
public interface DataMonitor extends Starter{
void setClient(Client client);
void addListener(ClusterDataListener listener, ModuleRegistration registration) throws ClientException;
......
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.core.framework;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
/**
* @author pengys5
......@@ -9,12 +10,17 @@ import java.util.Map;
public enum CollectorContextHelper {
INSTANCE;
private Map<String, Context> contexts = new LinkedHashMap();
private ClusterModuleContext clusterModuleContext;
private Map<String, Context> contexts = new LinkedHashMap<>();
public Context getContext(String moduleGroupName) {
return contexts.get(moduleGroupName);
}
public ClusterModuleContext getClusterModuleContext() {
return this.clusterModuleContext;
}
public void putContext(Context context) {
if (contexts.containsKey(context.getGroupName())) {
throw new UnsupportedOperationException("This module context was put, do not allow put a new one");
......@@ -22,4 +28,8 @@ public enum CollectorContextHelper {
contexts.put(context.getGroupName(), context);
}
}
public void putClusterContext(ClusterModuleContext clusterModuleContext) {
this.clusterModuleContext = clusterModuleContext;
}
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
*/
public interface Define {
void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException;
String name();
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
/**
* @author pengys5
*/
public abstract class ModuleConfigContainer implements ModuleInstaller {
private Map<String, Map> moduleConfig;
private Map<String, ModuleDefine> moduleDefineMap;
@Override
public final void injectConfiguration(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap) {
this.moduleConfig = moduleConfig;
this.moduleDefineMap = moduleDefineMap;
}
public final Map<String, Map> getModuleConfig() {
return moduleConfig;
}
public final Map<String, ModuleDefine> getModuleDefineMap() {
return moduleDefineMap;
}
}
package org.skywalking.apm.collector.core.module;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.server.Server;
/**
......@@ -20,5 +22,9 @@ public abstract class ModuleDefine implements Define {
protected abstract Server server();
public abstract List<Handler> handlerList();
protected abstract ModuleRegistration registration();
protected abstract void initializeOtherContext();
}
......@@ -2,13 +2,26 @@ package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
* @author pengys5
*/
public interface ModuleInstaller {
void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException;
void injectServerHolder(ServerHolder serverHolder);
String groupName();
Context moduleContext();
void injectConfiguration(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap);
void preInstall() throws DefineException, ConfigException, ServerException;
void install() throws ClientException, DefineException, ConfigException, ServerException;
}
package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class MultipleModuleInstaller extends ModuleConfigContainer {
private final Logger logger = LoggerFactory.getLogger(MultipleModuleInstaller.class);
public MultipleModuleInstaller() {
moduleDefines = new LinkedList<>();
}
private List<ModuleDefine> moduleDefines;
private ServerHolder serverHolder;
@Override public final void injectServerHolder(ServerHolder serverHolder) {
this.serverHolder = serverHolder;
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineIterator = moduleDefineMap.entrySet().iterator();
while (moduleDefineIterator.hasNext()) {
Map.Entry<String, ModuleDefine> moduleDefineEntry = moduleDefineIterator.next();
logger.info("module {} initialize", moduleDefineEntry.getKey());
moduleDefineEntry.getValue().configParser().parse(moduleConfig.get(moduleDefineEntry.getKey()));
moduleDefines.add(moduleDefineEntry.getValue());
serverHolder.holdServer(moduleDefineEntry.getValue().server(), moduleDefineEntry.getValue().handlerList());
}
}
@Override public void install() throws DefineException, ConfigException, ServerException {
preInstall();
CollectorContextHelper.INSTANCE.putContext(moduleContext());
moduleDefines.forEach(moduleDefine -> {
moduleDefine.initializeOtherContext();
});
}
}
......@@ -3,7 +3,12 @@ package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
......@@ -12,28 +17,60 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class SingleModuleInstaller implements ModuleInstaller {
public abstract class SingleModuleInstaller extends ModuleConfigContainer {
private final Logger logger = LoggerFactory.getLogger(SingleModuleInstaller.class);
protected void installSingle(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
private ModuleDefine moduleDefine;
private ServerHolder serverHolder;
@Override public final void injectServerHolder(ServerHolder serverHolder) {
this.serverHolder = serverHolder;
}
@Override public final void preInstall() throws DefineException, ConfigException, ServerException {
Map<String, Map> moduleConfig = getModuleConfig();
Map<String, ModuleDefine> moduleDefineMap = getModuleDefineMap();
if (CollectionUtils.isNotEmpty(moduleConfig)) {
if (moduleConfig.size() > 1) {
throw new ClusterModuleException("single module, but configure multiple modules");
}
Map.Entry<String, Map> configEntry = moduleConfig.entrySet().iterator().next();
if (moduleDefineMap.containsKey(configEntry.getKey())) {
moduleDefine = moduleDefineMap.get(configEntry.getKey());
moduleDefine.configParser().parse(configEntry.getValue());
} else {
throw new ClusterModuleException("module name incorrect, please check the module name in application.yml");
}
} else {
logger.info("could not configure module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
boolean hasDefaultModule = false;
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
if (moduleDefineEntry.next().getValue().defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null, serverHolder);
break;
if (hasDefaultModule) {
throw new ClusterModuleException("single module, but configure multiple default module");
}
moduleDefine = moduleDefineEntry.next().getValue();
moduleDefine.configParser().parse(null);
hasDefaultModule = true;
}
}
} else {
Map.Entry<String, Map> configEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(configEntry.getKey());
moduleDefine.initialize(configEntry.getValue(), serverHolder);
}
serverHolder.holdServer(moduleDefine.server(), moduleDefine.handlerList());
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
preInstall();
moduleDefine.initializeOtherContext();
CollectorContextHelper.INSTANCE.putContext(moduleContext());
if (moduleDefine instanceof ClusterDataListenerDefine) {
ClusterDataListenerDefine listenerDefine = (ClusterDataListenerDefine)moduleDefine;
CollectorContextHelper.INSTANCE.getClusterModuleContext().getDataMonitor().addListener(listenerDefine.listener(), moduleDefine.registration());
}
}
}
......@@ -20,6 +20,10 @@ public class CollectionUtils {
return !isEmpty(list);
}
public static boolean isNotEmpty(Map map) {
return !isEmpty(map);
}
public static <T> boolean isNotEmpty(T[] array) {
return array != null && array.length > 0;
}
......
package org.skywalking.apm.collector.queue;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -15,7 +17,7 @@ public abstract class QueueModuleDefine extends ModuleDefine {
throw new UnsupportedOperationException("");
}
@Override protected Client createClient(DataMonitor dataMonitor) {
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
......@@ -26,4 +28,8 @@ public abstract class QueueModuleDefine extends ModuleDefine {
@Override protected final Server server() {
throw new UnsupportedOperationException("");
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
}
}
package org.skywalking.apm.collector.queue;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.queue.datacarrier.DataCarrierQueueCreator;
/**
* @author pengys5
*/
public class QueueModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class);
@Override public String groupName() {
return QueueModuleGroupDefine.GROUP_NAME;
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning queue module install");
QueueModuleContext context = new QueueModuleContext(QueueModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public Context moduleContext() {
return new QueueModuleContext(groupName());
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
super.install();
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName())).setQueueCreator(new DataCarrierQueueCreator());
}
}
package org.skywalking.apm.collector.queue.datacarrier;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -26,8 +22,7 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
return false;
}
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DataCarrierQueueCreator());
}
}
package org.skywalking.apm.collector.queue.disruptor;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.queue.QueueModuleContext;
import org.skywalking.apm.collector.queue.QueueModuleDefine;
import org.skywalking.apm.collector.queue.QueueModuleGroupDefine;
......@@ -26,8 +22,7 @@ public class QueueDisruptorModuleDefine extends QueueModuleDefine {
return true;
}
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected void initializeOtherContext() {
((QueueModuleContext)CollectorContextHelper.INSTANCE.getContext(group())).setQueueCreator(new DisruptorQueueCreator());
}
}
package org.skywalking.apm.collector.storage;
import java.util.Map;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.UnexpectedException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.storage.StorageException;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.slf4j.Logger;
......@@ -24,11 +24,8 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
private final Logger logger = LoggerFactory.getLogger(StorageModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
@Override protected void initializeOtherContext() {
try {
configParser().parse(config);
StorageModuleContext context = (StorageModuleContext)CollectorContextHelper.INSTANCE.getContext(StorageModuleGroupDefine.GROUP_NAME);
Client client = createClient(null);
client.initialize();
......@@ -36,11 +33,15 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
injectClientIntoDAO(client);
storageInstaller().install(client);
} catch (ConfigParseException | StorageException e) {
throw new StorageModuleException(e.getMessage(), e);
} catch (ClientException | StorageException | DefineException e) {
throw new UnexpectedException(e.getMessage());
}
}
@Override public final List<Handler> handlerList() {
throw new UnsupportedOperationException("");
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
}
......
package org.skywalking.apm.collector.storage;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class StorageModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning storage module install");
StorageModuleContext context = new StorageModuleContext(StorageModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return StorageModuleGroupDefine.GROUP_NAME;
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
@Override public Context moduleContext() {
return new StorageModuleContext(groupName());
}
}
package org.skywalking.apm.collector.stream;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(StreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new StreamModuleException(e.getMessage(), e);
}
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList() throws DefineException;
@Override protected final void initializeOtherContext() {
}
}
package org.skywalking.apm.collector.stream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -22,25 +20,21 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class StreamModuleInstaller implements ModuleInstaller {
public class StreamModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(StreamModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning stream module install");
StreamModuleContext context = new StreamModuleContext(StreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return StreamModuleGroupDefine.GROUP_NAME;
}
initializeWorker(context);
@Override public Context moduleContext() {
return new StreamModuleContext(groupName());
}
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public void install() throws ClientException, DefineException, ConfigException, ServerException {
super.install();
initializeWorker((StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(groupName()));
}
private void initializeWorker(StreamModuleContext context) throws DefineException {
......
......@@ -5,7 +5,6 @@ import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
......@@ -50,7 +49,7 @@ public class StreamGRPCModuleDefine extends StreamModuleDefine {
return new StreamGRPCDataListener();
}
@Override public List<Handler> handlerList() throws DefineException {
@Override public List<Handler> handlerList() {
List<Handler> handlers = new ArrayList<>();
handlers.add(new RemoteCommonServiceHandler());
return handlers;
......
package org.skywalking.apm.collector.ui;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class UIModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(UIModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new UIModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
......@@ -47,5 +18,7 @@ public abstract class UIModuleDefine extends ModuleDefine implements ClusterData
return true;
}
public abstract List<Handler> handlerList();
@Override protected final void initializeOtherContext() {
}
}
package org.skywalking.apm.collector.ui;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.MultipleModuleInstaller;
/**
* @author pengys5
*/
public class UIModuleInstaller implements ModuleInstaller {
public class UIModuleInstaller extends MultipleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(UIModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning ui module install");
UIModuleContext context = new UIModuleContext(UIModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
@Override public String groupName() {
return UIModuleGroupDefine.GROUP_NAME;
}
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
@Override public Context moduleContext() {
return new UIModuleContext(groupName());
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册