提交 2069f761 编写于 作者: L lipenghui 提交者: Sijie Guo

[pulsar-sql] Handle schema not found (#4890)

* Handle get schema 404 in pulsar sql(table meta and get splits)

* Fix unit test.

* add defaultSchema()

* use Schema.BYTES.getSchemaInfo()

* add unit test

* rebase and fix unit tests
上级 b3ae47c6
...@@ -311,17 +311,17 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -311,17 +311,17 @@ public class PulsarMetadata implements ConnectorMetadata {
String.format("%s/%s", namespace, schemaTableName.getTableName())); String.format("%s/%s", namespace, schemaTableName.getTableName()));
} catch (PulsarAdminException e) { } catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) { if (e.getStatusCode() == 404) {
// to indicate that we can't read from topic because there is no schema // use default schema because there is no schema
return null; schemaInfo = PulsarSchemaHandlers.defaultSchema();
} else if (e.getStatusCode() == 401) { } else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED, throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized", String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
namespace, schemaTableName.getTableName())); namespace, schemaTableName.getTableName()));
} else {
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
List<ColumnMetadata> handles = getPulsarColumns( List<ColumnMetadata> handles = getPulsarColumns(
topicName, schemaInfo, withInternalColumns topicName, schemaInfo, withInternalColumns
...@@ -355,7 +355,7 @@ public class PulsarMetadata implements ConnectorMetadata { ...@@ -355,7 +355,7 @@ public class PulsarMetadata implements ConnectorMetadata {
ColumnMetadata valueColumn = new PulsarColumnMetadata( ColumnMetadata valueColumn = new PulsarColumnMetadata(
"__value__", "__value__",
convertPulsarType(schemaInfo.getType()), convertPulsarType(schemaInfo.getType()),
null, null, false, false, "The value of the message with primitive type schema", null, false, false,
new String[0], new String[0],
new Integer[0]); new Integer[0]);
......
...@@ -23,6 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8; ...@@ -23,6 +23,8 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.PrestoException;
import java.util.List; import java.util.List;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaInfo;
class PulsarSchemaHandlers { class PulsarSchemaHandlers {
...@@ -50,4 +52,7 @@ class PulsarSchemaHandlers { ...@@ -50,4 +52,7 @@ class PulsarSchemaHandlers {
} }
} }
static SchemaInfo defaultSchema() {
return Schema.BYTES.getSchemaInfo();
}
} }
\ No newline at end of file
...@@ -111,11 +111,13 @@ public class PulsarSplitManager implements ConnectorSplitManager { ...@@ -111,11 +111,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
throw new PrestoException(QUERY_REJECTED, throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized", String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
namespace, tableHandle.getTableName())); namespace, tableHandle.getTableName()));
} else if (e.getStatusCode() == 404) {
schemaInfo = PulsarSchemaHandlers.defaultSchema();
} else {
throw new RuntimeException("Failed to get pulsar topic schema for topic "
+ String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
throw new RuntimeException("Failed to get pulsar topic schema for topic "
+ String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
} }
Collection<PulsarSplit> splits; Collection<PulsarSplit> splits;
......
...@@ -130,6 +130,8 @@ public abstract class TestPulsarConnector { ...@@ -130,6 +130,8 @@ public abstract class TestPulsarConnector {
protected static final TopicName TOPIC_4 = TopicName.get("persistent", NAMESPACE_NAME_3, "topic-1"); protected static final TopicName TOPIC_4 = TopicName.get("persistent", NAMESPACE_NAME_3, "topic-1");
protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1"); protected static final TopicName TOPIC_5 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-1");
protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2"); protected static final TopicName TOPIC_6 = TopicName.get("persistent", NAMESPACE_NAME_4, "topic-2");
protected static final TopicName NON_SCHEMA_TOPIC = TopicName.get(
"persistent", NAMESPACE_NAME_2, "non-schema-topic");
protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1, protected static final TopicName PARTITIONED_TOPIC_1 = TopicName.get("persistent", NAMESPACE_NAME_1,
...@@ -209,6 +211,7 @@ public abstract class TestPulsarConnector { ...@@ -209,6 +211,7 @@ public abstract class TestPulsarConnector {
topicNames.add(TOPIC_4); topicNames.add(TOPIC_4);
topicNames.add(TOPIC_5); topicNames.add(TOPIC_5);
topicNames.add(TOPIC_6); topicNames.add(TOPIC_6);
topicNames.add(NON_SCHEMA_TOPIC);
partitionedTopicNames = new LinkedList<>(); partitionedTopicNames = new LinkedList<>();
partitionedTopicNames.add(PARTITIONED_TOPIC_1); partitionedTopicNames.add(PARTITIONED_TOPIC_1);
...@@ -274,6 +277,7 @@ public abstract class TestPulsarConnector { ...@@ -274,6 +277,7 @@ public abstract class TestPulsarConnector {
topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L); topicsToNumEntries.put(TOPIC_5.getSchemaName(), 8000L);
topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L); topicsToNumEntries.put(TOPIC_6.getSchemaName(), 1L);
topicsToNumEntries.put(NON_SCHEMA_TOPIC.getSchemaName(), 8000L);
topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L); topicsToNumEntries.put(PARTITIONED_TOPIC_1.getSchemaName(), 1233L);
topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L); topicsToNumEntries.put(PARTITIONED_TOPIC_2.getSchemaName(), 8000L);
topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L); topicsToNumEntries.put(PARTITIONED_TOPIC_3.getSchemaName(), 100L);
...@@ -550,13 +554,15 @@ public abstract class TestPulsarConnector { ...@@ -550,13 +554,15 @@ public abstract class TestPulsarConnector {
allTopics.addAll(partitionedTopicNames); allTopics.addAll(partitionedTopicNames);
for (TopicName topicName : allTopics) { for (TopicName topicName : allTopics) {
splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(), if (topicsToSchemas.containsKey(topicName.getSchemaName())) {
splits.put(topicName, new PulsarSplit(0, pulsarConnectorId.toString(),
topicName.getNamespace(), topicName.getLocalName(), topicName.getNamespace(), topicName.getLocalName(),
topicsToNumEntries.get(topicName.getSchemaName()), topicsToNumEntries.get(topicName.getSchemaName()),
new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()), new String(topicsToSchemas.get(topicName.getSchemaName()).getSchema()),
topicsToSchemas.get(topicName.getSchemaName()).getType(), topicsToSchemas.get(topicName.getSchemaName()).getType(),
0, topicsToNumEntries.get(topicName.getSchemaName()), 0, topicsToNumEntries.get(topicName.getSchemaName()),
0, 0, TupleDomain.all(), new HashMap<>())); 0, 0, TupleDomain.all(), new HashMap<>()));
}
} }
fooFunctions = new HashMap<>(); fooFunctions = new HashMap<>();
...@@ -742,7 +748,11 @@ public abstract class TestPulsarConnector { ...@@ -742,7 +748,11 @@ public abstract class TestPulsarConnector {
public SchemaInfo answer(InvocationOnMock invocationOnMock) throws Throwable { public SchemaInfo answer(InvocationOnMock invocationOnMock) throws Throwable {
Object[] args = invocationOnMock.getArguments(); Object[] args = invocationOnMock.getArguments();
String topic = (String) args[0]; String topic = (String) args[0];
return topicsToSchemas.get(topic); if (topicsToSchemas.get(topic) != null) {
return topicsToSchemas.get(topic);
} else {
throw new PulsarAdminException(new ClientErrorException(Response.status(404).build()));
}
} }
}); });
......
...@@ -45,6 +45,7 @@ import java.util.HashSet; ...@@ -45,6 +45,7 @@ import java.util.HashSet;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
...@@ -104,7 +105,7 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -104,7 +105,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
public void testGetTableMetadata(String delimiter) { public void testGetTableMetadata(String delimiter) {
updateRewriteNamespaceDelimiterIfNeeded(delimiter); updateRewriteNamespaceDelimiterIfNeeded(delimiter);
List<TopicName> allTopics = new LinkedList<>(); List<TopicName> allTopics = new LinkedList<>();
allTopics.addAll(topicNames); allTopics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
allTopics.addAll(partitionedTopicNames); allTopics.addAll(partitionedTopicNames);
for (TopicName topic : allTopics) { for (TopicName topic : allTopics) {
...@@ -120,7 +121,6 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -120,7 +121,6 @@ public class TestPulsarMetadata extends TestPulsarConnector {
assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace()); assertEquals(tableMetadata.getTable().getSchemaName(), topic.getNamespace());
assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName()); assertEquals(tableMetadata.getTable().getTableName(), topic.getLocalName());
assertEquals(tableMetadata.getColumns().size(), assertEquals(tableMetadata.getColumns().size(),
fooColumnHandles.size()); fooColumnHandles.size());
...@@ -200,7 +200,7 @@ public class TestPulsarMetadata extends TestPulsarConnector { ...@@ -200,7 +200,7 @@ public class TestPulsarMetadata extends TestPulsarConnector {
ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class), ConnectorTableMetadata tableMetadata = this.pulsarMetadata.getTableMetadata(mock(ConnectorSession.class),
pulsarTableHandle); pulsarTableHandle);
assertEquals(tableMetadata.getColumns().size(), 0); assertEquals(tableMetadata.getColumns().size(), PulsarInternalColumn.getInternalFields().size() + 1);
} }
@Test(dataProvider = "rewriteNamespaceDelimiter") @Test(dataProvider = "rewriteNamespaceDelimiter")
......
...@@ -36,6 +36,7 @@ import org.testng.annotations.Test; ...@@ -36,6 +36,7 @@ import org.testng.annotations.Test;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -51,6 +52,7 @@ import static org.mockito.Mockito.verify; ...@@ -51,6 +52,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertNotNull;
@Test(singleThreaded = true) @Test(singleThreaded = true)
public class TestPulsarSplitManager extends TestPulsarConnector { public class TestPulsarSplitManager extends TestPulsarConnector {
...@@ -73,7 +75,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -73,7 +75,9 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
@Test(dataProvider = "rewriteNamespaceDelimiter") @Test(dataProvider = "rewriteNamespaceDelimiter")
public void testTopic(String delimiter) throws Exception { public void testTopic(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter); updateRewriteNamespaceDelimiterIfNeeded(delimiter);
for (TopicName topicName : topicNames) { List<TopicName> topics = new LinkedList<>();
topics.addAll(topicNames.stream().filter(topicName -> !topicName.equals(NON_SCHEMA_TOPIC)).collect(Collectors.toList()));
for (TopicName topicName : topics) {
setup(); setup();
log.info("!----- topic: %s -----!", topicName); log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(), PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
...@@ -378,5 +382,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector { ...@@ -378,5 +382,24 @@ public class TestPulsarSplitManager extends TestPulsarConnector {
} }
@Test(dataProvider = "rewriteNamespaceDelimiter")
public void testGetSplitNonSchema(String delimiter) throws Exception {
updateRewriteNamespaceDelimiterIfNeeded(delimiter);
TopicName topicName = NON_SCHEMA_TOPIC;
setup();
log.info("!----- topic: %s -----!", topicName);
PulsarTableHandle pulsarTableHandle = new PulsarTableHandle(pulsarConnectorId.toString(),
topicName.getNamespace(),
topicName.getLocalName(),
topicName.getLocalName());
Map<ColumnHandle, Domain> domainMap = new HashMap<>();
TupleDomain<ColumnHandle> tupleDomain = TupleDomain.withColumnDomains(domainMap);
PulsarTableLayoutHandle pulsarTableLayoutHandle = new PulsarTableLayoutHandle(pulsarTableHandle, tupleDomain);
ConnectorSplitSource connectorSplitSource = this.pulsarSplitManager.getSplits(
mock(ConnectorTransactionHandle.class), mock(ConnectorSession.class),
pulsarTableLayoutHandle, null);
assertNotNull(connectorSplitSource);
}
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册