diff options
author | Daniel Salzman <daniel.salzman@nic.cz> | 2024-09-26 17:21:41 +0200 |
---|---|---|
committer | Daniel Salzman <daniel.salzman@nic.cz> | 2024-09-27 12:44:40 +0200 |
commit | 40e188ab629937502e0d19daa7096fa4d34231df (patch) | |
tree | 05e40924440d6ba7ce3aea7fc0870ac0f0d35cdd /tests-extra/tests | |
parent | acl: add protocol option to the configuration (diff) | |
download | knot-40e188ab629937502e0d19daa7096fa4d34231df.tar.xz knot-40e188ab629937502e0d19daa7096fa4d34231df.zip |
tests-extra: add test for ACL configuration
Diffstat (limited to 'tests-extra/tests')
-rw-r--r-- | tests-extra/tests/config/acl/test.py | 208 |
1 files changed, 208 insertions, 0 deletions
diff --git a/tests-extra/tests/config/acl/test.py b/tests-extra/tests/config/acl/test.py new file mode 100644 index 000000000..3655b63b0 --- /dev/null +++ b/tests-extra/tests/config/acl/test.py @@ -0,0 +1,208 @@ +#!/usr/bin/env python3 + +'''Test for ACL configuration''' + +import os +import random + +from dnstest.libknot import libknot +from dnstest.test import Test +from dnstest.utils import * + +RMT = random.choice([False, True]) +detail_log("ACL remote %s" % (str(RMT))) + +RMT_ID = "rmt_test" +ACL_ID = "acl_test" + +def set_remote(addrs=list(), protos=list(), cert_key=None): + + try: + ctl.send_block(cmd="conf-unset", section="remote", identifier=RMT_ID) + resp = ctl.receive_block() + except: + pass + + ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID) + resp = ctl.receive_block() + + for addr in addrs: + ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="address", data=addr) + resp = ctl.receive_block() + for proto in protos: + if proto == "quic": + ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="quic", data="on") + resp = ctl.receive_block() + if proto == "tls": + ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="tls", data="on") + resp = ctl.receive_block() + if cert_key: + ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="cert-key", data=cert_key) + resp = ctl.receive_block() + +def set_autoacl(server, zone, item, addrs=list(), protos=list(), cert_key=None): + + ZONE = zone.name + + ctl.connect(os.path.join(server.dir, "knot.sock")) + ctl.send_block(cmd="conf-begin") + resp = ctl.receive_block() + + ctl.send_block(cmd="conf-set", section="server", item="automatic-acl", data="on") + resp = ctl.receive_block() + + if not addrs: + addrs = [server.addr] + set_remote(addrs, protos, cert_key) + ctl.send_block(cmd="conf-set", section="zone", identifier=ZONE, item=item, data=RMT_ID) + resp = ctl.receive_block() + + try: + ctl.send_block(cmd="conf-unset", section="zone", identifier=ZONE, item="acl") + resp = ctl.receive_block() + except: + pass + + ctl.send_block(cmd="conf-commit") + resp = ctl.receive_block() + + ctl.send(libknot.control.KnotCtlType.END) + ctl.close() + +def set_acl(server, actions, deny=False, addrs=list(), protos=list(), cert_key=None): + + ctl.connect(os.path.join(server.dir, "knot.sock")) + ctl.send_block(cmd="conf-begin") + resp = ctl.receive_block() + + try: + ctl.send_block(cmd="conf-unset", section="acl", identifier=ACL_ID) + resp = ctl.receive_block() + except: + pass + + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID) + resp = ctl.receive_block() + + for action in actions: + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="action", data=action) + resp = ctl.receive_block() + if deny: + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="deny", data="on") + resp = ctl.receive_block() + + if RMT: + if not addrs: + addrs = [server.addr] + set_remote(addrs, protos, cert_key) + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="remote", data=RMT_ID) + resp = ctl.receive_block() + else: + for addr in addrs: + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="address", data=addr) + resp = ctl.receive_block() + for proto in protos: + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="protocol", data=proto) + resp = ctl.receive_block() + if cert_key: + ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="cert-key", data=cert_key) + resp = ctl.receive_block() + + ctl.send_block(cmd="conf-commit") + resp = ctl.receive_block() + + ctl.send(libknot.control.KnotCtlType.END) + ctl.close() + +def send_upd(server, zone, rcode="NOERROR", proto=Proto.TCP): + + up = server.update(zone) + up.add("test", 3600, "A", "1.2.3.4") + up.send(rcode, proto=proto) + +def test_normal(server, zone): + + ZONE = zone.name + + # Purge the ACL rule. + set_acl(server, []) + + # Normal queries don't require authorization. + resp = server.dig(ZONE, "SOA") + resp.check(rcode="NOERROR") + + # Authorized operations are denied by default. + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOTAUTH") + resp = server.dig(ZONE, "AXFR") + resp.check_xfr(rcode="NOTAUTH") + + # Authorize only one action. + set_acl(server, ["notify"]) + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOERROR") + resp = server.dig(ZONE, "AXFR") + resp.check_xfr(rcode="NOTAUTH") + + # Authorize more actions. + set_acl(server, ["notify", "transfer"]) + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOERROR") + resp = server.dig(ZONE, "AXFR") + resp.check_xfr(rcode="NOERROR") + send_upd(server, zone, "NOTAUTH") + + # Deny explicit action. + set_acl(server, ["notify"], deny=True) + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOTAUTH") + + # Authorize only some protocols. + set_acl(server, ["update"], addrs=[t.addr], protos=["tls", "udp"]) + send_upd(server, zone, "NOTAUTH", Proto.TCP) + send_upd(server, zone, "NOERROR", Proto.TLS) + if not RMT: # Remote doesn't distinguish between UDP and TCP. + send_upd(server, zone, "NOERROR", Proto.UDP) + + # Authorize different address and protocol. + set_acl(server, ["update"], addrs=["192.0.2.1"], protos=["tls", "udp"]) + send_upd(server, zone, "NOTAUTH", Proto.TLS) + send_upd(server, zone, "NOTAUTH", Proto.TCP) + + + # Automatic ACL for XFR. + set_autoacl(server, zone, "notify", protos=["tcp"]) + resp = server.dig(ZONE, "AXFR") + resp.check_xfr(rcode="NOERROR") + + # Automatic ACL for XFR bad proto. + set_autoacl(server, zone, "notify", protos=["tls"]) + resp = server.dig(ZONE, "AXFR") + resp.check_xfr(rcode="NOTAUTH") + + # Automatic ACL for NOTIFY. + set_autoacl(server, zone, "master", protos=["tcp"]) + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOERROR") + + # Automatic ACL for NOTIFY bad proto. + set_autoacl(server, zone, "master", protos=["quic"]) + resp = server.dig(ZONE, "NOTIFY") + resp.check(rcode="NOTAUTH") + +t = Test(quic=True, tls=True, tsig=False) + +master = t.server("knot") +zones = t.zone_rnd(1, records=5, dnssec=False) +zones[0].name = zones[0].name.lower() +t.link(zones, master) + +ctl = libknot.control.KnotCtl() + +t.start() + +master.zones_wait(zones) + +test_normal(master, zones[0]) + +t.end() |