summaryrefslogtreecommitdiffstats
path: root/tests-extra/tests
diff options
context:
space:
mode:
authorDaniel Salzman <daniel.salzman@nic.cz>2024-09-26 17:21:41 +0200
committerDaniel Salzman <daniel.salzman@nic.cz>2024-09-27 12:44:40 +0200
commit40e188ab629937502e0d19daa7096fa4d34231df (patch)
tree05e40924440d6ba7ce3aea7fc0870ac0f0d35cdd /tests-extra/tests
parentacl: add protocol option to the configuration (diff)
downloadknot-40e188ab629937502e0d19daa7096fa4d34231df.tar.xz
knot-40e188ab629937502e0d19daa7096fa4d34231df.zip
tests-extra: add test for ACL configuration
Diffstat (limited to 'tests-extra/tests')
-rw-r--r--tests-extra/tests/config/acl/test.py208
1 files changed, 208 insertions, 0 deletions
diff --git a/tests-extra/tests/config/acl/test.py b/tests-extra/tests/config/acl/test.py
new file mode 100644
index 000000000..3655b63b0
--- /dev/null
+++ b/tests-extra/tests/config/acl/test.py
@@ -0,0 +1,208 @@
+#!/usr/bin/env python3
+
+'''Test for ACL configuration'''
+
+import os
+import random
+
+from dnstest.libknot import libknot
+from dnstest.test import Test
+from dnstest.utils import *
+
+RMT = random.choice([False, True])
+detail_log("ACL remote %s" % (str(RMT)))
+
+RMT_ID = "rmt_test"
+ACL_ID = "acl_test"
+
+def set_remote(addrs=list(), protos=list(), cert_key=None):
+
+ try:
+ ctl.send_block(cmd="conf-unset", section="remote", identifier=RMT_ID)
+ resp = ctl.receive_block()
+ except:
+ pass
+
+ ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID)
+ resp = ctl.receive_block()
+
+ for addr in addrs:
+ ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="address", data=addr)
+ resp = ctl.receive_block()
+ for proto in protos:
+ if proto == "quic":
+ ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="quic", data="on")
+ resp = ctl.receive_block()
+ if proto == "tls":
+ ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="tls", data="on")
+ resp = ctl.receive_block()
+ if cert_key:
+ ctl.send_block(cmd="conf-set", section="remote", identifier=RMT_ID, item="cert-key", data=cert_key)
+ resp = ctl.receive_block()
+
+def set_autoacl(server, zone, item, addrs=list(), protos=list(), cert_key=None):
+
+ ZONE = zone.name
+
+ ctl.connect(os.path.join(server.dir, "knot.sock"))
+ ctl.send_block(cmd="conf-begin")
+ resp = ctl.receive_block()
+
+ ctl.send_block(cmd="conf-set", section="server", item="automatic-acl", data="on")
+ resp = ctl.receive_block()
+
+ if not addrs:
+ addrs = [server.addr]
+ set_remote(addrs, protos, cert_key)
+ ctl.send_block(cmd="conf-set", section="zone", identifier=ZONE, item=item, data=RMT_ID)
+ resp = ctl.receive_block()
+
+ try:
+ ctl.send_block(cmd="conf-unset", section="zone", identifier=ZONE, item="acl")
+ resp = ctl.receive_block()
+ except:
+ pass
+
+ ctl.send_block(cmd="conf-commit")
+ resp = ctl.receive_block()
+
+ ctl.send(libknot.control.KnotCtlType.END)
+ ctl.close()
+
+def set_acl(server, actions, deny=False, addrs=list(), protos=list(), cert_key=None):
+
+ ctl.connect(os.path.join(server.dir, "knot.sock"))
+ ctl.send_block(cmd="conf-begin")
+ resp = ctl.receive_block()
+
+ try:
+ ctl.send_block(cmd="conf-unset", section="acl", identifier=ACL_ID)
+ resp = ctl.receive_block()
+ except:
+ pass
+
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID)
+ resp = ctl.receive_block()
+
+ for action in actions:
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="action", data=action)
+ resp = ctl.receive_block()
+ if deny:
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="deny", data="on")
+ resp = ctl.receive_block()
+
+ if RMT:
+ if not addrs:
+ addrs = [server.addr]
+ set_remote(addrs, protos, cert_key)
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="remote", data=RMT_ID)
+ resp = ctl.receive_block()
+ else:
+ for addr in addrs:
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="address", data=addr)
+ resp = ctl.receive_block()
+ for proto in protos:
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="protocol", data=proto)
+ resp = ctl.receive_block()
+ if cert_key:
+ ctl.send_block(cmd="conf-set", section="acl", identifier=ACL_ID, item="cert-key", data=cert_key)
+ resp = ctl.receive_block()
+
+ ctl.send_block(cmd="conf-commit")
+ resp = ctl.receive_block()
+
+ ctl.send(libknot.control.KnotCtlType.END)
+ ctl.close()
+
+def send_upd(server, zone, rcode="NOERROR", proto=Proto.TCP):
+
+ up = server.update(zone)
+ up.add("test", 3600, "A", "1.2.3.4")
+ up.send(rcode, proto=proto)
+
+def test_normal(server, zone):
+
+ ZONE = zone.name
+
+ # Purge the ACL rule.
+ set_acl(server, [])
+
+ # Normal queries don't require authorization.
+ resp = server.dig(ZONE, "SOA")
+ resp.check(rcode="NOERROR")
+
+ # Authorized operations are denied by default.
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOTAUTH")
+ resp = server.dig(ZONE, "AXFR")
+ resp.check_xfr(rcode="NOTAUTH")
+
+ # Authorize only one action.
+ set_acl(server, ["notify"])
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOERROR")
+ resp = server.dig(ZONE, "AXFR")
+ resp.check_xfr(rcode="NOTAUTH")
+
+ # Authorize more actions.
+ set_acl(server, ["notify", "transfer"])
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOERROR")
+ resp = server.dig(ZONE, "AXFR")
+ resp.check_xfr(rcode="NOERROR")
+ send_upd(server, zone, "NOTAUTH")
+
+ # Deny explicit action.
+ set_acl(server, ["notify"], deny=True)
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOTAUTH")
+
+ # Authorize only some protocols.
+ set_acl(server, ["update"], addrs=[t.addr], protos=["tls", "udp"])
+ send_upd(server, zone, "NOTAUTH", Proto.TCP)
+ send_upd(server, zone, "NOERROR", Proto.TLS)
+ if not RMT: # Remote doesn't distinguish between UDP and TCP.
+ send_upd(server, zone, "NOERROR", Proto.UDP)
+
+ # Authorize different address and protocol.
+ set_acl(server, ["update"], addrs=["192.0.2.1"], protos=["tls", "udp"])
+ send_upd(server, zone, "NOTAUTH", Proto.TLS)
+ send_upd(server, zone, "NOTAUTH", Proto.TCP)
+
+
+ # Automatic ACL for XFR.
+ set_autoacl(server, zone, "notify", protos=["tcp"])
+ resp = server.dig(ZONE, "AXFR")
+ resp.check_xfr(rcode="NOERROR")
+
+ # Automatic ACL for XFR bad proto.
+ set_autoacl(server, zone, "notify", protos=["tls"])
+ resp = server.dig(ZONE, "AXFR")
+ resp.check_xfr(rcode="NOTAUTH")
+
+ # Automatic ACL for NOTIFY.
+ set_autoacl(server, zone, "master", protos=["tcp"])
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOERROR")
+
+ # Automatic ACL for NOTIFY bad proto.
+ set_autoacl(server, zone, "master", protos=["quic"])
+ resp = server.dig(ZONE, "NOTIFY")
+ resp.check(rcode="NOTAUTH")
+
+t = Test(quic=True, tls=True, tsig=False)
+
+master = t.server("knot")
+zones = t.zone_rnd(1, records=5, dnssec=False)
+zones[0].name = zones[0].name.lower()
+t.link(zones, master)
+
+ctl = libknot.control.KnotCtl()
+
+t.start()
+
+master.zones_wait(zones)
+
+test_normal(master, zones[0])
+
+t.end()