未验证 提交 fc0ef901 编写于 作者: S Sijie Guo 提交者: GitHub

[functions][worker] timeout creating producer for worker (#2738)



*Motivation*

Sometime when we run worker service as part of broker, some pods can be hanging on creating producers to assignment topics.
It is unknown whether is it a k8s problem or not. But in general, timeout to fail fast to allow k8s to reschedule the pods.

*Changes*

Add a timeout logic at creating producers.
上级 d26bea8f
......@@ -18,19 +18,19 @@
*/
package org.apache.pulsar.functions.worker;
import static org.apache.pulsar.functions.worker.SchedulerManager.checkHeartBeatFunction;
import com.google.common.base.Stopwatch;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
......@@ -88,21 +88,48 @@ public class SchedulerManager implements AutoCloseable {
this.scheduler = Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class,
Thread.currentThread().getContextClassLoader());
try {
this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic())
.enableBatching(false).blockIfQueueFull(true).compressionType(CompressionType.LZ4).
sendTimeout(0, TimeUnit.MILLISECONDS).create();
} catch (PulsarClientException e) {
log.error("Failed to create producer to function assignment topic "
+ this.workerConfig.getFunctionAssignmentTopic(), e);
throw new RuntimeException(e);
}
this.producer = createProducer(pulsarClient, workerConfig);
this.executorService = executor;
scheduleCompaction(executor, workerConfig.getTopicCompactionFrequencySec());
}
private static Producer<byte[]> createProducer(PulsarClient client, WorkerConfig config) {
Stopwatch stopwatch = Stopwatch.createStarted();
for (int i = 0; i < 6; i++) {
try {
return client.newProducer().topic(config.getFunctionAssignmentTopic())
.enableBatching(false)
.blockIfQueueFull(true)
.compressionType(CompressionType.LZ4)
.sendTimeout(0, TimeUnit.MILLISECONDS)
.createAsync().get(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} catch (ExecutionException e) {
log.error("Encountered exceptions at creating producer for topic {}",
config.getFunctionAssignmentTopic(), e);
throw new RuntimeException(e);
} catch (TimeoutException e) {
try {
log.info("Can't create a producer on assignment topic {} in {} seconds, retry in 10 seconds ...",
stopwatch.elapsed(TimeUnit.SECONDS));
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e1) {
log.error("Interrupted at creating producer to topic {}", config.getFunctionAssignmentTopic(), e);
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
continue;
}
}
throw new RuntimeException("Can't create a producer on assignment topic "
+ config.getFunctionAssignmentTopic() + " in " + stopwatch.elapsed(TimeUnit.SECONDS)
+ " seconds, fail fast ...");
}
public Future<?> schedule() {
return executorService.submit(() -> {
synchronized (SchedulerManager.this) {
......
......@@ -53,7 +53,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.Mockito;
......@@ -81,18 +80,6 @@ public class SchedulerManagerTest {
private TypedMessageBuilder<byte[]> message;
private ScheduledExecutorService executor;
private static PulsarClient mockPulsarClient() throws PulsarClientException {
ProducerBuilder<byte[]> builder = mock(ProducerBuilder.class);
when(builder.topic(anyString())).thenReturn(builder);
when(builder.create()).thenReturn(mock(Producer.class));
PulsarClient client = mock(PulsarClient.class);
when(client.newProducer()).thenReturn(builder);
return client;
}
@BeforeMethod
public void setup() throws PulsarClientException {
WorkerConfig workerConfig = new WorkerConfig();
......@@ -121,7 +108,7 @@ public class SchedulerManagerTest {
when(builder.compressionType(any(CompressionType.class))).thenReturn(builder);
when(builder.sendTimeout(anyInt(), any(TimeUnit.class))).thenReturn(builder);
when(builder.create()).thenReturn(producer);
when(builder.createAsync()).thenReturn(CompletableFuture.completedFuture(producer));
PulsarClient pulsarClient = mock(PulsarClient.class);
when(pulsarClient.newProducer()).thenReturn(builder);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册