Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions kernelci/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,23 @@ def get_configs(self, event, channel="node"):
if sched_event_channel == channel:
sched_event = entry.event.copy()
sched_event.pop("channel")
if sched_event.items() <= event.items():
yield entry
if not sched_event.items() <= event.items():
continue
# Edge-triggered scheduling (kernelci-core#2912): when the API
# reports a node update together with its previous state/result,
# only act on the transition INTO the matched condition. This
# avoids re-creating identical child jobs every time the parent
# node is updated while staying in the same matching state (e.g.
# an artifact or timeout update on an already-`available` node).
# Falls back to level-triggered behaviour when no previous_*
# info is present (node creation, retry events or older API).
if event.get("op") == "updated" and "previous_state" in event:
previous_event = dict(event)
previous_event["state"] = event.get("previous_state")
previous_event["result"] = event.get("previous_result")
if sched_event.items() <= previous_event.items():
continue
yield entry

def get_schedule(self, event, channel="node"):
"""Get the (job, runtime, platform) configs for each job to run"""
Expand Down
173 changes: 173 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# SPDX-License-Identifier: LGPL-2.1-or-later
#
# Copyright (C) 2026 Collabora Limited

"""Unit tests for kernelci.scheduler edge-triggered matching (#2912)"""

import types
import unittest

from kernelci.scheduler import Scheduler


def _entry(event):
"""Build a minimal scheduler entry exposing an ``event`` mapping."""
return types.SimpleNamespace(event=event)


def _scheduler(entries):
"""Build a Scheduler with a preset list of entries, bypassing __init__."""
sched = Scheduler.__new__(Scheduler)
sched._scheduler = entries
return sched


class TestGetConfigsEdgeTrigger(unittest.TestCase):
"""get_configs() should fire on the transition into a matched state,
not on every update that keeps the node in that state (#2912)."""

KBUILD_AVAILABLE = {
"channel": "node",
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
}

def _matches(self, entries, event, channel="node"):
sched = _scheduler(entries)
return list(sched.get_configs(event, channel))

def test_created_event_matches(self):
"""A creation event in the matched state fires (rising edge)."""
entry = _entry(self.KBUILD_AVAILABLE)
event = {
"op": "created",
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
}
self.assertEqual(self._matches([entry], event), [entry])

def test_update_into_state_matches(self):
"""An update transitioning INTO the matched state fires."""
entry = _entry(self.KBUILD_AVAILABLE)
event = {
"op": "updated",
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
"previous_state": "running",
"previous_result": None,
}
self.assertEqual(self._matches([entry], event), [entry])

def test_update_keeping_state_does_not_match(self):
"""An update that keeps the already-matched state does NOT fire.

This is the duplicate-job scenario from #2912: an artifact/timeout
update on a node that is already `available`.
"""
entry = _entry(self.KBUILD_AVAILABLE)
event = {
"op": "updated",
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
"previous_state": "available",
"previous_result": None,
}
self.assertEqual(self._matches([entry], event), [])

def test_update_without_previous_state_is_level_triggered(self):
"""Older API events (no previous_*) fall back to level-triggered."""
entry = _entry(self.KBUILD_AVAILABLE)
event = {
"op": "updated",
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
}
self.assertEqual(self._matches([entry], event), [entry])

def test_result_rising_edge(self):
"""An entry matching state+result fires on the transition to done."""
entry = _entry(
{
"channel": "node",
"kind": "kbuild",
"state": "done",
"result": "pass",
}
)
rising = {
"op": "updated",
"kind": "kbuild",
"state": "done",
"result": "pass",
"previous_state": "available",
"previous_result": None,
}
self.assertEqual(self._matches([entry], rising), [entry])

def test_result_no_edge_when_already_done(self):
"""No fire when the node was already done/pass and is updated again."""
entry = _entry(
{
"channel": "node",
"kind": "kbuild",
"state": "done",
"result": "pass",
}
)
no_edge = {
"op": "updated",
"kind": "kbuild",
"state": "done",
"result": "pass",
"previous_state": "done",
"previous_result": "pass",
}
self.assertEqual(self._matches([entry], no_edge), [])

def test_non_matching_event(self):
"""An event that doesn't match the entry never fires."""
entry = _entry(self.KBUILD_AVAILABLE)
event = {
"op": "updated",
"kind": "kbuild",
"state": "running",
"name": "kbuild-gcc-14-arm",
"previous_state": "running",
}
self.assertEqual(self._matches([entry], event), [])

def test_channel_mismatch_skipped(self):
"""Entries for another channel are skipped."""
entry = _entry(
{"channel": "retry", "kind": "kbuild", "state": "available"}
)
event = {"op": "created", "kind": "kbuild", "state": "available"}
self.assertEqual(self._matches([entry], event, channel="node"), [])

def test_retry_event_without_op_is_level_triggered(self):
"""Retry events carry no ``op`` key and must always fire."""
entry = _entry(self.KBUILD_AVAILABLE)
# Synthesised retry event: parent node data with state forced
# to available, no "op"/"previous_state".
event = {
"kind": "kbuild",
"state": "available",
"name": "kbuild-gcc-14-arm",
"retry_counter": 1,
}
self.assertEqual(self._matches([entry], event), [entry])

def test_non_dict_event_returns_nothing(self):
"""A non-dict event is handled gracefully."""
self.assertEqual(
self._matches([_entry(self.KBUILD_AVAILABLE)], None), []
)


if __name__ == "__main__":
unittest.main()
Loading