提交 b95b568a 编写于 作者: P pengys5

actor comments

上级 daa753a6
...@@ -11,18 +11,45 @@ import org.apache.logging.log4j.LogManager; ...@@ -11,18 +11,45 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
/** /**
* The <code>AbstractClusterWorker</code> should be implemented by any class whose instances
* are intended to provide receive remote message that message will transfer across different jvm
* which running in same server or another address server.
* <p>
* Usually the implemented class used to receive persistence data or aggregation the metric.
*
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public abstract class AbstractClusterWorker extends AbstractWorker { public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Constructs a <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have
* same responsibility which use to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext); super(role, clusterContext, selfContext);
} }
/**
* Receive message
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception { final public void allocateJob(Object message) throws Exception {
onWork(message); onWork(message);
} }
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception; protected abstract void onWork(Object message) throws Exception;
static class WorkerWithAkka extends UntypedActor { static class WorkerWithAkka extends UntypedActor {
......
...@@ -4,17 +4,36 @@ import akka.actor.ActorRef; ...@@ -4,17 +4,36 @@ import akka.actor.ActorRef;
import akka.actor.Props; import akka.actor.Props;
/** /**
* The <code>AbstractClusterWorkerProvider</code> should be implemented by any class whose instances
* are intended to provide create the class instance whose implemented {@link AbstractClusterWorker}.
* <p>
*
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> { public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> {
/**
* Create how many worker instance of {@link AbstractClusterWorker} in one jvm.
*
* @return The worker instance number.
*/
public abstract int workerNum(); public abstract int workerNum();
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link ClusterWorkerRef}
* @throws IllegalArgumentException Not used.
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another worker
* instance, when the worker provider not find then Throw this Exception.
*/
@Override @Override
final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException { final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role()); int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());
T clusterWorker = (T) workerInstance(getClusterContext()); T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart(); clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num); ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
......
...@@ -6,34 +6,72 @@ import com.lmax.disruptor.EventHandler; ...@@ -6,34 +6,72 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
/** /**
* The <code>AbstractLocalAsyncWorker</code> should be implemented by any class whose instances
* are intended to provide receive asynchronous message in same jvm.
*
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker { public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
/**
* Constructs a <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have
* same responsibility which use to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) { public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext); super(role, clusterContext, selfContext);
} }
/**
* The asynchronous worker always use to persistence data into db, this is the end of the streaming,
* so usually no need to create the next worker instance at the time of this worker instance create.
*
* @throws ProviderNotFoundException When worker provider not found, it will be throw this exception.
*/
@Override @Override
public void preStart() throws ProviderNotFoundException { public void preStart() throws ProviderNotFoundException {
} }
final public void allocateJob(Object request) throws Exception { /**
onWork(request); * Receive message
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
onWork(message);
} }
protected abstract void onWork(Object request) throws Exception; /**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithDisruptor implements EventHandler<MessageHolder> { static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
private RingBuffer<MessageHolder> ringBuffer; private RingBuffer<MessageHolder> ringBuffer;
private AbstractLocalAsyncWorker asyncWorker; private AbstractLocalAsyncWorker asyncWorker;
public WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) { WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
this.ringBuffer = ringBuffer; this.ringBuffer = ringBuffer;
this.asyncWorker = asyncWorker; this.asyncWorker = asyncWorker;
} }
/**
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) { public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
try { try {
Object message = event.getMessage(); Object message = event.getMessage();
...@@ -48,6 +86,12 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker { ...@@ -48,6 +86,12 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
} }
} }
/**
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
* @throws Exception not used.
*/
public void tell(Object message) throws Exception { public void tell(Object message) throws Exception {
long sequence = ringBuffer.next(); long sequence = ringBuffer.next();
try { try {
......
...@@ -5,9 +5,10 @@ package com.a.eye.skywalking.collector.actor.selector; ...@@ -5,9 +5,10 @@ package com.a.eye.skywalking.collector.actor.selector;
* are intended to provide send message with {@link HashCodeSelector}. * are intended to provide send message with {@link HashCodeSelector}.
* <p> * <p>
* Usually the implemented class used to persistence data to database * Usually the implemented class used to persistence data to database
* or aggregation the metric, * or aggregation the metric.
* *
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public abstract class AbstractHashMessage { public abstract class AbstractHashMessage {
private int hashCode; private int hashCode;
...@@ -16,7 +17,7 @@ public abstract class AbstractHashMessage { ...@@ -16,7 +17,7 @@ public abstract class AbstractHashMessage {
this.hashCode = key.hashCode(); this.hashCode = key.hashCode();
} }
protected int getHashCode() { int getHashCode() {
return hashCode; return hashCode;
} }
} }
...@@ -11,6 +11,7 @@ import java.util.List; ...@@ -11,6 +11,7 @@ import java.util.List;
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data. * message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
* *
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public class HashCodeSelector implements WorkerSelector<WorkerRef> { public class HashCodeSelector implements WorkerSelector<WorkerRef> {
......
...@@ -10,6 +10,7 @@ import java.util.List; ...@@ -10,6 +10,7 @@ import java.util.List;
* It choose {@link WorkerRef} nearly random, by round-robin. * It choose {@link WorkerRef} nearly random, by round-robin.
* *
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public class RollingSelector implements WorkerSelector<WorkerRef> { public class RollingSelector implements WorkerSelector<WorkerRef> {
......
...@@ -12,6 +12,7 @@ import java.util.List; ...@@ -12,6 +12,7 @@ import java.util.List;
* Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster * Actually, the <code>WorkerRef</code> is designed to provide a routing ability in the collector cluster
* *
* @author pengys5 * @author pengys5
* @since feature3.0
*/ */
public interface WorkerSelector<T extends WorkerRef> { public interface WorkerSelector<T extends WorkerRef> {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册