[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: payload.py
File is not writable. Editing disabled.
import asyncio import enum import io import json import mimetypes import os import warnings from abc import ABC, abstractmethod from itertools import chain from typing import ( IO, TYPE_CHECKING, Any, ByteString, Dict, Iterable, Optional, Text, TextIO, Tuple, Type, Union, ) from multidict import CIMultiDict from . import hdrs from .abc import AbstractStreamWriter from .helpers import ( PY_36, content_disposition_header, guess_filename, parse_mimetype, sentinel, ) from .streams import StreamReader from .typedefs import JSONEncoder, _CIMultiDict __all__ = ( "PAYLOAD_REGISTRY", "get_payload", "payload_type", "Payload", "BytesPayload", "StringPayload", "IOBasePayload", "BytesIOPayload", "BufferedReaderPayload", "TextIOPayload", "StringIOPayload", "JsonPayload", "AsyncIterablePayload", ) TOO_LARGE_BYTES_BODY = 2 ** 20 # 1 MB if TYPE_CHECKING: # pragma: no cover from typing import List class LookupError(Exception): pass class Order(str, enum.Enum): normal = "normal" try_first = "try_first" try_last = "try_last" def get_payload(data: Any, *args: Any, **kwargs: Any) -> "Payload": return PAYLOAD_REGISTRY.get(data, *args, **kwargs) def register_payload( factory: Type["Payload"], type: Any, *, order: Order = Order.normal ) -> None: PAYLOAD_REGISTRY.register(factory, type, order=order) class payload_type: def __init__(self, type: Any, *, order: Order = Order.normal) -> None: self.type = type self.order = order def __call__(self, factory: Type["Payload"]) -> Type["Payload"]: register_payload(factory, self.type, order=self.order) return factory class PayloadRegistry: """Payload registry. note: we need zope.interface for more efficient adapter search """ def __init__(self) -> None: self._first = [] # type: List[Tuple[Type[Payload], Any]] self._normal = [] # type: List[Tuple[Type[Payload], Any]] self._last = [] # type: List[Tuple[Type[Payload], Any]] def get( self, data: Any, *args: Any, _CHAIN: Any = chain, **kwargs: Any ) -> "Payload": if isinstance(data, Payload): return data for factory, type in _CHAIN(self._first, self._normal, self._last): if isinstance(data, type): return factory(data, *args, **kwargs) raise LookupError() def register( self, factory: Type["Payload"], type: Any, *, order: Order = Order.normal ) -> None: if order is Order.try_first: self._first.append((factory, type)) elif order is Order.normal: self._normal.append((factory, type)) elif order is Order.try_last: self._last.append((factory, type)) else: raise ValueError(f"Unsupported order {order!r}") class Payload(ABC): _default_content_type = "application/octet-stream" # type: str _size = None # type: Optional[int] def __init__( self, value: Any, headers: Optional[ Union[_CIMultiDict, Dict[str, str], Iterable[Tuple[str, str]]] ] = None, content_type: Optional[str] = sentinel, filename: Optional[str] = None, encoding: Optional[str] = None, **kwargs: Any, ) -> None: self._encoding = encoding self._filename = filename self._headers = CIMultiDict() # type: _CIMultiDict self._value = value if content_type is not sentinel and content_type is not None: self._headers[hdrs.CONTENT_TYPE] = content_type elif self._filename is not None: content_type = mimetypes.guess_type(self._filename)[0] if content_type is None: content_type = self._default_content_type self._headers[hdrs.CONTENT_TYPE] = content_type else: self._headers[hdrs.CONTENT_TYPE] = self._default_content_type self._headers.update(headers or {}) @property def size(self) -> Optional[int]: """Size of the payload.""" return self._size @property def filename(self) -> Optional[str]: """Filename of the payload.""" return self._filename @property def headers(self) -> _CIMultiDict: """Custom item headers""" return self._headers @property def _binary_headers(self) -> bytes: return ( "".join([k + ": " + v + "\r\n" for k, v in self.headers.items()]).encode( "utf-8" ) + b"\r\n" ) @property def encoding(self) -> Optional[str]: """Payload encoding""" return self._encoding @property def content_type(self) -> str: """Content type""" return self._headers[hdrs.CONTENT_TYPE] def set_content_disposition( self, disptype: str, quote_fields: bool = True, **params: Any ) -> None: """Sets ``Content-Disposition`` header.""" self._headers[hdrs.CONTENT_DISPOSITION] = content_disposition_header( disptype, quote_fields=quote_fields, **params ) @abstractmethod async def write(self, writer: AbstractStreamWriter) -> None: """Write payload. writer is an AbstractStreamWriter instance: """ class BytesPayload(Payload): def __init__(self, value: ByteString, *args: Any, **kwargs: Any) -> None: if not isinstance(value, (bytes, bytearray, memoryview)): raise TypeError( "value argument must be byte-ish, not {!r}".format(type(value)) ) if "content_type" not in kwargs: kwargs["content_type"] = "application/octet-stream" super().__init__(value, *args, **kwargs) if isinstance(value, memoryview): self._size = value.nbytes else: self._size = len(value) if self._size > TOO_LARGE_BYTES_BODY: if PY_36: kwargs = {"source": self} else: kwargs = {} warnings.warn( "Sending a large body directly with raw bytes might" " lock the event loop. You should probably pass an " "io.BytesIO object instead", ResourceWarning, **kwargs, ) async def write(self, writer: AbstractStreamWriter) -> None: await writer.write(self._value) class StringPayload(BytesPayload): def __init__( self, value: Text, *args: Any, encoding: Optional[str] = None, content_type: Optional[str] = None, **kwargs: Any, ) -> None: if encoding is None: if content_type is None: real_encoding = "utf-8" content_type = "text/plain; charset=utf-8" else: mimetype = parse_mimetype(content_type) real_encoding = mimetype.parameters.get("charset", "utf-8") else: if content_type is None: content_type = "text/plain; charset=%s" % encoding real_encoding = encoding super().__init__( value.encode(real_encoding), encoding=real_encoding, content_type=content_type, *args, **kwargs, ) class StringIOPayload(StringPayload): def __init__(self, value: IO[str], *args: Any, **kwargs: Any) -> None: super().__init__(value.read(), *args, **kwargs) class IOBasePayload(Payload): def __init__( self, value: IO[Any], disposition: str = "attachment", *args: Any, **kwargs: Any ) -> None: if "filename" not in kwargs: kwargs["filename"] = guess_filename(value) super().__init__(value, *args, **kwargs) if self._filename is not None and disposition is not None: if hdrs.CONTENT_DISPOSITION not in self.headers: self.set_content_disposition(disposition, filename=self._filename) async def write(self, writer: AbstractStreamWriter) -> None: loop = asyncio.get_event_loop() try: chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16) while chunk: await writer.write(chunk) chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16) finally: await loop.run_in_executor(None, self._value.close) class TextIOPayload(IOBasePayload): def __init__( self, value: TextIO, *args: Any, encoding: Optional[str] = None, content_type: Optional[str] = None, **kwargs: Any, ) -> None: if encoding is None: if content_type is None: encoding = "utf-8" content_type = "text/plain; charset=utf-8" else: mimetype = parse_mimetype(content_type) encoding = mimetype.parameters.get("charset", "utf-8") else: if content_type is None: content_type = "text/plain; charset=%s" % encoding super().__init__( value, content_type=content_type, encoding=encoding, *args, **kwargs, ) @property def size(self) -> Optional[int]: try: return os.fstat(self._value.fileno()).st_size - self._value.tell() except OSError: return None async def write(self, writer: AbstractStreamWriter) -> None: loop = asyncio.get_event_loop() try: chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16) while chunk: await writer.write(chunk.encode(self._encoding)) chunk = await loop.run_in_executor(None, self._value.read, 2 ** 16) finally: await loop.run_in_executor(None, self._value.close) class BytesIOPayload(IOBasePayload): @property def size(self) -> int: position = self._value.tell() end = self._value.seek(0, os.SEEK_END) self._value.seek(position) return end - position class BufferedReaderPayload(IOBasePayload): @property def size(self) -> Optional[int]: try: return os.fstat(self._value.fileno()).st_size - self._value.tell() except OSError: # data.fileno() is not supported, e.g. # io.BufferedReader(io.BytesIO(b'data')) return None class JsonPayload(BytesPayload): def __init__( self, value: Any, encoding: str = "utf-8", content_type: str = "application/json", dumps: JSONEncoder = json.dumps, *args: Any, **kwargs: Any, ) -> None: super().__init__( dumps(value).encode(encoding), content_type=content_type, encoding=encoding, *args, **kwargs, ) if TYPE_CHECKING: # pragma: no cover from typing import AsyncIterable, AsyncIterator _AsyncIterator = AsyncIterator[bytes] _AsyncIterable = AsyncIterable[bytes] else: from collections.abc import AsyncIterable, AsyncIterator _AsyncIterator = AsyncIterator _AsyncIterable = AsyncIterable class AsyncIterablePayload(Payload): _iter = None # type: Optional[_AsyncIterator] def __init__(self, value: _AsyncIterable, *args: Any, **kwargs: Any) -> None: if not isinstance(value, AsyncIterable): raise TypeError( "value argument must support " "collections.abc.AsyncIterablebe interface, " "got {!r}".format(type(value)) ) if "content_type" not in kwargs: kwargs["content_type"] = "application/octet-stream" super().__init__(value, *args, **kwargs) self._iter = value.__aiter__() async def write(self, writer: AbstractStreamWriter) -> None: if self._iter: try: # iter is not None check prevents rare cases # when the case iterable is used twice while True: chunk = await self._iter.__anext__() await writer.write(chunk) except StopAsyncIteration: self._iter = None class StreamReaderPayload(AsyncIterablePayload): def __init__(self, value: StreamReader, *args: Any, **kwargs: Any) -> None: super().__init__(value.iter_any(), *args, **kwargs) PAYLOAD_REGISTRY = PayloadRegistry() PAYLOAD_REGISTRY.register(BytesPayload, (bytes, bytearray, memoryview)) PAYLOAD_REGISTRY.register(StringPayload, str) PAYLOAD_REGISTRY.register(StringIOPayload, io.StringIO) PAYLOAD_REGISTRY.register(TextIOPayload, io.TextIOBase) PAYLOAD_REGISTRY.register(BytesIOPayload, io.BytesIO) PAYLOAD_REGISTRY.register(BufferedReaderPayload, (io.BufferedReader, io.BufferedRandom)) PAYLOAD_REGISTRY.register(IOBasePayload, io.IOBase) PAYLOAD_REGISTRY.register(StreamReaderPayload, StreamReader) # try_last for giving a chance to more specialized async interables like # multidict.BodyPartReaderPayload override the default PAYLOAD_REGISTRY.register(AsyncIterablePayload, AsyncIterable, order=Order.try_last)
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.04 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A