提交 76da1b07 编写于 作者: P peng-yongsheng

Provide queue module with disruptor and data carrier providers.

上级 51a652dd
...@@ -117,5 +117,17 @@ ...@@ -117,5 +117,17 @@
<version>${project.version}</version> <version>${project.version}</version>
</dependency> </dependency>
<!-- remote provider --> <!-- remote provider -->
<!-- queue provider -->
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-queue-datacarrier-provider</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-queue-disruptor-provider</artifactId>
<version>${project.version}</version>
</dependency>
<!-- queue provider -->
</dependencies> </dependencies>
</project> </project>
\ No newline at end of file
...@@ -14,7 +14,6 @@ ...@@ -14,7 +14,6 @@
<modules> <modules>
<module>client-component</module> <module>client-component</module>
<module>server-component</module> <module>server-component</module>
<module>queue-component</module>
</modules> </modules>
<dependencies> <dependencies>
......
...@@ -30,4 +30,11 @@ ...@@ -30,4 +30,11 @@
<artifactId>collector-queue-datacarrier-provider</artifactId> <artifactId>collector-queue-datacarrier-provider</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-queue-define</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.datacarrier;
import java.util.Properties;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.datacarrier.service.DataCarrierQueueCreatorService;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModuleDataCarrierProvider extends ModuleProvider {
@Override public String name() {
return "datacarrier";
}
@Override public Class<? extends Module> module() {
return QueueModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(QueueCreatorService.class, new DataCarrierQueueCreatorService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.datacarrier.service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class DataCarrierQueueCreatorService implements QueueCreatorService {
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
return null;
}
}
#
# Copyright 2017, OpenSkywalking Organization All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.queue.datacarrier.QueueModuleDataCarrierProvider
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModule extends Module {
public static final String NAME = "queue";
@Override public String name() {
return NAME;
}
@Override public Class[] services() {
return new Class[] {QueueCreatorService.class};
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
import java.util.concurrent.ThreadFactory;
/**
* @author peng-yongsheng
*/
public enum DaemonThreadFactory implements ThreadFactory {
INSTANCE;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public class EndOfBatchCommand {
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public class MessageHolder {
private Object message;
public Object getMessage() {
return message;
}
public void setMessage(Object message) {
this.message = message;
}
public void reset() {
message = null;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public interface QueueCreator {
QueueEventHandler create(int queueSize, QueueExecutor executor);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public interface QueueEventHandler {
void tell(Object message);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.base;
/**
* @author peng-yongsheng
*/
public interface QueueExecutor {
void execute(Object message);
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.service;
import org.skywalking.apm.collector.core.module.Service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
/**
* @author peng-yongsheng
*/
public interface QueueCreatorService extends Service {
QueueEventHandler create(int queueSize, QueueExecutor executor);
}
#
# Copyright 2017, OpenSkywalking Organization All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.queue.QueueModule
\ No newline at end of file
...@@ -30,4 +30,16 @@ ...@@ -30,4 +30,16 @@
<artifactId>collector-queue-disruptor-provider</artifactId> <artifactId>collector-queue-disruptor-provider</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>collector-queue-define</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor;
import java.util.Properties;
import org.skywalking.apm.collector.core.module.Module;
import org.skywalking.apm.collector.core.module.ModuleProvider;
import org.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.skywalking.apm.collector.queue.QueueModule;
import org.skywalking.apm.collector.queue.disruptor.service.DisruptorQueueCreatorService;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class QueueModuleDisruptorProvider extends ModuleProvider {
@Override public String name() {
return "disruptor";
}
@Override public Class<? extends Module> module() {
return QueueModule.class;
}
@Override public void prepare(Properties config) throws ServiceNotProvidedException {
this.registerServiceImplementation(QueueCreatorService.class, new DisruptorQueueCreatorService());
}
@Override public void start(Properties config) throws ServiceNotProvidedException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException {
}
@Override public String[] requiredModules() {
return new String[0];
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import org.skywalking.apm.collector.queue.base.EndOfBatchCommand;
import org.skywalking.apm.collector.queue.base.MessageHolder;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author peng-yongsheng
*/
public class DisruptorEventHandler implements EventHandler<MessageHolder>, QueueEventHandler {
private final Logger logger = LoggerFactory.getLogger(DisruptorEventHandler.class);
private RingBuffer<MessageHolder> ringBuffer;
private QueueExecutor executor;
DisruptorEventHandler(RingBuffer<MessageHolder> ringBuffer, QueueExecutor executor) {
this.ringBuffer = ringBuffer;
this.executor = executor;
}
/**
* Receive the message from disruptor, when message in disruptor is empty, then send the cached data
* to the next workers.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
* @param endOfBatch flag to indicate if this is the last event in a batch from the {@link RingBuffer}
*/
public void onEvent(MessageHolder event, long sequence, boolean endOfBatch) {
Object message = event.getMessage();
event.reset();
executor.execute(message);
if (endOfBatch) {
executor.execute(new EndOfBatchCommand());
}
}
/**
* Push the message into disruptor ring buffer.
*
* @param message of the data to process.
*/
public void tell(Object message) {
long sequence = ringBuffer.next();
try {
ringBuffer.get(sequence).setMessage(message);
} finally {
ringBuffer.publish(sequence);
}
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.skywalking.apm.collector.queue.base.DaemonThreadFactory;
import org.skywalking.apm.collector.queue.base.MessageHolder;
import org.skywalking.apm.collector.queue.base.QueueCreator;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreator implements QueueCreator {
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
// Specify the size of the ring buffer, must be power of 2.
if (!((((queueSize - 1) & queueSize) == 0) && queueSize != 0)) {
throw new IllegalArgumentException("queue size must be power of 2");
}
// Construct the Disruptor
Disruptor<MessageHolder> disruptor = new Disruptor(MessageHolderFactory.INSTANCE, queueSize, DaemonThreadFactory.INSTANCE);
RingBuffer<MessageHolder> ringBuffer = disruptor.getRingBuffer();
DisruptorEventHandler eventHandler = new DisruptorEventHandler(ringBuffer, executor);
// Connect the handler
disruptor.handleEventsWith(eventHandler);
// Start the Disruptor, starts all threads running
disruptor.start();
return eventHandler;
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor.base;
import com.lmax.disruptor.EventFactory;
import org.skywalking.apm.collector.queue.base.MessageHolder;
/**
* @author peng-yongsheng
*/
public class MessageHolderFactory implements EventFactory<MessageHolder> {
public static MessageHolderFactory INSTANCE = new MessageHolderFactory();
public MessageHolder newInstance() {
return new MessageHolder();
}
}
/*
* Copyright 2017, OpenSkywalking Organization All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Project repository: https://github.com/OpenSkywalking/skywalking
*/
package org.skywalking.apm.collector.queue.disruptor.service;
import org.skywalking.apm.collector.queue.base.QueueEventHandler;
import org.skywalking.apm.collector.queue.base.QueueExecutor;
import org.skywalking.apm.collector.queue.service.QueueCreatorService;
/**
* @author peng-yongsheng
*/
public class DisruptorQueueCreatorService implements QueueCreatorService {
@Override public QueueEventHandler create(int queueSize, QueueExecutor executor) {
return null;
}
}
#
# Copyright 2017, OpenSkywalking Organization All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Project repository: https://github.com/OpenSkywalking/skywalking
#
org.skywalking.apm.collector.queue.disruptor.QueueModuleDisruptorProvider
\ No newline at end of file
...@@ -35,5 +35,11 @@ ...@@ -35,5 +35,11 @@
<module>collector-queue-disruptor-provider</module> <module>collector-queue-disruptor-provider</module>
</modules> </modules>
<dependencies>
<dependency>
<groupId>org.skywalking</groupId>
<artifactId>apm-collector-core</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</project> </project>
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册