[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: svcctl.py
File is not writable. Editing disabled.
import asyncio import logging import os from pathlib import Path import subprocess as su from typing import Iterable, Union from defence360agent.contracts.config import Core from defence360agent.utils import check_run, run, OsReleaseInfo logger = logging.getLogger(__name__) DOS_PROTECTOR_SERVICE_NAME = "imunify360-dos-protection" UAL_SERVICE_NAME = "imunify360-unified-access-logger" def _apply_cmd(func): async def wrapper(*args, **kwargs): cmd = func(*args, **kwargs) logger.debug("check_call(%r)", cmd) await check_run(cmd) return wrapper async def _reset_failed_state( services: Iterable[Union["_CentOs6", "_SystemctlBased"]] ): for s in services: await s.reset_failed() await s.restart() for _ in range(10): if await s.is_active(): break logger.warning( "Service %s is still not active, sleep for %s seconds", s, 1 ) await asyncio.sleep(1) class _CentOs6: SVC_CTL_BIN = "/sbin/service" _CHKCONFIG = "/sbin/chkconfig" def __init__(self, service_name): self._service_name = service_name @_apply_cmd def start(self): return [self.SVC_CTL_BIN, self._service_name, "start"] @_apply_cmd def stop(self): return [self.SVC_CTL_BIN, self._service_name, "stop"] @_apply_cmd def restart(self): return [self.SVC_CTL_BIN, self._service_name, "restart"] async def reset_failed(self): """Not implemented for Centos6""" pass @_apply_cmd def enable(self): return [self._CHKCONFIG, "--add", self._service_name] async def is_enabled(self): cmd = [self._CHKCONFIG, "--list", self._service_name] proc = await asyncio.create_subprocess_exec( *cmd, stdout=su.PIPE, stderr=su.DEVNULL ) out, _ = await proc.communicate() rc = await proc.wait() return rc == 0 and b":on" in out def is_enabled_sync(self): cmd = [self._CHKCONFIG, "--list", self._service_name] cp = su.run(cmd, stdout=su.PIPE, stderr=su.DEVNULL) return cp.returncode == 0 and b":on" in cp.stdout @_apply_cmd def disable(self): return [self._CHKCONFIG, "--del", self._service_name] mask = disable unmask = enable async def is_active(self): cmd = [self.SVC_CTL_BIN, self._service_name, "status"] exit_code, _, _ = await run(cmd) return exit_code == 0 async def activate_socket_service(self): if await self.is_enabled() and not await self.is_active(): await _reset_failed_state((self,)) class _SystemctlBased: SVC_CTL_BIN = "systemctl" def __init__(self, service_name): self._service_name = service_name @_apply_cmd def start(self): return [self.SVC_CTL_BIN, "start", self._service_name] @_apply_cmd def stop(self): return [self.SVC_CTL_BIN, "stop", self._service_name] @_apply_cmd def restart(self): return [self.SVC_CTL_BIN, "restart", self._service_name] @_apply_cmd def _enable_now(self, *, now: bool): return [ self.SVC_CTL_BIN, "enable", *(["--now"] if now else []), self._service_name, ] async def enable(self, *, now: bool): await self._enable_now(now=now) # WARN: Ubuntu 16.04 demonstrates very special behavior of the # `systemcl enable --now` command - if the unit is stopped it # wouldn't be started. We need to handle that case. # TODO: Remove this case on dropping support for Ubuntu 16.04. osinfo = {} try: OsReleaseInfo.dict_from_file(osinfo) except (FileNotFoundError, PermissionError): return if osinfo.get("ID", "").lower() != "ubuntu": return if osinfo.get("VERSION_ID", "") == "16.04": await self.restart() async def is_enabled(self): cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name] proc = await asyncio.create_subprocess_exec( *cmd, stdout=su.DEVNULL, stderr=su.DEVNULL ) await proc.communicate() rc = await proc.wait() return rc == 0 def is_enabled_sync(self): cmd = [self.SVC_CTL_BIN, "is-enabled", self._service_name] rc = su.call(cmd, stdout=su.DEVNULL, stderr=su.DEVNULL) return rc == 0 @_apply_cmd def disable(self, *, now: bool): return [ self.SVC_CTL_BIN, "disable", *(["--now"] if now else []), self._service_name, ] @_apply_cmd def reload(self): return [self.SVC_CTL_BIN, "reload", self._service_name] @_apply_cmd def mask(self): """ It was created for imunify360-webshield which required masking as far This is no more relevant but let it stay is started by 'Wants=' in imunify360.service """ return [self.SVC_CTL_BIN, "mask", self._service_name] @_apply_cmd def unmask(self): """ It was created for imunify360-webshield which required masking as far This is no more relevant but let it stay is started by 'Wants=' in imunify360.service """ return [self.SVC_CTL_BIN, "unmask", self._service_name] async def is_active(self): cmd = [self.SVC_CTL_BIN, "is-active", self._service_name] exit_code, _, _ = await run(cmd) return exit_code == 0 @_apply_cmd def reset_failed(self): return [self.SVC_CTL_BIN, "reset-failed", self._service_name] async def activate_socket_service(self): agent_service_socket = adaptor(f"{self._service_name}.socket") if ( await agent_service_socket.is_enabled() and not await agent_service_socket.is_active() ): await _reset_failed_state((self, agent_service_socket)) class _CentOs7(_SystemctlBased): SVC_CTL_BIN = "/usr/bin/systemctl" class _DebianUbuntu(_SystemctlBased): SVC_CTL_BIN = "/bin/systemctl" def adaptor(service_name, *, include_centos6=True): for a in ( _DebianUbuntu, _CentOs7, *((_CentOs6,) if include_centos6 else tuple()), ): if os.path.exists(a.SVC_CTL_BIN): return a(service_name) else: raise RuntimeError("Cannot instantiate appropriate adaptor.") def imunify360_service(): return adaptor(Core.SVC_NAME) def imunify360_dos_protector_service(): try: return adaptor(DOS_PROTECTOR_SERVICE_NAME, include_centos6=False) except RuntimeError: logger.info("DOS Protector service is not available on this system") return None def imunify360_ual_service(): return adaptor(UAL_SERVICE_NAME)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A