未验证 提交 f5e38b88 编写于 作者: Z zifeihan 提交者: GitHub

Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR `, Avoid the instanceTraffic...

Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR `, Avoid the instanceTraffic record properties is null. (#5688)
上级 86ba4258
......@@ -142,6 +142,10 @@ public class Config {
* The period in which the agent report a heartbeat to the backend.
*/
public static long HEARTBEAT_PERIOD = 30;
/**
* The agent sends the instance properties to the backend every `collector.heartbeat_period * collector.properties_report_period_factor` seconds
*/
public static int PROPERTIES_REPORT_PERIOD_FACTOR = 10;
/**
* Collector skywalking trace receiver service addresses.
*/
......
......@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
......@@ -52,7 +53,7 @@ public class ServiceManagementClient implements BootService, Runnable, GRPCChann
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ManagementServiceGrpc.ManagementServiceBlockingStub managementServiceBlockingStub;
private volatile ScheduledFuture<?> heartbeatFuture;
private volatile boolean instancePropertiesSubmitted = false;
private volatile AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
@Override
public void statusChanged(GRPCChannelStatus status) {
......@@ -112,7 +113,7 @@ public class ServiceManagementClient implements BootService, Runnable, GRPCChann
if (GRPCChannelStatus.CONNECTED.equals(status)) {
try {
if (managementServiceBlockingStub != null) {
if (!instancePropertiesSubmitted) {
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
managementServiceBlockingStub
.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
......@@ -123,7 +124,6 @@ public class ServiceManagementClient implements BootService, Runnable, GRPCChann
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build());
instancePropertiesSubmitted = true;
} else {
final Commands commands = managementServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
......
......@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -57,6 +58,7 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
private KafkaProducer<String, Bytes> producer;
private String topic;
private AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
@Override
public void prepare() {
......@@ -85,28 +87,32 @@ public class KafkaServiceManagementServiceClient implements BootService, Runnabl
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD, TimeUnit.SECONDS);
InstanceProperties instance = InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build();
producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER + instance.getServiceInstance(), Bytes.wrap(instance.toByteArray())));
producer.flush();
}
@Override
public void run() {
InstancePingPkg ping = InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build();
if (LOGGER.isDebugEnable()) {
LOGGER.debug("Heartbeat reporting, instance: {}", ping.getServiceInstance());
if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
InstanceProperties instance = InstanceProperties.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.addAllProperties(OSUtil.buildOSInfo(
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build();
producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER + instance.getServiceInstance(),
Bytes.wrap(instance.toByteArray())
));
producer.flush();
} else {
InstancePingPkg ping = InstancePingPkg.newBuilder()
.setService(Config.Agent.SERVICE_NAME)
.setServiceInstance(Config.Agent.INSTANCE_NAME)
.build();
if (LOGGER.isDebugEnable()) {
LOGGER.debug("Heartbeat reporting, instance: {}", ping.getServiceInstance());
}
producer.send(new ProducerRecord<>(topic, ping.getServiceInstance(), Bytes.wrap(ping.toByteArray())));
}
producer.send(new ProducerRecord<>(topic, ping.getServiceInstance(), Bytes.wrap(ping.toByteArray())));
}
@Override
......
......@@ -89,6 +89,7 @@ property key | Description | Default |
`osinfo.ipv4_list_size`| Limit the length of the ipv4 list size. |`10`|
`collector.grpc_channel_check_interval`|grpc channel status check interval.|`30`|
`collector.heartbeat_period`|agent heartbeat report period. Unit, second.|`30`|
`collector.properties_report_period_factor`|The agent sends the instance properties to the backend every `collector.heartbeat_period * collector.properties_report_period_factor` seconds |`10`|
`collector.backend_service`|Collector SkyWalking trace receiver service addresses.|`127.0.0.1:11800`|
`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending data to upstream. Unit is second.|`30` seconds|
`collector.get_profile_task_interval`|Sniffer get profile task list interval.|`20`|
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册