未验证 提交 0724482f 编写于 作者: B Boyang Jerry Peng 提交者: GitHub

Add ability for BatchPushSource to notify errors asynchronously (#7865)

Co-authored-by: NJerry Peng <jerryp@splunk.com>
上级 b00f5e79
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.batchdatagenerator;
import io.codearte.jfairy.Fairy;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.SourceContext;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
@Slf4j
public class BatchDataGeneratorPushSource extends BatchPushSource<Person> implements Runnable {
private Fairy fairy;
private SourceContext sourceContext;
private int maxRecordsPerCycle = 10;
private ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void close() {
executor.shutdownNow();
}
@Override
public void open(Map config, SourceContext context) throws Exception {
this.fairy = Fairy.create();
this.sourceContext = context;
}
@Override
public void discover(Consumer taskEater) throws Exception {
log.info("Generating one task for each instance");
for (int i = 0; i < sourceContext.getNumInstances(); ++i) {
taskEater.accept(String.format("something-%d", System.currentTimeMillis()).getBytes());
}
}
@Override
public void prepare(byte[] instanceSplit) throws Exception {
log.info("Instance " + sourceContext.getInstanceId() + " got a new discovered task {}", new String(instanceSplit));
executor.submit(this);
}
@Override
public void run() {
try {
for (int i = 0; i < maxRecordsPerCycle; i++) {
Thread.sleep(50);
Record<Person> record = () -> new Person(fairy.person());
consume(record);
}
// this task is completed
consume(null);
} catch (Exception e) {
notifyError(e);
}
}
}
......@@ -37,6 +37,21 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
}
}
private static class ErrorNotifierRecord implements Record {
private Exception e;
public ErrorNotifierRecord(Exception e) {
this.e = e;
}
@Override
public Object getValue() {
return null;
}
public Exception getException() {
return e;
}
}
private LinkedBlockingQueue<Record<T>> queue;
private static final int DEFAULT_QUEUE_LENGTH = 1000;
private final NullRecord nullRecord = new NullRecord();
......@@ -48,6 +63,9 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
@Override
public Record<T> readNext() throws Exception {
Record<T> record = queue.take();
if (record instanceof ErrorNotifierRecord) {
throw ((ErrorNotifierRecord) record).getException();
}
if (record instanceof NullRecord) {
return null;
} else {
......@@ -80,4 +98,12 @@ public abstract class BatchPushSource<T> implements BatchSource<T> {
public int getQueueLength() {
return DEFAULT_QUEUE_LENGTH;
}
/**
* Allows the source to notify errors asynchronously
* @param ex
*/
public void notifyError(Exception ex) {
consume(new ErrorNotifierRecord(ex));
}
}
\ No newline at end of file
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.core;
import org.apache.pulsar.io.core.BatchPushSource;
import org.apache.pulsar.io.core.SourceContext;
import org.testng.annotations.Test;
import java.util.Map;
import java.util.function.Consumer;
public class BatchPushSourceTest {
BatchPushSource testBatchSource = new BatchPushSource() {
@Override
public void open(Map config, SourceContext context) throws Exception {
}
@Override
public void discover(Consumer taskEater) throws Exception {
}
@Override
public void prepare(byte[] task) throws Exception {
}
@Override
public void close() throws Exception {
}
};
@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "test exception")
public void testNotifyErrors() throws Exception {
testBatchSource.notifyError(new RuntimeException("test exception"));
testBatchSource.readNext();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册