提交 ccba3d5b 编写于 作者: wu-sheng's avatar wu-sheng 提交者: GitHub

Merge pull request #132 from wu-sheng/feature/collector

Going to adjust some comments after merge.

* add comments class in selector package

* actor comments

* 1. comments of collector config
2. index creator support options: overwrite, ignore, off

* fix index testcase bug

* fix comments

* change the config of index create to index initialize mode

* test case failure
......@@ -11,18 +11,44 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
* The <code>AbstractClusterWorker</code> implementations represent workers,
* which receive remote messages.
* <p>
* Usually, the implementations are doing persistent, or aggregate works.
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorker extends AbstractWorker {
/**
* Construct an <code>AbstractClusterWorker</code> with the worker role and context.
*
* @param role If multi-workers are for load balance, they should be more likely called worker instance.
* Meaning, each worker have multi instances.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
protected AbstractClusterWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
/**
* This method use for message producer to call for send message.
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
onWork(message);
}
/**
* This method use for message receiver to analyse message.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithAkka extends UntypedActor {
......
......@@ -4,17 +4,36 @@ import akka.actor.ActorRef;
import akka.actor.Props;
/**
* The <code>AbstractClusterWorkerProvider</code> implementations represent providers,
* which create instance of cluster workers whose implemented {@link AbstractClusterWorker}.
* <p>
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractClusterWorkerProvider<T extends AbstractClusterWorker> extends AbstractWorkerProvider<T> {
/**
* Create how many worker instance of {@link AbstractClusterWorker} in one jvm.
*
* @return The worker instance number.
*/
public abstract int workerNum();
/**
* Create the worker instance into akka system, the akka system will control the cluster worker life cycle.
*
* @param localContext Not used, will be null.
* @return The created worker reference. See {@link ClusterWorkerRef}
* @throws IllegalArgumentException Not used.
* @throws ProviderNotFoundException This worker instance attempted to find a provider which use to create another worker
* instance, when the worker provider not find then Throw this Exception.
*/
@Override
final public WorkerRef onCreate(LocalWorkerContext localContext) throws IllegalArgumentException, ProviderNotFoundException {
int num = ClusterWorkerRefCounter.INSTANCE.incrementAndGet(role());
T clusterWorker = (T) workerInstance(getClusterContext());
T clusterWorker = workerInstance(getClusterContext());
clusterWorker.preStart();
ActorRef actorRef = getClusterContext().getAkkaSystem().actorOf(Props.create(AbstractClusterWorker.WorkerWithAkka.class, clusterWorker), role().roleName() + "_" + num);
......
......@@ -6,34 +6,72 @@ import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
/**
* The <code>AbstractLocalAsyncWorker</code> implementations represent workers,
* which receive local asynchronous message.
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
/**
* Construct an <code>AbstractLocalAsyncWorker</code> with the worker role and context.
*
* @param role The responsibility of worker in cluster, more than one workers can have
* same responsibility which use to provide load balancing ability.
* @param clusterContext See {@link ClusterWorkerContext}
* @param selfContext See {@link LocalWorkerContext}
*/
public AbstractLocalAsyncWorker(Role role, ClusterWorkerContext clusterContext, LocalWorkerContext selfContext) {
super(role, clusterContext, selfContext);
}
/**
* The asynchronous worker always use to persistence data into db, this is the end of the streaming,
* so usually no need to create the next worker instance at the time of this worker instance create.
*
* @throws ProviderNotFoundException When worker provider not found, it will be throw this exception.
*/
@Override
public void preStart() throws ProviderNotFoundException {
}
final public void allocateJob(Object request) throws Exception {
onWork(request);
/**
* Receive message
*
* @param message The persistence data or metric data.
* @throws Exception The Exception happen in {@link #onWork(Object)}
*/
final public void allocateJob(Object message) throws Exception {
onWork(message);
}
protected abstract void onWork(Object request) throws Exception;
/**
* The data process logic in this method.
*
* @param message Cast the message object to a expect subclass.
* @throws Exception Don't handle the exception, throw it.
*/
protected abstract void onWork(Object message) throws Exception;
static class WorkerWithDisruptor implements EventHandler<MessageHolder> {
private RingBuffer<MessageHolder> ringBuffer;
private AbstractLocalAsyncWorker asyncWorker;
public WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
WorkerWithDisruptor(RingBuffer<MessageHolder> ringBuffer, AbstractLocalAsyncWorker asyncWorker) {
this.ringBuffer = ringBuffer;
this.asyncWorker = asyncWorker;
}
/**
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
try {
Object message = event.getMessage();
......@@ -48,6 +86,12 @@ public abstract class AbstractLocalAsyncWorker extends AbstractLocalWorker {
}
}
/**
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
* @throws Exception not used.
*/
public void tell(Object message) throws Exception {
long sequence = ringBuffer.next();
try {
......
package com.a.eye.skywalking.collector.actor.selector;
/**
* The <code>AbstractHashMessage</code> implementations represent aggregate message,
* which use to aggregate metric.
* Make the message aggregator's worker selector use of {@link HashCodeSelector}.
* <p>
*
* @author pengys5
* @since v3.0-2017
*/
public abstract class AbstractHashMessage {
private int hashCode;
......@@ -10,7 +16,7 @@ public abstract class AbstractHashMessage {
this.hashCode = key.hashCode();
}
protected int getHashCode() {
int getHashCode() {
return hashCode;
}
}
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>HashCodeSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} by message {@link AbstractHashMessage} key's hashcode, so it can use to send the same hashcode
* message to same {@link WorkerRef}. Usually, use to database operate which avoid dirty data.
*
* @author pengys5
* @since v3.0-2017
*/
public class HashCodeSelector implements WorkerSelector<WorkerRef> {
/**
* Use message hashcode to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
if (message instanceof AbstractHashMessage) {
......
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
* The <code>RollingSelector</code> is a simple implementation of {@link WorkerSelector}.
* It choose {@link WorkerRef} nearly random, by round-robin.
*
* @author pengys5
* @since v3.0-2017
*/
public class RollingSelector implements WorkerSelector<WorkerRef> {
private int index = 0;
/**
* Use round-robin to select {@link WorkerRef}.
*
* @param members given {@link WorkerRef} list, which size is greater than 0;
* @param message message the {@link AbstractWorker} is going to send.
* @return the selected {@link WorkerRef}
*/
@Override
public WorkerRef select(List<WorkerRef> members, Object message) {
int size = members.size();
......
package com.a.eye.skywalking.collector.actor.selector;
import com.a.eye.skywalking.collector.actor.AbstractWorker;
import com.a.eye.skywalking.collector.actor.WorkerRef;
import java.util.List;
/**
 * The <code>WorkerSelector</code> should be implemented by any class whose instances
 * are intended to select a {@link WorkerRef} from a {@link WorkerRef} list.
 * <p>
 * Actually, the <code>WorkerSelector</code> is designed to provide a routing ability
 * in the collector cluster: the chosen strategy decides which worker instance
 * receives a given message.
 *
 * @author pengys5
 * @since v3.0-2017
 */
public interface WorkerSelector<T extends WorkerRef> {
    /**
     * Select a {@link WorkerRef} from the given {@link WorkerRef} list.
     *
     * @param members the candidate {@link WorkerRef} list, whose size is greater than 0
     * @param message the message the {@link AbstractWorker} is going to send
     * @return the selected {@link WorkerRef}
     */
    T select(List<T> members, Object message);
}
......@@ -16,13 +16,22 @@ public class EsConfig {
}
public static class Index {
public static class Initialize {
public static IndexInitMode mode;
}
public static class Shards {
public static String number;
public static String number = "";
}
public static class Replicas{
public static String number;
public static class Replicas {
public static String number = "";
}
}
}
public enum IndexInitMode {
auto, forced, manual
}
}
......@@ -5,6 +5,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.common.settings.Settings;
......@@ -86,5 +87,11 @@ public abstract class AbstractIndex {
return false;
}
final boolean isExists() {
IndicesAdminClient client = EsClient.INSTANCE.getClient().admin().indices();
IndicesExistsResponse response = client.prepareExists(index()).get();
return response.isExists();
}
public abstract String index();
}
......@@ -13,6 +13,7 @@ import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
......@@ -46,9 +47,9 @@ public enum EsClient {
public void indexRefresh(String... indexName) {
RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
if (response.getShardFailures().length == response.getTotalShards()) {
logger.error("All elasticsearch shard index refresh failure, reason: %s", response.getShardFailures());
logger.error("All elasticsearch shard index refresh failure, reason: %s", Arrays.toString(response.getShardFailures()));
} else if (response.getShardFailures().length > 0) {
logger.error("In parts of elasticsearch shard index refresh failure, reason: %s", response.getShardFailures());
logger.error("In parts of elasticsearch shard index refresh failure, reason: %s", Arrays.toString(response.getShardFailures()));
}
logger.info("elasticsearch index refresh success");
}
......
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.config.EsConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
......@@ -16,10 +17,19 @@ public enum IndexCreator {
private Logger logger = LogManager.getFormatterLogger(IndexCreator.class);
public void create() {
Set<AbstractIndex> indexSet = loadIndex();
for (AbstractIndex index : indexSet) {
index.deleteIndex();
index.createIndex();
if (!EsConfig.IndexInitMode.manual.equals(EsConfig.Es.Index.Initialize.mode)) {
Set<AbstractIndex> indexSet = loadIndex();
for (AbstractIndex index : indexSet) {
boolean isExists = index.isExists();
if (isExists) {
if (EsConfig.IndexInitMode.forced.equals(EsConfig.Es.Index.Initialize.mode)) {
index.deleteIndex();
index.createIndex();
}
} else {
index.createIndex();
}
}
}
}
......
# The remote server should connect to, hostname can be either hostname or IP address.
# Suggestion: set the real ip address.
cluster.current.hostname=127.0.0.1
cluster.current.port=11800
# The roles of this member. List of strings, e.g. roles = A, B
# In the future, the roles are part of the membership information and can be used by
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
# In this version, all members have the same roles; each of them will listen to the others' status.
# When a member becomes unreachable (because of network trouble, a jvm crash, or any other reason),
# the routers will stop sending messages to that unreachable member.
cluster.current.roles=WorkersListener
# Initial contact points of the cluster, e.g. seed_nodes = 127.0.0.1:11800, 127.0.0.1:11801.
# The nodes to join automatically at startup.
# When setting the akka configuration, it will be changed,
# like: ["akka.tcp://system@127.0.0.1:11800", "akka.tcp://system@127.0.0.1:11801"].
# This is akka configuration, see: http://doc.akka.io/docs/akka/2.4/general/configuration.html
cluster.seed_nodes=127.0.0.1:11800
# elasticsearch configuration, config/elasticsearch.yml, see cluster.name
es.cluster.name=CollectorDBCluster
es.cluster.nodes=127.0.0.1:9300
es.cluster.transport.sniffer=true
# The elasticsearch nodes of cluster, comma separated, e.g. nodes=ip:port, ip:port
es.cluster.nodes=127.0.0.1:9300
# How to initialize the elasticsearch index.
# Options: auto, forced, manual
# auto: create the index only when it does not exist yet.
# forced: delete the existing index, then create it again.
# manual: never create or delete the index automatically.
es.index.initialize.mode=auto
es.index.shards.number=2
es.index.replicas.number=0
# You can configure a host either as a host name or IP address to identify a specific network
# interface on which to listen.
# Be used for web ui get the view data or agent post the trace segment.
http.hostname=127.0.0.1
# The TCP/IP port on which the connector listens for connections.
http.port=12800
# The contextPath is a URL prefix that identifies which context a HTTP request is destined for.
http.contextPath=/
# The analysis worker max cache size, when worker data size reach the size,
# then worker will send all cached data to the next worker and clear the cache.
cache.analysis.size=1024
# The persistence worker max cache size, same of "cache.analysis.size" ability.
cache.persistence.size=1024
WorkerNum.Node.NodeCompAgg.Value=10
......
......@@ -22,7 +22,7 @@ public class AbstractIndexTestCase {
@Test
public void testCreateSettingBuilder() throws IOException {
IndexTest indexTest = new IndexTest();
Assert.assertEquals("{\"index.number_of_shards\":null,\"index.number_of_replicas\":null}", indexTest.createSettingBuilder().string());
Assert.assertEquals("{\"index.number_of_shards\":\"\",\"index.number_of_replicas\":\"\"}", indexTest.createSettingBuilder().string());
}
class IndexTest extends AbstractIndex {
......
package com.a.eye.skywalking.collector.worker.storage;
import com.a.eye.skywalking.collector.worker.config.EsConfig;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
......@@ -25,6 +27,21 @@ import static org.powermock.api.mockito.PowerMockito.*;
@PowerMockIgnore({"javax.management.*"})
public class IndexCreatorTestCase {
private IndexCreator indexCreator;
private TestIndex testIndex;
@Before
public void init() throws Exception {
testIndex = mock(TestIndex.class);
indexCreator = mock(IndexCreator.class);
doCallRealMethod().when(indexCreator).create();
Set<AbstractIndex> indexSet = new HashSet<>();
indexSet.add(testIndex);
when(indexCreator, "loadIndex").thenReturn(indexSet);
}
@Test
public void testLoadIndex() throws Exception {
IndexCreator indexCreator = spy(IndexCreator.INSTANCE);
......@@ -47,20 +64,47 @@ public class IndexCreatorTestCase {
}
@Test
public void testCreate() throws Exception {
TestIndex testIndex = mock(TestIndex.class);
IndexCreator indexCreator = mock(IndexCreator.class);
doCallRealMethod().when(indexCreator).create();
public void testCreateOptionManual() throws Exception {
EsConfig.Es.Index.Initialize.mode = EsConfig.IndexInitMode.manual;
indexCreator.create();
Mockito.verify(testIndex, Mockito.never()).createIndex();
Mockito.verify(testIndex, Mockito.never()).deleteIndex();
}
Set<AbstractIndex> indexSet = new HashSet<>();
indexSet.add(testIndex);
@Test
public void testCreateOptionForcedIndexIsExists() throws Exception {
EsConfig.Es.Index.Initialize.mode = EsConfig.IndexInitMode.forced;
when(testIndex.isExists()).thenReturn(true);
indexCreator.create();
Mockito.verify(testIndex).createIndex();
Mockito.verify(testIndex).deleteIndex();
}
when(indexCreator, "loadIndex").thenReturn(indexSet);
@Test
public void testCreateOptionForcedIndexNotExists() throws Exception {
EsConfig.Es.Index.Initialize.mode = EsConfig.IndexInitMode.forced;
when(testIndex.isExists()).thenReturn(false);
indexCreator.create();
Mockito.verify(testIndex).createIndex();
Mockito.verify(testIndex, Mockito.never()).deleteIndex();
}
@Test
public void testCreateOptionAutoIndexNotExists() throws Exception {
EsConfig.Es.Index.Initialize.mode = EsConfig.IndexInitMode.auto;
when(testIndex.isExists()).thenReturn(false);
indexCreator.create();
Mockito.verify(testIndex).createIndex();
Mockito.verify(testIndex).deleteIndex();
Mockito.verify(testIndex, Mockito.never()).deleteIndex();
}
@Test
public void testCreateOptionAutoIndexExists() throws Exception {
EsConfig.Es.Index.Initialize.mode = EsConfig.IndexInitMode.auto;
when(testIndex.isExists()).thenReturn(true);
indexCreator.create();
Mockito.verify(testIndex, Mockito.never()).createIndex();
Mockito.verify(testIndex, Mockito.never()).deleteIndex();
}
class TestIndex extends AbstractIndex {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册