Fix IndexError in UnsignedVarInt32.encode_into for large records (>64KB)#3101
Closed
asulyma wants to merge 1 commit into
Closed
Fix IndexError in UnsignedVarInt32.encode_into for large records (>64KB)#3101asulyma wants to merge 1 commit into
asulyma wants to merge 1 commit into
Conversation
Owner
|
Thanks for the detailed report! The fix here is actually wider than proposed by this PR, and we will need to include regression tests. So I've opened an issue #3102 to track and will push a separate PR with fixes (closing this PR). Planning to include fix in release 3.0.5 shortly. |
Owner
|
#3103 is the larger fix for the encode buffer sizing bug(s). |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
UnsignedVarInt32.encode_into()writes to theEncodeBufferwithout callingout.ensure()first. When a large record (e.g. 195KB of JSON) causes theEncodeBufferto grow to exactly fit the serialized data, the subsequent tagged fields write at the end of theProduceRequestencoding hitsIndexError: bytearray index out of range.This silently crashes the Sender I/O thread iteration. The thread recovers, but the affected batch remains orphaned in
_in_flight_batchesuntildelivery_timeout_msexpires — causing guaranteed data loss for any record large enough to trigger the bug.Versions affected
Steps to reproduce
Traceback
Root cause
The
EncodeBufferstarts at 64KB (__init__(self, size=65536)). WhenBytes.encode_into()writes a large record, it callsout.ensure(n)which grows the buffer tomax(len(buf) * 2, pos + n):For a 200KB record where
pos + 200000 > 128000(double of 64KB), the buffer grows to exactlypos + n— leaving zero spare bytes.After writing the record data,
pos == len(buf). Then the codegen callsTaggedFields.encode_into()to write the trailing tagged fields:TaggedFields.encode_into()callsUnsignedVarInt32.encode_into(out, len(tags))to write the tag count — without callingout.ensure()first:And
UnsignedVarInt32.encode_into()itself also does not check bounds:By contrast,
Bytes.encode_into()correctly callsout.ensure(n)before writing.UnsignedVarInt32.encode_into()does not.Impact in production
The Sender thread catches the exception via
except ExceptioninSender.run()and logs"Uncaught error in kafka producer I/O thread". But the batch that was being encoded has already been:RecordAccumulator(removed from the partition deque)_in_flight_batchesSince the batch is no longer in the accumulator, it cannot be re-drained. Since it was never sent, no response will arrive to complete it. The batch sits orphaned in
_in_flight_batchesuntildelivery_timeout_msexpires (default 120s), then fails withKafkaTimeoutError: Expiring 1 record(s).This is deterministic — the same record will fail every time, making it impossible to deliver messages above ~64KB without compression.
Suggested fix
Add
out.ensure(5)(max varint32 encoded size) at the top ofUnsignedVarInt32.encode_into().