[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: port_deny.py
File is not writable. Editing disabled.
import logging from abc import abstractmethod from enum import Enum from typing import Dict, FrozenSet, Iterable, List, Optional, Set from defence360agent.contracts.config import PORT_BLOCKING_MODE_DENY from im360.contracts.config import Firewall from im360.internals.core.firewall import FirewallRules from im360.internals.core.ipset import ( IP_SET_PREFIX, AbstractIPSet, IPSetAtomicRestoreBase, IPSetCount, get_ipset_family, libipset, ) from im360.internals.core.ipset.libipset import HASH_NET_PORT, IPSetCmdBuilder from im360.internals.strategy import Strategy from im360.model.port_ips_deny_mode import WhitelistPortIPsDenyMode from im360.utils.net import IP, TCP, UDP from im360.utils.validate import IPVersion logger = logging.getLogger(__name__) class TrafficDirection(Enum): INPUT = 'input' OUTPUT = 'output' WEBSHIELD_PORTS = ['52223', '52224', '52233', '52234'] # tcp in ACRONIS_PORTS = ['44445', '55556', '7770-7800'] # tcp out class PortBlockingDenyModeIPSetManager(IPSetAtomicRestoreBase): TEMPLATE = '{prefix}.{ip_version}.{dir}-ports-{proto}' def __init__(self, direction, proto, ip_tables_manager): super().__init__(direction, proto, ip_tables_manager) self.direction = direction self.proto = proto self.ip_tables_manager = ip_tables_manager def is_enabled(self, ip_version: Optional[IPVersion] = None): return is_enabled(ip_version) async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: name = self.gen_ipset_name_for_ip_version(ip_version) lines = [] for port in self.fetch(ip_version): lines.append( ' '.join(libipset.prepare_ipset_command('add', name, port)), ) return lines async def restore(self, ip_version: IPVersion) -> None: name = self.gen_ipset_name_for_ip_version(ip_version) await libipset.flush_set(name) await libipset.restore( await self.gen_ipset_restore_ops(ip_version), name=name ) async def restore_from_persistent(self, ip_version: IPVersion): await self.restore(ip_version) def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: return [ "create {name} bitmap:port range 0-65535 timeout 0 -exist".format( name=self.gen_ipset_name_for_ip_version(ip_version) ) ] def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: ipset_name = self.gen_ipset_name_for_ip_version(ip_version) return [libipset.IPSetCmdBuilder.get_destroy_cmd(ipset_name)] def count(self, ip_version: IPVersion): """Count individual ports, taking into account port ranges""" cnt = 0 for port_or_range in self.fetch(ip_version): if "-" in port_or_range: left, right = map(int, port_or_range.split("-")) cnt += right - left + 1 else: cnt += 1 return cnt def fetch(self, ip_version: IPVersion): return self.ip_tables_manager.get_config_option(ip_version, self.proto) def gen_ipset_name_for_ip_version(self, ip_version: IPVersion): return self.custom_ipset_name or self.TEMPLATE.format( prefix=IP_SET_PREFIX, dir=self.direction, proto=self.proto, ip_version=ip_version, ) class PortBlockingDenyModeIPSet(AbstractIPSet): def __init__(self, direction): self.direction = direction self.ip_sets = None @abstractmethod def get_config_option(self, ip_version: IPVersion, proto): raise NotImplementedError @abstractmethod def get_chain_name(self): raise NotImplementedError @abstractmethod def get_loopback_rule(self): raise NotImplementedError @abstractmethod def get_parent_chain_name(self): raise NotImplementedError @abstractmethod def get_block_action(self): raise NotImplementedError def get_all_ipset_instances( self, ip_version: IPVersion ) -> List[PortBlockingDenyModeIPSetManager]: return self.ip_sets def get_rules( self, ip_version: IPVersion, **kwargs ) -> Iterable[dict]: if not self._enabled(ip_version): return [] chain_name = self.get_chain_name() rules = [ self.get_loopback_rule(), ("-p", "icmp", "-j", "RETURN"), ( "-m", "conntrack", "--ctstate", "RELATED,ESTABLISHED", "-j", "RETURN", ), *[ ( "-p", ip_set.proto, "-m", "set", "--match-set", ip_set.gen_ipset_name_for_ip_version(ip_version), ( "src,dst" if isinstance( ip_set, PortNetworksBlockingDenyModeIPSet ) else "dst" ), "-j", "RETURN", ) for ip_set in self.ip_sets ], self.get_block_action(), ] return [ *[ dict( rule=rule, chain=chain_name, priority=priority, ) for priority, rule in enumerate(rules) ], dict( rule=('-j', chain_name), chain=self.get_parent_chain_name(), priority=FirewallRules.PORT_PROTO_PRIORITY, ), ] def _enabled(self, ip_version: IPVersion): return is_enabled(ip_version) async def restore(self, ip_version: IPVersion) -> None: if self._enabled(ip_version): for ip_set in self.ip_sets: await ip_set.restore(ip_version) def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: result = [] if self._enabled(ip_version): for ip_set in self.ip_sets: result.extend(ip_set.gen_ipset_create_ops(ip_version)) return result def get_all_ipsets(self, ip_version: IPVersion) -> FrozenSet[str]: result = [] if self._enabled(ip_version): for ip_set in self.ip_sets: result.append(ip_set.gen_ipset_name_for_ip_version(ip_version)) return frozenset(result) async def get_ipsets_count(self, ip_version: IPVersion) -> list: ipsets = [] if self._enabled(ip_version): for ip_set in self.ip_sets: set_name = ip_set.gen_ipset_name_for_ip_version(ip_version) expected_count = ip_set.count(ip_version) ipset_count = await libipset.get_ipset_count(set_name) ipsets.append( IPSetCount( name=set_name, db_count=expected_count, ipset_count=ipset_count ) ) return ipsets class InputPortBlockingDenyModeIPSet( PortBlockingDenyModeIPSet, ): def get_block_action(self): return FirewallRules.compose_action(FirewallRules.LOG_BLOCK_PORT_CHAIN) def __init__(self): super().__init__(TrafficDirection.INPUT.value) self.ip_sets = [ PortBlockingDenyModeIPSetManager( TrafficDirection.INPUT.value, TCP, self ), PortBlockingDenyModeIPSetManager( TrafficDirection.INPUT.value, UDP, self ), PortNetworksBlockingDenyModeIPSet(TCP), PortNetworksBlockingDenyModeIPSet(UDP), ] def get_config_option(self, ip_version: IPVersion, proto): if ip_version == IP.V6: return [] if proto == TCP: return Firewall.TCP_IN_IPV4 + WEBSHIELD_PORTS if proto == UDP: return Firewall.UDP_IN_IPV4 raise NotImplementedError() def get_chain_name(self): return FirewallRules.BP_INPUT_CHAIN def get_parent_chain_name(self): return FirewallRules.IMUNIFY_INPUT_CHAIN def get_loopback_rule(self): return ('-i', 'lo', '-j', 'RETURN') class OutputPortBlockingDenyModeIPSet( PortBlockingDenyModeIPSet, ): def get_block_action(self): return FirewallRules.compose_action( FirewallRules.REJECT ) def __init__(self): super().__init__(TrafficDirection.OUTPUT.value) self.ip_sets = [ PortBlockingDenyModeIPSetManager( TrafficDirection.OUTPUT.value, TCP, self ), PortBlockingDenyModeIPSetManager( TrafficDirection.OUTPUT.value, UDP, self ), ] def get_config_option(self, ip_version: IPVersion, proto): if ip_version == IP.V6: return [] if proto == TCP: return Firewall.TCP_OUT_IPV4 + ACRONIS_PORTS if proto == UDP: return Firewall.UDP_OUT_IPV4 raise NotImplementedError() def get_chain_name(self): return FirewallRules.BP_OUTPUT_CHAIN def get_parent_chain_name(self): return FirewallRules.IMUNIFY_OUTPUT_CHAIN def get_loopback_rule(self): return ('-o', 'lo', '-j', 'RETURN') def is_enabled(ip_version: Optional[IPVersion]): return ( Firewall.port_blocking_mode == PORT_BLOCKING_MODE_DENY and ip_version != IP.V6 and Strategy.current != Strategy.CSF_COOP_STRATEGY ) class PortNetworksBlockingDenyModeIPSet(IPSetAtomicRestoreBase): TEMPLATE = "{prefix}.{ip_version}.ports-ips-{proto}" def __init__(self, proto): super().__init__(proto) self.proto = proto def gen_ipset_name_for_ip_version(self, ip_version): return self.custom_ipset_name or self.TEMPLATE.format( prefix=IP_SET_PREFIX, proto=self.proto, ip_version=ip_version ) def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: name = self.gen_ipset_name_for_ip_version(ip_version) return [ IPSetCmdBuilder.get_create_cmd( name, family=get_ipset_family(ip_version), datatype=HASH_NET_PORT, ) ] def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: return [ IPSetCmdBuilder.get_destroy_cmd( self.gen_ipset_name_for_ip_version(ip_version) ) ] async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: name = self.gen_ipset_name_for_ip_version(ip_version) entries = WhitelistPortIPsDenyMode.load()[self.proto] lines = [] for port, net_list in entries.items(): for net in net_list: if IP.type_of(net) == ip_version: lines.append( IPSetCmdBuilder.get_add_cmd( name, f"{net},{self.proto}:{port}" ) ) return lines async def restore(self, ip_version: IPVersion) -> None: name = self.gen_ipset_name_for_ip_version(ip_version) await libipset.flush_set(name) await libipset.restore( await self.gen_ipset_restore_ops(ip_version), name=name ) def count(self, ip_version: IPVersion) -> int: return WhitelistPortIPsDenyMode.count(self.proto)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.26 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A