提交 e12dc5fa 编写于 作者: P pengys5

1. Add agent jvm module

2. Add agent register module
上级 730c6063
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-agentjvm</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class AgentJVMModuleContext extends Context {
public AgentJVMModuleContext(String groupName) {
super(groupName);
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AgentJVMModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
Server server = server();
serverHolder.holdServer(server, handlerList());
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new AgentJVMModuleException(e.getMessage(), e);
}
}
@Override protected final Client createClient(DataMonitor dataMonitor) {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList();
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class AgentJVMModuleException extends ModuleException {
public AgentJVMModuleException(String message) {
super(message);
}
public AgentJVMModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentjvm;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class AgentJVMModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "agent_jvm";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new AgentJVMModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new AgentJVMModuleInstaller();
}
}
package org.skywalking.apm.collector.agentjvm;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class AgentJVMModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(AgentJVMModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent jvm module install");
AgentJVMModuleContext context = new AgentJVMModuleContext(AgentJVMModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
}
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class AgentJVMGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
AgentJVMGRPCConfig.HOST = "localhost";
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
AgentJVMGRPCConfig.PORT = 11800;
} else {
AgentJVMGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
/**
* @author pengys5
*/
public class AgentJVMGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + AgentJVMModuleGroupDefine.GROUP_NAME + "." + AgentJVMGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleDefine;
import org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine;
import org.skywalking.apm.collector.agentjvm.grpc.handler.JVMMetricsServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleDefine extends AgentJVMModuleDefine {
public static final String MODULE_NAME = "grpc";
@Override protected String group() {
return AgentJVMModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new AgentJVMGRPCConfigParser();
}
@Override protected Server server() {
return new GRPCServer(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new AgentJVMGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new AgentJVMGRPCDataListener();
}
@Override public List<Handler> handlerList() {
List<Handler> handlers = new LinkedList<>();
handlers.add(new JVMMetricsServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentjvm.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class AgentJVMGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(AgentJVMGRPCConfig.HOST, AgentJVMGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.agentjvm.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.JVMMetrics;
import org.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
/**
* @author pengys5
*/
public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {
@Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
super.collect(request, responseObserver);
}
}
org.skywalking.apm.collector.agentjvm.AgentJVMModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.agentjvm.grpc.AgentJVMGRPCModuleDefine
\ No newline at end of file
package org.skywalking.apm.collector.agentregister.grpc;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleDefine;
import org.skywalking.apm.collector.agentregister.AgentRegisterModuleGroupDefine;
import org.skywalking.apm.collector.agentregister.grpc.handler.ApplicationRegisterServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.InstanceDiscoveryServiceHandler;
import org.skywalking.apm.collector.agentregister.grpc.handler.ServiceNameDiscoveryServiceHandler;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
......@@ -42,6 +46,10 @@ public class AgentRegisterGRPCModuleDefine extends AgentRegisterModuleDefine {
}
@Override public List<Handler> handlerList() {
return null;
List<Handler> handlers = new LinkedList<>();
handlers.add(new ApplicationRegisterServiceHandler());
handlers.add(new InstanceDiscoveryServiceHandler());
handlers.add(new ServiceNameDiscoveryServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
/**
* @author pengys5
*/
public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements GRPCHandler {
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
super.register(request, responseObserver);
}
@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
super.heartbeat(request, responseObserver);
}
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
super.registerRecover(request, responseObserver);
}
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
/**
* @author pengys5
*/
public class ServiceNameDiscoveryServiceHandler extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements GRPCHandler {
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
super.discovery(request, responseObserver);
}
}
......@@ -43,5 +43,10 @@
<artifactId>apm-collector-agentregister</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentjvm</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -13,6 +13,6 @@ import org.skywalking.apm.collector.core.server.ServerHolder;
public class RemoteModuleInstaller implements ModuleInstaller {
@Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
ServerHolder serverHolder) throws DefineException, ClientException {
}
}
package org.skywalking.apm.collector.server.grpc;
import org.skywalking.apm.collector.core.framework.Handler;
/**
* @author pengys5
*/
public interface GRPCHandler extends Handler {
}
......@@ -50,5 +50,6 @@ public class GRPCServer implements Server {
}
@Override public void addHandler(Handler handler) {
nettyServerBuilder.addService((io.grpc.BindableService)handler);
}
}
......@@ -16,6 +16,7 @@
<module>apm-collector-stream</module>
<module>apm-collector-agentserver</module>
<module>apm-collector-agentregister</module>
<module>apm-collector-agentjvm</module>
</modules>
<parent>
<artifactId>apm</artifactId>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册