[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: client.py
File is not writable. Editing disabled.
import asyncio
import concurrent.futures
import contextlib
import logging
import os
import uuid
from typing import Generator, List

from defence360agent.api.server import APIError, send_message
from defence360agent.contracts import license
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import Message, MessageType
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.utils import RecurringCheckStop, recurring_check, Scope

logger = logging.getLogger(__name__)


class SendToServer(MessageSink):
    """Send messages to server.

    * process Reportable messages;
    * add them to a pending messages list;
    * send all pending messages to server when list is full
      (contains _PENDING_MESSAGES_LIMIT items or more);
    * send all pending messages on plugin shutdown.
    """

    # TODO: consider reducing the limit,
    # since it can lead to messages loss on shutdown (DEF-20261)
    _PENDING_MESSAGES_LIMIT = int(
        os.environ.get("IMUNIFYAV_MESSAGES_COUNT_TO_SEND", 50)
    )
    # 50 seconds because it should be less than DefaultTimeoutStopSec
    _SHUTDOWN_SEND_TIMEOUT = 50
    SCOPE = Scope.AV

    async def create_sink(self, loop: asyncio.AbstractEventLoop):
        """Initialise the queue, its synchronisation primitives and
        start the background sender task on *loop*."""
        self._loop = loop
        self._pending = []  # type: List[Message]
        # set by send_to_server() whenever a message is queued
        self._try_send = asyncio.Event()
        # held for the whole duration of one sending round
        self._lock = asyncio.Lock()
        self._sender_task = loop.create_task(self._send())

    async def shutdown(self) -> None:
        """Flush pending messages, giving up after a timeout.

        When shutdown begins it gives 50 seconds to send _pending
        messages to the server (after 60 seconds the process will be
        killed by systemd).

        If stop() isn't done in 50 seconds it terminates the process of
        sending messages and logs an error.
        """
        try:
            await asyncio.wait_for(self.stop(), self._SHUTDOWN_SEND_TIMEOUT)
        except asyncio.TimeoutError:
            # Used logger.error to notify sentry
            logger.error(
                "Timeout (%ds) sending messages to server on shutdown.",
                self._SHUTDOWN_SEND_TIMEOUT,
            )
            if not self._sender_task.cancelled():
                self._sender_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self._sender_task
        if self._pending:
            logger.warning(
                "Dropped %s messages not sent to server", len(self._pending)
            )
            logger.warning("Dropped queue %r", self._pending)

    async def stop(self):
        """Stop sending.

        1. wait for the lock being available, i.e. while _sender_task
           finishes the current round of sending messages (if it takes
           too long, then the timeout in shutdown() is triggered);
        2. once the sending round completes (we got the lock), cancel
           the next iteration of the _sender_task (it exits);
        3. send _pending messages (again, if it takes too long, the
           timeout in shutdown() is triggered and the coroutine is
           cancelled).

        This method makes sure that the coroutine that was started in
        it has ended. It excludes a situation when:

        -> the result of a coroutine that started BEFORE shutdown() is
           started;
        -> and the process of sending messages from _pending is
           interrupted because of it.
        """
        # The _lock allows you to be sure that the _send_pending_messages
        # coroutine is not running and _pending is not being used
        async with self._lock:
            # Cancel _sender_task. The lock ensures that the coroutine
            # is not in its critical part
            self._sender_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._sender_task
            # send messages that are in _pending at the time of agent
            # shutdown
            await self._send_pending_messages()

    @contextlib.contextmanager
    def _get_api(self) -> Generator[send_message.SendMessageAPI, None, None]:
        """Yield a SendMessageAPI configured from the current license.

        The backing process pool lives only for the duration of the
        ``with`` block. The base URL is read from the
        IMUNIFYAV_API_BASE environment variable (None when unset).
        """
        base_url = os.environ.get("IMUNIFYAV_API_BASE")
        # we send messages sequentially, so max_workers=1
        with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor:
            api = send_message.SendMessageAPI(
                Core.VERSION, base_url, executor=executor
            )
            api.set_product_name(license.LicenseCLN.get_product_name())
            api.set_server_id(license.LicenseCLN.get_server_id())
            api.set_license(license.LicenseCLN.get_token())
            yield api

    @expect(MessageType.Reportable)
    async def send_to_server(self, message: Message) -> None:
        """Queue *message* and wake the sender.

        A random ``message_id`` is assigned when the message does not
        carry one yet.
        """
        if "message_id" not in message:
            message["message_id"] = uuid.uuid4().hex
        self._pending.append(message)
        self._try_send.set()

    @recurring_check(0)
    async def _send(self):
        """Wait for a queued message and flush the queue once it holds
        at least _PENDING_MESSAGES_LIMIT items.

        NOTE(review): presumably recurring_check(0) re-invokes this
        coroutine in a loop with no delay -- confirm against
        defence360agent.utils.
        """
        await self._try_send.wait()
        self._try_send.clear()
        if len(self._pending) >= self._PENDING_MESSAGES_LIMIT:
            # The _lock protects critical part of _send method
            async with self._lock:
                await self._send_pending_messages()

    async def _send_pending_messages(self) -> None:
        """Send every currently pending message, one at a time.

        The pending list is swapped for an empty one up front, so
        messages queued while sending is in progress are kept for the
        next round. Messages that fail with APIError are put back at
        the head of the pending list.

        If the coroutine is cancelled while a message is being sent
        (this happens when the shutdown timeout fires), the in-flight
        message and everything after it are put back as well, so that
        shutdown() reports the real number of dropped messages instead
        of silently losing the batch.

        When the API has no server id the batch is discarded without
        sending, as before.
        """
        messages, self._pending = self._pending, []
        unsuccessful = []  # type: List[Message]
        logger.info("Sending %s messages", len(messages))
        try:
            with self._get_api() as api:
                if api.server_id is not None:
                    for position, message in enumerate(messages):
                        try:
                            await api.send_message(message)
                        except APIError as exc:
                            logger.warning(
                                "Failed to send message %s to server: %s",
                                message,
                                exc,
                            )
                            unsuccessful.append(message)
                        except asyncio.CancelledError:
                            # Delivery of the current message is
                            # unconfirmed and the rest were never tried:
                            # keep them all so they are not lost silently.
                            unsuccessful.extend(messages[position:])
                            raise
                        else:
                            logger.info(
                                "message sent %s",
                                {
                                    "method": message.get("method"),
                                    "message_id": message.get("message_id"),
                                },
                            )
        finally:
            # Runs on cancellation too, so requeued messages stay ahead
            # of anything that arrived while this round was in progress.
            if unsuccessful:
                logger.info(
                    "Unsuccessful to send %s messages", len(unsuccessful)
                )
                self._pending = unsuccessful + self._pending
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.2 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A