未验证 提交 d7038fe6 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Refactor DataCarrier, support ArrayBlockingQueueBuffer as implementor (#3849)

* Refactor DataCarrier, support ArrayBlockingQueueBuffer as the implementation for blocking queue buffer.

* Fix style issue.

* Remove import.

* Remove uesless codes.
上级 e8f67897
......@@ -69,11 +69,6 @@ public class DataCarrier<T> {
return this;
}
public BlockingDataCarrier<T> toBlockingDataCarrier() {
this.channels.setStrategy(BufferStrategy.BLOCKING);
return new BlockingDataCarrier<T>(this.channels);
}
/**
* produce data to buffer, using the givven {@link BufferStrategy}.
*
......
......@@ -16,13 +16,54 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.callback;
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
/**
* Notify when the queue, which is in blocking strategy, has be blocked.
* The buffer implementation based on JDK ArrayBlockingQueue.
*
* This implementation has better performance in server side. We are still trying to research whether this is suitable
* for agent side, which is more sensitive about blocks.
*
* @author wu-sheng
* @author wusheng
*/
public interface QueueBlockingCallback<T> {
void notify(T message);
public class ArrayBlockingQueueBuffer<T> implements QueueBuffer<T> {
private BufferStrategy strategy;
private ArrayBlockingQueue<T> queue;
private int bufferSize;
ArrayBlockingQueueBuffer(int bufferSize, BufferStrategy strategy) {
this.strategy = strategy;
this.queue = new ArrayBlockingQueue<T>(bufferSize);
this.bufferSize = bufferSize;
}
@Override public boolean save(T data) {
switch (strategy) {
case IF_POSSIBLE:
return queue.offer(data);
default:
try {
queue.put(data);
} catch (InterruptedException e) {
// Ignore the error
return false;
}
}
return true;
}
@Override public void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
@Override public void obtain(List<T> consumeList) {
queue.drainTo(consumeList);
}
@Override public int getBufferSize() {
return bufferSize;
}
}
......@@ -18,49 +18,36 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
* Created by wusheng on 2016/10/25.
* Self implementation ring queue.
*
* @author wusheng
*/
public class Buffer<T> {
public class Buffer<T> implements QueueBuffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
callbacks = new LinkedList<QueueBlockingCallback<T>>();
}
void setStrategy(BufferStrategy strategy) {
public void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
void addCallback(QueueBlockingCallback<T> callback) {
callbacks.add(callback);
}
boolean save(T data) {
public boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
boolean isFirstTimeBlocking = true;
while (buffer[i] != null) {
if (isFirstTimeBlocking) {
isFirstTimeBlocking = false;
for (QueueBlockingCallback<T> callback : callbacks) {
callback.notify(data);
}
}
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
......@@ -69,7 +56,6 @@ public class Buffer<T> {
break;
case IF_POSSIBLE:
return false;
case OVERRIDE:
default:
}
}
......@@ -85,7 +71,7 @@ public class Buffer<T> {
this.obtain(consumeList, 0, buffer.length);
}
public void obtain(List<T> consumeList, int start, int end) {
void obtain(List<T> consumeList, int start, int end) {
for (int i = start; i < end; i++) {
if (buffer[i] != null) {
consumeList.add((T)buffer[i]);
......
......@@ -24,6 +24,5 @@ package org.apache.skywalking.apm.commons.datacarrier.buffer;
*/
public enum BufferStrategy {
BLOCKING,
OVERRIDE,
IF_POSSIBLE
}
......@@ -18,15 +18,14 @@
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
/**
* Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when buffer
* is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
* Channels of Buffer It contains all buffer data which belongs to this channel. It supports several strategy when
* buffer is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
*/
public class Channels<T> {
private final Buffer<T>[] bufferChannels;
private final QueueBuffer<T>[] bufferChannels;
private IDataPartitioner<T> dataPartitioner;
private BufferStrategy strategy;
private final long size;
......@@ -34,9 +33,13 @@ public class Channels<T> {
public Channels(int channelSize, int bufferSize, IDataPartitioner<T> partitioner, BufferStrategy strategy) {
this.dataPartitioner = partitioner;
this.strategy = strategy;
bufferChannels = new Buffer[channelSize];
bufferChannels = new QueueBuffer[channelSize];
for (int i = 0; i < channelSize; i++) {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
if (BufferStrategy.BLOCKING.equals(strategy)) {
bufferChannels[i] = new ArrayBlockingQueueBuffer<T>(bufferSize, strategy);
} else {
bufferChannels[i] = new Buffer<T>(bufferSize, strategy);
}
}
size = channelSize * bufferSize;
}
......@@ -69,7 +72,7 @@ public class Channels<T> {
* @param strategy
*/
public void setStrategy(BufferStrategy strategy) {
for (Buffer<T> buffer : bufferChannels) {
for (QueueBuffer<T> buffer : bufferChannels) {
buffer.setStrategy(strategy);
}
}
......@@ -87,13 +90,7 @@ public class Channels<T> {
return size;
}
public Buffer<T> getBuffer(int index) {
public QueueBuffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
public void addCallback(QueueBlockingCallback<T> callback) {
for (Buffer<T> channel : bufferChannels) {
channel.addCallback(callback);
}
}
}
......@@ -16,22 +16,34 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import java.util.List;
/**
* @author wu-sheng
* Queue buffer interface.
*
* @author wusheng
*/
public class BlockingDataCarrier<T> {
private Channels<T> channels;
public interface QueueBuffer<T> {
/**
* Save data into the queue;
* @param data to add.
* @return true if saved
*/
boolean save(T data);
/**
* Set different strategy when queue is full.
* @param strategy
*/
void setStrategy(BufferStrategy strategy);
BlockingDataCarrier(Channels<T> channels) {
this.channels = channels;
}
/**
* Obtain the existing data from the queue
* @param consumeList
*/
void obtain(List<T> consumeList);
public void addCallback(QueueBlockingCallback<T> callback) {
this.channels.addCallback(callback);
}
int getBufferSize();
}
......@@ -18,9 +18,8 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers <p> Created by wusheng on 2016/10/25.
......@@ -93,44 +92,19 @@ public class ConsumeDriver<T> implements IDriver {
private void allocateBuffer2Thread() {
int channelSize = this.channels.getChannelSize();
if (channelSize < consumerThreads.length) {
/**
* if consumerThreads.length > channelSize
* each channel will be process by several consumers.
*/
ArrayList<Integer>[] threadAllocation = new ArrayList[channelSize];
for (int threadIndex = 0; threadIndex < consumerThreads.length; threadIndex++) {
int index = threadIndex % channelSize;
if (threadAllocation[index] == null) {
threadAllocation[index] = new ArrayList<Integer>();
}
threadAllocation[index].add(threadIndex);
}
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
ArrayList<Integer> threadAllocationPerChannel = threadAllocation[channelIndex];
Buffer<T> channel = this.channels.getBuffer(channelIndex);
int bufferSize = channel.getBufferSize();
int step = bufferSize / threadAllocationPerChannel.size();
for (int i = 0; i < threadAllocationPerChannel.size(); i++) {
int threadIndex = threadAllocationPerChannel.get(i);
int start = i * step;
int end = i == threadAllocationPerChannel.size() - 1 ? bufferSize : (i + 1) * step;
consumerThreads[threadIndex].addDataSource(channel, start, end);
}
}
} else {
/**
* if consumerThreads.length < channelSize
* each consumer will process several channels.
*
* if consumerThreads.length == channelSize
* each consumer will process one channel.
*/
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
/**
* if consumerThreads.length < channelSize
* each consumer will process several channels.
*
* if consumerThreads.length == channelSize
* each consumer will process one channel.
*
* if consumerThreads.length > channelSize
* there will be some threads do nothing.
*/
for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {
int consumerIndex = channelIndex % consumerThreads.length;
consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));
}
}
......
......@@ -19,10 +19,10 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
/**
* Created by wusheng on 2016/10/25.
......@@ -41,24 +41,13 @@ public class ConsumerThread<T> extends Thread {
this.consumeCycle = consumeCycle;
}
/**
* add partition of buffer to consume
*
* @param sourceBuffer
* @param start
* @param end
*/
void addDataSource(Buffer<T> sourceBuffer, int start, int end) {
this.dataSources.add(new DataSource(sourceBuffer, start, end));
}
/**
* add whole buffer to consume
*
* @param sourceBuffer
*/
void addDataSource(Buffer<T> sourceBuffer) {
this.dataSources.add(new DataSource(sourceBuffer, 0, sourceBuffer.getBufferSize()));
void addDataSource(QueueBuffer<T> sourceBuffer) {
this.dataSources.add(new DataSource(sourceBuffer));
}
@Override
......@@ -108,18 +97,14 @@ public class ConsumerThread<T> extends Thread {
* DataSource is a refer to {@link Buffer}.
*/
class DataSource {
private Buffer<T> sourceBuffer;
private int start;
private int end;
private QueueBuffer<T> sourceBuffer;
DataSource(Buffer<T> sourceBuffer, int start, int end) {
DataSource(QueueBuffer<T> sourceBuffer) {
this.sourceBuffer = sourceBuffer;
this.start = start;
this.end = end;
}
void obtain(List<T> consumeList) {
sourceBuffer.obtain(consumeList, start, end);
sourceBuffer.obtain(consumeList);
}
}
}
......@@ -18,11 +18,10 @@
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
/**
* MultipleChannelsConsumer represent a single consumer thread, but support multiple channels with their {@link
......@@ -73,7 +72,7 @@ public class MultipleChannelsConsumer extends Thread {
private boolean consume(Group target, List consumeList) {
for (int i = 0; i < target.channels.getChannelSize(); i++) {
Buffer buffer = target.channels.getBuffer(i);
QueueBuffer buffer = target.channels.getBuffer(i);
buffer.obtain(consumeList);
}
......
......@@ -21,15 +21,15 @@ package org.apache.skywalking.apm.commons.datacarrier;
import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.buffer.QueueBuffer;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.partition.ProducerThreadPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
import org.junit.Assert;
import org.junit.Test;
import org.powermock.api.support.membermodification.MemberModifier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
/**
* Created by wusheng on 2016/10/25.
......@@ -42,14 +42,14 @@ public class DataCarrierTest {
Assert.assertEquals(((Integer)(MemberModifier.field(DataCarrier.class, "channelSize").get(carrier))).intValue(), 5);
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Assert.assertEquals(channels.getChannelSize(), 5);
Assert.assertEquals(5, channels.getChannelSize());
Buffer<SampleData> buffer = channels.getBuffer(0);
Assert.assertEquals(buffer.getBufferSize(), 100);
QueueBuffer<SampleData> buffer = channels.getBuffer(0);
Assert.assertEquals(100, buffer.getBufferSize());
Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.BLOCKING);
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.BLOCKING);
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(Buffer.class, "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(buffer.getClass(), "strategy").get(buffer), BufferStrategy.IF_POSSIBLE);
Assert.assertEquals(MemberModifier.field(Channels.class, "dataPartitioner").get(channels).getClass(), SimpleRollingPartitioner.class);
carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
......@@ -65,38 +65,19 @@ public class DataCarrierTest {
Assert.assertTrue(carrier.produce(new SampleData().setName("d")));
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
buffer1.obtain(result);
Assert.assertEquals(2, result.size());
Buffer<SampleData> buffer2 = channels.getBuffer(1);
buffer2.obtain(result, 0, 100);
QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
buffer2.obtain(result);
Assert.assertEquals(4, result.size());
}
@Test
public void testOverrideProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
carrier.setBufferStrategy(BufferStrategy.OVERRIDE);
for (int i = 0; i < 500; i++) {
Assert.assertTrue(carrier.produce(new SampleData().setName("d" + i)));
}
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
buffer2.obtain(result, 0, 100);
Assert.assertEquals(200, result.size());
}
@Test
public void testIfPossibleProduce() throws IllegalAccessException {
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(2, 100);
......@@ -111,12 +92,12 @@ public class DataCarrierTest {
}
Channels<SampleData> channels = (Channels<SampleData>)(MemberModifier.field(DataCarrier.class, "channels").get(carrier));
Buffer<SampleData> buffer1 = channels.getBuffer(0);
QueueBuffer<SampleData> buffer1 = channels.getBuffer(0);
List result = new ArrayList();
buffer1.obtain(result, 0, 100);
buffer1.obtain(result);
Buffer<SampleData> buffer2 = channels.getBuffer(1);
buffer2.obtain(result, 0, 100);
QueueBuffer<SampleData> buffer2 = channels.getBuffer(1);
buffer2.obtain(result);
Assert.assertEquals(200, result.size());
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.*;
import org.apache.skywalking.apm.commons.datacarrier.buffer.*;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
import org.junit.*;
/**
* @author wusheng
*/
public class BulkConsumePoolTest {
@Test
public void testOneThreadPool() throws InterruptedException {
BulkConsumePool pool = new BulkConsumePool("testPool", 1, 50);
final ArrayList<Object> result1 = new ArrayList();
Channels c1 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
pool.add("test", c1,
new IConsumer() {
@Override public void init() {
}
@Override public void consume(List data) {
for (Object datum : data) {
result1.add(datum);
}
}
@Override public void onError(List data, Throwable t) {
}
@Override public void onExit() {
}
});
pool.begin(c1);
final ArrayList<Object> result2 = new ArrayList();
Channels c2 = new Channels(2, 10, new SimpleRollingPartitioner(), BufferStrategy.OVERRIDE);
pool.add("test2", c2,
new IConsumer() {
@Override public void init() {
}
@Override public void consume(List data) {
for (Object datum : data) {
result2.add(datum);
}
}
@Override public void onError(List data, Throwable t) {
}
@Override public void onExit() {
}
});
pool.begin(c2);
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c1.save(new Object());
c2.save(new Object());
c2.save(new Object());
Thread.sleep(2000);
Assert.assertEquals(5, result1.size());
Assert.assertEquals(2, result2.size());
}
}
......@@ -80,7 +80,7 @@ public class ConsumerTest {
for (SampleData data : result) {
consumerCounter.add(data.getIntValue());
}
Assert.assertEquals(5, consumerCounter.size());
Assert.assertEquals(2, consumerCounter.size());
}
@Test
......
......@@ -25,7 +25,6 @@ import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
......@@ -113,7 +112,6 @@ public class GRPCRemoteClient implements RemoteClient {
synchronized (GRPCRemoteClient.class) {
if (Objects.isNull(this.carrier)) {
this.carrier = new DataCarrier<>("GRPCRemoteClient", channelSize, bufferSize);
this.carrier.setBufferStrategy(BufferStrategy.BLOCKING);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册