[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: webshield.py
File is not writable. Editing disabled.
import asyncio import json import logging from collections import OrderedDict from pathlib import Path from typing import Dict, Set, Tuple from defence360agent.model.simplification import run_in_executor from defence360agent.utils import CheckRunError, atomic_rewrite, check_run from im360.contracts.config import WebServices as WebServicesConfig from im360.contracts.config import Webshield from im360.model.country import CountryList from im360.model.firewall import RemoteProxy from im360.subsys.panels.hosting_panel import HostingPanel logger = logging.getLogger(__name__) _SERVICE = 'imunify360-webshield' _HEADER = '# AUTOGENERATED, DO NOT EDIT\n' _CONFIG = '/etc/imunify360-webshield/agent-proxies.conf' _CONFIG_COUNTRY_BLACKLIST = \ '/etc/imunify360-webshield/blocked_country_codes.conf' _COMPOSE_LISTS_SCRIPT = '/usr/sbin/imunify360-webshield-compose-lists' #: if file exists then no traffic should be redirected to webshield _WS_NO_REDIRECTION_FLAG_PATH = Path('/var/imunify360/webshield_broken') _WS_CTL_EXECUTABLE = '/usr/share/imunify360-webshield/webshieldctl' class Error(Exception): """Base exception for the module.""" async def _run_webshieldctl(command, error_message): try: await check_run([_WS_CTL_EXECUTABLE, command]) except CheckRunError as e: raise Error(error_message) from e async def is_ssl_cache_configured() -> bool: """ Return True if ssl cache is configured in csf :return: bool """ cmd = ['im360-ssl-cache', '--json'] return bool(json.loads((await check_run(cmd)).decode())) async def service_disable(now=False) -> None: """Disable webshield service starting on boot. If *now* is True, stop it right now. """ await _run_webshieldctl( "deactivate" if now else "disable", "failed to disable webshield" ) async def service_enable(now=False) -> None: """Enable webshield service starting on boot. If *now* is True, start it right now. """ await _run_webshieldctl( "activate" if now else "enable", "failed to enable webshield" ) def expects_traffic(): """Whether webshield expects traffic.""" return not _WS_NO_REDIRECTION_FLAG_PATH.exists() async def is_running() -> bool: """Whether webshield is running.""" try: await check_run([_WS_CTL_EXECUTABLE, "is-active"]) except CheckRunError as e: if e.returncode == 1: return False # not running raise Error( "failed to find out whether webshield is running" ) from e else: return True # running async def splashscreen_set_state(status=False) -> None: """Call script for change status of splash screen. """ await _run_webshieldctl( "enable-splashscreen" if status else "disable-splashscreen", "failed to %s splash screen" % ('enable' if status else 'disable') ) async def cpanelprotection_set_state(status=False) -> None: """Call script for change status of cpanel protection.""" await _run_webshieldctl( "enable-cpanelprotection" if status else "disable-cpanelprotection", "failed to %s cpanel protection" % ('enable' if status else 'disable') ) async def service_reload(): await check_run(['service', _SERVICE, 'reload']) async def update_internal_whitelist(_, is_updated): """Updates whitelists for webshield and reloads service. Should be run after static whitelist updates.""" if is_updated: logger.info("Updating webshield internal whitelist using " "imunify360-webshield-compose-lists script") await compose_lists([]) async def update_custom_lists(): logger.info("Updating webshield custom black and white lists using " "imunify360-webshield-compose-lists script") await compose_lists(['--custom-lists-only']) async def compose_lists(script_args): await check_run([_COMPOSE_LISTS_SCRIPT] + script_args) if Webshield.ENABLE: await service_reload() async def _rewrite_webshield_config(gather_items, config_file): """Rewrite config file with autogenerated data from database. """ items = await run_in_executor( asyncio.get_event_loop(), gather_items) content = [_HEADER] for item in sorted(items): content.append('{} 1;\n'.format(item)) atomic_rewrite(config_file, ''.join(content), backup=False) if Webshield.ENABLE: await service_reload() async def update_country_blacklist_config(): """Fill webshield config file with blacklisted country (from CountryList) Should be called after add/delete record from CountryList. """ await _rewrite_webshield_config( lambda: CountryList.country_codes(CountryList.BLACK), _CONFIG_COUNTRY_BLACKLIST) async def update_remote_proxy_config(): """Fill webshield config file with remote proxies (from RemoteProxy)""" await _rewrite_webshield_config( lambda: set(item['network'] for item in RemoteProxy.list(None, None, True)), _CONFIG) def port_range() -> Tuple[int, int]: """Return a range of ports used by webshield to provide captcha and assist in remote proxy processing, as a tuple (first, last). (last value is not included in that range).""" return (52220, 52240) def destination_webshield_ports() -> Set[int]: """Return a set of Webshield ports which could be redirected to.""" redirection_map = port_redirect_map() from_ports = redirected_to_webshield_ports() unexpected_from_ports = from_ports - redirection_map.keys() if unexpected_from_ports: logger.warning( "Got unexpected ports to redirect from: %s" ", which are not present in the redirect map: %s", unexpected_from_ports, redirection_map, ) dest_ports = { redirection_map[port] for port in from_ports & redirection_map.keys() } return dest_ports def redirected_to_webshield_ports() -> Set[int]: """Return a set of TCP destination ports that can be redirected to Webshield.""" ports = {80, 443} ports |= set(WebServicesConfig.HTTP_PORTS) | set( WebServicesConfig.HTTPS_PORTS ) ports |= HostingPanel().http_ports() | HostingPanel().https_ports() return ports def port_redirect_map() -> Dict[int, int]: """Return a mapping of destination TCP ports to their redirect target port of Webshield.""" m = OrderedDict() # type: Dict[int, int] m[80] = 52224 m[443] = 52223 m[2082] = 52230 m[2086] = 52228 m[2083] = 52229 m[2087] = 52227 m[2095] = 52232 m[2096] = 52231 m[2222] = 52235 m[8443] = 52233 m[8880] = 52234 return m
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.34 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A