[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: validate.py
File is not writable. Editing disabled.
import datetime import logging import math import os import re from collections import namedtuple from functools import wraps from cerberus.validator import Validator from defence360agent.contracts.config import ( ANTIVIRUS_MODE, BackupRestore, Malware, ) from defence360agent.contracts.license import LicenseCLN from defence360agent.subsys.backup_systems import BackupSystem, get_backend logger = logging.getLogger(__name__) SHA256_REGEXP = re.compile("^[A-Fa-f0-9]{64}$") class ValidationError(Exception): def __init__(self, errors, extra_data=None): if isinstance(errors, str): self.errors = [errors] else: self.errors = errors self.extra_data = extra_data or {} OrderByBase = namedtuple("OrderByBase", ["column_name", "desc"]) class OrderBy(OrderByBase): def __new__(cls, column_name, desc): return super().__new__(cls, column_name, desc) @classmethod def fromstring(cls, ob_string): """ :param ob_string: for example: 'user+', 'id-' :return: """ try: col_name, sign = re.compile("^(.+)([+|-])").split(ob_string)[1:-1] return cls(col_name, sign == "-") except ValueError as e: raise ValueError( "Incorrect order_by: ({}): {}".format(str(e), ob_string) ) class SchemaValidator(Validator): _DATE_FORMAT = "%Y-%m-%d" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.extra_data = {} def _normalize_coerce_order_by(self, value): if isinstance(value, OrderBy): return value return OrderBy.fromstring(value) def _normalize_coerce_sha256hash(self, value): return str(value).strip().lower() def _normalize_coerce_scan_db(self, value): if ANTIVIRUS_MODE: return False if value is None: return Malware.DATABASE_SCAN_ENABLED return value def _validate_type_order_by(self, value): if isinstance(value, OrderBy): return True return False def _validate_type_sha256hash(self, value: str): return SHA256_REGEXP.match(str(value).strip()) def _validate_is_absolute_path(self, is_absolute_path, field, value): """{'type': 'boolean', 'empty': False}""" if is_absolute_path: if not os.path.isabs(value): self._error(field, "Path {} should be absolute".format(value)) def _validate_isascii(self, isascii, field, value): """{'type': 'boolean'}""" if isascii: try: value.encode("ascii") except UnicodeEncodeError: self._error(field, "Must only contain ascii symbols") def _normalize_coerce_int(self, value): return int(value) def _normalize_default_setter_now(self, document) -> int: return math.ceil(datetime.datetime.now().timestamp()) # for argparser support def _validate_cli(self, *args, **kwargs): """{'type': 'dict', 'empty': False, 'schema': { 'users': {'type': 'list', 'allowed': ['non-root', 'root'], 'empty': False}, 'require_rpc': {'type': 'string', 'empty': True, 'default': 'running', 'allowed': ['running', 'stopped', 'any', 'direct']} }} """ # for argparser support def _validate_help(self, *args, **kwargs): """{'type': 'string', 'empty': False}""" # for argparser support def _validate_positional(self, *args, **kwargs): """{'type': 'boolean', 'empty': True, 'default': False}""" # metadata for response validation def _validate_return_type(self, *args, **kwargs): """{'type': 'string', 'empty': True}""" def _validate_envvar(self, *args, **kwargs): """ Parameter can be passed via the specified environment variable. The value specified via a CLI argument takes precedence. The rule's arguments are validated against this schema: {'type': 'string', 'empty': False} """ def _validate_envvar_only(self, *args, **kwargs): """ Parameter will only be accepted if provided via environment variable specified by `envvar`. It will be rejected if passed as a CLI argument. The rule's arguments are validated against this schema: {'type': 'boolean', 'default': False} """ def _normalize_coerce_path(self, value: str): if value: return os.path.abspath(value) return value def _normalize_coerce_backup_system(self, value): if isinstance(value, BackupSystem): return value return get_backend(value) def _validator_backup_is_enabled(self, field, value): if not (BackupRestore.ENABLED and BackupRestore.backup_system()): self._error(field, "Backup is not enabled!") def validate(validator, hashable, params): values = validator.normalized( {hashable: params}, always_return_document=True ) if not validator.validate({hashable: values[hashable]}): logger.warning( "Validation error with command {}, params {}, errors {}".format( hashable, params, validator.errors ) ) raise ValidationError(validator.errors, validator.extra_data) return validator.document[hashable] def validate_middleware(validator): def wrapped(f): @wraps(f) async def wrapper(request, *args, **kwargs): hashable = tuple(request["command"]) request["params"] = validate( validator, hashable, request["params"] ) result = await f(request, *args, **kwargs) return result return wrapper return wrapped def validate_av_plus_license(func): """ Decorator for CLI commands methods that ensures that the AV+ license is valid. :raises ValidationError: """ exception = ValidationError("ImunifyAV+ license required") @wraps(func) async def async_wrapper(*args, **kwargs): if LicenseCLN.is_valid_av_plus(): return await func(*args, **kwargs) raise exception if ANTIVIRUS_MODE: return async_wrapper return func
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.13 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A