[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: libipset.py
File is not writable. Editing disabled.
"""Python API for calling ipset command-line utility.""" import os import tempfile from functools import lru_cache from logging import getLogger from typing import Iterable, Optional from defence360agent.utils import ( CheckRunError, await_for, check_run, readlines_from_cmd_output, retry_on, ) from defence360agent.utils.common import find_executable from defence360agent.utils.common import DAY, rate_limit #: One command (a string) that can be fed to `ipset restore` utility IPSetRestoreCmd = str # TypeAlias is 3.10+ logger = getLogger(__name__) throttled_log_error = rate_limit(period=DAY, on_drop=logger.warning)( logger.error ) HASH_IP = 'hash:ip' HASH_NET = 'hash:net' HASH_NET_PORT = 'hash:net,port' BITMAP_PORT = 'bitmap:port' # Datatypes IPSET_NET, IPSET_PORT, IPSET_NET_PORT = 'net', 'port', 'net,port' IPSET_RESTORE_TIMEOUT = 15 #: http://ipset.netfilter.org/ipset.man.html IPSET_TIMEOUT_MAX = 2147483 # seconds _COMMANDS_LOG_SIZE_CUTOFF = 80 * 3 # keep ~3 full lines at most @lru_cache(1) def get_ipset_exe(): ipset_exe = find_executable("ipset") if ipset_exe is None: raise IPSetEXENotFoundError( f"Cannot find executable with the name 'ipset'. {ipset_exe=}" ) return ipset_exe def _gen_ipset_cmd(call_args): return [get_ipset_exe()] + call_args class IPSetError(RuntimeError): """Base class for libipset errors.""" class IgnoredIPSetKernelError(IPSetError): pass class IPSetCannotBeDestroyedError(IPSetError): SIGNATURE = b"Set cannot be destroyed: it is in use by a kernel component" class IPSetNotFoundError(IPSetError): SIGNATURE = b"The set with the given name does not exist" class IPSetKernelPermittedError(IgnoredIPSetKernelError): SIGNATURE = b"Kernel error received: Operation not permitted" class IPSetKernelSessionError(IgnoredIPSetKernelError): SIGNATURE = b"Cannot open session to kernel" class IPSetEXENotFoundError(IPSetError): SIGNATURE = b"Cannot find executable with the name 'ipset'" class IPSetCmdBuilder: """Generate commands that can be passed to `ipset restore` utility.""" @staticmethod def get_add_cmd(ipset_name: str, entry: str) -> IPSetRestoreCmd: return f"add {ipset_name} {entry} -exist" @staticmethod def get_delete_cmd(ipset_name: str, entry: str) -> IPSetRestoreCmd: return f"del {ipset_name} {entry} -exist" @staticmethod def get_create_cmd( ipset_name: str, family, datatype=HASH_NET, timeout=0, maxelem=65536, ) -> IPSetRestoreCmd: return ( f"create {ipset_name} {datatype} family {family} timeout" f" {timeout} maxelem {maxelem} -exist" ) @staticmethod def get_create_list_set_cmd( ipset_name: str, size: int = 8 ) -> IPSetRestoreCmd: return f"create {ipset_name} list:set size {size} -exist" @staticmethod def get_destroy_cmd(ipset_name: str) -> IPSetRestoreCmd: return f"destroy {ipset_name}" @staticmethod def get_flush_cmd(ipset_name: str) -> IPSetRestoreCmd: return f"flush {ipset_name}" def raise_ipset_error_if_matched(exc, msg): for exc_cls in [ IPSetCannotBeDestroyedError, IPSetNotFoundError, IPSetKernelSessionError, IPSetKernelPermittedError, ]: if exc_cls.SIGNATURE in exc.stderr: raise exc_cls(msg) from exc raise exc async def _run_ipset(name, command, *args, **kwargs): """ :param name: ipset name, None if we can't get set name :param command: ipset will run as 'IPSET_EXEC <command>' """ command = _gen_ipset_cmd(command) try: return await check_run(command, *args, **kwargs) except CheckRunError as e: raise_ipset_error_if_matched( e, f"Error '{e.stderr}' occurs when executing '{command}' " f"command for '{name}'" ) def prepare_ipset_command(cmd, name, item, timeout=0): if cmd == 'add': if timeout > IPSET_TIMEOUT_MAX: throttled_log_error( "Wrong timeout: %s %s %s %s; clipped to %s", cmd, name, item, timeout, IPSET_TIMEOUT_MAX, ) timeout = IPSET_TIMEOUT_MAX return [cmd, name, str(item), 'timeout', str(timeout), "-exist"] elif cmd == 'del': return [cmd, name, str(item), '-exist'] else: raise NotImplementedError( "Method with action {} " "not implemented".format(cmd) ) async def add_item(name, item, timeout): """ Adds entry into existing set of ipset :param str name: name of set from ipset :param str item: IP v4 address :param int timeout: relative timeout in seconds :return: """ command = prepare_ipset_command('add', name, item, timeout) await _run_ipset(name, command) async def delete_item(name, item): """ Removes entry from existing set of ipset :param str name: name of set from ipset :param str item: IP v4 address :return: """ command = prepare_ipset_command('del', name, item) await _run_ipset(name, command) # TODO: Refactor to avoid code duplication async def create_hash_set(name, datatype=IPSET_NET, **options): """ Creates hashset into ipset. :param name: name of the set :param datatype: type of stored data (ip, net, port, (net, port)) :param options: options to command :return: """ if not isinstance(name, str): raise TypeError( "{name} is {type_} but str expected".format(name=name, type_=type(name))) datatypes = [IPSET_NET, IPSET_NET_PORT] if datatype not in datatypes: raise ValueError( "Datatype argument value should be in {datatypes}, " "but {datatype} received".format(datatypes=datatypes, datatype=datatype)) set_type = 'hash:' + datatype options = {k: str(v) for k, v in options.items()} command = ['create', name, set_type] command.extend(['family', options.get('family', 'inet')]) command.extend(['maxelem', options.get('maxelem', '65536')]) command.extend(['timeout', options.get('timeout', '0')]) command.append('-exist') await _run_ipset(name, command) async def create_bitmap_set(name, datatype=IPSET_PORT, **options): """ Creates bitmapset into ipset. :param name: name of the set :param datatype: type of stored data (ip, net, port) :param options: options to command :return: """ if not isinstance(name, str): raise TypeError( "{name} is {type_} but str expected".format(name=name, type_=type(name))) datatypes = [IPSET_PORT] if datatype not in datatypes: raise ValueError( "Datatype argument value should be in {datatypes}, " "but {datatype} received".format(datatypes=datatypes, datatype=datatype)) set_type = 'bitmap:' + datatype options = {k: str(v) for k, v in options.items()} command = ['create', name, set_type] command.extend(['range', options.get('range', '0-65535')]) command.extend(['timeout', options.get('timeout', '0')]) command.append('-exist') await _run_ipset(name, command) @retry_on( IPSetCannotBeDestroyedError, max_tries=3, on_error=await_for(seconds=3) ) async def delete_set(name): """ Removes set into ipset Removes rule into firewall-cmd wich links with new set of ipset :param str name: name of set :return: """ assert isinstance(name, str) existing = await list_set() if name in existing: await _run_ipset(name, ['flush', name]) await _run_ipset(name, ['destroy', name]) async def flush_set(name): """ Removes ips from set :param str name: name of set :return: """ assert isinstance(name, str) existing = await list_set() if name in existing: await _run_ipset(name, ['flush', name]) async def list_set(): """ Returns names of ipset sets :return: """ out = await _run_ipset(None, ['list', '-n', '-t']) out = out.decode().strip() # type: str if out: return out.splitlines(keepends=False) else: return [] async def restore( lines: Iterable[IPSetRestoreCmd], name: Optional[str] = None ) -> bytes: """ Run `ipset restore` command for bulk operations :param lines: lines of input commands for `ipset restore` utility :param name: optional ipset name to be used in error reporting :return: the output of ipset command as bytes """ with tempfile.TemporaryFile() as f: # note: can't use to_thread here because *lines* may invoke db ops # DEF-15621 may fix it f.writelines(line.encode() + b"\n" for line in lines) f.flush() f.seek(0) try: return await _run_ipset(name, ["restore"], stdin=f) except Exception as e: # preserve specific IPSetError type Error = e.__class__ if isinstance(e, IPSetError) else IPSetError # add ipset commands to the error message file_size = os.fstat(f.fileno()).st_size f.seek(0) if file_size < _COMMANDS_LOG_SIZE_CUTOFF: commands = f.read() else: # cut commands = f.read(_COMMANDS_LOG_SIZE_CUTOFF // 2) commands += b"..." f.seek(-_COMMANDS_LOG_SIZE_CUTOFF // 2, os.SEEK_END) commands += f.read() raise Error( "ipset restore failed. " f"Name: {name!r} " f"Reason: {e} " f"Commands: {commands!r}" ) from e async def swap(set_name1, set_name2): await _run_ipset(None, ["swap", set_name1, set_name2]) async def get_ipset_count(setname: str) -> int: """Return the number of ips in the *setname* ipset.""" # using '-terse' option may not show 'Number of entries' in output # (looks related to ipset version) # so we can rely on full output command = _gen_ipset_cmd(["list", setname]) is_member = False count = 0 try: async for line in readlines_from_cmd_output(command): if is_member: count += 1 if line.strip() else 0 else: is_member = line.startswith(b"Members") except CheckRunError as e: raise_ipset_error_if_matched( e, f"Error '{e.stderr}' occurs when executing '{command}'" ) return count
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 690.26 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A