提交 ba6c1ad1 编写于 作者: M Magnus Edenhill

Make rd_kafka_pause|resume_partitions() synchronous (#2455)

This makes sure that a consumer_poll() call after pause() will not
return any messages.
上级 4d6c51d9
......@@ -631,11 +631,13 @@ rd_kafka_rebalance_op (rd_kafka_cgrp_t *rkcg,
rkcg->rkcg_c.rebalance_cnt++;
rd_kafka_wrunlock(rkcg->rkcg_rk);
/* Pause current partition set consumers until new assign() is called */
if (rkcg->rkcg_assignment)
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 1,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
rkcg->rkcg_assignment);
/* Pause current partition set consumers until new assign() is called */
if (rkcg->rkcg_assignment)
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk,
rd_true/*pause*/,
RD_ASYNC,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
rkcg->rkcg_assignment);
if (!(rkcg->rkcg_rk->rk_conf.enabled_events & RD_KAFKA_EVENT_REBALANCE)
|| !assignment
......@@ -2305,9 +2307,11 @@ rd_kafka_cgrp_unassign (rd_kafka_cgrp_t *rkcg) {
rd_kafka_toppar_unlock(rktp);
}
/* Resume partition consumption. */
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk, 0/*resume*/,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
/* Resume partition consumption. */
rd_kafka_toppars_pause_resume(rkcg->rkcg_rk,
rd_false/*resume*/,
RD_ASYNC,
RD_KAFKA_TOPPAR_F_LIB_PAUSE,
old_assignment);
rd_kafka_topic_partition_list_destroy(old_assignment);
......
......@@ -1964,8 +1964,12 @@ rd_ts_t rd_kafka_broker_consumer_toppar_serve (rd_kafka_broker_t *rkb,
/**
* Serve a toppar op
* 'rktp' may be NULL for certain ops (OP_RECV_BUF)
* @brief Serve a toppar op
*
* @param rktp may be NULL for certain ops (OP_RECV_BUF)
*
* Will send an empty reply op if the request rko has a replyq set,
* providing synchronous operation.
*
* @locality toppar handler thread
*/
......@@ -1998,7 +2002,7 @@ rd_kafka_toppar_op_serve (rd_kafka_t *rk,
#if ENABLE_DEVEL
rd_kafka_op_print(stdout, "PART_OUTDATED", rko);
#endif
rd_kafka_op_destroy(rko);
rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR__OUTDATED);
return RD_KAFKA_OP_RES_HANDLED;
}
}
......@@ -2114,7 +2118,7 @@ rd_kafka_toppar_op_serve (rd_kafka_t *rk,
break;
}
rd_kafka_op_destroy(rko);
rd_kafka_op_reply(rko, RD_KAFKA_RESP_ERR_NO_ERROR);
return RD_KAFKA_OP_RES_HANDLED;
}
......@@ -2256,16 +2260,17 @@ rd_kafka_resp_err_t rd_kafka_toppar_op_seek (rd_kafka_toppar_t *rktp,
/**
* Pause/resume partition (async operation).
* \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on if the app paused or librdkafka.
* \p pause is 1 for pausing or 0 for resuming.
* @brief Pause/resume partition (async operation).
*
* Locality: any
* @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on if the app paused or librdkafka.
* @param pause is 1 for pausing or 0 for resuming.
*
* @locality any
*/
static rd_kafka_resp_err_t
rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
int pause, int flag) {
rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp, int pause, int flag,
rd_kafka_replyq_t replyq) {
int32_t version;
rd_kafka_op_t *rko;
......@@ -2283,7 +2288,7 @@ rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
rko->rko_u.pause.pause = pause;
rko->rko_u.pause.flag = flag;
rd_kafka_toppar_op0(rktp, rko, RD_KAFKA_NO_REPLYQ);
rd_kafka_toppar_op0(rktp, rko, replyq);
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
......@@ -2293,20 +2298,29 @@ rd_kafka_toppar_op_pause_resume (rd_kafka_toppar_t *rktp,
/**
* Pause or resume a list of partitions.
* \p flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on if the app paused or librdkafka.
* \p pause is 1 for pausing or 0 for resuming.
* @brief Pause or resume a list of partitions.
*
* Locality: any
* @param flag is either RD_KAFKA_TOPPAR_F_APP_PAUSE or .._F_LIB_PAUSE
* depending on if the app paused or librdkafka.
* @param pause true for pausing, false for resuming.
* @param async RD_SYNC to wait for background thread to handle op,
* RD_ASYNC for asynchronous operation.
*
* @locality any
*
* @remark This is an asynchronous call, the actual pause/resume is performed
* by toppar_pause() in the toppar's handler thread.
*/
rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
rd_kafka_topic_partition_list_t *partitions) {
int i;
rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
rd_bool_t pause, rd_async_t async, int flag,
rd_kafka_topic_partition_list_t *partitions) {
int i;
int waitcnt = 0;
rd_kafka_q_t *tmpq = NULL;
if (!async)
tmpq = rd_kafka_q_new(rk);
rd_kafka_dbg(rk, TOPIC, pause ? "PAUSE":"RESUME",
"%s %s %d partition(s)",
......@@ -2332,13 +2346,24 @@ rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
rktp = rd_kafka_toppar_s2i(s_rktp);
rd_kafka_toppar_op_pause_resume(rktp, pause, flag);
rd_kafka_toppar_op_pause_resume(rktp, pause, flag,
RD_KAFKA_REPLYQ(tmpq, 0));
if (!async)
waitcnt++;
rd_kafka_toppar_destroy(s_rktp);
rktpar->err = RD_KAFKA_RESP_ERR_NO_ERROR;
}
if (!async) {
while (waitcnt-- > 0)
rd_kafka_q_wait_result(tmpq, RD_POLL_INFINITE);
rd_kafka_q_destroy_owner(tmpq);
}
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
......
......@@ -505,8 +505,9 @@ void rd_kafka_toppar_leader_unavailable (rd_kafka_toppar_t *rktp,
rd_kafka_resp_err_t err);
rd_kafka_resp_err_t
rd_kafka_toppars_pause_resume (rd_kafka_t *rk, int pause, int flag,
rd_kafka_topic_partition_list_t *partitions);
rd_kafka_toppars_pause_resume (rd_kafka_t *rk,
rd_bool_t pause, rd_async_t async, int flag,
rd_kafka_topic_partition_list_t *partitions);
rd_kafka_topic_partition_t *rd_kafka_topic_partition_new (const char *topic,
......
......@@ -171,16 +171,22 @@ rd_kafka_subscription (rd_kafka_t *rk,
rd_kafka_resp_err_t
rd_kafka_pause_partitions (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions) {
return rd_kafka_toppars_pause_resume(rk, 1, RD_KAFKA_TOPPAR_F_APP_PAUSE,
partitions);
rd_kafka_topic_partition_list_t *partitions) {
return rd_kafka_toppars_pause_resume(rk,
rd_true/*pause*/,
RD_SYNC,
RD_KAFKA_TOPPAR_F_APP_PAUSE,
partitions);
}
rd_kafka_resp_err_t
rd_kafka_resume_partitions (rd_kafka_t *rk,
rd_kafka_topic_partition_list_t *partitions) {
return rd_kafka_toppars_pause_resume(rk, 0, RD_KAFKA_TOPPAR_F_APP_PAUSE,
partitions);
rd_kafka_topic_partition_list_t *partitions) {
return rd_kafka_toppars_pause_resume(rk,
rd_false/*resume*/,
RD_SYNC,
RD_KAFKA_TOPPAR_F_APP_PAUSE,
partitions);
}
......@@ -51,6 +51,15 @@ typedef uint8_t rd_bool_t;
#define rd_false 0
/**
* @enum Denotes an async or sync operation
*/
typedef enum {
RD_SYNC = 0, /**< Synchronous/blocking */
RD_ASYNC, /**< Asynchronous/non-blocking */
} rd_async_t;
/*
* Helpers
*/
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册