Viewing / Editing File: disabled_rules.py
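import asyncio
import functools
import time

from peewee import DoesNotExist, IntegrityError

from defence360agent.model.simplification import run_in_executor
from defence360agent.rpc_tools import ValidationError, lookup
from defence360agent.rpc_tools.utils import run_in_executor_decorator
from defence360agent.utils import Scope
from im360.contracts.config import Modsec
from defence360agent.contracts.messages import MessageType
from im360.model.incident import DisabledRule, DisabledRuleDomain
from im360.subsys import (
    waf_rules_configurator,
    modsec_app_version_detector,
)
from defence360agent.subsys import web_server
from im360.subsys.panels import hosting_panel
from im360.subsys.panels.generic.mod_security import (
    GenericPanelModSecException,
)


class DisabledRulesEndpoints(lookup.RootEndpoints):
    """RPC endpoints that disable, re-enable and list protection rules.

    Each disabled rule is stored as a ``DisabledRule`` row, optionally
    scoped to domains through ``DisabledRuleDomain`` rows. For the
    ``modsec`` plugin the stored state is then pushed to the hosting panel
    and the web server is gracefully restarted.

    NOTE(review): every mutation here reduces or restores protection, so
    access to these endpoints should be limited to trusted callers. No
    authorization check is visible in this class; presumably the RPC layer
    (``lookup`` / ``SCOPE``) enforces it -- confirm.
    """

    SCOPE = Scope.IM360

    def __init__(self, sink):
        """Keep the message sink (via the base class) and a panel handle."""
        super().__init__(sink)
        self.hp = hosting_panel.HostingPanel()

    @lookup.bind('rules', 'disable')
    async def disable_rule(self, plugin, id, name, domains=None):
        """Disable rule *id* of *plugin*, optionally only for *domains*.

        Args:
            plugin: plugin identifier; domain scoping is accepted only
                when it equals ``'modsec'``.
            id: rule identifier within the plugin.
            name: rule name stored next to the identifier.
            domains: optional list of domains the rule is disabled for;
                each must be one of the hosting panel's user domains.

        Raises:
            ValidationError: if domains are given for a plugin other than
                ``modsec``, if a domain is unknown to the panel, or if the
                panel sync fails (see ``_sync_modsec_configs``).

        NOTE(review): ``plugin``, ``id`` and ``name`` are stored as
        received; no validation of them is visible in this method --
        presumably the RPC schema constrains them, confirm.
        """
        domains = domains or []
        if domains and plugin != 'modsec':
            raise ValidationError("Domains only allowed for plugin=modsec")

        # validate domain: only domains known to the hosting panel may be
        # referenced, so a caller cannot store arbitrary domain strings.
        panel_domains = set(await self.hp.get_user_domains())
        if not set(domains).issubset(panel_domains):
            raise ValidationError(
                "Some of the provided domains do not exist: {}".format(
                    set(domains) - panel_domains,
                )
            )

        # NOTE: we can't call _store_disabled_rule after
        # _sync_modsec_configs, because we need to form a union of
        # specified domains and domains for which the specified rule is
        # already disabled. We might refactor this method in DEF-10761.
        sync_domains = await self._store_disabled_rule(
            plugin, id, name, domains)

        if plugin == 'modsec':
            # Domains removed from the panel since the rule was stored are
            # filtered out before configs are regenerated.
            await self._sync_modsec_configs(set(sync_domains) & panel_domains)

        # Report the change to the sink so it is visible downstream.
        await self._sink.process_message(MessageType.RuleDisabled(
            plugin_id=plugin, rule=id, name=name, domains=(domains or None),
            timestamp=time.time()))

    async def _delete_disabled_rule(self, plugin, id):
        """Delete the ``DisabledRule`` row for (*plugin*, *id*) off-loop."""
        loop = asyncio.get_event_loop()
        await run_in_executor(
            loop,
            lambda: DisabledRule.delete().where(
                DisabledRule.plugin == plugin,
                DisabledRule.rule_id == id
            ).execute())

    @lookup.bind('rules', 'enable')
    async def enable_rule(self, plugin, id):
        """Re-enable rule *id* of *plugin*; a no-op if it is not disabled.

        For ``modsec`` the configs of the domains the rule was disabled
        for are re-synchronised after the row is deleted. A ``RuleEnabled``
        message is sent to the sink on success.
        """
        loop = asyncio.get_event_loop()
        try:
            dr = await run_in_executor(
                loop, lambda: DisabledRule.get(plugin=plugin, rule_id=id)
            )
        except DoesNotExist:
            # Nothing stored for this rule: nothing to enable or report.
            return

        if plugin == 'modsec':
            # Materialise the rows inside the executor. A peewee select is
            # lazy, so returning the bare query and iterating it here would
            # run the SQL on the event-loop thread instead of the worker.
            domains = await run_in_executor(
                loop,
                lambda: [
                    row[0]
                    for row in DisabledRuleDomain.select(
                        DisabledRuleDomain.domain
                    ).where(
                        DisabledRuleDomain.disabled_rule_id_id == dr.id
                    ).tuples()
                ],
            )
            await self._delete_disabled_rule(plugin, id)
            panel_domains = set(await self.hp.get_user_domains())
            await self._sync_modsec_configs(set(domains) & panel_domains)
        else:
            await self._delete_disabled_rule(plugin, id)

        await self._sink.process_message(
            MessageType.RuleEnabled(
                plugin_id=plugin, rule=id, timestamp=time.time()
            )
        )

    @lookup.bind('rules', 'list-disabled')
    @run_in_executor_decorator
    def list_disabled_rules(self, limit, offset, order_by=None):
        """Return the stored disabled rules via ``DisabledRule.fetch``."""
        return DisabledRule.fetch(limit, offset, order_by)

    @lookup.bind('rules', 'update-app-specific-rules')
    async def update_app_based_rules(self):
        """Regenerate the app-specific WAF rules configuration.

        Raises:
            ValidationError: if ``Modsec.APP_SPECIFIC_RULESET`` is off, or
                if the configurator reports an unsupported web server, a
                missing database, or an unimplemented operation.
        """
        if not Modsec.APP_SPECIFIC_RULESET:
            raise ValidationError("App specific ruleset setting is disabled.")
        try:
            await waf_rules_configurator.update_waf_rules_config()
        except (waf_rules_configurator.NotSupportedWebserverError,
                modsec_app_version_detector.DatabaseNotFoundError,
                NotImplementedError) as e:
            # Chain the cause explicitly, as _sync_modsec_configs does, so
            # the original failure stays attached to the reported error.
            raise ValidationError(str(e)) from e

    @run_in_executor_decorator
    def _store_disabled_rule(self, plugin, id, name, domains):
        """Persist the disabled rule and return the domains to re-sync.

        If the rule is new, it is inserted together with *domains*. If it
        already exists (``IntegrityError`` on insert), its stored domains
        are replaced by *domains*. The returned list is the union of
        *domains* and the previously stored ones, because configs of the
        previously scoped domains must be regenerated as well.

        NOTE(review): the read / delete / re-create sequence for an
        existing rule is not wrapped in a transaction in this method;
        confirm whether the caller or the model layer provides one.
        """
        sync_domains = set(domains)
        try:
            inserted_id = DisabledRule.insert(
                plugin=plugin, rule_id=id, name=name).execute()
        except IntegrityError:
            dr = DisabledRule.get(plugin=plugin, rule_id=id)
            # Remember the old scope before it is dropped.
            for d in DisabledRuleDomain.select().where(
                DisabledRuleDomain.disabled_rule_id_id == dr.id
            ):
                sync_domains.add(d.domain)
            DisabledRuleDomain.delete().where(
                DisabledRuleDomain.disabled_rule_id_id == dr.id).execute()
            for d in domains:
                DisabledRuleDomain.create_or_get(
                    disabled_rule_id_id=dr.id, domain=d)
        else:
            for d in domains:
                DisabledRuleDomain.create(
                    disabled_rule_id_id=inserted_id, domain=d)
        return list(sync_domains)

    async def _sync_modsec_configs(self, domains: set):
        """Push the stored ``modsec`` disabled rules to the hosting panel.

        Per-domain rule lists are loaded concurrently off-loop and synced
        first, then the global list, and finally the web server is
        gracefully restarted.

        Raises:
            ValidationError: if the generic panel integration fails.
        """
        loop = asyncio.get_event_loop()
        domain_list = list(domains)
        rules_list = await asyncio.gather(
            *(
                run_in_executor(
                    loop,
                    functools.partial(
                        DisabledRule.get_domain_disabled, 'modsec', d
                    ),
                )
                for d in domain_list
            )
        )
        try:
            await self.hp.sync_disabled_rules_for_domains(
                dict(zip(domain_list, rules_list))
            )
        except GenericPanelModSecException as e:
            # don't send errors from generic panel to Sentry;
            # panel admin is responsible for configuring generic panel
            raise ValidationError(str(e)) from e

        rules = await run_in_executor(
            loop, lambda: DisabledRule.get_global_disabled('modsec')
        )
        await self.hp.sync_global_disabled_rules(rules)
        await web_server.graceful_restart()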