Fix IndexError in UnsignedVarInt32.encode_into for large records (>64KB) #3101

Closed
asulyma wants to merge 1 commit into dpkp:master from asulyma:fix/uvarint-encode-buffer-overflow

asulyma commented Jun 24, 2026

Summary

UnsignedVarInt32.encode_into() writes to the EncodeBuffer without calling out.ensure() first. When a large record (e.g. 195KB of JSON) causes the EncodeBuffer to grow to exactly fit the serialized data, the subsequent tagged-fields write at the end of the ProduceRequest encoding hits IndexError: bytearray index out of range.

This silently crashes the Sender I/O thread iteration. The thread recovers, but the affected batch remains orphaned in _in_flight_batches until delivery_timeout_ms expires — causing guaranteed data loss for any record large enough to trigger the bug.

Versions affected

  • kafka-python 3.0.0 - 3.0.4

Steps to reproduce

from kafka.protocol.producer import ProduceRequest
from kafka.record.default_records import DefaultRecordBatchBuilder
import time

builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=0, is_transactional=False,
    producer_id=-1, producer_epoch=-1, base_sequence=-1,
    batch_size=16384
)
builder.append(0, int(time.time() * 1000), b'key', b'x' * 200_000, [])
records_bytes = bytes(builder.build())

request = ProduceRequest[9](
    transactional_id=None, acks=1, timeout_ms=30000,
    topic_data=[('test-topic', [(0, records_bytes)])]
)
request.encode()  # IndexError: bytearray index out of range

Traceback

File "kafka/protocol/api_message.py", line 229, in encode
    fast_encode(self, out)
File "<codegen:produce_request_v9>", line 186, in _encode
File "kafka/protocol/schemas/fields/codecs/tagged_fields.py", line 55, in encode_into
    UnsignedVarInt32.encode_into(out, len(tags))
File "kafka/protocol/schemas/fields/codecs/types.py", line 350, in encode_into
    buf[pos] = value
    ~~~^^^^^
IndexError: bytearray index out of range

Root cause

The EncodeBuffer starts at 64KB (__init__(self, size=65536)). When Bytes.encode_into() writes a large record, it calls out.ensure(n) which grows the buffer to max(len(buf) * 2, pos + n):

# encode_buffer.py
def ensure(self, needed):
    if self.pos + needed > len(self.buf):
        new_size = max(len(self.buf) * 2, self.pos + needed)

For a 200KB record, pos + 200000 exceeds 131072 (double the initial 65536 bytes), so the buffer grows to exactly pos + n, leaving zero spare bytes.
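
The growth arithmetic can be checked with a toy model. This is only a sketch: ToyEncodeBuffer, write() and spare_after_write() are hypothetical names, not the kafka-python classes, and the part of ensure() that the snippet above elides is assumed to simply extend the buffer to new_size.

```python
class ToyEncodeBuffer:
    """Toy stand-in for the EncodeBuffer described above (not the library class)."""

    def __init__(self, size=65536):
        self.buf = bytearray(size)
        self.pos = 0

    def ensure(self, needed):
        # Growth policy quoted in the report: the larger of "double" and "exact fit".
        if self.pos + needed > len(self.buf):
            new_size = max(len(self.buf) * 2, self.pos + needed)
            # Assumption: the elided remainder extends the buffer to new_size.
            self.buf.extend(bytes(new_size - len(self.buf)))

    def write(self, data):
        # Hypothetical helper: a bounds-checked bulk write, like Bytes.encode_into().
        self.ensure(len(data))
        self.buf[self.pos:self.pos + len(data)] = data
        self.pos += len(data)


def spare_after_write(header_len, payload_len):
    """Return how many unused bytes remain after a header and one payload."""
    out = ToyEncodeBuffer()
    out.write(b"h" * header_len)
    out.write(b"x" * payload_len)
    return len(out.buf) - out.pos


small = spare_after_write(100, 100_000)  # doubling wins: 131072 - 100100 bytes spare
large = spare_after_write(100, 200_000)  # exact fit wins: no bytes spare
print(small, large)
```

In this model the 100,000-byte payload leaves headroom because doubling dominates, while the 200,000-byte payload ends with pos == len(buf), which is the state the next unchecked write trips over.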

After writing the record data, pos == len(buf). Then the codegen calls TaggedFields.encode_into() to write the trailing tagged fields:

<codegen:produce_request_v9> line 186:
    out.pos = pos
    tf.encode_into(item, out, version=9)   # enters TaggedFields.encode_into()

TaggedFields.encode_into() calls UnsignedVarInt32.encode_into(out, len(tags)) to write the tag count — without calling out.ensure() first:

# tagged_fields.py, line 55
def encode_into(self, item, out, version=None):
    ...
    UnsignedVarInt32.encode_into(out, len(tags))   # no ensure() before this call

And UnsignedVarInt32.encode_into() itself also does not check bounds:

# types.py, line 345-350
@classmethod
def encode_into(cls, out, value):
    buf = out.buf
    pos = out.pos
    ...
    buf[pos] = value   # IndexError when pos == len(buf)
    out.pos = pos + 1

By contrast, Bytes.encode_into() correctly calls out.ensure(n) before writing. UnsignedVarInt32.encode_into() does not.

Impact in production

The Sender thread catches the exception via except Exception in Sender.run() and logs "Uncaught error in kafka producer I/O thread". But the batch that was being encoded has already been:

  1. Drained from the RecordAccumulator (removed from the partition deque)
  2. Added to _in_flight_batches
  3. Never sent (encode failed before the request reached the network)

Since the batch is no longer in the accumulator, it cannot be re-drained. Since it was never sent, no response will arrive to complete it. The batch sits orphaned in _in_flight_batches until delivery_timeout_ms expires (default 120s), then fails with KafkaTimeoutError: Expiring 1 record(s).
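
The sequence above can be modeled with a toy loop. Everything here is a hypothetical stand-in (a deque for one partition queue, a list for _in_flight_batches, a fake encode()); it is not the Sender implementation, only an illustration of why the batch ends up in neither place that could finish it.

```python
from collections import deque

accumulator = deque(["batch-0"])  # toy stand-in for one partition deque
in_flight = []                    # toy stand-in for _in_flight_batches
logged = []                       # toy stand-in for the error log


def encode(batch):
    # Stand-in for request encoding that hits the buffer bug.
    raise IndexError("bytearray index out of range")


def run_once():
    batch = accumulator.popleft()  # 1. drained from the accumulator
    in_flight.append(batch)        # 2. tracked as in flight
    encode(batch)                  # 3. raises before anything reaches the network


try:
    run_once()
except Exception as exc:  # mirrors the broad except described above
    logged.append(f"Uncaught error in kafka producer I/O thread: {exc}")

print(list(accumulator), in_flight, logged)
```

After the exception the accumulator is empty and the batch is still listed as in flight, so in this model nothing retries it and no response can ever complete it.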

This is deterministic — the same record will fail every time, making it impossible to deliver messages above ~64KB without compression.

Suggested fix

Add out.ensure(5) (max varint32 encoded size) at the top of UnsignedVarInt32.encode_into().
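
A minimal sketch of the suggested change, written against a toy buffer rather than the real classes. ToyEncodeBuffer and encode_uvarint32 are hypothetical names, the elided part of ensure() is assumed to extend the buffer, and the loop is the standard 7-bits-per-byte unsigned varint encoding, not a copy of the code in types.py.

```python
class ToyEncodeBuffer:
    """Toy stand-in for EncodeBuffer (not the library class)."""

    def __init__(self, size=65536):
        self.buf = bytearray(size)
        self.pos = 0

    def ensure(self, needed):
        if self.pos + needed > len(self.buf):
            new_size = max(len(self.buf) * 2, self.pos + needed)
            # Assumption: the real ensure() grows the buffer to new_size.
            self.buf.extend(bytes(new_size - len(self.buf)))


def encode_uvarint32(out, value):
    """Write value as an unsigned varint, 7 payload bits per byte."""
    if not 0 <= value <= 0xFFFFFFFF:
        raise ValueError("value out of range for an unsigned 32-bit varint")
    out.ensure(5)  # a 32-bit varint needs at most ceil(32 / 7) = 5 bytes
    buf, pos = out.buf, out.pos
    while value > 0x7F:
        buf[pos] = (value & 0x7F) | 0x80  # low 7 bits, continuation bit set
        pos += 1
        value >>= 7
    buf[pos] = value  # final byte, continuation bit clear
    out.pos = pos + 1


out = ToyEncodeBuffer(size=4)
out.pos = 4                  # simulate a buffer that is exactly full
encode_uvarint32(out, 0)     # e.g. an empty tagged-field count
encode_uvarint32(out, 300)
encoded = bytes(out.buf[4:out.pos])
print(encoded)
```

Reserving the worst-case 5 bytes up front keeps the write loop free of per-byte bounds checks, at the cost of occasionally growing the buffer slightly earlier than strictly necessary.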

dpkp commented Jun 24, 2026

Owner

Thanks for the detailed report! The fix here is actually wider than proposed by this PR, and we will need to include regression tests. I've opened issue #3102 to track it and will push a separate PR with fixes (closing this PR). Planning to include the fix in release 3.0.5 shortly.

dpkp commented Jun 24, 2026

Owner

#3103 is the larger fix for the encode buffer sizing bug(s).
