Add metric for Kafka offset commit failures#38889
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request enhances observability for Kafka I/O operations in Apache Beam by adding instrumentation to track failed offset commits. By capturing these failures as metrics, users can better monitor the health and reliability of their Kafka pipelines. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces a commit-failures counter metric in KafkaCommitOffset to track failed offset commits. The review feedback suggests declaring the Counter as static final (and renaming it to COMMIT_FAILURES) to avoid serialization overhead and adhere to standard naming conventions.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| private final Counter commitFailures = | ||
| Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); |
There was a problem hiding this comment.
In Apache Beam, it is a best practice to declare Counter metrics as static final rather than instance variables. This avoids unnecessary object creation per DoFn instance and prevents serialization overhead, as metrics are registered globally per class namespace. Additionally, static final fields should follow the screaming snake case naming convention (e.g., COMMIT_FAILURES).
| private final Counter commitFailures = | |
| Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); | |
| private static final Counter COMMIT_FAILURES = | |
| Metrics.counter(CommitOffsetDoFn.class, "commit-failures"); |
| element.getKey().getTopicPartition(), | ||
| new OffsetAndMetadata(element.getValue() + 1))); | ||
| } catch (Exception e) { | ||
| commitFailures.inc(); |
|
Assigning reviewers: R: @kennknowles for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
Summary
Adds a Beam metric to track Kafka offset commit failures in
KafkaCommitOffset.Changes
commit-failurescounter metric inCommitOffsetDoFn.consumer.commitSync()throws an exception.Motivation
This provides visibility into Kafka offset commit failures through Beam metrics, making it easier to monitor and diagnose commit-related issues in production pipelines.
Testing