[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: sync.py
File is not writable. Editing disabled.
"""Define ipsets/iptables rules for DEF-16003 synclists. i.e. for ip network blocks/unblocks received from /api/sync/v1/iplist correlation server endpoint. The expected behavior (requirements) is defined in src/handbook/message_processing/server_sync.py """ import dataclasses import ipaddress import itertools import re from abc import ABCMeta, abstractmethod from logging import getLogger from typing import ( AbstractSet, Dict, FrozenSet, Iterable, Iterator, List, Set, TypeVar, ) from im360.contracts.config import Webshield as WebshieldConfig from im360.internals.core import FirewallRules, rules from im360.internals.core.ipset import ( IPSetAtomicRestoreBase, IPSetCollectionResetMixin, libipset, ) from im360.internals.core.ipset.libipset import ( IPSetCmdBuilder, IPSetRestoreCmd, ) from im360.internals.core.rules import FirewallRule from im360.model.firewall import IPListID, IPListPurpose, IPListRecord, Purpose from im360.subsys import webshield from im360.utils import return_value_on_error from im360.utils.validate import IP, IPVersion from . import IP_SET_PREFIX, AbstractIPSet, IPSetCount, get_ipset_family logger = getLogger(__name__) __all__ = [ "IPSetSyncCaptcha", "IPSetSyncDrop", "IPSetSyncRemoteIPListSets", "IPSetSyncSplashscreen", "IPSetSyncWhite", ] Args = TypeVar("Args") T = TypeVar("T") class IPSetSync(IPSetAtomicRestoreBase, metaclass=ABCMeta): """Abstract class responsible for iptables rules for *sync ipsets. It mimics .ip.BaseIPSet just enough for .ip.IPSet.get_rules() to work. """ DB_NAME = None # not in iplist table @property @abstractmethod def purpose(self) -> Purpose: raise NotImplementedError # pragma: no cover def create_rules(self, ip_version: IPVersion) -> Iterable[dict]: """To be called by .ip.IPSet.get_rules()""" ipset_name = SingleIPSetSyncRemoteIPListSets( self.purpose ).gen_ipset_name_for_ip_version(ip_version) return map( dataclasses.asdict, self._rules(ipset_name, ip_version=ip_version) ) @abstractmethod def _rules( self, ipset_name: str, ip_version: IPVersion ) -> Iterator[FirewallRule]: raise NotImplementedError # pragma: no cover def is_enabled(self, ip_version: IPVersion = None) -> bool: """Whether it makes sense to call *create_rules*.""" return True def gen_ipset_name_for_ip_version(self, ip_version: IPVersion) -> str: """.ip.BaseIPSet method.""" return self.custom_ipset_name or SingleIPSetSyncRemoteIPListSets( self.purpose ).gen_ipset_name_for_ip_version(ip_version) async def get_db_count(self, ip_version: IPVersion) -> int: """.ip.BaseIPSet method.""" # see IPSetSyncRemoteIPListSets.get_ipsets_count() return 0 # stub def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: """.ip.BaseIPSet method.""" return [] # stub def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: return [] async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: return [] async def restore_from_persistent(self, ip_version: IPVersion) -> None: """.ip.BaseIPSet method.""" class IPSetSyncCaptcha(IPSetSync): """Responsible for iptables rules related to *captcha.sync ipsets.""" purpose = Purpose.CAPTCHA def is_enabled(self, ip_version: IPVersion = None) -> bool: return super().is_enabled() and _captcha_rules_enabled() def _rules( self, ipset_name: str, ip_version: IPVersion ) -> Iterator[FirewallRule]: assert self.is_enabled() ipset_name = SingleIPSetSyncRemoteIPListSets( self.purpose ).gen_ipset_name_for_ip_version(ip_version) return rules.webshield_rules( ipset_name, ip_version, rules.CaptchaRuleBuilder(include_webshield_ports_rules=True), ) class IPSetSyncDrop(IPSetSync): """Responsible for iptables rules related to *drop.sync ipsets.""" purpose = Purpose.DROP PRIORITY = FirewallRules.DROP_SYNC_PRIORITY def _rules( self, ipset_name: str, ip_version: IPVersion ) -> Iterator[FirewallRule]: assert self.is_enabled() return rules.drop_rules(ipset_name, ip_version, priority=self.PRIORITY) class IPSetSyncSplashscreen(IPSetSync): """Responsible for iptables rules related to *splashscreen.sync ipsets.""" purpose = Purpose.SPLASHSCREEN def is_enabled(self, ip_version: IPVersion = None) -> bool: return super().is_enabled() and _splashscreen_rules_enabled() def _rules( self, ipset_name: str, ip_version: IPVersion ) -> Iterator[FirewallRule]: assert self.is_enabled() return rules.webshield_rules( ipset_name, ip_version, rules.SplashscreenRuleBuilder(), ) class IPSetSyncWhite(IPSetSync): """Responsible for iptables rules related to *white.sync ipsets.""" purpose = Purpose.WHITE def _rules( self, ipset_name: str, ip_version: IPVersion ) -> Iterator[FirewallRule]: assert self.is_enabled() return rules.white_rules(ipset_name, ip_version) class SingleIPSetSyncRemoteIPListSets(IPSetAtomicRestoreBase): def __init__(self, purpose): super().__init__(purpose) self.purpose = purpose def gen_ipset_name_for_ip_version(self, ip_version: IPVersion) -> str: return ( self.custom_ipset_name or f"{IP_SET_PREFIX}.{ip_version}.{self.purpose}.sync" ) def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: size = max(32, 2 * len(IPSetIPListRemote.all_ipsets(ip_version))) name = self.gen_ipset_name_for_ip_version(ip_version) return [IPSetCmdBuilder.get_create_list_set_cmd(name, size=size)] def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: ipset_name = self.gen_ipset_name_for_ip_version(ip_version) return [IPSetCmdBuilder.get_destroy_cmd(ipset_name)] async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: ipset_name = self.gen_ipset_name_for_ip_version(ip_version) result: List[str] = [] for ipset in IPSetIPListRemote().get_ipset_instances_by_purpose( ip_version, [self.purpose] ): if not await ipset.exists(ip_version): result.extend(ipset.gen_ipset_create_ops(ip_version)) result.extend(await ipset.gen_ipset_restore_ops(ip_version)) result.append( IPSetCmdBuilder.get_add_cmd( ipset_name, ipset.gen_ipset_name_for_ip_version(ip_version) ) ) return result class IPSetSyncRemoteIPListSets(AbstractIPSet): """Facade to manage /api/sync remote iplist ipsets.""" def get_all_ipsets(self, ip_version: IPVersion) -> FrozenSet[str]: """Return required [system] "sync" ipset names. It does not check whether the ipsets are actually present on the system. """ return frozenset( self._all_ipsets(ip_version) ) | IPSetIPListRemote.all_ipsets(ip_version) def _all_ipsets(self, ip_version: IPVersion) -> List[str]: return [ ipset.gen_ipset_name_for_ip_version(ip_version) for ipset in self.get_all_ipset_instances(ip_version) ] def get_all_ipset_instances( self, ip_version: IPVersion ) -> List[SingleIPSetSyncRemoteIPListSets]: return [ SingleIPSetSyncRemoteIPListSets(purpose) for purpose in Purpose ] @staticmethod def _purpose_from_ipset_name(ipset_name: str) -> Purpose: return next( purpose for purpose in Purpose if f".{purpose}." in ipset_name ) def get_rules( self, ip_version: IPVersion, **kwargs: Args ) -> Iterable[dict]: """Yield remote iplist firewall rules.""" # Define iptables rules in # IPSetSync{White,Drop,Splashscreen,Captcha} instead of here, # to allow both old/new ipsets be active at the same time: old # graylist/graysplashlist ipsets may contain local/non-server, # therefore new "sync" ipset that contain server-only values # can't replace them return () # delegate actual rules to the above IPSetSync* classes async def restore(self, ip_version: IPVersion) -> None: """Restore system ipsets from db.""" # Define what actual [system] ipsets are created here instead # of in IPSetSync{White,Drop,Splashscreen,Captcha}, to get # greater flexibility in how the ipsets flushed/created/filled # e.g., to control the impact on memory consumption existing_ipsets = frozenset(await libipset.list_set()) # commands to flush sync [list:set] ipsets # if they exist regardless of config settings flush_sync_cmds: Iterator[IPSetRestoreCmd] = ( IPSetCmdBuilder.get_flush_cmd(ipset_name) for ipset_name in self._all_ipsets(ip_version) if ipset_name in existing_ipsets ) # commands to flush iplist_id ipsets (with ips) flush_id_cmds: Iterator[IPSetRestoreCmd] = ( IPSetCmdBuilder.get_flush_cmd(ipset_name) for ipset_name in IPSetIPListRemote.all_ipsets(ip_version) if ipset_name in existing_ipsets ) restore_cmds = ( # commands to create sync ipsets if necessary (self._gen_ipset_create_cmds(ip_version, exclude=existing_ipsets)), # commands to create iplist_id ipsets if necessary IPSetIPListRemote.create_cmds(ip_version, exclude=existing_ipsets), flush_sync_cmds, # commands to fill sync ipsets await self._fill_cmds(ip_version), flush_id_cmds, # commands to fill iplist_id ipsets if necessary IPSetIPListRemote.fill_cmds(ip_version), ) await libipset.restore(itertools.chain(*restore_cmds)) def gen_ipset_create_ops( self, ip_version: IPVersion ) -> List[IPSetRestoreCmd]: """`ipset`'s commands to create remote iplist ipsets.""" return self._gen_ipset_create_cmds(ip_version) def gen_ipset_destroy_ops( self, ip_version: IPVersion, existing: Set[str] ) -> Dict[str, IPSetRestoreCmd]: """ `ipset`'s commands to destroy both sync & id remote iplist ipsets. """ d: Dict[str, IPSetRestoreCmd] = {} # first destroy sync ipsets to decrement ref count for id ipsets d.update( (ipset_name, IPSetCmdBuilder.get_destroy_cmd(ipset_name)) for ipset_name in self._all_ipsets(ip_version) if ipset_name in existing ) # commands to destroy iplist_id ipsets (with ips) d.update( (ipset_name, IPSetCmdBuilder.get_destroy_cmd(ipset_name)) # can't rely on ipsets corresponding to db, # therefore remove all that looks like an iplist_id ipset for ipset_name in existing if IPSetIPListRemote.match_ipset_name(ipset_name, ip_version) ) return d def _gen_ipset_create_cmds( self, ip_version: IPVersion, *, exclude: AbstractSet = frozenset() ) -> List[IPSetRestoreCmd]: """Return mapping: ipset -> command to create it. Exclude ipsets mention in the *exclude* set. """ result = [] for ipset in self.get_all_ipset_instances(ip_version): if ipset.gen_ipset_name_for_ip_version(ip_version) not in exclude: result.extend(ipset.gen_ipset_create_ops(ip_version)) return result async def get_ipsets_count( self, ip_version: IPVersion ) -> List[IPSetCount]: """Expected vs. actual ipset member counts for all ipsets.""" # Define the method here instead of IPSetSync's subclasses, to # avoid "one class--one ipset" restriction (to be able to # include "iplist_id" ipsets easily if necessary) return [ IPSetCount( name=ipset_name, # expected number of members in the ipset # (db is the source of truth) db_count=(await self._get_db_count(ipset_name)), # actual ipset member count as reported by ipset command ipset_count=(await libipset.get_ipset_count(ipset_name)), ) for ipset_name in self._all_ipsets(ip_version) ] + (await IPSetIPListRemote.get_ipsets_count(ip_version)) async def _get_db_count(self, ipset_name: str) -> int: purpose: Purpose = self._purpose_from_ipset_name(ipset_name) return IPListPurpose.fetch_count(purpose) async def _fill_cmds( self, ip_version: IPVersion ) -> Iterable[IPSetRestoreCmd]: """Generate `ipset restore` commands to fill sync ipsets.""" result = [] for ipset in self.get_all_ipset_instances(ip_version): result.extend(await ipset.gen_ipset_restore_ops(ip_version)) return result async def add_id_iplist(self, purpose, iplist_id, ip_version: IPVersion): """Add existing .id ipset to .sync list:set ipset""" set_name = SingleIPSetSyncRemoteIPListSets( purpose ).gen_ipset_name_for_ip_version(ip_version) iplist_id_name = SingleIPSetIPListRemote( iplist_id ).gen_ipset_name_for_ip_version(ip_version) await libipset.restore( (IPSetCmdBuilder.get_add_cmd(set_name, iplist_id_name),) ) @staticmethod async def delete_id_iplist(purpose, iplist_id, ip_version: IPVersion): """Remove .id ipset from .sync list:set ipset (without deleting .id ipset itself) """ ipset_name = SingleIPSetSyncRemoteIPListSets( purpose ).gen_ipset_name_for_ip_version(ip_version) iplist_id_name = SingleIPSetIPListRemote( iplist_id ).gen_ipset_name_for_ip_version(ip_version) await libipset.delete_item(ipset_name, iplist_id_name) class SingleIPSetIPListRemote(IPSetAtomicRestoreBase): _NAME = "{prefix}.{ip_version}.{iplist_id}.id" def __init__(self, iplist_id: IPListID): super().__init__(iplist_id) self.iplist_id = iplist_id def gen_ipset_name_for_ip_version(self, ip_version: IPVersion) -> str: return self.custom_ipset_name or self._NAME.format( prefix=IP_SET_PREFIX, ip_version=ip_version, iplist_id=self.iplist_id, ) def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: name = self.gen_ipset_name_for_ip_version(ip_version) return [ IPSetCmdBuilder.get_create_cmd( name, get_ipset_family(ip_version), maxelem=2000_000 ) ] def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: return [ IPSetCmdBuilder.get_destroy_cmd( self.gen_ipset_name_for_ip_version(ip_version) ) ] async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: return [ IPSetCmdBuilder.get_add_cmd( self.gen_ipset_name_for_ip_version(ip_version), str(ip), ) for ip in IPListRecord.fetch_ips(ip_version, self.iplist_id) ] class IPSetIPListRemote(IPSetCollectionResetMixin): """Namespace for ipsets populated by ips from IPListRecord table. The table store ips for the remote iplists. """ @staticmethod def all_ipsets( ip_version: IPVersion, *, purposes: Iterable[Purpose] = Purpose ) -> FrozenSet[str]: """Yield all ipset names from db.""" return frozenset( IPSetIPListRemote._name_from_id(iplist_id, ip_version) for iplist_id in IPListPurpose.fetch_iplist_ids( ip_version, purposes ) ) def get_ipset_instances_by_purpose( self, ip_version: IPVersion, purpose: Iterable ): instances = [ SingleIPSetIPListRemote(iplist_id) for iplist_id in IPListPurpose.fetch_iplist_ids( ip_version, purpose ) ] return instances def get_all_ipset_instances( self, ip_version: IPVersion ) -> List[IPSetAtomicRestoreBase]: return self.get_ipset_instances_by_purpose(ip_version, Purpose) @staticmethod def _name_from_id(iplist_id: IPListID, ip_version: IPVersion) -> str: return SingleIPSetIPListRemote( iplist_id ).gen_ipset_name_for_ip_version(ip_version) @staticmethod def match_ipset_name(ipset_name: str, ip_version: IPVersion) -> bool: """Whether *ipset_name* looks like an iplist_id ipset name.""" id_ph = "IPListIDplaceholder" # should not contain any re chars name_ph = IPSetIPListRemote._name_from_id(id_ph, ip_version) # type: ignore # noqa: E501 return re.fullmatch(re.escape(name_ph).replace(id_ph, r"\d+"), ipset_name) # type: ignore # noqa: E501 @staticmethod def create_cmds( ip_version: IPVersion, *, exclude: AbstractSet[str] = frozenset() ) -> Iterator[IPSetRestoreCmd]: """Yield `ipset restore` commands to create ipsets for remote ips. Don't create ipsets with names from the *exclude* set. """ return ( IPSetCmdBuilder.get_create_cmd( ipset_name, get_ipset_family(ip_version), maxelem=2000_000 ) for ipset_name in IPSetIPListRemote.all_ipsets(ip_version) if ipset_name not in exclude ) @staticmethod def fill_cmds( ip_version: IPVersion, ) -> Iterator[IPSetRestoreCmd]: """Yield `ipset restore` commands to populate ipsets for remote ips.""" return ( IPSetCmdBuilder.get_add_cmd( IPSetIPListRemote._name_from_id(iplist_id, ip_version), str(ip), ) for iplist_id in IPListPurpose.fetch_iplist_ids( ip_version, Purpose ) for ip in IPListRecord.fetch_ips(ip_version, iplist_id) ) @staticmethod async def get_ipsets_count(ip_version: IPVersion) -> List[IPSetCount]: """Expected vs. actual ipset member counts for .id ipsets.""" # note: missing id ipset is equivalent to an empty id ipset here _get_ipset_count = return_value_on_error( libipset.IPSetNotFoundError, 0 )(libipset.get_ipset_count) return [ IPSetCount( name=( ipset_name := IPSetIPListRemote._name_from_id( iplist_id, ip_version ) ), # expected number of members in the ipset # (db is the source of truth) db_count=IPListRecord.fetch_ips_count(ip_version, iplist_id), # actual ipset member count as reported by ipset command ipset_count=(await _get_ipset_count(ipset_name)), ) for iplist_id in IPListPurpose.fetch_iplist_ids( ip_version, Purpose ) ] def _lines_to_restore(self, iplist_id, ips, *, cmd_creator): for ip in ips: version = ipaddress.ip_network(ip).version set_name = self._name_from_id(iplist_id, f"ipv{version}") yield cmd_creator(set_name, ip) async def add_ips(self, iplist_id, ips): await libipset.restore( self._lines_to_restore( iplist_id, ips, cmd_creator=IPSetCmdBuilder.get_add_cmd ) ) async def delete_ips(self, iplist_id, ips): await libipset.restore( self._lines_to_restore( iplist_id, ips, cmd_creator=IPSetCmdBuilder.get_delete_cmd ) ) @staticmethod async def create(iplist_id, version): """create_id_ipset (if it doesn't exist)""" command = IPSetCmdBuilder.get_create_cmd( IPSetIPListRemote._name_from_id( iplist_id, IP.V4 if version == 4 else IP.V6 ), get_ipset_family(IP.V4 if version == 4 else IP.V6), maxelem=2000_000, ) await libipset.restore((command,)) async def delete(self, iplist_id, ip_version: IPVersion): """delete_id_ipset""" set_name = self._name_from_id(iplist_id, ip_version) await libipset.delete_set(set_name) async def flush_ips(self, iplist_id, ip_version: IPVersion): set_name = self._name_from_id(iplist_id, ip_version) await libipset.flush_set(set_name) def _captcha_rules_enabled() -> bool: return WebshieldConfig.ENABLE and webshield.expects_traffic() def _splashscreen_rules_enabled() -> bool: return _captcha_rules_enabled() and WebshieldConfig.SPLASH_SCREEN
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.26 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A