提交 1bc7250e 编写于 作者: P pengys5

Remote worker call is successful

上级 e4f56ab1
......@@ -14,4 +14,8 @@ public class AgentJVMGRPCDataListener extends ClusterDataListener {
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
......@@ -4,7 +4,7 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterTable;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
......@@ -16,15 +16,15 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationRegisterTable.TABLE);
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationRegisterTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(10);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
return searchResponse.getHits().getAt(0).getField(ApplicationRegisterTable.COLUMN_APPLICATION_ID).getValue();
return searchResponse.getHits().getAt(0).getField(ApplicationTable.COLUMN_APPLICATION_ID).getValue();
}
return 0;
}
......
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
/**
 * Resolves the numeric id registered for an application code. When the DAO
 * reports no id yet (0), this asks the application register remote worker to
 * create one asynchronously and still returns 0 for this call.
 *
 * @author pengys5
 */
public class ApplicationIDGetOrCreate {

    private final Logger logger = LoggerFactory.getLogger(ApplicationIDGetOrCreate.class);

    /**
     * @param applicationCode the application code to resolve
     * @return the registered application id, or 0 when registration has only
     * been requested (the remote worker assigns the id out of band)
     */
    public int getOrCreate(String applicationCode) {
        IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
        // BUG FIX: a leftover early `return dao.getApplicationId(applicationCode);`
        // sat above this line, making everything below unreachable — a compile
        // error in Java. The lookup result must be kept so the 0 case can
        // trigger registration.
        int applicationId = dao.getApplicationId(applicationCode);
        if (applicationId == 0) {
            StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
            ApplicationDataDefine.Application application = new ApplicationDataDefine.Application(applicationCode, applicationCode, 0);
            try {
                // Fire-and-forget: the remote register worker assigns the id.
                context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(application);
            } catch (WorkerNotFoundException | WorkerInvokeException e) {
                logger.error(e.getMessage(), e);
            }
        }
        return applicationId;
    }
}
......@@ -14,4 +14,7 @@ public class AgentRegisterGRPCDataListener extends ClusterDataListener {
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
......@@ -12,4 +12,8 @@ public class AgentServerJettyDataListener extends ClusterDataListener {
@Override public String path() {
return ClusterModuleDefine.BASE_CATALOG + "." + AgentServerModuleGroupDefine.GROUP_NAME + "." + AgentServerJettyModuleDefine.MODULE_NAME;
}
@Override public void addressChangedNotify() {
}
}
......@@ -34,49 +34,4 @@
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.4.1.Final</version>
</extension>
</extensions>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.5.0</version>
<configuration>
<!--
The version of protoc must match protobuf-java. If you don't depend on
protobuf-java directly, you will be transitively depending on the
protobuf-java version that grpc depends on.
-->
<protocArtifact>com.google.protobuf:protoc:3.3.0:exe:${os.detected.classifier}
</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.4.0:exe:${os.detected.classifier}
</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
......@@ -26,7 +26,6 @@ public class AgentStreamModuleInstaller implements ModuleInstaller {
AgentStreamModuleContext context = new AgentStreamModuleContext(AgentStreamModuleGroupDefine.GROUP_NAME);
CollectorContextHelper.INSTANCE.putContext(context);
logger.info("could not configure cluster module, use the default");
Iterator<Map.Entry<String, ModuleDefine>> moduleDefineEntry = moduleDefineMap.entrySet().iterator();
while (moduleDefineEntry.hasNext()) {
ModuleDefine moduleDefine = moduleDefineEntry.next().getValue();
......
......@@ -14,4 +14,7 @@ public class AgentStreamGRPCDataListener extends ClusterDataListener {
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
......@@ -14,4 +14,7 @@ public class AgentStreamJettyDataListener extends ClusterDataListener {
@Override public String path() {
return PATH;
}
@Override public void addressChangedNotify() {
}
}
......@@ -4,6 +4,7 @@ import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvid
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.AggregationWorker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.RollingSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -53,5 +54,9 @@ public class NodeComponentAggWorker extends AggregationWorker {
public WorkerSelector workerSelector() {
return new RollingSelector();
}
@Override public DataDefine dataDefine() {
return new NodeComponentDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.node.component;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.skywalking.apm.collector.agentstream.worker.node.define.proto.NodeComponent;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
......@@ -15,7 +12,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation
*/
public class NodeComponentDataDefine extends DataDefine {
@Override protected int defineId() {
@Override public int defineId() {
return 0;
}
......@@ -30,13 +27,11 @@ public class NodeComponentDataDefine extends DataDefine {
addAttribute(3, new Attribute("aggregation", AttributeType.STRING, new CoverOperation()));
}
@Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException {
NodeComponent.Message message = NodeComponent.Message.parseFrom(bytesData);
Data data = build();
data.setDataString(0, message.getId());
data.setDataString(1, message.getName());
data.setDataString(2, message.getPeers());
data.setDataString(3, message.getAggregation());
return data;
@Override public Object deserialize(RemoteData remoteData) {
return null;
}
@Override public RemoteData serialize(Object object) {
return null;
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
/**
 * Stream data definition (define id 101) describing an application register
 * record: an id string, the application code, and the assigned application id.
 * Handles the gRPC {@link RemoteData} wire form in {@link #serialize(Object)}
 * and {@link #deserialize(RemoteData)}.
 *
 * @author pengys5
 */
public class ApplicationDataDefine extends DataDefine {

    public static final int DEFINE_ID = 101;

    @Override public int defineId() {
        return DEFINE_ID;
    }

    @Override protected int initialCapacity() {
        return 3;
    }

    @Override protected void attributeDefine() {
        addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
        addAttribute(1, new Attribute(ApplicationTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
        addAttribute(2, new Attribute(ApplicationTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
    }

    /**
     * Rebuilds an {@link Application} from the wire form: two strings
     * (id, application code) and one integer (application id), in that order.
     */
    @Override public Object deserialize(RemoteData remoteData) {
        return new Application(
            remoteData.getDataStrings(0),
            remoteData.getDataStrings(1),
            remoteData.getDataIntegers(0));
    }

    /**
     * Flattens an {@link Application} into the wire form consumed by
     * {@link #deserialize(RemoteData)}; field order must match.
     */
    @Override public RemoteData serialize(Object object) {
        Application application = (Application)object;
        return RemoteData.newBuilder()
            .addDataStrings(application.getId())
            .addDataStrings(application.getApplicationCode())
            .addDataIntegers(application.getApplicationId())
            .build();
    }

    /** Immutable value object carried between register workers. */
    public static class Application {
        private final String id;
        private final String applicationCode;
        private final int applicationId;

        public Application(String id, String applicationCode, int applicationId) {
            this.id = id;
            this.applicationCode = applicationCode;
            this.applicationId = applicationId;
        }

        public String getId() {
            return id;
        }

        public String getApplicationCode() {
            return applicationCode;
        }

        public int getApplicationId() {
            return applicationId;
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
/**
 * Elasticsearch table (index) definition for the application table:
 * a keyword application_code column and an integer application_id column.
 *
 * @author pengys5
 */
public class ApplicationEsTableDefine extends ElasticSearchTableDefine {

    public ApplicationEsTableDefine() {
        super(ApplicationTable.TABLE);
    }

    @Override public void initialize() {
        addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, ElasticSearchColumnDefine.Type.Keyword.name()));
        addColumn(new ElasticSearchColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
    }

    // NOTE(review): 0 presumably means "use the default refresh interval" —
    // confirm against ElasticSearchTableDefine.
    @Override public int refreshInterval() {
        return 0;
    }

    @Override public int numberOfShards() {
        return 2;
    }

    @Override public int numberOfReplicas() {
        return 0;
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
/**
 * H2 table definition for the application table, declaring the same two
 * columns as the Elasticsearch variant: application_code (varchar) and
 * application_id (int).
 *
 * @author pengys5
 */
public class ApplicationH2TableDefine extends H2TableDefine {

    public ApplicationH2TableDefine() {
        super(ApplicationTable.TABLE);
    }

    @Override public void initialize() {
        addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_CODE, H2ColumnDefine.Type.Varchar.name()));
        addColumn(new H2ColumnDefine(ApplicationTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.skywalking.apm.collector.agentstream.worker.register.application.proto.ApplicationRegisterOuterClass;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
/**
 * Data definition (define id 101) for the application register message
 * received as protobuf bytes from agents.
 *
 * @author pengys5
 */
public class ApplicationRegisterDataDefine extends DataDefine {

    @Override protected int defineId() {
        return 101;
    }

    @Override protected int initialCapacity() {
        return 3;
    }

    @Override protected void attributeDefine() {
        addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
        addAttribute(1, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
        addAttribute(2, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
    }

    @Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException {
        ApplicationRegisterOuterClass.ApplicationRegister register =
            ApplicationRegisterOuterClass.ApplicationRegister.parseFrom(bytesData);
        Data data = build();
        // Only attribute 1 (application code) arrives over the wire; the id and
        // application id are presumably filled in later by the register
        // workers — TODO confirm.
        data.setDataString(1, register.getApplicationCode());
        return data;
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterRemoteWorker.class);
protected ApplicationRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
......@@ -23,7 +27,8 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
}
@Override protected void onWork(Object message) throws WorkerException {
ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message;
logger.debug("application code: {}", application.getApplicationCode());
}
public static class Factory extends AbstractRemoteWorkerProvider<ApplicationRegisterRemoteWorker> {
......@@ -36,10 +41,6 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
public ApplicationRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new ApplicationRegisterRemoteWorker(role(), clusterContext);
}
@Override public int workerNum() {
return 1;
}
}
public enum WorkerRole implements Role {
......@@ -47,12 +48,16 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
@Override
public String roleName() {
return NodeComponentAggWorker.class.getSimpleName();
return ApplicationRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
......@@ -6,6 +6,7 @@ import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
......@@ -54,5 +55,10 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
public WorkerSelector workerSelector() {
return new HashCodeSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
......@@ -3,8 +3,8 @@ package org.skywalking.apm.collector.agentstream.worker.register.application;
/**
* @author pengys5
*/
public class ApplicationRegisterTable {
public static final String TABLE = "application_register";
public class ApplicationTable {
public static final String TABLE = "application";
public static final String COLUMN_APPLICATION_CODE = "application_code";
public static final String COLUMN_APPLICATION_ID = "application_id";
}
syntax = "proto3";
option java_multiple_files = false;
option java_package = "org.skywalking.apm.collector.agentstream.worker.register.application.proto";
// Register request from an agent: carries only the application code for
// which the collector should assign (or look up) a numeric id.
message ApplicationRegister {
    string application_code = 1;
}
\ No newline at end of file
syntax = "proto3";
option java_multiple_files = false;
option java_package = "org.skywalking.apm.collector.agentstream.worker.node.define.proto";
// Node-component payload exchanged between collector workers.
// Field semantics beyond the names are not visible here — confirm against
// NodeComponentDataDefine, which maps id/name/peers/aggregation by position.
message Message {
    string id = 1;
    string name = 2;
    string peers = 3;
    string aggregation = 4;
}
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeComponentH2TableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingEsTableDefine
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
\ No newline at end of file
......@@ -58,5 +58,10 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-core</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.client.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.ClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
/**
 * Thin client wrapper that owns a gRPC {@link ManagedChannel} to a single
 * collector address.
 *
 * @author pengys5
 */
public class GRPCClient implements Client {

    private final Logger logger = LoggerFactory.getLogger(GRPCClient.class);

    private final String host;

    private final int port;

    private ManagedChannel channel;

    public GRPCClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Opens a plaintext channel to {@code host:port}.
     * NOTE(review): there is no shutdown path for the channel here — confirm
     * whether callers close it elsewhere.
     */
    @Override public void initialize() throws ClientException {
        this.channel = ManagedChannelBuilder
            .forAddress(host, port)
            .usePlaintext(true)
            .build();
    }

    /** @return the channel created by {@link #initialize()}, or null before it runs. */
    public ManagedChannel getChannel() {
        return channel;
    }
}
package org.skywalking.apm.collector.client.grpc;
import org.skywalking.apm.collector.core.client.ClientException;
/**
* @author pengys5
*/
/**
 * Client exception type for gRPC client failures; simply specializes
 * {@link ClientException} so callers can distinguish the transport.
 *
 * @author pengys5
 */
public class GRPCClientException extends ClientException {

    public GRPCClientException(String message) {
        super(message);
    }

    public GRPCClientException(String message, Throwable cause) {
        super(message, cause);
    }
}
......@@ -47,6 +47,7 @@ public class ClusterZKDataMonitor implements DataMonitor, Watcher {
String dataStr = new String(data);
logger.debug("path children has been changed, path: {}, data: {}", event.getPath() + "/" + serverPath, dataStr);
listeners.get(event.getPath()).addAddress(serverPath + dataStr);
listeners.get(event.getPath()).addressChangedNotify();
}
}
} catch (ZookeeperClientException e) {
......
......@@ -28,4 +28,6 @@ public abstract class ClusterDataListener implements Listener {
public final void clearData() {
addresses.clear();
}
public abstract void addressChangedNotify();
}
......@@ -3,12 +3,16 @@ package org.skywalking.apm.collector.core.storage;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class StorageInstaller {
private final Logger logger = LoggerFactory.getLogger(StorageInstaller.class);
public final void install(Client client) throws StorageException {
StorageDefineLoader defineLoader = new StorageDefineLoader();
try {
......@@ -16,9 +20,8 @@ public abstract class StorageInstaller {
defineFilter(tableDefines);
for (TableDefine tableDefine : tableDefines) {
if (isExists(client, tableDefine)) {
deleteIndex(client, tableDefine);
} else {
if (!isExists(client, tableDefine)) {
logger.info("table: {} not exists", tableDefine.getName());
createTable(client, tableDefine);
}
}
......
......@@ -21,11 +21,11 @@ public abstract class TableDefine {
columnDefines.add(columnDefine);
}
public String getName() {
public final String getName() {
return name;
}
public List<ColumnDefine> getColumnDefines() {
public final List<ColumnDefine> getColumnDefines() {
return columnDefines;
}
}
......@@ -11,6 +11,6 @@ public class ElasticSearchColumnDefine extends ColumnDefine {
}
public enum Type {
Binary, Boolean, Date, Keyword, Long, Text
Binary, Boolean, Date, Keyword, Long, Integer
}
}
......@@ -12,6 +12,6 @@ public class H2ColumnDefine extends ColumnDefine {
}
public enum Type {
Boolean, Varchar, Bigint, Date
Boolean, Varchar, Int, Bigint, Date
}
}
......@@ -66,8 +66,9 @@ public class StreamModuleInstaller implements ModuleInstaller {
List<AbstractRemoteWorkerProvider> remoteProviders = remoteProviderLoader.load();
for (AbstractRemoteWorkerProvider provider : remoteProviders) {
provider.setClusterContext(clusterWorkerContext);
provider.create();
// provider.create();
clusterWorkerContext.putRole(provider.role());
clusterWorkerContext.putProvider(provider);
}
} catch (ProviderNotFoundException e) {
logger.error(e.getMessage(), e);
......
package org.skywalking.apm.collector.stream.grpc;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.cluster.ClusterModuleDefine;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.cluster.ClusterDataListener;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.RemoteWorkerRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
/**
 * Cluster listener for the stream gRPC module. On every address change it
 * reconciles the locally cached gRPC clients / remote worker references with
 * the current address list: stale addresses are evicted, new addresses get a
 * client and remote worker references (self references for this node's own
 * address).
 *
 * @author pengys5
 */
public class StreamGRPCDataListener extends ClusterDataListener {

    private final Logger logger = LoggerFactory.getLogger(StreamGRPCDataListener.class);

    public static final String PATH = ClusterModuleDefine.BASE_CATALOG + "." + StreamModuleGroupDefine.GROUP_NAME + "." + StreamGRPCModuleDefine.MODULE_NAME;

    @Override public String path() {
        return PATH;
    }

    /** One gRPC client per known collector address ("host:port"). */
    private Map<String, GRPCClient> clients = new HashMap<>();

    /** Remote worker references keyed by address. NOTE(review): nothing in this
     * class ever puts into this map — confirm whether provider.create(client)
     * results were meant to be stored here. */
    private Map<String, RemoteWorkerRef> workerRefs = new HashMap<>();

    @Override public void addressChangedNotify() {
        String selfAddress = StreamGRPCConfig.HOST + ":" + StreamGRPCConfig.PORT;
        StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);

        List<String> addresses = getAddresses();

        // Evict state for addresses that disappeared. BUG FIX: the previous code
        // removed only the worker reference and left the stale client cached, so
        // a collector re-registering at the same address was skipped by the
        // "already known" check below and never got its references recreated.
        clients.keySet().removeIf(address -> {
            if (addresses.contains(address)) {
                return false;
            }
            RemoteWorkerRef workerRef = workerRefs.remove(address);
            if (workerRef != null) {
                context.getClusterWorkerContext().remove(workerRef);
            }
            return true;
        });

        for (String address : addresses) {
            if (clients.containsKey(address)) {
                logger.debug("address: {} had remote worker reference, ignore", address);
                continue;
            }
            logger.debug("new address: {}, create this address remote worker reference", address);
            String[] hostPort = address.split(":");
            GRPCClient client = new GRPCClient(hostPort[0], Integer.parseInt(hostPort[1]));
            try {
                client.initialize();
            } catch (ClientException e) {
                // BUG FIX: was e.printStackTrace(); route through the logger.
                logger.error(e.getMessage(), e);
            }
            clients.put(address, client);
            if (selfAddress.equals(address)) {
                context.getClusterWorkerContext().getProviders().forEach(provider -> {
                    logger.debug("create remote worker self reference, role: {}", provider.role().roleName());
                    provider.create();
                });
            } else {
                context.getClusterWorkerContext().getProviders().forEach(provider -> {
                    logger.debug("create remote worker reference, role: {}", provider.role().roleName());
                    provider.create(client);
                });
            }
        }
    }
}
package org.skywalking.apm.collector.stream.grpc.handler;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.cluster.ClusterModuleGroupDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.remote.grpc.proto.Empty;
import org.skywalking.apm.collector.remote.grpc.proto.Message;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -24,18 +22,16 @@ public class RemoteCommonServiceHandler extends RemoteCommonServiceGrpc.RemoteCo
private final Logger logger = LoggerFactory.getLogger(RemoteCommonServiceHandler.class);
@Override public void call(Message request, StreamObserver<Empty> responseObserver) {
@Override public void call(RemoteMessage request, StreamObserver<Empty> responseObserver) {
String roleName = request.getWorkerRole();
int dataDefineId = request.getDataDefineId();
ByteString bytesData = request.getDataBytes();
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(ClusterModuleGroupDefine.GROUP_NAME);
DataDefine dataDefine = context.getDataDefine(dataDefineId);
RemoteData remoteData = request.getRemoteData();
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
Role role = context.getClusterWorkerContext().getRole(roleName);
Object object = role.dataDefine().deserialize(remoteData);
try {
Data data = dataDefine.parseFrom(bytesData);
context.getClusterWorkerContext().lookup(context.getClusterWorkerContext().getRole(roleName)).tell(data);
} catch (InvalidProtocolBufferException | WorkerNotFoundException | WorkerInvokeException e) {
context.getClusterWorkerContext().lookupInSide(roleName).tell(object);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
......
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
/**
* The <code>AbstractRemoteWorkerProvider</code> implementations represent providers,
* which create instance of cluster workers whose implemented {@link AbstractRemoteWorker}.
......@@ -10,13 +12,6 @@ package org.skywalking.apm.collector.stream.worker;
*/
public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorker> extends AbstractWorkerProvider<T> {
/**
* Create how many worker instance of {@link AbstractRemoteWorker} in one jvm.
*
* @return The worker instance number.
*/
public abstract int workerNum();
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
......@@ -24,12 +19,16 @@ public abstract class AbstractRemoteWorkerProvider<T extends AbstractRemoteWorke
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another
* worker instance, when the worker provider not find then Throw this Exception.
*/
@Override final public WorkerRef create() throws ProviderNotFoundException {
@Override final public WorkerRef create() {
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), clusterWorker);
getClusterContext().put(workerRef);
return workerRef;
}
public final RemoteWorkerRef create(GRPCClient client) {
RemoteWorkerRef workerRef = new RemoteWorkerRef(role(), client);
getClusterContext().put(workerRef);
return workerRef;
}
}
package org.skywalking.apm.collector.stream.worker;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
/**
* @author pengys5
*/
public class ClusterWorkerContext extends WorkerContext {
private final Logger logger = LoggerFactory.getLogger(ClusterWorkerContext.class);
private Map<String, AbstractWorkerProvider> providers = new ConcurrentHashMap<>();
private List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
@Override
public AbstractWorkerProvider findProvider(Role role) throws ProviderNotFoundException {
logger.debug("find role of %s provider from ClusterWorkerContext", role.roleName());
if (providers.containsKey(role.roleName())) {
return providers.get(role.roleName());
} else {
throw new ProviderNotFoundException("role=" + role.roleName() + ", no available provider.");
}
public List<AbstractRemoteWorkerProvider> getProviders() {
return providers;
}
@Override
public void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException {
logger.debug("put role of %s provider into ClusterWorkerContext", provider.role().roleName());
if (providers.containsKey(provider.role().roleName())) {
throw new UsedRoleNameException("provider with role=" + provider.role().roleName() + " duplicate each other.");
} else {
providers.put(provider.role().roleName(), provider);
}
public void putProvider(AbstractRemoteWorkerProvider provider) {
providers.add(provider);
}
}
......@@ -5,10 +5,12 @@ package org.skywalking.apm.collector.stream.worker;
*/
public interface Context extends LookUp {
void putProvider(AbstractWorkerProvider provider) throws UsedRoleNameException;
void putProvider(AbstractRemoteWorkerProvider provider);
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException;
void put(WorkerRef workerRef);
void remove(WorkerRef workerRef);
......
......@@ -6,6 +6,4 @@ package org.skywalking.apm.collector.stream.worker;
// Lookup facade implemented by worker contexts: resolves worker references and
// providers by role.
public interface LookUp {
// Returns the set of worker refs registered for the role, or throws if none.
WorkerRefs lookup(Role role) throws WorkerNotFoundException;
// Returns the provider registered for the role, or throws if none.
Provider findProvider(Role role) throws ProviderNotFoundException;
}
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.client.grpc.GRPCClient;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteCommonServiceGrpc;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteMessage;
/**
* @author pengys5
*/
public class RemoteWorkerRef extends WorkerRef {
private AbstractRemoteWorker clusterWorker;
private final Boolean acrossJVM;
private final RemoteCommonServiceGrpc.RemoteCommonServiceBlockingStub stub;
private final AbstractRemoteWorker remoteWorker;
public RemoteWorkerRef(Role role, AbstractRemoteWorker remoteWorker) {
super(role);
this.remoteWorker = remoteWorker;
this.acrossJVM = false;
this.stub = null;
}
public RemoteWorkerRef(Role role, AbstractRemoteWorker clusterWorker) {
public RemoteWorkerRef(Role role, GRPCClient client) {
super(role);
this.clusterWorker = clusterWorker;
this.remoteWorker = null;
this.acrossJVM = true;
this.stub = RemoteCommonServiceGrpc.newBlockingStub(client.getChannel());
}
@Override
public void tell(Object message) throws WorkerInvokeException {
clusterWorker.allocateJob(message);
if (acrossJVM) {
RemoteData remoteData = getRole().dataDefine().serialize(message);
RemoteMessage.Builder builder = RemoteMessage.newBuilder();
builder.setWorkerRole(getRole().roleName());
builder.setRemoteData(remoteData);
stub.call(builder.build());
} else {
remoteWorker.allocateJob(message);
}
}
public Boolean isAcrossJVM() {
return acrossJVM;
}
}
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
......@@ -10,4 +11,6 @@ public interface Role {
String roleName();
WorkerSelector workerSelector();
DataDefine dataDefine();
}
......@@ -11,6 +11,7 @@ import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
*/
public abstract class WorkerContext implements Context {
private Map<String, RemoteWorkerRef> remoteWorkerRefs;
private Map<String, List<WorkerRef>> roleWorkers;
private Map<String, Role> roles;
private Map<Integer, DataDefine> dataDefineMap;
......@@ -18,6 +19,7 @@ public abstract class WorkerContext implements Context {
public WorkerContext() {
this.roleWorkers = new HashMap<>();
this.roles = new HashMap<>();
this.remoteWorkerRefs = new HashMap<>();
}
private Map<String, List<WorkerRef>> getRoleWorkers() {
......@@ -33,6 +35,14 @@ public abstract class WorkerContext implements Context {
}
}
@Override final public RemoteWorkerRef lookupInSide(String roleName) throws WorkerNotFoundException {
if (remoteWorkerRefs.containsKey(roleName)) {
return remoteWorkerRefs.get(roleName);
} else {
throw new WorkerNotFoundException("role=" + roleName + ", no available worker.");
}
}
public final void putRole(Role role) {
roles.put(role.roleName(), role);
}
......@@ -47,9 +57,16 @@ public abstract class WorkerContext implements Context {
@Override final public void put(WorkerRef workerRef) {
if (!getRoleWorkers().containsKey(workerRef.getRole().roleName())) {
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<WorkerRef>());
getRoleWorkers().putIfAbsent(workerRef.getRole().roleName(), new ArrayList<>());
}
getRoleWorkers().get(workerRef.getRole().roleName()).add(workerRef);
if (workerRef instanceof RemoteWorkerRef) {
RemoteWorkerRef remoteWorkerRef = (RemoteWorkerRef)workerRef;
if (!remoteWorkerRef.isAcrossJVM()) {
remoteWorkerRefs.put(workerRef.getRole().roleName(), remoteWorkerRef);
}
}
}
@Override final public void remove(WorkerRef workerRef) {
......
......@@ -14,23 +14,21 @@ public class WorkerRefs<T extends WorkerRef> {
private List<T> workerRefs;
private WorkerSelector workerSelector;
private Role role;
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
}
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of %s", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
// Bundles the worker refs for one role together with the selector that picks
// which ref receives each message.
protected WorkerRefs(List<T> workerRefs, WorkerSelector workerSelector, Role role) {
this.workerRefs = workerRefs;
this.workerSelector = workerSelector;
this.role = role;
}
public void ask(Object request, Object response) throws WorkerInvokeException {
WorkerRef workerRef = workerSelector.select(workerRefs, request);
if (workerRef instanceof LocalSyncWorkerRef) {
((LocalSyncWorkerRef)workerRef).ask(request, response);
} else {
throw new IllegalAccessError("only local sync worker can ask");
}
// Routes the message to one worker ref chosen by the configured selector and
// delivers it via that ref's tell().
public void tell(Object message) throws WorkerInvokeException {
logger.debug("WorkerSelector instance of {}", workerSelector.getClass());
workerSelector.select(workerRefs, message).tell(message);
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author pengys5
*/
public class Data {
private int defineId;
private final int stringCapacity;
private final int longCapacity;
private final int floatCapacity;
private final int integerCapacity;
private String[] dataStrings;
private Long[] dataLongs;
private Float[] dataFloats;
private Integer[] dataIntegers;
public Data(int defineId, int stringCapacity, int longCapacity, int floatCapacity) {
public Data(int defineId, int stringCapacity, int longCapacity, int floatCapacity, int integerCapacity) {
this.defineId = defineId;
this.dataStrings = new String[stringCapacity];
this.dataLongs = new Long[longCapacity];
this.dataFloats = new Float[floatCapacity];
this.dataIntegers = new Integer[integerCapacity];
this.stringCapacity = stringCapacity;
this.longCapacity = longCapacity;
this.floatCapacity = floatCapacity;
this.integerCapacity = integerCapacity;
}
public void setDataString(int position, String value) {
......@@ -28,6 +40,10 @@ public class Data {
dataFloats[position] = value;
}
// Stores an Integer attribute at the given slot of the integer column array.
// No bounds check: position must be < integerCapacity.
public void setDataInteger(int position, Integer value) {
dataIntegers[position] = value;
}
public String getDataString(int position) {
return dataStrings[position];
}
......@@ -40,6 +56,10 @@ public class Data {
return dataFloats[position];
}
// Reads the Integer attribute at the given slot; null if never set.
public Integer getDataInteger(int position) {
return dataIntegers[position];
}
// Row identifier: by convention the first string slot holds the id.
public String id() {
return dataStrings[0];
}
......@@ -47,4 +67,26 @@ public class Data {
public int getDefineId() {
return defineId;
}
/**
 * Serializes this data row into a gRPC {@link RemoteData} message so it can be
 * sent to a collector instance in another JVM.
 *
 * NOTE(review): the original called builder.setDataXxx(i, value) on repeated
 * protobuf fields; set(index, value) requires an element to already exist at
 * that index, so it throws IndexOutOfBoundsException for any non-empty array.
 * Repeated fields must be appended with addDataXxx — fixed below.
 *
 * Assumes every slot of each backing array has been populated (protobuf
 * rejects null element values) — TODO confirm with callers.
 *
 * @return the protobuf representation of this data row
 */
public RemoteData serialize() {
    RemoteData.Builder builder = RemoteData.newBuilder();
    builder.setIntegerCapacity(integerCapacity);
    builder.setFloatCapacity(floatCapacity);
    builder.setStringCapacity(stringCapacity);
    builder.setLongCapacity(longCapacity);

    for (int i = 0; i < dataStrings.length; i++) {
        builder.addDataStrings(dataStrings[i]);
    }
    for (int i = 0; i < dataIntegers.length; i++) {
        builder.addDataIntegers(dataIntegers[i]);
    }
    for (int i = 0; i < dataFloats.length; i++) {
        builder.addDataFloats(dataFloats[i]);
    }
    for (int i = 0; i < dataLongs.length; i++) {
        builder.addDataLongs(dataLongs[i]);
    }
    return builder.build();
}
}
package org.skywalking.apm.collector.stream.worker.impl.data;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
/**
* @author pengys5
......@@ -11,14 +10,18 @@ public abstract class DataDefine {
private int stringCapacity;
private int longCapacity;
private int floatCapacity;
private int integerCapacity;
public DataDefine() {
stringCapacity = 0;
longCapacity = 0;
floatCapacity = 0;
integerCapacity = 0;
}
public final void initial() {
attributes = new Attribute[initialCapacity()];
attributeDefine();
for (Attribute attribute : attributes) {
if (AttributeType.STRING.equals(attribute.getType())) {
stringCapacity++;
......@@ -26,6 +29,8 @@ public abstract class DataDefine {
longCapacity++;
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
floatCapacity++;
} else if (AttributeType.INTEGER.equals(attribute.getType())) {
integerCapacity++;
}
}
}
......@@ -34,36 +39,21 @@ public abstract class DataDefine {
attributes[position] = attribute;
}
public final void define() {
attributes = new Attribute[initialCapacity()];
}
protected abstract int defineId();
public abstract int defineId();
protected abstract int initialCapacity();
protected abstract void attributeDefine();
public int getStringCapacity() {
return stringCapacity;
}
public int getLongCapacity() {
return longCapacity;
}
public int getFloatCapacity() {
return floatCapacity;
}
public Data build() {
return new Data(defineId(), getStringCapacity(), getLongCapacity(), getFloatCapacity());
public final Data build() {
return new Data(defineId(), stringCapacity, longCapacity, floatCapacity, integerCapacity);
}
public void mergeData(Data newData, Data oldData) {
int stringPosition = 0;
int longPosition = 0;
int floatPosition = 0;
int integerPosition = 0;
for (int i = 0; i < initialCapacity(); i++) {
Attribute attribute = attributes[i];
if (AttributeType.STRING.equals(attribute.getType())) {
......@@ -75,9 +65,14 @@ public abstract class DataDefine {
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataFloat(floatPosition), oldData.getDataFloat(floatPosition));
floatPosition++;
} else if (AttributeType.FLOAT.equals(attribute.getType())) {
attribute.getOperation().operate(newData.getDataInteger(integerPosition), oldData.getDataInteger(integerPosition));
integerPosition++;
}
}
}
public abstract Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException;
public abstract Object deserialize(RemoteData remoteData);
public abstract RemoteData serialize(Object object);
}
......@@ -22,6 +22,7 @@ public class DataDefineLoader implements Loader<Map<Integer, DataDefine>> {
DefinitionLoader<DataDefine> definitionLoader = DefinitionLoader.load(DataDefine.class, definitionFile);
for (DataDefine dataDefine : definitionLoader) {
logger.info("loaded data definition class: {}", dataDefine.getClass().getName());
dataDefine.initial();
dataDefineMap.put(dataDefine.defineId(), dataDefine);
}
return dataDefineMap;
......
......@@ -9,4 +9,6 @@ public interface Operation {
Long operate(Long newValue, Long oldValue);
Float operate(Float newValue, Float oldValue);
Integer operate(Integer newValue, Integer oldValue);
}
......@@ -17,4 +17,8 @@ public class CoverOperation implements Operation {
// Cover semantics: the incoming value replaces the stored one.
@Override public Float operate(Float newValue, Float oldValue) {
return newValue;
}
// Cover semantics: the incoming value replaces the stored one.
@Override public Integer operate(Integer newValue, Integer oldValue) {
return newValue;
}
}
......@@ -17,4 +17,8 @@ public class NonOperation implements Operation {
// Non-operation semantics: keep the previously stored value.
@Override public Float operate(Float newValue, Float oldValue) {
return oldValue;
}
// Non-operation semantics: keep the previously stored value.
@Override public Integer operate(Integer newValue, Integer oldValue) {
return oldValue;
}
}
......@@ -2,13 +2,18 @@ package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
/**
 * A {@link WorkerSelector} that ignores the message content and always routes
 * to the first worker ref in the candidate list. Callers must supply a
 * non-empty list; an empty list raises IndexOutOfBoundsException.
 *
 * @author pengys5
 */
public class ForeverFirstSelector implements WorkerSelector<WorkerRef> {

    private final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);

    @Override
    public WorkerRef select(List<WorkerRef> members, Object message) {
        logger.debug("member size: {}", members.size());
        WorkerRef first = members.get(0);
        return first;
    }
}
......@@ -4,14 +4,24 @@ option java_multiple_files = true;
option java_package = "org.skywalking.apm.collector.remote.grpc.proto";
service RemoteCommonService {
rpc call (Message) returns (Empty) {
rpc call (RemoteMessage) returns (Empty) {
}
}
message Message {
message RemoteMessage {
string workerRole = 1;
int32 dataDefineId = 2;
bytes dataBytes = 3;
RemoteData remoteData = 2;
}
message RemoteData {
int32 stringCapacity = 1;
int32 longCapacity = 2;
int32 floatCapacity = 3;
int32 integerCapacity = 4;
repeated string dataStrings = 5;
repeated int64 dataLongs = 6;
repeated float dataFloats = 7;
repeated int32 dataIntegers = 8;
}
message Empty {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册