Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .zuul.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@
pre-run: playbooks/pre.yml
run: playbooks/test-unit.yml

- job:
name: python-osism-integration-tests
pre-run: playbooks/pre.yml
run: playbooks/test-integration.yml

- project:
merge-mode: squash-merge
default-branch: main
Expand All @@ -148,6 +153,7 @@
- container-image-python-osism-frontend-build
- python-osism-test-setup
- python-osism-unit-tests
- python-osism-integration-tests
periodic-daily:
jobs:
- flake8
Expand All @@ -156,6 +162,7 @@
- python-black
- python-osism-test-setup
- python-osism-unit-tests
- python-osism-integration-tests
periodic-midnight:
jobs:
- container-image-python-osism-push
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,15 @@ Run a single test module:
```
pipenv run pytest tests/unit/test_smoke.py
```

## Running integration tests

The integration tests in `tests/integration/` exercise the Celery/Redis task
core (broker, queue routing, worker, result backend, Redis streams and locks)
end-to-end. They require a reachable Redis and start a Celery worker from the
same virtualenv; they are skipped automatically when Redis is not running.

```
docker run -d -p 6379:6379 redis:7-alpine
REDIS_HOST=localhost pipenv run pytest tests/integration
```
19 changes: 13 additions & 6 deletions osism/commands/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ def get_parser(self, prog_name):
return parser

def take_action(self, parsed_args):
from osism.tasks import Config

# Check if tasks are locked before proceeding
utils.check_task_lock_and_exit()

Expand Down Expand Up @@ -47,8 +49,16 @@ def take_action(self, parsed_args):
]
ps = [
subprocess.Popen(
f"celery -A {t} --broker=redis://redis beat -s /tmp/celerybeat-schedule-{t}.db",
shell=True,
[
"celery",
"-A",
t,
"--broker",
Config.broker_url,
"beat",
"-s",
f"/tmp/celerybeat-schedule-{t}.db",
]
)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
for t in ts
]
Expand All @@ -57,10 +67,7 @@ def take_action(self, parsed_args):
p.wait()

elif service == "flower":
p = subprocess.Popen(
"celery --broker=redis://redis flower",
shell=True,
)
p = subprocess.Popen(["celery", "--broker", Config.broker_url, "flower"])
p.wait()

elif service == "reconciler":
Expand Down
11 changes: 7 additions & 4 deletions osism/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

from loguru import logger

from osism import utils
from osism import settings, utils

