提交 3f6e0854 编写于 作者: B Boyang Jerry Peng 提交者: Jia Zhai

add basic authentication capabilities to Pulsar SQL (#4779)

(cherry picked from commit 075f28b7)
上级 aeb04aa5
......@@ -53,4 +53,22 @@ pulsar.rewrite-namespace-delimiter=/
#pulsar.offloader-properties = \
# {"s3ManagedLedgerOffloadBucket": "offload-bucket", \
# "s3ManagedLedgerOffloadRegion": "us-west-2", \
# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}
\ No newline at end of file
# "s3ManagedLedgerOffloadServiceEndpoint": "http://s3.amazonaws.com"}
####### AUTHENTICATION CONFIGS #######
## the authentication plugin to be used to authenticate to Pulsar cluster
#pulsar.auth-plugin =
## the authentication parameter to be used to authenticate to Pulsar cluster
#pulsar.auth-params =
## Accept untrusted TLS certificate
#pulsar.tls-allow-insecure-connection =
## Whether to enable hostname verification on TLS connections
#pulsar.tls-hostname-verification-enable =
## Path for the trusted TLS certificate file
#pulsar.tls-trust-cert-file-path =
......@@ -21,6 +21,8 @@ package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.common.naming.NamedEntity;
......@@ -44,6 +46,11 @@ public class PulsarConnectorConfig implements AutoCloseable {
private String statsProvider = NullStatsProvider.class.getName();
private Map<String, String> statsProviderConfigs = new HashMap<>();
private String authPluginClassName;
private String authParams;
private String tlsTrustCertsFilePath;
private Boolean tlsAllowInsecureConnection;
private Boolean tlsHostnameVerificationEnable;
private boolean namespaceDelimiterRewriteEnable = false;
private String rewriteNamespaceDelimiter = "/";
......@@ -154,6 +161,33 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
public String getRewriteNamespaceDelimiter() {
return rewriteNamespaceDelimiter;
}
@Config("pulsar.rewrite-namespace-delimiter")
public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
if (m.matches()) {
throw new IllegalArgumentException(
"Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ "because delimiter must contain characters which name of namespace not allowed"
);
}
this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
return this;
}
public boolean getNamespaceDelimiterRewriteEnable() {
return namespaceDelimiterRewriteEnable;
}
@Config("pulsar.namespace-delimiter-rewrite-enable")
public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
return this;
}
/**** --- Ledger Offloading --- ****/
public int getManagedLedgerOffloadMaxThreads() {
......@@ -197,37 +231,80 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
public String getRewriteNamespaceDelimiter() {
return rewriteNamespaceDelimiter;
/**** --- Authentication --- ****/
public String getAuthPlugin() {
return this.authPluginClassName;
}
@Config("pulsar.rewrite-namespace-delimiter")
public PulsarConnectorConfig setRewriteNamespaceDelimiter(String rewriteNamespaceDelimiter) {
Matcher m = NamedEntity.NAMED_ENTITY_PATTERN.matcher(rewriteNamespaceDelimiter);
if (m.matches()) {
throw new IllegalArgumentException(
"Can't use " + rewriteNamespaceDelimiter + "as delimiter, "
+ "because delimiter must contain characters which name of namespace not allowed"
);
}
this.rewriteNamespaceDelimiter = rewriteNamespaceDelimiter;
@Config("pulsar.auth-plugin")
public PulsarConnectorConfig setAuthPlugin(String authPluginClassName) throws IOException {
this.authPluginClassName = authPluginClassName;
return this;
}
public boolean getNamespaceDelimiterRewriteEnable() {
return namespaceDelimiterRewriteEnable;
public String getAuthParams() {
return this.authParams;
}
@Config("pulsar.namespace-delimiter-rewrite-enable")
public PulsarConnectorConfig setNamespaceDelimiterRewriteEnable(boolean namespaceDelimiterRewriteEnable) {
this.namespaceDelimiterRewriteEnable = namespaceDelimiterRewriteEnable;
@Config("pulsar.auth-params")
public PulsarConnectorConfig setAuthParams(String authParams) throws IOException {
this.authParams = authParams;
return this;
}
public Boolean isTlsAllowInsecureConnection() {
return tlsAllowInsecureConnection;
}
@Config("pulsar.tls-allow-insecure-connection")
public PulsarConnectorConfig setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
return this;
}
public Boolean isTlsHostnameVerificationEnable() {
return tlsHostnameVerificationEnable;
}
@Config("pulsar.tls-hostname-verification-enable")
public PulsarConnectorConfig setTlsHostnameVerificationEnable(boolean tlsHostnameVerificationEnable) {
this.tlsHostnameVerificationEnable = tlsHostnameVerificationEnable;
return this;
}
public String getTlsTrustCertsFilePath() {
return tlsTrustCertsFilePath;
}
@Config("pulsar.tls-trust-cert-file-path")
public PulsarConnectorConfig setTlsTrustCertsFilePath(String tlsTrustCertsFilePath) {
this.tlsTrustCertsFilePath = tlsTrustCertsFilePath;
return this;
}
@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
this.pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl(getBrokerServiceUrl()).build();
PulsarAdminBuilder builder = PulsarAdmin.builder();
if (getAuthPlugin() != null) {
builder.authentication(getAuthPlugin(), getAuthParams());
}
if (isTlsAllowInsecureConnection() != null) {
builder.allowTlsInsecureConnection(isTlsAllowInsecureConnection());
}
if (isTlsHostnameVerificationEnable() != null) {
builder.enableTlsHostnameVerification(isTlsHostnameVerificationEnable());
}
if (getTlsTrustCertsFilePath() != null) {
builder.tlsTrustCertsFilePath(getTlsTrustCertsFilePath());
}
this.pulsarAdmin = builder.serviceHttpUrl(getBrokerServiceUrl()).build();
}
return this.pulsarAdmin;
}
......
......@@ -76,6 +76,7 @@ import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
......@@ -116,6 +117,9 @@ public class PulsarMetadata implements ConnectorMetadata {
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED, "Failed to get schemas from pulsar: Unauthorized");
}
throw new RuntimeException("Failed to get schemas from pulsar: "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
......@@ -174,6 +178,9 @@ public class PulsarMetadata implements ConnectorMetadata {
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
return builder.build();
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get tables/topics in %s: Unauthorized", schemaNameOrNull));
}
throw new RuntimeException("Failed to get tables/topics in " + schemaNameOrNull + ": "
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
......@@ -277,6 +284,9 @@ public class PulsarMetadata implements ConnectorMetadata {
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
throw new PrestoException(NOT_FOUND, "Schema " + namespace + " does not exist");
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get topics in schema %s: Unauthorized", namespace));
}
throw new RuntimeException("Failed to get topics in schema " + namespace
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
......@@ -297,8 +307,13 @@ public class PulsarMetadata implements ConnectorMetadata {
if (e.getStatusCode() == 404) {
// to indicate that we can't read from topic because there is no schema
return null;
} else if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema information for topic %s/%s: Unauthorized",
namespace, schemaTableName.getTableName()));
}
throw new RuntimeException("Failed to get schema information for topic "
throw new RuntimeException("Failed to get pulsar topic schema information for topic "
+ String.format("%s/%s", namespace, schemaTableName.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
......
......@@ -23,12 +23,12 @@ import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.SqlTimestampWithTimeZone;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
import lombok.Data;
......@@ -57,6 +57,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
......@@ -105,7 +106,13 @@ public class PulsarSplitManager implements ConnectorSplitManager {
schemaInfo = this.pulsarAdmin.schemas().getSchemaInfo(
String.format("%s/%s", namespace, tableHandle.getTableName()));
} catch (PulsarAdminException e) {
throw new RuntimeException("Failed to get schema for topic "
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get pulsar topic schema for topic %s/%s: Unauthorized",
namespace, tableHandle.getTableName()));
}
throw new RuntimeException("Failed to get pulsar topic schema for topic "
+ String.format("%s/%s", namespace, tableHandle.getTableName())
+ ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
......@@ -143,6 +150,11 @@ public class PulsarSplitManager implements ConnectorSplitManager {
try {
numPartitions = (this.pulsarAdmin.topics().getPartitionedTopicMetadata(topicName.toString())).partitions;
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
throw new PrestoException(QUERY_REJECTED,
String.format("Failed to get metadata for partitioned topic %s: Unauthorized", topicName));
}
throw new RuntimeException("Failed to get metadata for partitioned topic "
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册