From d56d503c329bad3c6bffdb510435812215bb6ca9 Mon Sep 17 00:00:00 2001
From: Q Misell
Date: Mon, 28 Oct 2024 13:19:05 +0100
Subject: [PATCH] limit creation of autnum and inetnum
---
 irrd/storage/queries.py | 10 ++++++++++
 irrd/updates/validators.py | 21 +++++++++++++++++++++
 2 files changed, 31 insertions(+)
diff --git a/irrd/storage/queries.py b/irrd/storage/queries.py
index f1837d2..4069a61 100644
--- a/irrd/storage/queries.py
+++ b/irrd/storage/queries.py
@@ -255,6 +255,16 @@ def ip_less_specific(self, ip: IP):
 )
 return self._filter(fltr)
+ def ip_less_specific_range(self, ip_first: IP, ip_last: IP):
+ """Filter any less specifics or exact matches of a prefix."""
+ fltr = sa.and_(
+ self.columns.ip_first <= str(ip_first),
+ self.columns.ip_last >= str(ip_last),
+ self.columns.ip_version == ip_first.version(),
+ self.columns.ip_version == ip_last.version(),
+ )
+ return self._filter(fltr)
+
 def ip_less_specific_one_level(self, ip: IP):
 """
 Filter one level less specific of a prefix.
diff --git a/irrd/updates/validators.py b/irrd/updates/validators.py
index 34b0567..1f9ca46 100644
--- a/irrd/updates/validators.py
+++ b/irrd/updates/validators.py
@@ -388,6 +388,9 @@ def process_auth(
 result, related_mntner_list, rpsl_obj_new, related_object_class, related_pk
 )
 result.mntners_notify = related_mntners_result.associated_mntners
+ else:
+ if isinstance(rpsl_obj_new, RPSLInetnum) or isinstance(rpsl_obj_new, RPSLInet6Num):
+ result.error_messages.add("New inet(6)num objects must be added by an administrator.")
 if isinstance(rpsl_obj_new, RPSLMntner):
 if not rpsl_obj_current:
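Review bot: `ip_less_specific_range` never checks that `ip_first` is not above `ip_last`, and with a reversed pair the two comparisons would match rows that do not actually cover the range. Because the validators.py part of this patch uses the result to pick the parent object for an authorisation decision, an explicit guard such as `if ip_first > ip_last: raise ValueError("ip_first must not exceed ip_last")` is safer than relying on the caller; whether the parser already guarantees the order is not visible here. A mixed-version pair silently matches nothing because both `ip_version` comparisons must hold, which is worth stating in the docstring. The docstring itself still says "of a prefix" although the method takes an address pair; something like `"""Filter objects whose range covers ip_first..ip_last (less specifics and exact matches)."""` would describe it.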
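Review bot: The new `else:` branch in `process_auth` records the "New inet(6)num objects must be added by an administrator." error but keeps going, while the matching aut-num/as-block rejection in the later hunk does `return result`. The two rejections should behave the same, so either add `return result` here or drop it there. The `if` this `else` pairs with starts outside the hunk and the page has lost the indentation, so it cannot be confirmed from here that the branch only runs when `rpsl_obj_current` is empty. If it can also run for updates or deletions of an existing inetnum with no covering parent, those would be rejected under a message that talks about new objects. The condition is shorter as `isinstance(rpsl_obj_new, (RPSLInetnum, RPSLInet6Num))`.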
@@ -420,6 +423,10 @@ def process_auth(
 ):
 result.error_messages.add("Authorisation failed for the auth methods on this mntner object.")
+ if isinstance(rpsl_obj_new, RPSLAutNum) or isinstance(rpsl_obj_new, RPSLAsBlock):
+ result.error_messages.add("New AS objects must be added by an administrator.")
+ return result
+
 mntner_result_for_change_log = current_mntners_result or related_mntners_result or new_mntners_result
 if mntner_result_for_change_log:
 result.auth_method = mntner_result_for_change_log.auth_method
@@ -605,6 +612,8 @@ def _find_related_mntners(
 related_object = None
 if rpsl_obj_new.rpsl_object_class in ["route", "route6"]:
 related_object = self._find_related_object_route(rpsl_obj_new)
+ if rpsl_obj_new.rpsl_object_class in ("inetnum", "inet6num"):
+ related_object = self._find_related_object_inetnum(rpsl_obj_new)
 if issubclass(rpsl_obj_new.__class__, RPSLSet):
 related_object = self._find_related_object_set(rpsl_obj_new, result)
@@ -651,6 +660,18 @@ def _find_related_object_route(self, rpsl_obj_new: RPSLObject):
 return None
+ @functools.lru_cache(maxsize=50)
+ def _find_related_object_inetnum(self, rpsl_obj_new: RPSLObject):
+ query = _init_related_object_query(rpsl_obj_new.rpsl_object_class, rpsl_obj_new).ip_less_specific_range(
+ ip_first=rpsl_obj_new.ip_first, ip_last=rpsl_obj_new.ip_last,
+ )
+ inetnums = list(self.database_handler.execute_query(query))
+ logging.info(f"{query} {inetnums}")
+ if inetnums:
+ return inetnums[0]
+
+ return None
+
 def _find_related_object_set(self, rpsl_obj_new: RPSLObject, result: ValidatorResult):
 """
 Find the related aut-num object to rpsl_obj_new, which must be a set object,
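Review bot: The added aut-num/as-block check in `process_auth` has no visible `rpsl_obj_current` test, so as written it appears to reject every submission of these classes that reaches it, including changes to and deletions of existing objects. The message and the commit title only speak of creation. The mntner rule shown as context in the earlier hunk gates on `if not rpsl_obj_current:`, and the same gate seems intended here: `if not rpsl_obj_current and isinstance(rpsl_obj_new, (RPSLAutNum, RPSLAsBlock)):`. Indentation is lost on this page and the function body extends beyond the hunk, so please confirm the nesting. A test that updates an existing aut-num with valid maintainer credentials would pin the intended behaviour.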
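Review bot: `_find_related_object_inetnum` returns `inetnums[0]` from a query that matches every covering range, and no ordering is visible in this hunk, so which parent's maintainers end up authorising the new object may depend on row order. The authorising parent should be chosen deterministically, normally the most specific covering inet(6)num, and the rule should be written in a docstring, which the method currently lacks. `logging.info(f"{query} {inetnums}")` looks like leftover debugging: it writes the full query and whole result rows at info level through the root logger, so remove it or reduce it to a debug-level count such as `logger.debug("found %d covering inetnums", len(inetnums))`, assuming the module defines its own `logger`. `functools.lru_cache` on a method keys on `self` and on `rpsl_obj_new`, so it keeps the validator instance alive and can return a parent lookup that is stale relative to the database. Please confirm that is acceptable for an authorisation lookup.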