1b8dd5
From 4011007b445e8f8da9b0cc45eccd793b94f6b5ce Mon Sep 17 00:00:00 2001
1b8dd5
From: Sergio Correia <scorreia@redhat.com>
1b8dd5
Date: Thu, 29 Jul 2021 19:25:43 -0300
1b8dd5
Subject: [PATCH] Add ausysrulevalidate
1b8dd5
1b8dd5
---
1b8dd5
 contrib/ausysrulevalidate | 198 ++++++++++++++++++++++++++++++++++++++
1b8dd5
 1 file changed, 198 insertions(+)
1b8dd5
 create mode 100755 contrib/ausysrulevalidate
1b8dd5
1b8dd5
diff --git a/contrib/ausysrulevalidate b/contrib/ausysrulevalidate
1b8dd5
new file mode 100755
1b8dd5
index 0000000..a251b2c
1b8dd5
--- /dev/null
1b8dd5
+++ b/contrib/ausysrulevalidate
1b8dd5
@@ -0,0 +1,198 @@
1b8dd5
+#!/usr/bin/env python3
1b8dd5
+# -*- coding: utf-8 -*-
1b8dd5
+
1b8dd5
+# ausysrulevalidate - A program that lets you validate the syscalls
1b8dd5
+# in audit rules.
1b8dd5
+# Copyright (c) 2021 Red Hat Inc., Durham, North Carolina.
1b8dd5
+# All Rights Reserved.
1b8dd5
+#
1b8dd5
+# This software may be freely redistributed and/or modified under the
1b8dd5
+# terms of the GNU General Public License as published by the Free
1b8dd5
+# Software Foundation; either version 2, or (at your option) any
1b8dd5
+# later version.
1b8dd5
+#
1b8dd5
+# This program is distributed in the hope that it will be useful,
1b8dd5
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
1b8dd5
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
1b8dd5
+# GNU General Public License for more details.
1b8dd5
+#
1b8dd5
+# You should have received a copy of the GNU General Public License
1b8dd5
+# along with this program; see the file COPYING. If not, write to the
1b8dd5
+# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor
1b8dd5
+# Boston, MA 02110-1335, USA.
1b8dd5
+#
1b8dd5
+# Authors:
1b8dd5
+#   Sergio Correia <scorreia@redhat.com>
1b8dd5
+
1b8dd5
+""" This program lets you validate syscalls in audit rules. """
1b8dd5
+
1b8dd5
+import argparse
1b8dd5
+import os.path
1b8dd5
+import sys
1b8dd5
+
1b8dd5
+import audit
1b8dd5
+
1b8dd5
+
1b8dd5
+class AuSyscallRuleValidate:
1b8dd5
+    """AuSyscallRuleValidate validates syscalls in audit rules."""
1b8dd5
+
1b8dd5
+    def __init__(self):
1b8dd5
+        self.syscalls_table = {}
1b8dd5
+        self.invalid_syscalls = {}
1b8dd5
+        self.machines = {
1b8dd5
+            "b32": audit.audit_determine_machine("b32"),
1b8dd5
+            "b64": audit.audit_determine_machine("b64"),
1b8dd5
+        }
1b8dd5
+
1b8dd5
+        if self.machines["b32"] == -1 or self.machines["b64"] == -1:
1b8dd5
+            sys.stderr.write("ERROR: Unable to determine machine type\n")
1b8dd5
+            sys.exit(1)
1b8dd5
+
1b8dd5
+    def validate_syscall(self, arch, syscall):
1b8dd5
+        """Validates a single syscall."""
1b8dd5
+
1b8dd5
+        if syscall == "all":
1b8dd5
+            return True
1b8dd5
+
1b8dd5
+        lookup = "{0}:{1}".format(arch, syscall)
1b8dd5
+        if lookup in self.syscalls_table:
1b8dd5
+            return self.syscalls_table[lookup]
1b8dd5
+
1b8dd5
+        ret = audit.audit_name_to_syscall(syscall, self.machines[arch])
1b8dd5
+        self.syscalls_table[lookup] = ret != -1
1b8dd5
+        if not self.syscalls_table[lookup]:
1b8dd5
+            self.invalid_syscalls[lookup] = lookup
1b8dd5
+
1b8dd5
+        return self.syscalls_table[lookup]
1b8dd5
+
1b8dd5
+    def process_syscalls(self, arch, syscalls):
1b8dd5
+        """Processes a group of syscalls, validating them individually."""
1b8dd5
+
1b8dd5
+        scalls = syscalls.split(",")
1b8dd5
+        processed = []
1b8dd5
+        for syscall in scalls:
1b8dd5
+            if self.validate_syscall(arch, syscall):
1b8dd5
+                processed.append(syscall)
1b8dd5
+        return ",".join(processed)
1b8dd5
+
1b8dd5
+    def parse_line(self, line):
1b8dd5
+        """Processes a single line from the audit rules file, and returns the
1b8dd5
+        same line adjusted, if required, by removing invalid syscalls, or even
1b8dd5
+        removing the rule altogether, if no valid syscall remain after
1b8dd5
+        validation."""
1b8dd5
+
1b8dd5
+        if line.lstrip().startswith("#") or "-S" not in line:
1b8dd5
+            return line
1b8dd5
+
1b8dd5
+        # We do have a rule specifying syscalls, so let's validate them.
1b8dd5
+        tokens = line.split()
1b8dd5
+        processed = []
1b8dd5
+        is_syscall = False
1b8dd5
+        arch = None
1b8dd5
+
1b8dd5
+        for val in tokens:
1b8dd5
+            if not is_syscall:
1b8dd5
+                processed.append(val)
1b8dd5
+
1b8dd5
+            if val.startswith("arch="):
1b8dd5
+                archs = val.split("=")
1b8dd5
+                if len(archs) == 2:
1b8dd5
+                    arch = val.split("=")[1]
1b8dd5
+                    if arch not in self.machines:
1b8dd5
+                        sys.stderr.write("ERROR: unexpected arch '{0}'\n".format(arch))
1b8dd5
+                        continue
1b8dd5
+
1b8dd5
+            if val == "-S":
1b8dd5
+                is_syscall = True
1b8dd5
+                continue
1b8dd5
+
1b8dd5
+            if is_syscall:
1b8dd5
+                is_syscall = False
1b8dd5
+                scalls = self.process_syscalls(arch, val)
1b8dd5
+
1b8dd5
+                if len(scalls) == 0:
1b8dd5
+                    processed = processed[:-1]
1b8dd5
+                    continue
1b8dd5
+                processed.append(scalls)
1b8dd5
+
1b8dd5
+        if "-S" not in processed:
1b8dd5
+            # Removing rule altogether, as we have no valid syscalls remaining.
1b8dd5
+            return None
1b8dd5
+        return " ".join(processed)
1b8dd5
+
1b8dd5
+    def process_rules(self, rules_file):
1b8dd5
+        """Reads a file with audit rules and returns the rules after
1b8dd5
+        validation of syscalls/architecture. Invalid syscalls will be removed
1b8dd5
+        and, if there are no valid remaining syscalls, the rule itself is
1b8dd5
+        removed."""
1b8dd5
+
1b8dd5
+        if not os.path.isfile(rules_file):
1b8dd5
+            sys.stderr.write("ERROR: rules file '{0}' not found\n".format(rules_file))
1b8dd5
+            sys.exit(1)
1b8dd5
+
1b8dd5
+        with open(rules_file) as rules:
1b8dd5
+            content = rules.readlines()
1b8dd5
+
1b8dd5
+        processed = []
1b8dd5
+        changed = False
1b8dd5
+        for line in content:
1b8dd5
+            validated = self.parse_line(line)
1b8dd5
+            if validated is None:
1b8dd5
+                changed = True
1b8dd5
+                continue
1b8dd5
+
1b8dd5
+            if validated.rstrip("\r\n") != line.rstrip("\r\n"):
1b8dd5
+                changed = True
1b8dd5
+            processed.append(validated.rstrip("\r\n"))
1b8dd5
+
1b8dd5
+        invalid_syscalls = []
1b8dd5
+        for invalid in self.invalid_syscalls:
1b8dd5
+            invalid_syscalls.append(invalid)
1b8dd5
+
1b8dd5
+        return (processed, changed, invalid_syscalls)
1b8dd5
+
1b8dd5
+    def update_rules(self, rules_file):
1b8dd5
+        """Reads a file with audit rules and updates it after validation of
1b8dd5
+        syscalls/architecture. Invalid syscalls will be removed and, if
1b8dd5
+        there are no valid remaining syscalls, the rule itself is removed."""
1b8dd5
+
1b8dd5
+        new_rules, changed, invalid_syscalls = self.process_rules(rules_file)
1b8dd5
+        if changed:
1b8dd5
+            with open(rules_file, "w") as rules:
1b8dd5
+                for line in new_rules:
1b8dd5
+                    rules.write("{0}\n".format(line))
1b8dd5
+
1b8dd5
+        return (new_rules, changed, invalid_syscalls)
1b8dd5
+
1b8dd5
+
1b8dd5
+if __name__ == "__main__":
1b8dd5
+    parser = argparse.ArgumentParser(description="ausysrulevalidate")
1b8dd5
+    parser.add_argument(
1b8dd5
+        "-u", "--update", help="Update rules file if required", action="store_true"
1b8dd5
+    )
1b8dd5
+    parser.add_argument(
1b8dd5
+        "-v", "--verbose", help="Show the resulting rules file", action="store_true"
1b8dd5
+    )
1b8dd5
+    required_named = parser.add_argument_group("required named arguments")
1b8dd5
+    required_named.add_argument(
1b8dd5
+        "-r", "--rules-file", help="Rules file name", required=True
1b8dd5
+    )
1b8dd5
+    args = parser.parse_args()
1b8dd5
+
1b8dd5
+    validator = AuSyscallRuleValidate()
1b8dd5
+
1b8dd5
+    action = validator.process_rules
1b8dd5
+    if args.update:
1b8dd5
+        action = validator.update_rules
1b8dd5
+
1b8dd5
+    data, changed, invalid = action(args.rules_file)
1b8dd5
+    if changed:
1b8dd5
+        verb = "require"
1b8dd5
+        if args.update:
1b8dd5
+            verb += "d"
1b8dd5
+        sys.stderr.write("Rules in '{0}' {1} changes\n".format(args.rules_file, verb))
1b8dd5
+        if len(invalid) > 0:
1b8dd5
+            sys.stderr.write("Invalid syscalls: {0}\n".format(", ".join(invalid)))
1b8dd5
+
1b8dd5
+    if args.verbose:
1b8dd5
+        print(*data, sep="\n")
1b8dd5
-- 
1b8dd5
2.31.1
1b8dd5