[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: dos_detector.py
File is not writable. Editing disabled.
import collections import datetime import ipaddress import logging from typing import Iterable, Tuple import psutil from defence360agent.contracts import plugins from defence360agent.utils import recurring_check from im360.contracts.config import DOS as DOS_config from defence360agent.contracts.messages import MessageType from im360.contracts.plugins import IDSAwareMessageSink from im360.internals import strategy from im360.utils import net logger = logging.getLogger(__name__) class DOSSensor(plugins.Sensor, IDSAwareMessageSink): STRATEGY = strategy.Strategy.PRIMARY_IDS_STRATEGY async def create_sensor(self, loop, sink): self._loop = loop self._sink = sink async def create_sink(self, loop): self._loop = loop self._task = None async def activate(self): coro = recurring_check(DOS_config.INTERVAL)( self._check_connections) self._task = self._loop.create_task(coro()) await super().activate() async def deactivate(self): await self._cond_task_cancel() await super().deactivate() async def _cond_task_cancel(self): if self._task is not None: self._task.cancel() await self._task self._task = None @staticmethod def list_dos_ips() -> Iterable[Tuple[str, int]]: """ Provides list of IPs that have more than allowed simultaneous connections to the server. :return: iterator over tuples (IP, connections) """ def non_local_established_connections(): local_ips = set(net.local_ip_addresses()) for conn in psutil.net_connections(): if conn.status == psutil.CONN_ESTABLISHED: ip_addr = ipaddress.ip_address(conn.raddr.ip) if getattr(ip_addr, "ipv4_mapped", None): # IPv4-mapped IPv6 addresses ip_addr = ip_addr.ipv4_mapped if ip_addr not in local_ips: yield str(ip_addr), conn.laddr.port connections = collections.Counter( non_local_established_connections() ) for (remote_ip, port), count in connections.items(): per_port = DOS_config.PER_PORT.get( str(port), DOS_config.DEFAULT_LIMIT ) if count > per_port: logger.debug("DOS was discovered from ip %s on port %d" " with %d connections", remote_ip, port, count) yield remote_ip, port, count async def shutdown(self): await self._cond_task_cancel() async def _check_connections(self): if not DOS_config.ENABLED: return logger.debug('Checking for a DoS connections') for ip, port, num_connections in self.list_dos_ips(): message = self.generate_message(ip, port, num_connections) for msg_to_sink in ( MessageType.SensorAlert, MessageType.SensorIncident ): self._loop.create_task( self._sink.process_message( msg_to_sink(message) ) ) @staticmethod def generate_message(ip, port, num_connections): today = datetime.datetime.now() return { 'plugin_id': 'cl_dos', 'rule': None, 'timestamp': today.timestamp(), 'attackers_ip': ip, 'connections': num_connections, 'name': 'DOS detection', 'port': port, 'message': '{} Denial of Service attack was discovered ' 'from {}, on port {}. Open connections: {}'.format( today.strftime('%b %-d %H:%M:%S'), ip, port, num_connections ) }
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.34 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A