提交 116fca8a 编写于 作者: P pengys5

module install

#266
上级 b0003f01
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-h2</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.196</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.client.h2;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class H2Client implements Client {
@Override public void initialize() throws ClientException {
}
@Override public void insert(String path) throws ClientException {
}
@Override public void update() {
}
@Override public String select(String path) throws ClientException {
return null;
}
@Override public void delete() {
}
@Override public boolean exist(String path) throws ClientException {
return false;
}
@Override public void listen(String path) throws ClientException {
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-redis</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.client.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class RedisClient implements Client {
@Override public void initialize() throws ClientException {
}
@Override public void insert(String path) throws ClientException {
}
@Override public void update() {
}
@Override public String select(String path) throws ClientException {
return null;
}
@Override public void delete() {
}
@Override public boolean exist(String path) throws ClientException {
return false;
}
@Override public void listen(String path) throws ClientException {
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-client</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>client-zookeeper</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
<exclusions>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>log4j</artifactId>
<groupId>log4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.client.zookeeper;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ZookeeperClient implements Client {
private final Logger logger = LoggerFactory.getLogger(ZookeeperClient.class);
private ZooKeeper zk;
@Override public void initialize() throws ZookeeperClientException {
try {
zk = new ZooKeeper(ZookeeperConfig.hostPort, ZookeeperConfig.sessionTimeout, new ZookeeperDataListener(this));
} catch (IOException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void insert(String path) throws ZookeeperClientException {
logger.info("add the zookeeper path \"{}\"", path);
try {
zk.create(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void update() {
}
@Override public String select(String path) throws ZookeeperClientException {
logger.info("get the zookeeper data from path \"{}\"", path);
try {
return zk.getData(path, false, null).toString();
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void delete() {
}
@Override public boolean exist(String path) throws ZookeeperClientException {
logger.info("assess the zookeeper path \"{}\" exist", path);
try {
Stat stat = zk.exists(path, false);
if (ObjectUtils.isEmpty(stat)) {
return false;
} else {
return true;
}
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
@Override public void listen(String path) throws ZookeeperClientException {
try {
zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
throw new ZookeeperClientException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.client.zookeeper;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class ZookeeperClientException extends ClientException {
public ZookeeperClientException(String message) {
super(message);
}
public ZookeeperClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.client.zookeeper;
/**
* @author pengys5
*/
public class ZookeeperConfig {
public static String hostPort;
public static int sessionTimeout;
}
package org.skywalking.apm.collector.client.zookeeper;
import java.util.LinkedList;
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.client.DataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ZookeeperDataListener implements DataListener, Watcher {
private final Logger logger = LoggerFactory.getLogger(ZookeeperDataListener.class);
private Client client;
public ZookeeperDataListener(Client client) {
this.client = client;
}
@Override public void process(WatchedEvent event) {
logger.debug("path {}", event.getPath());
if (StringUtils.isEmpty(event.getPath())) {
return;
}
try {
String data = client.select(event.getPath());
logger.debug("data {}", data);
} catch (ClientException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void listen() throws ClientException {
for (String itemKey : items()) {
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
client.listen(pathBuilder.toString());
}
}
@Override public List<String> items() {
List<String> items = new LinkedList<>();
items.add(ClusterDataInitializer.FOR_AGENT_CATALOG);
items.add(ClusterDataInitializer.FOR_UI_CATALOG);
return items;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-client</artifactId>
<packaging>pom</packaging>
<modules>
<module>client-zookeeper</module>
<module>client-redis</module>
<module>client-h2</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -3,20 +3,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-commons</artifactId>
<artifactId>cluster-redis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<artifactId>client-redis</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.cluster.redis;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class ClusterRedisConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
/**
* @author pengys5
*/
public class ClusterRedisDataInitializer extends ClusterDataInitializer {
@Override public void addItem(Client client, String itemKey) throws ClientException {
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
return false;
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.client.redis.RedisClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterRedisModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "redis";
}
@Override public boolean defaultModule() {
return false;
}
@Override protected ModuleConfigParser configParser() {
return new ClusterRedisConfigParser();
}
@Override protected Client client() {
return new RedisClient();
}
@Override protected DataInitializer dataInitializer() {
return new ClusterRedisDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterRedisModuleRegistrationWriter();
}
}
package org.skywalking.apm.collector.cluster.redis;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
/**
* @author pengys5
*/
public class ClusterRedisModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
}
}
org.skywalking.apm.collector.cluster.redis.ClusterRedisModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-standalone</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-h2</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.cluster.standalone;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class ClusterStandaloneConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
/**
* @author pengys5
*/
public class ClusterStandaloneDataInitializer extends ClusterDataInitializer {
@Override public void addItem(Client client, String itemKey) throws ClientException {
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
return false;
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.framework.DataInitializer;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "standalone";
}
@Override public boolean defaultModule() {
return true;
}
@Override protected ModuleConfigParser configParser() {
return new ClusterStandaloneConfigParser();
}
@Override protected Client client() {
return new H2Client();
}
@Override protected DataInitializer dataInitializer() {
return new ClusterStandaloneDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterStandaloneModuleRegistrationWriter();
}
}
package org.skywalking.apm.collector.cluster.standalone;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
/**
* @author pengys5
*/
public class ClusterStandaloneModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
}
}
org.skywalking.apm.collector.cluster.standalone.ClusterStandaloneModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-cluster-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cluster-zookeeper</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>client-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.Map;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class ClusterZKConfigParser implements ModuleConfigParser {
private final String HOST_PORT = "hostPort";
private final String SESSION_TIMEOUT = "sessionTimeout";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST_PORT))) {
throw new ConfigParseException("");
}
ZookeeperConfig.hostPort = (String)config.get(HOST_PORT);
if (StringUtils.isEmpty(config.get(SESSION_TIMEOUT))) {
ZookeeperConfig.sessionTimeout = 1000;
} else {
ZookeeperConfig.sessionTimeout = (Integer)config.get(SESSION_TIMEOUT);
}
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterZKDataInitializer extends ClusterDataInitializer {
private final Logger logger = LoggerFactory.getLogger(ClusterZKDataInitializer.class);
@Override public void addItem(Client client, String itemKey) throws ClientException {
logger.info("add the zookeeper item key \"{}\" exist", itemKey);
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
if (!client.exist(pathBuilder.toString())) {
client.insert(pathBuilder.toString());
}
}
}
@Override public boolean existItem(Client client, String itemKey) throws ClientException {
logger.info("assess the zookeeper item key \"{}\" exist", itemKey);
String[] catalogs = itemKey.split("\\.");
StringBuilder pathBuilder = new StringBuilder();
for (String catalog : catalogs) {
pathBuilder.append("/").append(catalog);
}
return client.exist(pathBuilder.toString());
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.cluster.ClusterDataInitializer;
import org.skywalking.apm.collector.core.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
/**
* @author pengys5
*/
public class ClusterZKModuleDefine extends ClusterModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Cluster;
}
@Override public String name() {
return "zookeeper";
}
@Override public boolean defaultModule() {
return false;
}
@Override public ModuleConfigParser configParser() {
return new ClusterZKConfigParser();
}
@Override public Client client() {
return new ZookeeperClient();
}
@Override public ClusterDataInitializer dataInitializer() {
return new ClusterZKDataInitializer();
}
@Override protected ClusterModuleRegistrationWriter registrationWriter() {
return new ClusterZKModuleRegistrationWriter();
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.util.List;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationReader;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationReader implements ClusterModuleRegistrationReader {
@Override public List<String> read(String key) {
return null;
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import org.skywalking.apm.collector.core.cluster.ClusterModuleRegistrationWriter;
/**
* @author pengys5
*/
public class ClusterZKModuleRegistrationWriter implements ClusterModuleRegistrationWriter {
@Override public void write(String key, String value) {
}
}
org.skywalking.apm.collector.cluster.zookeeper.ClusterZKModuleDefine
\ No newline at end of file
package org.skywalking.apm.collector.cluster.zookeeper;
import java.io.FileNotFoundException;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.skywalking.apm.collector.client.zookeeper.ZookeeperConfig;
import org.skywalking.apm.collector.core.cluster.ClusterModuleException;
import org.yaml.snakeyaml.Yaml;
/**
* @author pengys5
*/
public class ClusterZKModuleDefineTestCase {
private Map config;
@Before
public void before() throws FileNotFoundException {
Yaml yaml = new Yaml();
config = (Map)yaml.load("hostPort: localhost:2181" + System.lineSeparator() + "sessionTimeout: 2000");
}
@Test
public void testInitialize() throws ClusterModuleException {
ClusterZKModuleDefine define = new ClusterZKModuleDefine();
define.initialize(config);
System.out.println(ZookeeperConfig.hostPort);
System.out.println(ZookeeperConfig.sessionTimeout);
}
}
package org.skywalking.apm.collector.cluster.zookeeper;
import java.io.IOException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
/**
* @author pengys5
*/
public class ZookeeperTestCase {
@Test
public void test() throws IOException, KeeperException, InterruptedException {
String hostPort = "localhost:2181";
String znode = "/collector/module";
String filename = "";
String exec[] = new String[5 - 3];
// new ZookeeperExecutor(hostPort, znode, filename, exec).run();
ZooKeeper zk = new ZooKeeper(hostPort, 1000, new Watcher() {
@Override public void process(WatchedEvent event) {
String path = event.getPath();
System.out.println("已经触发了" + event.getType() + "事件!");
System.out.println("path: " + path);
}
});
zk.create("/testRootPath", "testRootData".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 创建一个子目录节点
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath",false,null)));
// 取出子目录节点列表
System.out.println(zk.getChildren("/testRootPath",true));
// 修改子目录节点数据
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]");
// 创建另外一个子目录节点
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
// System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
// 删除子目录节点
zk.delete("/testRootPath/testChildPathTwo",-1);
zk.delete("/testRootPath/testChildPathOne",-1);
// 删除父目录节点
zk.delete("/testRootPath",-1);
zk.close();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-cluster-new</artifactId>
<packaging>pom</packaging>
<modules>
<module>cluster-zookeeper</module>
<module>cluster-redis</module>
<module>cluster-standalone</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.commons.config;
/**
* @author pengys5
*/
public enum SeedNodesFormatter {
INSTANCE;
public String formatter(String seedNodes) {
return null;
}
}
package org.skywalking.apm.collector.commons.role;
import org.skywalking.apm.collector.actor.Role;
import org.skywalking.apm.collector.actor.selector.RollingSelector;
import org.skywalking.apm.collector.actor.selector.WorkerSelector;
/**
* @author pengys5
*/
public enum TraceSegmentReceiverRole implements Role {
INSTANCE;
@Override
public String roleName() {
return "TraceSegmentReceiver";
}
@Override
public WorkerSelector workerSelector() {
return new RollingSelector();
}
}
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
serializers {
java = "akka.serialization.JavaSerializer"
proto = "akka.remote.serialization.ProtobufSerializer"
// TraceSegment = "org.skywalking.apm.collector.worker.TraceSegmentSerializer"
// json = "org.skywalking.apm.collector.commons.serializer.JsonSerializer"
}
serialization-bindings {
"java.lang.String" = java
"com.google.protobuf.Message" = proto
// "TraceSegment" = TraceSegment
// "com.google.gson.JsonObject" = json
}
warn-about-java-serializer-usage = on
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
}
}
cluster {
auto-down-unreachable-after = off
metrics.enabled = off
roles = ["WorkersListener"]
}
}
package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public class ClusterModuleContext {
private ClusterModuleRegistrationWriter writer;
public ClusterModuleRegistrationWriter getWriter() {
return writer;
}
public void setWriter(ClusterModuleRegistrationWriter writer) {
this.writer = writer;
}
}
......@@ -5,6 +5,7 @@ import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
/**
......@@ -26,4 +27,10 @@ public abstract class ClusterModuleDefine extends ModuleDefine {
@Override public final Server server() {
throw new UnsupportedOperationException("");
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("Cluster module do not need module registration.");
}
protected abstract ClusterModuleRegistrationWriter registrationWriter();
}
......@@ -3,7 +3,7 @@ package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface Discovery {
public interface ClusterModuleDiscovery {
void discover();
}
package org.skywalking.apm.collector.core.cluster;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
logger.info("beginning cluster module install");
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null);
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
ModuleDefine moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue());
}
}
}
package org.skywalking.apm.collector.core.cluster;
import java.util.List;
/**
* @author pengys5
*/
public interface ClusterModuleRegistrationReader {
List<String> read(String key);
}
......@@ -3,7 +3,6 @@ package org.skywalking.apm.collector.core.cluster;
/**
* @author pengys5
*/
public interface Registration {
void register();
public interface ClusterModuleRegistrationWriter {
void write(String key, String value);
}
......@@ -5,5 +5,5 @@ import org.skywalking.apm.collector.core.framework.Loader;
/**
* @author pengys5
*/
public interface ConfigLoader extends Loader {
public interface ConfigLoader<T> extends Loader<T> {
}
package org.skywalking.apm.collector.core.framework;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleInstallerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class CollectorStarter implements Starter {
private final Logger logger = LoggerFactory.getLogger(CollectorStarter.class);
@Override public void start() throws ConfigException, DefineException {
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
ModuleDefineLoader defineLoader = new ModuleDefineLoader();
Map<String, Map<String, ModuleDefine>> moduleDefineMap = defineLoader.load();
ModuleInstallerAdapter moduleInstallerAdapter = new ModuleInstallerAdapter(ModuleGroup.Cluster);
moduleInstallerAdapter.install(configuration.get(ModuleGroup.Cluster.name().toLowerCase()), moduleDefineMap.get(ModuleGroup.Cluster.name().toLowerCase()));
ModuleGroup[] moduleGroups = ModuleGroup.values();
for (ModuleGroup moduleGroup : moduleGroups) {
if (!ModuleGroup.Cluster.equals(moduleGroup)) {
moduleInstallerAdapter = new ModuleInstallerAdapter(moduleGroup);
logger.info("module group {}, configuration {}", moduleGroup.name().toLowerCase(), configuration.get(moduleGroup.name().toLowerCase()));
moduleInstallerAdapter.install(configuration.get(moduleGroup.name().toLowerCase()), moduleDefineMap.get(moduleGroup.name().toLowerCase()));
}
}
}
}
package org.skywalking.apm.collector.core.framework;
/**
* @author pengys5
*/
public class Context {
}
......@@ -9,7 +9,5 @@ public interface Define {
void initialize(Map config) throws DefineException;
String getName();
void setName(String name);
String name();
}
......@@ -5,7 +5,6 @@ import org.skywalking.apm.collector.core.config.ConfigException;
/**
* @author pengys5
*/
public interface Loader {
void load() throws ConfigException;
public interface Loader<T> {
T load() throws ConfigException;
}
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public interface Starter {
void start();
void start() throws CollectorException;
}
......@@ -11,28 +11,16 @@ import org.yaml.snakeyaml.Yaml;
/**
* @author pengys5
*/
public class ModuleConfigLoader implements ConfigLoader {
public class ModuleConfigLoader implements ConfigLoader<Map<String, Map>> {
private final Logger logger = LoggerFactory.getLogger(ModuleConfigLoader.class);
@Override public void load() throws ModuleConfigLoaderException {
@Override public Map<String, Map> load() throws ModuleConfigLoaderException {
Yaml yaml = new Yaml();
ModuleInstaller installer = new ModuleInstaller();
Map<String, Map> configurations = null;
try {
configurations = (Map<String, Map>)yaml.load(ResourceUtils.read("application.yml"));
return (Map<String, Map>)yaml.load(ResourceUtils.read("application.yml"));
} catch (FileNotFoundException e) {
throw new ModuleConfigLoaderException(e.getMessage(), e);
}
configurations.forEach((moduleName, moduleConfig) -> {
logger.info("module name \"{}\" from application.yml", moduleName);
try {
installer.install(moduleName, moduleConfig);
} catch (ModuleException e) {
logger.error("module \"{}\" install failure", moduleName);
logger.error(e.getMessage(), e);
}
});
}
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
/**
* @author pengys5
*/
public class ModuleContext {
private ClusterModuleContext clusterContext;
public ClusterModuleContext getClusterContext() {
return clusterContext;
}
public void setClusterContext(ClusterModuleContext clusterContext) {
this.clusterContext = clusterContext;
}
}
......@@ -10,19 +10,9 @@ import org.skywalking.apm.collector.core.server.Server;
*/
public abstract class ModuleDefine implements Define {
private String moduleName;
@Override public final String getName() {
return moduleName;
}
@Override public final void setName(String name) {
this.moduleName = name;
}
protected abstract ModuleGroup group();
protected abstract boolean defaultModule();
public abstract boolean defaultModule();
protected abstract ModuleConfigParser configParser();
......@@ -31,4 +21,6 @@ public abstract class ModuleDefine implements Define {
protected abstract Server server();
protected abstract DataInitializer dataInitializer();
protected abstract ModuleRegistration registration();
}
package org.skywalking.apm.collector.core.module;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ModuleDefineLoader implements Loader<Map<String, Map<String, ModuleDefine>>> {
private final Logger logger = LoggerFactory.getLogger(ModuleDefineLoader.class);
@Override public Map<String, Map<String, ModuleDefine>> load() throws ConfigException {
Map<String, Map<String, ModuleDefine>> moduleDefineMap = new LinkedHashMap<>();
ModuleDefinitionFile definitionFile = new ModuleDefinitionFile();
logger.info("definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleDefine> definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile);
for (ModuleDefine moduleDefine : definitionLoader) {
logger.info("loaded module class: {}", moduleDefine.getClass().getName());
String groupName = moduleDefine.group().name().toLowerCase();
if (!moduleDefineMap.containsKey(groupName)) {
moduleDefineMap.put(groupName, new LinkedHashMap<>());
}
moduleDefineMap.get(groupName).put(moduleDefine.name().toLowerCase(), moduleDefine);
}
return moduleDefineMap;
}
}
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.core.module;
* @author pengys5
*/
public enum ModuleGroup {
Cluster, Worker, Queue
Cluster, Worker
}
package org.skywalking.apm.collector.core.module;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class);
private final Map<String, ModuleDefine> moduleDefineMap;
protected ModuleInstaller() {
moduleDefineMap = new LinkedHashMap<>();
ModuleDefinitionFile definitionFile = new ModuleDefinitionFile();
logger.info("definition file name: {}", definitionFile.fileName());
DefinitionLoader<ModuleDefine> definitionLoader = DefinitionLoader.load(ModuleDefine.class, definitionFile);
for (ModuleDefine moduleDefine : definitionLoader) {
logger.info("loaded module class: {}", moduleDefine.getClass().getName());
moduleDefineMap.put(moduleDefine.getName(), moduleDefine);
}
}
public void install(String moduleName, Map moduleConfig) throws ModuleException {
Map<String, Map> module = (LinkedHashMap)moduleConfig;
module.entrySet().forEach(subModuleConfig -> {
String subMoudleName = moduleName + "." + subModuleConfig.getKey();
logger.info("install sub module {}", subMoudleName);
try {
if (moduleDefineMap.containsKey(subMoudleName)) {
moduleDefineMap.get(subMoudleName).initialize(subModuleConfig.getValue());
} else {
logger.error("could not found the module definition, module name: {}", subMoudleName);
}
} catch (DefineException e) {
logger.error(e.getMessage(), e);
}
});
}
public interface ModuleInstaller {
void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap) throws DefineException;
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.skywalking.apm.collector.core.cluster.ClusterModuleInstaller;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.worker.WorkerModuleInstaller;
/**
* @author pengys5
*/
public class ModuleInstallerAdapter implements ModuleInstaller {
private ModuleInstaller moduleInstaller;
public ModuleInstallerAdapter(ModuleGroup moduleGroup) {
if (ModuleGroup.Cluster.equals(moduleGroup)) {
moduleInstaller = new ClusterModuleInstaller();
} else if (ModuleGroup.Worker.equals(moduleGroup)) {
moduleInstaller = new WorkerModuleInstaller();
}
}
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
moduleInstaller.install(moduleConfig, moduleDefineMap);
}
}
package org.skywalking.apm.collector.core.module;
/**
* @author pengys5
*/
public abstract class ModuleRegistration {
protected static final String SEPARATOR = "|";
protected abstract String buildValue();
}
package org.skywalking.apm.collector.core.util;
import com.sun.istack.internal.Nullable;
import java.util.Map;
/**
* @author pengys5
*/
public class CollectionUtils {
public static boolean isEmpty(@Nullable Map map) {
return (map == null || map.size() == 0);
}
}
......@@ -5,11 +5,10 @@ import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.skywalking.apm.collector.core.framework.Define;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,7 +35,7 @@ public class DefinitionLoader<D> implements Iterable<D> {
@Override public final Iterator<D> iterator() {
logger.info("load definition file: {}", definitionFile.get());
Properties properties = new Properties();
Map<String, String> definitionList = new LinkedHashMap<>();
List<String> definitionList = new LinkedList<>();
try {
Enumeration<URL> urlEnumeration = this.getClass().getClassLoader().getResources(definitionFile.get());
while (urlEnumeration.hasMoreElements()) {
......@@ -46,16 +45,15 @@ public class DefinitionLoader<D> implements Iterable<D> {
Enumeration defineItem = properties.propertyNames();
while (defineItem.hasMoreElements()) {
String key = (String)defineItem.nextElement();
String fullNameClass = properties.getProperty(key);
definitionList.put(key, fullNameClass);
String fullNameClass = (String)defineItem.nextElement();
definitionList.add(fullNameClass);
}
}
} catch (IOException e) {
e.printStackTrace();
}
Iterator<Map.Entry<String, String>> moduleDefineIterator = definitionList.entrySet().iterator();
Iterator<String> moduleDefineIterator = definitionList.iterator();
return new Iterator<D>() {
@Override public boolean hasNext() {
......@@ -63,16 +61,13 @@ public class DefinitionLoader<D> implements Iterable<D> {
}
@Override public D next() {
Map.Entry<String, String> moduleDefineEntry = moduleDefineIterator.next();
String definitionName = moduleDefineEntry.getKey();
String definitionClass = moduleDefineEntry.getValue();
logger.info("key: {}, definitionClass: {}", definitionName, definitionClass);
String definitionClass = moduleDefineIterator.next();
logger.info("definitionClass: {}", definitionClass);
try {
Class c = Class.forName(definitionClass);
Define define = (Define)c.newInstance();
define.setName(definitionName);
return (D)define;
return (D)c.newInstance();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return null;
}
......
package org.skywalking.apm.collector.core.worker;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class WorkerModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(WorkerModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap) throws DefineException {
logger.info("beginning worker module install");
Map.Entry<String, Map> workerConfigEntry = moduleConfig.entrySet().iterator().next();
ModuleDefine moduleDefine = moduleDefineMap.get(workerConfigEntry.getKey());
moduleDefine.initialize(workerConfigEntry.getValue());
}
}
package org.skywalking.apm.collector.core.module;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleForTest implements Module {
private final Logger logger = LoggerFactory.getLogger(ModuleInstaller.class);
@Override public void install(Map configuration) {
logger.debug(configuration.toString());
}
}
package org.skywalking.apm.collector.core.module;
import org.junit.Before;
import org.junit.Test;
/**
* @author pengys5
*/
public class ModuleInstallerTestCase {
@Before
public void init() {
}
@Test
public void testInstall() {
ModuleInstaller installer = new ModuleInstaller();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-queue</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>datacarrier-queue</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-queue</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>disruptor-queue</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-queue</artifactId>
<packaging>pom</packaging>
<modules>
<module>disruptor-queue</module>
<module>datacarrier-queue</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-server</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>google-rpc-server</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.server.grpc;
import io.grpc.netty.NettyServerBuilder;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class GRPCServer implements Server {
private final Logger logger = LoggerFactory.getLogger(GRPCServer.class);
private final String host;
private final int port;
public GRPCServer(String host, int port) {
this.host = host;
this.port = port;
}
@Override public void initialize() throws ServerException {
InetSocketAddress address = new InetSocketAddress(host, port);
NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forAddress(address);
try {
io.grpc.Server server = nettyServerBuilder.build().start();
blockUntilShutdown(server);
} catch (InterruptedException | IOException e) {
throw new GRPCServerException(e.getMessage(), e);
}
logger.info("Server started, host {} listening on {}", host, port);
}
private void blockUntilShutdown(io.grpc.Server server) throws InterruptedException {
if (server != null) {
server.awaitTermination();
}
}
}
package org.skywalking.apm.collector.server.grpc;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class GRPCServerException extends ServerException {
public GRPCServerException(String message) {
super(message);
}
public GRPCServerException(String message, Throwable cause) {
super(message, cause);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-server</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jetty-server</artifactId>
<packaging>jar</packaging>
<properties>
<jetty.version>9.4.2.v20170220</jetty.version>
</properties>
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jetty.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-servlet</artifactId>
<version>${jetty.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.server.jetty;
import java.net.InetSocketAddress;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class JettyServer implements Server {
private final Logger logger = LoggerFactory.getLogger(JettyServer.class);
private final String host;
private final int port;
private final String contextPath;
public JettyServer(String host, int port, String contextPath) {
this.host = host;
this.port = port;
this.contextPath = contextPath;
}
@Override public void initialize() throws ServerException {
org.eclipse.jetty.server.Server server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
ServletContextHandler servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
servletContextHandler.setContextPath(contextPath);
logger.info("http server root context path: {}", contextPath);
server.setHandler(servletContextHandler);
try {
server.start();
} catch (Exception e) {
throw new JettyServerException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.server.jetty;
import org.skywalking.apm.collector.core.server.ServerException;
/**
* @author pengys5
*/
public class JettyServerException extends ServerException {
public JettyServerException(String message) {
super(message);
}
public JettyServerException(String message, Throwable cause) {
super(message, cause);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-server</artifactId>
<packaging>pom</packaging>
<modules>
<module>google-rpc-server</module>
<module>jetty-server</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-storage</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>elasticsearch-storage</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-storage</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>h2-storage</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-storage</artifactId>
<packaging>pom</packaging>
<modules>
<module>h2-storage</module>
<module>elasticsearch-storage</module>
</modules>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-worker-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-worker-agent</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.worker.agent;
/**
* @author pengys5
*/
public class WorkerAgentConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.worker.agent;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class WorkerAgentConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST))) {
throw new ConfigParseException("");
}
WorkerAgentConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
}
WorkerAgentConfig.PORT = (Integer)config.get(PORT);
}
}
package org.skywalking.apm.collector.worker.agent;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.worker.WorkerModuleDefine;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
/**
* @author pengys5
*/
public class WorkerAgentModuleDefine extends WorkerModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Worker;
}
@Override public String name() {
return "agent";
}
@Override public boolean defaultModule() {
return true;
}
@Override public ModuleConfigParser configParser() {
return new WorkerAgentConfigParser();
}
@Override public Server server() {
return new GRPCServer(WorkerAgentConfig.HOST, WorkerAgentConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new WorkerAgentModuleRegistration();
}
}
package org.skywalking.apm.collector.worker.agent;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class WorkerAgentModuleRegistration extends ModuleRegistration {
@Override protected String buildValue() {
return WorkerAgentConfig.HOST + ModuleRegistration.SEPARATOR + WorkerAgentConfig.PORT;
}
}
org.skywalking.apm.collector.worker.agent.WorkerAgentModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-worker-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-worker-impl</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>cluster-standalone</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>cluster-zookeeper</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>cluster-redis</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-worker-agent</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-worker-ui</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.worker.impl;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorStarter;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class CollectorBootStartUp {
private static final Logger logger = LoggerFactory.getLogger(CollectorBootStartUp.class);
public static void main(String[] args) throws ConfigException, DefineException {
logger.info("collector starting...");
CollectorStarter starter = new CollectorStarter();
starter.start();
}
}
cluster:
zookeeper:
hostPort: localhost:2181
sessionTimeout: 1000
redis:
host: localhost-rd
port: 2000
worker:
ui:
host: localhost
port: 12800
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-worker-new</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-worker-ui</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.worker.ui;
/**
* @author pengys5
*/
public class WorkerUIConfig {
public static String HOST;
public static int PORT;
public static String CONTEXT_PATH;
}
package org.skywalking.apm.collector.worker.ui;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class WorkerUIConfigParser implements ModuleConfigParser {
private final String HOST = "host";
private final String PORT = "port";
private final String CONTEXT_PATH = "context_path";
@Override public void parse(Map config) throws ConfigParseException {
if (StringUtils.isEmpty(config.get(HOST))) {
throw new ConfigParseException("HOST must be require");
}
WorkerUIConfig.HOST = (String)config.get(HOST);
if (StringUtils.isEmpty(config.get(PORT))) {
throw new ConfigParseException("");
}
WorkerUIConfig.PORT = (Integer)config.get(PORT);
if (StringUtils.isEmpty(config.get(CONTEXT_PATH))) {
WorkerUIConfig.CONTEXT_PATH = "/";
} else {
WorkerUIConfig.CONTEXT_PATH = (String)config.get(CONTEXT_PATH);
}
}
}
package org.skywalking.apm.collector.worker.ui;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleGroup;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.worker.WorkerModuleDefine;
import org.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author pengys5
*/
public class WorkerUIModuleDefine extends WorkerModuleDefine {
@Override public ModuleGroup group() {
return ModuleGroup.Worker;
}
@Override public String name() {
return "ui";
}
@Override public boolean defaultModule() {
return true;
}
@Override public ModuleConfigParser configParser() {
return new WorkerUIConfigParser();
}
@Override public Server server() {
return new JettyServer(WorkerUIConfig.HOST, WorkerUIConfig.PORT, WorkerUIConfig.CONTEXT_PATH);
}
@Override protected ModuleRegistration registration() {
return new WorkerUIModuleRegistration();
}
}
package org.skywalking.apm.collector.worker.ui;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class WorkerUIModuleRegistration extends ModuleRegistration {
@Override protected String buildValue() {
return WorkerUIConfig.HOST + ModuleRegistration.SEPARATOR + WorkerUIConfig.PORT + ModuleRegistration.SEPARATOR + WorkerUIConfig.CONTEXT_PATH;
}
}
org.skywalking.apm.collector.worker.ui.WorkerUIModuleDefine
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-worker-new</artifactId>
<packaging>pom</packaging>
<modules>
<module>apm-collector-worker-agent</module>
<module>apm-collector-worker-ui</module>
<module>apm-collector-worker-impl</module>
</modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>jetty-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>google-rpc-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
......@@ -25,11 +25,6 @@
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-network</artifactId>
......
package org.skywalking.apm.collector.worker.discovery;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
/**
* @author pengys5
*/
public class InstanceDiscoveryServiceImpl extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements WorkerCaller {
@Override public void preStart() throws ProviderNotFoundException {
}
@Override public void inject(ClusterWorkerContext clusterWorkerContext) {
}
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
super.register(request, responseObserver);
}
@Override
public void registerRecover(ApplicationInstanceMapping request, StreamObserver<Downstream> responseObserver) {
super.registerRecover(request, responseObserver);
}
}
package org.skywalking.apm.collector.worker.discovery;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.network.proto.ServiceNameCollection;
import org.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
import org.skywalking.apm.network.proto.ServiceNameMappingCollection;
/**
* @author pengys5
*/
public class ServiceNameDisCoveryServiceImpl extends ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceImplBase implements WorkerCaller {
@Override public void preStart() throws ProviderNotFoundException {
}
@Override public void inject(ClusterWorkerContext clusterWorkerContext) {
}
@Override public void discovery(ServiceNameCollection request,
StreamObserver<ServiceNameMappingCollection> responseObserver) {
super.discovery(request, responseObserver);
}
}
package org.skywalking.apm.collector.worker.register;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.skywalking.apm.collector.actor.AbstractWorker;
import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceImpl extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements WorkerCaller {
private Logger logger = LogManager.getFormatterLogger(ApplicationRegisterServiceImpl.class);
private ClusterWorkerContext clusterWorkerContext;
private WorkerRef segmentReceiverWorkRef;
@Override public void preStart() throws ProviderNotFoundException {
segmentReceiverWorkRef = clusterWorkerContext.findProvider(SegmentReceiver.WorkerRole.INSTANCE).create(AbstractWorker.noOwner());
}
@Override public void inject(ClusterWorkerContext clusterWorkerContext) {
this.clusterWorkerContext = clusterWorkerContext;
}
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
}
}
package org.skywalking.apm.collector.worker.grpcserver;
package org.skywalking.apm.collector.worker.segment;
import io.grpc.stub.StreamObserver;
import org.apache.logging.log4j.LogManager;
......@@ -8,7 +8,7 @@ import org.skywalking.apm.collector.actor.ClusterWorkerContext;
import org.skywalking.apm.collector.actor.ProviderNotFoundException;
import org.skywalking.apm.collector.actor.WorkerInvokeException;
import org.skywalking.apm.collector.actor.WorkerRef;
import org.skywalking.apm.collector.worker.segment.SegmentReceiver;
import org.skywalking.apm.collector.worker.grpcserver.WorkerCaller;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UpstreamSegment;
......
org.skywalking.apm.collector.worker.grpcserver.TraceSegmentServiceImpl
\ No newline at end of file
org.skywalking.apm.collector.worker.segment.TraceSegmentServiceImpl
\ No newline at end of file
......@@ -5,7 +5,13 @@
<modules>
<module>apm-collector-cluster</module>
<module>apm-collector-worker</module>
<module>apm-collector-commons</module>
<module>apm-collector-core</module>
<module>apm-collector-queue</module>
<module>apm-collector-worker-new</module>
<module>apm-collector-storage</module>
<module>apm-collector-cluster-new</module>
<module>apm-collector-client</module>
<module>apm-collector-server</module>
</modules>
<parent>
<artifactId>apm</artifactId>
......@@ -21,48 +27,5 @@
</properties>
<dependencies>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.11</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.11</artifactId>
<version>${akka.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-jcl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_2.11</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册