未验证 提交 f625f609 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Support direct server setting in agent. (#975)

* Support direct server setting in agent.

* Fix format.

* Add direct_servers check (#981)
上级 56c090b6
......@@ -80,11 +80,24 @@ public class Config {
*/
public static long DISCOVERY_CHECK_INTERVAL = 60;
/**
* Collector REST-Service address. e.g. SERVERS="127.0.0.1:8080" for single collector node.
* SERVERS="10.2.45.126:8080,10.2.45.127:7600" for multi collector nodes.
* Collector naming/jetty service addresses.
* Primary address setting.
*
* e.g.
* SERVERS="127.0.0.1:10800" for single collector node.
* SERVERS="10.2.45.126:10800,10.2.45.127:10800" for multi collector nodes.
*/
public static String SERVERS = "";
/**
* Collector agent_gRPC/grpc service addresses.
* Secondary address setting, only effect when #SERVERS is empty.
*
* By using this, no discovery mechanism provided. The agent only uses these addresses to uplink data.
*
*/
public static String DIRECT_SERVERS = "";
/**
* Collector service discovery REST service name
*/
......
......@@ -73,8 +73,8 @@ public class SnifferConfigInitializer {
if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) {
throw new ExceptionInInitializerError("`agent.application_code` is missing.");
}
if (StringUtil.isEmpty(Config.Collector.SERVERS)) {
throw new ExceptionInInitializerError("`collector.servers` is missing.");
if (StringUtil.isEmpty(Config.Collector.SERVERS) && StringUtil.isEmpty(Config.Collector.DIRECT_SERVERS)) {
throw new ExceptionInInitializerError("`collector.direct_servers` and `collector.servers` cannot be empty at the same time.");
}
IS_INIT_COMPLETED = true;
......
......@@ -18,16 +18,19 @@
package org.apache.skywalking.apm.agent.core.remote;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
*
......@@ -38,22 +41,34 @@ public class CollectorDiscoveryService implements BootService {
private ScheduledFuture<?> future;
@Override
public void beforeBoot() throws Throwable {
public void beforeBoot() {
}
@Override
public void boot() throws Throwable {
public void boot() {
DiscoveryRestServiceClient discoveryRestServiceClient = new DiscoveryRestServiceClient();
discoveryRestServiceClient.run();
future = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(discoveryRestServiceClient,
new RunnableWithExceptionProtection.CallbackWhenException() {
@Override public void handle(Throwable t) {
if (discoveryRestServiceClient.hasNamingServer()) {
discoveryRestServiceClient.run();
future = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(discoveryRestServiceClient, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), Config.Collector.DISCOVERY_CHECK_INTERVAL,
Config.Collector.DISCOVERY_CHECK_INTERVAL, TimeUnit.SECONDS);
}),
Config.Collector.DISCOVERY_CHECK_INTERVAL,
Config.Collector.DISCOVERY_CHECK_INTERVAL,
TimeUnit.SECONDS);
} else {
if (Config.Collector.DIRECT_SERVERS == null || Config.Collector.DIRECT_SERVERS.trim().length() == 0) {
logger.error("Collector server and direct server addresses are both not set.");
logger.error("Agent will not uplink any data.");
return;
}
RemoteDownstreamConfig.Collector.GRPC_SERVERS = Arrays.asList(Config.Collector.DIRECT_SERVERS.split(","));
}
}
@Override
......@@ -63,6 +78,8 @@ public class CollectorDiscoveryService implements BootService {
@Override
public void shutdown() throws Throwable {
future.cancel(true);
if (future != null) {
future.cancel(true);
}
}
}
......@@ -22,10 +22,6 @@ package org.apache.skywalking.apm.agent.core.remote;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
......@@ -36,6 +32,11 @@ import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import static org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig.Collector.GRPC_SERVERS;
/**
......@@ -55,7 +56,7 @@ public class DiscoveryRestServiceClient implements Runnable {
public DiscoveryRestServiceClient() {
if (Config.Collector.SERVERS == null || Config.Collector.SERVERS.trim().length() == 0) {
logger.warn("Collector server not configured.");
logger.warn("Collector server not set.");
return;
}
......@@ -64,7 +65,10 @@ public class DiscoveryRestServiceClient implements Runnable {
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
}
boolean hasNamingServer() {
return serverList != null && serverList.length > 0;
}
@Override
......@@ -138,9 +142,9 @@ public class DiscoveryRestServiceClient implements Runnable {
}
HttpGet httpGet = new HttpGet("http://" + serverList[selectedServer] + Config.Collector.DISCOVERY_SERVICE_NAME);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT).build();
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT).build();
httpGet.setConfig(requestConfig);
return httpGet;
}
......
......@@ -17,11 +17,21 @@ agent.application_code=Your_ApplicationName
# agent.is_open_debugging_class = true
# Server addresses.
# Primary address setting.
#
# Mapping to `naming/jetty/ip:port` in `config/application.yml` of Collector.
# Examples:
# Single collector:SERVERS="127.0.0.1:8080"
# Collector cluster:SERVERS="10.2.45.126:8080,10.2.45.127:7600"
collector.servers=127.0.0.1:10800
# Collector agent_gRPC/grpc service addresses.
# Secondary address setting, only effect when "collector.servers" is empty.
# By using this, no discovery mechanism provided. The agent only uses these addresses to uplink data.
# Recommend to use this only when collector cluster IPs are unreachable from agent side. Such as:
# 1. Agent and collector cluster are in different VPC in Cloud.
# 2. Agent uplinks data to collector cluster through Internet.
# collector.direct_servers=www.skywalking.service.io
# Logging level
logging.level=DEBUG
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册