Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 135 additions & 40 deletions rascal2/core/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import os
from dataclasses import dataclass
from logging import INFO
from multiprocessing import Process, Queue
from multiprocessing import Event, Process, Queue, cpu_count

import ratapi as rat
from PyQt6 import QtCore
from ratapi.utils.enums import Procedures

from rascal2.config import MatlabHelper, get_matlab_engine

Expand All @@ -18,75 +17,162 @@ class RATRunner(QtCore.QObject):
event_received = QtCore.pyqtSignal()
finished = QtCore.pyqtSignal()
stopped = QtCore.pyqtSignal()
go_event = Event()
processes_list_go_exit_events = []

def __init__(self, rat_inputs, procedure: Procedures, display_on: bool):
def __init__(self, parent=None, start_runners_early: bool = True, num_cores: int = cpu_count()):
super().__init__()
self.parent = parent
self.timer = QtCore.QTimer()
self.timer.setInterval(1)
self.timer.timeout.connect(self.check_queue)
self.matlab_helper = MatlabHelper()
self.num_cores = num_cores
self.start_runners_early = start_runners_early

# this queue handles both event data and results
self.queue = Queue()
matlab_helper = MatlabHelper()
self.process = Process(
target=run,
args=(
self.queue,
rat_inputs,
procedure,
display_on,
matlab_helper.ready_event,
matlab_helper.engine_output,
),
)
self.arg_queue = Queue()
self.go_event = Event()
self.exit_event = Event()
self.rat_inputs = None
self.procedure = None
self.display_on = None
self.processes_list = []
self.refresh_process_list()
self.process = None
self.updated_problem = None
self.results = None
self.error = None
self.events = []
self.engine_future = None

def set_runner_args(self, rat_inputs, procedure, display_on: bool):
self.arg_queue.put((rat_inputs, procedure, display_on))
self.rat_inputs = rat_inputs
self.display_on = display_on

def start(self):
"""Start the calculation."""
self.process.start()
self.process, (self.go_event, self.exit_event) = self.get_new_process()
if self.engine_future is None:
self.get_runner_matlab_engine()
self.go_event.set()
if not self.process.is_alive():
self.process.start()
self.timer.start()

def get_new_process(self):
if not self.processes_list:
self.refresh_process_list()
return self.processes_list.pop(0), self.processes_list_go_exit_events.pop(0)

def get_runner_matlab_engine(self):
problem_definition, cpp_controls = self.rat_inputs
if any([file["language"] == "matlab" for file in problem_definition.customFiles.files]):
engine_ready = (self.matlab_helper.ready_event,)
engine_output = self.matlab_helper.engine_output
matlab_queue = Queue()
get_runner_matlab_engine_process = Process(
target=run_matlab_init_engine,
args=(matlab_queue, engine_output, engine_ready, self.display_on),
)
get_runner_matlab_engine_process.start()
get_runner_matlab_engine_process.join()
self.engine_future = self.filter_queue(matlab_queue)

def interrupt(self):
"""Interrupt the running process."""
self.timer.stop()
self.process.kill()
self.stopped.emit()
self.go_event.clear()

def check_queue(self):
"""Check for new data in the queue."""
if not self.process.is_alive():
self.timer.stop()
self.queue.put(None)
for item in iter(self.queue.get, None):
self.filter_queue(self.queue)

def filter_queue(self, queue: Queue):
queue.put(None)
for item in iter(queue.get, None):
if isinstance(item, tuple):
self.updated_problem, self.results = item
self.go_event.clear()
self.finished.emit()
elif isinstance(item, Exception):
self.error = item
self.go_event.clear()
self.stopped.emit()
elif isinstance(item, list):
return item[0]
else: # else, assume item is an event
self.events.append(item)
self.event_received.emit()


def run(queue, rat_inputs: tuple, procedure: str, display: bool, engine_ready, engine_output):
def refresh_process_list(self):
self.processes_list_go_exit_events = [(Event(), Event()) for _ in range(self.num_cores)]
self.processes_list = [
Process(
target=run,
args=(
self.queue,
self.arg_queue,
self.processes_list_go_exit_events[ind][0],
self.processes_list_go_exit_events[ind][1],
),
)
for ind in range(self.num_cores)
]

def clear_queues(self):
self.queue.empty()
self.arg_queue.empty()
self.events.clear()
self.go_event.clear()
self.exit_event.clear()

def start_processes(self):
if self.start_runners_early:
for process in self.processes_list:
process.start()

def stop_processes(self):
self.exit_event.set()
self.go_event.set()
for go_event, exit_event in self.processes_list_go_exit_events:
exit_event.set()
go_event.set()
for process in self.processes_list:
if process.is_alive():
process.kill()
self.processes_list.clear()
self.clear_queues()
self.processes_list_go_exit_events.clear()
self.queue.close()
self.arg_queue.close()
if self.engine_future is not None:
self.engine_future.result().exit()
self.matlab_helper.close_event.set()


