[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: incident.py
File is not writable. Editing disabled.
import time
from typing import Dict, List

from defence360agent.model import Model, instance
from defence360agent.model.simplification import apply_order_by
from im360.contracts.config import (
    ControlPanelProtector,
    ModsecSensor,
    OssecSensor,
)
from im360.model.country import Country
from im360.model.firewall import IPList, IPListPurpose
from peewee import (
    JOIN,
    Case,
    CharField,
    CompositeKey,
    FloatField,
    ForeignKeyField,
    IntegerField,
    IntegrityError,
    PrimaryKeyField,
    TextField,
    prefetch,
)
from playhouse.shortcuts import model_to_dict

# Maps an OSSEC level (1..15, higher is more severe) to the ModSecurity
# severity used for comparison (7..0, lower is more severe).
ossec_to_modsec_severity = {
    1: 7,  # debug level
    2: 6,
    3: 5,
    4: 4,  # default for UI filtering
    5: 4,
    6: 3,
    7: 3,
    8: 3,
    9: 3,
    10: 3,
    11: 3,
    12: 2,
    13: 2,
    14: 1,
    15: 0  # emergency level
}


class _SafeCharField(CharField):
    """CharField that drops characters which cannot be encoded as UTF-8."""

    def adapt(self, value):
        # Only ``str`` has the encode() round-trip applied; any other value
        # (bytes, numbers, ...) is handed to the parent untouched instead of
        # failing with AttributeError on ``.encode``.
        if isinstance(value, str):
            value = value.encode('utf-8', errors='ignore')
        return super().adapt(value)


class Incident(Model):
    """Security-related events that happened on the server."""

    # supplying each field with null=True to be consistent
    # with previously used create table sql:
    # CREATE TABLE incident (
    #     id INTEGER PRIMARY KEY,
    #     plugin TEXT,
    #     rule TEXT,
    #     timestamp REAL,
    #     retries INTEGER,
    #     severity NUMERIC,
    #     name TEXT,
    #     description TEXT,
    #     abuser TEXT
    # );
    id = IntegerField(primary_key=True, null=True)
    #: The name of the sensor used to detect an incident, e.g. modsec, cl_dos.
    plugin = CharField(null=True)
    #: The ID of the rule.
    rule = CharField(null=True)
    #: Timestamp when the incident happened, or at least was detected.
    timestamp = FloatField(null=True)
    #: How many times it happened - incidents are aggregated over
    #: a short period preserving most of the fields, except for the exact
    #: :attr:`timestamp` and :attr:`description`.
    retries = IntegerField(null=True)
    #: How significant the threat is.
    #: All plugins/sensors are brought to the scale roughly matching the `OSSEC
    #: classification <https://www.ossec.net/docs/manual/rules-decoders/
    #: rule-levels.html#rules-classification>`_
    severity = IntegerField(null=True)
    #: A human-readable name of the triggered rule.
    name = CharField(null=True)
    #: A detailed description of the event.
    description = _SafeCharField(null=True)
    #: The IP that has caused the incident, if applicable.
    abuser = CharField(null=True)
    #: A reference to country code and name for the IP, based on GeoDB data.
    country = ForeignKeyField(Country, null=True)
    #: A domain name related to the incident, if available.
    domain = TextField(null=True, default=None)

    class Meta:
        database = instance.db
        db_table = 'incident'
        indexes = (
            (('timestamp',), False),
        )

    class OrderBy:
        @staticmethod
        def severity():
            """Build the ordering expression for the ``severity`` column.

            :return: a one-element tuple holding a ``Case`` expression
            """
            max_ossec_severity = max(ossec_to_modsec_severity.keys())
            ossec_cases = tuple(
                (
                    ossec,
                    # sort ossec's incidents correctly when
                    # modsec's severity equivalents equal
                    modsec + (max_ossec_severity + 1 - ossec)
                    / (max_ossec_severity + 1),
                )
                for ossec, modsec in ossec_to_modsec_severity.items()
            )
            severity_order = Case(
                Incident.plugin,
                (
                    (
                        OssecSensor.PLUGIN_ID,
                        Case(Incident.severity, ossec_cases, 0),
                    ),
                    (ModsecSensor.PLUGIN_ID, Incident.severity),
                ),
                100,  # incidents without severity to the end
            )
            # NOTE(review): the tuple wrapping is kept exactly as before;
            # presumably apply_order_by() iterates the result - confirm.
            return (severity_order,)

    @classmethod
    def _accept_severity(cls, severity):
        """Build the condition selecting incidents at least that severe.

        :param int severity: minimal level on the OSSEC scale; must be a key
            of ``ossec_to_modsec_severity``
        :raises KeyError: if *severity* is not a known OSSEC level
        """
        return (
            (
                (
                    (cls.plugin == OssecSensor.PLUGIN_ID)
                    | (cls.plugin == ControlPanelProtector.PLUGIN_ID)
                )
                & (cls.severity >= severity)
            )
            | (
                (cls.plugin == ModsecSensor.PLUGIN_ID)
                # modsec scale is inverted: lower value is more severe
                & (cls.severity <= ossec_to_modsec_severity[severity])
            )
            | cls.severity.is_null()
        )

    @classmethod
    def get_sorted_incident_list(
        cls,
        since=None,
        to=None,
        by_abuser_ip=None,
        by_list=None,
        limit=None,
        offset=None,
        severity=None,
        by_country_code=None,
        search=None,
        order_by=None,
    ):
        """Return incidents matching the filters, newest first by default.

        :param by_country_code: value compared for equality with
            ``Country.code``
        :param integer since: unixtime the records begin at;
            no lower bound when omitted
        :param integer to: unixtime the records end at;
            defaults to the current time
        :param str by_abuser_ip: full or part of IP, used for filtering
            results by abuser's IP
        :param by_list: iterable of names of the appropriate ip lists.
            Could be 'gray', 'white', 'black'.
        :param int limit: limits the output with specified number of
            incidents. The number greater than zero
        :param int offset: offset for pagination
        :param int severity: min log level (severity) to return;
            no severity filtering when omitted
        :param str search: filter results by name, description, domain
            or abuser ip
        :param list order_by: sorting orders
        :return: list of dicts as produced by :meth:`mk_incident_iterator`
        """
        if to is None:
            to = time.time()

        if by_list is not None:
            query_IPList = IPList.select(IPList).where(
                (IPList.listname << {lst.upper() for lst in by_list})
                & (~IPList.is_expired())
            )
        else:
            query_IPList = IPList.select(IPList).where(
                (~IPList.is_expired()))

        # Optional bounds are added only when given: comparing against
        # None would match nothing, and a None severity has no entry in
        # ossec_to_modsec_severity.
        conditions = (Incident.timestamp <= to)
        if since is not None:
            conditions = (Incident.timestamp >= since) & conditions
        if severity is not None:
            conditions = conditions & cls._accept_severity(severity)

        query = Incident.select(
            Incident,
            query_IPList.c.listname,
            query_IPList.c.expiration,
            Country
        ).join(
            query_IPList, JOIN.LEFT_OUTER,
            on=(Incident.abuser == query_IPList.c.ip),
            attr="ip"
        ).join(
            Country, JOIN.LEFT_OUTER,
            on=(Incident.country == Country.id)
        ).where(
            conditions
        ).order_by(Incident.timestamp.desc())

        if by_list is not None:
            # keep only incidents whose abuser is in one of the lists
            query = query.where(query_IPList.c.listname.is_null(False))

        if search is not None:
            query = query.where(
                Incident.name.contains(search)
                | Incident.description.contains(search)
                | Incident.domain.contains(search)
                | Incident.abuser.contains(search)
            )

        if by_abuser_ip is not None:
            query = query.where(Incident.abuser.contains(by_abuser_ip))

        if by_country_code is not None:
            query = query.where(Country.code == by_country_code)

        if offset is not None:
            query = query.offset(offset)

        if limit is not None:
            query = query.limit(limit)

        if order_by is not None:
            query = apply_order_by(order_by, cls, query)

        return list(cls.mk_incident_iterator(query))

    @classmethod
    def mk_incident_iterator(cls, query):
        """Yield a plain dict for every row of *query*."""
        for row in query:
            listname = row.ip.listname.lower() \
                if getattr(row, "ip", None) else None
            purpose = IPListPurpose.listname2purpose(
                listname.upper()).value if listname else None
            incident_dict = {
                'id': row.id,
                'plugin': row.plugin,
                'rule': row.rule,
                'timestamp': row.timestamp,
                'times': row.retries,
                'severity': row.severity,
                'name': row.name,
                'description': row.description,
                'abuser': row.abuser,
                'listname': listname,
                'purpose': purpose,
                'country': model_to_dict(row.country) if row.country else {},
                'domain': row.domain
            }
            yield incident_dict

    @staticmethod
    def save_incident_list(data):
        """Insert *data* (a sequence of row dicts) in one transaction."""
        # number of rows to insert in one query
        num_rows = 50
        with instance.db.atomic():
            for idx in range(0, len(data), num_rows):
                Incident.insert_many(data[idx:idx + num_rows]).execute()

    @classmethod
    def _add_common_filters(cls, query, kwargs):
        """Narrow *query* by the filter keys present in *kwargs*.

        Recognised keys: ``domain``, ``ip``, ``attack_type`` (exact match)
        and ``description`` (substring match).
        """
        if 'domain' in kwargs:
            query = query.where(cls.domain == kwargs['domain'])
        if 'ip' in kwargs:
            query = query.where(cls.abuser == kwargs['ip'])
        if 'attack_type' in kwargs:
            query = query.where(cls.name == kwargs['attack_type'])
        if 'description' in kwargs:
            query = query.where(
                cls.description.contains(kwargs['description']))
        return query


class DisabledRule(Model):
    """Provides a way to ignore certain rules."""

    class Meta:
        database = instance.db
        db_table = 'disabled_rules'
        indexes = (
            (('plugin', 'rule_id'), True),
        )

    id = PrimaryKeyField()
    #: The name of the sensor used to detect an incident, e.g. modsec, cl_dos.
    plugin = CharField(null=False)
    #: The ID of the rule.
    rule_id = CharField(null=False)
    #: A human-readable name of the rule.
    #: Only used for UX, doesn't affect detection logic.
    name = TextField(null=False)

    @classmethod
    def as_list(cls) -> List[Dict]:
        """Return every disabled rule as a dict keyed by column name."""
        return [
            {
                cls.plugin.name: rule.plugin,
                cls.rule_id.name: rule.rule_id,
                cls.name.name: rule.name,
            } for rule in cls.select()
        ]

    @classmethod
    def is_rule_ignored(cls, plugin, rule_id, domain=None):
        """Tell whether the rule is disabled, globally or for *domain*.

        :return: ``False`` when there is no such disabled rule; when the
            rule has domains attached, whether *domain* is one of them;
            ``True`` otherwise
        """
        try:
            dr = cls.get(plugin=plugin, rule_id=rule_id)
        except cls.DoesNotExist:
            return False
        if dr.domains:
            return domain in (d.domain for d in dr.domains)
        return True

    @classmethod
    def get_global_disabled(cls, plugin):
        """Return rule ids of *plugin* that have no domain attached."""
        query = cls.select(cls.rule_id) \
            .join(DisabledRuleDomain, JOIN.LEFT_OUTER) \
            .where((cls.plugin == plugin)
                   & (DisabledRuleDomain.domain >> None)).dicts()
        return [row['rule_id'] for row in query]

    @classmethod
    def get_domain_disabled(cls, plugin, domain):
        """Return rule ids of *plugin* disabled for *domain*."""
        query = cls \
            .select(cls.rule_id).join(DisabledRuleDomain) \
            .where(cls.plugin == plugin,
                   DisabledRuleDomain.domain == domain) \
            .dicts()
        return [row['rule_id'] for row in query]

    @classmethod
    def fetch(cls, limit, offset=0, order_by=None):
        """Return one page of disabled rules with their domains.

        :return: tuple ``(max_count, items)`` where *max_count* is the
            row count with the limit cleared and every item has the keys
            ``plugin``, ``id``, ``name`` and ``domains`` (``None`` when the
            rule has no domains attached)
        """
        rules_query = cls.select() \
            .order_by(cls.plugin, cls.rule_id) \
            .limit(limit) \
            .offset(offset)
        if order_by is not None:
            rules_query = apply_order_by(order_by, cls, rules_query)
        domains_query = DisabledRuleDomain.select()
        rules_with_domains_query = prefetch(rules_query, domains_query)

        result = []
        max_count = rules_query.count(clear_limit=True)
        for rule in rules_with_domains_query:
            item = {
                'plugin': rule.plugin,
                'id': rule.rule_id,
                'name': rule.name,
                'domains': None
            }

            if rule.domains:
                item['domains'] = [
                    d.domain for d in rule.domains]

            result.append(item)

        return max_count, result

    @classmethod
    def store(cls, plugin, id, name, domains):
        """Disable a rule, globally or for the given domains.

        For a rule that is already stored: the listed domains are added,
        or, when *domains* is empty, all its domain records are removed.

        :param domains: iterable of domain names; ``None`` or empty means
            no domains
        """
        try:
            inserted_id = DisabledRule.insert(
                plugin=plugin, rule_id=id, name=name).execute()
        except IntegrityError:
            # the (plugin, rule_id) pair is unique: the rule already exists
            dr = DisabledRule.get(plugin=plugin, rule_id=id)
            if domains:
                for d in domains:
                    DisabledRuleDomain.create_or_get(
                        disabled_rule_id_id=dr.id, domain=d)
            else:
                DisabledRuleDomain.delete().where(
                    DisabledRuleDomain.disabled_rule_id_id == dr.id).execute()
        else:
            # ``domains`` may be None for a global rule; iterate nothing then
            for d in domains or ():
                DisabledRuleDomain.create(
                    disabled_rule_id_id=inserted_id, domain=d)


class DisabledRuleDomain(Model):
    """Allows to disable rules for specific domains.

    If there are no records in this table related to :class:`DisabledRule`,
    then the rule is ignored for all domains.
    Otherwise, the rule is ignored only for domains listed.
    """

    disabled_rule_id_id = ForeignKeyField(
        DisabledRule, backref='domains', on_delete='CASCADE')
    #: The domain name, for which the rule must be disabled.
    domain = CharField(null=False)

    class Meta:
        database = instance.db
        db_table = 'disabled_rules_domains'
        primary_key = CompositeKey('disabled_rule_id_id', 'domain')
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.06 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A