# Regex pattern for extracting hosts from Ansible output
HOST_PATTERN = re.compile(r"^(ok|changed|failed|skipping|unreachable):\s+\[([^\]]+)\]")
Expand All @@ -23,11 +23,14 @@ class Config:
broker_connection_retry_on_startup = True
enable_utc = True
enable_ironic = os.environ.get("ENABLE_IRONIC", "True")
broker_url = "redis://redis"
result_backend = "redis://redis"
broker_url = os.environ.get(
"CELERY_BROKER_URL",
f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}",
)
result_backend = os.environ.get("CELERY_RESULT_BACKEND", broker_url)
task_create_missing_queues = True
task_default_queue = "default"
task_track_started = (True,)
task_track_started = True
task_routes = {
"osism.tasks.ansible.*": {"queue": "osism-ansible"},
"osism.tasks.ceph.*": {"queue": "ceph-ansible"},
Expand Down
43 changes: 43 additions & 0 deletions playbooks/test-integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
---
- name: Run integration tests
hosts: all

vars:
python_venv_dir: /tmp/venv

tasks:
- name: Start Redis container
ansible.builtin.shell:
executable: /bin/bash
cmd: |
set -e
set -o pipefail
set -x

docker run -d --name redis -p 6379:6379 redis:7-alpine
changed_when: true

- name: Install dependencies
ansible.builtin.shell:
executable: /bin/bash
chdir: "{{ zuul.project.src_dir }}"
cmd: |
set -e
set -o pipefail
set -x

{{ python_venv_dir }}/bin/pipenv install --dev --deploy
{{ python_venv_dir }}/bin/pipenv run pip install .

- name: Run pytest
ansible.builtin.shell:
executable: /bin/bash
chdir: "{{ zuul.project.src_dir }}"
cmd: |
set -e
set -o pipefail
set -x

{{ python_venv_dir }}/bin/pipenv run pytest tests/integration
environment:
REDIS_HOST: localhost
2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,5 @@ python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = -ra --strict-markers
markers =
integration: integration tests requiring a reachable Redis and a Celery worker (run with: pytest tests/integration)
Empty file added tests/integration/__init__.py
Empty file.
117 changes: 117 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
# SPDX-License-Identifier: Apache-2.0

"""Fixtures and skip logic shared by the Celery/Redis integration tests.

These tests exercise the real task-processing core (broker, queue routing,
worker, result backend, Redis streams and distributed locks) against a live
Redis. They are skipped automatically when Redis is not reachable -- for
example during a local ``pytest`` run without the service -- so the suite stays
green outside the dedicated CI job.
"""

import os
import subprocess
import time

import pytest

from osism import settings

# Only the ``ansible`` Celery app is used as worker: it has no import-time
# dependency on NetBox, OpenStack or ansible-core, unlike the other task
# modules. ``osism.tasks.ansible.*`` is routed to the ``osism-ansible`` queue.
WORKER_APP = "osism.tasks.ansible"
WORKER_QUEUE = "osism-ansible"
# Celery treats a ``-n`` value without ``@`` as the host part (yielding
# ``celery@ci-worker``), so the node name must be given explicitly.
WORKER_NAME = "ci-worker@%h"
WORKER_BOOT_TIMEOUT = 60


def _redis_reachable():
"""Return ``True`` when the configured Redis answers a ping."""
try:
from redis import Redis

client = Redis(
host=settings.REDIS_HOST,
port=settings.REDIS_PORT,
db=settings.REDIS_DB,
socket_connect_timeout=1,
)
try:
client.ping()
finally:
client.close()
return True
except Exception:
return False


def pytest_collection_modifyitems(config, items):
"""Skip integration-marked tests when Redis is not reachable."""
if _redis_reachable():
return
skip = pytest.mark.skip(
reason=f"Redis is not reachable on {settings.REDIS_HOST}:{settings.REDIS_PORT}"
)
for item in items:
if "integration" in item.keywords:
item.add_marker(skip)


@pytest.fixture(scope="session")
def celery_app():
"""The ``ansible`` Celery app, configured from the live broker URL."""
from osism.tasks import ansible

return ansible.app


@pytest.fixture(scope="session")
def celery_worker(celery_app):
"""Start a Celery worker for the ``osism-ansible`` queue for the session.

The worker runs from the same virtualenv as the tests.
``GATHER_FACTS_SCHEDULE=0`` prevents registration of the periodic
``gather_facts`` task, which would try to run ``/run.sh`` in containers that
do not exist in CI.
"""
proc = subprocess.Popen(
[
"celery",
"-A",
WORKER_APP,
"worker",
"-n",
WORKER_NAME,
"-Q",
WORKER_QUEUE,
"-c",
"1",
],
env={**os.environ, "GATHER_FACTS_SCHEDULE": "0"},
)

try:
deadline = time.time() + WORKER_BOOT_TIMEOUT
while time.time() < deadline:
if proc.poll() is not None:
raise RuntimeError(
f"Celery worker exited early with code {proc.returncode}"
)
if celery_app.control.inspect().ping():
break
time.sleep(1)
else:
raise RuntimeError(
f"Celery worker did not become ready within {WORKER_BOOT_TIMEOUT}s"
)
yield proc
finally:
proc.terminate()
try:
proc.wait(timeout=30)
except subprocess.TimeoutExpired:
proc.kill()
proc.wait()
45 changes: 45 additions & 0 deletions tests/integration/test_celery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# SPDX-License-Identifier: Apache-2.0

"""Celery round-trip and worker-visibility integration tests.

These validate the broker, queue routing (``osism-ansible``), worker and result
backend end-to-end against a live Redis and a worker started from the same
virtualenv.
"""

import pytest

pytestmark = pytest.mark.integration


def test_noop_round_trip(celery_worker):
"""Dispatching ``noop`` returns ``True`` through the result backend.

A single round-trip exercises the broker, the ``osism-ansible`` queue
routing, the worker and the result backend.
"""
from osism.tasks import ansible

result = ansible.noop.delay()

assert result.get(timeout=60) is True


def test_worker_is_visible(celery_worker):
"""``app.control.inspect().ping()`` sees the running worker.

This is the mechanism behind ``osism get status workers``: a fresh Celery
client configured from ``Config`` inspects the worker over the broker. The
``celery_worker`` fixture starts it as ``ci-worker@<hostname>``.
"""
from celery import Celery

from osism.tasks import Config

app = Celery("status")
app.config_from_object(Config)

replies = app.control.inspect().ping()

assert replies
assert any(name.startswith("ci-worker@") for name in replies)
41 changes: 41 additions & 0 deletions tests/integration/test_locking.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# SPDX-License-Identifier: Apache-2.0

"""Distributed-locking integration tests against a live Redis.

Covers the Redlock helper used by ``run_ansible_in_environment`` for per-play
locking and the task-lock flag used by ``osism lock`` / ``osism unlock``.
"""

import uuid

import pytest

from osism import utils

pytestmark = pytest.mark.integration


def test_redlock_acquire_and_release():
"""A Redlock can be acquired and released against the live Redis."""
lock = utils.create_redlock(key=f"itest-lock-{uuid.uuid4()}")

assert lock.acquire(timeout=10)
lock.release()


def test_task_lock_set_check_remove():
"""``set_task_lock`` / ``is_task_locked`` / ``remove_task_lock`` round-trip."""
# Start from a known-unlocked state so the test is independent of prior runs.
utils.remove_task_lock()
assert utils.is_task_locked() is None

assert utils.set_task_lock(user="tester", reason="integration test") is True

lock_info = utils.is_task_locked()
assert lock_info is not None
assert lock_info["locked"] is True
assert lock_info["user"] == "tester"
assert lock_info["reason"] == "integration test"

assert utils.remove_task_lock() is True
assert utils.is_task_locked() is None
30 changes: 30 additions & 0 deletions tests/integration/test_redis_streams.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# SPDX-License-Identifier: Apache-2.0

"""Redis-streams task-output integration tests.

``push_task_output`` / ``finish_task_output`` / ``fetch_task_output`` are the
mechanism ``osism apply`` uses to stream task logs; they round-trip through a
Redis stream keyed by the task id and are testable with Redis alone.
"""

import uuid

import pytest

from osism import utils

pytestmark = pytest.mark.integration


def test_task_output_round_trip(capsys):
"""Output pushed to a task stream is read back with its return code."""
task_id = f"itest-{uuid.uuid4()}"

utils.push_task_output(task_id, "first line\n")
utils.push_task_output(task_id, "second line\n")
utils.finish_task_output(task_id, rc=3)

rc = utils.fetch_task_output(task_id, timeout=10)

assert rc == 3
assert capsys.readouterr().out == "first line\nsecond line\n"