Add instrumentation for memory profiling in Python SDK#38853
Add instrumentation for memory profiling in Python SDK#38853tvalentyn wants to merge 23 commits into
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #38853 +/- ##
============================================
- Coverage 57.77% 57.71% -0.06%
Complexity 12969 12969
============================================
Files 2509 2510 +1
Lines 260525 260768 +243
Branches 10658 10658
============================================
- Hits 150516 150507 -9
- Misses 104318 104570 +252
Partials 5691 5691
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces profiling capabilities to the Apache Beam Python SDK worker harness, adding pipeline options to configure profiling agents (like memray and tcmalloc) and implementing background tasks in the Go bootloader to manage profiling, GCS uploads, and post-processing. Feedback on these changes highlights several critical improvements: replacing a potentially failing syscall.Kill with Go's standard process signaling, resolving an ARM64 compatibility issue by avoiding hardcoded absolute paths for tcmalloc, executing memray post-processing sequentially to prevent worker OOMs, implementing a cleanup mechanism for local profile files to avoid disk exhaustion, and fixing a goroutine/ticker leak by handling context cancellation.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
|
Warning You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again! |
|
R: @jrmccluskey |
|
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
jrmccluskey
left a comment
There was a problem hiding this comment.
A few random questions and go things, the core logic makes sense though
|
|
||
| if err := cmd.Wait(); err != nil { | ||
| var timer *time.Timer | ||
| var profilingTimedOut atomic.Bool |
There was a problem hiding this comment.
Typically you'd want to define an atomic value outside of the scope of the goroutine so it is shared amongst the goroutines. Defined within the goroutine function I would suspect each one would have its own copy, which makes the atomic unnecessary as best I can tell. I think it comes down to the behavior you want here: do you want each goroutine to individually stop profiling on a timeout, or would you want one timeout to cause all of the goroutines to stop profiling?
There was a problem hiding this comment.
I believe the ai review flagged this to me out of concern that the timer coroutine modifies the value concurrently.
|
thanks for a quick review! PTAL. |
…y value later anyway. Users can pass --profile_upload_interval_sec=0 to disable uploads.
This change adds instrumentation launch Python SDK Harness with custom profiling agents and adds memory Profiling capabilities with off-the-shelf profilers.
Design: https://s.apache.org/beam-python-memory-profiling
Requires these dependencies in the runtime environment (included in Beam default containers):
memray(https://pypi.org/project/memray/) pip package for profiling with Memraygoogle-perftoolsapt package for profiling with tcmallocTo enable memory profiling with Memray:
By default, binary profiles are all uploaded to the
<TEMP_LOCATION>/profilesand we are attempting to do basic profile postprocessing on the worker by creating memray flamegraphs.Additional optional params:
memray runoptions, pass them like so:--profiler_extra_arg="--native". For more information, see https://bloomberg.github.io/memray/run.html.--profiler_extra_arg="--aggregate" --profiler_stop_after_sec=600--profile_location=gs://your-bucket/profiles--profile_upload_interval_sec=60--profile_postprocess_interval_sec=60Other considerations:
--profile_postprocess_interval_sec=0. You will then need to post-process/analyze profiles later, possibly using the same or equivalent environment.--profiler_stop_after_crash.To enable memory profiling with TCMalloc:
python pipeline.py --runner=DataflowRunner --profiler_agent=tcmallocTo supply additional envirionment variables that configure tcmalloc, pass them individually with this option:
--profiler_extra_env_vars="HEAP_PROFILE_TIME_INTERVAL=600". For more information, see: https://gperftools.github.io/gperftools/heapprofile.htmlTo upload profiles using GCS FUSE instead of gcloud (Dataflow runner only):
python pipeline.py --runner=DataflowRunner --profiler_agent=memray --experiments="gcsfuse_buckets=your-bucket-name:rw" --profile_temp_location=/var/opt/google/gcs/your-bucket-name/profiles --profile_upload_interval_sec=0. Note: this still requires additional local disk space for storing profiles since Memray uses out-of-order writes, but allows to upload profiles without requiringgcloudcli.fixes: #20298