[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: lists.py
File is not writable. Editing disabled.
"""Endpoints for managing IP lists, blocked ports and blacklisted countries.

The deprecated ``whitelist``/``blacklist``/``graylist`` commands and their
``ip-list`` replacements are both bound here through ``lookup.bind``.
"""
import datetime
import ipaddress
import warnings
from functools import wraps

from peewee import DoesNotExist, fn

from defence360agent.contracts.config import PORT_BLOCKING_MODE_ALLOW
from defence360agent.contracts.messages import MessageType
from defence360agent.rpc_tools import lookup
from defence360agent.rpc_tools.utils import (generate_warnings,
                                             run_in_executor_decorator)
from defence360agent.rpc_tools.validate import ValidationError
from defence360agent.utils import Scope
from im360.api.ips import IgnoredByPortAPI, IPApi, PortAPI, \
    GroupIPSyncSender, IPApiWithIdempotentAdd
from im360.contracts.config import Firewall, Webshield
from im360.model.country import CountryList
from im360.model.firewall import BlockedPort, IPList, IPListRecord, \
    IPListPurpose, Purpose
from im360.utils.net import pack_ip_network


def blocked_ports_allow_mode_only(func):
    """Allow the wrapped coroutine only when port blocking mode is ALLOW.

    Raises:
        PermissionError: if ``Firewall.port_blocking_mode`` differs from
            ``PORT_BLOCKING_MODE_ALLOW`` at call time.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs):
        if Firewall.port_blocking_mode != PORT_BLOCKING_MODE_ALLOW:
            raise PermissionError('Only for FIREWALL.port_blocking_mode=ALLOW')
        return await func(*args, **kwargs)
    return wrapper


def _create_graylist_filter(*, except_splash_screen=False, **kwargs):
    """Return *kwargs* extended with the fetch filter for the gray list.

    ``listnames`` is always overwritten.  When the splash screen is enabled
    and not explicitly excluded, the splash-screen list is added and records
    are grouped by IP, keeping the one with the greatest expiration.
    """
    kwargs['listnames'] = [IPList.GRAY]
    if Webshield.SPLASH_SCREEN and not except_splash_screen:
        kwargs['listnames'].append(IPList.GRAY_SPLASHSCREEN)
        # NOTE(review): grouping is assumed to be needed only when both
        # gray lists are queried together -- confirm against upstream.
        kwargs['group_by'] = IPList.ip
        kwargs['having'] = (IPList.expiration == fn.MAX(IPList.expiration))
    return kwargs


def migrate_warning(func):
    """Emit a ``DeprecationWarning`` each time the wrapped call is awaited."""
    @wraps(func)
    async def async_wrapper(*args, **kwargs):
        warnings.warn('Deprecated cli call, use `ip-list` command instead.',
                      DeprecationWarning)
        return await func(*args, **kwargs)
    return async_wrapper


def _default_manual_comment(client_addr=None):
    """Build the comment used when the caller did not supply one."""
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    comment = 'Manually added on %s' % now
    if client_addr:
        comment += " from IP %s" % client_addr
    return comment


class ListsEndpoints(lookup.RootEndpoints):
    """Endpoints for IP lists, blocked ports and per-port ignored IPs."""

    SCOPE = Scope.IM360

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # for internal use only
        self._hidden_fields = {IPList.captcha_passed}

    # ------------------------------------------------------------------
    # deprecated list/fetch commands
    # ------------------------------------------------------------------

    @migrate_warning
    @lookup.bind('whitelist', 'ip', 'list')
    @run_in_executor_decorator
    def whitelist_ip_fetch(self, limit=None, offset=None,
                           order_by=None, **kwargs):
        """Return ``(white count, all counts, white-list records)``."""
        counts = self._counts(**kwargs)
        records = IPList.fetch(
            listnames=[IPList.WHITE],
            limit=limit,
            offset=offset,
            order_by=order_by,
            exclude_fields=self._hidden_fields,
            **kwargs
        )
        return counts['white'], counts, records

    @migrate_warning
    @lookup.bind('blacklist', 'ip', 'list')
    @run_in_executor_decorator
    def blacklist_ip_fetch(self, limit=None, offset=None, **kwargs):
        """Return ``(black count, black-list records)``."""
        total = IPList.fetch_count([IPList.BLACK], **kwargs)
        records = IPList.fetch(
            listnames=[IPList.BLACK],
            limit=limit,
            offset=offset,
            exclude_fields=self._hidden_fields,
            **kwargs
        )
        return total, records

    @migrate_warning
    @lookup.bind('graylist', 'ip', 'list')
    @run_in_executor_decorator
    def graylist_fetch(self, limit=None, offset=None, order_by=None,
                       **kwargs):
        """Return ``(gray count, all counts, gray-list records)``.

        ``no_splash_screen`` in *kwargs* excludes the splash-screen list.
        """
        except_splash_screen = kwargs.pop("no_splash_screen", False)
        counts = self._counts(
            except_splash_screen=except_splash_screen, **kwargs
        )
        kwargs = _create_graylist_filter(
            except_splash_screen=except_splash_screen, **kwargs
        )
        return counts['gray'], counts, IPList.fetch(
            limit=limit,
            offset=offset,
            order_by=order_by,
            exclude_fields=self._hidden_fields,
            **kwargs
        )

    @lookup.bind('blacklist')
    @run_in_executor_decorator
    def blacklist_fetch(self, limit, offset, manual=None, order_by=None,
                        **kwargs):
        """Return one page of blacklisted countries followed by IPs.

        Both sets are fetched in full and the page is cut afterwards, so
        country records always precede IP records.
        """
        country_items = CountryList.fetch(
            order_by=order_by,
            by_list=CountryList.BLACK,
            **kwargs,
        )
        ip_items = IPList.fetch(
            listnames=[IPList.BLACK],
            order_by=order_by,
            manual=manual,
            exclude_fields=self._hidden_fields,
            **kwargs
        )
        items = (country_items + ip_items)[offset: offset + limit]
        counts = self._counts(manual=manual, **kwargs)
        return counts['black'], counts, items

    # ------------------------------------------------------------------
    # add
    # ------------------------------------------------------------------

    @migrate_warning
    @lookup.bind('whitelist', 'ip', 'add')
    async def whitelist_add(self, **kwargs):
        """Add IPs to the white list (see ``_ip_add``)."""
        return await self._ip_add(listname=IPList.WHITE, **kwargs)

    @migrate_warning
    @lookup.bind('blacklist', 'ip', 'add')
    async def blacklist_add(self, **kwargs):
        """Add IPs to the black list (see ``_ip_add``)."""
        return await self._ip_add(listname=IPList.BLACK, **kwargs)

    @migrate_warning
    @lookup.bind('graylist', 'ip', 'add')
    async def graylist_add(self, **kwargs):
        """Add IPs to the gray list (see ``_ip_add``)."""
        return await self._ip_add(listname=IPList.GRAY, **kwargs)

    async def _ip_add(
        self,
        listname,
        items,
        comment=None,
        expiration=0,
        client_addr=None,
        full_access=None,
        scope=None,
    ):
        """Block *items* in *listname* via ``IPApi.block`` and notify the group.

        A comment is generated when none is given; it mentions
        *client_addr* when that is provided.  Returns the result of
        ``generate_warnings``.
        """
        assert listname in [IPList.BLACK, IPList.WHITE, IPList.GRAY]
        if comment is None:
            comment = _default_manual_comment(client_addr)

        affected, not_affected = await IPApi.block(
            listname=listname,
            items=items,
            comment=comment,
            full_access=full_access,
            expiration=expiration,
            scope=scope,
            # "whitelist ip add" doesn't remove expired manually blacklisted
            # subnets
            keep_manual_expired_subnets=(listname == IPList.WHITE),
            manual=True,
        )
        await (await GroupIPSyncSender().collect(affected)).send('add')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully added",
            failure_warning="Noop: unable to add {} to {}",
            in_another_list_warning="IP {} is already in {} list"
        )

    async def _ip_add_idempotent(
        self,
        listname,
        items,
        comment=None,
        expiration=0,
        full_access=None,
        scope=None,
    ):
        """Block *items* via ``IPApiWithIdempotentAdd.block``.

        For local scope the sender collects *items* before the change and
        sends ``'del'`` for the affected ones afterwards; an ``'add'`` is
        then sent for whatever a fresh sender collects from the affected
        items.  Returns the result of ``generate_warnings``.
        """
        assert listname in [IPList.BLACK, IPList.WHITE]
        if comment is None:
            comment = _default_manual_comment()

        group_ip_sender = GroupIPSyncSender()
        if scope == IPList.SCOPE_LOCAL:
            await group_ip_sender.collect(items)
        affected, not_affected = await IPApiWithIdempotentAdd.block(
            listname=listname,
            items=items,
            comment=comment,
            full_access=full_access,
            expiration=expiration,
            scope=scope,
            # "whitelist ip add" doesn't remove expired manually blacklisted
            # subnets
            keep_manual_expired_subnets=(listname == IPList.WHITE),
            manual=True,
        )
        if scope == IPList.SCOPE_LOCAL:
            await group_ip_sender.filter(affected).send('del')
        await (await GroupIPSyncSender().collect(affected)).send('add')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully added",
            failure_warning="Noop: unable to add {} to {}",
            in_another_list_warning="IP {} is already in {} list"
        )

    # ------------------------------------------------------------------
    # move
    # ------------------------------------------------------------------

    @migrate_warning
    @lookup.bind('blacklist', 'ip', 'move')
    async def blacklist_move(self, items):
        """Move *items* to the black list."""
        return await self._ip_move(items, IPList.BLACK)

    @migrate_warning
    @lookup.bind('whitelist', 'ip', 'move')
    async def whitelist_move(self, items, full_access):
        """Move *items* to the white list."""
        return await self._ip_move(items, IPList.WHITE, full_access)

    async def _ip_move(self, items, listname, full_access=None):
        """Move known *items* to *listname*; unknown ones are reported.

        Items with no record in any list are appended to the not-affected
        set as ``{'rec': ip}``.
        """
        ips = await self._ips_get(ips=items)
        found = {ip['ip'] for ip in ips}
        not_found_ips = [{'rec': ip} for ip in set(items) - found]

        affected, not_affected = await IPApi.move(
            items=ips, listname=listname, full_access=full_access
        )
        not_affected.extend(not_found_ips)
        await (await GroupIPSyncSender().collect(
            [data['ip'] for data in affected])).send('add')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully moved",
            failure_warning="Noop: unable to move {} to list: {}"
        )

    # ------------------------------------------------------------------
    # edit
    # ------------------------------------------------------------------

    @migrate_warning
    @lookup.bind('blacklist', 'ip', 'edit')
    async def blacklist_edit(self, **kwargs):
        """Edit black-list records (see ``_ip_edit``)."""
        kwargs['listname'] = IPList.BLACK
        return await self._ip_edit(**kwargs)

    @migrate_warning
    @lookup.bind('whitelist', 'ip', 'edit')
    async def whitelist_edit(self, **kwargs):
        """Edit white-list records (see ``_ip_edit``)."""
        kwargs['listname'] = IPList.WHITE
        return await self._ip_edit(**kwargs)

    async def _ip_edit(
        self,
        items,
        listname,
        comment=None,
        expiration=None,
        full_access=None,
        scope=None,
    ):
        """Edit *items* via ``IPApi.edit`` and notify the group.

        Local scope sends ``'del'`` for affected items collected before
        the edit; any other scope sends ``'add'`` for the affected items.
        """
        group_ip_sender = GroupIPSyncSender()
        if scope == IPList.SCOPE_LOCAL:
            await group_ip_sender.collect(items)
        affected, not_affected = await IPApi.edit(
            items=items,
            listname=listname,
            comment=comment,
            full_access=full_access,
            expiration=expiration,
            scope=scope,
        )
        if scope == IPList.SCOPE_LOCAL:
            await group_ip_sender.filter(affected).send('del')
        else:
            await (await group_ip_sender.collect(affected)).send('add')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully edited",
            failure_warning="Noop: unable to edit {} from {}",
            # we don't care whether ip exists in other lists during edits
            in_another_list_warning=None,
        )

    # ------------------------------------------------------------------
    # delete
    # ------------------------------------------------------------------

    @migrate_warning
    @lookup.bind('blacklist', 'ip', 'delete')
    async def blacklist_delete(self, items):
        """Delete *items* from the black list."""
        return await self._ip_delete(items, IPList.BLACK)

    @migrate_warning
    @lookup.bind('whitelist', 'ip', 'delete')
    async def whitelist_delete(self, items):
        """Delete *items* from the white list."""
        return await self._ip_delete(items, IPList.WHITE)

    async def _ip_delete(self, items, listname):
        """Unblock *items* from *listname* and send ``'del'`` to the group."""
        group_ip_sender = await GroupIPSyncSender().collect(items)
        affected, not_affected = await IPApi.unblock(
            items, listname=listname)
        await group_ip_sender.filter(affected).send('del')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully deleted",
            failure_warning="Noop: unable to delete {} from {}",
            in_another_list_warning="IP {} is already in {} list"
        )

    @migrate_warning
    @lookup.bind('graylist', 'ip', 'delete')
    async def graylist_delete(self, items):
        """Delete *items* from both gray lists.

        IPs that are only covered by a broader listed network cannot be
        removed directly; ``_delete_ip_from`` reports them through a
        ``ClientUnblock`` message instead.

        This code implements
        src/handbook/message_processing/client_delete.py
        """
        items = set(items)
        gray_affected, gray_not_affected = await self._delete_ip_from(
            items, IPList.GRAY
        )
        splash_affected, splash_not_affected = await self._delete_ip_from(
            items, IPList.GRAY_SPLASHSCREEN
        )
        affected = gray_affected
        affected += [
            addr for addr in splash_affected if addr not in affected
        ]
        # keep a record as "not affected" only if neither list handled it,
        # and report each record once
        not_affected = []
        not_affected_lists = (gray_not_affected, splash_not_affected)
        for not_affected_list in not_affected_lists:
            not_affected += [
                net for net in not_affected_list
                if (net['rec'] not in affected) and (net not in not_affected)
            ]

        return generate_warnings(
            affected,
            not_affected,
            dest_listname=IPList.GRAY,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully deleted",
            failure_warning="Noop: unable to delete {} from {}",
            in_another_list_warning="IP {} is already in {} list"
        )

    async def _delete_ip_from(self, items, listname):
        """Unblock *items* from *listname*.

        Returns:
            ``(affected, not_affected)``.  Networks that are part of a
            broader listed network count as affected after a
            ``ClientUnblock`` message has been sent for them.
        """
        to_ignore, to_unblock = [], []
        not_affected = []
        for ip in items:
            net = ipaddress.ip_network(ip)
            supernets = IPList.find_closest_ip_nets(
                net, listname=[listname], limit=1)
            # all(a.subnet_of(b) for a, b in zip([net]+supernets, supernets))
            if supernets:
                if supernets[0].ip_network != net:
                    # net is a part of supernet(s)
                    to_ignore.append(net)
                else:
                    # the first supernet == net
                    to_unblock.append(net)
            else:
                # net is not a subnet
                not_affected.append({
                    'rec': net,
                    'listname': IPList.get_field('listname', ip=ip)
                })

        for ip in to_ignore:
            # no way to unblock IP, because it a part of blocked supernets so,
            # sending ClientUnblock in order to put it in ignore list
            await self._sink.process_message(
                MessageType.ClientUnblock(
                    attackers_ip=ip, plugin_id='imunify360'
                )
            )

        affected, not_unblocked = await IPApi.unblock(
            to_unblock, listname=listname
        )
        affected += to_ignore
        not_affected += not_unblocked
        return affected, not_affected

    @run_in_executor_decorator
    def _ips_get(self, ips):
        """Return ``{'ip', 'listnames'}`` for every IP that has a record.

        IPs without any matching ``IPList`` row are omitted.
        """
        result = []
        for ip in ips:
            net, mask, version = pack_ip_network(ip)
            q = IPList.select(IPList.listname).where(
                IPList.network_address == net,
                IPList.netmask == mask,
                IPList.version == version
            ).tuples()
            # materialise once instead of a count query plus an iteration
            listnames = [listname for (listname,) in q]
            if listnames:
                result.append({'ip': ip, 'listnames': listnames})
        return result

    # ------------------------------------------------------------------
    # blocked ports
    # ------------------------------------------------------------------

    @lookup.bind('blocked-port', 'list')
    @blocked_ports_allow_mode_only
    @run_in_executor_decorator
    def get_port_proto(self, limit=None, offset=None, **kwargs):
        """Return ``(blocked-port count, all counts, blocked ports)``."""
        counts = self._counts(**kwargs)
        return counts['blocked-ports'], counts, \
            BlockedPort.fetch(limit=limit, offset=offset, **kwargs)

    @lookup.bind('blocked-port', 'add')
    @blocked_ports_allow_mode_only
    async def blocked_port_add(self, items, **kwargs):
        """Block ports and register ``ips`` as ignored for each new port."""
        ips = kwargs.pop('ips', [])
        affected, not_affected = await PortAPI.block(items, **kwargs)
        for ip in ips:
            for port, proto in affected:
                await IgnoredByPortAPI.block(
                    [ip], port=port, proto=proto
                )

        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=items,
            success_warning="{}/{} port(s) were successfully added",
            failure_warning="Noop: unable to add {}"
        )

    @lookup.bind('blocked-port', 'delete')
    @blocked_ports_allow_mode_only
    async def blocked_port_delete(self, items):
        """Unblock the given ports."""
        affected, not_affected = await PortAPI.unblock(items)
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=items,
            success_warning="{}/{} port(s) were successfully deleted",
            failure_warning="Noop: unable to delete {}"
        )

    # ``lookup.bind`` is applied outermost, matching the other blocked-port
    # endpoints, so the bound callable always includes the mode check.
    @lookup.bind('blocked-port', 'edit')
    @blocked_ports_allow_mode_only
    async def blocked_port_edit(self, items, comment):
        """Change the comment of the given blocked ports."""
        affected, not_affected = await PortAPI.edit(items, comment)
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=items,
            success_warning="{}/{} port(s) were successfully edited",
            failure_warning="Noop: unable to edit {}"
        )

    @lookup.bind('blocked-port-ip', 'add')
    @blocked_ports_allow_mode_only
    async def ignored_by_port_add_ip(self, port_proto, ips, comment=None):
        """Add *ips* to the ignore list of an existing blocked port.

        Raises:
            ValidationError: if the port/proto pair is not blocked.
        """
        port, proto = port_proto
        if not await self.check_port_proto(port, proto):
            raise ValidationError("Port and proto does not exist {}:{}"
                                  .format(port, proto))
        affected, not_affected = await IgnoredByPortAPI.block(
            items=ips, port=port, proto=proto, comment=comment)
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=ips,
            success_warning="{}/{} ip(s) were successfully added",
            failure_warning="Noop: unable to add {}"
        )

    # ``lookup.bind`` outermost -- same order as the add/delete endpoints.
    @lookup.bind('blocked-port-ip', 'edit')
    @blocked_ports_allow_mode_only
    async def ignored_by_port_edit_ip(self, port_proto, ips, comment=None):
        """Edit *ips* in the ignore list of an existing blocked port.

        Raises:
            ValidationError: if the port/proto pair is not blocked.
        """
        port, proto = port_proto
        if not await self.check_port_proto(port, proto):
            raise ValidationError("Port and proto does not exist {}:{}"
                                  .format(port, proto))
        affected, not_affected = await IgnoredByPortAPI.edit(
            items=ips, port=port, proto=proto, comment=comment)
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=ips,
            success_warning="{}/{} ip(s) were successfully edited",
            failure_warning="Noop: unable to edit {}"
        )

    @lookup.bind('blocked-port-ip', 'delete')
    @blocked_ports_allow_mode_only
    async def ignored_by_port_delete_ip(self, port_proto, ips):
        """Remove *ips* from the ignore list of an existing blocked port.

        Raises:
            ValidationError: if the port/proto pair is not blocked.
        """
        port, proto = port_proto
        if not await self.check_port_proto(port, proto):
            raise ValidationError("Port and proto does not exist {}:{}"
                                  .format(port, proto))
        affected, not_affected = await IgnoredByPortAPI.unblock(
            items=ips, port=port, proto=proto)
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=None,
            all_list=ips,
            success_warning="{}/{} ip(s) were successfully deleted",
            failure_warning="Noop: unable to delete {}"
        )

    @run_in_executor_decorator
    def check_port_proto(self, port, proto):
        """Return True if a ``BlockedPort`` row exists for *port*/*proto*."""
        try:
            BlockedPort.get(port=port, proto=proto)
        except DoesNotExist:
            return False
        return True

    # ------------------------------------------------------------------
    # counters
    # ------------------------------------------------------------------

    def _counts(self, manual=None, except_splash_screen=False, **kwargs):
        """Return record counts keyed by list name.

        The ``'black'`` count includes blacklisted countries; *manual* is
        applied to the black IP count only.
        """
        return {
            'white': IPList.fetch_count(listnames=[IPList.WHITE], **kwargs),
            'black': (IPList.fetch_count(listnames=[IPList.BLACK],
                                         manual=manual, **kwargs) +
                      CountryList.fetch_count(by_list=CountryList.BLACK,
                                              **kwargs)),
            'gray': IPList.fetch_count(
                **_create_graylist_filter(
                    except_splash_screen=except_splash_screen, **kwargs
                )
            ),
            'blocked-ports': BlockedPort.fetch_count(**kwargs)
        }

    @lookup.bind("ip-list", "synced")
    @run_in_executor_decorator
    def ip_list_synced(
        self, purpose=None, by_ip=None, limit=None, offset=None
    ):
        """Return ``(count, counts per purpose, records)`` of synced IPs."""
        return IPListRecord.fetch_count(purpose, by_ip), \
            self._counts_synced(by_ip=by_ip), \
            IPListRecord.fetch(purpose=purpose, by_ip=by_ip,
                               limit=limit, offset=offset)

    @staticmethod
    def _counts_synced(**kwargs):
        """Return ``IPListRecord`` counts keyed by purpose value.

        Only ``by_ip`` is read from *kwargs*; other keys are ignored.
        """
        by_ip = kwargs.get("by_ip")
        purposes = (
            Purpose.WHITE,
            Purpose.DROP,
            Purpose.CAPTCHA,
            Purpose.SPLASHSCREEN,
        )
        return {
            purpose.value: IPListRecord.fetch_count(
                purpose=purpose.value, by_ip=by_ip
            )
            for purpose in purposes
        }

    async def get_counts_local(self, list_names, except_splash_screen=False,
                               **kwargs):
        """Return ``(max_count, counts)`` for local lists.

        ``max_count`` is the number of IP records in *list_names*, plus the
        blacklisted countries when the black list is requested.  ``counts``
        holds per-purpose numbers under ``"server"`` and the synced numbers
        under ``"cloud"``.
        """
        max_count = IPList.fetch_count(listnames=list_names, **kwargs)
        count_synced = self._counts_synced(**kwargs)
        blacklisted_country_count = CountryList.fetch_count(
            by_list=CountryList.BLACK,
            by_country_code=kwargs.get("by_country_code"),
            by_comment=kwargs.get("by_comment"),
            by_ip=kwargs.get("by_ip"),
        )
        counts = {
            "server": {
                "white": IPList.fetch_count(
                    listnames=[IPList.WHITE], **kwargs
                ),
                "drop": (
                    IPList.fetch_count(listnames=[IPList.BLACK], **kwargs)
                    + blacklisted_country_count
                ),
                "captcha": IPList.fetch_count(
                    **_create_graylist_filter(
                        except_splash_screen=except_splash_screen,
                        **kwargs,
                    )
                ),
                "splashscreen": IPList.fetch_count(
                    listnames=[IPList.GRAY_SPLASHSCREEN], **kwargs
                ),
            },
            "cloud": count_synced
        }
        if IPList.BLACK in list_names:
            max_count += blacklisted_country_count
        return max_count, counts

    # ------------------------------------------------------------------
    # ip-list local
    # ------------------------------------------------------------------

    @staticmethod
    async def get_blacklisted_local_countries(limit=None, offset=None,
                                              order_by=None, **kwargs):
        """Return blacklisted country records with ``purpose`` set.

        Only ``by_country_code``, ``by_comment`` and ``by_ip`` are read
        from *kwargs*.
        """
        # country can be only black listed
        # same as it was before in rpc `blacklist` call
        country_items = CountryList.fetch(
            by_list=IPList.BLACK,
            by_country_code=kwargs.get('by_country_code'),
            by_comment=kwargs.get('by_comment'),
            by_ip=kwargs.get('by_ip'),
            order_by=order_by,
            limit=limit,
            offset=offset
        )
        # map listname to purpose in response
        for item in country_items:
            item["purpose"] = IPListPurpose.listname2purpose(
                item.pop("listname")
            ).value
        return country_items

    async def get_ip_local(self, list_names, limit=None, offset=None,
                           except_splash_screen=False, order_by=None,
                           **kwargs):
        """Return IP records of *list_names* with ``purpose`` set.

        NOTE(review): *list_names* is extended in place with the
        splash-screen list when the gray list is requested and the splash
        screen is enabled; ``ip_list_local_list`` computes counts from the
        same list object afterwards on its unfiltered path.
        """
        # get ip records
        if (
            IPList.GRAY in list_names
            and Webshield.SPLASH_SCREEN
            and not except_splash_screen
        ):
            if IPList.GRAY_SPLASHSCREEN not in list_names:
                list_names.append(IPList.GRAY_SPLASHSCREEN)
            # NOTE(review): the filter is assumed to be rebuilt only in
            # this branch -- confirm against upstream.
            kwargs = _create_graylist_filter(
                except_splash_screen=except_splash_screen, **kwargs)
            kwargs.pop("listnames", None)

        ip_items = IPList.fetch(
            listnames=list_names,
            exclude_fields=self._hidden_fields,
            # country record will be added to result
            # and offset should be calculated on full set
            limit=limit,
            offset=offset,
            order_by=order_by,
            **kwargs,
        )
        # map listname to purpose in response
        for item in ip_items:
            item["purpose"] = IPListPurpose.listname2purpose(
                item.pop("listname")
            ).value
        return ip_items

    @lookup.bind("ip-list", "local", "list")
    async def ip_list_local_list(
        self, purpose=None, limit=None, offset=None, order_by=None, **kwargs
    ):
        """List local records; replaces the old per-list ``ip list`` calls.

        ``by_type`` in *kwargs* restricts the result to ``"country"`` or
        ``"ip"`` records; without it, countries come first and IP records
        fill the rest of the page.  ``limit=None`` means no limit and
        ``offset=None`` is treated as 0 on the unfiltered path.

        The original note states that searching by IP now also finds
        supernets and subnets; that behaviour lives in the fetch helpers.

        Returns:
            ``(max_count, counts, items)``.
        """
        if not purpose:
            list_names = [
                IPList.WHITE,
                IPList.BLACK,
                IPList.GRAY,
                IPList.GRAY_SPLASHSCREEN,
            ]
        else:
            list_names = [Purpose.listname(p) for p in purpose]
        except_splash_screen = kwargs.pop("no_splash_screen", False)
        by_type = kwargs.pop('by_type', None)

        if by_type == "country":
            max_count, counts = await self.get_counts_local(
                # UI interested in blacklisted country only
                list_names=[IPList.BLACK],
                except_splash_screen=except_splash_screen,
                **kwargs)
            country_items = await self.get_blacklisted_local_countries(
                limit, offset, order_by=order_by, **kwargs)
            return max_count, counts, country_items

        if by_type == "ip":
            max_count, counts = await self.get_counts_local(
                list_names=list_names,
                except_splash_screen=except_splash_screen,
                **kwargs)
            ip_items = await self.get_ip_local(
                list_names, limit=limit, offset=offset, order_by=order_by,
                except_splash_screen=except_splash_screen, **kwargs)
            return max_count, counts, ip_items

        # if there is no filter by record type `by_type`,
        # add country records first
        country_items = await self.get_blacklisted_local_countries(
            limit=None, offset=None, order_by=order_by, **kwargs)
        country_total = len(country_items)

        # the signature allows None for both values; arithmetic on None
        # would raise TypeError, so normalise before paginating
        start = offset or 0
        if limit is None:
            ip_limit = None
            country_page = country_items[start:]
        else:
            # in [0 .. limit]
            ip_limit = max(0, min(limit, start + limit - country_total))
            country_page = country_items[start: start + limit]
        # in [0 .. offset]
        ip_offset = max(0, start - country_total)

        ip_items = await self.get_ip_local(
            list_names, limit=ip_limit, offset=ip_offset, order_by=order_by,
            except_splash_screen=except_splash_screen, **kwargs)
        result_items = country_page + ip_items
        max_count, counts = await self.get_counts_local(
            list_names,
            except_splash_screen=except_splash_screen,
            **kwargs)
        return max_count, counts, result_items

    async def _ip_delete_local(self, items, listname):
        """Delete *items* from *listname* and send ``'del'`` to the group.

        For the gray list the splash-screen list is processed as well and
        both results are merged without duplicates.
        """
        group_ip_sender = await GroupIPSyncSender().collect(items)
        affected, not_affected = await self._delete_ip_from(items, listname)
        if listname == IPList.GRAY:
            splash_affected, splash_not_affected = await self._delete_ip_from(
                items, IPList.GRAY_SPLASHSCREEN
            )
            affected += [
                addr for addr in splash_affected if addr not in affected
            ]
            na = []
            not_affected_lists = (not_affected, splash_not_affected)
            for not_affected_list in not_affected_lists:
                na += [
                    net
                    for net in not_affected_list
                    if (net["rec"] not in affected) and (net not in na)
                ]
            not_affected = na
        await group_ip_sender.filter(affected).send('del')
        return generate_warnings(
            affected,
            not_affected,
            dest_listname=listname,
            all_list=items,
            success_warning="{}/{} ip(s) were successfully deleted",
            failure_warning="Noop: unable to delete {} from {}",
            in_another_list_warning="IP {} is already in {} list",
        )

    @lookup.bind("ip-list", "local", "add")
    async def ip_list_local_add(self, purpose, **kwargs):
        """Add records to the list matching *purpose*.

        Replaces the old whitelist/graylist/blacklist ip add commands; per
        the original note, the new add also covers the old edit and move.
        """
        list_name = Purpose.listname(purpose)
        return await self._ip_add_idempotent(listname=list_name, **kwargs)

    @lookup.bind("ip-list", "local", "delete")
    async def ip_list_local_delete(self, purpose, items):
        """Remove records from the list matching *purpose*.

        Same as the old `[white/black/gray]list ip delete` calls, but the
        splashscreen purpose is accepted as well.
        """
        list_name = Purpose.listname(purpose)
        return await self._ip_delete_local(items, list_name)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A