This repository has been archived on 2024-10-27. You can view files and clone it, but cannot push or open issues or pull requests.
local-llm-server/llm_server/custom_redis.py

228 lines
6.7 KiB
Python
Raw Normal View History

import pickle
import sys
2023-09-23 23:14:22 -06:00
import traceback
2023-09-29 00:09:44 -06:00
from typing import Callable, List, Mapping, Optional, Union
import redis as redis_pkg
import simplejson as json
2023-08-21 21:28:52 -06:00
from flask_caching import Cache
2023-08-23 01:14:19 -06:00
from redis import Redis
2023-09-29 00:09:44 -06:00
from redis.typing import AnyKeyT, EncodableT, ExpiryT, FieldT, KeyT, PatternT, ZScoreBoundT
2023-08-21 21:28:52 -06:00
2023-09-30 19:41:50 -06:00
# Flask-Caching instance backed by Redis; every cached entry is stored under
# the 'local_llm_flask' key prefix in the database named by the URL below.
flask_cache = Cache(
    config={
        'CACHE_TYPE': 'RedisCache',
        'CACHE_REDIS_URL': 'redis://localhost:6379/15',
        'CACHE_KEY_PREFIX': 'local_llm_flask',
    },
)
# Approximate length of one month, in seconds.
# NOTE(review): 31 days is 2_678_400 s, so this value is 400 s short of that;
# presumably a rounded figure -- confirm before relying on it being exact.
ONE_MONTH_SECONDS = 2_678_000
2023-09-30 19:41:50 -06:00
class RedisCustom(Redis):
    """
    A wrapper class to set prefixes to keys.

    Every overridden command rewrites the key it is given to
    ``"<prefix>:<key>"`` and forwards the call to an internal ``Redis``
    client (``self.redis``). Commands that are NOT overridden here are
    inherited from ``Redis`` unchanged and therefore do not apply the prefix.
    """

    # Key written by ``__init__`` to probe the connection; hidden by ``keys()``.
    _SENTINEL_KEY = '____'

    def __init__(self, prefix, **kwargs):
        """
        Create the client and verify that the Redis server is reachable.

        :param prefix: namespace prepended (followed by ``:``) to every key.
        :param kwargs: connection options forwarded to ``redis.Redis``.

        Prints a diagnostic and exits the process with status 1 when the
        server cannot be reached.
        """
        # Forward the connection options to the base class as well, so that
        # inherited (non-overridden) commands talk to the same server/db as
        # the wrapped client instead of a default connection.
        super().__init__(**kwargs)
        self.redis = Redis(**kwargs)
        self.prefix = prefix
        try:
            # Cheap write used purely as a connectivity check.
            self.set(self._SENTINEL_KEY, 1)
        except redis_pkg.exceptions.ConnectionError as e:
            print('Failed to connect to the Redis server:', e)
            print('Did you install and start the Redis server?')
            sys.exit(1)

    def _key(self, key):
        """Return ``key`` with this instance's prefix applied."""
        return f"{self.prefix}:{key}"

    @staticmethod
    def _text(value):
        """Return ``value`` as ``str``, decoding UTF-8 ``bytes`` when needed.

        Values arrive as ``str`` instead of ``bytes`` when the client was
        created with ``decode_responses=True``.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8')
        return value

    def set(self, key, value, ex: Union[ExpiryT, None] = None):
        """Store ``value`` under the prefixed ``key`` with optional expiry ``ex``."""
        return self.redis.set(self._key(key), value, ex=ex)

    def get(self, key, default=None, dtype=None):
        """
        Fetch the value stored under the prefixed ``key``.

        :param default: returned when the key is missing or holds an empty
            value. Must be a value, not a class.
        :param dtype: optional conversion. ``str`` decodes as UTF-8, ``dict``
            and ``list`` parse the value as JSON, any other callable is
            applied to the raw value.
        :return: the converted value, the raw stored value when no ``dtype``
            is given or the conversion fails, or ``default``.
        :raises TypeError: if ``default`` is a class (likely ``dtype`` was
            passed in the wrong position).
        """
        # TODO: use pickle
        import inspect

        if inspect.isclass(default):
            raise TypeError(
                f'default must be a value, not a class (got {default!r}); '
                'did you mean to pass it as dtype?'
            )

        raw = self.redis.get(self._key(key))
        if not raw:
            return default

        if dtype:
            try:
                if dtype == str:
                    return self._text(raw)
                if dtype in (dict, list):
                    return json.loads(self._text(raw))
                return dtype(raw)
            except Exception:
                # Best effort: report the failed conversion and fall back to
                # handing the caller the raw stored value.
                traceback.print_exc()
        return raw

    def incr(self, key, amount=1):
        """Increment the prefixed ``key`` by ``amount``."""
        return self.redis.incr(self._key(key), amount)

    def decr(self, key, amount=1):
        """Decrement the prefixed ``key`` by ``amount``."""
        return self.redis.decr(self._key(key), amount)

    def sadd(self, key: str, *values: FieldT):
        """Add ``values`` to the set at the prefixed ``key``."""
        return self.redis.sadd(self._key(key), *values)

    def srem(self, key: str, *values: FieldT):
        """Remove ``values`` from the set at the prefixed ``key``."""
        return self.redis.srem(self._key(key), *values)

    def sismember(self, key: str, value: str):
        """Report whether ``value`` is in the set at the prefixed ``key``."""
        return self.redis.sismember(self._key(key), value)

    def lindex(self, name: str, index: int):
        """Return the element at ``index`` of the list at the prefixed ``name``."""
        return self.redis.lindex(self._key(name), index)

    def lrem(self, name: str, count: int, value: str):
        """Remove occurrences of ``value`` from the list at the prefixed ``name``."""
        return self.redis.lrem(self._key(name), count, value)

    def rpush(self, name: str, *values: FieldT):
        """Append ``values`` to the tail of the list at the prefixed ``name``."""
        return self.redis.rpush(self._key(name), *values)

    def llen(self, name: str):
        """Return the length of the list at the prefixed ``name``."""
        return self.redis.llen(self._key(name))

    def zrangebyscore(
        self,
        name: KeyT,
        min: ZScoreBoundT,
        max: ZScoreBoundT,
        start: Union[int, None] = None,
        num: Union[int, None] = None,
        withscores: bool = False,
        score_cast_func: Union[type, Callable] = float,
    ):
        """Forward ``ZRANGEBYSCORE`` for the sorted set at the prefixed ``name``."""
        return self.redis.zrangebyscore(
            self._key(name),
            min,
            max,
            start=start,
            num=num,
            withscores=withscores,
            score_cast_func=score_cast_func,
        )

    def zremrangebyscore(self, name: KeyT, min: ZScoreBoundT, max: ZScoreBoundT):
        """Forward ``ZREMRANGEBYSCORE`` for the sorted set at the prefixed ``name``."""
        return self.redis.zremrangebyscore(self._key(name), min, max)

    def hincrby(self, name: str, key: str, amount: int = 1):
        """Increment field ``key`` of the hash at the prefixed ``name``."""
        return self.redis.hincrby(self._key(name), key, amount)

    def zcard(self, name: KeyT):
        """Return the cardinality of the sorted set at the prefixed ``name``."""
        return self.redis.zcard(self._key(name))

    def hdel(self, name: str, *keys: str):
        """Delete fields ``keys`` from the hash at the prefixed ``name``."""
        return self.redis.hdel(self._key(name), *keys)

    def hget(self, name: str, key: str):
        """Return field ``key`` of the hash at the prefixed ``name``."""
        return self.redis.hget(self._key(name), key)

    def zadd(
        self,
        name: KeyT,
        mapping: Mapping[AnyKeyT, EncodableT],
        nx: bool = False,
        xx: bool = False,
        ch: bool = False,
        incr: bool = False,
        gt: bool = False,
        lt: bool = False,
    ):
        """Forward ``ZADD`` for the sorted set at the prefixed ``name``."""
        return self.redis.zadd(
            self._key(name), mapping, nx=nx, xx=xx, ch=ch, incr=incr, gt=gt, lt=lt
        )

    def lpush(self, name: str, *values: FieldT):
        """Prepend ``values`` to the head of the list at the prefixed ``name``."""
        return self.redis.lpush(self._key(name), *values)

    def hset(
        self,
        name: str,
        key: Optional[str] = None,
        value=None,
        mapping: Optional[dict] = None,
        items: Optional[list] = None,
    ):
        """Forward ``HSET`` for the hash at the prefixed ``name``."""
        return self.redis.hset(
            self._key(name), key=key, value=value, mapping=mapping, items=items
        )

    def hkeys(self, name: str):
        """Return the field names of the hash at the prefixed ``name``."""
        return self.redis.hkeys(self._key(name))

    def hmget(self, name: str, keys: List, *args: List):
        """Return the values of several fields of the hash at the prefixed ``name``."""
        return self.redis.hmget(self._key(name), keys, *args)

    def hgetall(self, name: str):
        """Return all fields and values of the hash at the prefixed ``name``."""
        return self.redis.hgetall(self._key(name))

    def keys(self, pattern: PatternT = "*", **kwargs):
        """
        List keys in this namespace that match ``pattern``.

        :return: key names as ``str`` with the ``"<prefix>:"`` part removed.
            The internal connection-check key is omitted.
        """
        marker = f"{self.prefix}:"
        names = []
        for raw_key in self.redis.keys(self._key(pattern), **kwargs):
            name = self._text(raw_key)
            # Strip exactly the prefix rather than splitting on the first
            # ':' so that a prefix which itself contains ':' is handled.
            if name.startswith(marker):
                name = name[len(marker):]
            if name != self._SENTINEL_KEY:
                names.append(name)
        return names

    def pipeline(self, transaction=True, shard_hint=None):
        """
        Return a pipeline of the wrapped client.

        NOTE: commands issued on the returned pipeline are not routed through
        this class, so their keys are NOT prefixed automatically.
        """
        return self.redis.pipeline(transaction, shard_hint)

    def smembers(self, name: str):
        """Return all members of the set at the prefixed ``name``."""
        return self.redis.smembers(self._key(name))

    def spop(self, name: str, count: Optional[int] = None):
        """Remove and return member(s) of the set at the prefixed ``name``."""
        return self.redis.spop(self._key(name), count)

    def rpoplpush(self, src, dst):
        """Move the tail of list ``src`` to the head of list ``dst``.

        Both names are prefixed, consistent with the other list commands
        (``rpush``/``lpush``/``llen``) that populate these lists.
        """
        return self.redis.rpoplpush(self._key(src), self._key(dst))

    def zpopmin(self, name: KeyT, count: Union[int, None] = None):
        """Pop the lowest-scored member(s) of the sorted set at the prefixed ``name``."""
        return self.redis.zpopmin(self._key(name), count)

    def exists(self, *names: KeyT):
        """Forward ``EXISTS`` for the given names, each one prefixed."""
        return self.redis.exists(*(self._key(name) for name in names))

    def set_dict(self, key: str, dict_value, ex: Union[ExpiryT, None] = None):
        """Store ``dict_value`` JSON-encoded under the prefixed ``key``."""
        return self.set(key, json.dumps(dict_value), ex=ex)

    def get_dict(self, key):
        """Return the JSON value stored at the prefixed ``key``, or ``{}`` if unset."""
        raw = self.get(key)
        if not raw:
            return dict()
        return json.loads(self._text(raw))

    def setp(self, name, value):
        """Store ``value`` pickled under the prefixed ``name``.

        The prefix is applied so pickled entries live in the same namespace
        as every other key and are removed by ``flush()``.
        """
        self.redis.set(self._key(name), pickle.dumps(value))

    def getp(self, name: str):
        """Return the unpickled value stored by ``setp`` at the prefixed ``name``.

        Returns the raw (falsy) reply when nothing is stored.
        """
        raw = self.redis.get(self._key(name))
        if raw:
            # SECURITY: pickle.loads executes arbitrary code embedded in the
            # payload; only safe while the Redis instance is fully trusted.
            return pickle.loads(raw)
        return raw

    def flush(self):
        """Delete every key in this namespace and return the deleted keys."""
        flushed = []
        for key in self.redis.scan_iter(f'{self.prefix}:*'):
            flushed.append(key)
            self.redis.delete(key)
        return flushed

    def flushall(self, asynchronous: bool = False, **kwargs) -> bool:
        """Flush only this namespace; the arguments are accepted but ignored."""
        self.flush()
        return True

    def flushdb(self, asynchronous: bool = False, **kwargs) -> bool:
        """Flush only this namespace; the arguments are accepted but ignored."""
        self.flush()
        return True
# Shared module-level client; all of its keys live under the 'local_llm'
# prefix. Constructing it performs a connectivity check against the server.
redis = RedisCustom(prefix='local_llm')