提交 b3397fb7 编写于 作者: S Sergii Zhevzhyk 提交者: Jia Zhai

Enforce checkstyle in the pulsar sql module (#4882)

The checksyle plugin was added to the pulsar sql module to enforce the defined style. All violations were fixed:

- Ordering of imports.
- Formatting of the code.
- Absent Javadoc comments.
- Other small issues.
(cherry picked from commit f6fee1c6)
上级 c781e405
......@@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
......
......@@ -114,4 +114,26 @@
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<executions>
<execution>
<id>check-style</id>
<phase>verify</phase>
<configuration>
<configLocation>../buildtools/src/main/resources/pulsar/checkstyle.xml</configLocation>
<suppressionsLocation>../buildtools/src/main/resources/pulsar/suppressions.xml</suppressionsLocation>
<encoding>UTF-8</encoding>
</configuration>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
......@@ -19,11 +19,12 @@
package org.apache.pulsar.sql.presto;
import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
......@@ -31,9 +32,9 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
import java.io.IOException;
import java.util.List;
/**
* Schema handler for payload in the Avro format.
*/
public class AvroSchemaHandler implements SchemaHandler {
private final DatumReader<GenericRecord> datumReader;
......@@ -64,7 +65,7 @@ public class AvroSchemaHandler implements SchemaHandler {
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(heapBuffer.array(), heapBuffer.arrayOffset(),
heapBuffer.readableBytes(), decoderFromCache);
if (decoderFromCache==null) {
if (decoderFromCache == null) {
decoders.set(decoder);
}
return this.datumReader.read(null, decoder);
......@@ -87,7 +88,7 @@ public class AvroSchemaHandler implements SchemaHandler {
return null;
}
if (positionIndices.length > 0) {
for (int i = 1 ; i < positionIndices.length; i++) {
for (int i = 1; i < positionIndices.length; i++) {
curr = ((GenericRecord) curr).get(positionIndices[i]);
if (curr == null) {
return null;
......@@ -96,7 +97,7 @@ public class AvroSchemaHandler implements SchemaHandler {
}
return curr;
} catch (Exception ex) {
log.debug(ex,"%s", ex);
log.debug(ex, "%s", ex);
}
return null;
}
......
......@@ -20,17 +20,17 @@ package org.apache.pulsar.sql.presto;
import com.dslplatform.json.DslJson;
import com.facebook.presto.spi.type.Type;
import io.airlift.log.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
/**
* Schema handler for payload in the JSON format.
*/
public class JSONSchemaHandler implements SchemaHandler {
private static final Logger log = Logger.get(JSONSchemaHandler.class);
......@@ -84,7 +84,7 @@ public class JSONSchemaHandler implements SchemaHandler {
if (field == null) {
return null;
}
for (int i = 1; i < fieldNames.length ; i++) {
for (int i = 1; i < fieldNames.length; i++) {
field = ((Map) field).get(fieldNames[i]);
if (field == null) {
return null;
......@@ -101,7 +101,7 @@ public class JSONSchemaHandler implements SchemaHandler {
return field;
} catch (Exception ex) {
log.debug(ex,"%s", ex);
log.debug(ex, "%s", ex);
}
return null;
}
......
......@@ -18,27 +18,29 @@
*/
package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Arrays;
import static java.util.Objects.requireNonNull;
/**
* This class represents the basic information about a presto column.
*/
public class PulsarColumnHandle implements ColumnHandle {
private final String connectorId;
/**
* Column Name
* Column Name.
*/
private final String name;
/**
* Column type
* Column type.
*/
private final Type type;
......@@ -116,17 +118,33 @@ public class PulsarColumnHandle implements ColumnHandle {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PulsarColumnHandle that = (PulsarColumnHandle) o;
if (hidden != that.hidden) return false;
if (internal != that.internal) return false;
if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) return false;
if (name != null ? !name.equals(that.name) : that.name != null) return false;
if (type != null ? !type.equals(that.type) : that.type != null) return false;
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
if (hidden != that.hidden) {
return false;
}
if (internal != that.internal) {
return false;
}
if (connectorId != null ? !connectorId.equals(that.connectorId) : that.connectorId != null) {
return false;
}
if (name != null ? !name.equals(that.name) : that.name != null) {
return false;
}
if (type != null ? !type.equals(that.type) : that.type != null) {
return false;
}
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
return false;
}
return Arrays.deepEquals(positionIndices, that.positionIndices);
}
......@@ -144,14 +162,14 @@ public class PulsarColumnHandle implements ColumnHandle {
@Override
public String toString() {
return "PulsarColumnHandle{" +
"connectorId='" + connectorId + '\'' +
", name='" + name + '\'' +
", type=" + type +
", hidden=" + hidden +
", internal=" + internal +
", fieldNames=" + Arrays.toString(fieldNames) +
", positionIndices=" + Arrays.toString(positionIndices) +
'}';
return "PulsarColumnHandle{"
+ "connectorId='" + connectorId + '\''
+ ", name='" + name + '\''
+ ", type=" + type
+ ", hidden=" + hidden
+ ", internal=" + internal
+ ", fieldNames=" + Arrays.toString(fieldNames)
+ ", positionIndices=" + Arrays.toString(positionIndices)
+ '}';
}
}
......@@ -20,10 +20,11 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.type.Type;
import java.util.Arrays;
import java.util.List;
/**
* Description of the column metadata.
*/
public class PulsarColumnMetadata extends ColumnMetadata {
private boolean isInternal;
......@@ -60,25 +61,37 @@ public class PulsarColumnMetadata extends ColumnMetadata {
@Override
public String toString() {
return "PulsarColumnMetadata{" +
"isInternal=" + isInternal +
", nameWithCase='" + nameWithCase + '\'' +
", fieldNames=" + Arrays.toString(fieldNames) +
", positionIndices=" + Arrays.toString(positionIndices) +
'}';
return "PulsarColumnMetadata{"
+ "isInternal=" + isInternal
+ ", nameWithCase='" + nameWithCase + '\''
+ ", fieldNames=" + Arrays.toString(fieldNames)
+ ", positionIndices=" + Arrays.toString(positionIndices)
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
PulsarColumnMetadata that = (PulsarColumnMetadata) o;
if (isInternal != that.isInternal) return false;
if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) return false;
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) return false;
if (isInternal != that.isInternal) {
return false;
}
if (nameWithCase != null ? !nameWithCase.equals(that.nameWithCase) : that.nameWithCase != null) {
return false;
}
if (!Arrays.deepEquals(fieldNames, that.fieldNames)) {
return false;
}
return Arrays.deepEquals(positionIndices, that.positionIndices);
}
......
......@@ -18,6 +18,10 @@
*/
package org.apache.pulsar.sql.presto;
import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
......@@ -26,13 +30,11 @@ import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.transaction.IsolationLevel;
import io.airlift.bootstrap.LifeCycleManager;
import io.airlift.log.Logger;
import javax.inject.Inject;
import static com.facebook.presto.spi.transaction.IsolationLevel.READ_COMMITTED;
import static com.facebook.presto.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
/**
* This file contains implementation of the connector to the Presto engine.
*/
public class PulsarConnector implements Connector {
private static final Logger log = Logger.get(PulsarConnector.class);
......
......@@ -18,8 +18,13 @@
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.io.IOException;
import java.util.Map;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.LedgerOffloader;
......@@ -34,13 +39,9 @@ import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.PulsarVersion;
import java.io.IOException;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
/**
* Implementation of a cache for the Pulsar connector.
*/
public class PulsarConnectorCache {
private static final Logger log = Logger.get(PulsarConnectorCache.class);
......@@ -68,7 +69,7 @@ public class PulsarConnectorCache {
// start stats provider
ClientConfiguration clientConfiguration = new ClientConfiguration();
pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
pulsarConnectorConfig.getStatsProviderConfigs().forEach(clientConfiguration::setProperty);
this.statsProvider.start(clientConfiguration);
......@@ -84,7 +85,8 @@ public class PulsarConnectorCache {
return instance;
}
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig)
throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setClientTcpNoDelay(false)
......@@ -123,16 +125,17 @@ public class PulsarConnectorCache {
Map<String, String> offloaderProperties = conf.getOffloaderProperties();
offloaderProperties.put(OFFLOADERS_DIRECTOR, conf.getOffloadersDirectory());
offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_DRIVER, conf.getManagedLedgerOffloadDriver());
offloaderProperties.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
offloaderProperties
.put(MANAGED_LEDGER_OFFLOAD_MAX_THREADS, String.valueOf(conf.getManagedLedgerOffloadMaxThreads()));
try {
return offloaderFactory.create(
PulsarConnectorUtils.getProperties(offloaderProperties),
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
PulsarConnectorUtils.getProperties(offloaderProperties),
ImmutableMap.of(
LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(),
LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha()
),
getOffloaderScheduler(conf));
} catch (IOException ioe) {
log.error("Failed to create offloader: ", ioe);
throw new RuntimeException(ioe.getMessage(), ioe.getCause());
......
......@@ -20,20 +20,21 @@ package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airlift.configuration.Config;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import javax.validation.constraints.NotNull;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.common.naming.NamedEntity;
import org.apache.pulsar.common.protocol.Commands;
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
/**
* This object handles configuration of the Pulsar connector for the Presto engine.
*/
public class PulsarConnectorConfig implements AutoCloseable {
private String brokerServiceUrl = "http://localhost:8080";
......@@ -55,7 +56,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
private boolean namespaceDelimiterRewriteEnable = false;
private String rewriteNamespaceDelimiter = "/";
/**** --- Ledger Offloading --- ****/
// --- Ledger Offloading ---
private String managedLedgerOffloadDriver = null;
private int managedLedgerOffloadMaxThreads = 2;
private String offloadersDirectory = "./offloaders";
......@@ -188,14 +189,15 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
/**** --- Ledger Offloading --- ****/
// --- Ledger Offloading ---
public int getManagedLedgerOffloadMaxThreads() {
return this.managedLedgerOffloadMaxThreads;
}
@Config("pulsar.managed-ledger-offload-max-threads")
public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads) throws IOException {
public PulsarConnectorConfig setManagedLedgerOffloadMaxThreads(int managedLedgerOffloadMaxThreads)
throws IOException {
this.managedLedgerOffloadMaxThreads = managedLedgerOffloadMaxThreads;
return this;
}
......@@ -231,7 +233,7 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
/**** --- Authentication --- ****/
// --- Authentication ---
public String getAuthPlugin() {
return this.authPluginClassName;
......@@ -316,8 +318,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
@Override
public String toString() {
return "PulsarConnectorConfig{" +
"brokerServiceUrl='" + brokerServiceUrl + '\'' +
'}';
return "PulsarConnectorConfig{"
+ "brokerServiceUrl='" + brokerServiceUrl + '\''
+ '}';
}
}
......@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.connector.Connector;
import com.facebook.presto.spi.connector.ConnectorContext;
......@@ -26,12 +29,11 @@ import com.google.inject.Injector;
import io.airlift.bootstrap.Bootstrap;
import io.airlift.json.JsonModule;
import io.airlift.log.Logger;
import java.util.Map;
import static com.google.common.base.Throwables.throwIfUnchecked;
import static java.util.Objects.requireNonNull;
/**
* The factory class which helps to build the presto connector.
*/
public class PulsarConnectorFactory implements ConnectorFactory {
private static final Logger log = Logger.get(PulsarConnectorFactory.class);
......
......@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
/**
* Unique identifier of a connector.
*/
public class PulsarConnectorId {
private final String id;
......@@ -34,8 +37,12 @@ public class PulsarConnectorId {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PulsarConnectorId that = (PulsarConnectorId) o;
......
......@@ -18,21 +18,23 @@
*/
package org.apache.pulsar.sql.presto;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import java.util.concurrent.TimeUnit;
/**
* This class helps to track metrics related to the connector.
*/
public class PulsarConnectorMetricsTracker implements AutoCloseable{
private final StatsLogger statsLogger;
private static final String SCOPE = "split";
/** metric names **/
// metric names
// time spend waiting to get entry from entry queue because it is empty
private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
......@@ -70,7 +72,7 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
public static final String READ_ATTEMPTS = "read-attempts";
// number of read attempts per query
public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
public static final String READ_ATTEMTPS_PER_QUERY = "read-attempts-per-query";
// latency of reads per batch
public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
......@@ -97,209 +99,208 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
/** stats loggers **/
private final OpStatsLogger statsLogger_entryQueueDequeueWaitTime;
private final Counter statsLogger_bytesRead;
private final OpStatsLogger statsLogger_entryDeserializetime;
private final OpStatsLogger statsLogger_messageQueueEnqueueWaitTime;
private final Counter statsLogger_numMessagesDeserialized;
private final OpStatsLogger statsLogger_numMessagesDeserializedPerEntry;
private final OpStatsLogger statsLogger_readAttempts;
private final OpStatsLogger statsLogger_readLatencyPerBatch;
private final OpStatsLogger statsLogger_numEntriesPerBatch;
private final OpStatsLogger statsLogger_recordDeserializeTime;
private final Counter statsLogger_numRecordDeserialized;
private final OpStatsLogger statsLogger_totalExecutionTime;
/** internal tracking variables **/
private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
private long BYTES_READ_sum = 0L;
private long ENTRY_DESERIALIZE_TIME_startTime;
private long ENTRY_DESERIALIZE_TIME_sum = 0L;
private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
private long READ_ATTEMTPS_SUCCESS_sum = 0L;
private long READ_ATTEMTPS_FAIL_sum = 0L;
private long READ_LATENCY_SUCCESS_sum = 0L;
private long READ_LATENCY_FAIL_sum = 0L;
private long NUM_ENTRIES_PER_BATCH_sum = 0L;
private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
private long RECORD_DESERIALIZE_TIME_startTime;
private long RECORD_DESERIALIZE_TIME_sum = 0L;
// stats loggers
private final OpStatsLogger statsLoggerEntryQueueDequeueWaitTime;
private final Counter statsLoggerBytesRead;
private final OpStatsLogger statsLoggerEntryDeserializeTime;
private final OpStatsLogger statsLoggerMessageQueueEnqueueWaitTime;
private final Counter statsLoggerNumMessagesDeserialized;
private final OpStatsLogger statsLoggerNumMessagesDeserializedPerEntry;
private final OpStatsLogger statsLoggerReadAttempts;
private final OpStatsLogger statsLoggerReadLatencyPerBatch;
private final OpStatsLogger statsLoggerNumEntriesPerBatch;
private final OpStatsLogger statsLoggerRecordDeserializeTime;
private final Counter statsLoggerNumRecordDeserialized;
private final OpStatsLogger statsLoggerTotalExecutionTime;
// internal tracking variables
private long entryQueueDequeueWaitTimeStartTime;
private long entryQueueDequeueWaitTimeSum = 0L;
private long bytesReadSum = 0L;
private long entryDeserializeTimeStartTime;
private long entryDeserializeTimeSum = 0L;
private long messageQueueEnqueueWaitTimeStartTime;
private long messageQueueEnqueueWaitTimeSum = 0L;
private long numMessagesDerserializedSum = 0L;
private long numMessagedDerserializedPerBatch = 0L;
private long readAttemptsSuccessSum = 0L;
private long readAttemptsFailSum = 0L;
private long readLatencySuccessSum = 0L;
private long readLatencyFailSum = 0L;
private long numEntriesPerBatchSum = 0L;
private long messageQueueDequeueWaitTimeSum = 0L;
private long recordDeserializeTimeStartTime;
private long recordDeserializeTimeSum = 0L;
public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
this.statsLogger = statsProvider instanceof NullStatsProvider
? null : statsProvider.getStatsLogger(SCOPE);
if (this.statsLogger != null) {
statsLogger_entryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
statsLogger_bytesRead = statsLogger.getCounter(BYTES_READ);
statsLogger_entryDeserializetime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
statsLogger_messageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
statsLogger_numMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
statsLogger_numMessagesDeserializedPerEntry = statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
statsLogger_readAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
statsLogger_readLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
statsLogger_numEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
statsLogger_recordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
statsLogger_numRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
statsLogger_totalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
statsLoggerEntryQueueDequeueWaitTime = statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME);
statsLoggerBytesRead = statsLogger.getCounter(BYTES_READ);
statsLoggerEntryDeserializeTime = statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME);
statsLoggerMessageQueueEnqueueWaitTime = statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME);
statsLoggerNumMessagesDeserialized = statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED);
statsLoggerNumMessagesDeserializedPerEntry = statsLogger
.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY);
statsLoggerReadAttempts = statsLogger.getOpStatsLogger(READ_ATTEMPTS);
statsLoggerReadLatencyPerBatch = statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH);
statsLoggerNumEntriesPerBatch = statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH);
statsLoggerRecordDeserializeTime = statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME);
statsLoggerNumRecordDeserialized = statsLogger.getCounter(NUM_RECORD_DESERIALIZED);
statsLoggerTotalExecutionTime = statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME);
} else {
statsLogger_entryQueueDequeueWaitTime = null;
statsLogger_bytesRead = null;
statsLogger_entryDeserializetime = null;
statsLogger_messageQueueEnqueueWaitTime = null;
statsLogger_numMessagesDeserialized = null;
statsLogger_numMessagesDeserializedPerEntry = null;
statsLogger_readAttempts = null;
statsLogger_readLatencyPerBatch = null;
statsLogger_numEntriesPerBatch = null;
statsLogger_recordDeserializeTime = null;
statsLogger_numRecordDeserialized = null;
statsLogger_totalExecutionTime = null;
statsLoggerEntryQueueDequeueWaitTime = null;
statsLoggerBytesRead = null;
statsLoggerEntryDeserializeTime = null;
statsLoggerMessageQueueEnqueueWaitTime = null;
statsLoggerNumMessagesDeserialized = null;
statsLoggerNumMessagesDeserializedPerEntry = null;
statsLoggerReadAttempts = null;
statsLoggerReadLatencyPerBatch = null;
statsLoggerNumEntriesPerBatch = null;
statsLoggerRecordDeserializeTime = null;
statsLoggerNumRecordDeserialized = null;
statsLoggerTotalExecutionTime = null;
}
}
public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
entryQueueDequeueWaitTimeStartTime = System.nanoTime();
}
}
public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
statsLogger_entryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
long time = System.nanoTime() - entryQueueDequeueWaitTimeStartTime;
entryQueueDequeueWaitTimeSum += time;
statsLoggerEntryQueueDequeueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void register_BYTES_READ(long bytes) {
if (statsLogger != null) {
BYTES_READ_sum += bytes;
statsLogger_bytesRead.add(bytes);
bytesReadSum += bytes;
statsLoggerBytesRead.add(bytes);
}
}
public void start_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
entryDeserializeTimeStartTime = System.nanoTime();
}
}
public void end_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
ENTRY_DESERIALIZE_TIME_sum += time;
statsLogger_entryDeserializetime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
long time = System.nanoTime() - entryDeserializeTimeStartTime;
entryDeserializeTimeSum += time;
statsLoggerEntryDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
messageQueueEnqueueWaitTimeStartTime = System.nanoTime();
}
}
public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
statsLogger_messageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
long time = System.nanoTime() - messageQueueEnqueueWaitTimeStartTime;
messageQueueEnqueueWaitTimeSum += time;
statsLoggerMessageQueueEnqueueWaitTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
statsLogger_numMessagesDeserialized.add(1);
numMessagedDerserializedPerBatch++;
statsLoggerNumMessagesDeserialized.add(1);
}
}
public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
statsLogger_numMessagesDeserializedPerEntry.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
numMessagesDerserializedSum += numMessagedDerserializedPerBatch;
statsLoggerNumMessagesDeserializedPerEntry.registerSuccessfulValue(numMessagedDerserializedPerBatch);
numMessagedDerserializedPerBatch = 0L;
}
}
public void incr_READ_ATTEMPTS_SUCCESS() {
if (statsLogger != null) {
READ_ATTEMTPS_SUCCESS_sum++;
statsLogger_readAttempts.registerSuccessfulValue(1L);
readAttemptsSuccessSum++;
statsLoggerReadAttempts.registerSuccessfulValue(1L);
}
}
public void incr_READ_ATTEMPTS_FAIL() {
if (statsLogger != null) {
READ_ATTEMTPS_FAIL_sum++;
statsLogger_readAttempts.registerFailedValue(1L);
readAttemptsFailSum++;
statsLoggerReadAttempts.registerFailedValue(1L);
}
}
public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
if (statsLogger != null) {
READ_LATENCY_SUCCESS_sum += latency;
statsLogger_readLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
readLatencySuccessSum += latency;
statsLoggerReadLatencyPerBatch.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
if (statsLogger != null) {
READ_LATENCY_FAIL_sum += latency;
statsLogger_readLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
readLatencyFailSum += latency;
statsLoggerReadLatencyPerBatch.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
if (statsLogger != null) {
NUM_ENTRIES_PER_BATCH_sum += delta;
statsLogger_numEntriesPerBatch.registerSuccessfulValue(delta);
numEntriesPerBatchSum += delta;
statsLoggerNumEntriesPerBatch.registerSuccessfulValue(delta);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
if (statsLogger != null) {
statsLogger_numEntriesPerBatch.registerFailedValue(delta);
statsLoggerNumEntriesPerBatch.registerFailedValue(delta);
}
}
public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
if (statsLogger != null) {
MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
messageQueueDequeueWaitTimeSum += latency;
}
}
public void start_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
recordDeserializeTimeStartTime = System.nanoTime();
}
}
public void end_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
RECORD_DESERIALIZE_TIME_sum += time;
statsLogger_recordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
long time = System.nanoTime() - recordDeserializeTimeStartTime;
recordDeserializeTimeSum += time;
statsLoggerRecordDeserializeTime.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_RECORD_DESERIALIZED() {
if (statsLogger != null) {
statsLogger_numRecordDeserialized.add(1);
statsLoggerNumRecordDeserialized.add(1);
}
}
public void register_TOTAL_EXECUTION_TIME(long latency) {
if (statsLogger != null) {
statsLogger_totalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
statsLoggerTotalExecutionTime.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
......@@ -308,47 +309,47 @@ public class PulsarConnectorMetricsTracker implements AutoCloseable{
if (statsLogger != null) {
// register total entry dequeue wait time for query
statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(entryQueueDequeueWaitTimeSum, TimeUnit.NANOSECONDS);
//register bytes read per query
statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
.registerSuccessfulValue(BYTES_READ_sum);
.registerSuccessfulValue(bytesReadSum);
// register total time spent deserializing entries for query
statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
.registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(entryDeserializeTimeSum, TimeUnit.NANOSECONDS);
// register time spent waiting for message queue enqueue because message queue is full per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(messageQueueEnqueueWaitTimeSum, TimeUnit.NANOSECONDS);
// register number of messages deserialized per query
statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
.registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
.registerSuccessfulValue(numMessagesDerserializedSum);
// register number of read attempts per query
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
.registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
.registerSuccessfulValue(readAttemptsSuccessSum);
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
.registerFailedValue(READ_ATTEMTPS_FAIL_sum);
.registerFailedValue(readAttemptsFailSum);
// register total read latency for query
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
.registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(readLatencySuccessSum, TimeUnit.NANOSECONDS);
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
.registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
.registerFailedEvent(readLatencyFailSum, TimeUnit.NANOSECONDS);
// register number of entries per query
statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
.registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
.registerSuccessfulValue(numEntriesPerBatchSum);
// register time spent waiting to read for message queue per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
.registerSuccessfulEvent(messageQueueDequeueWaitTimeSum, TimeUnit.MILLISECONDS);
// register time spent deserializing records per query
statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
.registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
.registerSuccessfulEvent(recordDeserializeTimeSum, TimeUnit.NANOSECONDS);
}
}
}
......@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.sql.presto;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.fasterxml.jackson.databind.DeserializationContext;
......@@ -25,14 +30,11 @@ import com.fasterxml.jackson.databind.deser.std.FromStringDeserializer;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import javax.inject.Inject;
import static com.facebook.presto.spi.type.TypeSignature.parseTypeSignature;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonBinder.jsonBinder;
import static java.util.Objects.requireNonNull;
/**
* This class defines binding of classes in the Presto connector.
*/
public class PulsarConnectorModule implements Module {
private final String connectorId;
......@@ -56,9 +58,11 @@ public class PulsarConnectorModule implements Module {
configBinder(binder).bindConfig(PulsarConnectorConfig.class);
jsonBinder(binder).addDeserializerBinding(Type.class).to(TypeDeserializer.class);
}
/**
* A wrapper to deserialize the Presto types.
*/
public static final class TypeDeserializer
extends FromStringDeserializer<Type> {
private static final long serialVersionUID = 1L;
......
......@@ -18,17 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
import org.apache.avro.Schema;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
/**
* A helper class containing repeatable logic used in the other classes.
*/
public class PulsarConnectorUtils {
public static Schema parseSchema(String schemaJson) {
......
......@@ -18,6 +18,9 @@
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorHandleResolver;
import com.facebook.presto.spi.ConnectorSplit;
......@@ -25,9 +28,9 @@ import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
/**
* This class helps to resolve classes for the Presto connector.
*/
public class PulsarHandleResolver implements ConnectorHandleResolver {
@Override
public Class<? extends ConnectorTableHandle> getTableHandleClass() {
......@@ -57,8 +60,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
static PulsarColumnHandle convertColumnHandle(ColumnHandle columnHandle) {
requireNonNull(columnHandle, "columnHandle is null");
checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of " +
"PulsarColumnHandle");
checkArgument(columnHandle instanceof PulsarColumnHandle, "columnHandle is not an instance of "
+ "PulsarColumnHandle");
return (PulsarColumnHandle) columnHandle;
}
......@@ -70,8 +73,8 @@ public class PulsarHandleResolver implements ConnectorHandleResolver {
static PulsarTableLayoutHandle convertLayout(ConnectorTableLayoutHandle layout) {
requireNonNull(layout, "layout is null");
checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of " +
"PulsarTableLayoutHandle");
checkArgument(layout instanceof PulsarTableLayoutHandle, "layout is not an instance of "
+ "PulsarTableLayoutHandle");
return (PulsarTableLayoutHandle) layout;
}
......
......@@ -18,27 +18,31 @@
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.type.BigintType;
import com.facebook.presto.spi.type.TimestampType;
import com.facebook.presto.spi.type.TimestampWithTimeZoneType;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarcharType;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.pulsar.common.api.raw.RawMessage;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import org.apache.pulsar.common.api.raw.RawMessage;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
/**
* This abstract class represents internal columns.
*/
public abstract class PulsarInternalColumn {
/**
* Internal column representing the event time.
*/
public static class EventTimeColumn extends PulsarInternalColumn {
EventTimeColumn(String name, Type type, String comment) {
......@@ -51,6 +55,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the publish time.
*/
public static class PublishTimeColumn extends PulsarInternalColumn {
PublishTimeColumn(String name, Type type, String comment) {
......@@ -63,6 +70,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the message id.
*/
public static class MessageIdColumn extends PulsarInternalColumn {
MessageIdColumn(String name, Type type, String comment) {
......@@ -75,6 +85,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the sequence id.
*/
public static class SequenceIdColumn extends PulsarInternalColumn {
SequenceIdColumn(String name, Type type, String comment) {
......@@ -87,6 +100,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the producer name.
*/
public static class ProducerNameColumn extends PulsarInternalColumn {
ProducerNameColumn(String name, Type type, String comment) {
......@@ -99,6 +115,9 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the key.
*/
public static class KeyColumn extends PulsarInternalColumn {
KeyColumn(String name, Type type, String comment) {
......@@ -111,9 +130,11 @@ public abstract class PulsarInternalColumn {
}
}
/**
* Internal column representing the message properties.
*/
public static class PropertiesColumn extends PulsarInternalColumn {
private static final ObjectMapper mapper = new ObjectMapper();
PropertiesColumn(String name, Type type, String comment) {
......@@ -145,8 +166,8 @@ public abstract class PulsarInternalColumn {
public static final PulsarInternalColumn PRODUCER_NAME = new ProducerNameColumn("__producer_name__", VarcharType
.VARCHAR, "The name of the producer that publish the message used to generate this row");
public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key " +
"for the topic");
public static final PulsarInternalColumn KEY = new KeyColumn("__key__", VarcharType.VARCHAR, "The partition key "
+ "for the topic");
public static final PulsarInternalColumn PROPERTIES = new PropertiesColumn("__properties__", VarcharType.VARCHAR,
"User defined properties");
......
......@@ -18,6 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
......@@ -49,6 +61,16 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.log.Logger;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
......@@ -62,30 +84,9 @@ import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import javax.inject.Inject;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Stack;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.facebook.presto.spi.type.DateType.DATE;
import static com.facebook.presto.spi.type.TimeType.TIME;
import static com.facebook.presto.spi.type.TimestampType.TIMESTAMP;
import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertColumnHandle;
import static org.apache.pulsar.sql.presto.PulsarHandleResolver.convertTableHandle;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.rewriteNamespaceDelimiterIfNeeded;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
/**
* This connector helps to work with metadata.
*/
public class PulsarMetadata implements ConnectorMetadata {
private final String connectorId;
......@@ -114,7 +115,7 @@ public class PulsarMetadata implements ConnectorMetadata {
List<String> tenants = pulsarAdmin.tenants().getTenants();
for (String tenant : tenants) {
prestoSchemas.addAll(pulsarAdmin.namespaces().getNamespaces(tenant).stream().map(namespace ->
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
rewriteNamespaceDelimiterIfNeeded(namespace, pulsarConnectorConfig)).collect(Collectors.toList()));
}
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 401) {
......@@ -141,7 +142,8 @@ public class PulsarMetadata implements ConnectorMetadata {
Optional<Set<ColumnHandle>> desiredColumns) {
PulsarTableHandle handle = convertTableHandle(table);
ConnectorTableLayout layout = new ConnectorTableLayout(new PulsarTableLayoutHandle(handle, constraint.getSummary()));
ConnectorTableLayout layout = new ConnectorTableLayout(
new PulsarTableLayoutHandle(handle, constraint.getSummary()));
return ImmutableList.of(new ConnectorTableLayoutResult(layout, constraint.getSummary()));
}
......@@ -173,7 +175,8 @@ public class PulsarMetadata implements ConnectorMetadata {
} else {
List<String> pulsarTopicList = null;
try {
pulsarTopicList = this.pulsarAdmin.topics().getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
pulsarTopicList = this.pulsarAdmin.topics()
.getList(restoreNamespaceDelimiterIfNeeded(schemaNameOrNull, pulsarConnectorConfig));
} catch (PulsarAdminException e) {
if (e.getStatusCode() == 404) {
log.warn("Schema " + schemaNameOrNull + " does not exsit");
......@@ -465,9 +468,9 @@ public class PulsarMetadata implements ConnectorMetadata {
canBeNull = true;
}
} else {
List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames, positionIndices);
List<PulsarColumnMetadata> columns = getColumns(fieldName, type, fieldTypes, fieldNames,
positionIndices);
columnMetadataList.addAll(columns);
}
}
} else if (fieldSchema.getType() == Schema.Type.RECORD) {
......@@ -484,7 +487,8 @@ public class PulsarMetadata implements ConnectorMetadata {
if (fieldName == null) {
columns = getColumns(field.name(), field.schema(), fieldTypes, fieldNames, positionIndices);
} else {
columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(), fieldTypes, fieldNames, positionIndices);
columns = getColumns(String.format("%s.%s", fieldName, field.name()), field.schema(),
fieldTypes, fieldNames, positionIndices);
}
positionIndices.pop();
......
......@@ -22,10 +22,12 @@ import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.google.common.collect.ImmutableList;
/**
* Implementation of the Pulsar plugin for Pesto.
*/
public class PulsarPlugin implements Plugin {
@Override
public Iterable<ConnectorFactory> getConnectorFactories()
{
public Iterable<ConnectorFactory> getConnectorFactories() {
return ImmutableList.of(new PulsarConnectorFactory());
}
}
......@@ -23,8 +23,6 @@ import io.netty.buffer.ByteBufUtil;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Date;
import io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.schema.SchemaInfo;
......@@ -62,4 +60,4 @@ public class PulsarPrimitiveSchemaHandler implements SchemaHandler {
public Object extractField(int index, Object currentRecord) {
return currentRecord;
}
}
\ No newline at end of file
}
\ No newline at end of file
......@@ -37,17 +37,14 @@ import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.VarbinaryType;
import com.facebook.presto.spi.type.VarcharType;
import com.google.common.annotations.VisibleForTesting;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.avro.Schema;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
......@@ -61,11 +58,12 @@ import org.apache.pulsar.common.api.raw.MessageParser;
import org.apache.pulsar.common.api.raw.RawMessage;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
import org.jctools.queues.MessagePassingQueue;
import org.jctools.queues.SpscArrayQueue;
/**
* Implementation of a cursor to read records.
*/
public class PulsarRecordCursor implements RecordCursor {
private List<PulsarColumnHandle> columnHandles;
......@@ -118,15 +116,16 @@ public class PulsarRecordCursor implements RecordCursor {
// Exposed for testing purposes
PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.splitSize = pulsarSplit.getSplitSize();
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig, pulsarConnectorMetricsTracker);
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, managedLedgerConfig,
pulsarConnectorMetricsTracker);
}
private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, ManagedLedgerConfig managedLedgerConfig,
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.columnHandles = columnHandles;
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
......@@ -149,7 +148,7 @@ public class PulsarRecordCursor implements RecordCursor {
try {
this.cursor = getCursor(TopicName.get("persistent", NamespaceName.get(pulsarSplit.getSchemaName()),
pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
pulsarSplit.getTableName()), pulsarSplit.getStartPosition(), managedLedgerFactory, managedLedgerConfig);
} catch (ManagedLedgerException | InterruptedException e) {
log.error(e, "Failed to get read only cursor");
close();
......@@ -300,11 +299,13 @@ public class PulsarRecordCursor implements RecordCursor {
ReadOnlyCursorImpl readOnlyCursorImpl = ((ReadOnlyCursorImpl) cursor);
// check if ledger is offloaded
if (!readOffloaded && readOnlyCursorImpl.getCurrentLedgerInfo().hasOffloadContext()) {
log.warn("Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
log.warn(
"Ledger %s is offloaded for topic %s. Ignoring it because offloader is not configured",
readOnlyCursorImpl.getCurrentLedgerInfo().getLedgerId(), pulsarSplit.getTableName());
long numEntries = readOnlyCursorImpl.getCurrentLedgerInfo().getEntries();
long entriesToSkip = (numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
long entriesToSkip =
(numEntries - ((PositionImpl) cursor.getReadPosition()).getEntryId()) + 1;
cursor.skipEntries(Math.toIntExact((entriesToSkip)));
entriesProcessed += entriesToSkip;
......@@ -339,13 +340,14 @@ public class PulsarRecordCursor implements RecordCursor {
outstandingReadsRequests.incrementAndGet();
//set read latency stats for success
metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long) ctx);
//stats for number of entries read
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
}
public boolean hasFinished() {
return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >=1 && splitSize <= entriesProcessed;
return messageQueue.isEmpty() && isDone && outstandingReadsRequests.get() >= 1
&& splitSize <= entriesProcessed;
}
@Override
......@@ -354,7 +356,7 @@ public class PulsarRecordCursor implements RecordCursor {
outstandingReadsRequests.incrementAndGet();
//set read latency stats for failed
metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long) ctx);
//stats for number of entries read failed
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
}
......@@ -377,7 +379,7 @@ public class PulsarRecordCursor implements RecordCursor {
currentMessage = null;
}
while(true) {
while (true) {
if (readEntries.hasFinished()) {
return false;
}
......
......@@ -18,15 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.RecordCursor;
import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.type.Type;
import com.google.common.collect.ImmutableList;
import java.util.List;
import static java.util.Objects.requireNonNull;
/**
* Implementation of a record set.
*/
public class PulsarRecordSet implements RecordSet {
private final List<PulsarColumnHandle> columnHandles;
......
......@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplit;
......@@ -25,12 +27,12 @@ import com.facebook.presto.spi.RecordSet;
import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;
import javax.inject.Inject;
import java.util.List;
import javax.inject.Inject;
import static java.util.Objects.requireNonNull;
/**
* Implementation of the provider for record sets.
*/
public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
private final PulsarConnectorConfig pulsarConnectorConfig;
......@@ -52,7 +54,6 @@ public class PulsarRecordSetProvider implements ConnectorRecordSetProvider {
handles.add((PulsarColumnHandle) handle);
}
return new PulsarRecordSet(pulsarSplit, handles.build(), this.pulsarConnectorConfig);
}
}
......@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
......@@ -25,15 +27,15 @@ import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.util.List;
import java.util.Map;
import static java.util.Objects.requireNonNull;
/**
* This class represents information for a split.
*/
public class PulsarSplit implements ConnectorSplit {
private final long splitId;
......@@ -176,19 +178,19 @@ public class PulsarSplit implements ConnectorSplit {
@Override
public String toString() {
return "PulsarSplit{" +
"splitId=" + splitId +
", connectorId='" + connectorId + '\'' +
", schemaName='" + schemaName + '\'' +
", tableName='" + tableName + '\'' +
", splitSize=" + splitSize +
", schema='" + schema + '\'' +
", schemaType=" + schemaType +
", startPositionEntryId=" + startPositionEntryId +
", endPositionEntryId=" + endPositionEntryId +
", startPositionLedgerId=" + startPositionLedgerId +
", endPositionLedgerId=" + endPositionLedgerId +
'}';
return "PulsarSplit{"
+ "splitId=" + splitId
+ ", connectorId='" + connectorId + '\''
+ ", schemaName='" + schemaName + '\''
+ ", tableName='" + tableName + '\''
+ ", splitSize=" + splitSize
+ ", schema='" + schema + '\''
+ ", schemaType=" + schemaType
+ ", startPositionEntryId=" + startPositionEntryId
+ ", endPositionEntryId=" + endPositionEntryId
+ ", startPositionLedgerId=" + startPositionLedgerId
+ ", endPositionLedgerId=" + endPositionLedgerId
+ '}';
}
public SchemaInfo getSchemaInfo() {
......
......@@ -18,6 +18,12 @@
*/
package org.apache.pulsar.sql.presto;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorSplitSource;
......@@ -30,14 +36,20 @@ import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import io.airlift.log.Logger;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import javax.inject.Inject;
import lombok.Data;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
......@@ -47,22 +59,10 @@ import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaInfo;
import com.google.common.base.Predicate;
import org.apache.bookkeeper.conf.ClientConfiguration;
import javax.inject.Inject;
import java.sql.Timestamp;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import static com.facebook.presto.spi.StandardErrorCode.QUERY_REJECTED;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static org.apache.bookkeeper.mledger.ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
import static org.apache.pulsar.sql.presto.PulsarConnectorUtils.restoreNamespaceDelimiterIfNeeded;
/**
* The class helping to manage splits.
*/
public class PulsarSplitManager implements ConnectorSplitManager {
private final String connectorId;
......@@ -146,7 +146,7 @@ public class PulsarSplitManager implements ConnectorSplitManager {
}
throw new RuntimeException("Failed to get metadata for partitioned topic "
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(),e);
+ topicName + ": " + ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
int actualNumSplits = Math.max(numPartitions, numSplits);
......@@ -342,15 +342,15 @@ public class PulsarSplitManager implements ConnectorSplitManager {
// Just use a close bound since presto can always filter out the extra entries even if
// the bound
// should be open or a mixture of open and closed
com.google.common.collect.Range<PositionImpl> posRange
= com.google.common.collect.Range.range(overallStartPos,
com.google.common.collect.Range<PositionImpl> posRange =
com.google.common.collect.Range.range(overallStartPos,
com.google.common.collect.BoundType.CLOSED,
overallEndPos, com.google.common.collect.BoundType.CLOSED);
long numOfEntries = readOnlyCursor.getNumberOfEntries(posRange) - 1;
PredicatePushdownInfo predicatePushdownInfo
= new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
PredicatePushdownInfo predicatePushdownInfo =
new PredicatePushdownInfo(overallStartPos, overallEndPos, numOfEntries);
log.debug("Predicate pushdown optimization calculated: %s", predicatePushdownInfo);
return predicatePushdownInfo;
}
......
......@@ -18,20 +18,22 @@
*/
package org.apache.pulsar.sql.presto;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.SchemaTableName;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
/**
* Description of basic metadata of a table.
*/
public class PulsarTableHandle implements ConnectorTableHandle {
/**
* connector id
* Connector id.
*/
private final String connectorId;
......
......@@ -18,16 +18,18 @@
*/
package org.apache.pulsar.sql.presto;
import static java.util.Objects.requireNonNull;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorTableLayoutHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
/**
* This class handles the table layout.
*/
public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
private final PulsarTableHandle table;
private final TupleDomain<ColumnHandle> tupleDomain;
......@@ -46,14 +48,12 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
}
@JsonProperty
public TupleDomain<ColumnHandle> getTupleDomain()
{
public TupleDomain<ColumnHandle> getTupleDomain() {
return tupleDomain;
}
@Override
public boolean equals(Object o)
{
public boolean equals(Object o) {
if (this == o) {
return true;
}
......@@ -61,19 +61,17 @@ public class PulsarTableLayoutHandle implements ConnectorTableLayoutHandle {
return false;
}
PulsarTableLayoutHandle that = (PulsarTableLayoutHandle) o;
return Objects.equals(table, that.table) &&
Objects.equals(tupleDomain, that.tupleDomain);
return Objects.equals(table, that.table)
&& Objects.equals(tupleDomain, that.tupleDomain);
}
@Override
public int hashCode()
{
public int hashCode() {
return Objects.hash(table, tupleDomain);
}
@Override
public String toString()
{
public String toString() {
return table.toString();
}
}
......@@ -18,14 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
/**
* Represents the basic information about a pulsar topic.
*/
public class PulsarTopicDescription {
private final String tableName;
private final String topicName;
......
......@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
/**
* A handle for transactions.
*/
public enum PulsarTransactionHandle implements ConnectorTransactionHandle {
INSTANCE
}
......@@ -20,6 +20,9 @@ package org.apache.pulsar.sql.presto;
import io.netty.buffer.ByteBuf;
/**
* This interface defines the methods to work with schemas.
*/
public interface SchemaHandler {
Object deserialize(ByteBuf payload);
......
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* Implementation of the connector to the Presto engine.
*/
package org.apache.pulsar.sql.presto;
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册