def run(queue: Queue, arg_queue: Queue, go_event, exit_event):
"""Run RAT and put the result into the queue.

Parameters
----------
queue : Queue
The interprocess queue for the RATRunner.
rat_inputs : tuple
The C++ inputs for rat.
procedure : str
The optimisation procedure.
display : bool
Whether to display events.
arg_queue :
A queue of arguments used to initialize the RAT process, passed from the Main Presenter

"""
go_event.wait()
if exit_event.is_set():
queue.put(LogData(INFO, "exit_event triggers"))
return
rat_inputs, procedure, display = arg_queue.get()
problem_definition, cpp_controls = rat_inputs

if display:
Expand All @@ -96,22 +182,10 @@ def run(queue, rat_inputs: tuple, procedure: str, display: bool, engine_ready, e
queue.put(LogData(INFO, "Starting RAT"))

try:
engine_future = None
if any([file["language"] == "matlab" for file in problem_definition.customFiles.files]):
if not engine_output:
queue.put(LogData(INFO, "Attempting to start Matlab..."))

result = get_matlab_engine(engine_ready, engine_output)
if isinstance(result, Exception):
raise result
else:
engine_future = result
engine_future.result().cd(os.getcwd())

problem_definition, output_results, bayes_results = rat.rat_core.RATMain(problem_definition, cpp_controls)
if display:
queue.put(LogData(INFO, "Creating RAT Results..."))
results = rat.outputs.make_results(procedure, output_results, bayes_results)
if engine_future is not None:
engine_future.result().exit()
except Exception as err:
queue.put(err)
return
Expand All @@ -123,6 +197,27 @@ def run(queue, rat_inputs: tuple, procedure: str, display: bool, engine_ready, e
queue.put((problem_definition, results))


def run_matlab_init_engine(queue, engine_output, engine_ready, display_on):
"""Get the engine future from the matlab engine and put in queue if successfully."""
try:
if not engine_output and display_on:
queue.put(LogData(INFO, "Attempting to start Matlab..."))

result = get_matlab_engine(engine_ready, engine_output)
if display_on:
queue.put(LogData(INFO, "Got Matlab engine"))
if isinstance(result, Exception):
raise result
else:
engine_future = result
engine_future.result().cd(os.getcwd())
queue.put([engine_future])

except Exception as err:
queue.put(err)
return


@dataclass
class LogData:
"""Dataclass for logging data."""
Expand Down
16 changes: 11 additions & 5 deletions rascal2/ui/presenter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import ratapi as rat
import ratapi.wrappers
from PyQt6.QtCore import QCoreApplication

from rascal2.config import LOGGER, SETTINGS, MatlabHelper
from rascal2.core import commands
Expand All @@ -15,6 +16,8 @@

from .model import MainWindowModel

START_PROCESSES = bool(os.getenv("START_PROCESSES", "True"))


class MainWindowPresenter:
"""Facilitates interaction between View and Model.
Expand All @@ -29,6 +32,11 @@ def __init__(self, view):
self.view = view
self.model = MainWindowModel()
self.worker = None
self.runner = RATRunner(self, start_runners_early=START_PROCESSES)
self.runner.finished.connect(self.handle_results)
self.runner.stopped.connect(self.handle_interrupt)
self.runner.event_received.connect(self.handle_event)
self.runner.start_processes()

def create_project(self, name: str, save_path: str):
"""Create a new RAT project and controls object then initialise UI.
Expand Down Expand Up @@ -229,11 +237,7 @@ def run(self):
self.model.controls.initialise_IPC()
rat_inputs = rat.inputs.make_input(self.model.project, self.model.controls)
display_on = self.model.controls.display != rat.utils.enums.Display.Off

self.runner = RATRunner(rat_inputs, self.model.controls.procedure, display_on)
self.runner.finished.connect(self.handle_results)
self.runner.stopped.connect(self.handle_interrupt)
self.runner.event_received.connect(self.handle_event)
self.runner.set_runner_args(rat_inputs, self.model.controls.procedure, display_on)
self.view.terminal_widget.write("Initializing RAT Process...")
self.runner.start()

Expand All @@ -249,6 +253,7 @@ def handle_results(self):
)
self.view.handle_results(self.runner.results)
self.model.controls.delete_IPC()
self.runner.clear_queues()

def handle_interrupt(self):
"""Handle a RAT run being interrupted."""
Expand All @@ -274,6 +279,7 @@ def handle_event(self):
self.view.plot_widget.plot_with_blit(event)
case LogData():
LOGGER.log(event.level, event.msg)
QCoreApplication.processEvents()

def edit_project(self, updated_project: dict, preview: bool = True) -> None:
"""Edit the Project with a dictionary of attributes.
Expand Down
2 changes: 2 additions & 0 deletions rascal2/ui/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ def closeEvent(self, event):
event.accept()
else:
event.ignore()
self.presenter.runner.stop_processes()
event.accept()

def show_project_dialog(self, dialog: StartupDialog):
"""Show a startup dialog of a given type.
Expand Down
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,18 @@ def global_setting():
return GLOBAL_SETTING


@pytest.fixture(autouse=True)
@patch("rascal2.core.runner.cpu_count")
def fix_cpu_count(cpu_count):
cpu_count.return_value = 1
yield


@pytest.fixture(scope="function", autouse=True)
def mock_start_processes_setting(monkeypatch):
monkeypatch.setenv("START_PROCESSES", "False")


@pytest.fixture(scope="session", autouse=True)
def mock_setting(request):
global GLOBAL_SETTING
Expand Down
Loading
Loading