[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: helpers.py
File is not writable. Editing disabled.
import datetime
import ftplib
import functools
import inspect
import os
import re
import shutil
import socket
import warnings
from itertools import chain

from dateutil import parser


def from_env(**param_to_env):
    """
    Decorator factory: attach a mapping to the decorated function as
    ``f.from_env``.

    :param param_to_env: mapping whose keys must name positional or
        keyword-only parameters of the decorated function
    :return: decorator that returns the function itself (not a wrapper)
    """

    def decorator(f):
        spec = inspect.getfullargspec(f)
        # Every key has to be a real parameter of ``f``.
        assert all(
            param in chain(spec.args, spec.kwonlyargs)
            for param in param_to_env
        )
        f.from_env = param_to_env
        return f

    return decorator


class DateTime(datetime.datetime):
    """
    ``datetime`` subclass whose constructor takes a single string and
    parses it with ``dateutil.parser.parse(..., ignoretz=True)``.

    ``DateTime(s)`` returns whatever the parser returns, not a ``DateTime``
    instance.
    """

    def __new__(cls, s):
        return parser.parse(s, ignoretz=True)

    # Rebound to the base-class implementations.
    # NOTE(review): presumably because the inherited classmethods would
    # otherwise go through the string-only ``__new__`` above -- confirm.
    fromtimestamp = datetime.datetime.fromtimestamp
    now = datetime.datetime.now
    utcfromtimestamp = datetime.datetime.utcfromtimestamp


def reconnect_ftp(func):
    """
    Decorator: on ``ftplib.error_temp`` call ``ftp.connect()`` and retry
    the wrapped call once. A failure of the retry propagates.

    The first positional argument of the wrapped callable must provide a
    ``connect()`` method.
    """

    @functools.wraps(func)
    def wrapper(ftp, *args, **kwargs):
        try:
            return func(ftp, *args, **kwargs)
        except ftplib.error_temp:
            ftp.connect()
            return func(ftp, *args, **kwargs)

    return wrapper


class FtpError(Exception):
    """Raised by ``Ftp`` methods in place of ftplib / socket errors."""


def _handle_ftp_error(func):
    """
    Decorator: re-raise ``ftplib.Error`` and ``socket.error`` coming out of
    the wrapped call as ``FtpError`` carrying the same ``args``.
    """

    @functools.wraps(func)
    def wrapper(ftp, *args, **kwargs):
        try:
            return func(ftp, *args, **kwargs)
        except (ftplib.Error, socket.error) as e:
            # Chain explicitly so the original error stays the cause.
            raise FtpError(*e.args) from e

    return wrapper


# ``lru_cache`` on the class memoizes construction: calling ``Ftp`` again with
# equal (hashable) arguments returns the previously created instance.
@functools.lru_cache(maxsize=None)
class Ftp:
    """
    Thin wrapper around ``ftplib.FTP`` / ``ftplib.FTP_TLS``.

    ``connect()`` has to be called before the other methods; until then
    ``self.ftp`` is ``None``.
    """

    # Underlying ftplib client, set by ``connect()``.
    ftp = None

    def __init__(self, host, login, password, use_ftps, passive_mode=True,
                 port=21, timeout=10, **_):
        """
        Store connection settings; no network activity happens here.
        Extra keyword arguments are accepted and ignored.
        """
        self.host = host
        self.login = login
        self.password = password
        self.passive_mode = passive_mode
        self.use_ftps = use_ftps
        self.port = port
        self.timeout = timeout

    @property
    def _address(self):
        """``login@host:port`` string for this connection."""
        return self.login + '@' + self.host + ':' + str(self.port)

    def __str__(self):
        protocol = 'ftps' if self.use_ftps else 'ftp'
        return protocol + '://' + self._address

    @_handle_ftp_error
    def connect(self):
        """
        Open a new connection, log in and store the client in ``self.ftp``.

        :raises FtpError:
        """
        if self.use_ftps:
            ftp = ftplib.FTP_TLS()
        else:
            ftp = ftplib.FTP()
        ftp.connect(self.host, self.port, self.timeout)
        ftp.login(self.login, self.password)
        if self.use_ftps:
            # Switch the data connection to protected mode as well.
            ftp.prot_p()
        ftp.set_pasv(self.passive_mode)
        self.ftp = ftp

    @_handle_ftp_error
    @reconnect_ftp
    def listdir(self, *args):
        """
        Return the result of ``nlst`` for the given arguments.

        :raises FtpError:
        """
        return self.ftp.nlst(*args)

    @_handle_ftp_error
    @reconnect_ftp
    def mlistdir(self, *args, **kwargs):
        """
        Return the result of ``mlsd`` for the given arguments.

        :raises FtpError:
        """
        return self.ftp.mlsd(*args, **kwargs)

    @_handle_ftp_error
    @reconnect_ftp
    def retrieve(self, path, destination):
        """
        Download ``path`` into ``<destination>/<login@host:port>/<path>``
        and return the local file name.

        :raises FtpError:
        :raises IsNotDirError:
        :raises DirNotEmptyError:
        """
        if not os.path.isabs(destination):
            destination = os.path.abspath(destination)
        destination = os.path.join(destination, self._address)
        # Strip the leading separator so ``join`` does not discard
        # ``destination``.
        target_name = os.path.join(destination, path.lstrip(os.path.sep))
        target_dir = os.path.dirname(target_name)
        mkdir(target_dir)

        free_space = shutil.disk_usage(target_dir).free
        try:
            target_size = self.size(path)
        except FtpError:
            # Skip this step if SIZE command is not supported on the server
            pass
        else:
            if target_size > free_space:
                raise FtpError('Disk is full', free_space, target_size)

        with open(target_name, 'wb') as w:
            self.ftp.retrbinary('RETR ' + path, w.write)
        return target_name

    @_handle_ftp_error
    @reconnect_ftp
    def size(self, dir_path):
        """
        Get the size of the file on the server

        :raises FtpError:
        """
        try:
            self.ftp.sendcmd('TYPE I')
            return self.ftp.size(dir_path)
        finally:
            # Fallback to the default ASCII mode
            self.ftp.sendcmd('TYPE A')


class DirNotEmptyError(FileExistsError):
    """Destination directory exists and contains entries."""

    def __init__(self, message=None):
        message = message or 'destination directory exists and it is not empty'
        super().__init__(message)


class IsNotDirError(FileExistsError):
    """Destination path exists but is not a directory."""

    def __init__(self, message=None):
        message = message or 'destination exists but it is not a directory'
        super().__init__(message)


class ActionError(Exception):
    """Exception carrying a ``message`` and a numeric ``code``."""

    def __init__(self, message=None, code=1):
        self.message = message
        self.code = code


def mkdir(path, check_empty=False):
    """
    Make sure ``path`` is a directory, creating it (with parents) if it
    does not exist.

    :param check_empty: also require an existing directory to be empty
    :raises DirNotEmptyError: directory exists, is not empty and
        ``check_empty`` is set
    :raises IsNotDirError: ``path`` exists but is not a directory
    """
    if os.path.isdir(path):
        if check_empty and os.listdir(path):
            raise DirNotEmptyError()
    elif os.path.exists(path):
        raise IsNotDirError()
    else:
        os.makedirs(path)


def read(fileobj, size=2 ** 20):
    """
    Yield chunks of up to ``size`` from ``fileobj.read`` until it returns
    ``b''``.
    """
    for chunk in iter(lambda: fileobj.read(size), b''):
        yield chunk


# ``--key`` or ``--key=value``
optional_arg_match = re.compile(r'--(?P<key>\w+)(=(?P<value>\S+))?')


def parse_extra_args(arg_list):
    """
    Split ``arg_list`` into positional arguments and ``--key[=value]``
    options.

    A missing value, or one of ``1/true/yes/y`` (case-insensitive), becomes
    ``True``; ``0/false/no/n`` becomes ``False``; any other value is kept as
    a string.

    :return: tuple ``(args, kwargs)``
    """
    # FIXME: move this logic into a special ArgumentParser argument and
    # get rid of `extra_args` magic
    args = []
    kwargs = {}
    for arg in arg_list:
        match = optional_arg_match.match(arg)
        if match:
            kw = match.groupdict()
            value = kw['value']
            if value is None or value.lower() in ('1', 'true', 'yes', 'y'):
                value = True
            elif value.lower() in ('0', 'false', 'no', 'n'):
                value = False
            kwargs[kw['key']] = value
        else:
            args.append(arg)
    return args, kwargs


def _fill_out_positional_args(parsed_args, env_vars, positional_parameters):
    """
    Build the positional argument list when fewer arguments were given
    than there are positional parameters.

    For each parameter the value from ``env_vars`` wins; otherwise the next
    provided argument is consumed. Parameters with neither are skipped and
    leftover provided arguments are appended. If enough arguments were
    given, ``parsed_args`` is returned untouched.
    """
    if len(parsed_args) < len(positional_parameters):
        provided_args_it = iter(parsed_args)
        result = []
        for param in positional_parameters:
            if param in env_vars:
                result.append(env_vars[param])
            else:
                try:
                    result.append(next(provided_args_it))
                except StopIteration:
                    continue
        result.extend(provided_args_it)
        return result
    return parsed_args


def _fill_out_keyword_args(parsed_kwargs, env_vars, keyword_parameters):
    """
    Merge env-provided values for ``keyword_parameters`` with
    ``parsed_kwargs``; explicitly parsed values take precedence.
    """
    return {
        **{
            arg: env_vars[arg]
            for arg in keyword_parameters
            if arg in env_vars
        },
        **parsed_kwargs,
    }


def fill_args_from_env(spec, parsed_args, parsed_kwargs, env_vars):
    """
    Complete parsed arguments with values taken from ``env_vars``.

    :param spec: func spec (as returned by ``inspect.getfullargspec``)
    :param parsed_args: positional arguments given explicitly
    :param parsed_kwargs: keyword arguments given explicitly
    :param env_vars: mapping of parameter name to value
    :return: tuple ``(args, kwargs)``
    """
    defaults_len = len(spec.defaults) if spec.defaults else 0
    # Split on an absolute index: a negative slice bound breaks for
    # ``defaults_len == 0`` because ``args[:-0]`` is empty and ``args[-0:]``
    # is the whole list.
    split_at = len(spec.args) - defaults_len
    required = spec.args[:split_at]
    optional = spec.args[split_at:]

    return (
        _fill_out_positional_args(parsed_args, env_vars, required),
        _fill_out_keyword_args(parsed_kwargs, env_vars, optional),
    )


def validate_params(spec, args, kwargs):
    """
    Return tuple of lists with missing and unknown arguments of a func

    :param spec: func spec
    :param args: list of args to validate
    :param kwargs: list of args to validate
    :return: tuple of missing and unknown lists of arguments
    """
    # ``None`` (not 0) keeps both slices below correct without defaults.
    defaults_len = -len(spec.defaults) if spec.defaults else None
    required = spec.args[:defaults_len]
    required_len = len(required)
    args_len = len(args)
    varargs = spec.varargs

    # On any count mismatch the whole ``required`` list is reported.
    if not varargs and required_len != args_len or required_len > args_len:
        missing = required
    else:
        missing = []

    optional = spec.args[defaults_len:]
    unknown = list(set(kwargs) - set(optional))

    return missing, unknown


def _formatwarning(message, *_, **__):
    """Format a warning as a single ``Warning: <message>`` line."""
    return 'Warning: %s%s' % (message, os.linesep)


# Module-level side effect: replaces the formatter used by ``warnings``.
warnings.formatwarning = _formatwarning


def warning(message, once=False):
    """
    Emit ``message`` through the ``warnings`` module.

    :param once: if true, leave the currently active warning filters in
        charge; otherwise temporarily force the ``'always'`` filter for
        this call
    """
    if once:
        warnings.warn(message)
    else:
        with warnings.catch_warnings():
            warnings.simplefilter('always')
            warnings.warn(message)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.05 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A