提交 bd91d485 编写于 作者: M Matt Howlett

remove rktp->rktp_offsets.hi_offset + whitespace

上级 39ded0b8
......@@ -3638,7 +3638,7 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
RD_KAFKAP_STR_PR(&topic),
hdr.Partition,
AbortedTxnCnt);
rd_kafka_buf_skip(rkbuf,
AbortedTxnCnt * (8+8));
}
......@@ -3771,12 +3771,6 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
hdr.LastStableOffset,
tver->version, fetch_version);
/* Update hi offset to be able to compute
* consumer lag. */
rktp->rktp_offsets.hi_offset = end_offset;
/* High offset for get_watermark_offsets() */
rd_kafka_toppar_lock(rktp);
rktp->rktp_hi_offset = hdr.HighwaterMarkOffset;
......
......@@ -349,7 +349,7 @@ struct rd_kafka_conf_s {
rd_kafka_offset_method_t offset_store_method;
rd_kafka_isolation_level_t isolation_level;
int enable_partition_eof;
/*
......
......@@ -31,9 +31,9 @@
/**
/**
* @struct rd_kafka_aborted_txns_t
*
*
* @brief A collection of aborted transactions.
*/
typedef struct rd_kafka_aborted_txns_s {
......
......@@ -98,8 +98,8 @@ struct msgset_v2_hdr {
/**
* @struct rd_kafka_aborted_txn_start_offsets_t
*
* @brief A sorted list of aborted transaction start offsets
*
* @brief A sorted list of aborted transaction start offsets
* (ascending) for a PID, and an offset into that list.
*/
typedef struct rd_kafka_aborted_txn_start_offsets_s {
......@@ -127,20 +127,20 @@ typedef struct rd_kafka_msgset_reader_s {
struct msgset_v2_hdr *msetr_v2_hdr; /**< MessageSet v2 header */
/*
/*
* Aborted Transaction Start Offsets. These are arranged in a map
* (ABORTED_TXN_OFFSETS), with PID as the key and value as follows:
* - OFFSETS: sorted list of aborted transaction start offsets (ascending)
* - IDX: an index into OFFSETS list, initialized to 0.
*
* The logic for processing fetched data is as follows (note: this is
* The logic for processing fetched data is as follows (note: this is
* different from the Java client):
*
*
* 1. If the message is a transaction control message and the status is ABORT
* then increment ABORTED_TXN_OFFSETS(PID).IDX. note: sanity check that
* OFFSETS[ABORTED_TXN_OFFSETS(PID).IDX] is less than the current offset
* before incrementing. If the status is COMMIT, do nothing.
*
*
* 2. If the message is a normal message, find the corresponding OFFSETS list
* in ABORTED_TXN_OFFSETS. If it doesn't exist, then keep the message. If
* the PID does exist, compare ABORTED_TXN_OFFSETS(PID).IDX with
......@@ -148,10 +148,10 @@ typedef struct rd_kafka_msgset_reader_s {
* compare the message offset with OFFSETS[ABORTED_TXN_OFFSETS(PID).IDX].
* If it's greater than or equal to this value, then the message should be
* ignored. If it's less than, then the message should be kept.
*
*
* Note: A MessageSet comprises messages from at most one transaction, so the
* logic in step 2 is done at the message set level.
*/
*/
rd_kafka_aborted_txns_t *msetr_aborted_txns;
const struct rd_kafka_toppar_ver *msetr_tver; /**< Toppar op version of
......@@ -449,7 +449,7 @@ rd_kafka_msgset_reader_decompress (rd_kafka_msgset_reader_t *msetr,
rkbufz,
msetr->msetr_rktp,
msetr->msetr_tver,
/* there is no aborted transaction
/* there is no aborted transaction
* support for MsgVersion < 2 */
NULL,
&msetr->msetr_rkq);
......@@ -788,7 +788,7 @@ rd_kafka_msgset_reader_msg_v2 (rd_kafka_msgset_reader_t *msetr) {
rd_kafka_buf_read_i16(rkbuf, &ctrl_data.Type);
/* Client is uninterested in value of commit marker */
rd_kafka_buf_skip(rkbuf, (int32_t)(message_end
rd_kafka_buf_skip(rkbuf, (int32_t)(message_end
- rd_slice_offset(&rkbuf->rkbuf_reader)));
switch (ctrl_data.Type) {
......@@ -1075,7 +1075,7 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID);
if (txn_start_offset != -1 &&
msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) {
/* MessageSet is part of an aborted transaction */
......@@ -1439,7 +1439,7 @@ void rd_kafka_aborted_txn_node_destroy (void *_node_ptr) {
/**
* @brief Allocate memory for, and initialize a new
* rd_kafka_aborted_txns_t struct.
* rd_kafka_aborted_txns_t struct.
*/
rd_kafka_aborted_txns_t *
rd_kafka_aborted_txns_new (int32_t txn_cnt) {
......@@ -1478,12 +1478,12 @@ rd_kafka_aborted_txns_offsets_for_pid (rd_kafka_aborted_txns_t *aborted_txns,
}
/**
/**
* @brief Get the next aborted transaction start
* offset for the specified pid.
*
*
* @param increment_idx if true, the offset index will be incremented.
*
*
* @returns the start offset or -1 if there is none.
*/
static int64_t
......@@ -1492,14 +1492,14 @@ rd_kafka_aborted_txns_next_offset (rd_kafka_aborted_txns_t *aborted_txns,
int64_t abort_start_offset;
rd_kafka_aborted_txn_start_offsets_t* node_ptr
= rd_kafka_aborted_txns_offsets_for_pid(aborted_txns, pid);
if (node_ptr == NULL)
return -1;
if (unlikely(node_ptr->offsets_idx >= rd_list_cnt(&node_ptr->offsets)))
return -1;
abort_start_offset =
abort_start_offset =
*((int64_t *)rd_list_elem(&node_ptr->offsets, node_ptr->offsets_idx));
if (increment_idx)
......@@ -1509,11 +1509,11 @@ rd_kafka_aborted_txns_next_offset (rd_kafka_aborted_txns_t *aborted_txns,
}
/**
/**
* @brief Get the next aborted transaction start
* offset for the specified pid and progress the
* current index to the next one.
*
*
* @returns the start offset or -1 if there is none.
*/
int64_t
......@@ -1523,10 +1523,10 @@ rd_kafka_aborted_txns_pop_offset (rd_kafka_aborted_txns_t *aborted_txns,
}
/**
/**
* @brief Get the next aborted transaction start
* offset for the specified pid.
*
*
* @returns the start offset or -1 if there is none.
*/
int64_t
......@@ -1537,7 +1537,7 @@ rd_kafka_aborted_txns_get_offset (const rd_kafka_aborted_txns_t *aborted_txns,
}
/**
/**
* @brief Add a transaction start offset corresponding
* to the specified pid to the aborted_txns collection.
*/
......@@ -1556,7 +1556,7 @@ rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
rd_list_init(&node_ptr->offsets, 0, NULL);
/* Each PID list has no more than AbortedTxnCnt elements */
rd_list_prealloc_elems(&node_ptr->offsets,
sizeof(int64_t),
sizeof(int64_t),
aborted_txns->cnt, 0);
RD_AVL_INSERT(&aborted_txns->avl, node_ptr, avl_node);
rd_list_add(&aborted_txns->list, node_ptr);
......@@ -1567,7 +1567,7 @@ rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
}
/**
/**
* @brief Sort each of the abort transaction start
* offset lists for each pid.
*/
......
......@@ -41,7 +41,6 @@ extern const char *rd_kafka_fetch_states[];
struct offset_stats {
int64_t fetch_offset; /**< Next offset to fetch */
int64_t eof_offset; /**< Last offset we reported EOF for */
int64_t hi_offset; /**< Current broker hi offset */
};
/**
......@@ -50,7 +49,6 @@ struct offset_stats {
static RD_UNUSED void rd_kafka_offset_stats_reset (struct offset_stats *offs) {
offs->fetch_offset = 0;
offs->eof_offset = RD_KAFKA_OFFSET_INVALID;
offs->hi_offset = RD_KAFKA_OFFSET_INVALID;
}
......
......@@ -50,7 +50,7 @@
*
* - Uses the TransactionProducerCli Java application to produce messages
* that are part of abort and commit transactions in various combinations
* and tests that librdkafka consumes them as expected. Refer to
* and tests that librdkafka consumes them as expected. Refer to
* TransactionProducerCli.java for scenarios covered.
*/
......@@ -139,7 +139,7 @@ static void execute_java_produce_cli(std::string &bootstrapServers,
}
static std::vector<RdKafka::Message *> consume_messages(
RdKafka::KafkaConsumer *c,
RdKafka::KafkaConsumer *c,
std::string topic,
int partition) {
RdKafka::ErrorCode err;
......@@ -151,7 +151,7 @@ static std::vector<RdKafka::Message *> consume_messages(
Test::Fail("assign failed: " + RdKafka::err2str(err));
RdKafka::TopicPartition::destroy(parts);
Test::Say("Consuming from topic " + topic + "\n");
Test::Say("Consuming from topic " + topic + "\n");
std::vector<RdKafka::Message *> result = std::vector<RdKafka::Message *>();
while (true) {
......@@ -168,7 +168,7 @@ static std::vector<RdKafka::Message *> consume_messages(
result.push_back(msg);
continue;
default:
Test::Fail("Error consuming from topic " +
Test::Fail("Error consuming from topic " +
topic + ": " + msg->errstr());
delete msg;
break;
......@@ -260,7 +260,7 @@ static void do_test_consumer_txn_test (void) {
execute_java_produce_cli(bootstrap_servers, topic_name, "0");
msgs = consume_messages(c, topic_name, 0);
test_assert(msgs.size() == 5,
test_assert(msgs.size() == 5,
tostr() << "Consumed unexpected number of messages. "
"Expected 5, got: "
<< msgs.size());
......@@ -474,7 +474,7 @@ static void do_test_consumer_txn_test (void) {
test_assert(msgs[9]->key_len() >= 1 && 0x34 == msgs[9]->key()->c_str()[0],
"Unexpected key");
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
......@@ -761,7 +761,7 @@ static void do_test_consumer_txn_test (void) {
"Expected 13, got: "
<< msgs.size());
delete_messages(msgs);
Test::delete_topic(c, topic_name.c_str());
c->close();
......
......@@ -46,9 +46,9 @@ public class TransactionProducerCli {
}
static void makeTestMessages(
Producer<byte[], byte[]> producer,
Producer<byte[], byte[]> producer,
String topic, int partition,
int idStart, int count,
int idStart, int count,
TransactionType tt,
FlushType flush) throws InterruptedException {
byte[] payload = { 0x10, 0x20, 0x30, 0x40 };
......@@ -216,6 +216,7 @@ public class TransactionProducerCli {
case "6":
makeTestMessages(producer3, topic, 0, 0x10, 1, TransactionType.None, FlushType.Yes);
makeTestMessages(producer1, topic, 0, 0x20, 3, TransactionType.BeginOpen, FlushType.Yes);
System.exit(0);
break;
default:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册