提交 706874c9 编写于 作者: P pengys5

Add storage module, start storage client and share with other module

上级 e12dc5fa
......@@ -23,6 +23,11 @@
<artifactId>apm-collector-queue</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-storage</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-ui</artifactId>
......
......@@ -28,6 +28,17 @@
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.2</version>
<exclusions>
<exclusion>
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
......
package org.skywalking.apm.collector.client.elasticsearch;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ElasticSearchClient implements Client {
private final Logger logger = LoggerFactory.getLogger(ElasticSearchClient.class);
private org.elasticsearch.client.Client client;
private final String clusterName;
private final String clusterTransportSniffer;
private final String clusterNodes;
public ElasticSearchClient(String clusterName, String clusterTransportSniffer, String clusterNodes) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
this.clusterNodes = clusterNodes;
}
@Override public void initialize() throws ClientException {
Settings settings = Settings.builder()
.put("cluster.name", clusterName)
.put("client.transport.sniff", clusterTransportSniffer)
.build();
client = new PreBuiltTransportClient(settings);
List<AddressPairs> pairsList = parseClusterNodes(clusterNodes);
for (AddressPairs pairs : pairsList) {
try {
((PreBuiltTransportClient)client).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(pairs.host), pairs.port));
} catch (UnknownHostException e) {
throw new ElasticSearchClientException(e.getMessage(), e);
}
}
}
private List<AddressPairs> parseClusterNodes(String nodes) {
List<AddressPairs> pairsList = new LinkedList<>();
logger.info("elasticsearch cluster nodes: {}", nodes);
String[] nodesSplit = nodes.split(",");
for (int i = 0; i < nodesSplit.length; i++) {
String node = nodesSplit[i];
String host = node.split(":")[0];
String port = node.split(":")[1];
pairsList.add(new AddressPairs(host, Integer.valueOf(port)));
}
return pairsList;
}
class AddressPairs {
private String host;
private Integer port;
public AddressPairs(String host, Integer port) {
this.host = host;
this.port = port;
}
}
}
package org.skywalking.apm.collector.client.elasticsearch;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
public class ElasticSearchClientException extends ClientException {
public ElasticSearchClientException(String message) {
super(message);
}
public ElasticSearchClientException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.cluster;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterModuleContext;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ClusterModuleInstaller implements ModuleInstaller {
public class ClusterModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleInstaller.class);
......@@ -27,22 +25,6 @@ public class ClusterModuleInstaller implements ModuleInstaller {
ClusterModuleContext context = new ClusterModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null, serverHolder);
break;
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder);
}
installSingle(moduleConfig, moduleDefineMap, serverHolder);
}
}
package org.skywalking.apm.collector.core.module;
import java.util.Iterator;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class SingleModuleInstaller implements ModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(SingleModuleInstaller.class);
protected void installSingle(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
ModuleDefine moduleDefine = null;
if (CollectionUtils.isEmpty(moduleConfig)) {
logger.info("could not configure module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
moduleDefine = moduleDefineEntry.next().getValue();
if (moduleDefine.defaultModule()) {
logger.info("module {} initialize", moduleDefine.getClass().getName());
moduleDefine.initialize(null, serverHolder);
break;
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder);
}
}
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Column<T> {
private String name;
private T value;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Create {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public abstract class Insert {
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class IntegerColumn extends Column<Integer> {
}
......@@ -3,5 +3,6 @@ package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class LongColumn extends Column<Long> {
public interface Storage {
void initialize() throws StorageException;
}
package org.skywalking.apm.collector.core.storage;
import org.skywalking.apm.collector.core.CollectorException;
/**
* @author pengys5
*/
public abstract class StorageException extends CollectorException {
public StorageException(String message) {
super(message);
}
public StorageException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.core.storage;
/**
* @author pengys5
*/
public class StringColumn extends Column<String> {
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-storage</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>elasticsearch-storage</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector-storage</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>h2-storage</artifactId>
<packaging>jar</packaging>
</project>
\ No newline at end of file
......@@ -10,9 +10,18 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-storage</artifactId>
<packaging>pom</packaging>
<modules>
<module>h2-storage</module>
<module>elasticsearch-storage</module>
</modules>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.framework.Context;
/**
* @author pengys5
*/
public class StorageModuleContext extends Context {
private Client client;
public StorageModuleContext(String groupName) {
super(groupName);
}
public Client getClient() {
return client;
}
public void setClient(Client client) {
this.client = client;
}
}
package org.skywalking.apm.collector.storage;
import java.util.Map;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.cluster.ClusterDataListenerDefine;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StorageModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(StorageModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
try {
configParser().parse(config);
StorageModuleContext context = new StorageModuleContext(ClusterModuleGroupDefine.GROUP_NAME);
context.setClient(createClient(null));
CollectorContextHelper.INSTANCE.putContext(context);
} catch (ConfigParseException e) {
throw new StorageModuleException(e.getMessage(), e);
}
}
@Override protected final Server server() {
throw new UnsupportedOperationException("");
}
@Override protected final ModuleRegistration registration() {
throw new UnsupportedOperationException("");
}
@Override public final ClusterDataListener listener() {
throw new UnsupportedOperationException("");
}
@Override public final boolean defaultModule() {
return true;
}
}
package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class StorageModuleException extends ModuleException {
public StorageModuleException(String message) {
super(message);
}
public StorageModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.storage;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class StorageModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "storage";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new StorageModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new StorageModuleInstaller();
}
}
package org.skywalking.apm.collector.storage;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class StorageModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning agent stream module install");
StorageModuleContext context = new StorageModuleContext(StorageModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
installSingle(moduleConfig, moduleDefineMap, serverHolder);
}
}
package org.skywalking.apm.collector.storage.elasticsearch;
/**
* @author pengys5
*/
public class StorageElasticSearchConfig {
public static String CLUSTER_NAME;
public static String CLUSTER_TRANSPORT_SNIFFER;
public static String CLUSTER_NODES;
}
package org.skywalking.apm.collector.storage.elasticsearch;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class StorageElasticSearchConfigParser implements ModuleConfigParser {
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NAME))) {
StorageElasticSearchConfig.CLUSTER_NAME = (String)config.get(CLUSTER_NAME);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_TRANSPORT_SNIFFER))) {
StorageElasticSearchConfig.CLUSTER_TRANSPORT_SNIFFER = (String)config.get(CLUSTER_TRANSPORT_SNIFFER);
}
if (ObjectUtils.isNotEmpty(config) && StringUtils.isNotEmpty(config.get(CLUSTER_NODES))) {
StorageElasticSearchConfig.CLUSTER_NODES = (String)config.get(CLUSTER_NODES);
}
}
}
package org.skywalking.apm.collector.storage.elasticsearch;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
/**
* @author pengys5
*/
public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
public static final String MODULE_NAME = "elasticsearch";
@Override protected String group() {
return StorageModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new StorageElasticSearchConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
return new ElasticSearchClient(StorageElasticSearchConfig.CLUSTER_NAME, StorageElasticSearchConfig.CLUSTER_TRANSPORT_SNIFFER, StorageElasticSearchConfig.CLUSTER_NODES);
}
}
package org.skywalking.apm.collector.storage.h2;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
/**
* @author pengys5
*/
public class StorageH2ConfigParser implements ModuleConfigParser {
@Override public void parse(Map config) throws ConfigParseException {
}
}
package org.skywalking.apm.collector.storage.h2;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
/**
* @author pengys5
*/
public class StorageH2ModuleDefine extends StorageModuleDefine {
public static final String MODULE_NAME = "h2";
@Override protected String group() {
return StorageModuleGroupDefine.GROUP_NAME;
}
@Override public String name() {
return MODULE_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new StorageH2ConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
return new H2Client();
}
}
org.skywalking.apm.collector.storage.StorageModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.storage.elasticsearch.StorageElasticSearchModuleDefine
org.skywalking.apm.collector.storage.h2.StorageH2ModuleDefine
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册