提交 5bd5a69b 编写于 作者: P pengys5

Dao loader and client inject

上级 dedc158f
......@@ -28,5 +28,10 @@
<artifactId>apm-network</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-agentstream</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
/**
 * Elasticsearch implementation of {@link IApplicationDAO}.
 *
 * <p>NOTE(review): work-in-progress stub — it obtains the Elasticsearch client
 * but performs no query yet and always returns 0.
 *
 * @author pengys5
 */
public class ApplicationEsDAO extends EsDAO implements IApplicationDAO {

    /**
     * Looks up the registered id for the given application code.
     *
     * @param applicationCode application code reported by an agent
     * @return 0 always (lookup not implemented yet)
     */
    @Override public int getApplicationId(String applicationCode) {
        // Client is presumably injected via StorageElasticSearchModuleDefine#injectClientIntoDAO —
        // verify this class is listed in es_dao.define.
        ElasticSearchClient client = getClient();
        return 0;
    }
}
package org.skywalking.apm.collector.agentregister.application;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
/**
 * H2 implementation of {@link IApplicationDAO}.
 *
 * <p>NOTE(review): work-in-progress stub — it obtains the H2 client but
 * performs no query yet and always returns 0.
 *
 * @author pengys5
 */
public class ApplicationH2DAO extends H2DAO implements IApplicationDAO {

    /**
     * Looks up the registered id for the given application code.
     *
     * @param applicationCode application code reported by an agent
     * @return 0 always (lookup not implemented yet)
     */
    @Override public int getApplicationId(String applicationCode) {
        // Client is presumably injected via StorageH2ModuleDefine#injectClientIntoDAO —
        // verify this class is listed in h2_dao.define.
        H2Client client = getClient();
        return 0;
    }
}
package org.skywalking.apm.collector.agentregister.application;
/**
 * Resolves an application code to its registered id, creating the registration
 * when it does not exist yet.
 *
 * <p>NOTE(review): stub — {@code applicationDAO} is declared but never assigned
 * or used, and {@link #getOrCreate(String)} always returns 0.
 *
 * @author pengys5
 */
public class ApplicationIDGetOrCreate {

    // Never initialized here; presumably meant to be injected or selected per
    // storage backend (ES vs H2) — TODO confirm the intended wiring.
    private IApplicationDAO applicationDAO;

    public int getOrCreate(String applicationCode) {
        // TODO: delegate to applicationDAO and create the id when missing.
        return 0;
    }
}
package org.skywalking.apm.collector.agentregister.application;
/**
 * Storage-agnostic contract for resolving application ids by code; implemented
 * once per storage backend (see ApplicationEsDAO and ApplicationH2DAO).
 *
 * @author pengys5
 */
public interface IApplicationDAO {

