提交 aeb04aa5 编写于 作者: L lipenghui 提交者: Jia Zhai

Add options to rewrite namespace delimiter for pulsar sql. (#4749)

### Motivation

Fix #4732

### Modifications

Add options to rewrite the namespace delimiter, disable by default

Enable rewrite namespace delimiter can work well with superset:
<img width="1279" alt="superset" src="https://user-images.githubusercontent.com/12592133/61385412-f0f35700-a8e4-11e9-87b2-a31b62128b58.png">

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API: (no)
  - The schema: (no)
  - The default values of configurations: (no)
  - The wire protocol: (no)
  - The rest endpoints: (no)
  - The admin cli options: (no)
  - Anything that affects deployment: (no)

### Documentation

  - Does this pull request introduce a new feature? (no)

(cherry picked from commit 6ddd51ff)
上级 12c59047
...@@ -30,7 +30,12 @@ pulsar.target-num-splits=2 ...@@ -30,7 +30,12 @@ pulsar.target-num-splits=2
# max message queue size # max message queue size
pulsar.max-split-message-queue-size=10000 pulsar.max-split-message-queue-size=10000
# max entry queue size # max entry queue size
pulsar.max-split-entry-queue-size = 1000 pulsar.max-split-entry-queue-size=1000
# Rewrite namespace delimiter
# Warn: avoid using symbols allowed by Namespace (a-zA-Z_0-9 -=:%)
# to prevent erroneous rewriting
pulsar.namespace-delimiter-rewrite-enable=false
pulsar.rewrite-namespace-delimiter=/
####### TIERED STORAGE OFFLOADER CONFIGS ####### ####### TIERED STORAGE OFFLOADER CONFIGS #######
......
...@@ -31,7 +31,7 @@ public class NamedEntity { ...@@ -31,7 +31,7 @@ public class NamedEntity {
// allowed characters for property, namespace, cluster and topic names are // allowed characters for property, namespace, cluster and topic names are
// alphanumeric (a-zA-Z_0-9) and these special chars -=:. // alphanumeric (a-zA-Z_0-9) and these special chars -=:.
// % is allowed as part of valid URL encoding // % is allowed as part of valid URL encoding
private static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$"); public static final Pattern NAMED_ENTITY_PATTERN = Pattern.compile("^[-=:.\\w]*$");
public static void checkName(String name) throws IllegalArgumentException { public static void checkName(String name) throws IllegalArgumentException {
Matcher m = NAMED_ENTITY_PATTERN.matcher(name); Matcher m = NAMED_ENTITY_PATTERN.matcher(name);
......
...@@ -23,12 +23,14 @@ import io.airlift.configuration.Config; ...@@ -23,12 +23,14 @@ import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.protocol.Commands;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher;
public class PulsarConnectorConfig implements AutoCloseable { public class PulsarConnectorConfig implements AutoCloseable {
...@@ -40,8 +42,12 @@ public class PulsarConnectorConfig implements AutoCloseable { ...@@ -40,8 +42,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int maxSplitEntryQueueSize = 1000; private int maxSplitEntryQueueSize = 1000;
private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE; private int maxMessageSize = Commands.DEFAULT_MAX_MESSAGE_SIZE;
private String statsProvider = NullStatsProvider.class.getName(); private String statsProvider = NullStatsProvider.class.getName();
private Map<String, String> statsProviderConfigs = new HashMap<>(); private Map<String, String> statsProviderConfigs = new HashMap<>();
private boolean namespaceDelimiterRewriteEnable = false;
private String rewriteNamespaceDelimiter = "/";
/**** --- Ledger Offloading --- ****/ /**** --- Ledger Offloading --- ****/
private String managedLedgerOffloadDriver = null; private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = 2; private int managedLedgerOffloadMaxThreads = 2;
...@@ -191,6 +197,33 @@ public class PulsarConnectorConfig implements AutoCloseable { ...@@ -191,6 +197,33 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this; return this;
} }
public String getRewriteNamespaceDelimiter() {
return rewriteNamespaceDelimiter;
}
@Config("pulsar.rewrite-namespace-delimiter")
public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
if (m.matches()) {
throw new IllegalArgumentException(
"Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ "because delimiter must contain characters which name of namespace not allowed"
);
}
this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
return this;
}
public boolean getNamespaceDelimiterRewriteEnable() {
return namespaceDelimiterRewriteEnable;
}
@Config("pulsar.namespace-delimiter-rewrite-enable")
public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
return this;
}
@NotNull @NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException { public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) { if (this.pulsarAdmin == null) {
......
...@@ -49,7 +49,9 @@ public class PulsarConnectorFactory implements ConnectorFactory { ...@@ -49,7 +49,9 @@ public class PulsarConnectorFactory implements ConnectorFactory {
@Override @Override
public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) { public Connector create(String connectorId, Map<String, String> config, ConnectorContext context) {
requireNonNull(config, "requiredConfig is null"); requireNonNull(config, "requiredConfig is null");
log.debug("Creating Pulsar connector with configs: %s", config); if (log.isDebugEnabled()) {
log.debug("Creating Pulsar connector with configs: %s", config);
}
try { try {
// A plugin is not required to use Guice; it is just very convenient // A plugin is not required to use Guice; it is just very convenient
Bootstrap app = new Bootstrap( Bootstrap app = new Bootstrap(
......
...@@ -83,4 +83,18 @@ public class PulsarConnectorUtils { ...@@ -83,4 +83,18 @@ public class PulsarConnectorUtils {
} }
return properties; return properties;
} }
public static String rewriteNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
return config.getNamespaceDelimiterRewriteEnable()
? namespace.replace("/", config.getRewriteNamespaceDelimiter())
: namespace;
}
public static String restoreNamespaceDelimiterIfNeeded(String namespace, PulsarConnectorConfig config) {
return config.getNamespaceDelimiterRewriteEnable()
? namespace.replace(config.getRewriteNamespaceDelimiter(), "/")
: namespace;
}
} }
...@@ -72,6 +72,7 @@ import java.util.Optional; ...@@ -72,6 +72,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.Stack; import java.util.Stack;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
...@@ -81,11 +82,14 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; ...@@ -81,11 +82,14 @@ import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle; import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle; import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
public class PulsarMetadata implements ConnectorMetadata { public class PulsarMetadata implements ConnectorMetadata {
private final String connectorId; private final String connectorId;
private final PulsarAdmin pulsarAdmin; private final PulsarAdmin pulsarAdmin;
private final PulsarConnectorConfig pulsarConnectorConfig;
private static final String INFORMATION_SCHEMA = "information_schema"; private static final String INFORMATION_SCHEMA = "information_schema";
...@@ -94,6 +98,7 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -94,6 +98,7 @@ public class PulsarMetadata implements ConnectorMetadata {
@Inject @Inject
public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) { public PulsarMetadata(PulsarConnectorId connectorId, PulsarConnectorConfig pulsarConnectorConfig) {
this.connectorId = requireNonNull(connectorId, "connectorId is null").toString(); this.connectorId = requireNonNull(connectorId, "connectorId is null").toString();
this.pulsarConnectorConfig = pulsarConnectorConfig;
try { try {
this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin(); this.pulsarAdmin = pulsarConnectorConfig.getPulsarAdmin();
} catch (PulsarClientException e) { } catch (PulsarClientException e) {
...@@ -107,7 +112,8 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -107,7 +112,8 @@ public class PulsarMetadata implements ConnectorMetadata {
try { try {
List<String> tenants = pulsarAdmin.tenants().getTenants(); List<String> tenants = pulsarAdmin.tenants().getTenants();
for (String tenant : tenants) { for (String tenant : tenants) {
prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant)); prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
} }
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schemas from pulsar: " throw new RuntimeException("Failed to get schemas from pulsar: "
...@@ -163,7 +169,7 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -163,7 +169,7 @@ public class PulsarMetadata implements ConnectorMetadata {
} else { } else {
List<String> pulsarTopicList = null; List<String> pulsarTopicList = null;
try { try {
pulsarTopicList = this.pulsarAdmin.topics().getList(schemaNameOrNull); pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) { if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit"); log.warn("Schema " + schemaNameOrNull + " does not exsit");
...@@ -256,28 +262,29 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -256,28 +262,29 @@ public class PulsarMetadata implements ConnectorMetadata {
if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) { if (schemaTableName.getSchemaName().equals(INFORMATION_SCHEMA)) {
return null; return null;
} }
String namespace = restoreNamespaceDelimiterIfNeeded(schemaTableName.getSchemaName(), pulsarConnectorConfig);
TopicName topicName = TopicName.get( TopicName topicName = TopicName.get(
String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())); String.format("%s/%s", namespace, schemaTableName.getTableName()));
List<String> topics; List<String> topics;
try { try {
if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) { if (!PulsarConnectorUtils.isPartitionedTopic(topicName, this.pulsarAdmin)) {
topics = this.pulsarAdmin.topics().getList(schemaTableName.getSchemaName()); topics = this.pulsarAdmin.topics().getList(namespace);
} else { } else {
topics = this.pulsarAdmin.topics().getPartitionedTopicList((schemaTableName.getSchemaName())); topics = this.pulsarAdmin.topics().getPartitionedTopicList(namespace);
} }
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) { if (e.getStatusCode() == 404) {
throw new PrestoException(NOT_FOUND, "Schema " + schemaTableName.getSchemaName() + " does not exist"); throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
} }
throw new RuntimeException("Failed to get topics in schema " + schemaTableName.getSchemaName() throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
if (!topics.contains(topicName.toString())) { if (!topics.contains(topicName.toString())) {
log.error("Table %s not found", log.error("Table %s not found",
String.format("%s/%s", schemaTableName.getSchemaName(), String.format("%s/%s", namespace,
schemaTableName.getTableName())); schemaTableName.getTableName()));
throw new TableNotFoundException(schemaTableName); throw new TableNotFoundException(schemaTableName);
} }
...@@ -285,14 +292,14 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -285,14 +292,14 @@ public class PulsarMetadata implements ConnectorMetadata {
SchemaInfo schemaInfo; SchemaInfo schemaInfo;
try { try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo( schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName())); String.format("%s/%s", namespace, schemaTableName.getTableName()));
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) { if (e.getStatusCode() == 404) {
// to indicate that we can't read from topic because there is no schema // to indicate that we can't read from topic because there is no schema
return null; return null;
} }
throw new RuntimeException("Failed to get schema information for topic " throw new RuntimeException("Failed to get schema information for topic "
+ String.format("%s/%s", schemaTableName.getSchemaName(), schemaTableName.getTableName()) + String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
List<ColumnMetadata> handles = getPulsarColumns( List<ColumnMetadata> handles = getPulsarColumns(
......
...@@ -60,6 +60,7 @@ import java.util.List; ...@@ -60,6 +60,7 @@ import java.util.List;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries; import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
public class PulsarSplitManager implements ConnectorSplitManager { public class PulsarSplitManager implements ConnectorSplitManager {
...@@ -94,16 +95,18 @@ public class PulsarSplitManager implements ConnectorSplitManager { ...@@ -94,16 +95,18 @@ public class PulsarSplitManager implements ConnectorSplitManager {
PulsarTableHandle tableHandle = layoutHandle.getTable(); PulsarTableHandle tableHandle = layoutHandle.getTable();
TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain(); TupleDomain<ColumnHandle> tupleDomain = layoutHandle.getTupleDomain();
TopicName topicName = TopicName.get("persistent", NamespaceName.get(tableHandle.getSchemaName()), String namespace = restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig);
TopicName topicName = TopicName.get("persistent", NamespaceName.get(namespace),
tableHandle.getTableName()); tableHandle.getTableName());
SchemaInfo schemaInfo; SchemaInfo schemaInfo;
try { try {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo( schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName())); String.format("%s/%s", namespace, tableHandle.getTableName()));
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schema for topic " throw new RuntimeException("Failed to get schema for topic "
+ String.format("%s/%s", tableHandle.getSchemaName(), tableHandle.getTableName()) + String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
...@@ -258,7 +261,7 @@ public class PulsarSplitManager implements ConnectorSplitManager { ...@@ -258,7 +261,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition(); PositionImpl endPosition = (PositionImpl) readOnlyCursor.getReadPosition();
splits.add(new PulsarSplit(i, this.connectorId, splits.add(new PulsarSplit(i, this.connectorId,
tableHandle.getSchemaName(), restoreNamespaceDelimiterIfNeeded(tableHandle.getSchemaName(), pulsarConnectorConfig),
tableName, tableName,
entriesForSplit, entriesForSplit,
new String(schemaInfo.getSchema()), new String(schemaInfo.getSchema()),
......
...@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl; ...@@ -38,6 +38,7 @@ import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl; import org.apache.bookkeeper.mledger.impl.ReadOnlyCursorImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
...@@ -63,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock; ...@@ -63,6 +64,7 @@ import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.time.LocalDate; import java.time.LocalDate;
...@@ -934,4 +936,20 @@ public abstract class TestPulsarConnector { ...@@ -934,4 +936,20 @@ public abstract class TestPulsarConnector {
public void cleanup() { public void cleanup() {
completedBytes = 0L; completedBytes = 0L;
} }
@DataProvider(name = "rewriteNamespaceDelimiter")
public static Object[][] serviceUrls() {
return new Object[][] {
{ "|" }, { null }
};
}
protected void updateRewriteNamespaceDelimiterIfNeeded(String delimiter) {
if (StringUtils.isNotBlank(delimiter)) {
pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(true);
pulsarConnectorConfig.setRewriteNamespaceDelimiter(delimiter);
} else {
pulsarConnectorConfig.setNamespaceDelimiterRewriteEnable(false);
}
}
} }
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import org.testng.Assert;
import org.testng.annotations.Test;
public class TestPulsarConnectorConfig {
@Test
public void testDefaultNamespaceDelimiterRewrite() {
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
Assert.assertFalse(connectorConfig.getNamespaceDelimiterRewriteEnable());
Assert.assertEquals("/", connectorConfig.getRewriteNamespaceDelimiter());
}
@Test
public void testNamespaceRewriteDelimiterRestriction() {
PulsarConnectorConfig connectorConfig = new PulsarConnectorConfig();
try {
connectorConfig.setRewriteNamespaceDelimiter("-=:.Az09_");
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalArgumentException);
}
connectorConfig.setRewriteNamespaceDelimiter("|");
Assert.assertEquals("|", (connectorConfig.getRewriteNamespaceDelimiter()));
connectorConfig.setRewriteNamespaceDelimiter("||");
Assert.assertEquals("||", (connectorConfig.getRewriteNamespaceDelimiter()));
connectorConfig.setRewriteNamespaceDelimiter("$");
Assert.assertEquals("$", (connectorConfig.getRewriteNamespaceDelimiter()));
connectorConfig.setRewriteNamespaceDelimiter("&");
Assert.assertEquals("&", (connectorConfig.getRewriteNamespaceDelimiter()));
connectorConfig.setRewriteNamespaceDelimiter("--&");
Assert.assertEquals("--&", (connectorConfig.getRewriteNamespaceDelimiter()));
}
}
...@@ -29,6 +29,7 @@ import com.facebook.presto.spi.SchemaTablePrefix; ...@@ -29,6 +29,7 @@ import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.TableNotFoundException; import com.facebook.presto.spi.TableNotFoundException;
import io.airlift.log.Logger; import io.airlift.log.Logger;
import org.apache.avro.Schema; import org.apache.avro.Schema;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
...@@ -61,19 +62,29 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -61,19 +62,29 @@ public class TestPulsarMetadata extends TestPulsarConnector {
private static final Logger log = Logger.get(TestPulsarMetadata.class); private static final Logger log = Logger.get(TestPulsarMetadata.class);
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testListSchemaNames() { public void testListSchemaNames(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<String> schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class)); List<String> schemas = this.pulsarMetadata.listSchemaNames(mock(ConnectorSession.class));
String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
}
@Test if (StringUtils.isBlank(delimiter)) {
public void testGetTableHandle() { String[] expectedSchemas = {NAMESPACE_NAME_1.toString(), NAMESPACE_NAME_2.toString(),
NAMESPACE_NAME_3.toString(), NAMESPACE_NAME_4.toString()};
assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
} else {
String[] expectedSchemas = {
PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_1.toString(), pulsarConnectorConfig),
PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_2.toString(), pulsarConnectorConfig),
PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_3.toString(), pulsarConnectorConfig),
PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded(NAMESPACE_NAME_4.toString(), pulsarConnectorConfig)};
assertEquals(new HashSet<>(schemas), new HashSet<>(Arrays.asList(expectedSchemas)));
}
}
@Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableHandle(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName()); SchemaTableName schemaTableName = new SchemaTableName(TOPIC_1.getNamespace(), TOPIC_1.getLocalName());
ConnectorTableHandle connectorTableHandle ConnectorTableHandle connectorTableHandle
...@@ -89,9 +100,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -89,9 +100,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName()); assertEquals(pulsarTableHandle.getTopicName(), TOPIC_1.getLocalName());
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadata() { public void testGetTableMetadata(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<TopicName> allTopics = new LinkedList<>(); List<TopicName> allTopics = new LinkedList<>();
allTopics.addAll(topicNames); allTopics.addAll(topicNames);
allTopics.addAll(partitionedTopicNames); allTopics.addAll(partitionedTopicNames);
...@@ -133,9 +144,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -133,9 +144,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadataWrongSchema() { public void testGetTableMetadataWrongSchema(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
pulsarConnectorId.toString(), pulsarConnectorId.toString(),
"wrong-tenant/wrong-ns", "wrong-tenant/wrong-ns",
...@@ -153,9 +164,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -153,9 +164,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadataWrongTable() { public void testGetTableMetadataWrongTable(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle( PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(
pulsarConnectorId.toString(), pulsarConnectorId.toString(),
TOPIC_1.getNamespace(), TOPIC_1.getNamespace(),
...@@ -173,9 +184,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -173,9 +184,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadataTableNoSchema() throws PulsarAdminException { public void testGetTableMetadataTableNoSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow( when(this.schemas.getSchemaInfo(eq(TOPIC_1.getSchemaName()))).thenThrow(
new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND))); new PulsarAdminException(new ClientErrorException(Response.Status.NOT_FOUND)));
...@@ -192,9 +203,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -192,9 +203,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(tableMetadata.getColumns().size(), 0); assertEquals(tableMetadata.getColumns().size(), 0);
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadataTableBlankSchema() throws PulsarAdminException { public void testGetTableMetadataTableBlankSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo(); SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema(new byte[0]); badSchemaInfo.setSchema(new byte[0]);
badSchemaInfo.setType(SchemaType.AVRO); badSchemaInfo.setType(SchemaType.AVRO);
...@@ -218,9 +229,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -218,9 +229,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetTableMetadataTableInvalidSchema() throws PulsarAdminException { public void testGetTableMetadataTableInvalidSchema(String delimiter) throws PulsarAdminException {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
SchemaInfo badSchemaInfo = new SchemaInfo(); SchemaInfo badSchemaInfo = new SchemaInfo();
badSchemaInfo.setSchema("foo".getBytes()); badSchemaInfo.setSchema("foo".getBytes());
badSchemaInfo.setType(SchemaType.AVRO); badSchemaInfo.setType(SchemaType.AVRO);
...@@ -244,8 +255,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -244,8 +255,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testListTable() { public void testListTable(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty()); assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), null).isEmpty());
assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns") assertTrue(this.pulsarMetadata.listTables(mock(ConnectorSession.class), "wrong-tenant/wrong-ns")
.isEmpty()); .isEmpty());
...@@ -260,9 +272,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -260,9 +272,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2))); NAMESPACE_NAME_4.toString())), new HashSet<>(Arrays.asList(expectedTopics2)));
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetColumnHandles() { public void testGetColumnHandles(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(), PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), TOPIC_1.getNamespace(),
TOPIC_1.getLocalName(), TOPIC_1.getLocalName()); TOPIC_1.getLocalName(), TOPIC_1.getLocalName());
Map<String, ColumnHandle> columnHandleMap Map<String, ColumnHandle> columnHandleMap
...@@ -296,8 +308,9 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -296,8 +308,9 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertTrue(columnHandleMap.isEmpty()); assertTrue(columnHandleMap.isEmpty());
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testListTableColumns() { public void testListTableColumns(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
Map<SchemaTableName, List<ColumnMetadata>> tableColumnsMap Map<SchemaTableName, List<ColumnMetadata>> tableColumnsMap
= this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class), = this.pulsarMetadata.listTableColumns(mock(ConnectorSession.class),
new SchemaTablePrefix(TOPIC_1.getNamespace())); new SchemaTablePrefix(TOPIC_1.getNamespace()));
......
...@@ -66,9 +66,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -66,9 +66,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
} }
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testTopic() throws Exception { public void testTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : topicNames) { for (TopicName topicName : topicNames) {
setup(); setup();
log.info("!----- topic: %s -----!", topicName); log.info("!----- topic: %s -----!", topicName);
...@@ -113,8 +113,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -113,8 +113,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testPartitionedTopic() throws Exception { public void testPartitionedTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : partitionedTopicNames) { for (TopicName topicName : partitionedTopicNames) {
setup(); setup();
log.info("!----- topic: %s -----!", topicName); log.info("!----- topic: %s -----!", topicName);
...@@ -169,9 +170,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -169,9 +170,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
}).collect(Collectors.toList()); }).collect(Collectors.toList());
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testPublishTimePredicatePushdown() throws Exception { public void testPublishTimePredicatePushdown(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = TOPIC_1; TopicName topicName = TOPIC_1;
setup(); setup();
...@@ -226,9 +227,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -226,9 +227,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
} }
@Test @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testPublishTimePredicatePushdownPartitionedTopic() throws Exception { public void testPublishTimePredicatePushdownPartitionedTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = PARTITIONED_TOPIC_1; TopicName topicName = PARTITIONED_TOPIC_1;
setup(); setup();
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册