[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: socket_utils.py
File is not writable. Editing disabled.
#!/opt/alt/python37/bin/python3 -bb # coding=utf-8 # # Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2021 All Rights Reserved # # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT # from __future__ import absolute_import import json import socket import struct import time from typing import Optional # uint32_t, big endian _format = '>I' # Socket read timeout, seconds _WPOS_SOCKET_READ_TIMEOUT_SEC = 10 def get_uid_from_socket(sock_object: socket.socket) -> int: """ Retrieve credentials from SO_PEERCRED option :param sock_object: Socket object :return: uid of user, which connects to this socket. """ _format_string = '3I' creds = sock_object.getsockopt(socket.SOL_SOCKET, socket.SO_PEERCRED, struct.calcsize(_format_string)) # creds contains _pid, _uid, _gid - 3 uint32_t numbers _, _uid, _ = struct.unpack(_format_string, creds) return _uid def pack_data_for_socket(data_dict: dict) -> bytes: """ Prefix message with a 4-byte length :param data_dict: Data dict for send :return: byte array for send to socket """ msg_bytes = json.dumps(data_dict).encode('utf-8') # Output data format: # 4 bytes unsigned int, big-endian - data_len # data_len - data_bytes return struct.pack(_format, len(msg_bytes)) + msg_bytes def _read_bytes_from_socket_with_timeout(sock_object: socket.socket, num_bytes: int, timeout_sec: int) -> Optional[bytes]: """ Read amount data from socket :param sock_object: Socket object to read data from :param num_bytes: Bytes number to read :param timeout_sec: Read timeout, None - timeout expired, data not received """ msg = bytes() for i in range(timeout_sec * 10): msg += sock_object.recv(num_bytes) if len(msg) == num_bytes: return msg time.sleep(0.1) return None def read_unpack_response_from_socket_daemon(sock_object: socket.socket) -> Optional[dict]: """ Read length-prefixed amount of data from socket :param sock_object: Socket object to read data :return: Data received from socket dictionary. None - socket data format error """ # Socket Input data format: # 4 bytes unsigned int, big-endian - data_len # data_len - data_bytes # NOTE: Set non-blocking mode and set timeout to avoid socket.recv hanging if invalid data was sent to socket sock_object.setblocking(False) sock_object.settimeout(_WPOS_SOCKET_READ_TIMEOUT_SEC) # Get data length (4 bytes) raw_msglen = _read_bytes_from_socket_with_timeout(sock_object, 4, _WPOS_SOCKET_READ_TIMEOUT_SEC) if raw_msglen is None: return None msglen = struct.unpack(_format, raw_msglen)[0] msg = _read_bytes_from_socket_with_timeout(sock_object, msglen, _WPOS_SOCKET_READ_TIMEOUT_SEC) if msg is None: return None return json.loads(msg.decode('utf-8')) def read_unpack_response_from_socket_client(sock_object: socket.socket) -> Optional[dict]: """ Read length-prefixed amount of data from socket :param sock_object: Socket object to read data :return: Data received from socket dictionary. None - socket data format error """ # Socket Input data format: # 4 bytes unsigned int, big-endian - data_len # data_len - data_bytes raw_msglen = sock_object.recv(4) msglen = struct.unpack(_format, raw_msglen)[0] msg = bytes() while len(msg) != msglen: msg += sock_object.recv(4096) return json.loads(msg.decode('utf-8')) def send_dict_to_socket_connection_and_close(connection: socket.socket, data_to_send: dict): """ Sends dictionary to socket connection and close it :param connection: Socket connection to send data :param data_to_send: Data dict to send """ bytes_to_send = pack_data_for_socket(data_to_send) connection.sendall(bytes_to_send) connection.close()
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.13 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A