未验证 提交 1c55781a 编写于 作者: N nileblack 提交者: GitHub

Catch all exception when consume kafka record (#5760)

上级 d625f290
......@@ -27,6 +27,7 @@ Release Notes.
* Improve Kubernetes service registry for ALS analysis.
* Add health checker for cluster management
* Improve the queryable tags generation. Remove the duplicated tags to reduce the storage payload.
* Fix the threads of the Kafka fetcher exit if some unexpected exceptions happen.
* Fix the excessive timeout period set by the kubernetes-client.
* Fix deadlock problem when using elasticsearch-client-7.0.0.
* Fix storage-jdbc isExists not set dbname.
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -67,8 +66,8 @@ public class JVMMetricsHandler implements KafkaHandler {
builder.getMetricsList().forEach(jvmMetric -> {
jvmSourceDispatcher.sendMetric(builder.getService(), builder.getServiceInstance(), jvmMetric);
});
} catch (InvalidProtocolBufferException e) {
log.error("", e);
} catch (Exception e) {
log.error("handle record failed", e);
}
}
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -51,8 +50,8 @@ public class MeterServiceHandler implements KafkaHandler {
meterDataCollection.getMeterDataList().forEach(meterData -> processor.read(meterData));
processor.process();
} catch (InvalidProtocolBufferException e) {
log.error("", e);
} catch (Exception e) {
log.error("handle record failed", e);
}
}
......
......@@ -18,7 +18,6 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
......@@ -62,8 +61,8 @@ public class ProfileTaskHandler implements KafkaHandler {
snapshotRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime()));
RecordStreamProcessor.getInstance().in(snapshotRecord);
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage(), e);
} catch (Exception e) {
log.error("handle record failed", e);
}
}
......
......@@ -19,7 +19,6 @@
package org.apache.skywalking.oap.server.analyzer.agent.kafka.provider.handler;
import com.google.gson.JsonObject;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
......@@ -67,8 +66,8 @@ public class ServiceManagementHandler implements KafkaHandler {
} else {
keepAlive(InstancePingPkg.parseFrom(record.value().get()));
}
} catch (InvalidProtocolBufferException e) {
log.error("", e);
} catch (Exception e) {
log.error("handle record failed", e);
}
}
......
......@@ -90,7 +90,7 @@ public class TraceSegmentHandler implements KafkaHandler {
timer.finish();
}
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage(), e);
log.error("handle record failed", e);
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册