    /**
     * @param applicationCode application code reported by an agent
     * @return the registered application id for the code
     */
    int getApplicationId(String applicationCode);
}
package org.skywalking.apm.collector.agentregister.grpc.handler;
import com.google.protobuf.ProtocolStringList;
import io.grpc.stub.StreamObserver;
import org.skywalking.apm.collector.agentregister.application.ApplicationIDGetOrCreate;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Application;
import org.skywalking.apm.network.proto.ApplicationMapping;
import org.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
import org.skywalking.apm.network.proto.KeyWithIntegerValue;
/**
* @author pengys5
*/
public class ApplicationRegisterServiceHandler extends ApplicationRegisterServiceGrpc.ApplicationRegisterServiceImplBase implements GRPCHandler {
private ApplicationIDGetOrCreate applicationIDGetOrCreate = new ApplicationIDGetOrCreate();
@Override public void register(Application request, StreamObserver<ApplicationMapping> responseObserver) {
ProtocolStringList applicationCodes = request.getApplicationCodeList();
for (int i = 0; i < applicationCodes.size(); i++) {
String applicationCode = applicationCodes.get(i);
int applicationId = applicationIDGetOrCreate.getOrCreate(applicationCode);
KeyWithIntegerValue value = KeyWithIntegerValue.newBuilder().setKey(applicationCode).setValue(applicationId).build();
ApplicationMapping mapping = ApplicationMapping.newBuilder().setApplication(i, value).build();
responseObserver.onNext(mapping);
}
responseObserver.onCompleted();
}
}
package org.skywalking.apm.collector.agentstream.grpc.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import org.skywalking.apm.collector.server.grpc.GRPCHandler;
import org.skywalking.apm.network.proto.Downstream;
import org.skywalking.apm.network.proto.TraceSegmentObject;
import org.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.skywalking.apm.network.proto.UniqueId;
import org.skywalking.apm.network.proto.UpstreamSegment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -18,6 +22,12 @@ public class TraceSegmentServiceHandler extends TraceSegmentServiceGrpc.TraceSeg
@Override public StreamObserver<UpstreamSegment> collect(StreamObserver<Downstream> responseObserver) {
return new StreamObserver<UpstreamSegment>() {
@Override public void onNext(UpstreamSegment segment) {
try {
List<UniqueId> traceIds = segment.getGlobalTraceIdsList();
TraceSegmentObject segmentObject = TraceSegmentObject.parseFrom(segment.getSegment());
} catch (InvalidProtocolBufferException e) {
logger.error(e.getMessage(), e);
}
}
@Override public void onError(Throwable throwable) {
......
package org.skywalking.apm.collector.agentstream.worker.register.application;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.skywalking.apm.collector.agentstream.worker.register.application.proto.ApplicationRegisterOuterClass;
import org.skywalking.apm.collector.stream.worker.impl.data.Attribute;
import org.skywalking.apm.collector.stream.worker.impl.data.AttributeType;
import org.skywalking.apm.collector.stream.worker.impl.data.Data;
import org.skywalking.apm.collector.stream.worker.impl.data.DataDefine;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.CoverOperation;
import org.skywalking.apm.collector.stream.worker.impl.data.operate.NonOperation;
/**
 * Stream data definition for application registration records (define id 101).
 *
 * @author pengys5
 */
public class ApplicationRegisterDataDefine extends DataDefine {

    // Unique id identifying this data definition within the stream module.
    @Override protected int defineId() {
        return 101;
    }

    // Must equal the number of attributes registered in attributeDefine().
    @Override protected int initialCapacity() {
        return 3;
    }

    @Override protected void attributeDefine() {
        // 0: record id — NonOperation (presumably left untouched on merge — TODO confirm).
        addAttribute(0, new Attribute("id", AttributeType.STRING, new NonOperation()));
        // 1: application code — CoverOperation (presumably newer value replaces older).
        addAttribute(1, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_CODE, AttributeType.STRING, new CoverOperation()));
        // 2: application id — CoverOperation.
        addAttribute(2, new Attribute(ApplicationRegisterTable.COLUMN_APPLICATION_ID, AttributeType.INTEGER, new CoverOperation()));
    }

    /**
     * Deserializes an ApplicationRegister proto message into a {@code Data} row.
     *
     * @throws InvalidProtocolBufferException when the bytes are not a valid message
     */
    @Override public Data parseFrom(ByteString bytesData) throws InvalidProtocolBufferException {
        ApplicationRegisterOuterClass.ApplicationRegister applicationRegister = ApplicationRegisterOuterClass.ApplicationRegister.parseFrom(bytesData);
        Data data = build();
        // Only the application code travels in the proto; slots 0 (id) and
        // 2 (application id) are left unset here — presumably filled by a
        // downstream worker — TODO confirm.
        data.setDataString(1, applicationRegister.getApplicationCode());
        return data;
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorker;
import org.skywalking.apm.collector.stream.worker.AbstractRemoteWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.selector.ForeverFirstSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
 * Remote worker placeholder for forwarding application registrations between
 * collector nodes.
 *
 * <p>NOTE(review): preStart/onWork are empty in this commit.
 *
 * @author pengys5
 */
public class ApplicationRegisterRemoteWorker extends AbstractRemoteWorker {

    protected ApplicationRegisterRemoteWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override public void preStart() throws ProviderNotFoundException {
    }

    @Override protected void onWork(Object message) throws WorkerException {
    }

    public static class Factory extends AbstractRemoteWorkerProvider<ApplicationRegisterRemoteWorker> {
        @Override
        public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override
        public ApplicationRegisterRemoteWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new ApplicationRegisterRemoteWorker(role(), clusterContext);
        }

        @Override public int workerNum() {
            return 1;
        }
    }

    public enum WorkerRole implements Role {
        INSTANCE;

        @Override
        public String roleName() {
            // BUGFIX: was NodeComponentAggWorker.class.getSimpleName() — a copy/paste
            // slip that registered this role under another worker's name; use this
            // worker's own name, matching the pattern in ApplicationRegisterSerialWorker.
            return ApplicationRegisterRemoteWorker.class.getSimpleName();
        }

        @Override
        public WorkerSelector workerSelector() {
            return new ForeverFirstSelector();
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorker;
import org.skywalking.apm.collector.stream.worker.AbstractLocalAsyncWorkerProvider;
import org.skywalking.apm.collector.stream.worker.ClusterWorkerContext;
import org.skywalking.apm.collector.stream.worker.ProviderNotFoundException;
import org.skywalking.apm.collector.stream.worker.Role;
import org.skywalking.apm.collector.stream.worker.WorkerException;
import org.skywalking.apm.collector.stream.worker.selector.HashCodeSelector;
import org.skywalking.apm.collector.stream.worker.selector.WorkerSelector;
/**
 * Local async worker intended to persist application registrations serially.
 *
 * <p>NOTE(review): onWork is empty in this commit.
 *
 * @author pengys5
 */
public class ApplicationRegisterSerialWorker extends AbstractLocalAsyncWorker {

    public ApplicationRegisterSerialWorker(Role role, ClusterWorkerContext clusterContext) {
        super(role, clusterContext);
    }

    @Override
    public void preStart() throws ProviderNotFoundException {
        super.preStart();
    }

    @Override
    protected void onWork(Object message) throws WorkerException {
        // Intentionally empty for now.
    }

    /** Provider registered through the local async worker definition file. */
    public static class Factory extends AbstractLocalAsyncWorkerProvider<ApplicationRegisterSerialWorker> {

        @Override public Role role() {
            return WorkerRole.INSTANCE;
        }

        @Override public ApplicationRegisterSerialWorker workerInstance(ClusterWorkerContext clusterContext) {
            return new ApplicationRegisterSerialWorker(role(), clusterContext);
        }

        @Override public int queueSize() {
            return 256;
        }
    }

    /** Role of this worker; messages are routed by hash code. */
    public enum WorkerRole implements Role {
        INSTANCE;

        @Override public String roleName() {
            return ApplicationRegisterSerialWorker.class.getSimpleName();
        }

        @Override public WorkerSelector workerSelector() {
            return new HashCodeSelector();
        }
    }
}
package org.skywalking.apm.collector.agentstream.worker.register.application;
/**
 * Table and column name constants for the application register storage table.
 *
 * @author pengys5
 */
public class ApplicationRegisterTable {

    public static final String TABLE = "application_register";
    public static final String COLUMN_APPLICATION_CODE = "application_code";
    public static final String COLUMN_APPLICATION_ID = "application_id";

    // Constants holder — not meant to be instantiated.
    private ApplicationRegisterTable() {
    }
}
package org.skywalking.apm.collector.agentstream.worker.segment;
/**
 * Placeholder for the trace segment parser.
 *
 * <p>NOTE(review): intentionally empty in this commit — parsing of
 * {@code TraceSegmentObject} received by the trace segment gRPC handler is
 * presumably expected to land here; confirm with the follow-up change.
 *
 * @author pengys5
 */
public class SegmentParse {
}
// Protocol definition for application registration records persisted by the
// collector stream module.
syntax = "proto3";
option java_multiple_files = false;
option java_package = "org.skywalking.apm.collector.agentstream.worker.register.application.proto";
// Serialized form consumed by ApplicationRegisterDataDefine.parseFrom; carries
// only the application code reported by the agent.
message ApplicationRegister {
    string application_code = 1;
}
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentDataDefine
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterDataDefine
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.node.component.NodeComponentAggWorker$Factory
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterSerialWorker$Factory
\ No newline at end of file
org.skywalking.apm.collector.agentstream.worker.register.application.ApplicationRegisterRemoteWorker$Factory
\ No newline at end of file
package org.skywalking.apm.collector.core.framework;
import org.skywalking.apm.collector.core.config.ConfigException;
/**
* @author pengys5
*/
public interface Loader<T> {
T load() throws ConfigException;
T load() throws DefineException;
}
......@@ -4,6 +4,7 @@ import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigLoader;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -16,7 +17,7 @@ public class ModuleConfigLoader implements ConfigLoader<Map<String, Map>> {
private final Logger logger = LoggerFactory.getLogger(ModuleConfigLoader.class);
@Override public Map<String, Map> load() throws ModuleConfigLoaderException {
@Override public Map<String, Map> load() throws DefineException {
Yaml yaml = new Yaml();
try {
try {
......@@ -27,7 +28,7 @@ public class ModuleConfigLoader implements ConfigLoader<Map<String, Map>> {
return (Map<String, Map>)yaml.load(ResourceUtils.read("application-default.yml"));
}
} catch (FileNotFoundException e) {
throw new ModuleConfigLoaderException(e.getMessage(), e);
throw new ModuleDefineException(e.getMessage(), e);
}
}
}
package org.skywalking.apm.collector.core.module;
import org.skywalking.apm.collector.core.config.ConfigLoaderException;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
*/
public class ModuleConfigLoaderException extends ConfigLoaderException {
public ModuleConfigLoaderException(String message) {
public class ModuleDefineException extends DefineException {
public ModuleDefineException(String message) {
super(message);
}
public ModuleConfigLoaderException(String message, Throwable cause) {
public ModuleDefineException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.module;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class ModuleDefineLoader implements Loader<Map<String, Map<String, Module
private final Logger logger = LoggerFactory.getLogger(ModuleDefineLoader.class);
@Override public Map<String, Map<String, ModuleDefine>> load() throws ConfigException {
@Override public Map<String, Map<String, ModuleDefine>> load() throws DefineException {
Map<String, Map<String, ModuleDefine>> moduleDefineMap = new LinkedHashMap<>();
ModuleDefinitionFile definitionFile = new ModuleDefinitionFile();
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.module;
import java.util.LinkedHashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class ModuleGroupDefineLoader implements Loader<Map<String, ModuleGroupDe
private final Logger logger = LoggerFactory.getLogger(ModuleGroupDefineLoader.class);
@Override public Map<String, ModuleGroupDefine> load() throws ConfigException {
@Override public Map<String, ModuleGroupDefine> load() throws DefineException {
Map<String, ModuleGroupDefine> moduleGroupDefineMap = new LinkedHashMap<>();
ModuleGroupDefineFile definitionFile = new ModuleGroupDefineFile();
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.storage;
import java.util.LinkedList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class StorageDefineLoader implements Loader<List<TableDefine>> {
private final Logger logger = LoggerFactory.getLogger(StorageDefineLoader.class);
@Override public List<TableDefine> load() throws ConfigException {
@Override public List<TableDefine> load() throws DefineException {
List<TableDefine> tableDefines = new LinkedList<>();
StorageDefinitionFile definitionFile = new StorageDefinitionFile();
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.core.storage;
import java.util.List;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
/**
* @author pengys5
......@@ -22,7 +22,7 @@ public abstract class StorageInstaller {
createTable(client, tableDefine);
}
}
} catch (ConfigException e) {
} catch (DefineException e) {
throw new StorageInstallException(e.getMessage(), e);
}
}
......
package org.skywalking.apm.collector.core.config;
import java.io.FileNotFoundException;
import org.junit.Test;
import org.skywalking.apm.collector.core.module.ModuleConfigLoader;
import org.skywalking.apm.collector.core.module.ModuleConfigLoaderException;
import org.skywalking.apm.collector.core.module.ModuleDefineException;
/**
* @author pengys5
......@@ -11,7 +10,7 @@ import org.skywalking.apm.collector.core.module.ModuleConfigLoaderException;
public class ModuleConfigLoaderTestCase {
@Test
public void testLoad() throws ModuleConfigLoaderException {
public void testLoad() throws ModuleDefineException {
ModuleConfigLoader loader = new ModuleConfigLoader();
loader.load();
}
......
......@@ -33,6 +33,7 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
Client client = createClient(null);
client.initialize();
context.setClient(client);
injectClientIntoDAO(client);
storageInstaller().install(client);
} catch (ConfigParseException | StorageException e) {
......@@ -57,4 +58,6 @@ public abstract class StorageModuleDefine extends ModuleDefine implements Cluste
}
public abstract StorageInstaller storageInstaller();
public abstract void injectClientIntoDAO(Client client) throws DefineException;
}
package org.skywalking.apm.collector.storage.dao;
import org.skywalking.apm.collector.core.client.Client;
/**
 * Base class for all storage DAOs; holds the backend client that the storage
 * module injects at start-up (see StorageModuleDefine#injectClientIntoDAO).
 *
 * @param <C> concrete client type of the storage backend
 * @author pengys5
 */
public abstract class DAO<C extends Client> {

    // Shared backend client; assigned once during module initialization.
    private C storageClient;

    public C getClient() {
        return storageClient;
    }

    public void setClient(C client) {
        this.storageClient = client;
    }
}
package org.skywalking.apm.collector.storage.elasticsearch;
import java.util.List;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAO;
import org.skywalking.apm.collector.storage.elasticsearch.dao.EsDAODefineLoader;
import org.skywalking.apm.collector.storage.elasticsearch.define.ElasticSearchStorageInstaller;
/**
......@@ -35,4 +39,12 @@ public class StorageElasticSearchModuleDefine extends StorageModuleDefine {
@Override public StorageInstaller storageInstaller() {
return new ElasticSearchStorageInstaller();
}
/** Loads every EsDAO listed in the definition file and hands each the shared ES client. */
@Override public void injectClientIntoDAO(Client client) throws DefineException {
    List<EsDAO> loadedDAOs = new EsDAODefineLoader().load();
    for (EsDAO dao : loadedDAOs) {
        dao.setClient((ElasticSearchClient)client);
    }
}
}
package org.skywalking.apm.collector.storage.elasticsearch.dao;
import org.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
import org.skywalking.apm.collector.storage.dao.DAO;
/**
 * Base class for all Elasticsearch DAOs; binds the generic storage DAO to
 * {@link ElasticSearchClient}.
 *
 * @author pengys5
 */
public abstract class EsDAO extends DAO<ElasticSearchClient> {
}
package org.skywalking.apm.collector.storage.elasticsearch.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads every {@link EsDAO} implementation listed in the elasticsearch dao
 * definition file (es_dao.define).
 *
 * @author pengys5
 */
public class EsDAODefineLoader implements Loader<List<EsDAO>> {

    private final Logger logger = LoggerFactory.getLogger(EsDAODefineLoader.class);

    @Override public List<EsDAO> load() throws DefineException {
        EsDAODefinitionFile definitionFile = new EsDAODefinitionFile();
        logger.info("elasticsearch dao definition file name: {}", definitionFile.fileName());

        List<EsDAO> loadedDAOs = new ArrayList<>();
        DefinitionLoader<EsDAO> definitions = DefinitionLoader.load(EsDAO.class, definitionFile);
        definitions.forEach(dao -> {
            logger.info("loaded elasticsearch dao definition class: {}", dao.getClass().getName());
            loadedDAOs.add(dao);
        });
        return loadedDAOs;
    }
}
package org.skywalking.apm.collector.storage.elasticsearch.dao;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
 * Points the definition-loading framework at the classpath file that lists all
 * {@code EsDAO} implementation classes.
 *
 * @author pengys5
 */
public class EsDAODefinitionFile extends DefinitionFile {

    // File name resolved from the classpath by the definition loader.
    @Override protected String fileName() {
        return "es_dao.define";
    }
}
package org.skywalking.apm.collector.storage.h2;
import java.util.List;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.core.client.Client;
import org.skywalking.apm.collector.core.client.DataMonitor;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleConfigParser;
import org.skywalking.apm.collector.core.storage.StorageInstaller;
import org.skywalking.apm.collector.storage.StorageModuleDefine;
import org.skywalking.apm.collector.storage.StorageModuleGroupDefine;
import org.skywalking.apm.collector.storage.h2.dao.H2DAO;
import org.skywalking.apm.collector.storage.h2.dao.H2DAODefineLoader;
import org.skywalking.apm.collector.storage.h2.define.H2StorageInstaller;
/**
......@@ -35,4 +39,12 @@ public class StorageH2ModuleDefine extends StorageModuleDefine {
@Override public StorageInstaller storageInstaller() {
return new H2StorageInstaller();
}
/** Loads every H2DAO listed in the definition file and hands each the shared H2 client. */
@Override public void injectClientIntoDAO(Client client) throws DefineException {
    List<H2DAO> loadedDAOs = new H2DAODefineLoader().load();
    for (H2DAO dao : loadedDAOs) {
        dao.setClient((H2Client)client);
    }
}
}
package org.skywalking.apm.collector.storage.h2.dao;
import org.skywalking.apm.collector.client.h2.H2Client;
import org.skywalking.apm.collector.storage.dao.DAO;
/**
 * Base class for all H2 DAOs; binds the generic storage DAO to
 * {@link H2Client}.
 *
 * @author pengys5
 */
public abstract class H2DAO extends DAO<H2Client> {
}
package org.skywalking.apm.collector.storage.h2.dao;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Loads every {@link H2DAO} implementation listed in the h2 dao definition
 * file (h2_dao.define).
 *
 * @author pengys5
 */
public class H2DAODefineLoader implements Loader<List<H2DAO>> {

    private final Logger logger = LoggerFactory.getLogger(H2DAODefineLoader.class);

    @Override public List<H2DAO> load() throws DefineException {
        H2DAODefinitionFile definitionFile = new H2DAODefinitionFile();
        logger.info("h2 dao definition file name: {}", definitionFile.fileName());

        List<H2DAO> loadedDAOs = new ArrayList<>();
        DefinitionLoader<H2DAO> definitions = DefinitionLoader.load(H2DAO.class, definitionFile);
        definitions.forEach(dao -> {
            logger.info("loaded h2 dao definition class: {}", dao.getClass().getName());
            loadedDAOs.add(dao);
        });
        return loadedDAOs;
    }
}
package org.skywalking.apm.collector.storage.h2.dao;
import org.skywalking.apm.collector.core.framework.DefinitionFile;
/**
 * Points the definition-loading framework at the classpath file that lists all
 * {@code H2DAO} implementation classes.
 *
 * @author pengys5
 */
public class H2DAODefinitionFile extends DefinitionFile {

    // File name resolved from the classpath by the definition loader.
    @Override protected String fileName() {
        return "h2_dao.define";
    }
}
......@@ -4,7 +4,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.skywalking.apm.collector.core.client.ClientException;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.CollectorContextHelper;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.module.ModuleDefine;
......@@ -36,12 +35,8 @@ public class StreamModuleInstaller implements ModuleInstaller {
CollectorContextHelper.INSTANCE.putContext(context);
DataDefineLoader dataDefineLoader = new DataDefineLoader();
try {
Map<Integer, DataDefine> dataDefineMap = dataDefineLoader.load();
context.putAllDataDefine(dataDefineMap);
} catch (ConfigException e) {
logger.error(e.getMessage(), e);
}
Map<Integer, DataDefine> dataDefineMap = dataDefineLoader.load();
context.putAllDataDefine(dataDefineMap);
initializeWorker(context);
......@@ -54,7 +49,7 @@ public class StreamModuleInstaller implements ModuleInstaller {
}
}
private void initializeWorker(StreamModuleContext context) {
private void initializeWorker(StreamModuleContext context) throws DefineException {
ClusterWorkerContext clusterWorkerContext = new ClusterWorkerContext();
context.setClusterWorkerContext(clusterWorkerContext);
......@@ -74,7 +69,7 @@ public class StreamModuleInstaller implements ModuleInstaller {
provider.create();
clusterWorkerContext.putRole(provider.role());
}
} catch (ConfigException | ProviderNotFoundException e) {
} catch (ProviderNotFoundException e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class LocalAsyncWorkerProviderDefineLoader implements Loader<List<Abstrac
private final Logger logger = LoggerFactory.getLogger(LocalAsyncWorkerProviderDefineLoader.class);
@Override public List<AbstractLocalAsyncWorkerProvider> load() throws ConfigException {
@Override public List<AbstractLocalAsyncWorkerProvider> load() throws DefineException {
List<AbstractLocalAsyncWorkerProvider> providers = new ArrayList<>();
LocalAsyncWorkerProviderDefinitionFile definitionFile = new LocalAsyncWorkerProviderDefinitionFile();
logger.info("local async worker provider definition file name: {}", definitionFile.fileName());
......
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker;
import java.util.ArrayList;
import java.util.List;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class RemoteWorkerProviderDefineLoader implements Loader<List<AbstractRem
private final Logger logger = LoggerFactory.getLogger(RemoteWorkerProviderDefineLoader.class);
@Override public List<AbstractRemoteWorkerProvider> load() throws ConfigException {
@Override public List<AbstractRemoteWorkerProvider> load() throws DefineException {
List<AbstractRemoteWorkerProvider> providers = new ArrayList<>();
RemoteWorkerProviderDefinitionFile definitionFile = new RemoteWorkerProviderDefinitionFile();
logger.info("remote worker provider definition file name: {}", definitionFile.fileName());
......
......@@ -4,5 +4,5 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
* @author pengys5
*/
// Value types supported by stream data attributes.
// BUGFIX(review): the rendered diff kept both the old value list (STRING, LONG,
// FLOAT) and the new one on consecutive lines, which is invalid Java; only the
// updated list (with INTEGER) is kept.
public enum AttributeType {
    STRING, LONG, FLOAT, INTEGER
}
......@@ -2,7 +2,7 @@ package org.skywalking.apm.collector.stream.worker.impl.data;
import java.util.HashMap;
import java.util.Map;
import org.skywalking.apm.collector.core.config.ConfigException;
import org.skywalking.apm.collector.core.framework.DefineException;
import org.skywalking.apm.collector.core.framework.Loader;
import org.skywalking.apm.collector.core.util.DefinitionLoader;
import org.slf4j.Logger;
......@@ -15,7 +15,7 @@ public class DataDefineLoader implements Loader<Map<Integer, DataDefine>> {
private final Logger logger = LoggerFactory.getLogger(DataDefineLoader.class);
@Override public Map<Integer, DataDefine> load() throws ConfigException {
@Override public Map<Integer, DataDefine> load() throws DefineException {
Map<Integer, DataDefine> dataDefineMap = new HashMap<>();
DataDefinitionFile definitionFile = new DataDefinitionFile();
......
package org.skywalking.apm.collector.stream.worker.selector;
import java.util.List;
import org.skywalking.apm.collector.stream.worker.WorkerRef;
/**
 * A {@link WorkerSelector} that ignores the message and always routes to the
 * first worker reference in the member list.
 *
 * @author pengys5
 */
public class ForeverFirstSelector implements WorkerSelector<WorkerRef> {

    @Override public WorkerRef select(List<WorkerRef> members, Object message) {
        // Callers must supply a non-empty member list.
        final int firstIndex = 0;
        return members.get(firstIndex);
    }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册