[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: redirect.py
File is not writable. Editing disabled.
import logging from typing import Iterable, List from defence360agent.contracts.config import ( PORT_BLOCKING_MODE_ALLOW, PORT_BLOCKING_MODE_DENY, ) from im360.contracts.config import Firewall from im360.internals.core import ( FirewallRules, IPSetCmdBuilder, is_nat_available, ) from im360.internals.core.ipset import ( IP_SET_PREFIX, AbstractIPSet, IPSetAtomicRestoreBase, libipset, ) from im360.internals.core.ipset.port_deny import InputPortBlockingDenyModeIPSet from im360.subsys import webshield from im360.utils.net import ALL, TCP from im360.utils.validate import IPVersion logger = logging.getLogger(__name__) class SingleIPSetNoRedirectPort(IPSetAtomicRestoreBase): _NAME = "{prefix}.{ip_version}.no-redirect-port" def gen_ipset_create_ops(self, ip_version: IPVersion) -> List[str]: return [ "create {name} bitmap:port range 0-65535 timeout 0 -exist".format( name=self.gen_ipset_name_for_ip_version(ip_version) ) ] def gen_ipset_destroy_ops(self, ip_version: IPVersion) -> List[str]: ipset_name = self.gen_ipset_name_for_ip_version(ip_version) return [IPSetCmdBuilder.get_destroy_cmd(ipset_name)] async def gen_ipset_restore_ops(self, ip_version: IPVersion) -> List[str]: lines = [] name = self.gen_ipset_name_for_ip_version(ip_version) if Firewall.port_blocking_mode == PORT_BLOCKING_MODE_DENY: white_listed_ports = [] for ( port_or_port_range ) in InputPortBlockingDenyModeIPSet().get_config_option( # redirection to webshield rules work only for TCP, # so get conf value for TCP ip_version, TCP, ): if port_or_port_range.isdigit(): white_listed_ports.append(int(port_or_port_range)) else: white_listed_ports += [ p for p in range( *list(map(int, port_or_port_range.split("-"))) ) ] for port in sorted(webshield.redirected_to_webshield_ports()): if port not in white_listed_ports: lines.append( " ".join( libipset.prepare_ipset_command("add", name, port) ) ) elif Firewall.port_blocking_mode == PORT_BLOCKING_MODE_ALLOW: black_listed_ports = [] from im360.internals.core import IPSetPort for item in IPSetPort()._fetch(): # noqa port, generic_proto = item if generic_proto in (TCP, ALL): # webshield work by tcp proto black_listed_ports.append(port) for port in sorted(webshield.redirected_to_webshield_ports()): if port in black_listed_ports: lines.append( " ".join( libipset.prepare_ipset_command("add", name, port) ) ) else: raise NotImplementedError() return lines def gen_ipset_name_for_ip_version(self, ip_version: IPVersion) -> str: return self.custom_ipset_name or self._NAME.format( prefix=IP_SET_PREFIX, ip_version=ip_version ) class IPSetNoRedirectPort(AbstractIPSet): def get_all_ipsets(self, ip_version: IPVersion): return frozenset( [ ipset.gen_ipset_name_for_ip_version(ip_version) for ipset in self.get_all_ipset_instances(ip_version) ] ) def get_all_ipset_instances( self, ip_version: IPVersion ) -> List[IPSetAtomicRestoreBase]: return [SingleIPSetNoRedirectPort()] def get_rules(self, ip_version: IPVersion, **kwargs) -> Iterable[dict]: # there is no any reason why we should do some redirection # in case port is blocked. if port is block it means # it's blocked without any exception, which can happend, # for example, because of some redirection on webshield # or anywhere # see IPSetPort.block also return [ dict( rule=FirewallRules.stop_redirection( SingleIPSetNoRedirectPort().gen_ipset_name_for_ip_version( ip_version ) ), table=FirewallRules.NAT if is_nat_available(ip_version) else FirewallRules.MANGLE, chain=FirewallRules.IMUNIFY_INPUT_CHAIN, position=1, priority=FirewallRules.REMOTE_PROXY_PRIORITY, ) ] async def restore(self, ip_version: IPVersion) -> None: ipset = SingleIPSetNoRedirectPort() name = ipset.gen_ipset_name_for_ip_version(ip_version) await libipset.flush_set(name) lines = await ipset.gen_ipset_restore_ops(ip_version) await libipset.restore(lines, name=name) @classmethod def gen_ipset_create_ops(cls, ip_version: IPVersion) -> List[str]: return SingleIPSetNoRedirectPort().gen_ipset_create_ops(ip_version) async def add_item(self, port, ip_version: IPVersion): ipset = SingleIPSetNoRedirectPort() await libipset.add_item( ipset.gen_ipset_name_for_ip_version(ip_version), port, timeout=0 ) async def delete_item(self, port, ip_version: IPVersion): ipset = SingleIPSetNoRedirectPort() await libipset.delete_item( ipset.gen_ipset_name_for_ip_version(ip_version), port ) async def get_ipsets_count(self, ip_version: IPVersion) -> list: return [] class IPSetWebshieldPort(AbstractIPSet): """ Used to insert chain to check access to the webshield ports. Only redirected and local connections are allowed. Rules to the chain should be added by the corresponding WebshieldEnabledIPSet. """ def get_all_ipsets(self, ip_version: IPVersion): return frozenset() def get_all_ipset_instances( self, ip_version: IPVersion ) -> List[IPSetAtomicRestoreBase]: return [] def get_rules(self, ip_version: IPVersion, **kwargs): # insert rule at the top of IMUNIFY_INPUT_CHAIN return [ dict( rule=FirewallRules.block_dst_port_list( webshield.destination_webshield_ports(), policy=FirewallRules.WEBSHIELD_PORTS_INPUT_CHAIN, ), chain=FirewallRules.IMUNIFY_INPUT_CHAIN, table=FirewallRules.FILTER, priority=FirewallRules.HIGHEST_PRIORITY, ) ] async def restore(self, ip_version: IPVersion) -> None: pass def gen_ipset_create_ops(self, ip_version: IPVersion): return [] async def get_ipsets_count(self, ip_version: IPVersion): return []
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 690.26 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A