提交 2f374530 编写于 作者: M Matt Howlett

addressing more review feedback

上级 bd165bc2
......@@ -106,7 +106,7 @@ fetch.max.bytes | C | 0 .. 2147483135 | 52428800
fetch.min.bytes | C | 1 .. 100000000 | 1 | low | Minimum number of bytes the broker responds with. If fetch.wait.max.ms expires the accumulated data will be sent to the client regardless of this setting. <br>*Type: integer*
fetch.error.backoff.ms | C | 0 .. 300000 | 500 | medium | How long to postpone the next fetch request for a topic+partition in case of a fetch error. <br>*Type: integer*
offset.store.method | C | none, file, broker | broker | low | **DEPRECATED** Offset commit store method: 'file' - DEPRECATED: local file store (offset.store.path, et.al), 'broker' - broker commit store (requires Apache Kafka 0.8.2 or later on the broker). <br>*Type: enum value*
isolation.level | C | read_uncommitted, read_committed | read_uncommitted | medium | Controls how to read messages written transactionally: `read_committed` - only return transactional messages which have been committed. `read_uncommitted` - return all messages, even messages for aborted and non-committed transactions. <br>*Type: enum value*
isolation.level | C | read_uncommitted, read_committed | read_uncommitted | high | Controls how to read messages written transactionally: `read_committed` - only return transactional messages which have been committed. `read_uncommitted` - return all messages, even transactional messages which have been aborted. <br>*Type: enum value*
consume_cb | C | | | low | Message consume callback (set with rd_kafka_conf_set_consume_cb()) <br>*Type: pointer*
rebalance_cb | C | | | low | Called after consumer group has been rebalanced (set with rd_kafka_conf_set_rebalance_cb()) <br>*Type: pointer*
offset_commit_cb | C | | | low | Offset commit result propagation callback. (set with rd_kafka_conf_set_offset_commit_cb()) <br>*Type: pointer*
......
......@@ -3559,7 +3559,6 @@ static void rd_kafka_toppar_fetch_backoff (rd_kafka_broker_t *rkb,
}
/**
* Parses and handles a Fetch reply.
* Returns 0 on success or an error code on failure.
......@@ -3628,12 +3627,27 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
&AbortedTxnCnt);
if (rkb->rkb_rk->rk_conf.isolation_level ==
RD_KAFKA_READ_COMMITTED)
end_offset = hdr.LastStableOffset;
RD_KAFKA_READ_UNCOMMITTED) {
if (AbortedTxnCnt > 0) {
if (unlikely(AbortedTxnCnt > 0)) {
rd_rkb_log(rkb, LOG_ERR, "FETCH",
"%.*s [%"PRId32"]: "
"%"PRId32" aborted transaction(s) "
"encountered in READ_UNCOMMITTED "
"fetch response - ignoring.",
RD_KAFKAP_STR_PR(&topic),
hdr.Partition,
AbortedTxnCnt);
rd_kafka_buf_skip(rkbuf,
AbortedTxnCnt * (8+8));
}
}
else if (AbortedTxnCnt > 0) {
int k;
end_offset = hdr.LastStableOffset;
if (unlikely(AbortedTxnCnt > 1000000))
rd_kafka_buf_parse_fail(
rkbuf,
......@@ -3652,21 +3666,6 @@ rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
rd_kafka_aborted_txns_add(aborted_txns, Pid, FirstOffset);
}
rd_kafka_aborted_txns_sort(aborted_txns);
if (unlikely(rkb->rkb_rk->rk_conf.isolation_level ==
RD_KAFKA_READ_UNCOMMITTED)) {
rd_rkb_log(rkb, LOG_ERR, "FETCH",
"%.*s [%"PRId32"]: "
"%"PRId32" aborted transaction(s) "
"encountered in READ_UNCOMMITTED "
"fetch response - ignoring.",
RD_KAFKAP_STR_PR(&topic),
hdr.Partition,
AbortedTxnCnt);
rd_kafka_aborted_txns_destroy(aborted_txns);
aborted_txns = NULL;
}
}
} else
hdr.LastStableOffset = -1;
......
......@@ -982,13 +982,13 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{ RD_KAFKA_OFFSET_METHOD_BROKER, "broker" }
}
},
{ _RK_GLOBAL|_RK_CONSUMER|_RK_MED, "isolation.level",
{ _RK_GLOBAL|_RK_CONSUMER|_RK_HIGH, "isolation.level",
_RK_C_S2I,
_RK(isolation_level),
"Controls how to read messages written transactionally: "
"`read_committed` - only return transactional messages which have "
"been committed. `read_uncommitted` - return all messages, even "
"messages for aborted and non-committed transactions.",
"transactional messages which have been aborted.",
.vdef = RD_KAFKA_READ_UNCOMMITTED,
.s2i = {
{ RD_KAFKA_READ_UNCOMMITTED, "read_uncommitted" },
......
......@@ -37,63 +37,33 @@
* @brief A collection of aborted transactions.
*/
typedef struct rd_kafka_aborted_txns_s {
rd_avl_t *avl;
/* Note: A list of nodes is maintained alongside
* the AVL tree to facilitate traversal.
*/
rd_list_t *list;
int32_t cnt;
rd_avl_t avl;
/* Note: A list of nodes is maintained alongside
* the AVL tree to facilitate traversal.
*/
rd_list_t list;
int32_t cnt;
} rd_kafka_aborted_txns_t;
/**
* @brief Allocate memory for, and initialize a new
* rd_kafka_aborted_txns_t struct.
*/
rd_kafka_aborted_txns_t *rd_kafka_aborted_txns_new (int32_t txn_cnt);
/**
* @brief Free all resources associated with a
* rd_kafka_aborted_txns_t struct.
*/
void
rd_kafka_aborted_txns_destroy (rd_kafka_aborted_txns_t *aborted_txns);
/**
* @brief Get the next aborted transaction start
* offset for the specified pid. Returns -1 if
* there is none.
*/
int64_t
rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid);
/**
* @brief Move to the next aborted transaction start
* offset for the specified pid.
*/
void
rd_kafka_aborted_txns_move_to_next(rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid);
rd_kafka_aborted_txns_get_offset (const rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid);
int64_t
rd_kafka_aborted_txns_pop_offset (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid);
/**
* @brief Sort each of the abort transaction start
* offset lists for each pid.
*/
void
rd_kafka_aborted_txns_sort(rd_kafka_aborted_txns_t *aborted_txns);
rd_kafka_aborted_txns_sort (rd_kafka_aborted_txns_t *aborted_txns);
/**
* @brief Add a transaction start offset corresponding
* to the specified pid to the aborted_txns collection.
*/
void
rd_kafka_aborted_txns_add(rd_kafka_aborted_txns_t *aborted_txns,
rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid,
int64_t first_offset);
......@@ -118,10 +88,6 @@ rd_kafka_msgset_parse (rd_kafka_buf_t *rkbuf,
rd_kafka_aborted_txns_t *aborted_txns,
const struct rd_kafka_toppar_ver *tver);
/**
* @brief Unit tests for all functions that operate on
* rd_kafka_aborted_txns_t
*/
int unittest_aborted_txns (void);
#endif /* _RDKAFKA_MSGSET_H_ */
......@@ -804,28 +804,26 @@ rd_kafka_msgset_reader_msg_v2 (rd_kafka_msgset_reader_t *msetr) {
if (unlikely(!msetr->msetr_aborted_txns))
goto unexpected_abort_txn;
aborted_txn_start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
/* This marks the end of this (aborted) transaction,
* advance to next aborted transaction in list */
aborted_txn_start_offset = rd_kafka_aborted_txns_pop_offset(
msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID);
if (unlikely(aborted_txn_start_offset == -1))
goto unexpected_abort_txn;
if (unlikely(aborted_txn_start_offset >= hdr.Offset)) {
if (unlikely(aborted_txn_start_offset >= hdr.Offset))
rd_rkb_log(msetr->msetr_rkb, LOG_ERR, "TXN",
"%s [%"PRId32"]: "
"Abort txn ctrl msg bad order "
"at offset %"PRId64". Expected "
" before or at %"PRId64,
"before or at %"PRId64". Messages "
"in aborted transactions may be "
" delivered to the application.",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
hdr.Offset, aborted_txn_start_offset);
break;
}
/* This marks the end of this (aborted) transaction,
* advance to next aborted transaction in list */
rd_kafka_aborted_txns_move_to_next(
msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID);
break;
unexpected_abort_txn:
......@@ -833,21 +831,22 @@ unexpected_abort_txn:
"%s [%"PRId32"]: "
"Received abort txn ctrl msg for "
"unknown txn PID %"PRId64" at "
"offset %"PRId64": ignoring",
"offset %"PRId64". Ignoring",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->PID, hdr.Offset);
break;
default:
rd_kafka_buf_parse_fail(rkbuf,
rd_rkb_dbg(msetr->msetr_rkb, MSG, "TXN"
"%s [%"PRId32"]: "
"Unsupported ctrl message "
"type %"PRId16" at offset"
" %"PRId64,
" %"PRId64". Ignoring",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
ctrl_data.Type, hdr.Offset);
break;
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
......@@ -1013,6 +1012,9 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
rd_kafka_buf_read_i32(rkbuf, &hdr.BaseSequence);
rd_kafka_buf_read_i32(rkbuf, &hdr.RecordCount);
if (hdr.Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)
msetr->msetr_ctrl_cnt++;
/* Payload size is hdr.Length - MessageSet headers */
payload_size = hdr.Length - (rd_slice_offset(&rkbuf->rkbuf_reader) -
len_start);
......@@ -1032,9 +1034,6 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
goto done;
}
if (hdr.Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)
msetr->msetr_ctrl_cnt++;
msetr->msetr_v2_hdr = &hdr;
/* Handle compressed MessageSet */
......@@ -1074,12 +1073,19 @@ rd_kafka_msgset_reader_v2 (rd_kafka_msgset_reader_t *msetr) {
msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_TRANSACTIONAL &&
!(msetr->msetr_v2_hdr->Attributes & RD_KAFKA_MSGSET_V2_ATTR_CONTROL)) {
int64_t txn_start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
int64_t txn_start_offset = rd_kafka_aborted_txns_get_offset(
msetr->msetr_aborted_txns, msetr->msetr_v2_hdr->PID);
if (txn_start_offset >= 0 &&
if (txn_start_offset != -1 &&
msetr->msetr_v2_hdr->BaseOffset >= txn_start_offset) {
/* MessageSet is part of an aborted transaction */
rd_rkb_dbg(msetr->msetr_rkb, MSG, "MSG",
"%s [%"PRId32"]: "
"Skipping %"PRId32" message(s) "
"in aborted transaction.",
rktp->rktp_rkt->rkt_topic->str,
rktp->rktp_partition,
msetr->msetr_v2_hdr->RecordCount);
rd_slice_widen(&rkbuf->rkbuf_reader, &save_slice);
goto done;
}
......@@ -1409,88 +1415,134 @@ rd_kafka_msgset_parse (rd_kafka_buf_t *rkbuf,
*/
static int rd_kafka_offset_cmp (const void *_a, const void *_b) {
const int64_t *a = _a, *b = _b;
return *a - *b;
const int64_t diff = *a - *b;
return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
}
/**
* @brief Pid comparator for rd_kafka_aborted_txn_start_offsets_t
*/
static int rd_kafka_aborted_txn_cmp_by_pid (const void *_a, const void *_b) {
const rd_kafka_aborted_txn_start_offsets_t *a = _a, *b = _b;
return a->pid - b->pid;
const int64_t diff = a->pid - b->pid;
return diff < 0 ? -1 : (diff > 0 ? 1 : 0);
}
/**
* @brief Free resources associated with an AVL tree node.
*/
void rd_kafka_aborted_txn_node_destroy (void *offsets) {
rd_list_destroy(&((rd_kafka_aborted_txn_start_offsets_t *)offsets)->offsets);
void rd_kafka_aborted_txn_node_destroy (void *_node_ptr) {
rd_kafka_aborted_txn_start_offsets_t *node_ptr = _node_ptr;
rd_list_destroy(&node_ptr->offsets);
}
/**
* @brief Allocate memory for, and initialize a new
* rd_kafka_aborted_txns_t struct.
*/
rd_kafka_aborted_txns_t *
rd_kafka_aborted_txns_new (int32_t txn_cnt) {
rd_kafka_aborted_txns_t *aborted_txns;
aborted_txns = rd_malloc(sizeof(aborted_txns));
aborted_txns->avl = rd_malloc(sizeof(*aborted_txns->avl));
rd_avl_init(aborted_txns->avl, rd_kafka_aborted_txn_cmp_by_pid, 0);
aborted_txns->list = rd_malloc(sizeof(*aborted_txns->list));
rd_list_init(aborted_txns->list, 0,
aborted_txns = rd_malloc(sizeof(*aborted_txns));
rd_avl_init(&aborted_txns->avl, rd_kafka_aborted_txn_cmp_by_pid, 0);
rd_list_init(&aborted_txns->list, txn_cnt,
rd_kafka_aborted_txn_node_destroy);
aborted_txns->cnt = txn_cnt;
return aborted_txns;
}
/**
* @brief Free all resources associated with a
* rd_kafka_aborted_txns_t struct.
*/
void
rd_kafka_aborted_txns_destroy (rd_kafka_aborted_txns_t *aborted_txns) {
rd_list_destroy(aborted_txns->list);
rd_avl_destroy(aborted_txns->avl);
rd_free(aborted_txns->avl);
rd_free(aborted_txns->list);
rd_list_destroy(&aborted_txns->list);
rd_avl_destroy(&aborted_txns->avl);
rd_free(aborted_txns);
}
rd_kafka_aborted_txn_start_offsets_t*
/**
* @brief Get the abort txn start offsets corresponding to
* the specified pid.
*/
rd_kafka_aborted_txn_start_offsets_t *
rd_kafka_aborted_txns_offsets_for_pid (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid) {
rd_kafka_aborted_txn_start_offsets_t node;
node.pid = pid;
return RD_AVL_FIND(aborted_txns->avl, &node);
return RD_AVL_FIND(&aborted_txns->avl, &node);
}
void
rd_kafka_aborted_txns_move_to_next (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid) {
/**
* @brief Get the next aborted transaction start
* offset for the specified pid.
*
* @param increment_idx it true, the offset index will be incremented.
*
* @returns the start offset or -1 if there is none.
*/
int64_t
rd_kafka_aborted_txns_next_offset (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid, rd_bool_t increment_idx) {
int64_t abort_start_offset;
rd_kafka_aborted_txn_start_offsets_t* node_ptr
= rd_kafka_aborted_txns_offsets_for_pid(aborted_txns, pid);
node_ptr->offsets_idx++;
}
int64_t
rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid) {
rd_kafka_aborted_txn_start_offsets_t node, *node_ptr;
node.pid = pid;
node_ptr = RD_AVL_FIND(aborted_txns->avl, &node);
if (node_ptr == NULL)
return -1;
if (unlikely(node_ptr->offsets_idx >= rd_list_cnt(&node_ptr->offsets)))
return -1;
int64_t abort_start_offset =
abort_start_offset =
*((int64_t *)rd_list_elem(&node_ptr->offsets, node_ptr->offsets_idx));
if (increment_idx)
node_ptr->offsets_idx++;
return abort_start_offset;
}
/**
* @brief Get the next aborted transaction start
* offset for the specified pid and progress the
* current index to the next one.
*
* @returns the start offset or -1 if there is none.
*/
int64_t
rd_kafka_aborted_txns_pop_offset (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid) {
return rd_kafka_aborted_txns_next_offset(aborted_txns, pid, true);
}
/**
* @brief Get the next aborted transaction start
* offset for the specified pid.
*
* @returns the start offset or -1 if there is none.
*/
int64_t
rd_kafka_aborted_txns_get_offset (const rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid) {
return rd_kafka_aborted_txns_next_offset(
(rd_kafka_aborted_txns_t *)aborted_txns, pid, false);
}
/**
* @brief Add a transaction start offset corresponding
* to the specified pid to the aborted_txns collection.
*/
void
rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
int64_t pid,
......@@ -1498,7 +1550,7 @@ rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
int64_t *v;
rd_kafka_aborted_txn_start_offsets_t *node_ptr
= rd_kafka_aborted_txns_offsets_for_pid(aborted_txns, pid);
if (!node_ptr) {
node_ptr = rd_malloc(sizeof(*node_ptr));
node_ptr->pid = pid;
......@@ -1508,8 +1560,8 @@ rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
rd_list_prealloc_elems(&node_ptr->offsets,
sizeof(int64_t),
aborted_txns->cnt, 0);
RD_AVL_INSERT(aborted_txns->avl, node_ptr, avl_node);
rd_list_add(aborted_txns->list, node_ptr);
RD_AVL_INSERT(&aborted_txns->avl, node_ptr, avl_node);
rd_list_add(&aborted_txns->list, node_ptr);
}
v = rd_list_add(&node_ptr->offsets, NULL);
......@@ -1517,12 +1569,16 @@ rd_kafka_aborted_txns_add (rd_kafka_aborted_txns_t *aborted_txns,
}
/**
* @brief Sort each of the abort transaction start
* offset lists for each pid.
*/
void
rd_kafka_aborted_txns_sort (rd_kafka_aborted_txns_t *aborted_txns) {
int k;
for (k = 0; k < rd_list_cnt(aborted_txns->list); k++) {
for (k = 0; k < rd_list_cnt(&aborted_txns->list); k++) {
rd_kafka_aborted_txn_start_offsets_t *el =
rd_list_elem(aborted_txns->list, k);
rd_list_elem(&aborted_txns->list, k);
rd_list_sort(&el->offsets, rd_kafka_offset_cmp);
}
}
......@@ -1532,8 +1588,7 @@ rd_kafka_aborted_txns_sort (rd_kafka_aborted_txns_t *aborted_txns) {
* @brief Unit tests for all functions that operate on
* rd_kafka_aborted_txns_t
*/
int unittest_aborted_txns (void)
{
int unittest_aborted_txns (void) {
rd_kafka_aborted_txns_t *aborted_txns = NULL;
int64_t start_offset;
......@@ -1547,83 +1602,88 @@ int unittest_aborted_txns (void)
rd_kafka_aborted_txns_add(aborted_txns, 1, 3);
rd_kafka_aborted_txns_sort(aborted_txns);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(3 == start_offset,
"queried start offset was %"PRId64", "
"exected 3", start_offset);
"expected 3", start_offset);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(3 == start_offset,
"queried start offset was %"PRId64", "
"exected 3", start_offset);
"expected 3", start_offset);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 1);
start_offset = rd_kafka_aborted_txns_pop_offset(
aborted_txns, 1);
RD_UT_ASSERT(3 == start_offset,
"queried start offset was %"PRId64", "
"expected 3", start_offset);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(10 == start_offset,
"queried start offset was %"PRId64", "
"exected 10", start_offset);
"expected 10", start_offset);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 2);
RD_UT_ASSERT(7 == start_offset,
"queried start offset was %"PRId64", "
"exected 7", start_offset);
"expected 7", start_offset);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 1);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 1);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(42 == start_offset,
"queried start offset was %"PRId64", "
"exected 42", start_offset);
"expected 42", start_offset);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 1);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 1);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(44 == start_offset,
"queried start offset was %"PRId64", "
"exected 44", start_offset);
"expected 44", start_offset);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 2);
RD_UT_ASSERT(7 == start_offset,
"queried start offset was %"PRId64", "
"exected 7", start_offset);
"expected 7", start_offset);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 2);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 2);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 2);
RD_UT_ASSERT(11 == start_offset,
"queried start offset was %"PRId64", "
"exected 11", start_offset);
"expected 11", start_offset);
/* error cases */
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 3);
RD_UT_ASSERT(-1 == start_offset,
"queried start offset was %"PRId64", "
"exected -1", start_offset);
"expected -1", start_offset);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 1);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 1);
rd_kafka_aborted_txns_move_to_next(aborted_txns, 2);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 1);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 1);
rd_kafka_aborted_txns_pop_offset(aborted_txns, 2);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 1);
RD_UT_ASSERT(-1 == start_offset,
"queried start offset was %"PRId64", "
"exected -1", start_offset);
start_offset = rd_kafka_aborted_txns_next_aborted_txn_offset_for_pid(
"expected -1", start_offset);
start_offset = rd_kafka_aborted_txns_get_offset(
aborted_txns, 2);
RD_UT_ASSERT(-1 == start_offset,
"queried start offset was %"PRId64", "
"exected -1", start_offset);
"expected -1", start_offset);
rd_kafka_aborted_txns_destroy(aborted_txns);
......
......@@ -269,8 +269,8 @@ struct rd_kafka_toppar_s { /* rd_kafka_toppar_t */
* by broker thread.
* Locks: toppar_lock */
int64_t rktp_ls_offset; /* Current last stable offset
* Locks: toppar_lock */
int64_t rktp_ls_offset; /**< Current last stable offset
* Locks: toppar_lock */
int64_t rktp_hi_offset; /* Current high watermark offset.
* Locks: toppar_lock */
int64_t rktp_lo_offset; /* Current broker low offset.
......
此差异已折叠。
......@@ -97,8 +97,8 @@ namespace Test {
}
/**
* @brief Create a topic using the Admin API
*/
* @brief Create a topic
*/
static RD_UNUSED void create_topic (RdKafka::Handle *use_handle, const char *topicname,
int partition_cnt, int replication_factor) {
rd_kafka_t *use_rk = NULL;
......@@ -108,8 +108,8 @@ namespace Test {
}
/**
* @brief Delete a topic using the Admin API
*/
* @brief Delete a topic
*/
static RD_UNUSED void delete_topic (RdKafka::Handle *use_handle, const char *topicname) {
rd_kafka_t *use_rk = NULL;
if (use_handle != NULL)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册