Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions osism/tasks/conductor/sonic/config_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@
"SNMP_SERVER_PARAMS",
"SNMP_SERVER_TARGET",
"SYSLOG_SERVER",
"ACL_TABLE",
"ACL_RULE",
)

# Tables fully owned by the generator and rebuilt from scratch on every regen.
Expand Down Expand Up @@ -374,6 +376,9 @@ def generate_sonic_config(device, hwsku, device_as_mapping=None, config_version=
config["MGMT_INTERFACE"][f"eth0|{oob_ip}/{prefix_len}"] = {}
metalbox_ip = _get_metalbox_ip_for_device(device)
config["STATIC_ROUTE"]["mgmt|0.0.0.0/0"] = {"nexthop": metalbox_ip}

# Restrict control-plane SSH to the OOB management network
_add_ssh_acl_configuration(config, device, oob_ip_result)
else:
oob_ip = None

Expand Down Expand Up @@ -2202,6 +2207,40 @@ def _add_log_server_configuration(config, device):
logger.debug(f"Added syslog_server {host}")


def _add_ssh_acl_configuration(config, device, oob_ip_result):
"""Add a control-plane ACL restricting SSH to the management network.

Emits an ACL_TABLE of type CTRLPLANE bound to the SSH service plus an
ACL_RULE that accepts SSH only from the device's OOB management subnet
(derived from the same data as MGMT_INTERFACE). Once a CTRLPLANE table
binds a service, caclmgrd installs an implicit default-drop for it, so
sources outside the permitted subnet — e.g. front-panel interfaces with
IPs in the default VRF — can no longer reach SSH.
"""
oob_ip, prefix_len = oob_ip_result
oob_network = ipaddress.IPv4Network(f"{oob_ip}/{prefix_len}", strict=False)

config["ACL_TABLE"] = {
"SSH_ONLY": {
"policy_desc": "SSH_ONLY",
"type": "CTRLPLANE",
"services": ["SSH"],
}
}
config["ACL_RULE"] = {
"SSH_ONLY|RULE_1": {
"PRIORITY": "9999",
"PACKET_ACTION": "ACCEPT",
"SRC_IP": str(oob_network),
"IP_TYPE": "IP",
}
}

logger.debug(
f"Added SSH control-plane ACL permitting {oob_network} for device {device.name}"
)


def _add_snmp_configuration(config, device, oob_ip):
"""Add Snmp configuration to device config.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def patch(name, **kw):
_add_vlan_configuration=patch("_add_vlan_configuration"),
_add_loopback_configuration=patch("_add_loopback_configuration"),
_add_log_server_configuration=patch("_add_log_server_configuration"),
_add_ssh_acl_configuration=patch("_add_ssh_acl_configuration"),
_add_snmp_configuration=patch("_add_snmp_configuration"),
_add_vrf_configuration=patch("_add_vrf_configuration"),
_add_portchannel_configuration=patch("_add_portchannel_configuration"),
Expand Down Expand Up @@ -351,6 +352,9 @@ def test_generate_sonic_config_populates_mgmt_interface_and_static_route(
# SNMP receives the OOB IP for SNMP_AGENT_ADDRESS_CONFIG wiring.
_, _, snmp_oob = patch_orchestrator_helpers._add_snmp_configuration.call_args.args
assert snmp_oob == "10.42.0.5"
# The SSH control-plane ACL is wired with the full OOB result.
ssh_acl_mock = patch_orchestrator_helpers._add_ssh_acl_configuration
ssh_acl_mock.assert_called_once_with(config, device, ("10.42.0.5", 24))


def test_generate_sonic_config_static_route_dropped_on_regen(
Expand Down Expand Up @@ -393,6 +397,47 @@ def test_generate_sonic_config_no_oob_ip_leaves_mgmt_empty_and_passes_none(
assert "mgmt|0.0.0.0/0" not in config["STATIC_ROUTE"]
_, _, snmp_oob = patch_orchestrator_helpers._add_snmp_configuration.call_args.args
assert snmp_oob is None
# Without an OOB IP there is no subnet to permit — no SSH ACL is emitted.
patch_orchestrator_helpers._add_ssh_acl_configuration.assert_not_called()


def test_generate_sonic_config_no_oob_ip_drops_stale_acl_tables(
mocker, patch_orchestrator_helpers, make_orchestrator_device
):
"""Stale ACL_TABLE / ACL_RULE entries must not survive a regen without
an OOB IP.

ACL_TABLE and ACL_RULE are on-demand owned tables: when the device has
no OOB IP, nothing re-emits them, so entries carried over from the base
config_db.json (e.g. the SSH ACL of an earlier regen, before the OOB IP
was removed from NetBox) must be gone — leaving them would keep an SSH
lockdown pointing at a subnet the device no longer manages.
"""
base = make_base_config()
base["ACL_TABLE"] = {
"SSH_ONLY": {
"policy_desc": "SSH_ONLY",
"type": "CTRLPLANE",
"services": ["SSH"],
}
}
base["ACL_RULE"] = {
"SSH_ONLY|RULE_1": {
"PRIORITY": "9999",
"PACKET_ACTION": "ACCEPT",
"SRC_IP": "10.42.0.0/24",
"IP_TYPE": "IP",
}
}
patch_base_config(mocker, base_config=base)
patch_orchestrator_helpers.get_device_oob_ip.return_value = None
device = make_orchestrator_device()

config = generate_sonic_config(device, "HWSKU")

assert "ACL_TABLE" not in config
assert "ACL_RULE" not in config
patch_orchestrator_helpers._add_ssh_acl_configuration.assert_not_called()


# ---------------------------------------------------------------------------
Expand Down
72 changes: 72 additions & 0 deletions tests/unit/tasks/conductor/sonic/test_config_generator_ssh_acl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# SPDX-License-Identifier: Apache-2.0

"""Unit tests for ``_add_ssh_acl_configuration`` in ``config_generator``."""

from types import SimpleNamespace

import pytest

from osism.tasks.conductor.sonic.config_generator import _add_ssh_acl_configuration

pytestmark = pytest.mark.usefixtures("reset_config_generator_caches")


def _device(name="leaf-1"):
return SimpleNamespace(name=name)


def test_add_ssh_acl_configuration_emits_ctrlplane_table_and_accept_rule():
config = {}

_add_ssh_acl_configuration(config, _device(), ("10.42.0.5", 24))

assert config["ACL_TABLE"] == {
"SSH_ONLY": {
"policy_desc": "SSH_ONLY",
"type": "CTRLPLANE",
"services": ["SSH"],
}
}
assert config["ACL_RULE"] == {
"SSH_ONLY|RULE_1": {
"PRIORITY": "9999",
"PACKET_ACTION": "ACCEPT",
"SRC_IP": "10.42.0.0/24",
"IP_TYPE": "IP",
}
}


@pytest.mark.parametrize(
"oob_ip,prefix_len,expected_src",
[
("10.42.0.5", 24, "10.42.0.0/24"), # host bits stripped
("192.168.45.123", 26, "192.168.45.64/26"), # non-octet boundary
("10.42.0.0", 24, "10.42.0.0/24"), # already the network address
],
)
def test_add_ssh_acl_configuration_normalises_src_ip_to_network_address(
oob_ip, prefix_len, expected_src
):
"""The OOB IP is a host address — the rule must carry its subnet."""
config = {}

_add_ssh_acl_configuration(config, _device(), (oob_ip, prefix_len))

assert config["ACL_RULE"]["SSH_ONLY|RULE_1"]["SRC_IP"] == expected_src


def test_add_ssh_acl_configuration_replaces_preexisting_tables():
"""ACL_TABLE / ACL_RULE are owned tables — entries carried over from the
base config_db.json are replaced wholesale on regen."""
config = {
"ACL_TABLE": {"OPERATOR_TABLE": {"type": "L3"}},
"ACL_RULE": {"OPERATOR_TABLE|RULE_1": {"PACKET_ACTION": "DROP"}},
}

_add_ssh_acl_configuration(config, _device(), ("10.42.0.5", 24))

assert "OPERATOR_TABLE" not in config["ACL_TABLE"]
assert "OPERATOR_TABLE|RULE_1" not in config["ACL_RULE"]
assert "SSH_ONLY" in config["ACL_TABLE"]
assert "SSH_ONLY|RULE_1" in config["ACL_RULE"]