[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: config_provider.py
File is not writable. Editing disabled.
"""Config providers that work with files"""
import json
import logging
import os
import pwd
from abc import abstractmethod
from contextlib import suppress
from textwrap import dedent
from typing import Mapping, Optional, Protocol

import yaml

from defence360agent.utils import atomic_rewrite

logger = logging.getLogger(__name__)

# Don't read config if its file is larger than this.
_MAX_CONFIG_SIZE = 1 << 20  # 1MiB


class IConfigProvider(Protocol):
    """Structural interface shared by the config readers in this module."""

    @abstractmethod
    def read_config_file(
        self, force_read: bool = False, ignore_errors: bool = True
    ):
        """Return the config content."""
        raise NotImplementedError

    @abstractmethod
    def write_config_file(self, config: Mapping) -> None:
        """Persist *config*."""
        raise NotImplementedError

    @abstractmethod
    def modified_since(self, timestamp: Optional[float]) -> bool:
        """Tell whether the config has changed since *timestamp*."""
        raise NotImplementedError


class ConfigError(Exception):
    """Raised when a config file cannot be read or parsed."""


class JsonMessage:
    """Render given *obj* as JSON (with sorted keys) lazily.

    To be used for logging, so that serialization only happens if the
    record is actually emitted. Example:

        logging.info("object: %s", JsonMessage(obj))
    """

    def __init__(self, obj):
        self._obj = obj

    def __str__(self):
        # yaml.safe_load() may produce values json cannot serialize
        # natively (e.g. dates); stringify those instead of raising
        # TypeError in the middle of a logging call.
        return json.dumps(self._obj, sort_keys=True, default=str)


def diff_section(prev_section: Optional[dict], section: Optional[dict]):
    """Return difference between config sections.

    A section that is None (or otherwise falsy) is treated as empty.

    The result is a dict with three keys:

    * "-": settings present only in *prev_section* (old values),
    * "+": settings present only in *section* (new values),
    * "?": settings present in both with different values, mapped to
      ``(old_value, new_value)`` tuples.
    """
    prev_section = prev_section or {}
    section = section or {}
    removed_settings = prev_section.keys() - section.keys()
    added_settings = section.keys() - prev_section.keys()
    return {
        "-": {v: prev_section[v] for v in removed_settings},
        "+": {v: section[v] for v in added_settings},
        # modified settings
        "?": {
            v: (prev_section[v], section[v])
            for v in (prev_section.keys() & section.keys())
            if prev_section[v] != section[v]
        },
    }


def diff_config(prev_conf: dict, conf: dict):
    """Compare *prev_conf* with the current *conf*.

    Yields exactly three dicts, in this order: removed sections, added
    sections, and changed sections (each mapped to its `diff_section()`).
    """
    removed_sections = prev_conf.keys() - conf.keys()
    yield {section: prev_conf[section] for section in removed_sections}
    added_sections = conf.keys() - prev_conf.keys()
    yield {section: conf[section] for section in added_sections}
    # changed sections
    yield {
        section: diff_section(prev_conf[section], conf[section])
        for section in (prev_conf.keys() & conf.keys())
        if prev_conf[section] != conf[section]
    }


def exclude_equals(*, main_conf: dict, base_conf: dict) -> dict:
    """
    Return dict derived from *main_conf* excluding
    parts that are equal in *base_conf*.

    For example,

    >>> base_conf = {
    ...     "SECTION1": {"OPTION1": "default", "OPTION2": "default"},
    ...     "SECTION2": {"OPTION1": "default"}
    ... }
    >>> main_conf = {
    ...     "SECTION1": {"OPTION1": "value", "OPTION2": "default"},
    ...     "SECTION2": {"OPTION1": "default"}
    ... }
    >>> exclude_equals(main_conf=main_conf, base_conf=base_conf)
    {'SECTION1': {'OPTION1': 'value'}}
    """
    # Sections removed relative to base_conf are irrelevant here.
    _, added, changed = diff_config(base_conf, main_conf)
    result = {}
    for section, value in main_conf.items():
        if section in added:
            result[section] = value
        if section in changed:
            section_diff = changed[section]
            overrides = result.setdefault(section, {})
            overrides.update(section_diff["+"])
            # keep only the new value of each modified setting
            overrides.update(
                {k: v[1] for k, v in section_diff["?"].items()}
            )
    return result


class ConfigReader:
    """
    ConfigFile file for settings page.
    Location config file is PATH
    """

    def __init__(self, path, disclaimer="", permissions=None):
        self.path = path
        self.disclaimer = disclaimer
        self.permissions = permissions

    def __repr__(self):
        return "<{classname}({path})>".format(
            classname=self.__class__.__qualname__, path=self.path
        )

    def __str__(self):
        return f"ConfigReader at {self.path}"

    def read_config_file(
        self, force_read: bool = False, ignore_errors: bool = True
    ) -> dict:
        """Read config file into memory.

        *force_read* and *ignore_errors* are accepted for interface
        compatibility; this implementation always reads the file and
        never suppresses errors.

        Returns an empty dict if the file does not exist.

        Raises ConfigError.
        """
        try:
            if os.path.getsize(self.path) > _MAX_CONFIG_SIZE:
                raise ConfigError("Config file is too large")
            filename = self.path
            with open(filename, "r") as config_file:
                logger.info("Reading config file %s", filename)
                text = config_file.read()
        except UnicodeDecodeError as e:
            raise ConfigError("Unable to decode config file") from e
        except FileNotFoundError:
            return {}
        return self.load_config_body(text)

    def load_config_body(self, text: str) -> dict:
        """Parse *text* as YAML and return the resulting dict.

        An empty document yields an empty dict.

        Raises ConfigError if *text* is not valid YAML or its top-level
        value is not a mapping.
        """
        try:
            config = yaml.safe_load(text)
        except yaml.YAMLError as e:
            raise ConfigError(
                f"Imunify360 config is not valid YAML document ({e})"
            ) from e
        if config is None:
            return {}
        if not isinstance(config, dict):
            raise ConfigError(
                "Imunify360 config is invalid or empty"
                ": path={!r}, text={!r}".format(self.path, text)
            )
        return config

    def _pre_write(self):
        """Hook called before the config file is written."""

    def _post_write(self):
        """Hook called after the config file is written."""

    def write_config_file(self, config) -> str:
        """Dump *config* as YAML to `self.path` and return the text.

        The dedented disclaimer, if any, is put before the YAML body.
        """
        self._pre_write()
        config_text = ""
        if self.disclaimer:
            config_text += dedent(self.disclaimer)
            config_text += "\n"
        config_text += yaml.dump(config, default_flow_style=False)
        atomic_rewrite(
            self.path, config_text, backup=False, permissions=self.permissions
        )
        self._post_write()
        return config_text

    def modified_since(self, timestamp: Optional[float]) -> bool:
        """Always True: this reader keeps no state to compare against."""
        return True


class CachedConfigReader(ConfigReader):
    """ConfigReader that re-reads the file only when it has changed."""

    def __init__(self, path, disclaimer="", permissions=None):
        super().__init__(path, disclaimer, permissions)
        # mtime/size of the file as of the last read; None until then
        self.mtime: Optional[float] = None
        self.size: Optional[float] = None
        self._config = {}

    def __str__(self):
        return (
            "{classname} <'{path}', modified at {mtime}, {size} bytes>".format(
                classname=self.__class__.__qualname__,
                path=self.path,
                mtime=self.mtime,
                size=self.size,
            )
        )

    def read_config_file(
        self, force_read: bool = False, ignore_errors: bool = True
    ):
        """Update config if config file is modified

        :param force_read: re-read the file even if it looks unchanged
        :param ignore_errors: on ConfigError keep the previously loaded
            config (and log the problem) instead of raising
        """
        if self.modified_since(self.mtime) or force_read:
            prev_config = self._config
            try:
                self._config = super().read_config_file()
            except ConfigError as error:
                if not ignore_errors:
                    raise
                logger.exception(
                    "Intentionally ignoring known config error: %s", error
                )
                logger.warning(
                    "%s is invalid, using previous settings: %s",
                    self,
                    JsonMessage(self._config),
                )
            else:
                if self.mtime is not None:  # don't log on startup
                    diffs = list(diff_config(prev_config, self._config))
                    if any(diffs):  # content has changed, log it
                        logger.info(
                            "%s modified: removed=%s, added=%s, changed=%s",
                            self,
                            *map(JsonMessage, diffs),
                        )
            # Remember the file state even if parsing failed, so that an
            # invalid file is not re-read until it changes again.
            try:
                stat = os.stat(self.path)
                self.mtime = stat.st_mtime
                self.size = stat.st_size
            except FileNotFoundError:
                self.mtime = 0.0
                self.size = 0.0
        return self._config

    def modified_since(self, timestamp: Optional[float]) -> bool:
        """Whether the config has updated since *timestamp*.

        (as defined by its last modification time and size)

        :param timestamp: None means that the file has never been read before
        """
        # On startup consider timestamp to be None
        if timestamp is None:
            timestamp = 0.0
        try:
            stat = os.stat(self.path)
        except FileNotFoundError:
            # a missing file is treated as an empty one with zero mtime
            st_mtime, st_size = 0.0, 0.0
        else:
            st_mtime, st_size = stat.st_mtime, stat.st_size
        return st_mtime > timestamp or st_size != self.size


class WriteOnlyConfigReader(CachedConfigReader):
    """Reader that never reads the file: it serves what was last written."""

    def read_config_file(self, *_, **__):
        """Return the in-memory config without touching the file."""
        return self._config

    def write_config_file(self, config):
        """Write *config* and cache the parsed result of the written text."""
        config_text = super().write_config_file(config)
        self._config = self.load_config_body(config_text)
        return config_text


class UserConfigReader(CachedConfigReader):
    """Cached reader for the config file of a particular user."""

    def __init__(self, path, username):
        super().__init__(path)
        self.username = username

    def __str__(self):
        return f"Config of user {self.username}"

    def _pre_write(self):
        """Ensure the config directory exists as root:<user gid>, 0750."""
        confdir = os.path.dirname(self.path)
        with suppress(FileExistsError):
            os.mkdir(confdir)
        os.chown(confdir, 0, pwd.getpwnam(self.username).pw_gid)
        os.chmod(confdir, 0o750)

    def _post_write(self):
        """Make the written config file root:<user gid>, 0640."""
        os.chown(self.path, 0, pwd.getpwnam(self.username).pw_gid)
        os.chmod(self.path, 0o640)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A