Skip to content

Commit

Permalink
Fixup writer..
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Nov 7, 2021
1 parent ff6f86a commit c581c43
Showing 1 changed file with 15 additions and 5 deletions.
20 changes: 15 additions & 5 deletions src/rdkafka_msgset_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ static int rd_kafka_msgset_writer_init(rd_kafka_msgset_writer_t *msetw,
pid, epoch_base_msgid);
msetw->msetw_batch = &msetw->msetw_rkbuf->rkbuf_u.Produce.batch;

/* Enable streaming compression, if desired, supported and
/* Enable streaming compression, if desired, supported, and
* not suppressed.
* Due to more complex framing in older MsgVersions we only do
* this for modern MsgsVersions (>=2). Older MsgVersions will
Expand Down Expand Up @@ -823,8 +823,8 @@ static size_t rd_kafka_msgset_writer_write_msg(rd_kafka_msgset_writer_t *msetw,
actual_written = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf) - pre_len;
rd_dassert(outlen <=
rd_kafka_msg_wire_size(rkm, msetw->msetw_MsgVersion));
rd_dassert(outlen == actual_written ||
rd_kafka_buf_has_compressor(msetw->msetw_rkbuf));
rd_assert(outlen == actual_written ||
rd_kafka_buf_has_compressor(msetw->msetw_rkbuf));

return actual_written;
}
Expand All @@ -842,7 +842,7 @@ static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw,
rd_kafka_msgq_t *rkmq) {
rd_kafka_toppar_t *rktp = msetw->msetw_rktp;
rd_kafka_broker_t *rkb = msetw->msetw_rkb;
rd_kafka_t *rk = rkb->rkb_rk;
rd_kafka_t *rk = rkb->rkb_rk;
size_t len = rd_buf_len(&msetw->msetw_rkbuf->rkbuf_buf);
/* This is the hard batch size limit */
size_t max_msg_size = (size_t)rk->rk_conf.max_msg_size;
Expand Down Expand Up @@ -979,6 +979,15 @@ static int rd_kafka_msgset_writer_write_msgq(rd_kafka_msgset_writer_t *msetw,

} while ((rkm = TAILQ_FIRST(&rkmq->rkmq_msgs)));

if (!rkm)
rd_rkb_dbg(rkb, MSG | RD_KAFKA_DBG_QUEUE, "PRODUCE",
"%.*s [%" PRId32
"]: "
"No more messages in queue after "
"adding %d msgs and %zu bytes to MessageSet",
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition, msgcnt, len);

msetw->msetw_MaxTimestamp = MaxTimestamp;

/* Idempotent Producer:
Expand Down Expand Up @@ -1293,7 +1302,8 @@ static int rd_kafka_msgset_writer_compress(rd_kafka_msgset_writer_t *msetw,
.rkm_payload = ciov.iov_base,
.rkm_timestamp =
msetw->msetw_firstmsg.timestamp};
outlen = rd_kafka_msgset_writer_write_msg(

outlen = rd_kafka_msgset_writer_write_msg(
msetw, &rkm, 0, msetw->msetw_compression,
rd_free /*free for ciov.iov_base*/);
}
Expand Down

0 comments on commit c581c43

Please sign in to comment.