Unverified commit 474645f7, authored by Boyang Jerry Peng and committed by GitHub

adding metrics to presto pulsar connector (#2631)

* adding metrics to presto pulsar connector

* rename batch size

* adding comments

* refactoring metrics

* modifying bytes read metric

* fixing tests

* deleting tmp file

* adding jars to LICENSE
Parent 606b4de3
......@@ -24,7 +24,7 @@ pulsar.broker-service-url=http://localhost:8080
# URI of Zookeeper cluster
pulsar.zookeeper-uri=localhost:2181
# maximum number of entries to read at a single time
pulsar.entry-read-batch-size=100
pulsar.max-entry-read-batch-size=100
# default number of splits to use per query
pulsar.target-num-splits=2
# max message queue size
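The commit also introduces two new catalog properties backed by PulsarConnectorConfig (shown later in this diff): pulsar.stats-provider and pulsar.stats-provider-configs. A minimal sketch of how they could look in the same properties file; the Prometheus provider class and the prometheusStatsHttpPort key are illustrative assumptions, not part of this change:
# stats provider implementation to load (defaults to NullStatsProvider, which disables metrics)
pulsar.stats-provider=org.apache.pulsar.shade.org.apache.bookkeeper.stats.prometheus.PrometheusMetricsProvider
# provider-specific settings, supplied as a JSON map of strings
pulsar.stats-provider-configs={"prometheusStatsHttpPort":"9092"}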
......
......@@ -387,6 +387,7 @@ The Apache Software License, Version 2.0
- org.apache.distributedlog-distributedlog-core-4.7.2-tests.jar
- org.apache.distributedlog-distributedlog-core-4.7.2.jar
- org.apache.distributedlog-distributedlog-protocol-4.7.2.jar
- org.apache.bookkeeper.stats-codahale-metrics-provider-4.7.2.jar
* LZ4 -- net.jpountz.lz4-lz4-1.3.0.jar
* AsyncHttpClient
- org.asynchttpclient-async-http-client-2.1.0-alpha26.jar
......@@ -459,6 +460,10 @@ The Apache Software License, Version 2.0
- io.kubernetes-client-java-proto-2.0.0.jar
* Joda Time
- joda-time-joda-time-2.9.3.jar
* Dropwizard
- io.dropwizard.metrics-metrics-core-3.1.0.jar
- io.dropwizard.metrics-metrics-graphite-3.1.0.jar
- io.dropwizard.metrics-metrics-jvm-3.1.0.jar
BSD 3-clause "New" or "Revised" License
......
......@@ -38,6 +38,18 @@
<artifactId>bookkeeper-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>prometheus-metrics-provider</artifactId>
<version>${bookkeeper.version}</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper.stats</groupId>
<artifactId>codahale-metrics-provider</artifactId>
<version>${bookkeeper.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
......
......@@ -382,6 +382,15 @@ The Apache Software License, Version 2.0
- validation-api-1.1.0.Final.jar
* Objectsize
- objectsize-0.0.12.jar
* Dropwizard Metrics
- metrics-core-3.1.0.jar
- metrics-graphite-3.1.0.jar
- metrics-jvm-3.1.0.jar
* Prometheus
- simpleclient-0.0.23.jar
- simpleclient_common-0.0.23.jar
- simpleclient_hotspot-0.0.23.jar
- simpleclient_servlet-0.0.23.jar
Protocol Buffers License
* Protocol Buffers
......
......@@ -22,6 +22,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
public class PulsarConnectorCache {
......@@ -29,8 +30,19 @@ public class PulsarConnectorCache {
private final ManagedLedgerFactory managedLedgerFactory;
private final StatsProvider statsProvider;
private PulsarConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
this.managedLedgerFactory = initManagedLedgerFactory(pulsarConnectorConfig);
this.statsProvider = PulsarConnectorUtils.createInstance(pulsarConnectorConfig.getStatsProvider(),
StatsProvider.class, getClass().getClassLoader());
// start stats provider
ClientConfiguration clientConfiguration = new ClientConfiguration();
pulsarConnectorConfig.getStatsProviderConfigs().forEach((key, value) -> clientConfiguration.setProperty(key, value));
this.statsProvider.start(clientConfiguration);
}
public static PulsarConnectorCache getConnectorCache(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
......@@ -55,9 +67,14 @@ public class PulsarConnectorCache {
return managedLedgerFactory;
}
public StatsProvider getStatsProvider() {
return statsProvider;
}
public static void shutdown() throws ManagedLedgerException, InterruptedException {
if (instance != null) {
instance.managedLedgerFactory.shutdown();
instance.statsProvider.stop();
instance = null;
}
}
......
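A minimal sketch, not part of the commit, of how a caller can obtain the shared StatsProvider through the cache and wire it into a per-split metrics tracker; the helper method name is hypothetical, and the flow mirrors what PulsarRecordCursor does further down in this diff:
// Hypothetical helper: build a per-split tracker from the shared connector cache.
static PulsarConnectorMetricsTracker newMetricsTracker(PulsarConnectorConfig config) throws Exception {
    PulsarConnectorCache cache = PulsarConnectorCache.getConnectorCache(config);
    // getStatsProvider() returns the provider started in the cache constructor above
    return new PulsarConnectorMetricsTracker(cache.getStatsProvider());
}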
......@@ -18,11 +18,17 @@
*/
package org.apache.pulsar.sql.presto;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import io.airlift.configuration.Config;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import javax.validation.constraints.NotNull;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
public class PulsarConnectorConfig implements AutoCloseable {
......@@ -32,6 +38,8 @@ public class PulsarConnectorConfig implements AutoCloseable {
private int targetNumSplits = 2;
private int maxSplitMessageQueueSize = 10000;
private int maxSplitEntryQueueSize = 1000;
private String statsProvider = NullStatsProvider.class.getName();
private Map<String, String> statsProviderConfigs = new HashMap<>();
private PulsarAdmin pulsarAdmin;
@NotNull
......@@ -57,12 +65,12 @@ public class PulsarConnectorConfig implements AutoCloseable {
}
@NotNull
public int getEntryReadBatchSize() {
public int getMaxEntryReadBatchSize() {
return this.entryReadBatchSize;
}
@Config("pulsar.entry-read-batch-size")
public PulsarConnectorConfig setEntryReadBatchSize(int batchSize) {
@Config("pulsar.max-entry-read-batch-size")
public PulsarConnectorConfig setMaxEntryReadBatchSize(int batchSize) {
this.entryReadBatchSize = batchSize;
return this;
}
......@@ -100,6 +108,29 @@ public class PulsarConnectorConfig implements AutoCloseable {
return this;
}
@NotNull
public String getStatsProvider() {
return statsProvider;
}
@Config("pulsar.stats-provider")
public PulsarConnectorConfig setStatsProvider(String statsProvider) {
this.statsProvider = statsProvider;
return this;
}
@NotNull
public Map<String, String> getStatsProviderConfigs() {
return statsProviderConfigs;
}
@Config("pulsar.stats-provider-configs")
public PulsarConnectorConfig setStatsProviderConfigs(String statsProviderConfigs) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
this.statsProviderConfigs = new Gson().fromJson(statsProviderConfigs, type);
return this;
}
@NotNull
public PulsarAdmin getPulsarAdmin() throws PulsarClientException {
if (this.pulsarAdmin == null) {
......
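The stats-provider configs arrive as a single JSON string, are decoded into a Map<String, String> with Gson, and are later copied onto a BookKeeper ClientConfiguration before the provider is started (see PulsarConnectorCache above). A short sketch of programmatic use; the prometheusStatsHttpPort key is an illustrative assumption:
// Sketch: the same values Airlift would bind from the catalog properties file.
PulsarConnectorConfig config = new PulsarConnectorConfig()
        .setStatsProvider("org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider")
        .setStatsProviderConfigs("{\"prometheusStatsHttpPort\": \"9092\"}");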
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.sql.presto;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import java.util.concurrent.TimeUnit;
public class PulsarConnectorMetricsTracker implements AutoCloseable {
private StatsLogger statsLogger;
private static final String SCOPE = "split";
/** metric names **/
// time spent waiting to get an entry from the entry queue because it is empty
private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME = "entry-queue-dequeue-wait-time";
// total time spent waiting to get entries from the entry queue per query
private static final String ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "entry-queue-dequeue-wait-time-per-query";
// number of bytes read from bookkeeper
private static final String BYTES_READ = "bytes-read";
// total number of bytes read per query
private static final String BYTES_READ_PER_QUERY = "bytes-read-per-query";
// time spent deserializing entries
private static final String ENTRY_DESERIALIZE_TIME = "entry-deserialize-time";
// time spent deserializing entries per query
private static final String ENTRY_DESERIALIZE_TIME_PER_QUERY = "entry-deserialize-time_per_query";
// time spent waiting for message queue enqueue because message queue is full
private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME = "message-queue-enqueue-wait-time";
// time spent waiting for message queue enqueue because message queue is full per query
private static final String MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY = "message-queue-enqueue-wait-time-per-query";
// total number of messages deserialized
private static final String NUM_MESSAGES_DERSERIALIZED = "num-messages-deserialized";
// number of messages deserialized per entry
public static final String NUM_MESSAGES_DERSERIALIZED_PER_ENTRY = "num-messages-deserialized-per-entry";
// number of messages deserialized per query
public static final String NUM_MESSAGES_DERSERIALIZED_PER_QUERY = "num-messages-deserialized-per-query";
// number of read attempts (an attempt fails when the entry queue is full)
public static final String READ_ATTEMTPS = "read-attempts";
// number of read attempts per query
public static final String READ_ATTEMTPS_PER_QUERY= "read-attempts-per-query";
// latency of reads per batch
public static final String READ_LATENCY_PER_BATCH = "read-latency-per-batch";
// total read latency per query
public static final String READ_LATENCY_PER_QUERY = "read-latency-per-query";
// number of entries per batch
public static final String NUM_ENTRIES_PER_BATCH = "num-entries-per-batch";
// number of entries per query
public static final String NUM_ENTRIES_PER_QUERY = "num-entries-per-query";
// time spent waiting to dequeue from the message queue because it is empty
public static final String MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY = "message-queue-dequeue-wait-time-per-query";
// time spent deserializing message to record e.g. avro, json, etc
public static final String RECORD_DESERIALIZE_TIME = "record-deserialize-time";
// time spent deserializing message to record per query
private static final String RECORD_DESERIALIZE_TIME_PER_QUERY = "record-deserialize-time-per-query";
private static final String NUM_RECORD_DESERIALIZED = "num-record-deserialized";
private static final String TOTAL_EXECUTION_TIME = "total-execution-time";
/** internal tracking variables **/
private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
private long ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
private long BYTES_READ_sum = 0L;
private long ENTRY_DESERIALIZE_TIME_startTime;
private long ENTRY_DESERIALIZE_TIME_sum = 0L;
private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
private long MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum = 0L;
private long NUM_MESSAGES_DERSERIALIZED_sum = 0L;
private long NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
private long READ_ATTEMTPS_SUCCESS_sum = 0L;
private long READ_ATTEMTPS_FAIL_sum = 0L;
private long READ_LATENCY_SUCCESS_sum = 0L;
private long READ_LATENCY_FAIL_sum = 0L;
private long NUM_ENTRIES_PER_BATCH_sum = 0L;
private long MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum = 0L;
private long RECORD_DESERIALIZE_TIME_startTime;
private long RECORD_DESERIALIZE_TIME_sum = 0L;
public PulsarConnectorMetricsTracker(StatsProvider statsProvider) {
this.statsLogger = statsProvider instanceof NullStatsProvider
? null : statsProvider.getStatsLogger(SCOPE);
}
public void start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime = System.nanoTime();
}
}
public void end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - ENTRY_QUEUE_DEQUEUE_WAIT_TIME_startTime;
ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum += time;
statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME)
.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void register_BYTES_READ(long bytes) {
if (statsLogger != null) {
BYTES_READ_sum += bytes;
statsLogger.getCounter(BYTES_READ).add(bytes);
}
}
public void start_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
ENTRY_DESERIALIZE_TIME_startTime = System.nanoTime();
}
}
public void end_ENTRY_DESERIALIZE_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - ENTRY_DESERIALIZE_TIME_startTime;
ENTRY_DESERIALIZE_TIME_sum += time;
statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME)
.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime = System.nanoTime();
}
}
public void end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_startTime;
MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum += time;
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME)
.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
NUM_MESSAGED_DERSERIALIZED_PER_BATCH++;
statsLogger.getCounter(NUM_MESSAGES_DERSERIALIZED).add(1);
}
}
public void end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY() {
if (statsLogger != null) {
NUM_MESSAGES_DERSERIALIZED_sum += NUM_MESSAGED_DERSERIALIZED_PER_BATCH;
statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_ENTRY)
.registerSuccessfulValue(NUM_MESSAGED_DERSERIALIZED_PER_BATCH);
NUM_MESSAGED_DERSERIALIZED_PER_BATCH = 0L;
}
}
public void incr_READ_ATTEMPTS_SUCCESS() {
if (statsLogger != null) {
READ_ATTEMTPS_SUCCESS_sum++;
statsLogger.getOpStatsLogger(READ_ATTEMTPS)
.registerSuccessfulValue(1L);
}
}
public void incr_READ_ATTEMPTS_FAIL() {
if (statsLogger != null) {
READ_ATTEMTPS_FAIL_sum++;
statsLogger.getOpStatsLogger(READ_ATTEMTPS)
.registerFailedValue(1L);
}
}
public void register_READ_LATENCY_PER_BATCH_SUCCESS(long latency) {
if (statsLogger != null) {
READ_LATENCY_SUCCESS_sum += latency;
statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void register_READ_LATENCY_PER_BATCH_FAIL(long latency) {
if (statsLogger != null) {
READ_LATENCY_FAIL_sum += latency;
statsLogger.getOpStatsLogger(READ_LATENCY_PER_BATCH)
.registerFailedEvent(latency, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_SUCCESS(long delta) {
if (statsLogger != null) {
NUM_ENTRIES_PER_BATCH_sum += delta;
statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
.registerSuccessfulValue(delta);
}
}
public void incr_NUM_ENTRIES_PER_BATCH_FAIL(long delta) {
if (statsLogger != null) {
statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_BATCH)
.registerFailedValue(delta);
}
}
public void register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(long latency) {
if (statsLogger != null) {
MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum += latency;
}
}
public void start_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
RECORD_DESERIALIZE_TIME_startTime = System.nanoTime();
}
}
public void end_RECORD_DESERIALIZE_TIME() {
if (statsLogger != null) {
long time = System.nanoTime() - RECORD_DESERIALIZE_TIME_startTime;
RECORD_DESERIALIZE_TIME_sum += time;
statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME)
.registerSuccessfulEvent(time, TimeUnit.NANOSECONDS);
}
}
public void incr_NUM_RECORD_DESERIALIZED() {
if (statsLogger != null) {
statsLogger.getCounter(NUM_RECORD_DESERIALIZED).add(1);
}
}
public void register_TOTAL_EXECUTION_TIME(long latency) {
if (statsLogger != null) {
statsLogger.getOpStatsLogger(TOTAL_EXECUTION_TIME)
.registerSuccessfulEvent(latency, TimeUnit.NANOSECONDS);
}
}
@Override
public void close() {
if (statsLogger != null) {
// register total entry dequeue wait time for query
statsLogger.getOpStatsLogger(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(ENTRY_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
//register bytes read per query
statsLogger.getOpStatsLogger(BYTES_READ_PER_QUERY)
.registerSuccessfulValue(BYTES_READ_sum);
// register total time spent deserializing entries for query
statsLogger.getOpStatsLogger(ENTRY_DESERIALIZE_TIME_PER_QUERY)
.registerSuccessfulEvent(ENTRY_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
// register time spent waiting for message queue enqueue because message queue is full per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(MESSAGE_QUEUE_ENQUEUE_WAIT_TIME_sum, TimeUnit.NANOSECONDS);
// register number of messages deserialized per query
statsLogger.getOpStatsLogger(NUM_MESSAGES_DERSERIALIZED_PER_QUERY)
.registerSuccessfulValue(NUM_MESSAGES_DERSERIALIZED_sum);
// register number of read attempts per query
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
.registerSuccessfulValue(READ_ATTEMTPS_SUCCESS_sum);
statsLogger.getOpStatsLogger(READ_ATTEMTPS_PER_QUERY)
.registerFailedValue(READ_ATTEMTPS_FAIL_sum);
// register total read latency for query
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
.registerSuccessfulEvent(READ_LATENCY_SUCCESS_sum, TimeUnit.NANOSECONDS);
statsLogger.getOpStatsLogger(READ_LATENCY_PER_QUERY)
.registerFailedEvent(READ_LATENCY_FAIL_sum, TimeUnit.NANOSECONDS);
// register number of entries per query
statsLogger.getOpStatsLogger(NUM_ENTRIES_PER_QUERY)
.registerSuccessfulValue(NUM_ENTRIES_PER_BATCH_sum);
// register time spent waiting to read for message queue per query
statsLogger.getOpStatsLogger(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_PER_QUERY)
.registerSuccessfulEvent(MESSAGE_QUEUE_DEQUEUE_WAIT_TIME_sum, TimeUnit.MILLISECONDS);
// register time spent deserializing records per query
statsLogger.getOpStatsLogger(RECORD_DESERIALIZE_TIME_PER_QUERY)
.registerSuccessfulEvent(RECORD_DESERIALIZE_TIME_sum, TimeUnit.NANOSECONDS);
}
}
}
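The tracker is driven in start/end pairs around each instrumented section, and close() flushes the per-query aggregates when the split finishes. A minimal usage sketch, assuming a NullStatsProvider (so every call is a no-op) and a placeholder byte count:
// Sketch only: instrument one entry deserialization and then close the tracker.
PulsarConnectorMetricsTracker tracker =
        new PulsarConnectorMetricsTracker(new NullStatsProvider());
tracker.start_ENTRY_DESERIALIZE_TIME();
// ... deserialize an entry here ...
tracker.end_ENTRY_DESERIALIZE_TIME();
tracker.register_BYTES_READ(1024L); // placeholder byte count
tracker.close(); // registers the *-per-query sums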
......@@ -24,6 +24,9 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicName;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
public class PulsarConnectorUtils {
public static Schema parseSchema(String schemaJson) {
......@@ -39,4 +42,40 @@ public class PulsarConnectorUtils {
+ ExceptionUtils.getRootCause(e).getLocalizedMessage(), e);
}
}
/**
* Create an instance of <code>userClassName</code> using provided <code>classLoader</code>.
* This instance should implement the provided interface <code>xface</code>.
*
* @param userClassName user class name
* @param xface the interface that the reflected instance should implement
* @param classLoader class loader to load the class.
* @return the instance
*/
public static <T> T createInstance(String userClassName,
Class<T> xface,
ClassLoader classLoader) {
Class<?> theCls;
try {
theCls = Class.forName(userClassName, true, classLoader);
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException("User class must be in class path", cnfe);
}
if (!xface.isAssignableFrom(theCls)) {
throw new RuntimeException(userClassName + " not " + xface.getName());
}
Class<T> tCls = (Class<T>) theCls.asSubclass(xface);
try {
Constructor<T> meth = tCls.getDeclaredConstructor();
return meth.newInstance();
} catch (InstantiationException ie) {
throw new RuntimeException("User class must be concrete", ie);
} catch (NoSuchMethodException e) {
throw new RuntimeException("User class must have a no-arg constructor", e);
} catch (IllegalAccessException e) {
throw new RuntimeException("User class must a public constructor", e);
} catch (InvocationTargetException e) {
throw new RuntimeException("User class constructor throws exception", e);
}
}
}
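createInstance is what PulsarConnectorCache uses to load the configured provider reflectively. A brief sketch of the call, using the NullStatsProvider default from PulsarConnectorConfig:
// Sketch: reflectively instantiate the configured stats provider.
StatsProvider statsProvider = PulsarConnectorUtils.createInstance(
        "org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider",
        StatsProvider.class,
        PulsarConnectorUtils.class.getClassLoader());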
......@@ -35,14 +35,12 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageParser;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import java.io.IOException;
import java.util.List;
......@@ -77,24 +75,22 @@ public class PulsarRecordCursor implements RecordCursor {
private Message currentMessage;
private Map<String, PulsarInternalColumn> internalColumnMap = PulsarInternalColumn.getInternalFieldsMap();
private SchemaHandler schemaHandler;
private int batchSize;
private int maxBatchSize;
private AtomicLong completedBytes = new AtomicLong(0L);
private ReadEntries readEntries;
private DeserializeEntries deserializeEntries;
private TopicName topicName;
private PulsarConnectorMetricsTracker metricsTracker;
private static final Logger log = Logger.get(PulsarRecordCursor.class);
// start time used to compute the total execution time of the split
private long startTime;
private static ManagedLedgerFactory initManagedLedgerFactory(PulsarConnectorConfig pulsarConnectorConfig) throws Exception {
ClientConfiguration bkClientConfiguration = new ClientConfiguration()
.setZkServers(pulsarConnectorConfig.getZookeeperUri())
.setAllowShadedLedgerManagerFactoryClass(true)
.setShadedLedgerManagerFactoryClassPrefix("org.apache.pulsar.shade.");
return new ManagedLedgerFactoryImpl(bkClientConfiguration);
}
private static final Logger log = Logger.get(PulsarRecordCursor.class);
public PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit,
PulsarConnectorConfig pulsarConnectorConfig) {
// Set start time for split
this.startTime = System.nanoTime();
PulsarConnectorCache pulsarConnectorCache;
try {
pulsarConnectorCache = PulsarConnectorCache.getConnectorCache(pulsarConnectorConfig);
......@@ -104,26 +100,29 @@ public class PulsarRecordCursor implements RecordCursor {
throw new RuntimeException(e);
}
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig,
pulsarConnectorCache.getManagedLedgerFactory());
pulsarConnectorCache.getManagedLedgerFactory(),
new PulsarConnectorMetricsTracker(pulsarConnectorCache.getStatsProvider()));
}
// Exposed for testing purposes
PulsarRecordCursor(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory);
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory, PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
initialize(columnHandles, pulsarSplit, pulsarConnectorConfig, managedLedgerFactory, pulsarConnectorMetricsTracker);
}
private void initialize(List<PulsarColumnHandle> columnHandles, PulsarSplit pulsarSplit, PulsarConnectorConfig
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory) {
pulsarConnectorConfig, ManagedLedgerFactory managedLedgerFactory,
PulsarConnectorMetricsTracker pulsarConnectorMetricsTracker) {
this.columnHandles = columnHandles;
this.pulsarSplit = pulsarSplit;
this.pulsarConnectorConfig = pulsarConnectorConfig;
this.batchSize = pulsarConnectorConfig.getEntryReadBatchSize();
this.maxBatchSize = pulsarConnectorConfig.getMaxEntryReadBatchSize();
this.messageQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitMessageQueueSize());
this.entryQueue = new ArrayBlockingQueue<>(pulsarConnectorConfig.getMaxSplitEntryQueueSize());
this.topicName = TopicName.get("persistent",
NamespaceName.get(pulsarSplit.getSchemaName()),
pulsarSplit.getTableName());
this.metricsTracker = pulsarConnectorMetricsTracker;
Schema schema = PulsarConnectorUtils.parseSchema(pulsarSplit.getSchema());
......@@ -191,7 +190,7 @@ public class PulsarRecordCursor implements RecordCursor {
private final Thread thread;
public DeserializeEntries() {
this.thread = new Thread(this);
this.thread = new Thread(this, "derserialize-thread-split-" + pulsarSplit.getSplitId());
}
public void interrupt() {
......@@ -209,19 +208,42 @@ public class PulsarRecordCursor implements RecordCursor {
while (isRunning.get()) {
Entry entry;
try {
// start time for entry queue read
metricsTracker.start_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
// read from entry queue and block if empty
entry = entryQueue.take();
// record entry queue wait time stats
metricsTracker.end_ENTRY_QUEUE_DEQUEUE_WAIT_TIME();
} catch (InterruptedException e) {
break;
}
try {
completedBytes.addAndGet(entry.getDataBuffer().readableBytes());
long bytes = entry.getDataBuffer().readableBytes();
completedBytes.addAndGet(bytes);
// register stats for bytes read
metricsTracker.register_BYTES_READ(bytes);
// set start time for entry deserialization stats
metricsTracker.start_ENTRY_DESERIALIZE_TIME();
// filter out entries that are not part of this split
if (((PositionImpl) entry.getPosition()).compareTo(pulsarSplit.getEndPosition()) < 0) {
try {
MessageParser.parseMessage(topicName, entry.getLedgerId(), entry.getEntryId(),
entry.getDataBuffer(), (messageId, message, byteBuf) -> {
try {
// start time for message queue enqueue
metricsTracker.start_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
// enqueue the deserialized message from this entry
messageQueue.put(message);
// stats for how long the message queue enqueue took
metricsTracker.end_MESSAGE_QUEUE_ENQUEUE_WAIT_TIME();
// stats for number of messages deserialized from this entry
metricsTracker.incr_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
} catch (InterruptedException e) {
//no-op
}
......@@ -230,6 +252,11 @@ public class PulsarRecordCursor implements RecordCursor {
log.error(e, "Failed to parse message from pulsar topic %s", topicName.toString());
throw new RuntimeException(e);
}
// stats for time spent deserializing entries
metricsTracker.end_ENTRY_DESERIALIZE_TIME();
// stats for num messages per entry
metricsTracker.end_NUM_MESSAGES_DESERIALIZED_PER_ENTRY();
}
} finally {
entry.release();
......@@ -256,9 +283,19 @@ public class PulsarRecordCursor implements RecordCursor {
.compareTo(pulsarSplit.getEndPosition()) >= 0) {
isDone.set(true);
} else if (entryQueue.remainingCapacity() > batchSize) {
outstandingReadsRequests.decrementAndGet();
cursor.asyncReadEntries(batchSize, this, System.currentTimeMillis());
} else {
int batchSize = Math.min(maxBatchSize, entryQueue.remainingCapacity());
if (batchSize > 0) {
outstandingReadsRequests.decrementAndGet();
cursor.asyncReadEntries(batchSize, this, System.nanoTime());
// stats for successful read request
metricsTracker.incr_READ_ATTEMPTS_SUCCESS();
} else {
// stats for failed read request because entry queue is full
metricsTracker.incr_READ_ATTEMPTS_FAIL();
}
}
}
}
......@@ -267,6 +304,11 @@ public class PulsarRecordCursor implements RecordCursor {
public void readEntriesComplete(List<Entry> entries, Object ctx) {
entryQueue.addAll(entries);
outstandingReadsRequests.incrementAndGet();
//set read latency stats for success
metricsTracker.register_READ_LATENCY_PER_BATCH_SUCCESS(System.nanoTime() - (long)ctx);
//stats for number of entries read
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_SUCCESS(entries.size());
}
public boolean hashFinished() {
......@@ -278,6 +320,11 @@ public class PulsarRecordCursor implements RecordCursor {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
log.debug(exception, "Failed to read entries from topic %s", topicName.toString());
outstandingReadsRequests.incrementAndGet();
//set read latency stats for failed
metricsTracker.register_READ_LATENCY_PER_BATCH_FAIL(System.nanoTime() - (long)ctx);
//stats for number of entries read failed
metricsTracker.incr_NUM_ENTRIES_PER_BATCH_FAIL((long) maxBatchSize);
}
}
......@@ -286,12 +333,12 @@ public class PulsarRecordCursor implements RecordCursor {
public boolean advanceNextPosition() {
if (readEntries == null) {
readEntries = new ReadEntries();
readEntries.run();
// start deserialize thread
deserializeEntries = new DeserializeEntries();
deserializeEntries.start();
readEntries = new ReadEntries();
readEntries.run();
}
while(true) {
......@@ -309,13 +356,23 @@ public class PulsarRecordCursor implements RecordCursor {
} else {
try {
Thread.sleep(5);
// stats for time spent waiting to read from the message queue because it is empty
metricsTracker.register_MESSAGE_QUEUE_DEQUEUE_WAIT_TIME(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
// start time for deserializing record
metricsTracker.start_RECORD_DESERIALIZE_TIME();
currentRecord = this.schemaHandler.deserialize(this.currentMessage.getData());
metricsTracker.incr_NUM_RECORD_DESERIALIZED();
// stats for time spent deserializing
metricsTracker.end_RECORD_DESERIALIZE_TIME();
return true;
}
......@@ -423,6 +480,12 @@ public class PulsarRecordCursor implements RecordCursor {
log.error(e);
}
}
// set stat for total execution time of split
if (this.metricsTracker != null) {
this.metricsTracker.register_TOTAL_EXECUTION_TIME(System.nanoTime() - startTime);
this.metricsTracker.close();
}
}
private void checkFieldType(int field, Class<?> expected) {
......
......@@ -53,6 +53,7 @@ import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.javax.ws.rs.ClientErrorException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsProvider;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
......@@ -69,7 +70,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
......@@ -664,7 +664,7 @@ public abstract class TestPulsarConnector {
@BeforeMethod
public void setup() throws Exception {
this.pulsarConnectorConfig = spy(new PulsarConnectorConfig());
this.pulsarConnectorConfig.setEntryReadBatchSize(1);
this.pulsarConnectorConfig.setMaxEntryReadBatchSize(1);
this.pulsarConnectorConfig.setMaxSplitEntryQueueSize(10);
this.pulsarConnectorConfig.setMaxSplitMessageQueueSize(100);
......@@ -924,7 +924,8 @@ public abstract class TestPulsarConnector {
for (Map.Entry<TopicName, PulsarSplit> split : splits.entrySet()) {
PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(), pulsarConnectorConfig, managedLedgerFactory));
PulsarRecordCursor pulsarRecordCursor = spy(new PulsarRecordCursor(fooColumnHandles, split.getValue(),
pulsarConnectorConfig, managedLedgerFactory, new PulsarConnectorMetricsTracker(new NullStatsProvider())));
this.pulsarRecordCursors.put(split.getKey(), pulsarRecordCursor);
}
}
......