[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: panel.py
File is not writable. Editing disabled.
import asyncio import functools import json import logging import os from collections import defaultdict from typing import Dict, List, Sequence, Set import cerberus import yaml from defence360agent.api.integration_conf import ( ClIntegrationConfig, IntegrationConfig, ) from defence360agent.api.jwt_issuer import JWTIssuer from defence360agent.application.determine_hosting_panel import ( is_generic_panel_installed, ) from defence360agent.rpc_tools.lookup import UserType from defence360agent.utils import ( CheckRunError, check_run, get_non_system_users, ) from .. import base logger = logging.getLogger(__name__) _SCHEMA_PATH_TMPL = ( os.path.dirname(__file__) + "/users_script_schemas/schema-{}.yaml" ) ADMIN_LIST_FILE_PATH = "/etc/sysconfig/imunify360/auth.admin" METADATA = "metadata" @functools.lru_cache(maxsize=2) def _get_validator(script: str) -> cerberus.Validator: """Returns a validator for given script.""" with open(_SCHEMA_PATH_TMPL.format(script)) as schema_file: schema = yaml.safe_load(schema_file) if script is not METADATA: schema[METADATA] = {"required": True} return cerberus.Validator(schema) def get_users_default_impl(): return [dict(name=pw.pw_name) for pw in get_non_system_users()] def _get_conf_path(cl, script): d = cl.to_dict() if "integration_scripts" in d and script in d["integration_scripts"]: return d["integration_scripts"][script] return None async def get_integration_data(script: str): path = _get_conf_path(IntegrationConfig(), script) if not path: path = _get_conf_path(ClIntegrationConfig(), script) if not path: raise IntegrationScriptError( "%s not found neither in " "/etc/sysconfig/imunify360/integration.conf " "nor in /opt/cpvendor/etc/integration.ini." % script ) return await _get_integration_data(script, path) async def _get_integration_data(script: str, path: str): try: stdout = await check_run(path, shell=True) except CheckRunError as e: raise IntegrationScriptError( "Integrations script {script} " "failed with exit code {e.returncode} \n" "{e.stderr}".format(script=script, e=e) ) try: data = json.loads(stdout.decode()) except (UnicodeDecodeError, json.JSONDecodeError) as e: raise IntegrationScriptError( "Cannot decode output of %s as JSON" % path ) from e if not isinstance(data, dict): raise IntegrationScriptError("%s should return dict" % path) metadata_validator = _get_validator(METADATA) if not metadata_validator.validate(data): raise IntegrationScriptError( "Validation error in metadata of %s script: %s" % (script, metadata_validator.errors) ) if data[METADATA]["result"] != "ok": metadata_error = data[METADATA]["result"] if "message" in data[METADATA]: metadata_error += ": %s" % data[METADATA]["message"] raise IntegrationScriptError(metadata_error) validator = _get_validator(script) if not validator.validate(data): raise IntegrationScriptError( "Validation error in %s script: %s" % (script, validator.errors) ) return data["data"] async def _get_client_data(): try: users = await get_integration_data("users") domains = await get_integration_data("domains") users = {u["username"]: [] for u in users} for k, v in domains.items(): user_domains = users.setdefault(v["owner"], []) user_domains.append(k) return [{"name": k, "domains": v} for k, v in users.items()] except IntegrationScriptError: logger.warning( "Applying default implementation of users and domains lists" ) return get_users_default_impl() async def get_domain_data(): try: return await get_integration_data("domains") except IntegrationScriptError: logger.warning("Could not parse domains lists") return {} async def get_admin_list() -> List[str]: script_name = "admins" admins_set = {"root"} admins_from_integration_scripts = await asyncio.gather( _get_integration_data( script_name, _get_conf_path( IntegrationConfig(), script_name, ), ), _get_integration_data( script_name, _get_conf_path( ClIntegrationConfig(), script_name, ), ), return_exceptions=True, ) custom_admins = { admin["name"] for admins in admins_from_integration_scripts if isinstance(admins, list) # skip exceptions for admin in admins } if not custom_admins: logger.warning( "Error occurred during extracting admins " "from integration configs: %s", admins_from_integration_scripts, ) admins_set |= custom_admins try: with open(ADMIN_LIST_FILE_PATH) as admin_list_file: admins_set.update(admin_list_file.read().splitlines()) except OSError: logger.warning( "Failed to retrieve admins list from %s", ADMIN_LIST_FILE_PATH ) return list(admins_set) class IntegrationScriptError(base.PanelException): def __init__(self, *args, **kwargs): super().__init__(*args) logger.warning(self) class GenericPanel(base.AbstractPanel): """ Panel, UI to which is provided by imunify{-antivirus,360-firewall}-generic.{rpm,deb} """ NAME = "generic panel" exception = IntegrationScriptError @classmethod def is_installed(cls): return is_generic_panel_installed() # pragma: no cover async def enable_imunify360_plugin(self, name=None): pass async def disable_imunify360_plugin(self, plugin_name=None): pass @classmethod async def version(cls): try: info = await get_integration_data("panel_info") return "{name} {version}".format(**info) except IntegrationScriptError: return "0" async def get_user_domains(self): users = await _get_client_data() result = [] for user in users: result.extend(user.get("domains", tuple())) return result async def get_users(self) -> List[str]: users = await _get_client_data() return [user["name"] for user in users] async def get_domain_to_owner(self) -> Dict[str, List[str]]: users = await _get_client_data() result = defaultdict(list) for user in users: for domain in user.get("domains", []): result[domain].append(user["name"]) return result async def get_domains_per_user(self) -> Dict[str, List[str]]: users = await _get_client_data() return {user["name"]: user.get("domains", []) for user in users} def authenticate(self, protocol, data: dict): if protocol._uid != 0 and data["command"] != ["login", "pam"]: token = data["params"].pop("jwt", None) parsed_token = JWTIssuer.parse_token(token) return parsed_token["user_type"], ( parsed_token["user_name"] if parsed_token["user_type"] == UserType.NON_ROOT else None ) else: return protocol.user, None def basedirs(self) -> Set[str]: conf = IntegrationConfig().to_dict() if "malware" in conf and "basedir" in conf["malware"]: return set(conf["malware"]["basedir"].split()) return set() async def list_docroots(self) -> Dict[str, str]: domains = await get_domain_data() return {v["document_root"]: domain for domain, v in domains.items()}
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 690.23 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A