提交 54fe8e43 编写于 作者: P pengys5

Instance register, heartBeat, recover finish

上级 e507bb9e
package org.skywalking.apm.collector.agentregister.instance;
/**
* @author pengys5
*/
public interface IInstanceDAO {
int getInstanceId(String agentUUID);
}
......@@ -2,6 +2,7 @@ package org.skywalking.apm.collector.agentregister.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.StreamModuleContext;
......@@ -20,11 +21,11 @@ public class InstanceIDService {
public int getOrCreate(int applicationId, String agentUUID, long registerTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int instanceId = dao.getInstanceId(agentUUID);
int instanceId = dao.getInstanceId(applicationId, agentUUID);
if (instanceId == 0) {
StreamModuleContext context = (StreamModuleContext)CollectorContextHelper.INSTANCE.getContext(StreamModuleGroupDefine.GROUP_NAME);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(agentUUID, applicationId, agentUUID, registerTime, 0);
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance("0", applicationId, agentUUID, registerTime, 0);
try {
context.getClusterWorkerContext().lookup(ApplicationRegisterRemoteWorker.WorkerRole.INSTANCE).tell(instance);
} catch (WorkerNotFoundException | WorkerInvokeException e) {
......@@ -35,10 +36,14 @@ public class InstanceIDService {
}
public void heartBeat(int instanceId, long heartbeatTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
dao.updateHeartbeatTime(instanceId, heartbeatTime);
}
public void recover(int instanceId, int applicationId, long registerTime) {
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
InstanceDataDefine.Instance instance = new InstanceDataDefine.Instance(String.valueOf(instanceId), applicationId, "", registerTime, instanceId);
dao.save(instance);
}
}
package org.skywalking.apm.collector.agentstream.worker.register;
/**
* @author pengys5
*/
public enum IdAutoIncrement {
INSTANCE;
public int increment(int min, int max) {
int instanceId;
if (min == max) {
instanceId = -1;
} else if (min + max == 0) {
instanceId = max + 1;
} else if (min + max > 0) {
instanceId = min - 1;
} else if (max < 0) {
instanceId = 1;
} else {
instanceId = max + 1;
}
return instanceId;
}
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.dao.IApplicationDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
......@@ -41,20 +42,9 @@ public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {
application.setId("1");
} else {
int max = dao.getMaxApplicationId();
int instanceId;
if (min == max) {
instanceId = -1;
} else if (min + max == 0) {
instanceId = max + 1;
} else if (min + max > 0) {
instanceId = min - 1;
} else if (max < 0) {
instanceId = 1;
} else {
instanceId = max + 1;
}
application.setApplicationId(instanceId);
application.setId(String.valueOf(instanceId));
int applicationId = IdAutoIncrement.INSTANCE.increment(min, max);
application.setApplicationId(applicationId);
application.setId(String.valueOf(applicationId));
}
dao.save(application);
}
......
......@@ -9,11 +9,6 @@ import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
......@@ -47,41 +42,11 @@ public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {
}
@Override public int getMaxApplicationId() {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field(ApplicationTable.COLUMN_APPLICATION_ID);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Max agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
return getMaxId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override public int getMinApplicationId() {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(ApplicationTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field(ApplicationTable.COLUMN_APPLICATION_ID);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Min agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
return getMinId(ApplicationTable.TABLE, ApplicationTable.COLUMN_APPLICATION_ID);
}
@Override public void save(ApplicationDataDefine.Application application) {
......
......@@ -51,11 +51,11 @@ public class InstanceDataDefine extends DataDefine {
}
public static class Instance {
private final String id;
private final int applicationId;
private final String agentUUID;
private final long registerTime;
private final int instanceId;
private String id;
private int applicationId;
private String agentUUID;
private long registerTime;
private int instanceId;
public Instance(String id, int applicationId, String agentUUID, long registerTime, int instanceId) {
this.id = id;
......@@ -84,5 +84,25 @@ public class InstanceDataDefine extends DataDefine {
public int getInstanceId() {
return instanceId;
}
public void setId(String id) {
this.id = id;
}
public void setApplicationId(int applicationId) {
this.applicationId = applicationId;
}
public void setAgentUUID(String agentUUID) {
this.agentUUID = agentUUID;
}
public void setRegisterTime(long registerTime) {
this.registerTime = registerTime;
}
public void setInstanceId(int instanceId) {
this.instanceId = instanceId;
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
......@@ -30,6 +29,7 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker {
@Override protected void onWork(Object message) throws WorkerException {
InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message;
logger.debug("application id: {}, agentUUID: {}, register time: {}", instance.getApplicationId(), instance.getAgentUUID(), instance.getRegisterTime());
getClusterContext().lookup(InstanceRegisterSerialWorker.WorkerRole.INSTANCE).tell(instance);
}
public static class Factory extends AbstractRemoteWorkerProvider<InstanceRegisterRemoteWorker> {
......@@ -58,7 +58,7 @@ public class InstanceRegisterRemoteWorker extends AbstractRemoteWorker {
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
return new InstanceDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance;
import org.skywalking.apm.collector.agentstream.worker.register.IdAutoIncrement;
import org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.dao.IInstanceDAO;
import org.skywalking.apm.collector.storage.dao.DAOContainer;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceRegisterSerialWorker extends AbstractLocalAsyncWorker {
private final Logger logger = LoggerFactory.getLogger(InstanceRegisterSerialWorker.class);
public InstanceRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
super(role, clusterContext);
}
@Override public void preStart() throws ProviderNotFoundException {
super.preStart();
}
@Override protected void onWork(Object message) throws WorkerException {
if (message instanceof InstanceDataDefine.Instance) {
InstanceDataDefine.Instance instance = (InstanceDataDefine.Instance)message;
logger.debug("register instance, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
IInstanceDAO dao = (IInstanceDAO)DAOContainer.INSTANCE.get(IInstanceDAO.class.getName());
int min = dao.getMinInstanceId();
if (min == 0) {
instance.setId("1");
instance.setInstanceId(1);
} else {
int max = dao.getMaxInstanceId();
int instanceId = IdAutoIncrement.INSTANCE.increment(min, max);
instance.setId(String.valueOf(instanceId));
instance.setInstanceId(instanceId);
}
dao.save(instance);
}
}
public static class Factory extends AbstractLocalAsyncWorkerProvider<InstanceRegisterSerialWorker> {
@Override
public Role role() {
return WorkerRole.INSTANCE;
}
@Override
public InstanceRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
return new InstanceRegisterSerialWorker(role(), clusterContext);
}
@Override public int queueSize() {
return 256;
}
}
public enum WorkerRole implements Role {
INSTANCE;
@Override
public String roleName() {
return InstanceRegisterSerialWorker.class.getSimpleName();
}
@Override
public WorkerSelector workerSelector() {
return new ForeverFirstSelector();
}
@Override public DataDefine dataDefine() {
return new ApplicationDataDefine();
}
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
/**
* @author pengys5
*/
public interface IInstanceDAO {
int getInstanceId(int applicationId, String agentUUID);
int getMaxInstanceId();
int getMinInstanceId();
void save(InstanceDataDefine.Instance instance);
void updateHeartbeatTime(int instanceId, long heartbeatTime);
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceTable;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author pengys5
*/
public class InstanceEsDAO extends EsDAO implements IInstanceDAO {
private final Logger logger = LoggerFactory.getLogger(InstanceEsDAO.class);
@Override public int getInstanceId(int applicationId, String agentUUID) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(InstanceTable.TABLE);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSearchType(SearchType.QUERY_THEN_FETCH);
BoolQueryBuilder builder = QueryBuilders.boolQuery();
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_APPLICATION_ID, applicationId));
builder.must().add(QueryBuilders.termQuery(InstanceTable.COLUMN_AGENTUUID, agentUUID));
searchRequestBuilder.setQuery(builder);
searchRequestBuilder.setSize(1);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
if (searchResponse.getHits().totalHits > 0) {
SearchHit searchHit = searchResponse.getHits().iterator().next();
int instanceId = (int)searchHit.getSource().get(InstanceTable.COLUMN_INSTANCE_ID);
return instanceId;
}
return 0;
}
@Override public int getMaxInstanceId() {
return getMaxId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public int getMinInstanceId() {
return getMinId(InstanceTable.TABLE, InstanceTable.COLUMN_INSTANCE_ID);
}
@Override public void save(InstanceDataDefine.Instance instance) {
logger.debug("save instance register info, application id: {}, agentUUID: {}", instance.getApplicationId(), instance.getAgentUUID());
ElasticSearchClient client = getClient();
Map<String, Object> source = new HashMap();
source.put(InstanceTable.COLUMN_INSTANCE_ID, instance.getInstanceId());
source.put(InstanceTable.COLUMN_APPLICATION_ID, instance.getApplicationId());
source.put(InstanceTable.COLUMN_AGENTUUID, instance.getAgentUUID());
source.put(InstanceTable.COLUMN_REGISTER_TIME, instance.getRegisterTime());
IndexResponse response = client.prepareIndex(InstanceTable.TABLE, instance.getId()).setSource(source).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
logger.debug("save instance register info, application id: {}, agentUUID: {}, status: {}", instance.getApplicationId(), instance.getAgentUUID(), response.status().name());
}
@Override public void updateHeartbeatTime(int instanceId, long heartbeatTime) {
ElasticSearchClient client = getClient();
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(InstanceTable.TABLE);
updateRequest.type("type");
updateRequest.id(String.valueOf(instanceId));
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
Map<String, Object> source = new HashMap();
source.put(InstanceTable.COLUMN_HEARTBEAT_TIME, heartbeatTime);
updateRequest.doc(source);
client.update(updateRequest);
}
}
package org.skywalking.apm.collector.agentstream.worker.register.instance.dao;
import org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceDataDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
* @author pengys5
*/
public class InstanceH2DAO extends H2DAO implements IInstanceDAO {
@Override public int getInstanceId(int applicationId, String agentUUID) {
return 0;
}
@Override public int getMaxInstanceId() {
return 0;
}
@Override public int getMinInstanceId() {
return 0;
}
@Override public void save(InstanceDataDefine.Instance instance) {
}
}
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationEsDAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceEsDAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.dao.ApplicationH2DAO
org.skywalking.apm.collector.agentstream.worker.register.instance.dao.InstanceH2DAO
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.instance.InstanceRegisterSerialWorker$Factory
\ No newline at end of file
......@@ -4,11 +4,13 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
......@@ -109,4 +111,12 @@ public class ElasticSearchClient implements Client {
public IndexRequestBuilder prepareIndex(String indexName, String id) {
return client.prepareIndex(indexName, "type", id);
}
public void update(UpdateRequest updateRequest) {
try {
client.update(updateRequest).get();
} catch (InterruptedException | ExecutionException e) {
logger.error(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.storage.elasticsearch.dao;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.metrics.max.Max;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.Min;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.DAO;
......@@ -7,4 +14,42 @@ import org.skywalking.apm.collector.storage.dao.DAO;
* @author pengys5
*/
public abstract class EsDAO extends DAO<ElasticSearchClient> {
public final int getMaxId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field(columnName);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Max agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
public final int getMinId(String indexName, String columnName) {
ElasticSearchClient client = getClient();
SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indexName);
searchRequestBuilder.setTypes("type");
searchRequestBuilder.setSize(0);
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field(columnName);
searchRequestBuilder.addAggregation(aggregation);
SearchResponse searchResponse = searchRequestBuilder.execute().actionGet();
Min agg = searchResponse.getAggregations().get("agg");
int id = (int)agg.getValue();
if (id == Integer.MAX_VALUE || id == Integer.MIN_VALUE) {
return 0;
} else {
return id;
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册