提交 b53a3c5b 编写于 作者: P pengys5

1. remove remote module

2. make stream module as a collector define module, contains grpc remote service for call remote workers
3. Worker framework is OK
4. Collector starts OK
上级 dd6ae268
......@@ -34,4 +34,49 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -6,6 +6,7 @@ import org.skywalking.apm.collector.core.framework.Context;
* @author pengys5
*/
public class AgentStreamModuleContext extends Context {
public AgentStreamModuleContext(String groupName) {
super(groupName);
}
......
package org.skywalking.apm.collector.remote.grpc.handler;
package org.skywalking.apm.collector.agentstream.worker;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
*/
public class RemoteHandlerDefineException extends DefineException {
public class AgentStreamModuleDefineException extends DefineException {
public RemoteHandlerDefineException(String message) {
public AgentStreamModuleDefineException(String message) {
super(message);
}
public RemoteHandlerDefineException(String message, Throwable cause) {
public AgentStreamModuleDefineException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.agentstream.worker.node.aggregation;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.impl.AggregationWorker;
/**
 * Day-granularity aggregation worker for node-component data.
 *
 * Currently pure scaffolding: it delegates startup to the base
 * {@link AggregationWorker} and does not forward aggregated results anywhere.
 *
 * NOTE(review): this class imports from {@code ...stream.*} while the sibling
 * {@code NodeComponentAggWorker} imports from {@code ...stream.worker.*} —
 * confirm these packages still exist after the stream-module refactor.
 *
 * @author pengys5
 */
public class NodeComponentAggDayWorker extends AggregationWorker {
    /**
     * @param role           worker role used for routing/selection
     * @param clusterContext shared cluster-wide worker context
     * @param selfContext    this worker's local context
     */
    public NodeComponentAggDayWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
        super(role, clusterContext, selfContext);
    }
    @Override public void preStart() throws ProviderNotFoundException {
        // No extra startup work beyond the base aggregation worker.
        super.preStart();
    }
    @Override protected void sendToNext() {
        // Intentionally empty: downstream forwarding is not wired up yet.
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
 * Local async aggregation worker for node-component data.
 *
 * Scaffolding at this stage: startup delegates to the base class and
 * {@link #sendToNext()} does nothing yet. Instances are created through the
 * nested {@link Factory}, which the framework discovers via the provider
 * definition file.
 *
 * @author pengys5
 */
public class NodeComponentAggWorker extends AggregationWorker {
    /**
     * @param role           the singleton {@link Role} describing this worker
     * @param clusterContext shared cluster-wide worker context
     */
    public NodeComponentAggWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }
    @Override public void preStart() throws ProviderNotFoundException {
        // No extra startup work beyond the base aggregation worker.
        super.preStart();
    }
    @Override protected void sendToNext() {
        // Intentionally empty: downstream forwarding is not wired up yet.
    }
    /** Provider that the framework uses to instantiate this worker asynchronously. */
    public static class Factory extends AbstractLocalAsyncWorkerProvider<NodeComponentAggWorker> {
        @Override
        public Role role() {
            return Role.INSTANCE;
        }
        @Override
        public NodeComponentAggWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new NodeComponentAggWorker(role(), clusterContext);
        }
        @Override
        public int queueSize() {
            // Capacity of the worker's async input queue.
            return 1024;
        }
    }
    /** Singleton role: identifies this worker by class name, rolling selection. */
    public enum Role implements org.skywalking.apm.collector.stream.worker.Role {
        INSTANCE;
        @Override
        public String roleName() {
            return NodeComponentAggWorker.class.getSimpleName();
        }
        @Override
        public WorkerSelector workerSelector() {
            // Round-robin style selection among workers of this role.
            return new RollingSelector();
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.stream.impl.data.Attribute;
import org.skywalking.apm.collector.stream.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.impl.data.operate.NonOperation;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.skywalking.apm.collector.agentstream.worker.node.define.proto.NodeComponent;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
......@@ -25,4 +29,14 @@ public class NodeComponentDataDefine extends DataDefine {
addAttribute(2, new Attribute("peers", AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute("aggregation", AttributeType.STRING, new CoverOperation()));
}
@Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException {
NodeComponent.Message message = NodeComponent.Message.parseFrom(bytesData);
Data data = build();
data.setDataString(0, message.getId());
data.setDataString(1, message.getName());
data.setDataString(2, message.getPeers());
data.setDataString(3, message.getAggregation());
return data;
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
/**
 * Remote-side worker for node-component data.
 *
 * Empty scaffolding: both {@link #preStart()} and {@link #onWork(Object)} are
 * no-ops at this stage of the refactor. The protected constructor implies
 * instances are meant to be created by a provider/factory, not directly.
 *
 * @author pengys5
 */
public class NodeComponentRemoteWorker extends AbstractRemoteWorker {
    protected NodeComponentRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }
    @Override public void preStart() throws ProviderNotFoundException {
        // No startup work required yet.
    }
    @Override protected void onWork(Object message) throws WorkerException {
        // TODO: message handling not implemented yet.
    }
}
package org.skywalking.apm.collector.agentstream.worker.node.define;
package org.skywalking.apm.collector.agentstream.worker.node.component;
import org.skywalking.apm.collector.agentstream.worker.CommonTable;
......
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
......
package org.skywalking.apm.collector.agentstream.worker.node.define;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentTable;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
......
syntax = "proto3";
option java_multiple_files = true;
option java_multiple_files = false;
option java_package = "org.skywalking.apm.collector.agentstream.worker.node.define.proto";
message Message {
......
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
\ No newline at end of file
......@@ -11,7 +11,6 @@ import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleDefineLoader;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.remote.SerializedDefineLoader;
import org.skywalking.apm.collector.core.server.ServerException;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
......@@ -28,9 +27,6 @@ public class CollectorStarter implements Starter {
ModuleConfigLoader configLoader = new ModuleConfigLoader();
Map<String, Map> configuration = configLoader.load();
SerializedDefineLoader serializedDefineLoader = new SerializedDefineLoader();
serializedDefineLoader.load();
ModuleGroupDefineLoader groupDefineLoader = new ModuleGroupDefineLoader();
Map<String, ModuleGroupDefine> moduleGroupDefineMap = groupDefineLoader.load();
......
......@@ -5,9 +5,6 @@ cluster:
# redis:
# host: localhost
# port: 6379
queue:
disruptor: on
data_carrier: off
agentstream:
grpc:
host: localhost
......
......@@ -37,10 +37,6 @@
<artifactId>snakeyaml</artifactId>
<groupId>org.yaml</groupId>
</exclusion>
<exclusion>
<artifactId>log4j-api</artifactId>
<groupId>org.apache.logging.log4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
......
......@@ -31,9 +31,9 @@ public abstract class SingleModuleInstaller implements ModuleInstaller {
}
}
} else {
Map.Entry<String, Map> clusterConfigEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(clusterConfigEntry.getKey());
moduleDefine.initialize(clusterConfigEntry.getValue(), serverHolder);
Map.Entry<String, Map> configEntry = moduleConfig.entrySet().iterator().next();
moduleDefine = moduleDefineMap.get(configEntry.getKey());
moduleDefine.initialize(configEntry.getValue(), serverHolder);
}
}
}
package org.skywalking.apm.collector.core.remote;
/**
 * Contract for a serialized data definition: pairs an integer id with the
 * class it describes, so both ends of a remote call can agree on a payload
 * type by id.
 *
 * NOTE(review): {@code SerializedDefineLoader} assigns its own sequential map
 * keys and does not call {@link #ID()} — confirm which id is authoritative.
 *
 * @author pengys5
 */
public interface SerializedDefine {
    // Numeric identifier of this define.
    int ID();
    // The concrete class this define serializes. (Raw type kept as-is.)
    Class clazz();
}
package org.skywalking.apm.collector.core.remote;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.module.ModuleGroupDefineLoader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads every {@link SerializedDefine} implementation listed in the serialized
 * definition file and registers each under a sequential integer id, starting
 * at 1, preserving file order (LinkedHashMap).
 *
 * @author pengys5
 */
public class SerializedDefineLoader implements Loader<Map<Integer, SerializedDefine>> {
    // BUG FIX: logger was created for ModuleGroupDefineLoader.class, which
    // mislabelled all log output from this class.
    private final Logger logger = LoggerFactory.getLogger(SerializedDefineLoader.class);
    /**
     * @return map of sequential id (1-based, file order) to loaded define
     * @throws ConfigException if the definition file cannot be read or parsed
     */
    @Override public Map<Integer, SerializedDefine> load() throws ConfigException {
        Map<Integer, SerializedDefine> serializedDefineMap = new LinkedHashMap<>();
        SerializedDefinitionFile definitionFile = new SerializedDefinitionFile();
        logger.info("serialized definition file name: {}", definitionFile.fileName());
        DefinitionLoader<SerializedDefine> definitionLoader = DefinitionLoader.load(SerializedDefine.class, definitionFile);
        int id = 1;
        for (SerializedDefine serializedDefine : definitionLoader) {
            logger.info("loaded serialized definition class: {}", serializedDefine.getClass().getName());
            // BUG FIX: id was never incremented, so every define overwrote
            // key 1 and only the last one survived.
            serializedDefineMap.put(id++, serializedDefine);
        }
        return serializedDefineMap;
    }
}
......@@ -2,9 +2,10 @@ package org.skywalking.apm.collector.queue;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.module.SingleModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -12,13 +13,16 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class QueueModuleInstaller implements ModuleInstaller {
public class QueueModuleInstaller extends SingleModuleInstaller {
private final Logger logger = LoggerFactory.getLogger(QueueModuleInstaller.class);
@Override public void install(Map<String, Map> moduleConfig,
Map<String, ModuleDefine> moduleDefineMap, ServerHolder serverHolder) throws DefineException, ClientException {
logger.info("beginning queue module install");
QueueModuleContext context = new QueueModuleContext(QueueModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
installSingle(moduleConfig, moduleDefineMap, serverHolder);
}
}
......@@ -23,7 +23,7 @@ public class QueueDataCarrierModuleDefine extends QueueModuleDefine {
}
@Override public boolean defaultModule() {
return true;
return false;
}
@Override
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>apm-collector</artifactId>
<groupId>org.skywalking</groupId>
<version>3.2-2017</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>apm-collector-remote</artifactId>
<packaging>jar</packaging>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<grpc.version>1.4.0</grpc.version>
<netty.version>4.1.12.Final</netty.version>
</properties>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.remote;
import org.skywalking.apm.collector.core.framework.Context;
/**
 * Context object for the remote module group; adds nothing beyond the base
 * {@link Context}, it only pins the group name.
 *
 * NOTE(review): the commit removes the remote module — confirm this class is
 * still referenced anywhere.
 *
 * @author pengys5
 */
public class RemoteModuleContext extends Context {
    public RemoteModuleContext(String groupName) {
        super(groupName);
    }
}
package org.skywalking.apm.collector.remote;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
/**
 * Installer for the remote module group.
 *
 * Deliberately a no-op: the remote module is being dismantled in this commit
 * (its responsibilities move to the stream module), so install does nothing.
 *
 * @author pengys5
 */
public class RemoteModuleInstaller implements ModuleInstaller {
    @Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
        ServerHolder serverHolder) throws DefineException, ClientException {
        // Intentionally empty — nothing to install for the removed remote module.
    }
}
org.skywalking.apm.collector.remote.RemoteModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.remote.grpc.RemoteGRPCModuleDefine
\ No newline at end of file
......@@ -18,6 +18,11 @@
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-cluster</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-queue</artifactId>
......@@ -25,8 +30,53 @@
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-remote</artifactId>
<artifactId>apm-collector-server</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.stream;
/**
 * Worker context scoped to a single local worker. Unlike the cluster context,
 * it holds no provider registry: lookups yield nothing and registrations are
 * discarded.
 *
 * @author pengys5
 */
public class LocalWorkerContext extends WorkerContext {
    @Override
    final public AbstractWorkerProvider findProvider(Role role) throws ProviderNotFoundException {
        // NOTE(review): returns null rather than throwing the declared
        // ProviderNotFoundException — callers must null-check; confirm this
        // is intended.
        return null;
    }
    @Override
    final public void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException {
        // Intentionally ignores registrations: local contexts keep no providers.
    }
}
package org.skywalking.apm.collector.stream;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
 * Shared state for the stream module group: a registry mapping integer ids to
 * their {@link DataDefine}, plus the cluster-wide worker context.
 *
 * @author pengys5
 */
public class StreamModuleContext extends Context {
    // Registry of data defines, keyed by their integer id.
    private final Map<Integer, DataDefine> dataDefineMap = new HashMap<>();
    // Cluster-wide worker context, injected after construction by the installer.
    private ClusterWorkerContext clusterWorkerContext;
    public StreamModuleContext(String groupName) {
        super(groupName);
    }
    /** Merges all entries of the given registry into this context's registry. */
    public void putAllDataDefine(Map<Integer, DataDefine> dataDefineMap) {
        this.dataDefineMap.putAll(dataDefineMap);
    }
    /** @return the define registered under the given id, or null if absent */
    public DataDefine getDataDefine(int dataDefineId) {
        return dataDefineMap.get(dataDefineId);
    }
    public ClusterWorkerContext getClusterWorkerContext() {
        return clusterWorkerContext;
    }
    public void setClusterWorkerContext(ClusterWorkerContext clusterWorkerContext) {
        this.clusterWorkerContext = clusterWorkerContext;
    }
}
package org.skywalking.apm.collector.remote;
package org.skywalking.apm.collector.stream;
import java.util.List;
import java.util.Map;
......@@ -20,9 +20,9 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class RemoteModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
public abstract class StreamModuleDefine extends ModuleDefine implements ClusterDataListenerDefine {
private final Logger logger = LoggerFactory.getLogger(RemoteModuleDefine.class);
private final Logger logger = LoggerFactory.getLogger(StreamModuleDefine.class);
@Override
public final void initialize(Map config, ServerHolder serverHolder) throws DefineException, ClientException {
......@@ -33,9 +33,13 @@ public abstract class RemoteModuleDefine extends ModuleDefine implements Cluster
((ClusterModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME)).getDataMonitor().addListener(listener(), registration());
} catch (ConfigParseException | ServerException e) {
throw new RemoteModuleException(e.getMessage(), e);
throw new StreamModuleException(e.getMessage(), e);
}
}
@Override public final boolean defaultModule() {
return true;
}
public abstract List<Handler> handlerList() throws DefineException;
}
package org.skywalking.apm.collector.remote;
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.module.ModuleException;
/**
* @author pengys5
*/
public class RemoteModuleException extends ModuleException {
public class StreamModuleException extends ModuleException {
public RemoteModuleException(String message) {
public StreamModuleException(String message) {
super(message);
}
public RemoteModuleException(String message, Throwable cause) {
public StreamModuleException(String message, Throwable cause) {
super(message, cause);
}
}
package org.skywalking.apm.collector.remote;
package org.skywalking.apm.collector.stream;
import org.skywalking.apm.collector.core.framework.Context;
import org.skywalking.apm.collector.core.module.ModuleGroupDefine;
......@@ -7,19 +7,19 @@ import org.skywalking.apm.collector.core.module.ModuleInstaller;
/**
* @author pengys5
*/
public class RemoteModuleGroupDefine implements ModuleGroupDefine {
public class StreamModuleGroupDefine implements ModuleGroupDefine {
public static final String GROUP_NAME = "remote";
public static final String GROUP_NAME = "stream";
@Override public String name() {
return GROUP_NAME;
}
@Override public Context groupContext() {
return new RemoteModuleContext(GROUP_NAME);
return new StreamModuleContext(GROUP_NAME);
}
@Override public ModuleInstaller moduleInstaller() {
return new RemoteModuleInstaller();
return new StreamModuleInstaller();
}
}
package org.skywalking.apm.collector.stream;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
import org.skywalking.apm.collector.core.module.ModuleInstaller;
import org.skywalking.apm.collector.core.server.ServerHolder;
import org.skywalking.apm.collector.core.util.ObjectUtils;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.LocalAsyncWorkerProviderDefineLoader;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerProviderDefineLoader;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefineLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Installs the stream module group: publishes a {@link StreamModuleContext},
 * loads the data-define registry, bootstraps local-async and remote worker
 * providers, then initializes every stream module define (with its config if
 * present, otherwise null).
 *
 * @author pengys5
 */
public class StreamModuleInstaller implements ModuleInstaller {
    private final Logger logger = LoggerFactory.getLogger(StreamModuleInstaller.class);
    @Override public void install(Map<String, Map> moduleConfig, Map<String, ModuleDefine> moduleDefineMap,
        ServerHolder serverHolder) throws DefineException, ClientException {
        logger.info("beginning stream module install");
        StreamModuleContext context = new StreamModuleContext(StreamModuleGroupDefine.GROUP_NAME);
        CollectorContextHelper.INSTANCE.putContext(context);
        DataDefineLoader dataDefineLoader = new DataDefineLoader();
        try {
            Map<Integer, DataDefine> dataDefineMap = dataDefineLoader.load();
            context.putAllDataDefine(dataDefineMap);
        } catch (ConfigException e) {
            // NOTE(review): load failure is logged and swallowed; install
            // proceeds with an empty registry — confirm this is intended.
            logger.error(e.getMessage(), e);
        }
        initializeWorker(context);
        // BUG FIX: previously logged "could not configure cluster module, use
        // the default" unconditionally — a copy-paste from the cluster
        // installer that was misleading in the stream installer's output.
        logger.info("initializing stream module defines");
        Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
        while (moduleDefineEntry.hasNext()) {
            ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
            logger.info("module {} initialize", moduleDefine.getClass().getName());
            // Pass the module's own config section if one exists, else null.
            moduleDefine.initialize((ObjectUtils.isNotEmpty(moduleConfig) && moduleConfig.containsKey(moduleDefine.name())) ? moduleConfig.get(moduleDefine.name()) : null, serverHolder);
        }
    }
    /**
     * Loads local-async and remote worker providers, wires each to the shared
     * cluster worker context, creates its worker, and registers its role.
     */
    private void initializeWorker(StreamModuleContext context) {
        ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext();
        context.setClusterWorkerContext(clusterWorkerContext);
        LocalAsyncWorkerProviderDefineLoader localAsyncProviderLoader = new LocalAsyncWorkerProviderDefineLoader();
        RemoteWorkerProviderDefineLoader remoteProviderLoader = new RemoteWorkerProviderDefineLoader();
        try {
            List<AbstractLocalAsyncWorkerProvider> localAsyncProviders = localAsyncProviderLoader.load();
            for (AbstractLocalAsyncWorkerProvider provider : localAsyncProviders) {
                provider.setClusterContext(clusterWorkerContext);
                provider.create();
                clusterWorkerContext.putRole(provider.role());
            }
            List<AbstractRemoteWorkerProvider> remoteProviders = remoteProviderLoader.load();
            for (AbstractRemoteWorkerProvider provider : remoteProviders) {
                provider.setClusterContext(clusterWorkerContext);
                provider.create();
                clusterWorkerContext.putRole(provider.role());
            }
        } catch (ConfigException | ProviderNotFoundException e) {
            // NOTE(review): worker bootstrap failures are logged and swallowed.
            logger.error(e.getMessage(), e);
        }
    }
}
package org.skywalking.apm.collector.remote.grpc;
package org.skywalking.apm.collector.stream.grpc;
/**
* @author pengys5
*/
public class RemoteGRPCConfig {
public class StreamGRPCConfig {
public static String HOST;
public static int PORT;
}
package org.skywalking.apm.collector.remote.grpc;
package org.skywalking.apm.collector.stream.grpc;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigParseException;
......@@ -9,19 +9,19 @@ import org.skywalking.apm.collector.core.util.StringUtils;
/**
* @author pengys5
*/
public class RemoteGRPCConfigParser implements ModuleConfigParser {
public class StreamGRPCConfigParser implements ModuleConfigParser {
private static final String HOST = "host";
private static final String PORT = "port";
@Override public void parse(Map config) throws ConfigParseException {
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(HOST))) {
RemoteGRPCConfig.HOST = "localhost";
StreamGRPCConfig.HOST = "localhost";
}
if (ObjectUtils.isEmpty(config) || StringUtils.isEmpty(config.get(PORT))) {
RemoteGRPCConfig.PORT = 11800;
StreamGRPCConfig.PORT = 11800;
} else {
RemoteGRPCConfig.PORT = (Integer)config.get(PORT);
StreamGRPCConfig.PORT = (Integer)config.get(PORT);
}
}
}
package org.skywalking.apm.collector.remote.grpc;
package org.skywalking.apm.collector.stream.grpc;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
/**
* @author pengys5
*/
public class RemoteGRPCDataListener extends ClusterDataListener {
public class StreamGRPCDataListener extends ClusterDataListener {
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + RemoteModuleGroupDefine.GROUP_NAME + "." + RemoteGRPCModuleDefine.MODULE_NAME;
public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + StreamModuleGroupDefine.GROUP_NAME + "." + StreamGRPCModuleDefine.MODULE_NAME;
@Override public String path() {
return PATH;
......
package org.skywalking.apm.collector.remote.grpc;
package org.skywalking.apm.collector.stream.grpc;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
import org.skywalking.apm.collector.core.server.Server;
import org.skywalking.apm.collector.remote.RemoteModuleDefine;
import org.skywalking.apm.collector.remote.RemoteModuleGroupDefine;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineException;
import org.skywalking.apm.collector.remote.grpc.handler.RemoteHandlerDefineLoader;
import org.skywalking.apm.collector.server.grpc.GRPCServer;
import org.skywalking.apm.collector.stream.StreamModuleDefine;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.grpc.handler.RemoteCommonServiceHandler;
/**
* @author pengys5
*/
public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
public class StreamGRPCModuleDefine extends StreamModuleDefine {
public static final String MODULE_NAME = "remote";
public static final String MODULE_NAME = "stream";
@Override public String name() {
return MODULE_NAME;
}
@Override protected String group() {
return RemoteModuleGroupDefine.GROUP_NAME;
}
@Override public boolean defaultModule() {
return true;
return StreamModuleGroupDefine.GROUP_NAME;
}
@Override protected ModuleConfigParser configParser() {
return new RemoteGRPCConfigParser();
return new StreamGRPCConfigParser();
}
@Override protected Client createClient(DataMonitor dataMonitor) {
......@@ -44,25 +39,20 @@ public class RemoteGRPCModuleDefine extends RemoteModuleDefine {
}
@Override protected Server server() {
return new GRPCServer(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT);
return new GRPCServer(StreamGRPCConfig.HOST, StreamGRPCConfig.PORT);
}
@Override protected ModuleRegistration registration() {
return new RemoteGRPCModuleRegistration();
return new StreamGRPCModuleRegistration();
}
@Override public ClusterDataListener listener() {
return new RemoteGRPCDataListener();
return new StreamGRPCDataListener();
}
@Override public List<Handler> handlerList() throws DefineException {
RemoteHandlerDefineLoader loader = new RemoteHandlerDefineLoader();
List<Handler> handlers = null;
try {
handlers = loader.load();
} catch (ConfigException e) {
throw new RemoteHandlerDefineException(e.getMessage(), e);
}
List<Handler> handlers = new ArrayList<>();
handlers.add(new RemoteCommonServiceHandler());
return handlers;
}
}
package org.skywalking.apm.collector.remote.grpc;
package org.skywalking.apm.collector.stream.grpc;
import org.skywalking.apm.collector.core.module.ModuleRegistration;
/**
* @author pengys5
*/
public class RemoteGRPCModuleRegistration extends ModuleRegistration {
public class StreamGRPCModuleRegistration extends ModuleRegistration {
@Override public Value buildValue() {
return new Value(RemoteGRPCConfig.HOST, RemoteGRPCConfig.PORT, null);
return new Value(StreamGRPCConfig.HOST, StreamGRPCConfig.PORT, null);
}
}
package org.skywalking.apm.collector.stream.impl;
package org.skywalking.apm.collector.stream.grpc.handler;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.Message;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -17,8 +25,18 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);
@Override public void call(Message request, StreamObserver<Empty> responseObserver) {
String workerRole = request.getWorkerRole();
String roleName = request.getWorkerRole();
int dataDefineId = request.getDataDefineId();
ByteString bytesData = request.getDataBytes();
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME);
DataDefine dataDefine = context.getDataDefine(dataDefineId);
try {
Data data = dataDefine.parseFrom(bytesData);
context.getClusterWorkerContext().lookup(context.getClusterWorkerContext().getRole(roleName)).tell(data);
} catch (InvalidProtocolBufferException | WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.queue.QueueExecutor;
/**
* The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
......@@ -7,7 +9,7 @@ package org.skywalking.apm.collector.stream;
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker implements QueueExecutor {
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
......@@ -15,10 +17,9 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
* @param role The responsibility of worker in cluster, more than one workers can have same responsibility which use
* to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.queue.QueueCreator;
......@@ -14,8 +14,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
public abstract int queueSize();
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException {
@Override final public WorkerRef create() throws ProviderNotFoundException {
T localAsyncWorker = workerInstance(getClusterContext());
localAsyncWorker.preStart();
......@@ -23,11 +22,6 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
QueueEventHandler queueEventHandler = queueCreator.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
if (localContext != null) {
localContext.put(workerRef);
}
return workerRef;
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* The <code>AbstractLocalSyncWorker</code> defines workers who receive data from jvm inside call and response in real
......@@ -8,8 +8,8 @@ package org.skywalking.apm.collector.stream;
* @since v3.0-2017
*/
public abstract class AbstractLocalSyncWorker extends AbstractLocalWorker {
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
public AbstractLocalSyncWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public abstract class AbstractLocalSyncWorkerProvider<T extends AbstractLocalSyncWorker> extends AbstractLocalWorkerProvider<T> {
@Override
final public WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException {
T localSyncWorker = (T) workerInstance(getClusterContext());
@Override final public WorkerRef create() throws ProviderNotFoundException {
T localSyncWorker = workerInstance(getClusterContext());
localSyncWorker.preStart();
LocalSyncWorkerRef workerRef = new LocalSyncWorkerRef(role(), localSyncWorker);
if (localContext != null) {
localContext.put(workerRef);
}
return workerRef;
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public abstract class AbstractLocalWorker extends AbstractWorker {
public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
public AbstractLocalWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* The <code>AbstractRemoteWorker</code> implementations represent workers,
......@@ -17,10 +17,9 @@ public abstract class AbstractRemoteWorker extends AbstractWorker {
* @param role If multi-workers are for load balance, they should be more likely called worker instance. Meaning,
* each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
protected AbstractRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
/**
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
......@@ -20,13 +20,11 @@ public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorke
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link RemoteWorkerRef}
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException {
@Override final public WorkerRef create() throws ProviderNotFoundException {
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.framework.Executor;
......@@ -7,16 +7,13 @@ import org.skywalking.apm.collector.core.framework.Executor;
*/
public abstract class AbstractWorker implements Executor {
private final LocalWorkerContext selfContext;
private final Role role;
private final ClusterWorkerContext clusterContext;
public AbstractWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
public AbstractWorker(Role role, ClusterWorkerContext clusterContext) {
this.role = role;
this.clusterContext = clusterContext;
this.selfContext = selfContext;
}
@Override public final void execute(Object message) {
......@@ -25,10 +22,6 @@ public abstract class AbstractWorker implements Executor {
public abstract void preStart() throws ProviderNotFoundException;
final public LocalWorkerContext getSelfContext() {
return selfContext;
}
final public ClusterWorkerContext getClusterContext() {
return clusterContext;
}
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
......@@ -11,9 +11,6 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
public abstract T workerInstance(ClusterWorkerContext clusterContext);
public abstract WorkerRef onCreate(
LocalWorkerContext localContext) throws ProviderNotFoundException;
final public void setClusterContext(ClusterWorkerContext clusterContext) {
this.clusterContext = clusterContext;
}
......@@ -21,16 +18,4 @@ public abstract class AbstractWorkerProvider<T extends AbstractWorker> implement
final protected ClusterWorkerContext getClusterContext() {
return clusterContext;
}
final public WorkerRef create(
AbstractWorker workerOwner) throws ProviderNotFoundException {
if (workerOwner == null) {
return onCreate(null);
} else if (workerOwner.getSelfContext() instanceof LocalWorkerContext) {
return onCreate((LocalWorkerContext)workerOwner.getSelfContext());
} else {
throw new IllegalArgumentException("the argument of workerOwner is Illegal");
}
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......
package org.skywalking.apm.collector.stream.worker;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class LocalAsyncWorkerProviderDefineLoader implements Loader<List<AbstractLocalAsyncWorkerProvider>> {

    private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerProviderDefineLoader.class);

    /**
     * Load all {@link AbstractLocalAsyncWorkerProvider} implementations declared in the
     * local async worker provider definition file found on the classpath.
     *
     * @return the provider instances discovered through {@link DefinitionLoader}, in the
     * order the definition file lists them; empty when the file declares none
     * @throws ConfigException when the definition file cannot be read or a declared
     * provider class cannot be loaded/instantiated
     */
    @Override public List<AbstractLocalAsyncWorkerProvider> load() throws ConfigException {
        List<AbstractLocalAsyncWorkerProvider> providers = new ArrayList<>();

        LocalAsyncWorkerProviderDefinitionFile definitionFile = new LocalAsyncWorkerProviderDefinitionFile();
        logger.info("local async worker provider definition file name: {}", definitionFile.fileName());

        DefinitionLoader<AbstractLocalAsyncWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile);
        for (AbstractLocalAsyncWorkerProvider provider : definitionLoader) {
            logger.info("loaded local async worker provider definition class: {}", provider.getClass().getName());
            providers.add(provider);
        }
        return providers;
    }
}
package org.skywalking.apm.collector.remote.grpc.handler;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class RemoteHandlerDefinitionFile extends DefinitionFile {
public class LocalAsyncWorkerProviderDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "remote_handler.define";
return "local_async_worker_provider.define";
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* @author pengys5
*/
public interface Provider {
WorkerRef create(AbstractWorker workerOwner) throws ProviderNotFoundException;
WorkerRef create() throws ProviderNotFoundException;
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
public class ProviderNotFoundException extends Exception {
public ProviderNotFoundException(String message) {
......
package org.skywalking.apm.collector.remote.grpc.handler;
package org.skywalking.apm.collector.stream.worker;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.Handler;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -12,19 +11,21 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class RemoteHandlerDefineLoader implements Loader<List<Handler>> {
public class RemoteWorkerProviderDefineLoader implements Loader<List<AbstractRemoteWorkerProvider>> {
private final Logger logger = LoggerFactory.getLogger(RemoteHandlerDefineLoader.class);
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerProviderDefineLoader.class);
@Override public List<Handler> load() throws ConfigException {
List<Handler> handlers = new ArrayList<>();
@Override public List<AbstractRemoteWorkerProvider> load() throws ConfigException {
List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile();
logger.info("remote worker provider definition file name: {}", definitionFile.fileName());
RemoteHandlerDefinitionFile definitionFile = new RemoteHandlerDefinitionFile();
DefinitionLoader<Handler> definitionLoader = DefinitionLoader.load(Handler.class, definitionFile);
for (Handler handler : definitionLoader) {
logger.info("loaded remote handler definition class: {}", handler.getClass().getName());
handlers.add(handler);
DefinitionLoader<AbstractRemoteWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractRemoteWorkerProvider.class, definitionFile);
for (AbstractRemoteWorkerProvider provider : definitionLoader) {
logger.info("loaded remote worker provider definition class: {}", provider.getClass().getName());
providers.add(provider);
}
return handlers;
return providers;
}
}
package org.skywalking.apm.collector.core.remote;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
* @author pengys5
*/
public class SerializedDefinitionFile extends DefinitionFile {
public class RemoteWorkerProviderDefinitionFile extends DefinitionFile {
@Override protected String fileName() {
return "serialized.define";
return "remote_worker_provider.define";
}
}
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.stream.selector.WorkerSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
* @author pengys5
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
public class UsedRoleNameException extends Exception {
public UsedRoleNameException(String message) {
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.skywalking.apm.collector.stream.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
/**
* @author pengys5
*/
public abstract class WorkerContext implements Context {
private Map<Integer, DataDefine> dataDefineMap;
private Map<String, List<WorkerRef>> roleWorkers;
private Map<String, Role> roles;
private Map<Integer, DataDefine> dataDefineMap;
public WorkerContext() {
this.roleWorkers = new ConcurrentHashMap<>();
this.roleWorkers = new HashMap<>();
this.roles = new HashMap<>();
}
private Map<String, List<WorkerRef>> getRoleWorkers() {
return this.roleWorkers;
}
public final DataDefine getDataDefine(int defineId) {
return dataDefineMap.get(defineId);
}
@Override final public WorkerRefs lookup(Role role) throws WorkerNotFoundException {
if (getRoleWorkers().containsKey(role.roleName())) {
WorkerRefs refs = new WorkerRefs(getRoleWorkers().get(role.roleName()), role.workerSelector());
......@@ -36,6 +33,18 @@ public abstract class WorkerContext implements Context {
}
}
public final void putRole(Role role) {
roles.put(role.roleName(), role);
}
public final Role getRole(String roleName) {
return roles.get(roleName);
}
public final DataDefine getDataDefine(int defineId) {
return dataDefineMap.get(defineId);
}
@Override final public void put(WorkerRef workerRef) {
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
/**
* This exception is raised when worker fails to process job during "call" or "ask"
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
public class WorkerNotFoundException extends WorkerException {
public WorkerNotFoundException(String message) {
......
package org.skywalking.apm.collector.stream;
package org.skywalking.apm.collector.stream.worker;
import java.util.List;
import org.skywalking.apm.collector.stream.selector.WorkerSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
package org.skywalking.apm.collector.stream.impl;
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.core.queue.EndOfBatchCommand;
import org.skywalking.apm.collector.stream.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.WorkerException;
import org.skywalking.apm.collector.stream.impl.data.Data;
import org.skywalking.apm.collector.stream.impl.data.DataCache;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataCache;
/**
* @author pengys5
......@@ -17,8 +16,8 @@ public abstract class AggregationWorker extends AbstractLocalAsyncWorker {
private DataCache dataCache;
public AggregationWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
public AggregationWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
dataCache = new DataCache();
}
......
package org.skywalking.apm.collector.stream.impl;
package org.skywalking.apm.collector.stream.worker.impl;
import org.skywalking.apm.collector.stream.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.LocalWorkerContext;
import org.skywalking.apm.collector.stream.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.Role;
import org.skywalking.apm.collector.stream.WorkerException;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
/**
* @author pengys5
*/
public class GRPCRemoteWorker extends AbstractRemoteWorker {
protected GRPCRemoteWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
protected GRPCRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected final void onWork(Object message) throws WorkerException {
......
package org.skywalking.apm.collector.stream.impl.data;
package org.skywalking.apm.collector.stream.worker.impl.data;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* @author pengys5
......@@ -75,4 +78,6 @@ public abstract class DataDefine {
}
}
}
public abstract Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException;
}
package org.skywalking.apm.collector.stream.impl.data;
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
......
package org.skywalking.apm.collector.stream.impl.data.operate;
package org.skywalking.apm.collector.stream.worker.impl.data.operate;
import org.skywalking.apm.collector.stream.impl.data.Operation;
import org.skywalking.apm.collector.stream.worker.impl.data.Operation;
/**
* @author pengys5
......
package org.skywalking.apm.collector.stream.impl.data.operate;
package org.skywalking.apm.collector.stream.worker.impl.data.operate;
import org.skywalking.apm.collector.stream.impl.data.Operation;
import org.skywalking.apm.collector.stream.worker.impl.data.Operation;
/**
* @author pengys5
......
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.selector;
/**
* The <code>AbstractHashMessage</code> implementations represent aggregate message,
......
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}. It choose {@link WorkerRef}
......@@ -17,7 +18,7 @@ public class HashCodeSelector implements WorkerSelector<WorkerRef> {
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send.
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
......
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
......@@ -18,7 +19,7 @@ public class RollingSelector implements WorkerSelector<WorkerRef> {
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send.
* @param message message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
......
package org.skywalking.apm.collector.stream.selector;
package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.WorkerRef;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
import org.skywalking.apm.collector.stream.worker.AbstractWorker;
/**
* The <code>WorkerSelector</code> should be implemented by any class whose instances
......@@ -18,7 +19,7 @@ public interface WorkerSelector<T extends WorkerRef> {
* select a {@link WorkerRef} from a {@link WorkerRef} list.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link org.skywalking.apm.collector.stream.AbstractWorker} is going to send.
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
T select(List<T> members, Object message);
......
org.skywalking.apm.collector.stream.StreamModuleGroupDefine
\ No newline at end of file
org.skywalking.apm.collector.stream.grpc.StreamGRPCModuleDefine
\ No newline at end of file
org.skywalking.apm.collector.stream.impl.RemoteCommonServiceHandler
\ No newline at end of file
......@@ -12,7 +12,6 @@
<module>apm-collector-agentstream</module>
<module>apm-collector-ui</module>
<module>apm-collector-boot</module>
<module>apm-collector-remote</module>
<module>apm-collector-stream</module>
<module>apm-collector-agentserver</module>
<module>apm-collector-agentregister</module>
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册