[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: validate.py
File is not writable. Editing disabled.
import asyncio
import json
import logging
import os
import time
from collections import namedtuple
from datetime import date, datetime, timedelta
from functools import wraps
from ipaddress import IPv4Network, IPv6Network, ip_network
from pathlib import Path

import jsonschema

from defence360agent.rpc_tools.validate import (
    SchemaValidator as SchemaValidatorBase,
)
from defence360agent.rpc_tools.validate import validate
from im360.contracts.config import Webshield
from im360.internals.core import IPSetPort, libipset
from im360.utils.validate import IP

logger = logging.getLogger(__name__)

TODAY = 'today'
YESTERDAY = 'yesterday'

PortProtoBase = namedtuple('PortProtoBase', ['port', 'proto'])
PeriodBase = namedtuple('PeriodBase', ['since', 'to'])

# datetime.fromtimestamp() signals an out-of-range value with any of these,
# depending on the platform; all of them mean "bad timestamp" to a caller.
_TIMESTAMP_ERRORS = (ValueError, OverflowError, OSError)


class PortProto(PortProtoBase):
    """A validated ``(port, proto)`` pair.

    Raises:
        ValueError: if ``proto`` is not in ``IPSetPort.PROTOS`` or ``port``
            is not strictly between ``IPSetPort.MIN_PORT`` and
            ``IPSetPort.MAX_PORT``.
    """

    def __new__(cls, port, proto):
        if proto not in IPSetPort.PROTOS:
            raise ValueError('Protocol {} is not supported'.format(proto))
        if not (IPSetPort.MIN_PORT < port < IPSetPort.MAX_PORT):
            raise ValueError('Port {} is incorrect'.format(port))
        return super().__new__(cls, port, proto)

    @classmethod
    def fromstring(cls, pp_string):
        """Build a PortProto from a ``"<port>:<proto>"`` string.

        Raises:
            ValueError: if the string does not split into exactly two parts,
                the port is not an integer, or the pair fails validation.
        """
        try:
            port, proto = pp_string.split(':')
            port = int(port)
            return cls(port, proto)
        except ValueError as e:
            raise ValueError("Incorrect port_proto ({}): {}".format(
                str(e), pp_string)) from e


class Period(PeriodBase):
    """A ``(since, to)`` pair of POSIX timestamps.

    Both values must be convertible by ``datetime.fromtimestamp``.

    Raises:
        ValueError: if either timestamp cannot be converted.
    """

    def __new__(cls, since, to):
        try:
            datetime.fromtimestamp(since), datetime.fromtimestamp(to)
        except _TIMESTAMP_ERRORS as e:
            # OverflowError / OSError are normalised to ValueError so that
            # callers only have to handle one exception type.
            raise ValueError(
                "Incorrect value for period: {}".format(str(e))) from e
        return super().__new__(cls, since, to)

    @classmethod
    def fromstring(cls, period_string):
        """Parse ``'today'``, ``'yesterday'`` or ``'<int><suffix>'``.

        The suffix is the first letter of one of: weeks, days, hours,
        minutes, seconds (e.g. ``"3d"``, ``"12h"``). The resulting period
        ends now and starts that long ago.

        Raises:
            ValueError: on a malformed string, an unknown suffix, or a
                count too large to be represented.
        """
        now = datetime.now()
        seconds_since_midnight = (now - now.replace(
            hour=0, minute=0, second=0, microsecond=0
        )).total_seconds()

        if period_string == TODAY:
            since, to = time.time() - seconds_since_midnight, time.time()
        elif period_string == YESTERDAY:
            from_date = time.time() - seconds_since_midnight \
                - timedelta(days=1).total_seconds()
            to_date = time.time() - seconds_since_midnight
            since, to = from_date, to_date
        else:
            period_names = 'weeks', 'days', 'hours', 'minutes', 'seconds'
            try:
                val, sfx = int(period_string[:-1]), period_string[-1:]
            except (ValueError, IndexError) as e:
                raise ValueError("Invalid string from period: {} ({})".format(
                    period_string, str(e)
                )) from e
            # Map the one-letter suffix to the timedelta keyword it stands for.
            by_initial = {p_name[0]: p_name for p_name in period_names}
            sfx_expanded = by_initial.get(sfx)
            if sfx_expanded is None:
                # argparse will handle this exception
                raise ValueError("Invalid suffix: {}".format(sfx))
            real_args = {sfx_expanded: val}
            try:
                since, to = (
                    (datetime.now() - timedelta(**real_args)).timestamp(),
                    time.time(),
                )
            except OverflowError as e:
                # A huge count overflows timedelta or datetime arithmetic;
                # report it as ValueError like every other bad input.
                raise ValueError("Invalid string from period: {} ({})".format(
                    period_string, str(e)
                )) from e
        return cls(since, to)


class SchemaValidator(SchemaValidatorBase):
    # Adds port/proto, IP network and period types plus a few field
    # validators on top of the base schema validator.
    #
    # NOTE(review): rule methods below are documented with comments instead
    # of docstrings in case the base validator inspects method docstrings
    # (Cerberus does this for some rule methods) - confirm before converting.

    MAX_IPSET_TIMEOUT = libipset.IPSET_TIMEOUT_MAX  # ipset's maximum ttl

    def _normalize_coerce_port_proto(self, value):
        # Accept a ready PortProto or parse a "<port>:<proto>" string.
        if isinstance(value, PortProto):
            return value
        elif isinstance(value, str):
            return PortProto.fromstring(value)
        raise ValueError("String or PortProto must be provided")

    def _normalize_coerce_ip(self, value):
        # Strict parsing: a string with host bits set makes ip_network raise.
        # Any other input type falls through and yields None.
        if isinstance(value, (IPv4Network, IPv6Network)):
            return value
        elif isinstance(value, str):
            return ip_network(value)

    def _normalize_coerce_ip_discard_host_bits(self, value):
        # Lenient parsing: host bits are masked off (strict=False).
        # Any other input type falls through and yields None.
        if isinstance(value, (IPv4Network, IPv6Network)):
            return value
        elif isinstance(value, str):
            return ip_network(value, strict=False)

    def _normalize_coerce_period(self, value):
        # Accept a ready Period or parse a period string.
        if isinstance(value, Period):
            return value
        elif isinstance(value, str):
            return Period.fromstring(value)
        raise ValueError("String or Period must be provided")

    def _normalize_coerce_tolower(self, value):
        # please, don't try to casefold() instead of lower()
        # see https://tools.ietf.org/html/rfc4343
        return value.lower()

    def _validate_type_port_proto(self, value):
        if isinstance(value, PortProto):
            return True
        return False

    def _validate_type_period(self, value):
        if isinstance(value, Period):
            return True
        return False

    def _validate_type_ip(self, value):
        return isinstance(value, (IPv4Network, IPv6Network))

    def _validator_enforce64min_subnet_mask_for_ipv6(self, field, value):
        # Reject IPv6 networks with a prefix longer than /64.
        if IP.is_valid_ipv6_network(value):
            # 64 - min subnet mask for ipv6 addr
            if IPv6Network(value).prefixlen > 64:
                self._error(
                    field,
                    'Supported only ipv6 /64 networks: {}'.format(value))

    def _validator_max_days(self, field, value):
        # The number of days must fit in a timedelta and must not reach
        # further back than 1970-01-01.
        max_days = timedelta.max.days
        max_past = date.today() - date(1970, 1, 1)
        if value > max_days or timedelta(days=value) > max_past:
            self._error(field,
                        'Number of days ({}) exceeds maximum value of {}. '
                        'Please, specify lesser amount of days'.format(
                            value, max_past.days))

    def _validator_timestamp(self, field, value):
        # The value must be convertible by datetime.fromtimestamp; overflow
        # and OS-level failures are reported the same way as ValueError.
        try:
            datetime.fromtimestamp(value)
        except _TIMESTAMP_ERRORS as e:
            self._error(field, "Incorrect timestamp: {} ({})".format(
                value, str(e)
            ))

    def _validator_expiration(self, field, value):
        # A falsy expiration is accepted as-is; otherwise it must lie in the
        # future and no further than MAX_IPSET_TIMEOUT seconds from now.
        if not value:
            return
        expiration_time = value
        now = time.time()
        if expiration_time <= now:
            self._error(
                field,
                "Expiration contains expired timestamp {}!".format(
                    time.strftime("%x %X %Z", time.gmtime(expiration_time))
                ),
            )
        max_expiration_time = now + self.MAX_IPSET_TIMEOUT
        if expiration_time > max_expiration_time:
            self._error(
                field,
                (
                    "Expiration time {} is too far into the future."
                    " It is more than {} seconds from now"
                ).format(expiration_time, self.MAX_IPSET_TIMEOUT),
            )

    def _validator_webshield_is_enabled(self, field, value):
        if not Webshield.ENABLE:
            self._error(field,
                        'This command is not supported when webshield '
                        'is disabled')


def validate_middleware(validator):
    """Build a decorator that validates RPC requests and their responses.

    The returned decorator wraps an async handler ``f(request, ...)``:
    ``request['params']`` is replaced by the output of ``validate`` before
    the handler runs, and the handler's result is checked against the
    command's response JSON schema in a background task. A schema mismatch
    is logged but never alters the returned result.
    """
    base = Path(os.path.dirname(__file__)) / '../..'
    # The trailing 'another' component is a placeholder that with_name()
    # swaps for the real schema file name.
    core_schemas = base / 'defence360agent/simple_rpc/schema_responses/another'
    imav_schemas = base / 'imav/simple_rpc/schema_responses/another'
    im360_schemas = base / 'im360/simple_rpc/schema_responses/another'

    # Strong references to in-flight validation tasks: the event loop only
    # keeps weak ones, so an unreferenced task may be collected mid-run.
    pending_validations = set()

    def get_file_from_schema_responses_dirs(filename):
        # First existing file among core and imav; im360 is the fallback
        # (returned even if it does not exist).
        for schemas_dir in (core_schemas, imav_schemas):
            candidate = schemas_dir.with_name(filename)
            if candidate.exists():
                return candidate
        return im360_schemas.with_name(filename)

    def get_response_schema(return_type):
        # asserts that return_type does not contain '/'
        # (Path.with_name raises ValueError for such names)
        schema_path = get_file_from_schema_responses_dirs(
            return_type + '.json')
        with schema_path.open('r') as f:
            schema = json.load(f)
        return schema

    async def validate_response(hashable, result):
        return_type = validator.schema.get(hashable) \
            .get('return_type', None)
        if return_type is None:
            return
        schema = get_response_schema(return_type)
        # validation should be performed after
        # result gets such format (ui accepts it)
        # in some cases result never gets such format
        # like test_addmany_invalid_request
        target = {'result': 'success', 'messages': [], 'data': result}
        try:
            jsonschema.validate(target, schema)
        except jsonschema.ValidationError as error:
            logger.critical(
                'Validating %r using schema %r failed with error "%s".',
                target, schema, error, exc_info=error,
            )

    def _on_validation_done(task):
        # Release the reference and surface unexpected failures (missing
        # schema file, broken JSON, ...) instead of leaving them unretrieved.
        pending_validations.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(
                'Response validation task failed: %r', error,
                exc_info=error,
            )

    def wrapped(f):
        @wraps(f)
        async def wrapper(request, *args, **kwargs):
            hashable = tuple(request['command'])
            request['params'] = validate(
                validator, hashable, request['params'])
            result = await f(request, *args, **kwargs)
            # no cpu overhead during rpc request
            # since validation is asynchronous
            # (only next request may be delayed a little)
            # run_until_complete waits until this task will be finished (why?)
            # so test will be failed
            task = asyncio.ensure_future(validate_response(hashable, result))
            pending_validations.add(task)
            task.add_done_callback(_on_validation_done)
            return result
        return wrapper
    return wrapped
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.07 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A