From b3397fb7b58a81d82ef7b8753087b7506804353e Mon Sep 17 00:00:00 2001 From: Sergii Zhevzhyk Date: Mon, 5 Aug 2019 08:46:37 +0200 Subject: [PATCH] Enforce checkstyle in the pulsar sql module (#4882) The checksyle plugin was added to the pulsar sql module to enforce the defined style. All violations were fixed: - Ordering of imports. - Formatting of the code. - Absent Javadoc comments. - Other small issues. (cherry picked from commit f6fee1c6b7976c439215876b6d892591d198a26b) --- .../mledger/impl/ManagedLedgerTest.java | 1 + pulsar-sql/pom.xml | 22 ++ .../pulsar/sql/presto/AvroSchemaHandler.java | 15 +- .../pulsar/sql/presto/JSONSchemaHandler.java | 14 +- .../pulsar/sql/presto/PulsarColumnHandle.java | 62 +++-- .../sql/presto/PulsarColumnMetadata.java | 41 ++-- .../pulsar/sql/presto/PulsarConnector.java | 12 +- .../sql/presto/PulsarConnectorCache.java | 35 +-- .../sql/presto/PulsarConnectorConfig.java | 32 +-- .../sql/presto/PulsarConnectorFactory.java | 10 +- .../pulsar/sql/presto/PulsarConnectorId.java | 11 +- .../presto/PulsarConnectorMetricsTracker.java | 229 +++++++++--------- .../sql/presto/PulsarConnectorModule.java | 18 +- .../sql/presto/PulsarConnectorUtils.java | 13 +- .../sql/presto/PulsarHandleResolver.java | 17 +- .../sql/presto/PulsarInternalColumn.java | 41 +++- .../pulsar/sql/presto/PulsarMetadata.java | 64 ++--- .../pulsar/sql/presto/PulsarPlugin.java | 6 +- .../presto/PulsarPrimitiveSchemaHandler.java | 4 +- .../pulsar/sql/presto/PulsarRecordCursor.java | 38 +-- .../pulsar/sql/presto/PulsarRecordSet.java | 8 +- .../sql/presto/PulsarRecordSetProvider.java | 11 +- .../apache/pulsar/sql/presto/PulsarSplit.java | 38 +-- .../pulsar/sql/presto/PulsarSplitManager.java | 42 ++-- .../pulsar/sql/presto/PulsarTableHandle.java | 12 +- .../sql/presto/PulsarTableLayoutHandle.java | 24 +- .../sql/presto/PulsarTopicDescription.java | 9 +- .../sql/presto/PulsarTransactionHandle.java | 3 + .../pulsar/sql/presto/SchemaHandler.java | 3 + .../pulsar/sql/presto/package-info.java | 22 ++ 30 files changed, 500 insertions(+), 357 deletions(-) create mode 100644 pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 4b108e303ba..558c13efcda 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; diff --git a/pulsar-sql/pom.xml b/pulsar-sql/pom.xml index 4a8a03d815f..ec1629b17c6 100644 --- a/pulsar-sql/pom.xml +++ b/pulsar-sql/pom.xml @@ -114,4 +114,26 @@ + + + + org.apache.maven.plugins + maven-checkstyle-plugin + + + check-style + verify + + ../buildtools/src/main/resources/pulsar/checkstyle.xml + ../buildtools/src/main/resources/pulsar/suppressions.xml + UTF-8 + + + check + + + + + + diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java index 5d55682acf1..f6807a2b19a 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/AvroSchemaHandler.java @@ -19,11 +19,12 @@ package org.apache.pulsar.sql.presto; import io.airlift.log.Logger; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.util.ReferenceCountUtil; import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; +import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; @@ -31,9 +32,9 @@ import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import java.io.IOException; -import java.util.List; - +/** + * Schema handler for payload in the Avro format. + */ public class AvroSchemaHandler implements SchemaHandler { private final DatumReader datumReader; @@ -64,7 +65,7 @@ public class AvroSchemaHandler implements SchemaHandler { BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(), heapBuffer.readableBytes(), decoderFromCache); - if (decoderFromCache==null) { + if (decoderFromCache == null) { decoders.set(decoder); } return this.datumReader.read(null, decoder); @@ -87,7 +88,7 @@ public class AvroSchemaHandler implements SchemaHandler { return null; } if (positionIndices.length > 0) { - for (int i = 1 ; i < positionIndices.length; i++) { + for (int i = 1; i < positionIndices.length; i++) { curr = ((GenericRecord) curr).get(positionIndices[i]); if (curr == null) { return null; @@ -96,7 +97,7 @@ public class AvroSchemaHandler implements SchemaHandler { } return curr; } catch (Exception ex) { - log.debug(ex,"%s", ex); + log.debug(ex, "%s", ex); } return null; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java index 5a12d3011f4..8649e41fc3c 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/JSONSchemaHandler.java @@ -20,17 +20,17 @@ package org.apache.pulsar.sql.presto; import com.dslplatform.json.DslJson; import com.facebook.presto.spi.type.Type; - import io.airlift.log.Logger; - +import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import java.io.IOException; import java.math.BigDecimal; import java.util.List; import java.util.Map; -import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.FastThreadLocal; - +/** + * Schema handler for payload in the JSON format. + */ public class JSONSchemaHandler implements SchemaHandler { private static final Logger log = Logger.get(JSONSchemaHandler.class); @@ -84,7 +84,7 @@ public class JSONSchemaHandler implements SchemaHandler { if (field == null) { return null; } - for (int i = 1; i < fieldNames.length ; i++) { + for (int i = 1; i < fieldNames.length; i++) { field = ((Map) field).get(fieldNames[i]); if (field == null) { return null; @@ -101,7 +101,7 @@ public class JSONSchemaHandler implements SchemaHandler { return field; } catch (Exception ex) { - log.debug(ex,"%s", ex); + log.debug(ex, "%s", ex); } return null; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java index f98e864f1a6..2a1bd43e07e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnHandle.java @@ -18,27 +18,29 @@ */ package org.apache.pulsar.sql.presto; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.type.Type; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import java.util.Arrays; -import static java.util.Objects.requireNonNull; - +/** + * This class represents the basic information about a presto column. + */ public class PulsarColumnHandle implements ColumnHandle { private final String connectorId; /** - * Column Name + * Column Name. */ private final String name; /** - * Column type + * Column type. */ private final Type type; @@ -116,17 +118,33 @@ public class PulsarColumnHandle implements ColumnHandle { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } PulsarColumnHandle that = (PulsarColumnHandle) o; - if (hidden != that.hidden) return false; - if (internal != that.internal) return false; - if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false; - if (name != null ? !name.equals(that.name) : that.name != null) return false; - if (type != null ? !type.equals(that.type) : that.type != null) return false; - if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false; + if (hidden != that.hidden) { + return false; + } + if (internal != that.internal) { + return false; + } + if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) { + return false; + } + if (name != null ? !name.equals(that.name) : that.name != null) { + return false; + } + if (type != null ? !type.equals(that.type) : that.type != null) { + return false; + } + if (!Arrays.deepEquals(fieldNames, that.fieldNames)) { + return false; + } return Arrays.deepEquals(positionIndices, that.positionIndices); } @@ -144,14 +162,14 @@ public class PulsarColumnHandle implements ColumnHandle { @Override public String toString() { - return "PulsarColumnHandle{" + - "connectorId='" + connectorId + '\'' + - ", name='" + name + '\'' + - ", type=" + type + - ", hidden=" + hidden + - ", internal=" + internal + - ", fieldNames=" + Arrays.toString(fieldNames) + - ", positionIndices=" + Arrays.toString(positionIndices) + - '}'; + return "PulsarColumnHandle{" + + "connectorId='" + connectorId + '\'' + + ", name='" + name + '\'' + + ", type=" + type + + ", hidden=" + hidden + + ", internal=" + internal + + ", fieldNames=" + Arrays.toString(fieldNames) + + ", positionIndices=" + Arrays.toString(positionIndices) + + '}'; } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java index 9a484ba1a2f..5b033fac86f 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarColumnMetadata.java @@ -20,10 +20,11 @@ package org.apache.pulsar.sql.presto; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.type.Type; - import java.util.Arrays; -import java.util.List; +/** + * Description of the column metadata. + */ public class PulsarColumnMetadata extends ColumnMetadata { private boolean isInternal; @@ -60,25 +61,37 @@ public class PulsarColumnMetadata extends ColumnMetadata { @Override public String toString() { - return "PulsarColumnMetadata{" + - "isInternal=" + isInternal + - ", nameWithCase='" + nameWithCase + '\'' + - ", fieldNames=" + Arrays.toString(fieldNames) + - ", positionIndices=" + Arrays.toString(positionIndices) + - '}'; + return "PulsarColumnMetadata{" + + "isInternal=" + isInternal + + ", nameWithCase='" + nameWithCase + '\'' + + ", fieldNames=" + Arrays.toString(fieldNames) + + ", positionIndices=" + Arrays.toString(positionIndices) + + '}'; } @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - if (!super.equals(o)) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } PulsarColumnMetadata that = (PulsarColumnMetadata) o; - if (isInternal != that.isInternal) return false; - if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false; - if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false; + if (isInternal != that.isInternal) { + return false; + } + if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) { + return false; + } + if (!Arrays.deepEquals(fieldNames, that.fieldNames)) { + return false; + } return Arrays.deepEquals(positionIndices, that.positionIndices); } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java index 1d89b519c2e..f73c5b0a218 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnector.java @@ -18,6 +18,10 @@ */ package org.apache.pulsar.sql.presto; +import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED; +import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.connector.Connector; import com.facebook.presto.spi.connector.ConnectorMetadata; import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; @@ -26,13 +30,11 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.facebook.presto.spi.transaction.IsolationLevel; import io.airlift.bootstrap.LifeCycleManager; import io.airlift.log.Logger; - import javax.inject.Inject; -import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED; -import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports; -import static java.util.Objects.requireNonNull; - +/** + * This file contains implementation of the connector to the Presto engine. + */ public class PulsarConnector implements Connector { private static final Logger log = Logger.get(PulsarConnector.class); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java index 54789842b57..a42c9fe9296 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorCache.java @@ -18,8 +18,13 @@ */ package org.apache.pulsar.sql.presto; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; +import java.io.IOException; +import java.util.Map; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.LedgerOffloader; @@ -34,13 +39,9 @@ import org.apache.bookkeeper.stats.StatsProvider; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.PulsarVersion; -import java.io.IOException; -import java.util.Map; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.annotations.VisibleForTesting; - +/** + * Implementation of a cache for the Pulsar connector. + */ public class PulsarConnectorCache { private static final Logger log = Logger.get(PulsarConnectorCache.class); @@ -68,7 +69,7 @@ public class PulsarConnectorCache { // start stats provider ClientConfiguration clientConfiguration = new ClientConfiguration(); - pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value)); + pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty); this.statsProvider.start(clientConfiguration); @@ -84,7 +85,8 @@ public class PulsarConnectorCache { return instance; } - private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception { + private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) + throws Exception { ClientConfiguration bkClientConfiguration = new ClientConfiguration() .setZkServers(pulsarConnectorConfig.getZookeeperUri()) .setClientTcpNoDelay(false) @@ -123,16 +125,17 @@ public class PulsarConnectorCache { Map offloaderProperties = conf.getOffloaderProperties(); offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory()); offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver()); - offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads())); + offloaderProperties + .put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads())); try { return offloaderFactory.create( - PulsarConnectorUtils.getProperties(offloaderProperties), - ImmutableMap.of( - LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), - LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() - ), - getOffloaderScheduler(conf)); + PulsarConnectorUtils.getProperties(offloaderProperties), + ImmutableMap.of( + LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), + LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() + ), + getOffloaderScheduler(conf)); } catch (IOException ioe) { log.error("Failed to create offloader: ", ioe); throw new RuntimeException(ioe.getMessage(), ioe.getCause()); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java index edfa97d7827..b53c589b4ea 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorConfig.java @@ -20,20 +20,21 @@ package org.apache.pulsar.sql.presto; import com.fasterxml.jackson.databind.ObjectMapper; import io.airlift.configuration.Config; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Matcher; +import javax.validation.constraints.NotNull; +import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminBuilder; -import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.bookkeeper.stats.NullStatsProvider; import org.apache.pulsar.common.naming.NamedEntity; import org.apache.pulsar.common.protocol.Commands; -import javax.validation.constraints.NotNull; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Matcher; - +/** + * This object handles configuration of the Pulsar connector for the Presto engine. + */ public class PulsarConnectorConfig implements AutoCloseable { private String brokerServiceUrl = "http://localhost:8080"; @@ -55,7 +56,7 @@ public class PulsarConnectorConfig implements AutoCloseable { private boolean namespaceDelimiterRewriteEnable = false; private String rewriteNamespaceDelimiter = "/"; - /**** --- Ledger Offloading --- ****/ + // --- Ledger Offloading --- private String managedLedgerOffloadDriver = null; private int managedLedgerOffloadMaxThreads = 2; private String offloadersDirectory = "./offloaders"; @@ -188,14 +189,15 @@ public class PulsarConnectorConfig implements AutoCloseable { return this; } - /**** --- Ledger Offloading --- ****/ + // --- Ledger Offloading --- public int getManagedLedgerOffloadMaxThreads() { return this.managedLedgerOffloadMaxThreads; } @Config("pulsar.managed-ledger-offload-max-threads") - public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) throws IOException { + public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) + throws IOException { this.managedLedgerOffloadMaxThreads = managedLedgerOffloadMaxThreads; return this; } @@ -231,7 +233,7 @@ public class PulsarConnectorConfig implements AutoCloseable { return this; } - /**** --- Authentication --- ****/ + // --- Authentication --- public String getAuthPlugin() { return this.authPluginClassName; @@ -316,8 +318,8 @@ public class PulsarConnectorConfig implements AutoCloseable { @Override public String toString() { - return "PulsarConnectorConfig{" + - "brokerServiceUrl='" + brokerServiceUrl + '\'' + - '}'; + return "PulsarConnectorConfig{" + + "brokerServiceUrl='" + brokerServiceUrl + '\'' + + '}'; } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java index 0719e8e1a57..48d10814eed 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorFactory.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.sql.presto; +import static com.google.common.base.Throwables.throwIfUnchecked; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.connector.Connector; import com.facebook.presto.spi.connector.ConnectorContext; @@ -26,12 +29,11 @@ import com.google.inject.Injector; import io.airlift.bootstrap.Bootstrap; import io.airlift.json.JsonModule; import io.airlift.log.Logger; - import java.util.Map; -import static com.google.common.base.Throwables.throwIfUnchecked; -import static java.util.Objects.requireNonNull; - +/** + * The factory class which helps to build the presto connector. + */ public class PulsarConnectorFactory implements ConnectorFactory { private static final Logger log = Logger.get(PulsarConnectorFactory.class); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java index 0ba4e1be0bb..633bccbdf38 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorId.java @@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto; import static java.util.Objects.requireNonNull; +/** + * Unique identifier of a connector. + */ public class PulsarConnectorId { private final String id; @@ -34,8 +37,12 @@ public class PulsarConnectorId { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } PulsarConnectorId that = (PulsarConnectorId) o; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java index 34969d8035b..e3919d3f18e 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorMetricsTracker.java @@ -18,21 +18,23 @@ */ package org.apache.pulsar.sql.presto; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; -import java.util.concurrent.TimeUnit; - +/** + * This class helps to track metrics related to the connector. + */ public class PulsarConnectorMetricsTracker implements AutoCloseable{ private final StatsLogger statsLogger; private static final String SCOPE = "split"; - /** metric names **/ + // metric names // time spend waiting to get entry from entry queue because it is empty private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time"; @@ -70,7 +72,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ public static final String READ_ATTEMPTS = "read-attempts"; // number of read attempts per query - public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query"; + public static final String READ_ATTEMTPS_PER_QUERY = "read-attempts-per-query"; // latency of reads per batch public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch"; @@ -97,209 +99,208 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ private static final String TOTAL_EXECUTION_TIME = "total-execution-time"; - /** stats loggers **/ - - private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime; - private final Counter statsLogger_bytesRead; - private final OpStatsLogger statsLogger_entryDeserializetime; - private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime; - private final Counter statsLogger_numMessagesDeserialized; - private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry; - private final OpStatsLogger statsLogger_readAttempts; - private final OpStatsLogger statsLogger_readLatencyPerBatch; - private final OpStatsLogger statsLogger_numEntriesPerBatch; - private final OpStatsLogger statsLogger_recordDeserializeTime; - private final Counter statsLogger_numRecordDeserialized; - private final OpStatsLogger statsLogger_totalExecutionTime; - - /** internal tracking variables **/ - private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; - private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L; - private long BYTES_READ_sum = 0L; - private long ENTRY_DESERIALIZE_TIME_startTime; - private long ENTRY_DESERIALIZE_TIME_sum = 0L; - private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime; - private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L; - private long NUM_MESSAGES_DERSERIALIZED_sum = 0L; - private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L; - private long READ_ATTEMTPS_SUCCESS_sum = 0L; - private long READ_ATTEMTPS_FAIL_sum = 0L; - private long READ_LATENCY_SUCCESS_sum = 0L; - private long READ_LATENCY_FAIL_sum = 0L; - private long NUM_ENTRIES_PER_BATCH_sum = 0L; - private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L; - private long RECORD_DESERIALIZE_TIME_startTime; - private long RECORD_DESERIALIZE_TIME_sum = 0L; + // stats loggers + + private final OpStatsLogger statsLoggerEntryQueueDequeueWaitTime; + private final Counter statsLoggerBytesRead; + private final OpStatsLogger statsLoggerEntryDeserializeTime; + private final OpStatsLogger statsLoggerMessageQueueEnqueueWaitTime; + private final Counter statsLoggerNumMessagesDeserialized; + private final OpStatsLogger statsLoggerNumMessagesDeserializedPerEntry; + private final OpStatsLogger statsLoggerReadAttempts; + private final OpStatsLogger statsLoggerReadLatencyPerBatch; + private final OpStatsLogger statsLoggerNumEntriesPerBatch; + private final OpStatsLogger statsLoggerRecordDeserializeTime; + private final Counter statsLoggerNumRecordDeserialized; + private final OpStatsLogger statsLoggerTotalExecutionTime; + + // internal tracking variables + private long entryQueueDequeueWaitTimeStartTime; + private long entryQueueDequeueWaitTimeSum = 0L; + private long bytesReadSum = 0L; + private long entryDeserializeTimeStartTime; + private long entryDeserializeTimeSum = 0L; + private long messageQueueEnqueueWaitTimeStartTime; + private long messageQueueEnqueueWaitTimeSum = 0L; + private long numMessagesDerserializedSum = 0L; + private long numMessagedDerserializedPerBatch = 0L; + private long readAttemptsSuccessSum = 0L; + private long readAttemptsFailSum = 0L; + private long readLatencySuccessSum = 0L; + private long readLatencyFailSum = 0L; + private long numEntriesPerBatchSum = 0L; + private long messageQueueDequeueWaitTimeSum = 0L; + private long recordDeserializeTimeStartTime; + private long recordDeserializeTimeSum = 0L; public PulsarConnectorMetricsTracker(StatsProvider statsProvider) { this.statsLogger = statsProvider instanceof NullStatsProvider ? null : statsProvider.getStatsLogger(SCOPE); if (this.statsLogger != null) { - statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME); - statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ); - statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME); - statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME); - statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED); - statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY); - statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS); - statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH); - statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH); - statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME); - statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED); - statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME); + statsLoggerEntryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME); + statsLoggerBytesRead = statsLogger.getCounter(BYTES_READ); + statsLoggerEntryDeserializeTime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME); + statsLoggerMessageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME); + statsLoggerNumMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED); + statsLoggerNumMessagesDeserializedPerEntry = statsLogger + .getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY); + statsLoggerReadAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS); + statsLoggerReadLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH); + statsLoggerNumEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH); + statsLoggerRecordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME); + statsLoggerNumRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED); + statsLoggerTotalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME); } else { - statsLogger_entryQueueDequeueWaitTime = null; - statsLogger_bytesRead = null; - statsLogger_entryDeserializetime = null; - statsLogger_messageQueueEnqueueWaitTime = null; - statsLogger_numMessagesDeserialized = null; - statsLogger_numMessagesDeserializedPerEntry = null; - statsLogger_readAttempts = null; - statsLogger_readLatencyPerBatch = null; - statsLogger_numEntriesPerBatch = null; - statsLogger_recordDeserializeTime = null; - statsLogger_numRecordDeserialized = null; - statsLogger_totalExecutionTime = null; + statsLoggerEntryQueueDequeueWaitTime = null; + statsLoggerBytesRead = null; + statsLoggerEntryDeserializeTime = null; + statsLoggerMessageQueueEnqueueWaitTime = null; + statsLoggerNumMessagesDeserialized = null; + statsLoggerNumMessagesDeserializedPerEntry = null; + statsLoggerReadAttempts = null; + statsLoggerReadLatencyPerBatch = null; + statsLoggerNumEntriesPerBatch = null; + statsLoggerRecordDeserializeTime = null; + statsLoggerNumRecordDeserialized = null; + statsLoggerTotalExecutionTime = null; } } public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() { if (statsLogger != null) { - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime(); + entryQueueDequeueWaitTimeStartTime = System.nanoTime(); } } public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() { if (statsLogger != null) { - long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime; - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time; - statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + long time = System.nanoTime() - entryQueueDequeueWaitTimeStartTime; + entryQueueDequeueWaitTimeSum += time; + statsLoggerEntryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void register_BYTES_READ(long bytes) { if (statsLogger != null) { - BYTES_READ_sum += bytes; - statsLogger_bytesRead.add(bytes); + bytesReadSum += bytes; + statsLoggerBytesRead.add(bytes); } } public void start_ENTRY_DESERIALIZE_TIME() { if (statsLogger != null) { - ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime(); + entryDeserializeTimeStartTime = System.nanoTime(); } } public void end_ENTRY_DESERIALIZE_TIME() { if (statsLogger != null) { - long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime; - ENTRY_DESERIALIZE_TIME_sum += time; - statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + long time = System.nanoTime() - entryDeserializeTimeStartTime; + entryDeserializeTimeSum += time; + statsLoggerEntryDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() { if (statsLogger != null) { - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime(); + messageQueueEnqueueWaitTimeStartTime = System.nanoTime(); } } public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() { if (statsLogger != null) { - long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime; - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time; - statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + long time = System.nanoTime() - messageQueueEnqueueWaitTimeStartTime; + messageQueueEnqueueWaitTimeSum += time; + statsLoggerMessageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { if (statsLogger != null) { - NUM_MESSAGED_DERSERIALIZED_PER_BATCH++; - statsLogger_numMessagesDeserialized.add(1); + numMessagedDerserializedPerBatch++; + statsLoggerNumMessagesDeserialized.add(1); } } public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() { if (statsLogger != null) { - NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH; - - statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH); - - NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L; + numMessagesDerserializedSum += numMessagedDerserializedPerBatch; + statsLoggerNumMessagesDeserializedPerEntry.registerSuccessfulValue(numMessagedDerserializedPerBatch); + numMessagedDerserializedPerBatch = 0L; } } public void incr_READ_ATTEMPTS_SUCCESS() { if (statsLogger != null) { - READ_ATTEMTPS_SUCCESS_sum++; - statsLogger_readAttempts.registerSuccessfulValue(1L); + readAttemptsSuccessSum++; + statsLoggerReadAttempts.registerSuccessfulValue(1L); } } public void incr_READ_ATTEMPTS_FAIL() { if (statsLogger != null) { - READ_ATTEMTPS_FAIL_sum++; - statsLogger_readAttempts.registerFailedValue(1L); + readAttemptsFailSum++; + statsLoggerReadAttempts.registerFailedValue(1L); } } public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) { if (statsLogger != null) { - READ_LATENCY_SUCCESS_sum += latency; - statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + readLatencySuccessSum += latency; + statsLoggerReadLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } } public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) { if (statsLogger != null) { - READ_LATENCY_FAIL_sum += latency; - statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS); + readLatencyFailSum += latency; + statsLoggerReadLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS); } } public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) { if (statsLogger != null) { - NUM_ENTRIES_PER_BATCH_sum += delta; - statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta); + numEntriesPerBatchSum += delta; + statsLoggerNumEntriesPerBatch.registerSuccessfulValue(delta); } } public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) { if (statsLogger != null) { - statsLogger_numEntriesPerBatch.registerFailedValue(delta); + statsLoggerNumEntriesPerBatch.registerFailedValue(delta); } } public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) { if (statsLogger != null) { - MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency; + messageQueueDequeueWaitTimeSum += latency; } } public void start_RECORD_DESERIALIZE_TIME() { if (statsLogger != null) { - RECORD_DESERIALIZE_TIME_startTime = System.nanoTime(); + recordDeserializeTimeStartTime = System.nanoTime(); } } public void end_RECORD_DESERIALIZE_TIME() { if (statsLogger != null) { - long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime; - RECORD_DESERIALIZE_TIME_sum += time; - statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); + long time = System.nanoTime() - recordDeserializeTimeStartTime; + recordDeserializeTimeSum += time; + statsLoggerRecordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS); } } public void incr_NUM_RECORD_DESERIALIZED() { if (statsLogger != null) { - statsLogger_numRecordDeserialized.add(1); + statsLoggerNumRecordDeserialized.add(1); } } public void register_TOTAL_EXECUTION_TIME(long latency) { if (statsLogger != null) { - statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); + statsLoggerTotalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS); } } @@ -308,47 +309,47 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{ if (statsLogger != null) { // register total entry dequeue wait time for query statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY) - .registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS); + .registerSuccessfulEvent(entryQueueDequeueWaitTimeSum, TimeUnit.NANOSECONDS); //register bytes read per query statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY) - .registerSuccessfulValue(BYTES_READ_sum); + .registerSuccessfulValue(bytesReadSum); // register total time spent deserializing entries for query statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY) - .registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS); + .registerSuccessfulEvent(entryDeserializeTimeSum, TimeUnit.NANOSECONDS); // register time spent waiting for message queue enqueue because message queue is full per query statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY) - .registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS); + .registerSuccessfulEvent(messageQueueEnqueueWaitTimeSum, TimeUnit.NANOSECONDS); // register number of messages deserialized per query statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY) - .registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum); + .registerSuccessfulValue(numMessagesDerserializedSum); // register number of read attempts per query statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY) - .registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum); + .registerSuccessfulValue(readAttemptsSuccessSum); statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY) - .registerFailedValue(READ_ATTEMTPS_FAIL_sum); + .registerFailedValue(readAttemptsFailSum); // register total read latency for query statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY) - .registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS); + .registerSuccessfulEvent(readLatencySuccessSum, TimeUnit.NANOSECONDS); statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY) - .registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS); + .registerFailedEvent(readLatencyFailSum, TimeUnit.NANOSECONDS); // register number of entries per query statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY) - .registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum); + .registerSuccessfulValue(numEntriesPerBatchSum); // register time spent waiting to read for message queue per query statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY) - .registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS); + .registerSuccessfulEvent(messageQueueDequeueWaitTimeSum, TimeUnit.MILLISECONDS); // register time spent deserializing records per query statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY) - .registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS); + .registerSuccessfulEvent(recordDeserializeTimeSum, TimeUnit.NANOSECONDS); } } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java index 10dcd575c15..3d42ee08051 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorModule.java @@ -18,6 +18,11 @@ */ package org.apache.pulsar.sql.presto; +import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; +import static io.airlift.configuration.ConfigBinder.configBinder; +import static io.airlift.json.JsonBinder.jsonBinder; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.TypeManager; import com.fasterxml.jackson.databind.DeserializationContext; @@ -25,14 +30,11 @@ import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer; import com.google.inject.Binder; import com.google.inject.Module; import com.google.inject.Scopes; - import javax.inject.Inject; -import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature; -import static io.airlift.configuration.ConfigBinder.configBinder; -import static io.airlift.json.JsonBinder.jsonBinder; -import static java.util.Objects.requireNonNull; - +/** + * This class defines binding of classes in the Presto connector. + */ public class PulsarConnectorModule implements Module { private final String connectorId; @@ -56,9 +58,11 @@ public class PulsarConnectorModule implements Module { configBinder(binder).bindConfig(PulsarConnectorConfig.class); jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class); - } + /** + * A wrapper to deserialize the Presto types. + */ public static final class TypeDeserializer extends FromStringDeserializer { private static final long serialVersionUID = 1L; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java index b14cb87dc83..395e470526a 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarConnectorUtils.java @@ -18,17 +18,18 @@ */ package org.apache.pulsar.sql.presto; -import org.apache.avro.Schema; -import org.apache.commons.lang3.exception.ExceptionUtils; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.naming.TopicName; - import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.Map; import java.util.Properties; +import org.apache.avro.Schema; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.naming.TopicName; +/** + * A helper class containing repeatable logic used in the other classes. + */ public class PulsarConnectorUtils { public static Schema parseSchema(String schemaJson) { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java index 29c22ad1e74..45a0bed2f80 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarHandleResolver.java @@ -18,6 +18,9 @@ */ package org.apache.pulsar.sql.presto; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorHandleResolver; import com.facebook.presto.spi.ConnectorSplit; @@ -25,9 +28,9 @@ import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; - +/** + * This class helps to resolve classes for the Presto connector. + */ public class PulsarHandleResolver implements ConnectorHandleResolver { @Override public Class getTableHandleClass() { @@ -57,8 +60,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver { static PulsarColumnHandle convertColumnHandle(ColumnHandle columnHandle) { requireNonNull(columnHandle, "columnHandle is null"); - checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of " + - "PulsarColumnHandle"); + checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of " + + "PulsarColumnHandle"); return (PulsarColumnHandle) columnHandle; } @@ -70,8 +73,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver { static PulsarTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout) { requireNonNull(layout, "layout is null"); - checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of " + - "PulsarTableLayoutHandle"); + checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of " + + "PulsarTableLayoutHandle"); return (PulsarTableLayoutHandle) layout; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java index c18a0e2425d..f511f8a2e38 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarInternalColumn.java @@ -18,27 +18,31 @@ */ package org.apache.pulsar.sql.presto; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.type.BigintType; import com.facebook.presto.spi.type.TimestampType; -import com.facebook.presto.spi.type.TimestampWithTimeZoneType; import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.VarcharType; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; -import org.apache.pulsar.common.api.raw.RawMessage; - import java.util.Map; import java.util.Set; import java.util.function.Consumer; +import org.apache.pulsar.common.api.raw.RawMessage; -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Strings.isNullOrEmpty; -import static java.util.Objects.requireNonNull; - +/** + * This abstract class represents internal columns. + */ public abstract class PulsarInternalColumn { + /** + * Internal column representing the event time. + */ public static class EventTimeColumn extends PulsarInternalColumn { EventTimeColumn(String name, Type type, String comment) { @@ -51,6 +55,9 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the publish time. + */ public static class PublishTimeColumn extends PulsarInternalColumn { PublishTimeColumn(String name, Type type, String comment) { @@ -63,6 +70,9 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the message id. + */ public static class MessageIdColumn extends PulsarInternalColumn { MessageIdColumn(String name, Type type, String comment) { @@ -75,6 +85,9 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the sequence id. + */ public static class SequenceIdColumn extends PulsarInternalColumn { SequenceIdColumn(String name, Type type, String comment) { @@ -87,6 +100,9 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the producer name. + */ public static class ProducerNameColumn extends PulsarInternalColumn { ProducerNameColumn(String name, Type type, String comment) { @@ -99,6 +115,9 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the key. + */ public static class KeyColumn extends PulsarInternalColumn { KeyColumn(String name, Type type, String comment) { @@ -111,9 +130,11 @@ public abstract class PulsarInternalColumn { } } + /** + * Internal column representing the message properties. + */ public static class PropertiesColumn extends PulsarInternalColumn { - private static final ObjectMapper mapper = new ObjectMapper(); PropertiesColumn(String name, Type type, String comment) { @@ -145,8 +166,8 @@ public abstract class PulsarInternalColumn { public static final PulsarInternalColumn PRODUCER_NAME = new ProducerNameColumn("__producer_name__", VarcharType .VARCHAR, "The name of the producer that publish the message used to generate this row"); - public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key " + - "for the topic"); + public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key " + + "for the topic"); public static final PulsarInternalColumn PROPERTIES = new PropertiesColumn("__properties__", VarcharType.VARCHAR, "User defined properties"); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java index a43198271eb..360baaf5a1b 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarMetadata.java @@ -18,6 +18,18 @@ */ package org.apache.pulsar.sql.presto; +import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; +import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; +import static com.facebook.presto.spi.type.DateType.DATE; +import static com.facebook.presto.spi.type.TimeType.TIME; +import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; +import static java.util.Objects.requireNonNull; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded; +import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle; +import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ColumnMetadata; import com.facebook.presto.spi.ConnectorSession; @@ -49,6 +61,16 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.log.Logger; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.Stack; +import java.util.stream.Collectors; +import javax.inject.Inject; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -62,30 +84,9 @@ import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import javax.inject.Inject; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.Stack; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; -import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; -import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; -import static com.facebook.presto.spi.type.DateType.DATE; -import static com.facebook.presto.spi.type.TimeType.TIME; -import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP; -import static java.util.Objects.requireNonNull; -import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle; -import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle; -import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded; -import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; - +/** + * This connector helps to work with metadata. + */ public class PulsarMetadata implements ConnectorMetadata { private final String connectorId; @@ -114,7 +115,7 @@ public class PulsarMetadata implements ConnectorMetadata { List tenants = pulsarAdmin.tenants().getTenants(); for (String tenant : tenants) { prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace -> - rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList())); + rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList())); } } catch (PulsarAdminException e) { if (e.getStatusCode() == 401) { @@ -141,7 +142,8 @@ public class PulsarMetadata implements ConnectorMetadata { Optional> desiredColumns) { PulsarTableHandle handle = convertTableHandle(table); - ConnectorTableLayout layout = new ConnectorTableLayout(new PulsarTableLayoutHandle(handle, constraint.getSummary())); + ConnectorTableLayout layout = new ConnectorTableLayout( + new PulsarTableLayoutHandle(handle, constraint.getSummary())); return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary())); } @@ -173,7 +175,8 @@ public class PulsarMetadata implements ConnectorMetadata { } else { List pulsarTopicList = null; try { - pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig)); + pulsarTopicList = this.pulsarAdmin.topics() + .getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig)); } catch (PulsarAdminException e) { if (e.getStatusCode() == 404) { log.warn("Schema " + schemaNameOrNull + " does not exsit"); @@ -465,9 +468,9 @@ public class PulsarMetadata implements ConnectorMetadata { canBeNull = true; } } else { - List columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices); + List columns = getColumns(fieldName, type, fieldTypes, fieldNames, + positionIndices); columnMetadataList.addAll(columns); - } } } else if (fieldSchema.getType() == Schema.Type.RECORD) { @@ -484,7 +487,8 @@ public class PulsarMetadata implements ConnectorMetadata { if (fieldName == null) { columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices); } else { - columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices); + columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), + fieldTypes, fieldNames, positionIndices); } positionIndices.pop(); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java index c9a94ae12f9..850b3f5d2f7 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPlugin.java @@ -22,10 +22,12 @@ import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.connector.ConnectorFactory; import com.google.common.collect.ImmutableList; +/** + * Implementation of the Pulsar plugin for Pesto. + */ public class PulsarPlugin implements Plugin { @Override - public Iterable getConnectorFactories() - { + public Iterable getConnectorFactories() { return ImmutableList.of(new PulsarConnectorFactory()); } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java index 6e3324bb6ff..28980a9f13b 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarPrimitiveSchemaHandler.java @@ -23,8 +23,6 @@ import io.netty.buffer.ByteBufUtil; import java.sql.Time; import java.sql.Timestamp; import java.util.Date; - -import io.netty.util.concurrent.FastThreadLocal; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AutoConsumeSchema; import org.apache.pulsar.common.schema.SchemaInfo; @@ -62,4 +60,4 @@ public class PulsarPrimitiveSchemaHandler implements SchemaHandler { public Object extractField(int index, Object currentRecord) { return currentRecord; } -} \ No newline at end of file +} \ No newline at end of file diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java index b2e8af5edef..b6ca975dae7 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordCursor.java @@ -37,17 +37,14 @@ import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.VarbinaryType; import com.facebook.presto.spi.type.VarcharType; import com.google.common.annotations.VisibleForTesting; - import io.airlift.log.Logger; import io.airlift.slice.Slice; import io.airlift.slice.Slices; - import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; -import org.apache.avro.Schema; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; @@ -61,11 +58,12 @@ import org.apache.pulsar.common.api.raw.MessageParser; import org.apache.pulsar.common.api.raw.RawMessage; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.schema.SchemaType; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; - +/** + * Implementation of a cursor to read records. + */ public class PulsarRecordCursor implements RecordCursor { private List columnHandles; @@ -118,15 +116,16 @@ public class PulsarRecordCursor implements RecordCursor { // Exposed for testing purposes PulsarRecordCursor(List columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig - pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, - PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) { + pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, + PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) { this.splitSize = pulsarSplit.getSplitSize(); - initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker); + initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, + pulsarConnectorMetricsTracker); } private void initialize(List columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig - pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, - PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) { + pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig, + PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) { this.columnHandles = columnHandles; this.pulsarSplit = pulsarSplit; this.pulsarConnectorConfig = pulsarConnectorConfig; @@ -149,7 +148,7 @@ public class PulsarRecordCursor implements RecordCursor { try { this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()), - pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig); + pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig); } catch (ManagedLedgerException | InterruptedException e) { log.error(e, "Failed to get read only cursor"); close(); @@ -300,11 +299,13 @@ public class PulsarRecordCursor implements RecordCursor { ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor); // check if ledger is offloaded if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) { - log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured", - readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName()); + log.warn( + "Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured", + readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName()); long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries(); - long entriesToSkip = (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1; + long entriesToSkip = + (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1; cursor.skipEntries(Math.toIntExact((entriesToSkip))); entriesProcessed += entriesToSkip; @@ -339,13 +340,14 @@ public class PulsarRecordCursor implements RecordCursor { outstandingReadsRequests.incrementAndGet(); //set read latency stats for success - metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx); + metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long) ctx); //stats for number of entries read metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size()); } public boolean hasFinished() { - return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed; + return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1 + && splitSize <= entriesProcessed; } @Override @@ -354,7 +356,7 @@ public class PulsarRecordCursor implements RecordCursor { outstandingReadsRequests.incrementAndGet(); //set read latency stats for failed - metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx); + metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long) ctx); //stats for number of entries read failed metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize); } @@ -377,7 +379,7 @@ public class PulsarRecordCursor implements RecordCursor { currentMessage = null; } - while(true) { + while (true) { if (readEntries.hasFinished()) { return false; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java index 17e703645b8..be94a1b8eaa 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSet.java @@ -18,15 +18,17 @@ */ package org.apache.pulsar.sql.presto; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.RecordCursor; import com.facebook.presto.spi.RecordSet; import com.facebook.presto.spi.type.Type; import com.google.common.collect.ImmutableList; - import java.util.List; -import static java.util.Objects.requireNonNull; - +/** + * Implementation of a record set. + */ public class PulsarRecordSet implements RecordSet { private final List columnHandles; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java index 378013bfb7b..c49bae9c274 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarRecordSetProvider.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.sql.presto; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplit; @@ -25,12 +27,12 @@ import com.facebook.presto.spi.RecordSet; import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; import com.google.common.collect.ImmutableList; - -import javax.inject.Inject; import java.util.List; +import javax.inject.Inject; -import static java.util.Objects.requireNonNull; - +/** + * Implementation of the provider for record sets. + */ public class PulsarRecordSetProvider implements ConnectorRecordSetProvider { private final PulsarConnectorConfig pulsarConnectorConfig; @@ -52,7 +54,6 @@ public class PulsarRecordSetProvider implements ConnectorRecordSetProvider { handles.add((PulsarColumnHandle) handle); } - return new PulsarRecordSet(pulsarSplit, handles.build(), this.pulsarConnectorConfig); } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java index 2fdccc4e2db..eeebbd174dd 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplit.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.sql.presto; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSplit; import com.facebook.presto.spi.HostAddress; @@ -25,15 +27,15 @@ import com.facebook.presto.spi.predicate.TupleDomain; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Map; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; -import java.util.List; -import java.util.Map; - -import static java.util.Objects.requireNonNull; - +/** + * This class represents information for a split. + */ public class PulsarSplit implements ConnectorSplit { private final long splitId; @@ -176,19 +178,19 @@ public class PulsarSplit implements ConnectorSplit { @Override public String toString() { - return "PulsarSplit{" + - "splitId=" + splitId + - ", connectorId='" + connectorId + '\'' + - ", schemaName='" + schemaName + '\'' + - ", tableName='" + tableName + '\'' + - ", splitSize=" + splitSize + - ", schema='" + schema + '\'' + - ", schemaType=" + schemaType + - ", startPositionEntryId=" + startPositionEntryId + - ", endPositionEntryId=" + endPositionEntryId + - ", startPositionLedgerId=" + startPositionLedgerId + - ", endPositionLedgerId=" + endPositionLedgerId + - '}'; + return "PulsarSplit{" + + "splitId=" + splitId + + ", connectorId='" + connectorId + '\'' + + ", schemaName='" + schemaName + '\'' + + ", tableName='" + tableName + '\'' + + ", splitSize=" + splitSize + + ", schema='" + schema + '\'' + + ", schemaType=" + schemaType + + ", startPositionEntryId=" + startPositionEntryId + + ", endPositionEntryId=" + endPositionEntryId + + ", startPositionLedgerId=" + startPositionLedgerId + + ", endPositionLedgerId=" + endPositionLedgerId + + '}'; } public SchemaInfo getSchemaInfo() { diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java index 77b8f11831e..6da5df41eb8 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarSplitManager.java @@ -18,6 +18,12 @@ */ package org.apache.pulsar.sql.presto; +import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; +import static com.google.common.base.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; +import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries; +import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorSession; import com.facebook.presto.spi.ConnectorSplitSource; @@ -30,14 +36,20 @@ import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.TupleDomain; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Predicate; import io.airlift.log.Logger; +import java.sql.Timestamp; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import javax.inject.Inject; import lombok.Data; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.ReadOnlyCursor; -import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -47,22 +59,10 @@ import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.schema.SchemaInfo; -import com.google.common.base.Predicate; -import org.apache.bookkeeper.conf.ClientConfiguration; - -import javax.inject.Inject; -import java.sql.Timestamp; -import java.util.Collection; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; - -import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED; -import static com.google.common.base.Preconditions.checkArgument; -import static java.util.Objects.requireNonNull; -import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries; -import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded; +/** + * The class helping to manage splits. + */ public class PulsarSplitManager implements ConnectorSplitManager { private final String connectorId; @@ -146,7 +146,7 @@ public class PulsarSplitManager implements ConnectorSplitManager { } throw new RuntimeException("Failed to get metadata for partitioned topic " - + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e); + + topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e); } int actualNumSplits = Math.max(numPartitions, numSplits); @@ -342,15 +342,15 @@ public class PulsarSplitManager implements ConnectorSplitManager { // Just use a close bound since presto can always filter out the extra entries even if // the bound // should be open or a mixture of open and closed - com.google.common.collect.Range posRange - = com.google.common.collect.Range.range(overallStartPos, + com.google.common.collect.Range posRange = + com.google.common.collect.Range.range(overallStartPos, com.google.common.collect.BoundType.CLOSED, overallEndPos, com.google.common.collect.BoundType.CLOSED); long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1; - PredicatePushdownInfo predicatePushdownInfo - = new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries); + PredicatePushdownInfo predicatePushdownInfo = + new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries); log.debug("Predicate pushdown optimization calculated: %s", predicatePushdownInfo); return predicatePushdownInfo; } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java index 6e8ba9010ef..2ffcfde0ebd 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableHandle.java @@ -18,20 +18,22 @@ */ package org.apache.pulsar.sql.presto; +import static com.google.common.base.MoreObjects.toStringHelper; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ConnectorTableHandle; import com.facebook.presto.spi.SchemaTableName; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import java.util.Objects; -import static com.google.common.base.MoreObjects.toStringHelper; -import static java.util.Objects.requireNonNull; - +/** + * Description of basic metadata of a table. + */ public class PulsarTableHandle implements ConnectorTableHandle { /** - * connector id + * Connector id. */ private final String connectorId; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java index a7c240b940d..257c2a2d5e1 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTableLayoutHandle.java @@ -18,16 +18,18 @@ */ package org.apache.pulsar.sql.presto; +import static java.util.Objects.requireNonNull; + import com.facebook.presto.spi.ColumnHandle; import com.facebook.presto.spi.ConnectorTableLayoutHandle; import com.facebook.presto.spi.predicate.TupleDomain; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import java.util.Objects; -import static java.util.Objects.requireNonNull; - +/** + * This class handles the table layout. + */ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle { private final PulsarTableHandle table; private final TupleDomain tupleDomain; @@ -46,14 +48,12 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle { } @JsonProperty - public TupleDomain getTupleDomain() - { + public TupleDomain getTupleDomain() { return tupleDomain; } @Override - public boolean equals(Object o) - { + public boolean equals(Object o) { if (this == o) { return true; } @@ -61,19 +61,17 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle { return false; } PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o; - return Objects.equals(table, that.table) && - Objects.equals(tupleDomain, that.tupleDomain); + return Objects.equals(table, that.table) + && Objects.equals(tupleDomain, that.tupleDomain); } @Override - public int hashCode() - { + public int hashCode() { return Objects.hash(table, tupleDomain); } @Override - public String toString() - { + public String toString() { return table.toString(); } } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java index ae163ef0832..1658fee61fb 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTopicDescription.java @@ -18,14 +18,17 @@ */ package org.apache.pulsar.sql.presto; -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; - import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.Objects.requireNonNull; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +/** + * Represents the basic information about a pulsar topic. + */ public class PulsarTopicDescription { private final String tableName; private final String topicName; diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java index 52282b2b2f1..2d1e104f551 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/PulsarTransactionHandle.java @@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto; import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +/** + * A handle for transactions. + */ public enum PulsarTransactionHandle implements ConnectorTransactionHandle { INSTANCE } diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java index 1e02d2ba302..37fce043295 100644 --- a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/SchemaHandler.java @@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto; import io.netty.buffer.ByteBuf; +/** + * This interface defines the methods to work with schemas. + */ public interface SchemaHandler { Object deserialize(ByteBuf payload); diff --git a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java new file mode 100644 index 00000000000..7c6649e9217 --- /dev/null +++ b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +/** + * Implementation of the connector to the Presto engine. + */ +package org.apache.pulsar.sql.presto; \ No newline at end of file -- GitLab