未验证 提交 3cf0a593 编写于 作者: 彭勇升 pengys 提交者: GitHub

Merge pull request #695 from apache/feature/data-carrier

Add new features for DataCarrier queue
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
/**
* @author wu-sheng
*/
public class BlockingDataCarrier<T> {
private Channels<T> channels;
BlockingDataCarrier(Channels<T> channels) {
this.channels = channels;
}
public void addCallback(QueueBlockingCallback<T> callback) {
this.channels.addCallback(callback);
}
}
......@@ -16,21 +16,17 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
import org.apache.skywalking.apm.commons.datacarrier.partition.SimpleRollingPartitioner;
/**
* DataCarrier main class.
* use this instance to set Producer/Consumer Model
* <p>
* Created by wusheng on 2016/10/25.
* DataCarrier main class. use this instance to set Producer/Consumer Model.
*/
public class DataCarrier<T> {
private final int bufferSize;
......@@ -45,8 +41,8 @@ public class DataCarrier<T> {
}
/**
* set a new IDataPartitioner.
* It will cover the current one or default one.(Default is {@link SimpleRollingPartitioner)}
* set a new IDataPartitioner. It will cover the current one or default one.(Default is {@link
* SimpleRollingPartitioner)}
*
* @param dataPartitioner
* @return
......@@ -57,8 +53,7 @@ public class DataCarrier<T> {
}
/**
* override the strategy at runtime.
* Notice, {@link Channels<T>} will override several channels one by one.
* override the strategy at runtime. Notice, {@link Channels<T>} will override several channels one by one.
*
* @param strategy
*/
......@@ -67,6 +62,11 @@ public class DataCarrier<T> {
return this;
}
public BlockingDataCarrier<T> toBlockingDataCarrier() {
this.channels.setStrategy(BufferStrategy.BLOCKING);
return new BlockingDataCarrier<T>(this.channels);
}
/**
* produce data to buffer, using the givven {@link BufferStrategy}.
*
......@@ -84,38 +84,59 @@ public class DataCarrier<T> {
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num);
consumerPool = new ConsumerPool<T>(this.channels, consumerClass, num, consumeCycle);
consumerPool.begin();
return this;
}
/**
* set consumers to this Carrier.
* consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work with 20
* millis consume cycle.
*
* @param consumerClass class of consumer
* @param num number of consumer threads
*/
public DataCarrier consume(Class<? extends IConsumer<T>> consumerClass, int num) {
return this.consume(consumerClass, num, 20);
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num) {
public DataCarrier consume(IConsumer<T> consumer, int num, long consumeCycle) {
if (consumerPool != null) {
consumerPool.close();
}
consumerPool = new ConsumerPool<T>(this.channels, consumer, num);
consumerPool = new ConsumerPool<T>(this.channels, consumer, num, consumeCycle);
consumerPool.begin();
return this;
}
/**
* set consumers to this Carrier. consumer begin to run when {@link DataCarrier<T>#produce(T)} begin to work with 20
* millis consume cycle.
*
* @param consumer single instance of consumer, all consumer threads will all use this instance.
* @param num number of consumer threads
* @return
*/
public DataCarrier consume(IConsumer<T> consumer, int num) {
return this.consume(consumer, num, 20);
}
/**
* shutdown all consumer threads, if consumer threads are running. Notice {@link BufferStrategy}: if {@link
* BufferStrategy} == {@link BufferStrategy#BLOCKING}, shutdown consumers maybe cause blocking when producing.
......
......@@ -16,10 +16,11 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import java.util.LinkedList;
import java.util.List;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.common.AtomicRangeInteger;
/**
......@@ -29,23 +30,36 @@ public class Buffer<T> {
private final Object[] buffer;
private BufferStrategy strategy;
private AtomicRangeInteger index;
private List<QueueBlockingCallback<T>> callbacks;
Buffer(int bufferSize, BufferStrategy strategy) {
buffer = new Object[bufferSize];
this.strategy = strategy;
index = new AtomicRangeInteger(0, bufferSize);
callbacks = new LinkedList<QueueBlockingCallback<T>>();
}
void setStrategy(BufferStrategy strategy) {
this.strategy = strategy;
}
void addCallback(QueueBlockingCallback<T> callback) {
callbacks.add(callback);
}
boolean save(T data) {
int i = index.getAndIncrement();
if (buffer[i] != null) {
switch (strategy) {
case BLOCKING:
boolean isFirstTimeBlocking = true;
while (buffer[i] != null) {
if (isFirstTimeBlocking) {
isFirstTimeBlocking = false;
for (QueueBlockingCallback<T> callback : callbacks) {
callback.notify(data);
}
}
try {
Thread.sleep(1L);
} catch (InterruptedException e) {
......
......@@ -16,17 +16,14 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.buffer;
import org.apache.skywalking.apm.commons.datacarrier.callback.QueueBlockingCallback;
import org.apache.skywalking.apm.commons.datacarrier.partition.IDataPartitioner;
/**
* Channels of Buffer
* It contais all buffer data which belongs to this channel.
* It supports several strategy when buffer is full. The Default is BLOCKING
* <p>
* Created by wusheng on 2016/10/25.
* Channels of Buffer It contais all buffer data which belongs to this channel. It supports several strategy when buffer
* is full. The Default is BLOCKING <p> Created by wusheng on 2016/10/25.
*/
public class Channels<T> {
private final Buffer<T>[] bufferChannels;
......@@ -87,4 +84,10 @@ public class Channels<T> {
public Buffer<T> getBuffer(int index) {
return this.bufferChannels[index];
}
public void addCallback(QueueBlockingCallback<T> callback) {
for (Buffer<T> channel : bufferChannels) {
channel.addCallback(callback);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.apm.commons.datacarrier.callback;
/**
* Notify when the queue, which is in blocking strategy, has be blocked.
*
* @author wu-sheng
*/
public interface QueueBlockingCallback<T> {
void notify(T message);
}
......@@ -16,7 +16,6 @@
*
*/
package org.apache.skywalking.apm.commons.datacarrier.consumer;
import java.util.ArrayList;
......@@ -25,9 +24,7 @@ import org.apache.skywalking.apm.commons.datacarrier.buffer.Buffer;
import org.apache.skywalking.apm.commons.datacarrier.buffer.Channels;
/**
* Pool of consumers
* <p>
* Created by wusheng on 2016/10/25.
* Pool of consumers <p> Created by wusheng on 2016/10/25.
*/
public class ConsumerPool<T> {
private boolean running;
......@@ -35,19 +32,19 @@ public class ConsumerPool<T> {
private Channels<T> channels;
private ReentrantLock lock;
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num) {
public ConsumerPool(Channels<T> channels, Class<? extends IConsumer<T>> consumerClass, int num, long consumeCycle) {
this(channels, num);
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass));
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", getNewConsumerInstance(consumerClass), consumeCycle);
consumerThreads[i].setDaemon(true);
}
}
public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num) {
public ConsumerPool(Channels<T> channels, IConsumer<T> prototype, int num, long consumeCycle) {
this(channels, num);
prototype.init();
for (int i = 0; i < num; i++) {
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype);
consumerThreads[i] = new ConsumerThread("DataCarrier.Consumser." + i + ".Thread", prototype, consumeCycle);
consumerThreads[i].setDaemon(true);
}
......
......@@ -30,12 +30,14 @@ public class ConsumerThread<T> extends Thread {
private volatile boolean running;
private IConsumer<T> consumer;
private List<DataSource> dataSources;
private long consumeCycle;
ConsumerThread(String threadName, IConsumer<T> consumer) {
ConsumerThread(String threadName, IConsumer<T> consumer, long consumeCycle) {
super(threadName);
this.consumer = consumer;
running = false;
dataSources = new LinkedList<DataSource>();
this.consumeCycle = consumeCycle;
}
/**
......@@ -67,7 +69,7 @@ public class ConsumerThread<T> extends Thread {
if (!hasData) {
try {
Thread.sleep(20);
Thread.sleep(consumeCycle);
} catch (InterruptedException e) {
}
}
......
......@@ -34,7 +34,7 @@ public class ConsumerPoolTest {
@Test
public void testBeginConsumerPool() throws IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
pool.begin();
ConsumerThread[] threads = (ConsumerThread[])MemberModifier.field(ConsumerPool.class, "consumerThreads").get(pool);
......@@ -46,7 +46,7 @@ public class ConsumerPoolTest {
@Test
public void testCloseConsumerPool() throws InterruptedException, IllegalAccessException {
Channels<SampleData> channels = new Channels<SampleData>(2, 100, new SimpleRollingPartitioner<SampleData>(), BufferStrategy.BLOCKING);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2);
ConsumerPool<SampleData> pool = new ConsumerPool<SampleData>(channels, new SampleConsumer(), 2, 20);
pool.begin();
Thread.sleep(5000);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册