[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: csf.py
File is not writable. Editing disabled.
import asyncio
import functools
import logging
import os
from contextlib import suppress
from ipaddress import ip_network, IPv4Network, IPv6Network
from typing import Union

from defence360agent.utils.kwconfig import KWConfig
from defence360agent.utils import (
    check_run,
    CheckRunError,
    retry_on,
    run,
    FileLock,
)
from im360.utils.validate import IP
from im360.utils.net import listening_ports, TCP, UDP, IN, OUT

CSF_CONFIG_ROOT = '/etc/csf'
CSF_CONFIG = os.path.join(CSF_CONFIG_ROOT, 'csf.conf')
CSF_IGNORE_FILE = os.path.join(CSF_CONFIG_ROOT, 'csf.ignore')
CSF_DENY_FILE = os.path.join(CSF_CONFIG_ROOT, 'csf.deny')
CSF_ALLOW_FILE = os.path.join(CSF_CONFIG_ROOT, 'csf.allow')
CSF_LOCK_PATH = '/var/lib/csf/csf.lock'
CSF_RESTART_THROTTLE_DELAY = 5  # in sec

# Maps an imunify list name to the csf files that back it.
CSF_IMUNIFY_IPLISTS_MAPPING = {
    'BLACK': [
        CSF_DENY_FILE
    ],
    'WHITE': [
        CSF_ALLOW_FILE,
        CSF_IGNORE_FILE
    ]
}

logger = logging.getLogger(__name__)


def csf_coop(do_lock: bool, lock_timeout: int):
    """
    Build a decorator that prevents concurrent rule editing with CSF

    :param do_lock: when false, the wrapped coroutine runs without locking
    :param lock_timeout: timeout value handed to FileLock
    :return: decorator for coroutine functions
    """
    def decorator(func):
        """
        Decorator to disable concurrent rule editing with CSF

        Method is executed with holding lock file used by CSF to prevent
        its start or restart while imunify360 is editing iptables rules
        :return:
        """
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # The lock is taken only when requested AND the csf lock file
            # exists; otherwise the call goes straight through.
            if do_lock and os.path.isfile(CSF_LOCK_PATH):
                # edit rules only when csf is not doing the same
                async with FileLock(path=CSF_LOCK_PATH,
                                    timeout=lock_timeout):
                    return await func(*args, **kwargs)
            else:
                return await func(*args, **kwargs)
        return wrapper
    return decorator


class Config(KWConfig):
    """KWConfig bound to csf.conf, whose entries look like NAME = "value"."""
    SEARCH_PATTERN = r'^\s*{}\s*=\s*"(.*?)".*$'
    WRITE_PATTERN = '{} = "{}"'
    DEFAULT_FILENAME = CSF_CONFIG
    ALLOW_EMPTY_CONFIG = False


def get_ports(proto, direction):
    """
    Get set of open ports and ports ranges in csf.conf

    :param proto: TCP or UDP
    :param direction: IN or OUT
    :return: tuple (set of ports, set of (start, end) tuples)
    """
    name = _form_conn_name(proto, direction)
    data = Config(name).get()
    return _parse_ports(data)


def _normalize_ranges(ranges):
    """Return *ranges* as a set of tuples; None or empty gives an empty set."""
    if not ranges:
        return set()
    return {tuple(rng) for rng in ranges}


def add_ports(proto, direction, *ports, ranges=None):
    """
    Add open ports or port ranges to csf.conf

    :param proto: TCP or UDP
    :param direction: IN or OUT
    :param ports: single port numbers to open
    :param ranges: optional iterable of (start, end) port pairs
    :return: True if changes made, False otherwise
    :rtype: boolean
    """
    name = _form_conn_name(proto, direction)
    current_ports, current_ranges = get_ports(proto, direction)
    wanted_ports = set(ports)
    wanted_ranges = _normalize_ranges(ranges)

    # Nothing new to add: leave the config file untouched.
    if wanted_ports <= current_ports and wanted_ranges <= current_ranges:
        return False

    current_ports |= wanted_ports
    current_ranges |= wanted_ranges
    Config(name).set(_pack_ports(current_ports, current_ranges))
    return True


def remove_ports(proto, direction, *ports, ranges=None):
    """
    Remove open ports or port ranges from csf.conf

    The config value is rewritten even when nothing was removed.

    :param proto: TCP or UDP
    :param direction: IN or OUT
    :param ports: single port numbers to close
    :param ranges: optional iterable of (start, end) port pairs
    :return:
    """
    name = _form_conn_name(proto, direction)
    current_ports, current_ranges = get_ports(proto, direction)
    remaining_ports = current_ports - set(ports)
    remaining_ranges = current_ranges - _normalize_ranges(ranges)
    Config(name).set(_pack_ports(remaining_ports, remaining_ranges))


async def is_running():
    """
    Check whether csf reports itself as running

    :return: True when `csf --status` exits with 0, False when the
        binary is missing or the exit code is non-zero
    """
    csf_app = '/usr/sbin/csf'
    if not os.path.isfile(csf_app):
        return False
    try:
        rc, out, err = await run([csf_app, '--status'])
    except FileNotFoundError:
        # Binary vanished between the isfile() check and the exec.
        return False
    if rc > 1:
        logger.warning('CSF unexpected retcode %d. stdout=%r, stderr=%r',
                       rc, out, err)
    return rc == 0


async def is_SMTP_block_enabled() -> bool:
    """
    Return True if csf running and SMTP_BLOCK is enabled in csf

    :return: bool
    """
    if await is_running():
        return Config('SMTP_BLOCK').get() == '1'
    return False


async def denyrm(ip: Union[IPv4Network, IPv6Network]):
    """
    Unblock an IP and remove from /etc/csf/csf.deny
    """
    cmd = ['csf', '--denyrm', IP.ip_net_to_string(ip)]
    await check_run(cmd)


async def temprm(ip: Union[IPv4Network, IPv6Network]):
    """
    Remove an IP from the temporary IP ban or allow list
    """
    cmd = ['csf', '--temprm', IP.ip_net_to_string(ip)]
    await check_run(cmd)


async def unblock(ip: Union[IPv4Network, IPv6Network]):
    """
    Unblock ip blocked either temporary or permanently
    """
    await denyrm(ip)
    await temprm(ip)


async def lfd_restart():
    """Run `csf --lfd restart`."""
    cmd = ['csf', '--lfd', 'restart']
    await check_run(cmd)


async def async_log_on_error(e, i):
    """
    Error hook for restart_all retries: log, then wait before next try

    :param e: the error that triggered the retry
    :param i: retry counter supplied by retry_on
    """
    logger.warning('Error during csf --restartall, %r retry %s', e, i)
    await asyncio.sleep(CSF_RESTART_THROTTLE_DELAY)


@retry_on(
    CheckRunError,
    max_tries=3,
    on_error=async_log_on_error)
async def restart_all():
    """
    Run `csf --restartall`, retried on CheckRunError

    The csf.error file is removed first if it exists.
    """
    with suppress(FileNotFoundError):
        os.unlink(os.path.join(CSF_CONFIG_ROOT, 'csf.error'))
    await check_run(['csf', '--restartall'])


def _readlines(path):
    """Yield non-blank, non-comment lines.

    Ignore non-utf-8 content.
    Leading/trailing whitespace is removed.
    """
    with open(path, encoding='utf-8', errors='ignore') as file:
        for line in file:
            line = line.strip()
            if line and not line.startswith('#'):
                yield line


def _normalize_ip(token):
    """
    Validate *token* as an ip or network and return its stored form

    IPv6 values are passed through IP.convert_to_ipv6_network.

    :raise ValueError: when ip_network rejects the token
    """
    ip_network(token)
    if IP.is_valid_ipv6_addr(token):
        return IP.convert_to_ipv6_network(token)
    return token


def _comment_after_hash(tail):
    """Return the stripped text after the first '#' in *tail*, or None."""
    _, marker, comment = tail.partition('#')
    return comment.strip() if marker else None


def ips_from_file(path):
    """
    Load ips and networks from csf allow/deny file

    `Include <file>` lines are followed recursively. Lines that do not
    start with a valid ip/network are skipped with a debug message.

    :param path: path to csf allow/deny file
    :return: list of (ip, comment) tuples; comment is None when absent
    """
    ips = []
    try:
        for line in _readlines(path):
            parts = line.split(maxsplit=1)
            if len(parts) == 2 and parts[0] == 'Include':
                ips.extend(ips_from_file(parts[1].strip()))
                continue
            try:
                ip = _normalize_ip(parts[0])
            except ValueError:
                logger.debug("Cannot parse line {!r} from file {}"
                             .format(line.strip(), path))
                continue
            comment = _comment_after_hash(parts[1]) if len(parts) >= 2 \
                else None
            ips.append((ip, comment))
    except OSError:
        logger.warning("Can not open file {}".format(path))
    return ips


def ignore_ports_from_file(path):
    """
    Load open ports and ip from csf allow/ignore file

    Only lines of the form proto|in|d=<port>|s=<ip> are collected;
    `Include <file>` lines are followed recursively and every other
    line is skipped.

    :param path: path to csf allow/ignore file
    :return: list of (port, proto, ip, comment) tuples
    """
    ips = []
    try:
        for line in _readlines(path):
            parts = line.split(maxsplit=1)
            if len(parts) == 2 and parts[0] == 'Include':
                ips.extend(ignore_ports_from_file(parts[1].strip()))
                continue
            try:
                proto, direction, port, ip = line.split('|')
                port_direction, port = port.split('=')
                port = int(port)
                # maxsplit=1 keeps '=' characters inside a trailing
                # comment from breaking the unpacking; a field without
                # '=' is skipped like any other malformed line.
                ip_direction, ip = ip.split('=', 1)
            except ValueError:
                continue
            # direction, 'in' = INPUT, out = OUTPUT iptables rule
            # port_direction, 'd' = port destination, s = source port
            # ip_direction, 'd' = ip destination, s = source ip
            if not (direction == 'in'
                    and port_direction == 'd'
                    and ip_direction == 's'):
                continue
            ip = ip.split(maxsplit=1)
            try:
                address = _normalize_ip(ip[0])
            except (ValueError, IndexError):
                # IndexError: nothing follows "s=".
                logger.debug("Cannot parse line {!r} from file {}"
                             .format(line.strip(), path))
                continue
            comment = _comment_after_hash(ip[1]) if len(ip) >= 2 else None
            ips.append((port, proto, address, comment))
    except OSError:
        logger.warning("Can not open file {}".format(path))
    return ips


def ips_from_list(listname):
    """
    Collect ips from every csf file mapped to *listname*

    :param listname: key of CSF_IMUNIFY_IPLISTS_MAPPING
    :return: list of (ip, comment) tuples
    """
    ips = []
    for path in CSF_IMUNIFY_IPLISTS_MAPPING[listname]:
        ips.extend(ips_from_file(path))
    return ips


def _parse_ports(line):
    """
    Parses opened ports and ranges from line from csf.conf
    E.g. 22,80,443,2048:3072 -> ({22, 80, 443}, {(2048, 3072)})

    :param line: comma separated ports and start:end ranges, may be empty
    :return: tuple (set of ports, set of (start, end) tuples)
    :raise ValueError: on a non-integer item or an item with more than
        one ':'
    """
    ports = set()
    ranges = set()
    if not line:
        return ports, ranges

    for value in line.split(','):
        # Skip empty values (may occur due to
        # doubled or trailing commas)
        if not value:
            continue
        items = [int(item) for item in value.split(':')]
        if len(items) == 1:  # Single port
            ports.add(items[0])
        elif len(items) == 2:  # Port range, e.g. 3000:3010
            ranges.add(tuple(items))
        else:
            raise ValueError('Cannot parse following piece: %s' % value)
    return ports, ranges


def _form_conn_name(proto, direction):
    """
    Forms proper name of csf.conf parameter for connection
    E.g. TCP_IN, UDP_OUT

    :param proto: TCP or UDP
    :param direction: IN or OUT
    :return: upper-cased "<proto>_<direction>"
    """
    assert proto in (TCP, UDP)
    assert direction in (IN, OUT)
    return '{}_{}'.format(proto, direction).upper()


def _pack_ports(ports, ranges=None):
    """
    Presents ports and port ranges in format, accepted in csf.conf

    Ports come first in ascending order, followed by the sorted ranges;
    an empty ports collection no longer yields a leading comma.

    :param ports: iterable of port numbers
    :param ranges: optional iterable of (start, end) tuples
    :return: comma separated string
    """
    chunks = [str(port) for port in sorted(ports)]
    if ranges:
        chunks.extend(':'.join(map(str, rng)) for rng in sorted(ranges))
    return ','.join(chunks)


def _merge_ports_and_ranges(ports, ranges):
    """
    Merges ports and port ranges in single set

    The inputs are left unmodified; a new set is returned.

    :param ports: set of ports
    :param ranges: set of tuples (start_port, end_port)
    :return: set of ports included ports from ranges
    """
    merged = set(ports)
    for start, end in ranges:
        merged.update(range(start, end + 1))  # end port is inclusive
    return merged


def incoming_ports(proto):
    """
    Read opened incoming ports from csf config

    :param proto: tcp/udp
    :return: set of ports, with ranges expanded
    """
    ports, ranges = get_ports(proto, IN)
    return _merge_ports_and_ranges(ports, ranges)


def closed_ports(proto):
    """
    Difference between listening_ports and incoming_ports

    :param proto: tcp/udp
    :return: listening ports that are not opened in csf config
    """
    assert proto in (TCP, UDP)
    return listening_ports(proto) - incoming_ports(proto)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A