提交 09c0e8e3 编写于 作者: wu-sheng's avatar wu-sheng

Fix DataCarrier's...

Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
上级 54269101
......@@ -19,6 +19,7 @@ Release Notes.
* Update `byte-buddy` to 1.10.19.
* Fix thrift plugin trace link broken when intermediate service does not mount agent
* Fix thrift plugin collects wrong args when the method without parameter.
* Fix DataCarrier's `org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer` implementation isn't activated in `IF_POSSIBLE` mode.
#### OAP-Backend
* Make meter receiver support MAL.
......
......@@ -44,10 +44,18 @@ public class DataCarrier<T> {
}
public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize) {
this(name, envPrefix, channelSize, bufferSize, BufferStrategy.BLOCKING);
}
public DataCarrier(String name, String envPrefix, int channelSize, int bufferSize, BufferStrategy strategy) {
this.name = name;
bufferSize = EnvUtil.getInt(envPrefix + "_BUFFER_SIZE", bufferSize);
channelSize = EnvUtil.getInt(envPrefix + "_CHANNEL_SIZE", channelSize);
channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), BufferStrategy.BLOCKING);
channels = new Channels<>(channelSize, bufferSize, new SimpleRollingPartitioner<T>(), strategy);
}
public DataCarrier(int channelSize, int bufferSize, BufferStrategy strategy) {
this("DEFAULT", "DEFAULT", channelSize, bufferSize, strategy);
}
/**
......@@ -62,14 +70,6 @@ public class DataCarrier<T> {
return this;
}
/**
* override the strategy at runtime. Notice, {@link Channels} will override several channels one by one.
*/
public DataCarrier setBufferStrategy(BufferStrategy strategy) {
this.channels.setStrategy(strategy);
return this;
}
/**
* produce data to buffer, using the given {@link BufferStrategy}.
*
......
......@@ -33,7 +33,7 @@ import org.powermock.api.support.membermodification.MemberModifier;
public class DataCarrierTest {
@Test
public void testCreateDataCarrier() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100);
DataCarrier<SampleData> carrier = new DataCarrier<>(5, 100, BufferStrategy.IF_POSSIBLE);
Channels<SampleData> channels = (Channels<SampleData>) (MemberModifier.field(DataCarrier.class, "channels")
.get(carrier));
......@@ -42,8 +42,7 @@ public class DataCarrierTest {
QueueBuffer<SampleData> buffer = channels.getBuffer(0);
Assert.assertEquals(100, buffer.getBufferSize());
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy")
.get(buffer), BufferStrategy.IF_POSSIBLE);
......@@ -81,8 +80,7 @@ public class DataCarrierTest {
@Test
public void testIfPossibleProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100, BufferStrategy.IF_POSSIBLE);
for (int i = 0; i < 200; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
......
......@@ -64,8 +64,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
lastLogTime = System.currentTimeMillis();
segmentUplinkedCounter = 0;
segmentAbandonedCounter = 0;
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
}
......
......@@ -59,8 +59,7 @@ public class KafkaTraceSegmentServiceClient implements BootService, IConsumer<Tr
@Override
public void boot() {
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
carrier = new DataCarrier<>(CHANNEL_SIZE, BUFFER_SIZE, BufferStrategy.IF_POSSIBLE);
carrier.consume(this, 1);
producer = ServiceManager.INSTANCE.findService(KafkaProducerManager.class).getProducer();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册