# path: python/knot_resolver/client/commands/cache.py
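"""The 'cache' subcommand of the resolver management client.

The only operation currently implemented is 'clear', which validates its
parameters against CacheClearRPCSchema, POSTs them to the management API's
'cache/clear' endpoint and prints the response in YAML (default) or JSON.

Illustrative invocations (assuming the client is installed as 'kresctl'):

    kresctl cache clear                            # purge the entire cache
    kresctl cache clear example.com.               # purge the 'example.com.' subtree
    kresctl cache clear --exact-name example.com. --rr-type AAAA
    kresctl cache clear --json example.com.        # request JSON output
"""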
import argparse
import sys
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple, Type

from knot_resolver.client.command import Command, CommandArgs, CompWords, register_command
from knot_resolver.datamodel.cache_schema import CacheClearRPCSchema
from knot_resolver.utils.modeling.exceptions import AggregateDataValidationError, DataValidationError
from knot_resolver.utils.modeling.parsing import DataFormat, parse_json
from knot_resolver.utils.requests import request


class CacheOperations(Enum):
    CLEAR = 0


@register_command
class CacheCommand(Command):
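    """Implements the 'cache' subcommand of the client.

    The argparse namespace is translated into self.clear_dict, whose keys
    ('name', 'exact-name', 'rr-type', 'chunk-size') mirror the fields of
    CacheClearRPCSchema; the actual validation happens in run().
    """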
    def __init__(self, namespace: argparse.Namespace) -> None:
        super().__init__(namespace)
        self.operation: Optional[CacheOperations] = namespace.operation if hasattr(namespace, "operation") else None
        self.output_format: DataFormat = (
            namespace.output_format if hasattr(namespace, "output_format") else DataFormat.YAML
        )

        # CLEAR operation
        self.clear_dict: Dict[str, Any] = {}
        if hasattr(namespace, "exact_name"):
            self.clear_dict["exact-name"] = namespace.exact_name
        if hasattr(namespace, "name"):
            self.clear_dict["name"] = namespace.name
        if hasattr(namespace, "rr_type"):
            self.clear_dict["rr-type"] = namespace.rr_type
        if hasattr(namespace, "chunk_size"):
            self.clear_dict["chunk-size"] = namespace.chunk_size

    @staticmethod
    def register_args_subparser(
        subparser: "argparse._SubParsersAction[argparse.ArgumentParser]",
    ) -> Tuple[argparse.ArgumentParser, "Type[Command]"]:
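        """Register the 'cache' parser and its 'clear' operation with the client's argument parser."""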
        cache_parser = subparser.add_parser("cache", help="Performs operations on the cache of the running resolver.")

        cache_subparsers = cache_parser.add_subparsers(help="operation type")

        # 'clear' operation
        clear_subparser = cache_subparsers.add_parser(
            "clear", help="Purge cache records that match specified criteria."
        )
        clear_subparser.set_defaults(operation=CacheOperations.CLEAR, exact_name=False)
        clear_subparser.add_argument(
            "--exact-name",
            help="If set, only records with the same name are purged.",
            action="store_true",
            dest="exact_name",
        )
        clear_subparser.add_argument(
            "--rr-type",
            help="Optional, the resource record type to purge. It is supported only with the '--exact-name' flag set.",
            action="store",
            type=str,
        )
        clear_subparser.add_argument(
            "--chunk-size",
            help="Optional, the number of records to remove in one round; the default is 100."
            " The purpose is not to block the resolver for long."
            " The resolver repeats the cache clearing after one millisecond until all matching data is cleared.",
            action="store",
            type=int,
            default=100,
        )
        clear_subparser.add_argument(
            "name",
            type=str,
            nargs="?",
            help="Optional, subtree name to purge; if omitted, the entire cache is purged (and all other parameters are ignored).",
            default=None,
        )

        output_format = clear_subparser.add_mutually_exclusive_group()
        output_format_default = DataFormat.YAML
        output_format.add_argument(
            "--json",
            help="Set JSON as the output format.",
            const=DataFormat.JSON,
            action="store_const",
            dest="output_format",
            default=output_format_default,
        )
        output_format.add_argument(
            "--yaml",
            help="Set YAML as the output format. YAML is the default.",
            const=DataFormat.YAML,
            action="store_const",
            dest="output_format",
            default=output_format_default,
        )

        return cache_parser, CacheCommand

    @staticmethod
    def completion(args: List[str], parser: argparse.ArgumentParser) -> CompWords:
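        """Return shell-completion candidates; none are offered for the 'cache' subcommand yet."""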
        return {}

    def run(self, args: CommandArgs) -> None:
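        """Validate the collected parameters, call the 'cache/clear' API endpoint and print its response."""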
        if not self.operation:
            args.subparser.print_help()
            sys.exit()

        if self.operation == CacheOperations.CLEAR:
            try:
                validated = CacheClearRPCSchema(self.clear_dict)
            except (AggregateDataValidationError, DataValidationError) as e:
                print(e, file=sys.stderr)
                sys.exit(1)

            body: str = DataFormat.JSON.dict_dump(validated.get_unparsed_data())
            response = request(args.socket, "POST", "cache/clear", body)
            body_dict = parse_json(response.body)

            if response.status != 200:
                print(response, file=sys.stderr)
                sys.exit(1)
            print(self.output_format.dict_dump(body_dict, indent=4))