[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: graylist.py
File is not writable. Editing disabled.
""" Manage Gray List """ import logging import time from humanize import naturaldelta from peewee import DoesNotExist from defence360agent.contracts.messages import MessageType, Reject from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.model import instance from defence360agent.model.simplification import run_in_executor from defence360agent.utils import timeit from im360.contracts.config import AutoWhiteList from im360.internals import geo from im360.ioc import services from im360.model.firewall import IgnoreList, IPList from im360.utils.net import IPNetwork, pack_ip_network, unpack_ip_network from im360.utils.validate import IP logger = logging.getLogger(__name__) class ManageGrayList(MessageSink, MessageSource): PROCESSING_ORDER = MessageSink.ProcessingOrder.GRAYLIST_DB_FIXUP async def create_sink(self, loop): self._loop = loop async def create_source(self, loop, sink): self._loop = loop self._sink = sink def _cleanup_ignore_list(self, net): """ Removes *net* and its subnets from ignore list :return: list of nets removed """ to_unblock = list(IgnoreList.subnets(net)) IgnoreList.remove(to_unblock) return to_unblock def _process_alert(self, ip: IPNetwork, expiration: int, deep: int): listname = IPList.GRAY # alert puts in GRAY list by definition assert expiration # alert should always have finite expiration if expiration <= time.time(): raise Reject("Already expired") unblocklist = [] # 1. Cleanup exact match from GRAY_SPLASHSCREEN try: existing = IPList.get(ip=ip, listname=IPList.GRAY_SPLASHSCREEN) except DoesNotExist: pass else: if existing.lives_less(expiration) \ or not existing.lives_longer(expiration): unblocklist.append( (existing.ip_network, IPList.GRAY_SPLASHSCREEN)) existing.delete_instance() # 2. Remove from ignore list unblocklist.extend((net, IPList.IGNORE) for net in self._cleanup_ignore_list(ip)) # 3. Store IP in DB existing, created = IPList.get_or_create( ip=ip, listname=listname, defaults=dict( expiration=expiration, deep=deep, manual=False, comment='Automatically blocked due to distributed attack', imported_from='Imunify360') ) if not created: existing.update_properties(expiration, deep, listname) else: with geo.reader() as geo_reader: country = geo_reader.get_id(ip) existing.country = country existing.save() return dict( blocklist={(ip, listname): dict(expiration=expiration)}, unblocklist=unblocklist ) @expect(MessageType.SensorAlert) async def add_to_db(self, message): """On alert, add attackers' ip/net to db's GRAY iplist or update expiry date for an existing GRAY iplist entry. Whitelist is checked in separate plugin No check existing supernets. GRAY subnets of the given net are not removed from the db! BLACK subnets are _not_ removed (whether they are added manually or not). GRAY_SPLASHSCREEN subnets /that expire earlier than ip's expiry date/ are removed from the db /only if the ip is added above/. Ipsets/webshield are updated elsewhere. """ def process(): with instance.db.atomic(): return self._process_alert( ip=message["attackers_ip"], expiration=message["properties"]["expiration"], deep=message["properties"]["deep"], ) await self._sink.process_message( MessageType.BlockUnblockList( await run_in_executor( self._loop, process, ) ) ) @expect(MessageType.ClientUnblock) async def delete_from_db(self, message): """ Spec [1]: for l in ["GRAY", "GRAY_SPLASHSCREEN"]: existing = search_exactly(net, l) if existing: remove(net, l) supernets = search(net, l) if supernets: for n in search_subnets(net, "IGNORE"): remove(net, "IGNORE") add(net, "IGNORE") return [1]: https://gerrit.cloudlinux.com/#/c/61260/18/src/handbook/message_processing/local_unblock.py """ # noqa ip = message['attackers_ip'] blocklist = {} unblocklist = [] affected_listnames = [IPList.GRAY, IPList.GRAY_SPLASHSCREEN] blocked_in_any_supernet = bool([ supernet for supernet in IPList.find_closest_ip_nets( ip, listname=affected_listnames, limit=1 ) if supernet.ip_network != ip ]) existing_ignored_subnets = list(IgnoreList.subnets(ip)) if blocked_in_any_supernet: await run_in_executor( self._loop, lambda: IgnoreList.get_or_create(ip=ip) ) blocklist[(message['attackers_ip'], IPList.IGNORE)] = { "expiration": IPList.NEVER } else: await run_in_executor( self._loop, lambda: self._delete_from_db( ip, ignore_list=existing_ignored_subnets, listname=affected_listnames ) ) unblocklist = [ (message["attackers_ip"], listname) for listname in affected_listnames ] for ip in existing_ignored_subnets: unblocklist.append( (ip, IPList.IGNORE) ) # add ip to whitelist due to captcha pass if message.get("plugin_id") == "captcha": if not await services.primary_whitelist_cache.contains(ip): unblock_ttl = AutoWhiteList.unblock_whitelist_ttl() expiration = int(time.time() + unblock_ttl) with instance.db.atomic(), geo.reader() as geo_reader: IPList.delete_expired(ip=ip, listname=IPList.WHITE) country = geo_reader.get_id(ip) record, created = IPList.get_or_create( ip=ip, listname=IPList.WHITE, defaults=dict( expiration=expiration, manual=False, full_access=False, captcha_passed=True, comment=( "Whitelisted for %s due to successful " "captcha pass" % naturaldelta(unblock_ttl) ), country=country, ) ) if created: logger.info('Added %s to the Whitelist for %s seconds', ip, unblock_ttl) blocklist[(message['attackers_ip'], IPList.WHITE)] = { "expiration": expiration, } await self._sink.process_message( MessageType.BlockUnblockList( { "blocklist": blocklist, "unblocklist": unblocklist, } ) ) @staticmethod def _delete_from_db(ip, ignore_list=None, *, listname=None): if listname is None: listname = [IPList.GRAY, IPList.GRAY_SPLASHSCREEN, IPList.BLACK] elif isinstance(listname, str): listname = [listname] if ignore_list is None: ignore_list = [] deleted = IPList.delete_from_list( ip=ip, listname=listname, manual=False ) if deleted: logger.debug("IP %s is unchecked from DB", ip) else: logger.debug("IP %s is not in DB", ip) if ignore_list: # unblock all subnet, we should flush all ignored ips IgnoreList.remove(ignore_list) @expect(MessageType.SynclistResponse) async def process_synclist(self, message): def process(): with instance.db.atomic(): return self._process_synclist(message) await self._sink.process_message( await run_in_executor( self._loop, process, ) ) def _process_blocklist(self, blocklist, manual_blacklist): with timeit( "remove expired and manually blacklisted ips from blocklist", logger ): _blocklist = { ip: prop for ip, prop in blocklist.items() if ( ip not in manual_blacklist and ( IPList.lives_longer_prop(prop, time.time()) ) ) } if not _blocklist: return {}, [] to_block, to_unblock = [], [] with timeit( "filter exact matched nets with greater ttl", logger ): for ip in IPList.find_ips_with_later_expiration(_blocklist): _blocklist.pop(ip) with geo.reader() as geo_reader: for ip, properties in _blocklist.items(): listname = IPList.get_listname_from(properties) expiration = properties.get("expiration", IPList.NEVER) # if already blocked with higher priority/ttl - skip to_unblock.extend( (net, IPList.IGNORE) for net in self._cleanup_ignore_list(ip) ) net, mask, version = pack_ip_network(ip) to_block.append( dict( ip=IP.ip_net_to_string(ip), network_address=net, netmask=mask, version=version, listname=listname, expiration=expiration, deep=properties.get("deep", 0), manual=False, country=geo_reader.get_id(ip), ) ) with timeit("add records to iplist", logger): IPList.block_many(to_block) return ( { ( unpack_ip_network( item["network_address"], item["netmask"], item["version"], ), item["listname"], ): {"expiration": item["expiration"]} for item in to_block }, to_unblock, ) def _process_unblocklist(self, unblocklist, manual_blacklist): to_block, to_unblock = {}, [] unblocklist = { ip: properties for ip, properties in unblocklist.items() if not any( ip.subnet_of(net) for net in manual_blacklist if ip.version == net.version ) } def ips_to_remove_from_db(): full_action_types_list_to_unblock = [ IPList.listname2action_type(list_) for list_ in [ IPList.GRAY_SPLASHSCREEN, IPList.GRAY, IPList.BLACK ] ] for ip, properties in unblocklist.items(): action_type = ( properties.get("action_type") if properties else None ) action_types_to_unblock = ( [action_type] if action_type is not None else full_action_types_list_to_unblock ) for action_type in action_types_to_unblock: # required only *action_type* field to unblock yield (ip, {"action_type": action_type}) to_unblock += IPList.remove_many(ips_to_remove_from_db()) return to_block, to_unblock def _process_synclist(self, message): manual_blacklist = set(IPList.fetch_ipnetwork_list( IPList.BLACK, manual=True)) blocklist, unblocklist = {}, [] for method, list_ in [ (self._process_blocklist, message["blocklist"]), (self._process_unblocklist, message["unblocklist"]) ]: with timeit("synclist %s" % method, logger): to_block, to_unblock = method(list_, manual_blacklist) blocklist.update(to_block) unblocklist.extend(to_unblock) return MessageType.BlockUnblockList({ "blocklist": blocklist, "unblocklist": unblocklist, })
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.2 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A