diff --git a/src/tests/execution/process_pty_test.py b/src/tests/execution/process_pty_test.py index 80260ae5..f724a4d0 100644 --- a/src/tests/execution/process_pty_test.py +++ b/src/tests/execution/process_pty_test.py @@ -1,5 +1,4 @@ import os -import threading import unittest from pathlib import Path @@ -9,6 +8,22 @@ from utils import file_utils +def _run_and_read_all(process_wrapper): + """Start the process and return its full output, deterministically. + + The output is read on a separate thread (started by start()) that pushes + chunks into a ReplayObservable and closes it once reading is done. Reading + the output via read_until_closed() *while* that thread is still pushing races + the (unsynchronised) ReplayObservable.subscribe vs push and can drop the last + chunk — the source of the flaky test_PYTHONUNBUFFERED failure. Waiting for the + stream to close first means there is no concurrent pusher, and close() keeps + the replay buffer, so the late subscribe replays every chunk. + """ + process_wrapper.start() + process_wrapper.output_stream.wait_close() + return ''.join(read_until_closed(process_wrapper.output_stream)) + + class TestEnvironmentVariables(unittest.TestCase): def test_default_variables(self): @@ -38,11 +53,7 @@ def execute_and_get_passed_env(custom_variables): env_variables = test_utils.env_variables process_wrapper = PtyProcessWrapper( str(Path(__file__).parent.parent / 'scripts' / 'printenv.sh'), '.', env_variables.build_env_vars(custom_variables)) - process_wrapper.start() - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) lines = output.split('\n') env_dict = {line.split('=', 2)[0]: line.split('=', 2)[1] for line in lines if '=' in line} return env_dict @@ -54,13 +65,7 @@ def test_many_unicode_characters(self): test_utils.create_file('test.txt', text=long_unicode_text) process_wrapper = PtyProcessWrapper(['cat', 'test.txt'], test_utils.temp_folder, {}) - process_wrapper.start() - - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) self.assertEqual(long_unicode_text, output) self.assertEqual(0, process_wrapper.get_return_code()) @@ -73,13 +78,7 @@ def test_mixed_encoding(self): byte_content=True) process_wrapper = PtyProcessWrapper(['cat', 'test.txt'], test_utils.temp_folder, {}) - process_wrapper.start() - - thread = threading.Thread(target=process_wrapper.wait_finish, daemon=True) - thread.start() - thread.join(timeout=0.1) - - output = ''.join(read_until_closed(process_wrapper.output_stream)) + output = _run_and_read_all(process_wrapper) self.assertEqual('gültig\n läuft verändert für Ändern \nPrüfung gültig läuft ࠀ 𒀀!', output) def setUp(self):