diff --git a/osism/tasks/conductor/sonic/config_generator.py b/osism/tasks/conductor/sonic/config_generator.py index 76d72e43..f777bc38 100644 --- a/osism/tasks/conductor/sonic/config_generator.py +++ b/osism/tasks/conductor/sonic/config_generator.py @@ -118,6 +118,8 @@ "SNMP_SERVER_PARAMS", "SNMP_SERVER_TARGET", "SYSLOG_SERVER", + "ACL_TABLE", + "ACL_RULE", ) # Tables fully owned by the generator and rebuilt from scratch on every regen. @@ -374,6 +376,9 @@ def generate_sonic_config(device, hwsku, device_as_mapping=None, config_version= config["MGMT_INTERFACE"][f"eth0|{oob_ip}/{prefix_len}"] = {} metalbox_ip = _get_metalbox_ip_for_device(device) config["STATIC_ROUTE"]["mgmt|0.0.0.0/0"] = {"nexthop": metalbox_ip} + + # Restrict control-plane SSH to the OOB management network + _add_ssh_acl_configuration(config, device, oob_ip_result) else: oob_ip = None @@ -2202,6 +2207,40 @@ def _add_log_server_configuration(config, device): logger.debug(f"Added syslog_server {host}") +def _add_ssh_acl_configuration(config, device, oob_ip_result): + """Add a control-plane ACL restricting SSH to the management network. + + Emits an ACL_TABLE of type CTRLPLANE bound to the SSH service plus an + ACL_RULE that accepts SSH only from the device's OOB management subnet + (derived from the same data as MGMT_INTERFACE). Once a CTRLPLANE table + binds a service, caclmgrd installs an implicit default-drop for it, so + sources outside the permitted subnet — e.g. front-panel interfaces with + IPs in the default VRF — can no longer reach SSH. + """ + oob_ip, prefix_len = oob_ip_result + oob_network = ipaddress.IPv4Network(f"{oob_ip}/{prefix_len}", strict=False) + + config["ACL_TABLE"] = { + "SSH_ONLY": { + "policy_desc": "SSH_ONLY", + "type": "CTRLPLANE", + "services": ["SSH"], + } + } + config["ACL_RULE"] = { + "SSH_ONLY|RULE_1": { + "PRIORITY": "9999", + "PACKET_ACTION": "ACCEPT", + "SRC_IP": str(oob_network), + "IP_TYPE": "IP", + } + } + + logger.debug( + f"Added SSH control-plane ACL permitting {oob_network} for device {device.name}" + ) + + def _add_snmp_configuration(config, device, oob_ip): """Add Snmp configuration to device config. diff --git a/tests/unit/tasks/conductor/sonic/test_config_generator_orchestrator.py b/tests/unit/tasks/conductor/sonic/test_config_generator_orchestrator.py index be149e67..ed639895 100644 --- a/tests/unit/tasks/conductor/sonic/test_config_generator_orchestrator.py +++ b/tests/unit/tasks/conductor/sonic/test_config_generator_orchestrator.py @@ -94,6 +94,7 @@ def patch(name, **kw): _add_vlan_configuration=patch("_add_vlan_configuration"), _add_loopback_configuration=patch("_add_loopback_configuration"), _add_log_server_configuration=patch("_add_log_server_configuration"), + _add_ssh_acl_configuration=patch("_add_ssh_acl_configuration"), _add_snmp_configuration=patch("_add_snmp_configuration"), _add_vrf_configuration=patch("_add_vrf_configuration"), _add_portchannel_configuration=patch("_add_portchannel_configuration"), @@ -351,6 +352,9 @@ def test_generate_sonic_config_populates_mgmt_interface_and_static_route( # SNMP receives the OOB IP for SNMP_AGENT_ADDRESS_CONFIG wiring. _, _, snmp_oob = patch_orchestrator_helpers._add_snmp_configuration.call_args.args assert snmp_oob == "10.42.0.5" + # The SSH control-plane ACL is wired with the full OOB result. + ssh_acl_mock = patch_orchestrator_helpers._add_ssh_acl_configuration + ssh_acl_mock.assert_called_once_with(config, device, ("10.42.0.5", 24)) def test_generate_sonic_config_static_route_dropped_on_regen( @@ -393,6 +397,47 @@ def test_generate_sonic_config_no_oob_ip_leaves_mgmt_empty_and_passes_none( assert "mgmt|0.0.0.0/0" not in config["STATIC_ROUTE"] _, _, snmp_oob = patch_orchestrator_helpers._add_snmp_configuration.call_args.args assert snmp_oob is None + # Without an OOB IP there is no subnet to permit — no SSH ACL is emitted. + patch_orchestrator_helpers._add_ssh_acl_configuration.assert_not_called() + + +def test_generate_sonic_config_no_oob_ip_drops_stale_acl_tables( + mocker, patch_orchestrator_helpers, make_orchestrator_device +): + """Stale ACL_TABLE / ACL_RULE entries must not survive a regen without + an OOB IP. + + ACL_TABLE and ACL_RULE are on-demand owned tables: when the device has + no OOB IP, nothing re-emits them, so entries carried over from the base + config_db.json (e.g. the SSH ACL of an earlier regen, before the OOB IP + was removed from NetBox) must be gone — leaving them would keep an SSH + lockdown pointing at a subnet the device no longer manages. + """ + base = make_base_config() + base["ACL_TABLE"] = { + "SSH_ONLY": { + "policy_desc": "SSH_ONLY", + "type": "CTRLPLANE", + "services": ["SSH"], + } + } + base["ACL_RULE"] = { + "SSH_ONLY|RULE_1": { + "PRIORITY": "9999", + "PACKET_ACTION": "ACCEPT", + "SRC_IP": "10.42.0.0/24", + "IP_TYPE": "IP", + } + } + patch_base_config(mocker, base_config=base) + patch_orchestrator_helpers.get_device_oob_ip.return_value = None + device = make_orchestrator_device() + + config = generate_sonic_config(device, "HWSKU") + + assert "ACL_TABLE" not in config + assert "ACL_RULE" not in config + patch_orchestrator_helpers._add_ssh_acl_configuration.assert_not_called() # --------------------------------------------------------------------------- diff --git a/tests/unit/tasks/conductor/sonic/test_config_generator_ssh_acl.py b/tests/unit/tasks/conductor/sonic/test_config_generator_ssh_acl.py new file mode 100644 index 00000000..75599418 --- /dev/null +++ b/tests/unit/tasks/conductor/sonic/test_config_generator_ssh_acl.py @@ -0,0 +1,72 @@ +# SPDX-License-Identifier: Apache-2.0 + +"""Unit tests for ``_add_ssh_acl_configuration`` in ``config_generator``.""" + +from types import SimpleNamespace + +import pytest + +from osism.tasks.conductor.sonic.config_generator import _add_ssh_acl_configuration + +pytestmark = pytest.mark.usefixtures("reset_config_generator_caches") + + +def _device(name="leaf-1"): + return SimpleNamespace(name=name) + + +def test_add_ssh_acl_configuration_emits_ctrlplane_table_and_accept_rule(): + config = {} + + _add_ssh_acl_configuration(config, _device(), ("10.42.0.5", 24)) + + assert config["ACL_TABLE"] == { + "SSH_ONLY": { + "policy_desc": "SSH_ONLY", + "type": "CTRLPLANE", + "services": ["SSH"], + } + } + assert config["ACL_RULE"] == { + "SSH_ONLY|RULE_1": { + "PRIORITY": "9999", + "PACKET_ACTION": "ACCEPT", + "SRC_IP": "10.42.0.0/24", + "IP_TYPE": "IP", + } + } + + +@pytest.mark.parametrize( + "oob_ip,prefix_len,expected_src", + [ + ("10.42.0.5", 24, "10.42.0.0/24"), # host bits stripped + ("192.168.45.123", 26, "192.168.45.64/26"), # non-octet boundary + ("10.42.0.0", 24, "10.42.0.0/24"), # already the network address + ], +) +def test_add_ssh_acl_configuration_normalises_src_ip_to_network_address( + oob_ip, prefix_len, expected_src +): + """The OOB IP is a host address — the rule must carry its subnet.""" + config = {} + + _add_ssh_acl_configuration(config, _device(), (oob_ip, prefix_len)) + + assert config["ACL_RULE"]["SSH_ONLY|RULE_1"]["SRC_IP"] == expected_src + + +def test_add_ssh_acl_configuration_replaces_preexisting_tables(): + """ACL_TABLE / ACL_RULE are owned tables — entries carried over from the + base config_db.json are replaced wholesale on regen.""" + config = { + "ACL_TABLE": {"OPERATOR_TABLE": {"type": "L3"}}, + "ACL_RULE": {"OPERATOR_TABLE|RULE_1": {"PACKET_ACTION": "DROP"}}, + } + + _add_ssh_acl_configuration(config, _device(), ("10.42.0.5", 24)) + + assert "OPERATOR_TABLE" not in config["ACL_TABLE"] + assert "OPERATOR_TABLE|RULE_1" not in config["ACL_RULE"] + assert "SSH_ONLY" in config["ACL_TABLE"] + assert "SSH_ONLY|RULE_1" in config["ACL_RULE"]