提交 e507bb9e 编写于 作者: P pengys5

1. Application register successfully.

2. Instance discovery table and data define finish.
上级 1bc7250e
package org.skywalking.apm.collector.agentregister.application;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.index.query.QueryBuilders;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
* @author pengys5
*/
public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
return searchResponse.getHits().getAt(0).getField(ApplicationTable.COLUMN_APPLICATION_ID).getValue();
}
return 0;
}
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -14,9 +15,9 @@ import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationIDGetOrCreate {
public class ApplicationIDService {
private final Logger logger = LoggerFactory.getLogger(ApplicationIDGetOrCreate.class);
private final Logger logger = LoggerFactory.getLogger(ApplicationIDService.class);
public int getOrCreate(String applicationCode) {
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
......
package org.skywalking.apm.collector.agentregister.application;
/**
* @author pengys5
*/
public interface IApplicationDAO {
int getApplicationId(String applicationCode);
}
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.agentregister.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.application.ApplicationIDGetOrCreate;
import org.skywalking.apm.collector.agentregister.application.ApplicationIDService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
......@@ -18,14 +18,14 @@ public class ApplicationRegisterServiceHandler extends ApplicationRegisterServic
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServiceHandler.class);
private ApplicationIDGetOrCreate applicationIDGetOrCreate = new ApplicationIDGetOrCreate();
private ApplicationIDService applicationIDService = new ApplicationIDService();
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
logger.debug("register application");
ProtocolStringList applicationCodes = request.getApplicationCodeList();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDGetOrCreate.getOrCreate(applicationCode);
int applicationId = applicationIDService.getOrCreate(applicationCode);
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
ApplicationMapping mapping = ApplicationMapping.newBuilder().addApplication(i, value).build();
......
package org.skywalking.apm.collector.agentregister.grpc.handler;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.instance.InstanceIDService;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.ApplicationInstance;
import org.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
......@@ -8,23 +9,38 @@ import org.skywalking.apm.network.proto.ApplicationInstanceMapping;
import org.skywalking.apm.network.proto.ApplicationInstanceRecover;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceDiscoveryServiceHandler extends InstanceDiscoveryServiceGrpc.InstanceDiscoveryServiceImplBase implements GRPCHandler {
private final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServiceHandler.class);
private InstanceIDService instanceIDService = new InstanceIDService();
@Override
public void register(ApplicationInstance request, StreamObserver<ApplicationInstanceMapping> responseObserver) {
super.register(request, responseObserver);
int instanceId = instanceIDService.getOrCreate(request.getApplicationId(), request.getAgentUUID(), request.getRegisterTime());
ApplicationInstanceMapping.Builder builder = ApplicationInstanceMapping.newBuilder();
builder.setApplicationId(request.getApplicationId());
builder.setApplicationInstanceId(instanceId);
responseObserver.onNext(builder.build());
responseObserver.onCompleted();
}
@Override public void heartbeat(ApplicationInstanceHeartbeat request, StreamObserver<Downstream> responseObserver) {
super.heartbeat(request, responseObserver);
instanceIDService.heartBeat(request.getApplicationInstanceId(), request.getHeartbeatTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
@Override
public void registerRecover(ApplicationInstanceRecover request, StreamObserver<Downstream> responseObserver) {
super.registerRecover(request, responseObserver);
instanceIDService.recover(request.getApplicationInstanceId(), request.getApplicationId(), request.getRegisterTime());
responseObserver.onNext(Downstream.newBuilder().build());
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentregister.instance;
/**
* @author pengys5
*/
public interface IInstanceDAO {
int getInstanceId(String agentUUID);
}
package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
import org.skywalking.apm.collector.stream.StreamModuleGroupDefine;
import org.skywalking.apm.collector.stream.worker.WorkerInvokeException;
import org.skywalking.apm.collector.stream.worker.WorkerNotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceIDService {
private final Logger logger = LoggerFactory.getLogger(InstanceIDService.class);
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(agentUUID);
if (instanceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(agentUUID, applicationId, agentUUID, registerTime, 0);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
logger.error(e.getMessage(), e);
}
}
return applicationId;
}
public void heartBeat(int instanceId, long heartbeatTime) {
}
public void recover(int instanceId, int applicationId, long registerTime) {
}
}
org.skywalking.apm.collector.agentregister.application.ApplicationEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentregister.application.ApplicationH2DAO
\ No newline at end of file
......@@ -19,7 +19,7 @@ public class ApplicationRegisterServiceHandlerTestCase {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 11800).usePlaintext(true).build();
stub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
Application application = Application.newBuilder().addApplicationCode("test").build();
Application application = Application.newBuilder().addApplicationCode("test141").build();
ApplicationMapping mapping = stub.register(application);
System.out.println(mapping.getApplication(0).getKey() + ", " + mapping.getApplication(0).getValue());
}
......
......@@ -45,9 +45,9 @@ public class ApplicationDataDefine extends DataDefine {
}
public static class Application {
private final String id;
private final String applicationCode;
private final int applicationId;
private String id;
private String applicationCode;
private int applicationId;
public Application(String id, String applicationCode, int applicationId) {
this.id = id;
......@@ -66,5 +66,17 @@ public class ApplicationDataDefine extends DataDefine {
public int getApplicationId() {
return applicationId;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationCode(String applicationCode) {
this.applicationCode = applicationCode;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
}
}
......@@ -29,6 +29,7 @@ public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {
@Override protected void onWork(Object message) throws WorkerException {
ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message;
logger.debug("application code: {}", application.getApplicationCode());
getClusterContext().lookup(ApplicationRegisterSerialWorker.WorkerRole.INSTANCE).tell(application);
}
public static class Factory extends AbstractRemoteWorkerProvider<ApplicationRegisterRemoteWorker> {
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -7,14 +9,18 @@ import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(ApplicationRegisterSerialWorker.class);
public ApplicationRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
......@@ -24,7 +30,34 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof ApplicationDataDefine.Application) {
ApplicationDataDefine.Application application = (ApplicationDataDefine.Application)message;
logger.debug("register application, application code: {}", application.getApplicationCode());
IApplicationDAO dao = (IApplicationDAO)DAOContainer.INSTANCE.get(IApplicationDAO.class.getName());
int min = dao.getMinApplicationId();
if (min == 0) {
application.setApplicationId(1);
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
int instanceId;
if (min == max) {
instanceId = -1;
} else if (min + max == 0) {
instanceId = max + 1;
} else if (min + max > 0) {
instanceId = min - 1;
} else if (max < 0) {
instanceId = 1;
} else {
instanceId = max + 1;
}
application.setApplicationId(instanceId);
application.setId(String.valueOf(instanceId));
}
dao.save(application);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationRegisterSerialWorker> {
......@@ -53,12 +86,11 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
@Override
public WorkerSelector workerSelector() {
return new HashCodeSelector();
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
private final Logger logger = LoggerFactory.getLogger(ApplicationEsDAO.class);
@Override public int getApplicationId(String applicationCode) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
searchRequestBuilder.setQuery(QueryBuilders.termQuery(ApplicationTable.COLUMN_APPLICATION_CODE, applicationCode));
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int applicationId = (int)searchHit.getSource().get(ApplicationTable.COLUMN_APPLICATION_ID);
return applicationId;
}
return 0;
}
@Override public int getMaxApplicationId() {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field(ApplicationTable.COLUMN_APPLICATION_ID);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Max agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
@Override public int getMinApplicationId() {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field(ApplicationTable.COLUMN_APPLICATION_ID);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Min agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
@Override public void save(ApplicationDataDefine.Application application) {
logger.debug("save application register info, application id: {}, application code: {}", application.getApplicationId(), application.getApplicationCode());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
source.put(ApplicationTable.COLUMN_APPLICATION_CODE, application.getApplicationCode());
source.put(ApplicationTable.COLUMN_APPLICATION_ID, application.getApplicationId());
IndexResponse response = client.prepareIndex(ApplicationTable.TABLE, application.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save application register info, application id: {}, application code: {}, status: {}", application.getApplicationId(), application.getApplicationCode(), response.status().name());
}
}
package org.skywalking.apm.collector.agentregister.application;
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
......@@ -12,4 +13,16 @@ public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {
H2Client client = getClient();
return 100;
}
@Override public int getMaxApplicationId() {
return 0;
}
@Override public int getMinApplicationId() {
return 0;
}
@Override public void save(ApplicationDataDefine.Application application) {
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application.dao;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
/**
* @author pengys5
*/
public interface IApplicationDAO {
int getApplicationId(String applicationCode);
int getMaxApplicationId();
int getMinApplicationId();
void save(ApplicationDataDefine.Application application);
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.remote.grpc.proto.RemoteData;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
* @author pengys5
*/
public class InstanceDataDefine extends DataDefine {
public static final int DEFINE_ID = 102;
@Override public int defineId() {
return DEFINE_ID;
}
@Override protected int initialCapacity() {
return 3;
}
@Override protected void attributeDefine() {
addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
addAttribute(1, new Attribute(InstanceTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(2, new Attribute(InstanceTable.COLUMN_AGENTUUID, AttributeType.STRING, new CoverOperation()));
addAttribute(3, new Attribute(InstanceTable.COLUMN_REGISTER_TIME, AttributeType.LONG, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_INSTANCE_ID, AttributeType.INTEGER, new CoverOperation()));
addAttribute(4, new Attribute(InstanceTable.COLUMN_HEARTBEAT_TIME, AttributeType.LONG, new CoverOperation()));
}
@Override public Object deserialize(RemoteData remoteData) {
String id = remoteData.getDataStrings(0);
int applicationId = remoteData.getDataIntegers(0);
String agentUUID = remoteData.getDataStrings(1);
int instanceId = remoteData.getDataIntegers(1);
long registerTime = remoteData.getDataLongs(0);
return new Instance(id, applicationId, agentUUID, registerTime, instanceId);
}
@Override public RemoteData serialize(Object object) {
Instance instance = (Instance)object;
RemoteData.Builder builder = RemoteData.newBuilder();
builder.addDataStrings(instance.getId());
builder.addDataIntegers(instance.getApplicationId());
builder.addDataStrings(instance.getAgentUUID());
builder.addDataLongs(instance.getRegisterTime());
return builder.build();
}
public static class Instance {
private final String id;
private final int applicationId;
private final String agentUUID;
private final long registerTime;
private final int instanceId;
public Instance(String id, int applicationId, String agentUUID, long registerTime, int instanceId) {
this.id = id;
this.applicationId = applicationId;
this.agentUUID = agentUUID;
this.registerTime = registerTime;
this.instanceId = instanceId;
}
public String getId() {
return id;
}
public int getApplicationId() {
return applicationId;
}
public String getAgentUUID() {
return agentUUID;
}
public long getRegisterTime() {
return registerTime;
}
public int getInstanceId() {
return instanceId;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchColumnDefine;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchTableDefine;
/**
* @author pengys5
*/
public class InstanceEsTableDefine extends ElasticSearchTableDefine {
public InstanceEsTableDefine() {
super(InstanceTable.TABLE);
}
@Override public int refreshInterval() {
return 0;
}
@Override public int numberOfShards() {
return 2;
}
@Override public int numberOfReplicas() {
return 0;
}
@Override public void initialize() {
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_AGENTUUID, ElasticSearchColumnDefine.Type.Keyword.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, ElasticSearchColumnDefine.Type.Long.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, ElasticSearchColumnDefine.Type.Integer.name()));
addColumn(new ElasticSearchColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, ElasticSearchColumnDefine.Type.Long.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.storage.h2.define.H2ColumnDefine;
import org.skywalking.apm.collector.storage.h2.define.H2TableDefine;
/**
* @author pengys5
*/
public class InstanceH2TableDefine extends H2TableDefine {
public InstanceH2TableDefine() {
super(InstanceTable.TABLE);
}
@Override public void initialize() {
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_APPLICATION_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_AGENTUUID, H2ColumnDefine.Type.Varchar.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_REGISTER_TIME, H2ColumnDefine.Type.Bigint.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_INSTANCE_ID, H2ColumnDefine.Type.Int.name()));
addColumn(new H2ColumnDefine(InstanceTable.COLUMN_HEARTBEAT_TIME, H2ColumnDefine.Type.Bigint.name()));
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterRemoteWorker.class);
protected InstanceRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
}
@Override protected void onWork(Object message) throws WorkerException {
InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message;
logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceRegisterRemoteWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public InstanceRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
return new InstanceRegisterRemoteWorker(role(), clusterContext);
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return InstanceRegisterRemoteWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
/**
* @author pengys5
*/
public class InstanceTable {
public static final String TABLE = "instance";
public static final String COLUMN_APPLICATION_ID = "application_id";
public static final String COLUMN_AGENTUUID = "agent_uuid";
public static final String COLUMN_REGISTER_TIME = "register_time";
public static final String COLUMN_INSTANCE_ID = "instance_id";
public static final String COLUMN_HEARTBEAT_TIME = "heartbeatTime";
}
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterRemoteWorker$Factory
\ No newline at end of file
......@@ -4,4 +4,7 @@ org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingEsTableDe
org.skywalking.apm.collector.agentstream.worker.node.define.NodeMappingH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationH2TableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceEsTableDefine
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceH2TableDefine
\ No newline at end of file
......@@ -106,7 +106,7 @@ public class ElasticSearchClient implements Client {
return client.prepareSearch(indexName);
}
public IndexRequestBuilder prepareIndex(String indexName) {
return null;
public IndexRequestBuilder prepareIndex(String indexName, String id) {
return client.prepareIndex(indexName, "type", id);
}
}
......@@ -8,11 +8,11 @@ import org.skywalking.apm.collector.core.client.Client;
public abstract class DAO<C extends Client> {
private C client;
public C getClient() {
public final C getClient() {
return client;
}
public void setClient(C client) {
public final void setClient(C client) {
this.client = client;
}
}
......@@ -41,12 +41,4 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker imple
final public void allocateJob(Object message) throws WorkerException {
onWork(message);
}
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws WorkerException Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
}
......@@ -22,6 +22,7 @@ public abstract class AbstractLocalAsyncWorkerProvider<T extends AbstractLocalAs
QueueEventHandler queueEventHandler = queueCreator.create(queueSize(), localAsyncWorker);
LocalAsyncWorkerRef workerRef = new LocalAsyncWorkerRef(role(), queueEventHandler);
getClusterContext().put(workerRef);
return workerRef;
}
}
......@@ -35,12 +35,4 @@ public abstract class AbstractRemoteWorker extends AbstractWorker {
throw new WorkerInvokeException(e.getMessage(), e.getCause());
}
}
/**
* This method use for message receiver to analyse message.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
}
package org.skywalking.apm.collector.stream.worker;
import org.skywalking.apm.collector.core.framework.Executor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public abstract class AbstractWorker implements Executor {
private final Logger logger = LoggerFactory.getLogger(AbstractWorker.class);
private final Role role;
private final ClusterWorkerContext clusterContext;
......@@ -17,9 +21,21 @@ public abstract class AbstractWorker implements Executor {
}
@Override public final void execute(Object message) {
try {
onWork(message);
} catch (WorkerException e) {
logger.error(e.getMessage(), e);
}
}
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws WorkerException Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws WorkerException;
public abstract void preStart() throws ProviderNotFoundException;
final public ClusterWorkerContext getClusterContext() {
......
......@@ -22,7 +22,6 @@ public class LocalAsyncWorkerProviderDefineLoader implements Loader<List<Abstrac
DefinitionLoader<AbstractLocalAsyncWorkerProvider> definitionLoader = DefinitionLoader.load(AbstractLocalAsyncWorkerProvider.class, definitionFile);
int id = 1;
for (AbstractLocalAsyncWorkerProvider provider : definitionLoader) {
logger.info("loaded local async worker provider definition class: {}", provider.getClass().getName());
providers.add(provider);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册