[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: proactive.py
File is not writable. Editing disabled.
import itertools
import time

from peewee import (
    CompositeKey,
    DoesNotExist,
    ForeignKeyField,
    IntegerField,
    JOIN,
    PrimaryKeyField,
    TextField,
    CharField,
)

from defence360agent.model import instance, Model
from defence360agent.model.simplification import apply_order_by
from im360.model.country import Country
from im360.model.firewall import IPList, IPListPurpose


class Proactive(Model):
    """Proactive defense php plugin events."""

    class Meta:
        database = instance.db
        db_table = 'proactive'
        schema = 'proactive'

    id = PrimaryKeyField()
    #: When the event happened.
    timestamp = IntegerField(null=False)
    #: The IP that made the request - a string representation.
    ip = TextField(null=True)
    #: The IP that made the request - a numeric representation.
    ip_int = IntegerField(null=True)
    #: The IP that made the request - version (either `4` or `6`).
    ip_version = IntegerField(null=True)
    #: A country code for the IP, based on GeoDB data.
    ip_country_id = CharField(null=True)
    #: .. deprecated:: 4.6.0 no longer filled after DEF-10708.
    description = TextField(null=True)
    #: Action taken, may be `LOG`, `BLOCK`, `KILL`.
    action = TextField(null=False)
    #: The name of the server host under which the script was executed.
    host = TextField(null=True)
    #: The full PHP script path.
    path = TextField(null=False)
    #: The full URL of the request.
    url = TextField(null=True)
    #: The number of times the same event happened (if it's aggregated).
    count = IntegerField(null=False)
    #: User ID of the process.
    uid = IntegerField(null=False)
    #: Group ID of the process.
    gid = IntegerField(null=False)
    #: The ID of the matched rule (recognizer sub ID).
    rule_id = IntegerField(null=True)
    #: Human-readable name of the matched rule.
    rule_name = TextField(null=False)

    @classmethod
    def _iplist_join(cls):
        """Return the join condition matching non-expired IPList rows."""
        return (IPList.ip == cls.ip) & (~IPList.is_expired())

    @staticmethod
    def _with_purpose(row):
        """Add a ``purpose`` key derived from ``row["listname"]``.

        The old ``listname`` key is kept for backward compatibility.
        A falsy listname (no matching IPList row) is copied through as is.
        The dict is modified in place and returned.
        """
        listname = row["listname"]
        row["purpose"] = (
            IPListPurpose.listname2purpose(listname) if listname else listname
        )
        return row

    @classmethod
    def fetch(cls, uid, since=None, to=None,
              limit=None, offset=None, search=None, order_by=None):
        """Return a list of event dicts matching the given filters.

        :param uid: only events with this uid are returned;
            ``0`` disables the uid filter
        :param since: lower timestamp bound (inclusive), or None
        :param to: upper timestamp bound (inclusive), or None
        :param limit: maximum number of rows, or None for no limit
        :param offset: number of rows to skip, or None
        :param search: substring looked up in host, path, rule name and ip
        :param order_by: ordering spec passed to ``apply_order_by``
        :return: list of dicts, each extended with a ``purpose`` key
        """
        q = cls.select(
            cls.id,
            cls.timestamp,
            cls.ip,
            cls.action,
            cls.host,
            cls.path,
            cls.count,
            cls.rule_id,
            cls.rule_name,
            IPList.listname,
        ).join(IPList, JOIN.LEFT_OUTER, on=cls._iplist_join())

        if uid != 0:
            q = q.where(cls.uid == uid)
        if since is not None:
            q = q.where(cls.timestamp >= since)
        if to is not None:
            q = q.where(cls.timestamp <= to)
        if search is not None:
            q = q.where(
                cls.host.contains(search)
                | cls.path.contains(search)
                | cls.rule_name.contains(search)
                | cls.ip.contains(search)
            )
        if order_by is not None:
            q = apply_order_by(order_by, cls, q)
        if limit is not None:
            q = q.limit(limit)
        if offset is not None:
            q = q.offset(offset)

        return [cls._with_purpose(item) for item in q.dicts()]

    @classmethod
    def details(cls, id, uid):
        """Return a single event as a dict, including its environment.

        :param id: primary key of the event
        :param uid: only an event with this uid is returned;
            ``0`` disables the uid filter
        :return: dict with the event columns plus ``country``, ``env``
            (name -> value mapping from :class:`ProactiveEnv`)
            and ``purpose``
        :raise DoesNotExist: when no event matches
        """
        q = cls.select(
            cls.id,
            cls.timestamp,
            cls.ip,
            cls.description,
            cls.url,
            cls.action,
            cls.path,
            cls.count,
            cls.rule_id,
            cls.rule_name,
            IPList.listname,
            Country.code.alias('country'),
        )\
            .join(IPList, JOIN.LEFT_OUTER, on=cls._iplist_join())\
            .switch()\
            .join(Country, JOIN.LEFT_OUTER,
                  on=(cls.ip_country_id == Country.id))\
            .where(cls.id == id)
        if uid != 0:
            q = q.where(cls.uid == uid)

        # Fetch the row directly instead of issuing a separate COUNT first:
        # one round-trip, and no gap between the check and the read.
        event = next(iter(q.dicts()), None)
        if event is None:
            raise DoesNotExist

        env = ProactiveEnv.select(ProactiveEnv.name, ProactiveEnv.value)\
            .where(ProactiveEnv.event_id == event['id']).tuples()
        event['env'] = dict(env)
        return cls._with_purpose(event)


class ProactiveEnv(Model):
    """Proactive defence php plugin environment variables."""

    #: A reference to the :class:`Proactive` table.
    event_id = IntegerField(null=False)
    #: The name of the environment variable.
    name = TextField(null=False)
    #: The value of the environment variable.
    value = TextField(null=True)

    class Meta:
        database = instance.db
        db_table = 'proactive_env'
        schema = 'proactive'
        primary_key = CompositeKey('event_id', 'name', 'value')


class ProactiveIgnoredPath(Model):
    """Ignore list for proactive defence."""

    #: Script path to be ignored.
    path = TextField(null=False, primary_key=True)
    #: Timestamp when the ignore record was added.
    timestamp = IntegerField(null=False, default=time.time)

    class Meta:
        database = instance.db
        db_table = 'proactive_ignored_path'

    @classmethod
    def _apply_order(cls, q, order_by=None):
        """
        To be able to use itertools.groupby, we need the result to be
        sorted by both path and timestamp, so in this method we add these
        fields to order_by if they were not passed by the caller.

        :param q: query to order
        :param order_by: iterable of ``(field_name, desc)`` pairs, or None
        :return: the query with the ordering applied
        """
        if order_by is None:
            order_by = []
        requested = [name for name, _ in order_by]

        order = []
        for field_name, desc in order_by:
            # names that are not attributes of this model are skipped
            field = getattr(cls, field_name, None)
            if field is not None:
                order.append(field.desc() if desc else field)
        for field_name in cls._meta.sorted_field_names:
            if field_name not in requested:
                order.append(getattr(cls, field_name))
        order.append(ProactiveIgnoredRule.rule_name)
        return q.order_by(*order)

    @classmethod
    def fetch(cls, limit_homedir=None, since=None, to=None,
              search=None, order_by=None, limit=50, offset=0):
        """Return one page of ignored paths together with their rules.

        :param limit_homedir: if given, only paths starting with it
        :param since: lower timestamp bound (inclusive), or None
        :param to: upper timestamp bound (inclusive), or None
        :param search: substring to look up in the path, or None
        :param order_by: iterable of ``(field_name, desc)`` pairs, or None
        :param limit: maximum number of paths in the page;
            None means no limit
        :param offset: number of paths to skip; None means 0
        :return: tuple ``(total, page)`` where ``total`` is the number of
            matching paths regardless of paging and ``page`` is a list of
            ``{'path', 'timestamp', 'rules'}`` dicts
        """
        q = cls.select(
            cls.path,
            cls.timestamp,
            ProactiveIgnoredRule.rule_id,
            ProactiveIgnoredRule.rule_name)\
            .join(ProactiveIgnoredRule, JOIN.LEFT_OUTER)
        if limit_homedir is not None:
            # it's ok to use startswith() with path here,
            # because we are using normalized path when inserting
            q = q.where(cls.path.startswith(str(limit_homedir)))
        if search is not None:
            q = q.where(cls.path.contains(search))
        if since is not None:
            q = q.where(cls.timestamp >= since)
        if to is not None:
            q = q.where(cls.timestamp <= to)
        q = cls._apply_order(q, order_by)

        # Accept None like Proactive.fetch does instead of failing on
        # an int/None comparison.
        if offset is None:
            offset = 0

        result = []
        total = 0
        # Paging is done here rather than in SQL because the join yields
        # one row per (path, rule) and a page is counted in paths.
        for (path, timestamp), rows in itertools.groupby(
                q.dicts(), key=lambda r: (r['path'], r['timestamp'])):
            has_room = limit is None or len(result) < limit
            if total >= offset and has_room:
                result.append({
                    'path': path,
                    'timestamp': timestamp,
                    'rules': [
                        {'id': row['rule_id'], 'name': row['rule_name']}
                        for row in rows
                        # the outer join gives a NULL rule for paths
                        # that have no rules
                        if row['rule_id'] is not None
                    ],
                })
            total += 1
        return total, result


class ProactiveIgnoredRule(Model):
    """Specific rules ignored."""

    #: A reference to the :class:`ProactiveIgnoredPath` table.
    path = ForeignKeyField(ProactiveIgnoredPath,
                           null=False, on_delete='CASCADE',
                           related_name='rules')
    #: The ID of the rule to be ignored.
    rule_id = IntegerField(null=False)
    #: A human-readable name of the rule to be ignored (just for information
    #: purposes - doesn't affect the logic).
    rule_name = TextField(null=False)

    class Meta:
        database = instance.db
        db_table = 'proactive_ignored_rule'
        indexes = (
            (('path', 'rule_id'), True),
        )
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.34 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A