未验证 提交 e9d4c52a 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Make sure heartbeat working always. (#1873)

* Make sure heartbeat working always. cc @liuhaoyang make sure .net agent hasn't this issue.

* FIx CI.
上级 13c417c0
......@@ -19,34 +19,38 @@
package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.NetworkAddressDictionary;
import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
import org.apache.skywalking.apm.network.language.agent.*;
import org.apache.skywalking.apm.network.language.agent.Application;
import org.apache.skywalking.apm.network.language.agent.ApplicationInstance;
import org.apache.skywalking.apm.network.language.agent.ApplicationInstanceHeartbeat;
import org.apache.skywalking.apm.network.language.agent.ApplicationInstanceMapping;
import org.apache.skywalking.apm.network.language.agent.ApplicationMapping;
import org.apache.skywalking.apm.network.language.agent.ApplicationRegisterServiceGrpc;
import org.apache.skywalking.apm.network.language.agent.InstanceDiscoveryServiceGrpc;
import org.apache.skywalking.apm.network.language.agent.NetworkAddressRegisterServiceGrpc;
import org.apache.skywalking.apm.network.language.agent.ServiceNameDiscoveryServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
/**
* @author wusheng
*/
@DefaultImplementor
public class AppAndServiceRegisterClient implements BootService, GRPCChannelListener, Runnable, TracingContextListener {
public class AppAndServiceRegisterClient implements BootService, Runnable, GRPCChannelListener {
private static final ILog logger = LogManager.getLogger(AppAndServiceRegisterClient.class);
private static final String PROCESS_UUID = UUID.randomUUID().toString().replaceAll("-", "");
......@@ -56,7 +60,6 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
private volatile ServiceNameDiscoveryServiceGrpc.ServiceNameDiscoveryServiceBlockingStub serviceNameDiscoveryServiceBlockingStub;
private volatile NetworkAddressRegisterServiceGrpc.NetworkAddressRegisterServiceBlockingStub networkAddressRegisterServiceBlockingStub;
private volatile ScheduledFuture<?> applicationRegisterFuture;
private volatile long lastSegmentTime = -1;
@Override
public void statusChanged(GRPCChannelStatus status) {
......@@ -82,18 +85,17 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void boot() throws Throwable {
applicationRegisterFuture = Executors
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("AppAndServiceRegisterClient"))
.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
@Override
public void handle(Throwable t) {
logger.error("unexpected exception.", t);
}
}), 0, Config.Collector.APP_AND_SERVICE_REGISTER_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
public void onComplete() throws Throwable {
TracingContext.ListenerManager.add(this);
}
@Override
......@@ -111,7 +113,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
if (RemoteDownstreamConfig.Agent.APPLICATION_ID == DictionaryUtil.nullValue()) {
if (applicationRegisterServiceBlockingStub != null) {
ApplicationMapping applicationMapping = applicationRegisterServiceBlockingStub.applicationCodeRegister(
Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
Application.newBuilder().setApplicationCode(Config.Agent.APPLICATION_CODE).build());
if (applicationMapping != null) {
RemoteDownstreamConfig.Agent.APPLICATION_ID = applicationMapping.getApplication().getValue();
shouldTry = true;
......@@ -122,22 +124,20 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
if (RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID == DictionaryUtil.nullValue()) {
ApplicationInstanceMapping instanceMapping = instanceDiscoveryServiceBlockingStub.registerInstance(ApplicationInstance.newBuilder()
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(PROCESS_UUID)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(OSUtil.buildOSInfo())
.build());
.setApplicationId(RemoteDownstreamConfig.Agent.APPLICATION_ID)
.setAgentUUID(PROCESS_UUID)
.setRegisterTime(System.currentTimeMillis())
.setOsinfo(OSUtil.buildOSInfo())
.build());
if (instanceMapping.getApplicationInstanceId() != DictionaryUtil.nullValue()) {
RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID
= instanceMapping.getApplicationInstanceId();
= instanceMapping.getApplicationInstanceId();
}
} else {
if (lastSegmentTime - System.currentTimeMillis() > 60 * 1000) {
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
}
instanceDiscoveryServiceBlockingStub.heartbeat(ApplicationInstanceHeartbeat.newBuilder()
.setApplicationInstanceId(RemoteDownstreamConfig.Agent.APPLICATION_INSTANCE_ID)
.setHeartbeatTime(System.currentTimeMillis())
.build());
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(networkAddressRegisterServiceBlockingStub);
OperationNameDictionary.INSTANCE.syncRemoteDictionary(serviceNameDiscoveryServiceBlockingStub);
......@@ -150,9 +150,4 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
}
}
}
@Override
public void afterFinished(TraceSegment traceSegment) {
lastSegmentTime = System.currentTimeMillis();
}
}
......@@ -76,7 +76,7 @@ public class ServiceManagerTest {
private void assertTracingContextListener() throws Exception {
List<TracingContextListener> listeners = getFieldValue(TracingContext.ListenerManager.class, "LISTENERS");
assertThat(listeners.size(), is(3));
assertThat(listeners.size(), is(2));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(ContextManager.class)), is(true));
assertThat(listeners.contains(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class)), is(true));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册