提交 a2469423 编写于 作者: wu-sheng's avatar wu-sheng

Add `set null` on the end of distuptor’s onEvent. Let GC works better.

上级 b17aec9b
......@@ -26,19 +26,23 @@ public class SendAckSpanEventHandler implements EventHandler<AckSpanHolder> {
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
if (buffer[bufferIdx] != null) {
return;
}
try {
if (buffer[bufferIdx] != null) {
return;
}
buffer[bufferIdx] = event.getData();
bufferIdx++;
buffer[bufferIdx] = event.getData();
bufferIdx++;
if (bufferIdx == buffer.length) {
bufferIdx = 0;
}
if (bufferIdx == buffer.length) {
bufferIdx = 0;
}
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendAckSpanEventHandler").updateData(HeathReading.INFO, "AckSpan messages were successful consumed .");
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendAckSpanEventHandler").updateData(HeathReading.INFO, "AckSpan messages were successful consumed .");
}
}finally {
event.setData(null);
}
}
......
......@@ -26,19 +26,23 @@ public class SendRequestSpanEventHandler implements EventHandler<RequestSpanHold
@Override
public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
if (buffer[bufferIdx] != null) {
return;
}
try {
if (buffer[bufferIdx] != null) {
return;
}
buffer[bufferIdx] = event.getData();
bufferIdx++;
buffer[bufferIdx] = event.getData();
bufferIdx++;
if (bufferIdx == buffer.length) {
bufferIdx = 0;
}
if (bufferIdx == buffer.length) {
bufferIdx = 0;
}
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendRequestSpanEventHandler").updateData(HeathReading.INFO, "Request Span messages were successful consumed .");
if (endOfBatch) {
HealthCollector.getCurrentHeathReading("SendRequestSpanEventHandler").updateData(HeathReading.INFO, "Request Span messages were successful consumed .");
}
} finally {
event.setData(null);
}
}
......
......@@ -29,34 +29,38 @@ public class RouteAckSpanBufferEventHandler extends AbstractRouteSpanEventHandle
@Override
public void onEvent(AckSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getAckSpan());
try {
buffer.add(event.getAckSpan());
if (stop) {
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
if (stop) {
try {
for (AckSpan ackSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(ackSpan);
spanDisruptor.saveSpan(ackSpan);
}
} finally {
buffer.clear();
}
} finally {
buffer.clear();
return;
}
return;
}
wait2Finish();
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendACKSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteAckSpanBufferEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.setAckSpan(null);
}
}
}
package com.a.eye.skywalking.routing.disruptor.request;
import com.a.eye.skywalking.health.report.HealthCollector;
import com.a.eye.skywalking.health.report.HeathReading;
import com.a.eye.skywalking.logging.api.ILog;
......@@ -33,34 +32,38 @@ public class RouteSendRequestSpanEventHandler extends AbstractRouteSpanEventHand
@Override
public void onEvent(RequestSpanHolder event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event.getRequestSpan());
try {
buffer.add(event.getRequestSpan());
if (stop) {
try {
for (RequestSpan requestSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(requestSpan);
spanDisruptor.saveSpan(requestSpan);
if (stop) {
try {
for (RequestSpan requestSpan : buffer) {
SpanDisruptor spanDisruptor = RoutingService.getRouter().lookup(requestSpan);
spanDisruptor.saveSpan(requestSpan);
}
} finally {
buffer.clear();
}
} finally {
buffer.clear();
}
return;
}
return;
}
wait2Finish();
wait2Finish();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("RequestSpan messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
if (endOfBatch || buffer.size() == bufferSize) {
try {
SpanStorageClient spanStorageClient = getStorageClient();
spanStorageClient.sendRequestSpan(buffer);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("RequestSpan messages consume failure.", e);
HealthCollector.getCurrentHeathReading("RouteSendRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.setRequestSpan(null);
}
}
}
......@@ -22,8 +22,8 @@ import java.util.List;
public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
private static ILog logger = LogManager.getLogger(StoreAckSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize;
private IndexOperator operator;
private int bufferSize;
private List<SpanData> buffer;
public StoreAckSpanEventHandler() {
......@@ -35,20 +35,24 @@ public class StoreAckSpanEventHandler implements EventHandler<AckSpanData> {
@Override
public void onEvent(AckSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
try {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreAckSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.setAckSpan(null);
}
}
}
......@@ -22,8 +22,8 @@ import java.util.List;
public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanData> {
private static ILog logger = LogManager.getLogger(StoreRequestSpanEventHandler.class);
private DataFileWriter fileWriter;
private IndexOperator operator;
private int bufferSize;
private IndexOperator operator;
private int bufferSize;
private List<SpanData> buffer;
public StoreRequestSpanEventHandler() {
......@@ -35,21 +35,25 @@ public class StoreRequestSpanEventHandler implements EventHandler<RequestSpanDat
@Override
public void onEvent(RequestSpanData event, long sequence, boolean endOfBatch) throws Exception {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
try {
buffer.add(event);
if (endOfBatch || buffer.size() == bufferSize) {
try {
IndexMetaCollection collection = fileWriter.write(buffer);
operator.batchUpdate(collection);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.INFO, "Batch consume %s messages successfully.", buffer.size());
} catch (Throwable e) {
logger.error("Ack messages consume failure.", e);
HealthCollector.getCurrentHeathReading("StoreRequestSpanEventHandler").updateData(HeathReading.ERROR, "Batch consume %s messages failure.", buffer.size());
} finally {
buffer.clear();
}
}
} finally {
event.setRequestSpan(null);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册