From 7b60e2d46b0f67a99dd1b5ad0c6220f7bf7e7393 Mon Sep 17 00:00:00 2001 From: Marvin Cai Date: Fri, 21 Aug 2020 06:48:55 -0700 Subject: [PATCH] Return more informative error message when trying to create subscription on non-persistent throug Rest API or pulsar-admin CLI. (#7831) Fixes #7397 Motivation When use pulsar-admin to create a subscription on a non-persistent topic, get the server error This change return more informative error message when trying to create subscript ion on non-persistent through Rest API or pulsar-admin CLI. Modifications Currently when creating subscription is called with non-persistent topic service will try to create the subscription which will fail with casting exception when trying to cast NonPersistentSubscription to PersistentSubscription and client will see internal error. Add check if create subscription command is called for a non-persistent topic before actually Verifying this change This change added tests and can be verified as follows: Added unit test Verified with local standalone --- .../pulsar/broker/admin/v1/PersistentTopics.java | 5 +++++ .../pulsar/broker/admin/v2/PersistentTopics.java | 5 +++++ .../pulsar/broker/admin/PersistentTopicsTest.java | 13 +++++++++++++ 3 files changed, 23 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index 36ed69cf7d0..84414fa752c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -522,6 +522,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"), @ApiResponse(code = 403, message = "Don't have admin permission"), @ApiResponse(code = 404, message = "Topic/Subscription does not exist"), @ApiResponse(code = 405, message = "Not supported for partitioned topics") }) @@ -532,6 +533,10 @@ public class PersistentTopics extends PersistentTopicsBase { @QueryParam("replicated") boolean replicated) { try { validateTopicName(property, cluster, namespace, topic); + if (!topicName.isPersistent()) { + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + + "can only be done through client"); + } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); } catch (WebApplicationException wae) { asyncResponse.resume(wae); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 39477c052a9..521fb75252c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -971,6 +971,7 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id") @ApiResponses(value = { @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"), + @ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"), @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" + "subscriber is not authorized to access this operation"), @ApiResponse(code = 403, message = "Don't have admin permission"), @@ -1001,6 +1002,10 @@ public class PersistentTopics extends PersistentTopicsBase { ) { try { validateTopicName(tenant, namespace, topic); + if (!topicName.isPersistent()) { + throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" + + "can only be done through client"); + } internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated); } catch (WebApplicationException wae) { asyncResponse.resume(wae); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 92f7693d758..dbcac1b17a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.util.Arrays; +import javax.ws.rs.WebApplicationException; import javax.ws.rs.core.Response.Status; import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; @@ -205,6 +206,18 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode()); } + @Test + public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException { + doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain(); + AsyncResponse response = mock(AsyncResponse.class); + ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(RestException.class); + persistentTopics.createSubscription(response, testTenant, testNamespace, + "testCreateSubscriptionForNonPersistentTopic", "sub", + true, (MessageIdImpl) MessageId.earliest, false); + verify(response, timeout(5000).times(1)).resume(responseCaptor.capture()); + Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); + } + @Test public void testTerminatePartitionedTopic() { String testLocalTopicName = "topic-not-found"; -- GitLab