diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index fca14857f1b32d0e35a90425498c1cc12162c658..f826453799f8c8547f5a53f4edd6709c53922f86 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -31,6 +31,8 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.slf4j.Logger; @@ -47,6 +49,7 @@ import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.spy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { @@ -132,5 +135,63 @@ public class PulsarBrokerStatsClientTest extends ProducerConsumerBase { log.info("-- Exiting {} test --", methodName); } + @Test + public void testGetPartitionedTopicMetaData() throws Exception { + log.info("-- Starting {} test --", methodName); + + final String topicName = "persistent://my-property/my-ns/my-topic1"; + final String subscriptionName = "my-subscriber-name"; + + try { + String url = "http://localhost:" + BROKER_WEBSERVICE_PORT; + if (isTcpLookup) { + url = "pulsar://localhost:" + BROKER_PORT; + } + PulsarClient client = newPulsarClient(url, 0); + + Consumer consumer = client.newConsumer().topic(topicName).subscriptionName(subscriptionName) + .acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe(); + Producer producer = client.newProducer().topic(topicName).create(); + + consumer.close(); + producer.close(); + client.close(); + } catch (PulsarClientException pce) { + log.error("create producer or consumer error: ", pce); + fail(); + } + + log.info("-- Exiting {} test --", methodName); + } + + @Test (timeOut = 4000) + public void testGetPartitionedTopicDataTimeout() { + log.info("-- Starting {} test --", methodName); + + final String topicName = "persistent://my-property/my-ns/my-topic1"; + + String url = "http://localhost:51000,localhost:51001"; + if (isTcpLookup) { + url = "pulsar://localhost:51000,localhost:51001"; + } + + PulsarClient client; + try { + client = PulsarClient.builder() + .serviceUrl(url) + .statsInterval(0, TimeUnit.SECONDS) + .operationTimeout(3, TimeUnit.SECONDS) + .build(); + + Producer producer = client.newProducer().topic(topicName).create(); + + fail(); + } catch (PulsarClientException pce) { + log.error("create producer error: ", pce); + } + + log.info("-- Exiting {} test --", methodName); + } + private static final Logger log = LoggerFactory.getLogger(PulsarBrokerStatsClientTest.class); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java index 96c62d2e9b6ef81c1d0dc9b7cc3ae68c3e902359..845c741eb43c456f26791b1a7ca8341d60809b3d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import java.io.Closeable; import java.io.IOException; import java.net.HttpURLConnection; +import java.net.URI; import java.net.URL; import java.util.Map; import java.util.Map.Entry; @@ -127,8 +128,9 @@ public class HttpClient implements Closeable { public CompletableFuture get(String path, Class clazz) { final CompletableFuture future = new CompletableFuture<>(); try { - String requestUrl = new URL(serviceNameResolver.resolveHostUri().toURL(), path).toString(); - String remoteHostName = serviceNameResolver.resolveHostUri().getHost(); + URI hostUri = serviceNameResolver.resolveHostUri(); + String requestUrl = new URL(hostUri.toURL(), path).toString(); + String remoteHostName = hostUri.getHost(); AuthenticationDataProvider authData = authentication.getAuthData(remoteHostName); CompletableFuture> authFuture = new CompletableFuture<>(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index a63b8641c6b8fc4f1fe55c5cf81aa18ce70c4d3b..f053da8608e514534877756ed4b708cc879e2089 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -651,17 +651,42 @@ public class PulsarClientImpl implements PulsarClient { public CompletableFuture getPartitionedTopicMetadata(String topic) { - CompletableFuture metadataFuture; + CompletableFuture metadataFuture = new CompletableFuture<>(); try { TopicName topicName = TopicName.get(topic); - metadataFuture = lookup.getPartitionedTopicMetadata(topicName); + AtomicLong opTimeoutMs = new AtomicLong(conf.getOperationTimeoutMs()); + Backoff backoff = new BackoffBuilder() + .setInitialTime(100, TimeUnit.NANOSECONDS) + .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS) + .setMax(0, TimeUnit.NANOSECONDS) + .create(); + getPartitionedTopicMetadata(topicName, backoff, opTimeoutMs, metadataFuture); } catch (IllegalArgumentException e) { return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(e.getMessage())); } return metadataFuture; } + private void getPartitionedTopicMetadata(TopicName topicName, + Backoff backoff, + AtomicLong remainingTime, + CompletableFuture future) { + lookup.getPartitionedTopicMetadata(topicName).thenAccept(future::complete).exceptionally(e -> { + long nextDelay = Math.min(backoff.next(), remainingTime.get()); + if (nextDelay <= 0) { + future.completeExceptionally(e); + return null; + } + + timer.newTimeout( task -> { + remainingTime.addAndGet(-nextDelay); + getPartitionedTopicMetadata(topicName, backoff, remainingTime, future); + }, nextDelay, TimeUnit.MILLISECONDS); + return null; + }); + } + @Override public CompletableFuture> getPartitionsForTopic(String topic) { return getPartitionedTopicMetadata(topic).thenApply(metadata -> { diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java index eac23c5211107405a33c566aeb4e32095c8bb154..fb072e573891cde74c09f2f4980d170f899c84dc 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConnectionTimeoutTest.java @@ -23,8 +23,10 @@ import io.netty.channel.ConnectTimeoutException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import org.apache.http.conn.ConnectionPoolTimeoutException; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.testng.Assert; import org.testng.annotations.Test; @@ -48,9 +50,7 @@ public class ConnectionTimeoutTest { Assert.fail("Shouldn't be able to connect to anything"); } catch (Exception e) { Assert.assertFalse(defaultFuture.isDone()); - Assert.assertEquals(e.getCause().getCause().getCause().getClass(), - ConnectTimeoutException.class); - Assert.assertTrue((System.nanoTime() - startNanos) < TimeUnit.SECONDS.toNanos(3)); + Assert.assertEquals(e.getCause().getCause().getCause().getClass(), ConnectTimeoutException.class); } } }