未验证 提交 4eb5dabb 编写于 作者: 彭勇升 pengys 提交者: GitHub

Refactor the collector configuration initialization. (#1058)

* Refactor the collector configuration initialization.

#1047
上级 8abb01dc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.grpc.provider;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServerConfig;
/**
* @author peng-yongsheng
*/
class AgentModuleGRPCConfig extends GRPCServerConfig {
}
......@@ -18,8 +18,14 @@
package org.apache.skywalking.apm.collector.agent.grpc.provider;
import java.io.File;
import org.apache.skywalking.apm.collector.agent.grpc.define.AgentGRPCModule;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ApplicationRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.InstanceDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.JVMMetricsServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.NetworkAddressRegisterServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingListener;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
......@@ -28,8 +34,11 @@ import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
......@@ -37,20 +46,18 @@ import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterS
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import org.eclipse.jetty.util.StringUtil;
import java.io.File;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class AgentModuleGRPCProvider extends ModuleProvider {
public static final String NAME = "gRPC";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String SSL_CERT_CHAIN_FILEPATH = "ssl_cert_chain_file";
private static final String SSL_PRIVATE_KEY_FILE = "ssl_private_key_file";
private static final String AUTHENTICATION = "authentication";
private final AgentModuleGRPCConfig config;
public AgentModuleGRPCProvider() {
super();
this.config = new AgentModuleGRPCConfig();
}
@Override
public String name() {
......@@ -62,19 +69,23 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
return AgentGRPCModule.class;
}
@Override
public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override
public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer) config.get(PORT);
String sslCertChainFilePath = config.getProperty(SSL_CERT_CHAIN_FILEPATH);
String sslPrivateKeyFilePath = config.getProperty(SSL_PRIVATE_KEY_FILE);
public void prepare() {
}
AuthenticationSimpleChecker.INSTANCE.setExpectedToken(config.getProperty(AUTHENTICATION, ""));
@Override
public void start() throws ServiceNotProvidedException {
String host = config.getHost();
Integer port = config.getPort();
String sslCertChainFilePath = config.getSslCertChainFilePath();
String sslPrivateKeyFilePath = config.getSslPrivateKeyFilePath();
String authentication = StringUtils.isNotEmpty(config.getAuthentication()) ? config.getAuthentication() : Const.EMPTY_STRING;
AuthenticationSimpleChecker.INSTANCE.setExpectedToken(authentication);
File sslCertChainFile = null;
File sslPrivateKeyFile = null;
if (StringUtil.isNotBlank(sslCertChainFilePath)) {
......@@ -112,13 +123,12 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException {
public void notifyAfterCompleted() {
}
@Override
public String[] requiredModules() {
return new String[]{ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
return new String[] {ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
}
private void addHandlers(GRPCServer gRPCServer) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.agent.jetty.provider;
import org.apache.skywalking.apm.collector.server.jetty.JettyServerConfig;
/**
* @author peng-yongsheng
*/
class AgentModuleJettyConfig extends JettyServerConfig {
}
......@@ -19,32 +19,37 @@
package org.apache.skywalking.apm.collector.agent.jetty.provider;
import org.apache.skywalking.apm.collector.agent.jetty.define.AgentJettyModule;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ApplicationRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.InstanceDiscoveryServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.NetworkAddressRegisterServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ServiceNameDiscoveryServiceHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.TraceSegmentServletHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingListener;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class AgentModuleJettyProvider extends ModuleProvider {
public static final String NAME = "jetty";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CONTEXT_PATH = "context_path";
private final AgentModuleJettyConfig config;
public AgentModuleJettyProvider() {
super();
this.config = new AgentModuleJettyConfig();
}
@Override public String name() {
return NAME;
......@@ -54,17 +59,16 @@ public class AgentModuleJettyProvider extends ModuleProvider {
return AgentJettyModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
String contextPath = config.getProperty(CONTEXT_PATH);
@Override public void prepare() {
}
@Override public void start() {
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(AgentJettyModule.NAME, this.name(), new AgentModuleJettyRegistration(host, port, contextPath));
moduleRegisterService.register(AgentJettyModule.NAME, this.name(), new AgentModuleJettyRegistration(config.getHost(), config.getPort(), config.getContextPath()));
AgentJettyNamingListener namingListener = new AgentJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -74,12 +78,11 @@ public class AgentModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
JettyServer jettyServer = managerService.createIfAbsent(config.getHost(), config.getPort(), config.getContextPath());
addHandlers(jettyServer);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.alarm.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class AnalysisAlarmModuleConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.alarm.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.alarm.define.AnalysisAlarmModule;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationMetricAlarmGraph;
import org.apache.skywalking.apm.collector.analysis.alarm.provider.worker.application.ApplicationReferenceMetricAlarmGraph;
......@@ -31,8 +30,8 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCrea
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -48,6 +47,13 @@ import org.apache.skywalking.apm.collector.storage.table.alarm.ServiceAlarmList;
*/
public class AnalysisAlarmModuleProvider extends ModuleProvider {
private final AnalysisAlarmModuleConfig config;
public AnalysisAlarmModuleProvider() {
super();
this.config = new AnalysisAlarmModuleConfig();
}
@Override public String name() {
return "default";
}
......@@ -56,11 +62,14 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider {
return AnalysisAlarmModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() {
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
ServiceMetricAlarmGraph serviceMetricAlarmGraph = new ServiceMetricAlarmGraph(getManager(), workerCreateListener);
......@@ -87,8 +96,7 @@ public class AnalysisAlarmModuleProvider extends ModuleProvider {
persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.baseline.computing.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class AnalysisBaselineComputingModuleConfig extends ModuleConfig {
}
......@@ -18,11 +18,10 @@
package org.apache.skywalking.apm.collector.analysis.baseline.computing.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.baseline.computing.define.AnalysisBaselineComputingModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
/**
* The <code>AnalysisBaselineComputingModuleProvider</code> is the default implementation of {@link
......@@ -32,7 +31,13 @@ import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedExcepti
*/
public class AnalysisBaselineComputingModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private static final String NAME = "default";
private final AnalysisBaselineComputingModuleConfig config;
public AnalysisBaselineComputingModuleProvider() {
super();
this.config = new AnalysisBaselineComputingModuleConfig();
}
@Override public String name() {
return NAME;
......@@ -42,16 +47,17 @@ public class AnalysisBaselineComputingModuleProvider extends ModuleProvider {
return AnalysisBaselineComputingModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.jvm.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class AnalysisJVMModuleConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.jvm.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.jvm.define.AnalysisJVMModule;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.ICpuMetricService;
import org.apache.skywalking.apm.collector.analysis.jvm.define.service.IGCMetricService;
......@@ -35,6 +34,7 @@ import org.apache.skywalking.apm.collector.analysis.jvm.provider.worker.memorypo
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
......@@ -46,6 +46,12 @@ import org.apache.skywalking.apm.collector.storage.StorageModule;
public class AnalysisJVMModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private final AnalysisJVMModuleConfig config;
public AnalysisJVMModuleProvider() {
super();
this.config = new AnalysisJVMModuleConfig();
}
@Override public String name() {
return NAME;
......@@ -55,14 +61,18 @@ public class AnalysisJVMModuleProvider extends ModuleProvider {
return AnalysisJVMModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(ICpuMetricService.class, new CpuMetricService());
this.registerServiceImplementation(IGCMetricService.class, new GCMetricService());
this.registerServiceImplementation(IMemoryMetricService.class, new MemoryMetricService());
this.registerServiceImplementation(IMemoryPoolMetricService.class, new MemoryPoolMetricService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
graphCreate(workerCreateListener);
......@@ -71,8 +81,7 @@ public class AnalysisJVMModuleProvider extends ModuleProvider {
persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.metric.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class AnalysisMetricModuleConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.metric.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
import org.apache.skywalking.apm.collector.analysis.metric.define.service.IInstanceHeartBeatService;
import org.apache.skywalking.apm.collector.analysis.metric.provider.service.InstanceHeartBeatService;
......@@ -44,11 +43,15 @@ import org.apache.skywalking.apm.collector.analysis.segment.parser.define.Analys
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParserListenerRegister;
import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCreateListener;
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationComponent;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMapping;
import org.apache.skywalking.apm.collector.storage.table.application.ApplicationMetric;
......@@ -65,6 +68,12 @@ import org.apache.skywalking.apm.collector.storage.table.service.ServiceReferenc
public class AnalysisMetricModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private final AnalysisMetricModuleConfig config;
public AnalysisMetricModuleProvider() {
super();
this.config = new AnalysisMetricModuleConfig();
}
@Override public String name() {
return NAME;
......@@ -74,11 +83,15 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
return AnalysisMetricModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(IInstanceHeartBeatService.class, new InstanceHeartBeatService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
segmentParserListenerRegister();
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
......@@ -91,12 +104,11 @@ public class AnalysisMetricModuleProvider extends ModuleProvider {
persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[] {AnalysisSegmentParserModule.NAME};
return new String[] {AnalysisSegmentParserModule.NAME, ConfigurationModule.NAME, CacheModule.NAME, StorageModule.NAME};
}
private void segmentParserListenerRegister() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.register.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class AnalysisRegisterModuleConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.register.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IApplicationIDService;
import org.apache.skywalking.apm.collector.analysis.register.define.service.IInstanceIDService;
......@@ -36,6 +35,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCrea
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
......@@ -52,6 +52,12 @@ import org.apache.skywalking.apm.collector.storage.table.register.ServiceName;
public class AnalysisRegisterModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private final AnalysisRegisterModuleConfig config;
public AnalysisRegisterModuleProvider() {
super();
this.config = new AnalysisRegisterModuleConfig();
}
@Override public String name() {
return NAME;
......@@ -61,14 +67,18 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider {
return AnalysisRegisterModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(IApplicationIDService.class, new ApplicationIDService(getManager()));
this.registerServiceImplementation(IInstanceIDService.class, new InstanceIDService(getManager()));
this.registerServiceImplementation(IServiceNameService.class, new ServiceNameService(getManager()));
this.registerServiceImplementation(INetworkAddressIDService.class, new NetworkAddressIDService(getManager()));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
graphCreate(workerCreateListener);
......@@ -79,8 +89,7 @@ public class AnalysisRegisterModuleProvider extends ModuleProvider {
persistenceTimer.start(getManager(), workerCreateListener.getPersistenceWorkers());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public class AnalysisSegmentParserModuleConfig extends ModuleConfig {
private String bufferFilePath;
private String bufferOffsetMaxFileSize;
private String bufferSegmentMaxFileSize;
public String getBufferFilePath() {
return bufferFilePath;
}
public void setBufferFilePath(String bufferFilePath) {
this.bufferFilePath = bufferFilePath;
}
public String getBufferOffsetMaxFileSize() {
return bufferOffsetMaxFileSize;
}
public void setBufferOffsetMaxFileSize(String bufferOffsetMaxFileSize) {
this.bufferOffsetMaxFileSize = bufferOffsetMaxFileSize;
}
public String getBufferSegmentMaxFileSize() {
return bufferSegmentMaxFileSize;
}
public void setBufferSegmentMaxFileSize(String bufferSegmentMaxFileSize) {
this.bufferSegmentMaxFileSize = bufferSegmentMaxFileSize;
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.register.define.AnalysisRegisterModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.AnalysisSegmentParserModule;
import org.apache.skywalking.apm.collector.analysis.segment.parser.define.service.ISegmentParseService;
......@@ -34,6 +33,7 @@ import org.apache.skywalking.apm.collector.analysis.worker.model.base.WorkerCrea
import org.apache.skywalking.apm.collector.analysis.worker.timer.PersistenceTimer;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -43,9 +43,15 @@ import org.apache.skywalking.apm.collector.storage.StorageModule;
*/
public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
public static final String NAME = "default";
private static final String NAME = "default";
private final AnalysisSegmentParserModuleConfig config;
private SegmentParserListenerManager listenerManager;
public AnalysisSegmentParserModuleProvider() {
super();
this.config = new AnalysisSegmentParserModuleConfig();
}
@Override public String name() {
return NAME;
}
......@@ -54,7 +60,11 @@ public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
return AnalysisSegmentParserModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.listenerManager = new SegmentParserListenerManager();
this.registerServiceImplementation(ISegmentParserListenerRegister.class, new SegmentParserListenerRegister(listenerManager));
this.registerServiceImplementation(ISegmentParseService.class, new SegmentParseService(getManager(), listenerManager));
......@@ -63,7 +73,7 @@ public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
parser.parse(config);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
WorkerCreateListener workerCreateListener = new WorkerCreateListener();
graphCreate(workerCreateListener);
......@@ -74,12 +84,11 @@ public class AnalysisSegmentParserModuleProvider extends ModuleProvider {
SegmentBufferReader.INSTANCE.setSegmentParserListenerManager(listenerManager);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[] {AnalysisRegisterModule.NAME, CacheModule.NAME, StorageModule.NAME};
return new String[] {StorageModule.NAME, AnalysisRegisterModule.NAME, CacheModule.NAME};
}
private void graphCreate(WorkerCreateListener workerCreateListener) {
......
......@@ -16,10 +16,10 @@
*
*/
package org.apache.skywalking.apm.collector.analysis.segment.parser.provider.buffer;
import java.util.Properties;
import org.apache.skywalking.apm.collector.analysis.segment.parser.provider.AnalysisSegmentParserModuleConfig;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
/**
* @author peng-yongsheng
......@@ -29,19 +29,15 @@ public class BufferFileConfig {
static int BUFFER_SEGMENT_MAX_FILE_SIZE = 10 * 1024 * 1024;
static String BUFFER_PATH = "../buffer/";
private static final String BUFFER_PATH_KEY = "buffer_file_path";
private static final String BUFFER_OFFSET_MAX_FILE_SIZE_KEY = "buffer_offset_max_file_size";
private static final String BUFFER_SEGMENT_MAX_FILE_SIZE_KEY = "buffer_segment_max_file_size";
public static class Parser {
public void parse(Properties config) {
if (config.containsKey(BUFFER_PATH_KEY)) {
BUFFER_PATH = config.getProperty(BUFFER_PATH_KEY);
public void parse(AnalysisSegmentParserModuleConfig config) {
if (StringUtils.isNotEmpty(config.getBufferFilePath())) {
BUFFER_PATH = config.getBufferFilePath();
}
if (config.containsKey(BUFFER_OFFSET_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_OFFSET_MAX_FILE_SIZE_KEY).toUpperCase();
if (StringUtils.isNotEmpty(config.getBufferOffsetMaxFileSize())) {
String sizeStr = config.getBufferOffsetMaxFileSize().toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_OFFSET_MAX_FILE_SIZE = size * 1024;
......@@ -61,8 +57,8 @@ public class BufferFileConfig {
BUFFER_OFFSET_MAX_FILE_SIZE = 10 * 1024 * 1024;
}
if (config.containsKey(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY)) {
String sizeStr = config.getProperty(BUFFER_SEGMENT_MAX_FILE_SIZE_KEY).toUpperCase();
if (StringUtils.isNotEmpty(config.getBufferSegmentMaxFileSize())) {
String sizeStr = config.getBufferSegmentMaxFileSize().toUpperCase();
if (sizeStr.endsWith("K")) {
int size = Integer.parseInt(sizeStr.replace("K", ""));
BUFFER_SEGMENT_MAX_FILE_SIZE = size * 1024;
......
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.collector.boot;
import org.apache.skywalking.apm.collector.boot.config.ApplicationConfigLoader;
import org.apache.skywalking.apm.collector.boot.config.ConfigFileNotFoundException;
import org.apache.skywalking.apm.collector.core.module.ApplicationConfiguration;
import org.apache.skywalking.apm.collector.core.module.ModuleConfigException;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
import org.apache.skywalking.apm.collector.core.module.ModuleNotFoundException;
import org.apache.skywalking.apm.collector.core.module.ProviderNotFoundException;
......@@ -41,7 +42,7 @@ public class CollectorBootStartUp {
try {
ApplicationConfiguration applicationConfiguration = configLoader.load();
manager.init(applicationConfiguration);
} catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException e) {
} catch (ConfigFileNotFoundException | ModuleNotFoundException | ProviderNotFoundException | ServiceNotProvidedException | ModuleConfigException e) {
logger.error(e.getMessage(), e);
}
}
......
......@@ -20,10 +20,10 @@ package org.apache.skywalking.apm.collector.boot.config;
import java.io.FileNotFoundException;
import java.io.Reader;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import org.apache.skywalking.apm.collector.core.module.ApplicationConfiguration;
import org.apache.skywalking.apm.collector.core.util.CollectionUtils;
import org.apache.skywalking.apm.collector.core.util.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,67 +52,69 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur
return configuration;
}
@SuppressWarnings("unchecked")
private void loadConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
try {
Reader applicationReader = ResourceUtils.read("application.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
logger.info("Get a module define from application.yml, module name: {}", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
logger.info("Get a provider define belong to {} module, provider name: {}", moduleName, name);
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
logger.info("The property with key: {}, value: {}, in {} provider", key, value, name);
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
} else {
logger.warn("Get a module define from application.yml, but no provider define, use default, module name: {}", moduleName);
}
});
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (providerConfig.size() > 0) {
logger.info("Get a module define from application.yml, module name: {}", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
logger.info("Get a provider define belong to {} module, provider name: {}", moduleName, name);
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach((key, value) -> {
properties.put(key, value);
logger.info("The property with key: {}, value: {}, in {} provider", key, value, name);
});
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
} else {
logger.warn("Get a module define from application.yml, but no provider define, use default, module name: {}", moduleName);
}
});
}
} catch (FileNotFoundException e) {
throw new ConfigFileNotFoundException(e.getMessage(), e);
}
}
@SuppressWarnings("unchecked")
private void loadDefaultConfig(ApplicationConfiguration configuration) throws ConfigFileNotFoundException {
try {
Reader applicationReader = ResourceUtils.read("application-default.yml");
Map<String, Map<String, Map<String, ?>>> moduleConfig = yaml.loadAs(applicationReader, Map.class);
moduleConfig.forEach((moduleName, providerConfig) -> {
if (!configuration.has(moduleName)) {
logger.warn("The {} module did't define in application.yml, use default", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach(properties::put);
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
if (CollectionUtils.isNotEmpty(moduleConfig)) {
moduleConfig.forEach((moduleName, providerConfig) -> {
if (!configuration.has(moduleName)) {
logger.warn("The {} module did't define in application.yml, use default", moduleName);
ApplicationConfiguration.ModuleConfiguration moduleConfiguration = configuration.addModule(moduleName);
providerConfig.forEach((name, propertiesConfig) -> {
Properties properties = new Properties();
if (propertiesConfig != null) {
propertiesConfig.forEach(properties::put);
}
moduleConfiguration.addProviderConfiguration(name, properties);
});
}
});
}
} catch (FileNotFoundException e) {
throw new ConfigFileNotFoundException(e.getMessage(), e);
}
}
private void overrideConfigBySystemEnv(ApplicationConfiguration configuration) {
Iterator<Map.Entry<Object, Object>> entryIterator = System.getProperties().entrySet().iterator();
while (entryIterator.hasNext()) {
Map.Entry<Object, Object> prop = entryIterator.next();
overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString(), true);
for (Map.Entry<Object, Object> prop : System.getProperties().entrySet()) {
overrideModuleSettings(configuration, prop.getKey().toString(), prop.getValue().toString());
}
}
private void overrideModuleSettings(ApplicationConfiguration configuration, String key, String value,
boolean isSystemProperty) {
private void overrideModuleSettings(ApplicationConfiguration configuration, String key, String value) {
int moduleAndConfigSeparator = key.indexOf('.');
if (moduleAndConfigSeparator <= 0) {
return;
......@@ -151,6 +153,6 @@ public class ApplicationConfigLoader implements ConfigLoader<ApplicationConfigur
}
logger.info("The setting has been override by key: {}, value: {}, in {} provider of {} module through {}",
settingKey, value, providerName, moduleName, isSystemProperty ? "System.properties" : "System.envs");
settingKey, value, providerName, moduleName, "System.properties");
}
}
......@@ -22,7 +22,7 @@ naming:
jetty:
host: localhost
port: 10800
context_path: /
contextPath: /
cache:
# guava:
caffeine:
......@@ -35,8 +35,8 @@ agent_gRPC:
host: localhost
port: 11800
#Set these two setting to open ssl
#ssl_cert_chain_file: $path
#ssl_private_key_file: $path
#sslCertChainFile: $path
#sslPrivateKeyFile: $path
#Set your own token to active auth
#authentication: xxxxxx
......@@ -44,40 +44,40 @@ agent_jetty:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
analysis_register:
default:
analysis_jvm:
default:
analysis_segment_parser:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
bufferFilePath: ../buffer/
bufferOffsetMaxFileSize: 10M
bufferSegmentMaxFileSize: 500M
ui:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
clusterName: CollectorDBCluster
clusterTransportSniffer: true
clusterNodes: localhost:9300
indexShardsNumber: 2
indexReplicasNumber: 0
ttl: 7
#storage:
# h2:
# url: jdbc:h2:tcp://localhost/~/test
# user_name: sa
# url: jdbc:h2:~/memorydb
# userName: sa
configuration:
default:
# namespace: xxxxx
application_apdex_threshold: 2000
service_error_rate_threshold: 10.00
service_average_response_time_threshold: 2000
instance_error_rate_threshold: 10.00
instance_average_response_time_threshold: 2000
application_error_rate_threshold: 10.00
application_average_response_time_threshold: 2000
\ No newline at end of file
# namespace: xxxxx
applicationApdexThreshold: 2000
serviceErrorRateThreshold: 10.00
serviceAverageResponseTimeThreshold: 2000
instanceErrorRateThreshold: 10.00
instanceAverageResponseTimeThreshold: 2000
applicationErrorRateThreshold: 10.00
applicationAverageResponseTimeThreshold: 2000
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.cache.caffeine;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class CacheModuleCaffeineConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.cache.caffeine;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.caffeine.service.ApplicationCacheCaffeineService;
import org.apache.skywalking.apm.collector.cache.caffeine.service.InstanceCacheCaffeineService;
......@@ -31,6 +30,7 @@ import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheServ
import org.apache.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.apache.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -40,6 +40,13 @@ import org.apache.skywalking.apm.collector.storage.StorageModule;
*/
public class CacheModuleCaffeineProvider extends ModuleProvider {
private final CacheModuleCaffeineConfig config;
public CacheModuleCaffeineProvider() {
super();
this.config = new CacheModuleCaffeineConfig();
}
@Override public String name() {
return "caffeine";
}
......@@ -48,7 +55,11 @@ public class CacheModuleCaffeineProvider extends ModuleProvider {
return CacheModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheCaffeineService(getManager()));
this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheCaffeineService(getManager()));
this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheCaffeineService(getManager()));
......@@ -56,7 +67,7 @@ public class CacheModuleCaffeineProvider extends ModuleProvider {
this.registerServiceImplementation(NetworkAddressCacheService.class, new NetworkAddressCacheCaffeineService(getManager()));
}
@Override public void start(Properties config) {
@Override public void start() {
}
@Override public void notifyAfterCompleted() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.cache.guava;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class CacheModuleGuavaConfig extends ModuleConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.cache.guava;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cache.guava.service.ApplicationCacheGuavaService;
import org.apache.skywalking.apm.collector.cache.guava.service.InstanceCacheGuavaService;
......@@ -31,6 +30,7 @@ import org.apache.skywalking.apm.collector.cache.service.NetworkAddressCacheServ
import org.apache.skywalking.apm.collector.cache.service.ServiceIdCacheService;
import org.apache.skywalking.apm.collector.cache.service.ServiceNameCacheService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
......@@ -40,6 +40,13 @@ import org.apache.skywalking.apm.collector.storage.StorageModule;
*/
public class CacheModuleGuavaProvider extends ModuleProvider {
private final CacheModuleGuavaConfig config;
public CacheModuleGuavaProvider() {
super();
this.config = new CacheModuleGuavaConfig();
}
@Override public String name() {
return "guava";
}
......@@ -48,7 +55,11 @@ public class CacheModuleGuavaProvider extends ModuleProvider {
return CacheModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(ApplicationCacheService.class, new ApplicationCacheGuavaService(getManager()));
this.registerServiceImplementation(InstanceCacheService.class, new InstanceCacheGuavaService(getManager()));
this.registerServiceImplementation(ServiceIdCacheService.class, new ServiceIdCacheGuavaService(getManager()));
......@@ -56,11 +67,10 @@ public class CacheModuleGuavaProvider extends ModuleProvider {
this.registerServiceImplementation(NetworkAddressCacheService.class, new NetworkAddressCacheGuavaService(getManager()));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.cluster.redis;
import org.apache.skywalking.apm.collector.client.redis.RedisClientConfig;
/**
* @author peng-yongsheng
*/
class ClusterModuleRedisConfig extends RedisClientConfig {
}
......@@ -16,14 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.cluster.redis;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.redis.service.RedisModuleRegisterService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
......@@ -32,6 +31,13 @@ import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedExcepti
*/
public class ClusterModuleRedisProvider extends ModuleProvider {
private final ClusterModuleRedisConfig config;
public ClusterModuleRedisProvider() {
super();
this.config = new ClusterModuleRedisConfig();
}
@Override public String name() {
return "redis";
}
......@@ -40,16 +46,18 @@ public class ClusterModuleRedisProvider extends ModuleProvider {
return ClusterModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(ModuleRegisterService.class, new RedisModuleRegisterService());
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(ModuleRegisterService.class, new RedisModuleRegisterService());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.cluster.standalone;
import org.apache.skywalking.apm.collector.client.h2.H2ClientConfig;
/**
* @author peng-yongsheng
*/
class ClusterModuleStandaloneConfig extends H2ClientConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.cluster.standalone;
import java.util.Properties;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
......@@ -29,6 +28,7 @@ import org.apache.skywalking.apm.collector.cluster.standalone.service.Standalone
import org.apache.skywalking.apm.collector.core.CollectorException;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.core.util.Const;
......@@ -42,12 +42,15 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(ClusterModuleStandaloneProvider.class);
private static final String URL = "url";
private static final String USER_NAME = "user_name";
private final ClusterModuleStandaloneConfig config;
private H2Client h2Client;
private ClusterStandaloneDataMonitor dataMonitor;
public ClusterModuleStandaloneProvider() {
super();
this.config = new ClusterModuleStandaloneConfig();
}
@Override public String name() {
return "standalone";
}
......@@ -56,19 +59,20 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
return ClusterModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.dataMonitor = new ClusterStandaloneDataMonitor();
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
final String url = config.getProperty(URL);
final String userName = config.getProperty(USER_NAME);
h2Client = new H2Client(url, userName, Const.EMPTY_STRING);
@Override public void prepare() throws ServiceNotProvidedException {
this.dataMonitor = new ClusterStandaloneDataMonitor();
h2Client = new H2Client(config.getUrl(), config.getUserName(), Const.EMPTY_STRING);
this.dataMonitor.setClient(h2Client);
this.registerServiceImplementation(ModuleListenerService.class, new StandaloneModuleListenerService(dataMonitor));
this.registerServiceImplementation(ModuleRegisterService.class, new StandaloneModuleRegisterService(dataMonitor));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
try {
h2Client.initialize();
} catch (H2ClientException e) {
......@@ -76,7 +80,7 @@ public class ClusterModuleStandaloneProvider extends ModuleProvider {
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
try {
dataMonitor.start();
} catch (CollectorException e) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.cluster.zookeeper;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClientConfig;
/**
* @author peng-yongsheng
*/
class ClusterModuleZKConfig extends ZookeeperClientConfig {
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.cluster.zookeeper;
import java.util.Properties;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClient;
import org.apache.skywalking.apm.collector.client.zookeeper.ZookeeperClientException;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
......@@ -31,6 +30,7 @@ import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfi
import org.apache.skywalking.apm.collector.core.CollectorException;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.slf4j.Logger;
......@@ -43,12 +43,15 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
private final Logger logger = LoggerFactory.getLogger(ClusterModuleZookeeperProvider.class);
private static final String HOST_PORT = "hostPort";
private static final String SESSION_TIMEOUT = "sessionTimeout";
private final ClusterModuleZKConfig config;
private ZookeeperClient zookeeperClient;
private ClusterZKDataMonitor dataMonitor;
public ClusterModuleZookeeperProvider() {
super();
this.config = new ClusterModuleZKConfig();
}
@Override public String name() {
return "zookeeper";
}
......@@ -57,19 +60,20 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
return ClusterModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
dataMonitor = new ClusterZKDataMonitor();
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
final String hostPort = config.getProperty(HOST_PORT);
final int sessionTimeout = (Integer)config.get(SESSION_TIMEOUT);
zookeeperClient = new ZookeeperClient(hostPort, sessionTimeout, dataMonitor);
@Override public void prepare() throws ServiceNotProvidedException {
dataMonitor = new ClusterZKDataMonitor();
zookeeperClient = new ZookeeperClient(config.getHostPort(), config.getSessionTimeout(), dataMonitor);
dataMonitor.setClient(zookeeperClient);
this.registerServiceImplementation(ModuleListenerService.class, new ZookeeperModuleListenerService(dataMonitor));
this.registerServiceImplementation(ModuleRegisterService.class, new ZookeeperModuleRegisterService(dataMonitor));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
dataMonitor.setNamespace(getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace());
try {
zookeeperClient.initialize();
......@@ -78,7 +82,7 @@ public class ClusterModuleZookeeperProvider extends ModuleProvider {
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
try {
dataMonitor.start();
} catch (CollectorException e) {
......
......@@ -58,14 +58,14 @@ public class ElasticSearchClient implements Client {
private final String clusterName;
private final Boolean clusterTransportSniffer;
private final boolean clusterTransportSniffer;
private final String clusterNodes;
private boolean ready = false;
private String namespace;
public ElasticSearchClient(String clusterName, Boolean clusterTransportSniffer,
public ElasticSearchClient(String clusterName, boolean clusterTransportSniffer,
String clusterNodes) {
this.clusterName = clusterName;
this.clusterTransportSniffer = clusterTransportSniffer;
......@@ -218,9 +218,7 @@ public class ElasticSearchClient implements Client {
rowHandler.setPrepareMultiGet(prepareMultiGet);
rowHandler.setNamespace(namespace);
rows.forEach(row -> {
rowHandler.accept(row);
});
rows.forEach(row -> rowHandler.accept(row));
return rowHandler.getPrepareMultiGet();
}
......@@ -229,11 +227,11 @@ public class ElasticSearchClient implements Client {
private MultiGetRequestBuilder prepareMultiGet;
private String namespace;
public void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet) {
void setPrepareMultiGet(MultiGetRequestBuilder prepareMultiGet) {
this.prepareMultiGet = prepareMultiGet;
}
public void setNamespace(String namespace) {
void setNamespace(String namespace) {
this.namespace = namespace;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.elasticsearch;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class ElasticSearchClientConfig extends ModuleConfig {
private String clusterName;
private boolean clusterTransportSniffer;
private String clusterNodes;
private String namespace;
public String getClusterName() {
return clusterName;
}
public void setClusterName(String clusterName) {
this.clusterName = clusterName;
}
public boolean getClusterTransportSniffer() {
return clusterTransportSniffer;
}
public void setClusterTransportSniffer(boolean clusterTransportSniffer) {
this.clusterTransportSniffer = clusterTransportSniffer;
}
public String getClusterNodes() {
return clusterNodes;
}
public void setClusterNodes(String clusterNodes) {
this.clusterNodes = clusterNodes;
}
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
}
......@@ -16,12 +16,10 @@
*
*/
package org.apache.skywalking.apm.collector.client.grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.Client;
/**
......@@ -40,7 +38,7 @@ public class GRPCClient implements Client {
this.port = port;
}
@Override public void initialize() throws ClientException {
@Override public void initialize() {
channel = ManagedChannelBuilder.forAddress(host, port).usePlaintext(true).build();
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.grpc;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class GRPCClientConfig extends ModuleConfig {
private String host;
private int port;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.client.h2;
import java.sql.Connection;
......@@ -25,8 +24,8 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.util.IOUtils;
import org.apache.skywalking.apm.collector.client.Client;
import org.h2.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -68,7 +67,7 @@ public class H2Client implements Client {
IOUtils.closeSilently(conn);
}
public Connection getConnection() throws H2ClientException {
public Connection getConnection() {
return conn;
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.h2;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class H2ClientConfig extends ModuleConfig {
private String url;
private String userName;
private String password;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.redis;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class RedisClientConfig extends ModuleConfig {
private String host;
private int port;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.client.zookeeper;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class ZookeeperClientConfig extends ModuleConfig {
private String hostPort;
private int sessionTimeout;
public String getHostPort() {
return hostPort;
}
public void setHostPort(String hostPort) {
this.hostPort = hostPort;
}
public int getSessionTimeout() {
return sessionTimeout;
}
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.server.grpc;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class GRPCServerConfig extends ModuleConfig {
private String host;
private int port;
private String sslCertChainFilePath;
private String sslPrivateKeyFilePath;
private String authentication;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getSslCertChainFilePath() {
return sslCertChainFilePath;
}
public void setSslCertChainFilePath(String sslCertChainFilePath) {
this.sslCertChainFilePath = sslCertChainFilePath;
}
public String getSslPrivateKeyFilePath() {
return sslPrivateKeyFilePath;
}
public void setSslPrivateKeyFilePath(String sslPrivateKeyFilePath) {
this.sslPrivateKeyFilePath = sslPrivateKeyFilePath;
}
public String getAuthentication() {
return authentication;
}
public void setAuthentication(String authentication) {
this.authentication = authentication;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.server.jetty;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
public abstract class JettyServerConfig extends ModuleConfig {
private String host;
private int port;
private String contextPath;
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public String getContextPath() {
return contextPath;
}
public void setContextPath(String contextPath) {
this.contextPath = contextPath;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.configuration;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class ConfigurationModuleConfig extends ModuleConfig {
private String namespace;
private int applicationApdexThreshold;
private double serviceErrorRateThreshold;
private int serviceAverageResponseTimeThreshold;
private double instanceErrorRateThreshold;
private int instanceAverageResponseTimeThreshold;
private double applicationErrorRateThreshold;
private int applicationAverageResponseTimeThreshold;
public String getNamespace() {
return namespace;
}
public void setNamespace(String namespace) {
this.namespace = namespace;
}
public int getApplicationApdexThreshold() {
return applicationApdexThreshold;
}
public void setApplicationApdexThreshold(int applicationApdexThreshold) {
this.applicationApdexThreshold = applicationApdexThreshold;
}
public double getServiceErrorRateThreshold() {
return serviceErrorRateThreshold;
}
public void setServiceErrorRateThreshold(double serviceErrorRateThreshold) {
this.serviceErrorRateThreshold = serviceErrorRateThreshold;
}
public int getServiceAverageResponseTimeThreshold() {
return serviceAverageResponseTimeThreshold;
}
public void setServiceAverageResponseTimeThreshold(int serviceAverageResponseTimeThreshold) {
this.serviceAverageResponseTimeThreshold = serviceAverageResponseTimeThreshold;
}
public double getInstanceErrorRateThreshold() {
return instanceErrorRateThreshold;
}
public void setInstanceErrorRateThreshold(double instanceErrorRateThreshold) {
this.instanceErrorRateThreshold = instanceErrorRateThreshold;
}
public int getInstanceAverageResponseTimeThreshold() {
return instanceAverageResponseTimeThreshold;
}
public void setInstanceAverageResponseTimeThreshold(int instanceAverageResponseTimeThreshold) {
this.instanceAverageResponseTimeThreshold = instanceAverageResponseTimeThreshold;
}
public double getApplicationErrorRateThreshold() {
return applicationErrorRateThreshold;
}
public void setApplicationErrorRateThreshold(double applicationErrorRateThreshold) {
this.applicationErrorRateThreshold = applicationErrorRateThreshold;
}
public int getApplicationAverageResponseTimeThreshold() {
return applicationAverageResponseTimeThreshold;
}
public void setApplicationAverageResponseTimeThreshold(int applicationAverageResponseTimeThreshold) {
this.applicationAverageResponseTimeThreshold = applicationAverageResponseTimeThreshold;
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.configuration;
import java.util.Properties;
import org.apache.skywalking.apm.collector.configuration.service.ApdexThresholdService;
import org.apache.skywalking.apm.collector.configuration.service.ApplicationAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.ApplicationReferenceAlarmRuleConfig;
......@@ -36,21 +35,23 @@ import org.apache.skywalking.apm.collector.configuration.service.InstanceReferen
import org.apache.skywalking.apm.collector.configuration.service.ServiceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.configuration.service.ServiceReferenceAlarmRuleConfig;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.core.util.Const;
import org.apache.skywalking.apm.collector.core.util.StringUtils;
/**
* @author peng-yongsheng
*/
public class ConfigurationModuleProvider extends ModuleProvider {
private static final String NAMESPACE = "namespace";
private static final String APPLICATION_APDEX_THRESHOLD = "application_apdex_threshold";
private static final String SERVICE_ERROR_RATE_THRESHOLD = "service_error_rate_threshold";
private static final String SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD = "service_average_response_time_threshold";
private static final String INSTANCE_ERROR_RATE_THRESHOLD = "instance_error_rate_threshold";
private static final String INSTANCE_AVERAGE_RESPONSE_TIME_THRESHOLD = "instance_average_response_time_threshold";
private static final String APPLICATION_ERROR_RATE_THRESHOLD = "application_error_rate_threshold";
private static final String APPLICATION_AVERAGE_RESPONSE_TIME_THRESHOLD = "application_average_response_time_threshold";
private final ConfigurationModuleConfig config;
public ConfigurationModuleProvider() {
super();
this.config = new ConfigurationModuleConfig();
}
@Override public String name() {
return "default";
......@@ -60,15 +61,19 @@ public class ConfigurationModuleProvider extends ModuleProvider {
return ConfigurationModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String namespace = (String)config.getOrDefault(NAMESPACE, "");
Integer applicationApdexThreshold = (Integer)config.getOrDefault(APPLICATION_APDEX_THRESHOLD, 2000);
Double serviceErrorRateThreshold = (Double)config.getOrDefault(SERVICE_ERROR_RATE_THRESHOLD, 10.00);
Integer serviceAverageResponseTimeThreshold = (Integer)config.getOrDefault(SERVICE_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000);
Double instanceErrorRateThreshold = (Double)config.getOrDefault(INSTANCE_ERROR_RATE_THRESHOLD, 10.00);
Integer instanceAverageResponseTimeThreshold = (Integer)config.getOrDefault(INSTANCE_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000);
Double applicationErrorRateThreshold = (Double)config.getOrDefault(APPLICATION_ERROR_RATE_THRESHOLD, 10.00);
Integer applicationAverageResponseTimeThreshold = (Integer)config.getOrDefault(APPLICATION_AVERAGE_RESPONSE_TIME_THRESHOLD, 2000);
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
String namespace = StringUtils.isNotEmpty(config.getNamespace()) ? config.getNamespace() : Const.EMPTY_STRING;
Integer applicationApdexThreshold = config.getApplicationApdexThreshold() == 0 ? 2000 : config.getApplicationApdexThreshold();
Double serviceErrorRateThreshold = config.getServiceErrorRateThreshold() == 0 ? 10.00 : config.getServiceErrorRateThreshold();
Integer serviceAverageResponseTimeThreshold = config.getServiceAverageResponseTimeThreshold() == 0 ? 2000 : config.getServiceAverageResponseTimeThreshold();
Double instanceErrorRateThreshold = config.getInstanceErrorRateThreshold() == 0 ? 10.00 : config.getInstanceErrorRateThreshold();
Integer instanceAverageResponseTimeThreshold = config.getInstanceAverageResponseTimeThreshold() == 0 ? 2000 : config.getInstanceAverageResponseTimeThreshold();
Double applicationErrorRateThreshold = config.getApplicationErrorRateThreshold() == 0 ? 10.00 : config.getApplicationErrorRateThreshold();
Integer applicationAverageResponseTimeThreshold = config.getApplicationAverageResponseTimeThreshold() == 0 ? 2000 : config.getApplicationAverageResponseTimeThreshold();
this.registerServiceImplementation(ICollectorConfig.class, new CollectorConfigService(namespace));
this.registerServiceImplementation(IApdexThresholdService.class, new ApdexThresholdService(applicationApdexThreshold));
......@@ -80,12 +85,10 @@ public class ConfigurationModuleProvider extends ModuleProvider {
this.registerServiceImplementation(IApplicationReferenceAlarmRuleConfig.class, new ApplicationReferenceAlarmRuleConfig(applicationErrorRateThreshold, applicationAverageResponseTimeThreshold));
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
......@@ -82,7 +82,7 @@ public class ApplicationConfiguration {
this.properties = properties;
}
public Properties getProperties() {
private Properties getProperties() {
return properties;
}
}
......
......@@ -27,26 +27,23 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author wu-sheng
* @author wu-sheng, peng-yongsheng
*/
public class BootstrapFlow {
class BootstrapFlow {
private static final Logger logger = LoggerFactory.getLogger(BootstrapFlow.class);
private Map<String, Module> loadedModules;
private ApplicationConfiguration applicationConfiguration;
private List<ModuleProvider> startupSequence;
public BootstrapFlow(Map<String, Module> loadedModules,
ApplicationConfiguration applicationConfiguration) throws CycleDependencyException {
BootstrapFlow(Map<String, Module> loadedModules) throws CycleDependencyException {
this.loadedModules = loadedModules;
this.applicationConfiguration = applicationConfiguration;
startupSequence = new LinkedList<>();
makeSequence();
}
void start(ModuleManager moduleManager,
ApplicationConfiguration configuration) throws ProviderNotFoundException, ModuleNotFoundException, ServiceNotProvidedException {
@SuppressWarnings("unchecked")
void start(ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException {
for (ModuleProvider provider : startupSequence) {
String[] requiredModules = provider.requiredModules();
if (requiredModules != null) {
......@@ -60,11 +57,11 @@ public class BootstrapFlow {
logger.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
provider.requiredCheck(provider.getModule().services());
provider.start(configuration.getModuleConfiguration(provider.getModuleName()).getProviderConfiguration(provider.name()));
provider.start();
}
}
void notifyAfterCompleted() throws ProviderNotFoundException, ModuleNotFoundException, ServiceNotProvidedException {
void notifyAfterCompleted() throws ServiceNotProvidedException {
for (ModuleProvider provider : startupSequence) {
provider.notifyAfterCompleted();
}
......@@ -72,13 +69,9 @@ public class BootstrapFlow {
private void makeSequence() throws CycleDependencyException {
List<ModuleProvider> allProviders = new ArrayList<>();
loadedModules.forEach((moduleName, module) -> {
module.providers().forEach(provider -> {
allProviders.add(provider);
});
});
loadedModules.forEach((moduleName, module) -> allProviders.addAll(module.providers()));
while (true) {
do {
int numOfToBeSequenced = allProviders.size();
for (int i = 0; i < allProviders.size(); i++) {
ModuleProvider provider = allProviders.get(i);
......@@ -114,14 +107,10 @@ public class BootstrapFlow {
if (numOfToBeSequenced == allProviders.size()) {
StringBuilder unSequencedProviders = new StringBuilder();
allProviders.forEach(provider -> {
unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n");
});
allProviders.forEach(provider -> unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n"));
throw new CycleDependencyException("Exist cycle module dependencies in \n" + unSequencedProviders.substring(0, unSequencedProviders.length() - 1));
}
if (allProviders.size() == 0) {
break;
}
}
while (allProviders.size() != 0);
}
}
......@@ -16,11 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.lang.reflect.Field;
import java.util.Enumeration;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.ServiceLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -54,7 +56,7 @@ public abstract class Module {
* @throws ProviderNotFoundException when even don't find a single one providers.
*/
void prepare(ModuleManager moduleManager,
ApplicationConfiguration.ModuleConfiguration configuration) throws ProviderNotFoundException, ServiceNotProvidedException {
ApplicationConfiguration.ModuleConfiguration configuration) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException {
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
boolean providerExist = false;
for (ModuleProvider provider : moduleProviderLoader) {
......@@ -67,9 +69,7 @@ public abstract class Module {
ModuleProvider newProvider;
try {
newProvider = provider.getClass().newInstance();
} catch (InstantiationException e) {
throw new ProviderNotFoundException(e);
} catch (IllegalAccessException e) {
} catch (InstantiationException | IllegalAccessException e) {
throw new ProviderNotFoundException(e);
}
newProvider.setManager(moduleManager);
......@@ -84,10 +84,46 @@ public abstract class Module {
for (ModuleProvider moduleProvider : loadedProviders) {
logger.info("Prepare the {} provider in {} module.", moduleProvider.name(), this.name());
moduleProvider.prepare(configuration.getProviderConfiguration(moduleProvider.name()));
try {
copyProperties(moduleProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(moduleProvider.name()), this.name(), moduleProvider.name());
} catch (IllegalAccessException e) {
throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
}
moduleProvider.prepare();
}
}
private void copyProperties(ModuleConfig dest, Properties src, String moduleName,
String providerName) throws IllegalAccessException {
Enumeration<?> propertyNames = src.propertyNames();
while (propertyNames.hasMoreElements()) {
String propertyName = (String)propertyNames.nextElement();
Class<? extends ModuleConfig> destClass = dest.getClass();
try {
Field field = getDeclaredField(destClass, propertyName);
field.setAccessible(true);
field.set(dest, src.get(propertyName));
} catch (NoSuchFieldException e) {
logger.warn(propertyName + " setting is not supported in " + providerName + " provider of " + moduleName + " module");
}
}
}
private Field getDeclaredField(Class<?> destClass, String fieldName) throws NoSuchFieldException {
if (destClass != null) {
Field[] fields = destClass.getDeclaredFields();
for (Field field : fields) {
if (field.getName().equals(fieldName)) {
return field;
}
}
return getDeclaredField(destClass.getSuperclass(), fieldName);
}
throw new NoSuchFieldException();
}
/**
* @return providers of this module
*/
......@@ -95,7 +131,7 @@ public abstract class Module {
return loadedProviders;
}
final ModuleProvider provider() throws ProviderNotFoundException, DuplicateProviderException {
final ModuleProvider provider() throws DuplicateProviderException {
if (loadedProviders.size() > 1) {
throw new DuplicateProviderException(this.name() + " module exist " + loadedProviders.size() + " providers");
}
......@@ -106,7 +142,7 @@ public abstract class Module {
public final <T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedRuntimeException {
try {
return provider().getService(serviceType);
} catch (ProviderNotFoundException | DuplicateProviderException | ServiceNotProvidedException e) {
} catch (DuplicateProviderException | ServiceNotProvidedException e) {
throw new ServiceNotProvidedRuntimeException(e.getMessage());
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.module;
/**
* @author peng-yongsheng
*/
public abstract class ModuleConfig {
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.core.module;
/**
* @author peng-yongsheng
*/
public class ModuleConfigException extends Exception {
public ModuleConfigException(String message, Throwable cause) {
super(message, cause);
}
}
......@@ -35,23 +35,19 @@ public class ModuleManager {
/**
* Init the given modules
*
* @param applicationConfiguration
*/
public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException {
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException {
String[] moduleNames = applicationConfiguration.moduleList();
ServiceLoader<Module> moduleServiceLoader = ServiceLoader.load(Module.class);
LinkedList<String> moduleList = new LinkedList(Arrays.asList(moduleNames));
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
for (Module module : moduleServiceLoader) {
for (String moduleName : moduleNames) {
if (moduleName.equals(module.name())) {
Module newInstance;
try {
newInstance = module.getClass().newInstance();
} catch (InstantiationException e) {
throw new ModuleNotFoundException(e);
} catch (IllegalAccessException e) {
} catch (InstantiationException | IllegalAccessException e) {
throw new ModuleNotFoundException(e);
}
newInstance.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName));
......@@ -67,9 +63,9 @@ public class ModuleManager {
throw new ModuleNotFoundException(moduleList.toString() + " missing.");
}
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules, applicationConfiguration);
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
bootstrapFlow.start(this, applicationConfiguration);
bootstrapFlow.start(this);
bootstrapFlow.notifyAfterCompleted();
}
......
......@@ -16,12 +16,10 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
* The <code>ModuleProvider</code> is an implementation of a {@link Module}.
......@@ -60,24 +58,23 @@ public abstract class ModuleProvider {
*/
public abstract Class<? extends Module> module();
/**
* @return ModuleConfig
*/
public abstract ModuleConfig createConfigBeanIfAbsent();
/**
* In prepare stage, the module should initialize things which are irrelative other modules.
*
* @param config from `application.yml`
*/
public abstract void prepare(Properties config) throws ServiceNotProvidedException;
public abstract void prepare() throws ServiceNotProvidedException;
/**
* In start stage, the module has been ready for interop.
*
* @param config from `application.yml`
*/
public abstract void start(Properties config) throws ServiceNotProvidedException;
public abstract void start() throws ServiceNotProvidedException;
/**
* This callback executes after all modules start up successfully.
*
* @throws ServiceNotProvidedException
*/
public abstract void notifyAfterCompleted() throws ServiceNotProvidedException;
......@@ -88,9 +85,6 @@ public abstract class ModuleProvider {
/**
* Register a implementation for the service of this module provider.
*
* @param serviceType
* @param service
*/
protected final void registerServiceImplementation(Class<? extends Service> serviceType,
Service service) throws ServiceNotProvidedException {
......@@ -122,7 +116,8 @@ public abstract class ModuleProvider {
}
}
<T extends Service> T getService(Class<T> serviceType) throws ServiceNotProvidedException {
@SuppressWarnings("unchecked") <T extends Service> T getService(
Class<T> serviceType) throws ServiceNotProvidedException {
Service serviceImpl = services.get(serviceType);
if (serviceImpl != null) {
return (T)serviceImpl;
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
public class ProviderNotFoundException extends Exception {
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
public class ServiceNotProvidedException extends Exception {
......
......@@ -17,14 +17,14 @@
cluster:
standalone:
url: jdbc:h2:~/memorydb
user_name: sa
cache:
guava:
userName: sa
naming:
jetty:
host: localhost
port: 10800
context_path: /
contextPath: /
cache:
guava:
remote:
gRPC:
host: localhost
......@@ -33,34 +33,45 @@ agent_gRPC:
gRPC:
host: localhost
port: 11800
#Set these two setting to open ssl
#sslCertChainFile: $path
#sslPrivateKeyFile: $path
#Set your own token to active auth
#authentication: xxxxxx
agent_jetty:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
analysis_segment_parser:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
bufferFilePath: ../buffer/
bufferOffsetMaxFileSize: 10M
bufferSegmentMaxFileSize: 500M
analysis_jvm:
default:
analysis_register:
default:
analysis_metric:
default:
analysis_alarm:
default:
configuration:
default:
application_apdex_threshold: 2000
service_error_rate_threshold: 10.00
service_average_response_time_threshold: 2000
instance_error_rate_threshold: 10.00
instance_average_response_time_threshold: 2000
application_error_rate_threshold: 10.00
application_average_response_time_threshold: 2000
# namespace: xxxxx
applicationApdexThreshold: 2000
serviceErrorRateThreshold: 10.00
serviceAverageResponseTimeThreshold: 2000
instanceErrorRateThreshold: 10.00
instanceAverageResponseTimeThreshold: 2000
applicationErrorRateThreshold: 10.00
applicationAverageResponseTimeThreshold: 2000
ui:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
jetty_manager:
default:
gRPC_manager:
......@@ -68,4 +79,4 @@ gRPC_manager:
storage:
h2:
url: jdbc:h2:~/memorydb
user_name: sa
\ No newline at end of file
userName: sa
\ No newline at end of file
......@@ -16,11 +16,8 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.util.Properties;
/**
* @author wu-sheng
*/
......@@ -29,23 +26,29 @@ public class ModuleAProvider extends ModuleProvider {
return "P-A";
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override public Class<? extends Module> module() {
return BaseModuleA.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(BaseModuleA.ServiceABusiness1.class, new ModuleABusiness1Impl());
this.registerServiceImplementation(BaseModuleA.ServiceABusiness2.class, new ModuleABusiness2Impl());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[0];
}
class Config {
}
}
......@@ -16,36 +16,40 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.util.Properties;
/**
* @author wu-sheng
*/
public class ModuleBProvider extends ModuleProvider {
@Override public String name() {
return "P-B";
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override public Class<? extends Module> module() {
return BaseModuleB.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(BaseModuleB.ServiceBBusiness1.class, new ModuleBBusiness1Impl());
this.registerServiceImplementation(BaseModuleB.ServiceBBusiness2.class, new ModuleBBusiness2Impl());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[0];
}
class Config {
}
}
......@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.collector.core.module;
import java.util.Properties;
import org.junit.Assert;
import org.junit.Test;
......@@ -27,11 +28,11 @@ import org.junit.Test;
*/
public class ModuleManagerTest {
@Test
public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException {
public void testInit() throws ServiceNotProvidedException, ModuleNotFoundException, ProviderNotFoundException, DuplicateProviderException, ModuleConfigException {
ApplicationConfiguration configuration = new ApplicationConfiguration();
configuration.addModule("Test").addProviderConfiguration("TestModule-Provider", null);
configuration.addModule("BaseA").addProviderConfiguration("P-A", null);
configuration.addModule("BaseB").addProviderConfiguration("P-B", null);
configuration.addModule("Test").addProviderConfiguration("TestModule-Provider", new Properties());
configuration.addModule("BaseA").addProviderConfiguration("P-A", new Properties());
configuration.addModule("BaseB").addProviderConfiguration("P-B", new Properties());
ModuleManager manager = new ModuleManager();
manager.init(configuration);
......
......@@ -16,11 +16,8 @@
*
*/
package org.apache.skywalking.apm.collector.core.module;
import java.util.Properties;
/**
* @author wu-sheng
*/
......@@ -33,19 +30,23 @@ public class TestModuleProvider extends ModuleProvider {
return TestModule.class;
}
@Override public void prepare(Properties config) {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return null;
}
@Override public void start(Properties config) {
@Override public void prepare() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void start() {
}
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[] {"BaseA", "BaseB"};
}
class Config {
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.grpc.manager;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class GRPCManagerConfig extends ModuleConfig {
}
......@@ -20,8 +20,8 @@ package org.apache.skywalking.apm.collector.grpc.manager;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
......@@ -38,8 +38,14 @@ public class GRPCManagerProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(GRPCManagerProvider.class);
private final GRPCManagerConfig config;
private Map<String, GRPCServer> servers = new HashMap<>();
public GRPCManagerProvider() {
super();
this.config = new GRPCManagerConfig();
}
@Override public String name() {
return "default";
}
......@@ -48,15 +54,18 @@ public class GRPCManagerProvider extends ModuleProvider {
return GRPCManagerModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(GRPCManagerService.class, new GRPCManagerServiceImpl(servers));
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(GRPCManagerService.class, new GRPCManagerServiceImpl(servers));
}
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
servers.values().forEach(server -> {
try {
server.start();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.jetty.manager;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
/**
* @author peng-yongsheng
*/
class JettyManagerConfig extends ModuleConfig {
}
......@@ -20,8 +20,8 @@ package org.apache.skywalking.apm.collector.jetty.manager;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
......@@ -38,8 +38,13 @@ public class JettyManagerProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(JettyManagerProvider.class);
private final JettyManagerConfig jettyManagerConfig;
private Map<String, JettyServer> servers = new HashMap<>();
public JettyManagerProvider() {
this.jettyManagerConfig = new JettyManagerConfig();
}
@Override public String name() {
return "default";
}
......@@ -48,15 +53,18 @@ public class JettyManagerProvider extends ModuleProvider {
return JettyManagerModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(JettyManagerService.class, new JettyManagerServiceImpl(servers));
@Override public ModuleConfig createConfigBeanIfAbsent() {
return jettyManagerConfig;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(JettyManagerService.class, new JettyManagerServiceImpl(servers));
}
@Override public void start() {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
servers.values().forEach(server -> {
try {
server.start();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.naming.jetty;
import org.apache.skywalking.apm.collector.server.jetty.JettyServerConfig;
/**
* @author peng-yongsheng
*/
class NamingModuleJettyConfig extends JettyServerConfig {
}
......@@ -16,28 +16,30 @@
*
*/
package org.apache.skywalking.apm.collector.naming.jetty;
import java.util.Properties;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.jetty.service.NamingJettyHandlerRegisterService;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
/**
* @author peng-yongsheng
*/
public class NamingModuleJettyProvider extends ModuleProvider {
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CONTEXT_PATH = "context_path";
private final NamingModuleJettyConfig config;
public NamingModuleJettyProvider() {
super();
this.config = new NamingModuleJettyConfig();
}
@Override public String name() {
return "jetty";
......@@ -47,22 +49,20 @@ public class NamingModuleJettyProvider extends ModuleProvider {
return NamingModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
final String host = config.getProperty(HOST);
final Integer port = (Integer)config.get(PORT);
this.registerServiceImplementation(NamingHandlerRegisterService.class, new NamingJettyHandlerRegisterService(host, port, getManager()));
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
String contextPath = config.getProperty(CONTEXT_PATH);
@Override public void prepare() throws ServiceNotProvidedException {
this.registerServiceImplementation(NamingHandlerRegisterService.class, new NamingJettyHandlerRegisterService(config.getHost(), config.getPort(), getManager()));
}
@Override public void start() {
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
managerService.createIfAbsent(host, port, contextPath);
managerService.createIfAbsent(config.getHost(), config.getPort(), config.getContextPath());
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.remote.grpc;
import org.apache.skywalking.apm.collector.client.grpc.GRPCClientConfig;
/**
* @author peng-yongsheng
*/
class RemoteModuleGRPCConfig extends GRPCClientConfig {
private int channelSize;
private int bufferSize;
public int getChannelSize() {
return channelSize;
}
public void setChannelSize(int channelSize) {
this.channelSize = channelSize;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
}
......@@ -16,13 +16,13 @@
*
*/
package org.apache.skywalking.apm.collector.remote.grpc;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
......@@ -35,23 +35,22 @@ import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterServ
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class RemoteModuleGRPCProvider extends ModuleProvider {
private final RemoteModuleGRPCConfig config;
public static final String NAME = "gRPC";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CHANNEL_SIZE = "channel_size";
private static final String BUFFER_SIZE = "buffer_size";
private GRPCRemoteSenderService remoteSenderService;
private CommonRemoteDataRegisterService remoteDataRegisterService;
public RemoteModuleGRPCProvider() {
super();
this.config = new RemoteModuleGRPCConfig();
}
@Override public String name() {
return NAME;
}
......@@ -60,35 +59,33 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
return RemoteModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
Integer channelSize = (Integer)config.getOrDefault(CHANNEL_SIZE, 5);
Integer bufferSize = (Integer)config.getOrDefault(BUFFER_SIZE, 1000);
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
Integer channelSize = config.getChannelSize() == 0 ? 5 : config.getChannelSize();
Integer bufferSize = config.getBufferSize() == 0 ? 1000 : config.getBufferSize();
remoteDataRegisterService = new CommonRemoteDataRegisterService();
remoteSenderService = new GRPCRemoteSenderService(host, port, channelSize, bufferSize, remoteDataRegisterService);
remoteSenderService = new GRPCRemoteSenderService(config.getHost(), config.getPort(), channelSize, bufferSize, remoteDataRegisterService);
this.registerServiceImplementation(RemoteSenderService.class, remoteSenderService);
this.registerServiceImplementation(RemoteDataRegisterService.class, remoteDataRegisterService);
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
@Override public void start() throws ServiceNotProvidedException {
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
GRPCServer gRPCServer = managerService.createIfAbsent(host, port);
GRPCServer gRPCServer = managerService.createIfAbsent(config.getHost(), config.getPort());
gRPCServer.addHandler(new RemoteCommonServiceHandler(remoteDataRegisterService));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(host, port));
moduleRegisterService.register(RemoteModule.NAME, this.name(), new RemoteModuleGRPCRegistration(config.getHost(), config.getPort()));
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(remoteSenderService);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
......@@ -18,21 +18,16 @@
package org.apache.skywalking.apm.collector.remote.grpc.service;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.grpc.GRPCClient;
import org.apache.skywalking.apm.collector.remote.service.RemoteClient;
import org.apache.skywalking.apm.collector.remote.service.RemoteClientService;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataIDGetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class GRPCRemoteClientService implements RemoteClientService {
private static final Logger logger = LoggerFactory.getLogger(GRPCRemoteClientService.class);
private final RemoteDataIDGetter remoteDataIDGetter;
GRPCRemoteClientService(RemoteDataIDGetter remoteDataIDGetter) {
......@@ -41,11 +36,7 @@ public class GRPCRemoteClientService implements RemoteClientService {
@Override public RemoteClient create(String host, int port, int channelSize, int bufferSize) {
GRPCClient client = new GRPCClient(host, port);
try {
client.initialize();
} catch (ClientException e) {
logger.error(e.getMessage(), e);
}
client.initialize();
return new GRPCRemoteClient(client, remoteDataIDGetter, channelSize, bufferSize);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.es;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClientConfig;
/**
* @author peng-yongsheng
*/
class StorageModuleEsConfig extends ElasticSearchClientConfig {
private Integer indexShardsNumber;
private Integer indexReplicasNumber;
private Integer ttl;
Integer getIndexShardsNumber() {
return indexShardsNumber;
}
void setIndexShardsNumber(Integer indexShardsNumber) {
this.indexShardsNumber = indexShardsNumber;
}
Integer getIndexReplicasNumber() {
return indexReplicasNumber;
}
void setIndexReplicasNumber(Integer indexReplicasNumber) {
this.indexReplicasNumber = indexReplicasNumber;
}
Integer getTtl() {
return ttl;
}
void setTtl(Integer ttl) {
this.ttl = ttl;
}
}
......@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.storage.es;
import java.util.Properties;
import java.util.UUID;
import org.apache.skywalking.apm.collector.client.ClientException;
import org.apache.skywalking.apm.collector.client.elasticsearch.ElasticSearchClient;
......@@ -28,8 +27,10 @@ import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService
import org.apache.skywalking.apm.collector.configuration.ConfigurationModule;
import org.apache.skywalking.apm.collector.configuration.service.ICollectorConfig;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.storage.StorageException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
......@@ -254,32 +255,29 @@ public class StorageModuleEsProvider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleEsProvider.class);
static final String NAME = "elasticsearch";
private static final String CLUSTER_NAME = "cluster_name";
private static final String CLUSTER_TRANSPORT_SNIFFER = "cluster_transport_sniffer";
private static final String CLUSTER_NODES = "cluster_nodes";
private static final String INDEX_SHARDS_NUMBER = "index_shards_number";
private static final String INDEX_REPLICAS_NUMBER = "index_replicas_number";
private static final String TIME_TO_LIVE_OF_DATA = "ttl";
private final StorageModuleEsConfig config;
private ElasticSearchClient elasticSearchClient;
private DataTTLKeeperTimer deleteTimer;
@Override
public String name() {
public StorageModuleEsProvider() {
super();
this.config = new StorageModuleEsConfig();
}
@Override public String name() {
return NAME;
}
@Override
public Class<? extends Module> module() {
@Override public Class<? extends Module> module() {
return StorageModule.class;
}
@Override
public void prepare(Properties config) throws ServiceNotProvidedException {
String clusterName = config.getProperty(CLUSTER_NAME);
Boolean clusterTransportSniffer = (Boolean)config.get(CLUSTER_TRANSPORT_SNIFFER);
String clusterNodes = config.getProperty(CLUSTER_NODES);
elasticSearchClient = new ElasticSearchClient(clusterName, clusterTransportSniffer, clusterNodes);
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
elasticSearchClient = new ElasticSearchClient(config.getClusterName(), config.getClusterTransportSniffer(), config.getClusterNodes());
this.registerServiceImplementation(IBatchDAO.class, new BatchEsDAO(elasticSearchClient));
registerCacheDAO();
......@@ -290,16 +288,14 @@ public class StorageModuleEsProvider extends ModuleProvider {
}
@Override
public void start(Properties config) throws ServiceNotProvidedException {
Integer indexShardsNumber = (Integer)config.get(INDEX_SHARDS_NUMBER);
Integer indexReplicasNumber = (Integer)config.get(INDEX_REPLICAS_NUMBER);
public void start() {
try {
String namespace = getManager().find(ConfigurationModule.NAME).getService(ICollectorConfig.class).getNamespace();
elasticSearchClient.setNamespace(namespace);
elasticSearchClient.initialize();
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(indexShardsNumber, indexReplicasNumber);
ElasticSearchStorageInstaller installer = new ElasticSearchStorageInstaller(config.getIndexShardsNumber(), config.getIndexReplicasNumber());
installer.install(elasticSearchClient);
} catch (ClientException | StorageException e) {
logger.error(e.getMessage(), e);
......@@ -313,18 +309,18 @@ public class StorageModuleEsProvider extends ModuleProvider {
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
moduleListenerService.addListener(namingListener);
Integer beforeDay = (Integer)config.getOrDefault(TIME_TO_LIVE_OF_DATA, 3);
Integer beforeDay = config.getTtl() == 0 ? 3 : config.getTtl();
deleteTimer = new DataTTLKeeperTimer(getManager(), namingListener, uuId + 0, beforeDay);
}
@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException {
public void notifyAfterCompleted() {
deleteTimer.start();
}
@Override
public String[] requiredModules() {
return new String[] {ClusterModule.NAME, ConfigurationModule.NAME};
return new String[] {ClusterModule.NAME, ConfigurationModule.NAME, RemoteModule.NAME};
}
private void registerCacheDAO() throws ServiceNotProvidedException {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.storage.h2;
import org.apache.skywalking.apm.collector.client.h2.H2ClientConfig;
/**
* @author peng-yongsheng
*/
class StorageModuleH2Config extends H2ClientConfig {
}
......@@ -18,12 +18,13 @@
package org.apache.skywalking.apm.collector.storage.h2;
import java.util.Properties;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.storage.StorageException;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
......@@ -247,11 +248,12 @@ public class StorageModuleH2Provider extends ModuleProvider {
private static final Logger logger = LoggerFactory.getLogger(StorageModuleH2Provider.class);
private static final String URL = "url";
private static final String USER_NAME = "user_name";
private static final String PASSWORD = "password";
private H2Client h2Client;
private final StorageModuleH2Config config;
public StorageModuleH2Provider() {
this.config = new StorageModuleH2Config();
}
@Override public String name() {
return "h2";
......@@ -261,11 +263,12 @@ public class StorageModuleH2Provider extends ModuleProvider {
return StorageModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
String url = config.getProperty(URL);
String userName = config.getProperty(USER_NAME);
String password = config.getProperty(PASSWORD);
h2Client = new H2Client(url, userName, password);
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void prepare() throws ServiceNotProvidedException {
h2Client = new H2Client(config.getUrl(), config.getUserName(), config.getPassword());
this.registerServiceImplementation(IBatchDAO.class, new BatchH2DAO(h2Client));
registerCacheDAO();
......@@ -275,7 +278,7 @@ public class StorageModuleH2Provider extends ModuleProvider {
registerAlarmDAO();
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
@Override public void start() {
try {
h2Client.initialize();
......@@ -286,12 +289,11 @@ public class StorageModuleH2Provider extends ModuleProvider {
}
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
return new String[0];
return new String[] {RemoteModule.NAME};
}
private void registerCacheDAO() throws ServiceNotProvidedException {
......
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.collector.storage.h2.base.dao;
import java.sql.Connection;
......@@ -26,7 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.storage.base.dao.IBatchDAO;
import org.apache.skywalking.apm.collector.storage.h2.base.define.H2SqlEntity;
import org.slf4j.Logger;
......@@ -76,7 +74,7 @@ public class BatchH2DAO extends H2DAO implements IBatchDAO {
for (String k : batchSqls.keySet()) {
batchSqls.get(k).executeBatch();
}
} catch (SQLException | H2ClientException e) {
} catch (SQLException e) {
logger.error(e.getMessage(), e);
}
batchSqls.clear();
......
......@@ -16,23 +16,21 @@
*
*/
package org.apache.skywalking.apm.collector.storage.h2.base.define;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import org.apache.skywalking.apm.collector.client.Client;
import org.apache.skywalking.apm.collector.client.h2.H2Client;
import org.apache.skywalking.apm.collector.client.h2.H2ClientException;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.apache.skywalking.apm.collector.storage.StorageException;
import org.apache.skywalking.apm.collector.storage.StorageInstallException;
import org.apache.skywalking.apm.collector.storage.StorageInstaller;
import org.apache.skywalking.apm.collector.core.data.TableDefine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
/**
* @author peng-yongsheng
*/
......@@ -58,7 +56,7 @@ public class H2StorageInstaller extends StorageInstaller {
if (rs.next()) {
return true;
}
} catch (SQLException | H2ClientException e) {
} catch (SQLException e) {
throw new StorageInstallException(e.getMessage(), e);
} finally {
try {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.collector.ui.jetty;
import org.apache.skywalking.apm.collector.server.jetty.JettyServerConfig;
/**
* @author peng-yongsheng
*/
class UIModuleJettyConfig extends JettyServerConfig {
}
......@@ -23,8 +23,8 @@ import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleConfig;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
......@@ -36,17 +36,18 @@ import org.apache.skywalking.apm.collector.ui.jetty.handler.GraphQLHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingListener;
import java.util.Properties;
/**
* @author peng-yongsheng
*/
public class UIModuleJettyProvider extends ModuleProvider {
public static final String NAME = "jetty";
private static final String HOST = "host";
private static final String PORT = "port";
private static final String CONTEXT_PATH = "context_path";
private final UIModuleJettyConfig config;
public UIModuleJettyProvider() {
super();
this.config = new UIModuleJettyConfig();
}
@Override public String name() {
return NAME;
......@@ -56,16 +57,16 @@ public class UIModuleJettyProvider extends ModuleProvider {
return UIModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
@Override public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
String host = config.getProperty(HOST);
Integer port = (Integer)config.get(PORT);
String contextPath = config.getProperty(CONTEXT_PATH);
@Override public void prepare() {
}
@Override public void start() {
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
moduleRegisterService.register(UIModule.NAME, this.name(), new UIModuleJettyRegistration(host, port, contextPath));
moduleRegisterService.register(UIModule.NAME, this.name(), new UIModuleJettyRegistration(config.getHost(), config.getPort(), config.getContextPath()));
UIJettyNamingListener namingListener = new UIJettyNamingListener();
ModuleListenerService moduleListenerService = getManager().find(ClusterModule.NAME).getService(ModuleListenerService.class);
......@@ -75,12 +76,11 @@ public class UIModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new UIJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
JettyServer jettyServer = managerService.createIfAbsent(config.getHost(), config.getPort(), config.getContextPath());
addHandlers(jettyServer);
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
@Override public void notifyAfterCompleted() {
}
@Override public String[] requiredModules() {
......
......@@ -40,7 +40,7 @@ naming:
jetty:
host: localhost
port: 10800
context_path: /
contextPath: /
remote:
gRPC:
host: localhost
......@@ -53,30 +53,40 @@ agent_jetty:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
analysis_register:
default:
analysis_jvm:
default:
analysis_segment_parser:
default:
buffer_file_path: ../buffer/
buffer_offset_max_file_size: 10M
buffer_segment_max_file_size: 500M
bufferFilePath: ../buffer/
bufferOffsetMaxFileSize: 10M
bufferSegmentMaxFileSize: 500M
ui:
jetty:
host: localhost
port: 12800
context_path: /
contextPath: /
# Config Elasticsearch cluster connection info.
storage:
elasticsearch:
cluster_name: CollectorDBCluster
cluster_transport_sniffer: true
cluster_nodes: localhost:9300
index_shards_number: 2
index_replicas_number: 0
clusterName: CollectorDBCluster
clusterTransportSniffer: true
clusterNodes: localhost:9300
indexShardsNumber: 2
indexReplicasNumber: 0
ttl: 7
configuration:
default:
# namespace: xxxxx
applicationApdexThreshold: 2000
serviceErrorRateThreshold: 10.00
serviceAverageResponseTimeThreshold: 2000
instanceErrorRateThreshold: 10.00
instanceAverageResponseTimeThreshold: 2000
applicationErrorRateThreshold: 10.00
applicationAverageResponseTimeThreshold: 2000
```
3. Run `bin/collectorService.sh`
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册