提交 1ec420b0 编写于 作者: wu-sheng's avatar wu-sheng

Remove collector.servers and collector.direct_servers, and add...

Remove collector.servers and collector.direct_servers, and add collector.backend_service setting. cc @ascrutae
上级 fc3b440a
Subproject commit 75c74186a1548657013a299f388e6e8b7b4b5251
Subproject commit 33b132bffaabacbd003eec41b498d2810f386161
......@@ -82,11 +82,6 @@
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.3</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
......
......@@ -82,32 +82,9 @@ public class Config {
*/
public static long APP_AND_SERVICE_REGISTER_CHECK_INTERVAL = 3;
/**
* discovery rest check interval
* Collector skywalking trace receiver service addresses.
*/
public static long DISCOVERY_CHECK_INTERVAL = 60;
/**
* Collector naming/jetty service addresses.
* Primary address setting.
*
* e.g.
* SERVERS="127.0.0.1:10800" for single collector node.
* SERVERS="10.2.45.126:10800,10.2.45.127:10800" for multi collector nodes.
*/
public static String SERVERS = "";
/**
* Collector agent_gRPC/grpc service addresses.
* Secondary address setting, only effect when #SERVERS is empty.
*
* By using this, no discovery mechanism provided. The agent only uses these addresses to uplink data.
*
*/
public static String DIRECT_SERVERS = "";
/**
* Collector service discovery REST service name
*/
public static String DISCOVERY_SERVICE_NAME = "/agent/gRPC";
public static String BACKEND_SERVICE = "";
}
public static class Jvm {
......
......@@ -19,8 +19,6 @@
package org.apache.skywalking.apm.agent.core.conf;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
/**
......@@ -35,11 +33,4 @@ public class RemoteDownstreamConfig {
public volatile static int APPLICATION_INSTANCE_ID = DictionaryUtil.nullValue();
}
public static class Collector {
/**
* Collector GRPC-Service address.
*/
public volatile static List<String> GRPC_SERVERS = new LinkedList<String>();
}
}
......@@ -75,7 +75,7 @@ public class SnifferConfigInitializer {
if (StringUtil.isEmpty(Config.Agent.APPLICATION_CODE)) {
throw new ExceptionInInitializerError("`agent.application_code` is missing.");
}
if (StringUtil.isEmpty(Config.Collector.SERVERS) && StringUtil.isEmpty(Config.Collector.DIRECT_SERVERS)) {
if (StringUtil.isEmpty(Config.Collector.BACKEND_SERVICE)) {
throw new ExceptionInInitializerError("`collector.direct_servers` and `collector.servers` cannot be empty at the same time.");
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.Arrays;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* The <code>CollectorDiscoveryService</code> is responsible for start {@link DiscoveryRestServiceClient}.
*
* @author wusheng
*/
@DefaultImplementor
public class CollectorDiscoveryService implements BootService {
private static final ILog logger = LogManager.getLogger(CollectorDiscoveryService.class);
private ScheduledFuture<?> future;
@Override
public void prepare() {
}
@Override
public void boot() {
DiscoveryRestServiceClient discoveryRestServiceClient = new DiscoveryRestServiceClient();
if (discoveryRestServiceClient.hasNamingServer()) {
discoveryRestServiceClient.run();
future = Executors.newSingleThreadScheduledExecutor(
new DefaultNamedThreadFactory("CollectorDiscoveryService"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(discoveryRestServiceClient, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}),
Config.Collector.DISCOVERY_CHECK_INTERVAL,
Config.Collector.DISCOVERY_CHECK_INTERVAL,
TimeUnit.SECONDS);
} else {
if (Config.Collector.DIRECT_SERVERS == null || Config.Collector.DIRECT_SERVERS.trim().length() == 0) {
logger.error("Collector server and direct server addresses are both not set.");
logger.error("Agent will not uplink any data.");
return;
}
RemoteDownstreamConfig.Collector.GRPC_SERVERS = Arrays.asList(Config.Collector.DIRECT_SERVERS.split(","));
}
}
@Override
public void onComplete() throws Throwable {
}
@Override
public void shutdown() throws Throwable {
if (future != null) {
future.cancel(true);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import static org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig.Collector.GRPC_SERVERS;
/**
* The <code>DiscoveryRestServiceClient</code> try to get the collector's grpc-server list
* in every 60 seconds,
* and override {@link org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig.Collector#GRPC_SERVERS}.
*
* @author wusheng
*/
public class DiscoveryRestServiceClient implements Runnable {
private static final ILog logger = LogManager.getLogger(DiscoveryRestServiceClient.class);
private static final int HTTP_CONNECT_TIMEOUT = 2000;
private static final int HTTP_CONNECTION_REQUEST_TIMEOUT = 1000;
private static final int HTTP_SOCKET_TIMEOUT = 2000;
private String[] serverList;
private volatile int selectedServer = -1;
public DiscoveryRestServiceClient() {
if (Config.Collector.SERVERS == null || Config.Collector.SERVERS.trim().length() == 0) {
logger.warn("Collector server not set.");
return;
}
serverList = Config.Collector.SERVERS.split(",");
Random r = new Random();
if (serverList.length > 0) {
selectedServer = r.nextInt(serverList.length);
}
}
boolean hasNamingServer() {
return serverList != null && serverList.length > 0;
}
@Override
public void run() {
try {
findServerList();
} catch (Throwable t) {
logger.error(t, "Find server list fail.");
}
}
private void findServerList() throws RESTResponseStatusError, IOException {
CloseableHttpClient httpClient = HttpClients.custom().build();
try {
HttpGet httpGet = buildGet();
if (httpGet != null) {
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
int statusCode = httpResponse.getStatusLine().getStatusCode();
if (200 != statusCode) {
findBackupServer();
throw new RESTResponseStatusError(statusCode);
} else {
JsonArray serverList = new Gson().fromJson(EntityUtils.toString(httpResponse.getEntity()), JsonArray.class);
if (serverList != null && serverList.size() > 0) {
LinkedList<String> newServerList = new LinkedList<String>();
for (JsonElement element : serverList) {
newServerList.add(element.getAsString());
}
if (!isListEquals(newServerList, GRPC_SERVERS)) {
GRPC_SERVERS = newServerList;
logger.debug("Refresh GRPC server list: {}", GRPC_SERVERS);
} else {
logger.debug("GRPC server list remain unchanged: {}", GRPC_SERVERS);
}
}
}
}
} catch (IOException e) {
findBackupServer();
throw e;
} finally {
httpClient.close();
}
}
private boolean isListEquals(List<String> list1, List<String> list2) {
if (list1.size() != list2.size()) {
return false;
}
for (String ip1 : list1) {
if (!list2.contains(ip1)) {
return false;
}
}
return true;
}
/**
* Prepare the given message for HTTP Post service.
*
* @return {@link HttpGet}, when is ready to send. otherwise, null.
*/
private HttpGet buildGet() {
if (selectedServer == -1) {
//no available server
return null;
}
HttpGet httpGet = new HttpGet("http://" + serverList[selectedServer] + Config.Collector.DISCOVERY_SERVICE_NAME);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(HTTP_CONNECT_TIMEOUT)
.setConnectionRequestTimeout(HTTP_CONNECTION_REQUEST_TIMEOUT)
.setSocketTimeout(HTTP_SOCKET_TIMEOUT).build();
httpGet.setConfig(requestConfig);
return httpGet;
}
/**
* Choose the next server in {@link #serverList}, by moving {@link #selectedServer}.
*/
private void findBackupServer() {
selectedServer++;
if (selectedServer >= serverList.length) {
selectedServer = 0;
}
if (serverList.length == 0) {
selectedServer = -1;
}
}
}
......@@ -21,6 +21,7 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
......@@ -32,7 +33,6 @@ import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
......@@ -49,6 +49,7 @@ public class GRPCChannelManager implements BootService, Runnable {
private volatile boolean reconnect = true;
private Random random = new Random();
private List<GRPCChannelListener> listeners = Collections.synchronizedList(new LinkedList<GRPCChannelListener>());
private volatile List<String> grpcServers;
@Override
public void prepare() throws Throwable {
......@@ -57,6 +58,12 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
if (Config.Collector.BACKEND_SERVICE.trim().length() == 0) {
logger.error("Collector server addresses are not set.");
logger.error("Agent will not uplink any data.");
return;
}
grpcServers = Arrays.asList(Config.Collector.BACKEND_SERVICE.split(","));
connectCheckFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
......@@ -85,11 +92,11 @@ public class GRPCChannelManager implements BootService, Runnable {
public void run() {
logger.debug("Selected collector grpc service running, reconnect:{}.", reconnect);
if (reconnect) {
if (RemoteDownstreamConfig.Collector.GRPC_SERVERS.size() > 0) {
if (grpcServers.size() > 0) {
String server = "";
try {
int index = Math.abs(random.nextInt()) % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
int index = Math.abs(random.nextInt()) % grpcServers.size();
server = grpcServers.get(index);
String[] ipAndPort = server.split(":");
managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
......
......@@ -18,7 +18,6 @@
org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient
org.apache.skywalking.apm.agent.core.context.ContextManager
org.apache.skywalking.apm.agent.core.remote.CollectorDiscoveryService
org.apache.skywalking.apm.agent.core.sampling.SamplingService
org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager
org.apache.skywalking.apm.agent.core.jvm.JVMService
......
......@@ -22,21 +22,19 @@ package org.apache.skywalking.apm.agent.core.boot;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.apache.skywalking.apm.agent.core.context.ContextManager;
import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.jvm.JVMService;
import org.apache.skywalking.apm.agent.core.remote.CollectorDiscoveryService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
......@@ -57,11 +55,10 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService = getFieldValue(ServiceManager.INSTANCE, "bootedServices");
assertThat(registryService.size(), is(8));
assertThat(registryService.size(), is(7));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
assertCollectorDiscoveryService(ServiceManager.INSTANCE.findService(CollectorDiscoveryService.class));
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
......@@ -100,10 +97,6 @@ public class ServiceManagerTest {
assertNotNull(service);
}
private void assertCollectorDiscoveryService(CollectorDiscoveryService service) {
assertNotNull(service);
}
private void assertContextManager(ContextManager service) {
assertNotNull(service);
}
......
......@@ -32,11 +32,11 @@ public class SnifferConfigInitializerTest {
@Test
public void testLoadConfigFromJavaAgentDir() throws AgentPackageNotFoundException, ConfigNotFoundException {
System.setProperty("skywalking.agent.application_code", "testApp");
System.setProperty("skywalking.collector.servers", "127.0.0.1:8090");
System.setProperty("skywalking.collector.backend_service", "127.0.0.1:8090");
System.setProperty("skywalking.logging.level", "info");
SnifferConfigInitializer.initialize();
assertThat(Config.Agent.APPLICATION_CODE, is("testApp"));
assertThat(Config.Collector.SERVERS, is("127.0.0.1:8090"));
assertThat(Config.Collector.BACKEND_SERVICE, is("127.0.0.1:8090"));
assertThat(Config.Logging.LEVEL, is(LogLevel.INFO));
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import com.github.tomakehurst.wiremock.junit.WireMockRule;
import java.io.IOException;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.test.tools.AgentServiceRule;
import org.hamcrest.MatcherAssert;
import org.junit.*;
import org.apache.skywalking.apm.agent.core.conf.Config;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
public class DiscoveryRestServiceClientTest {
@Rule
public AgentServiceRule agentServiceRule = new AgentServiceRule();
private DiscoveryRestServiceClient client;
@Rule
public WireMockRule wireMockRule = new WireMockRule(8089);
@AfterClass
public static void afterClass() {
ServiceManager.INSTANCE.shutdown();
}
@Before
public void setUpBeforeClass() {
Config.Collector.DISCOVERY_CHECK_INTERVAL = 1;
stubFor(get(urlEqualTo("/withoutResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("[]")));
stubFor(get(urlEqualTo("/withResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:8080','127.0.0.1:8090']")));
stubFor(get(urlEqualTo("/withSameResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:8090','127.0.0.1:8080']")));
stubFor(get(urlEqualTo("/withDifferenceResult"))
.willReturn(aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody("['127.0.0.1:9090','127.0.0.1:18090']")));
stubFor(get(urlEqualTo("/with404"))
.willReturn(aResponse()
.withStatus(400)));
}
@Test
public void testWithoutCollectorServer() throws RESTResponseStatusError, IOException {
client = new DiscoveryRestServiceClient();
client.run();
MatcherAssert.assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(0));
}
@Test
public void testWithGRPCAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withResult";
client = new DiscoveryRestServiceClient();
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(2));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:8080"), is(true));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:8090"), is(true));
}
@Test
public void testWithoutGRPCAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withoutResult";
client = new DiscoveryRestServiceClient();
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(0));
}
@Test
public void testChangeGrpcAddress() throws RESTResponseStatusError, IOException {
Config.Collector.SERVERS = "127.0.0.1:8089";
Config.Collector.DISCOVERY_SERVICE_NAME = "/withResult";
client = new DiscoveryRestServiceClient();
client.run();
Config.Collector.DISCOVERY_SERVICE_NAME = "/withDifferenceResult";
client.run();
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.size(), is(2));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:9090"), is(true));
assertThat(RemoteDownstreamConfig.Collector.GRPC_SERVERS.contains("127.0.0.1:18090"), is(true));
}
@After
public void tearDown() {
Config.Collector.SERVERS = "";
Config.Collector.DISCOVERY_SERVICE_NAME = "/grpc/address";
RemoteDownstreamConfig.Collector.GRPC_SERVERS.clear();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.netty.NettyChannelBuilder;
import io.grpc.testing.GrpcServerRule;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.apache.skywalking.apm.agent.core.conf.Config;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;
@RunWith(PowerMockRunner.class)
@PrepareForTest({GRPCChannelManager.class, NettyChannelBuilder.class})
public class GRPCChannelManagerTest {
@Rule
private GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Spy
private GRPCChannelManager grpcChannelManager = new GRPCChannelManager();
@Mock
private NettyChannelBuilder mock;
@Spy
private MockGRPCChannelListener listener = new MockGRPCChannelListener();
@Before
public void setUp() throws Throwable {
List<String> grpcServers = new ArrayList<String>();
grpcServers.add("127.0.0.1:2181");
RemoteDownstreamConfig.Collector.GRPC_SERVERS = grpcServers;
Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL = 1;
mockStatic(NettyChannelBuilder.class);
when(NettyChannelBuilder.forAddress(anyString(), anyInt())).thenReturn(mock);
when(mock.nameResolverFactory(any(NameResolver.Factory.class))).thenReturn(mock);
when(mock.maxInboundMessageSize(anyInt())).thenReturn(mock);
when(mock.usePlaintext(true)).thenReturn(mock);
when(mock.build()).thenReturn(grpcServerRule.getChannel());
grpcChannelManager.addChannelListener(listener);
}
@Test
public void changeStatusToConnectedWithReportError() throws Throwable {
grpcChannelManager.reportError(new StatusRuntimeException(Status.ABORTED));
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.CONNECTED);
assertThat(listener.status, is(GRPCChannelStatus.CONNECTED));
}
@Test
public void changeStatusToDisConnectedWithReportError() throws Throwable {
doThrow(new RuntimeException()).when(mock).nameResolverFactory(any(NameResolver.Factory.class));
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.DISCONNECT);
assertThat(listener.status, is(GRPCChannelStatus.DISCONNECT));
}
@Test
public void reportErrorWithoutChangeStatus() throws Throwable {
grpcChannelManager.run();
grpcChannelManager.reportError(new RuntimeException());
grpcChannelManager.run();
verify(listener, times(1)).statusChanged(GRPCChannelStatus.CONNECTED);
assertThat(listener.status, is(GRPCChannelStatus.CONNECTED));
}
private class MockGRPCChannelListener implements GRPCChannelListener {
private GRPCChannelStatus status;
@Override
public void statusChanged(GRPCChannelStatus status) {
this.status = status;
}
}
}
......@@ -15,5 +15,5 @@
# limitations under the License.
agent.application_code = crmApp
collector.servers = 127.0.0.1:8080
collector.backend_service = 127.0.0.1:8080
logging.level=info
......@@ -38,22 +38,8 @@ agent.application_code=Your_ApplicationName
# Skywalking team may ask for these files in order to resolve compatible problem.
# agent.is_open_debugging_class = true
# Server addresses.
# Primary address setting.
#
# Mapping to `naming/jetty/ip:port` in `config/application.yml` of Collector.
# Examples:
# Single collector:SERVERS="127.0.0.1:8080"
# Collector cluster:SERVERS="10.2.45.126:8080,10.2.45.127:7600"
collector.servers=127.0.0.1:10800
# Collector agent_gRPC/grpc service addresses.
# Secondary address setting, only effect when "collector.servers" is empty.
# By using this, no discovery mechanism provided. The agent only uses these addresses to uplink data.
# Recommend to use this only when collector cluster IPs are unreachable from agent side. Such as:
# 1. Agent and collector cluster are in different VPC in Cloud.
# 2. Agent uplinks data to collector cluster through Internet.
# collector.direct_servers=www.skywalking.service.io
# Backend service addresses.
collector.backend_service=127.0.0.1:10800
# Logging level
logging.level=DEBUG
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册