[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: pam.py
File is not writable. Editing disabled.
"""PAM module management plugin. Has two functions: * exports manually whitelisted IP addresses to PAM whitelist; * changes PAM module state (enabled/disabled) to match imunify360 config. """ import asyncio import contextlib import logging from defence360agent.contracts import config from defence360agent.contracts.config import SystemConfig from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import MessageSink, expect from defence360agent.utils import recurring_check from im360.model import custom_lists from im360.model.firewall import IPList from im360.subsys import ossec, pam logger = logging.getLogger(__name__) class PAM(MessageSink): _EXPORT_DELAY = 10 # seconds _CONFIG_PERIODIC_CHECK = 3600 # seconds _SSHD_ENABLED = config.FromConfig("PAM", "enable") _DOVECOT_PROTECTION_ENABLED = config.FromConfig( "PAM", "exim_dovecot_protection" ) _DOVECOT_NATIVE_ENABLED = config.FromConfig("PAM", "exim_dovecot_native") _FTP_ENABLED = config.FromConfig("PAM", "ftp_protection") def __init__(self): self._tasks = [] self._whitelist_update_required = asyncio.Event() self._status_check_required = asyncio.Event() self._loop = None async def create_sink(self, loop) -> None: self._loop = loop self._tasks.append(loop.create_task(self._exporter())) IPList.Signals.added.connect(self._on_signal, IPList.WHITE) IPList.Signals.deleted.connect(self._on_signal, IPList.WHITE) IPList.Signals.cleared.connect(self._on_signal, IPList.WHITE) IPList.Signals.updated.connect(self._on_signal, IPList.WHITE) self._tasks.append(loop.create_task(self._status_checker())) self._tasks.append(loop.create_task(self._initiate_status_check())) self._whitelist_update_required.set() def _on_signal(self, listname: str, **kwargs) -> None: self._whitelist_update_required.set() @recurring_check(_EXPORT_DELAY) async def _exporter(self) -> None: await self._whitelist_update_required.wait() self._whitelist_update_required.clear() status = await pam.get_status() if all( s == pam.PamServiceStatusValue.disabled for s in status.values() ): # pam protection is disabled for all services return # nothing to do q = ( IPList.select(IPList.ip) .where(IPList.listname == IPList.WHITE, IPList.manual) .tuples() ) networks = [ip for [ip] in q] networks.extend(await custom_lists.CustomWhitelist.load()) try: await pam.export_ip_whitelist(networks) except FileNotFoundError as exc: logger.warning("Failed to export IP whitelist for PAM: %s", exc) async def shutdown(self) -> None: IPList.Signals.added.disconnect(self._on_signal) IPList.Signals.deleted.disconnect(self._on_signal) IPList.Signals.cleared.disconnect(self._on_signal) IPList.Signals.updated.disconnect(self._on_signal) for task in self._tasks: if task is not None: task.cancel() with contextlib.suppress(asyncio.CancelledError): await task @expect(MessageType.UpdateCustomLists) async def on_custom_lists_update( self, message: MessageType.UpdateCustomLists ): self._whitelist_update_required.set() async def _ensure_status(self) -> None: status = await pam.get_status() dovecot_protection_enabled = await self._ensure_status_for_dovecot( desired_dovecot_status=pam.DovecotStatus.DISABLED if not self._DOVECOT_PROTECTION_ENABLED else pam.DovecotStatus.PAM if not self._DOVECOT_NATIVE_ENABLED else pam.DovecotStatus.NATIVE, pam_status=status, ) ftp_enabled = await self._ensure_status_for_service( self._FTP_ENABLED, status, pam.PamService.FTP ) sshd_enabled = await self._ensure_status_for_service( self._SSHD_ENABLED, status, pam.PamService.SSHD ) something_has_been_enabled = any( [ dovecot_protection_enabled, ftp_enabled, sshd_enabled, ] ) # ensure OSSEC status status = dict(await pam.get_status()) # . merge dovecot status for ossec status[ossec.DOVECOT] = ( pam.PamServiceStatusValue.disabled if status[pam.PamService.DOVECOT_NATIVE] == status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.disabled else pam.PamServiceStatusValue.enabled ) del status[pam.PamService.DOVECOT_NATIVE] del status[pam.PamService.DOVECOT_PAM] try: await ossec.configure_for_pam(status) except ossec.OssecRulesError as exc: logger.error("Failed to update OSSEC configuration: %s", exc) if something_has_been_enabled: self._whitelist_update_required.set() async def _ensure_status_for_dovecot( self, desired_dovecot_status: pam.DovecotStatus, pam_status: dict ) -> bool: """Ensure pam status corresponds to the desired dovecot status. Special handling for 3 states. Return whether pam/native modules were enabled. """ if desired_dovecot_status is pam.DovecotStatus.DISABLED: if not ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.disabled ): # something is enabled # disable dovecot # note: either pam/native will do here; both should be disabled await pam.disable(pam.PamService.DOVECOT_NATIVE) logger.info("PAM module has been disabled for dovecot") elif desired_dovecot_status is pam.DovecotStatus.PAM: if ( pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.enabled ): # already enabled if ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam.PamServiceStatusValue.enabled ): # pragma: no cover # shouldn't happen, report to Sentry logger.error( "Unexpected PAM state: both pam/native are enabled." " Status: %s", pam_status, ) else: # enable dovecot pam pam_service = pam.PamService.DOVECOT_PAM await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True elif desired_dovecot_status is pam.DovecotStatus.NATIVE: if ( pam_status[pam.PamService.DOVECOT_NATIVE] == pam.PamServiceStatusValue.enabled ): # already enabled if ( pam_status[pam.PamService.DOVECOT_PAM] == pam.PamServiceStatusValue.enabled ): # pragma: no cover # shouldn't happen, report to Sentry logger.error( "Unexpected PAM state: both pam/native are enabled." " Status: %s", pam_status, ) else: # enable dovecot native pam_service = pam.PamService.DOVECOT_NATIVE await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True else: # pragma: no cover assert 0, "can't happen" return False # nothing has been enabled async def _ensure_status_for_service( self, should_be_enabled, status, pam_service ): expected_service_status = ( pam.PamServiceStatusValue.enabled if should_be_enabled else pam.PamServiceStatusValue.disabled ) if expected_service_status != status[pam_service]: if should_be_enabled: await pam.enable(pam_service) logger.info("PAM module has been enabled for %s", pam_service) return True await pam.disable(pam_service) logger.info("PAM module has been disabled for %s", pam_service) return False @recurring_check(0) async def _status_checker(self): await self._status_check_required.wait() self._status_check_required.clear() await self._ensure_status() @recurring_check(_CONFIG_PERIODIC_CHECK) async def _initiate_status_check(self): self._status_check_required.set() @expect(MessageType.ConfigUpdate) async def on_config_update(self, message: MessageType.ConfigUpdate): if isinstance(message["conf"], SystemConfig): self._status_check_required.set()
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.14 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A