Files
LoveACE-EndF/loveace/utils/redis_client.py
Sibuxiangx bbc86b8330 ⚒️ 重大重构 LoveACE V2
引入了 mongodb
对数据库进行了一定程度的数据加密
性能改善
代码简化
统一错误模型和响应
使用 apifox 作为文档
2025-11-20 20:44:25 +08:00

548 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Redis客户端工具模块
提供类型完整的Redis客户端包装器支持内容验证和序列化
"""
import json
from typing import Any, Optional, Type, TypeVar, Union
import redis.asyncio as aioredis
from pydantic import BaseModel, ValidationError
from loveace.config.logger import logger
from loveace.database.creator import db_manager
T = TypeVar("T", bound=BaseModel)
class RedisClient:
"""类型完整的Redis客户端包装器
提供带有数据验证和序列化的Redis操作接口
Example:
>>> client = RedisClient(redis_instance)
>>> # 存储对象
>>> await client.set_object("user:1", user_data, User)
>>> # 获取对象
>>> user = await client.get_object("user:1", User)
"""
def __init__(self, redis_client: aioredis.Redis):
"""初始化Redis客户端包装器
Args:
redis_client: aioredis.Redis 实例
"""
self.client = redis_client
async def set_object(
self,
key: str,
value: Union[BaseModel, dict, Any],
model_class: Optional[Type[T]] = None,
expire: Optional[int] = None,
) -> bool:
"""设置对象到Redis支持自动验证和序列化
Args:
key: Redis键
value: 要存储的值BaseModel、dict或其他可序列化对象
model_class: 对象模型类用于验证。如果提供会先验证value
expire: 过期时间None表示不设置过期时间
Returns:
是否成功设置
Raises:
ValidationError: 当model_class验证失败时
TypeError: 当value无法序列化时
"""
try:
# 验证数据
if model_class is not None:
if isinstance(value, model_class):
validated_value = value
else:
validated_value = model_class(
**value if isinstance(value, dict) else value.dict()
)
else:
validated_value = value
# 序列化
if isinstance(validated_value, BaseModel):
data = validated_value.model_dump_json()
elif isinstance(validated_value, dict):
data = json.dumps(validated_value, ensure_ascii=False)
else:
data = json.dumps(validated_value, ensure_ascii=False)
# 存储到Redis
if expire:
await self.client.setex(key, expire, data)
else:
await self.client.set(key, data)
logger.debug(f"成功存储Redis键: {key}")
return True
except ValidationError as e:
logger.error(f"Redis对象验证失败 {key}: {e}")
raise
except Exception as e:
logger.error(f"Redis存储失败 {key}: {e}")
raise
async def get_object(
self,
key: str,
model_class: Type[T],
) -> Optional[T]:
"""从Redis获取对象并通过指定的模型类进行验证
Args:
key: Redis键
model_class: 对象模型类,用于反序列化和验证
Returns:
反序列化并验证后的对象如果键不存在则返回None
Raises:
ValidationError: 当数据验证失败时
"""
try:
data = await self.client.get(key)
if data is None:
logger.debug(f"Redis键不存在: {key}")
return None
# 反序列化
if isinstance(data, bytes):
data = data.decode("utf-8")
parsed_data = json.loads(data)
# 验证并创建模型实例
validated_value = model_class(**parsed_data)
logger.debug(f"成功获取并验证Redis键: {key}")
return validated_value
except ValidationError as e:
logger.error(f"Redis对象验证失败 {key}: {e}")
raise
except json.JSONDecodeError as e:
logger.error(f"Redis JSON解析失败 {key}: {e}")
raise
except Exception as e:
logger.error(f"Redis获取失败 {key}: {e}")
raise
async def get_object_safe(
self,
key: str,
model_class: Type[T],
default: Optional[T] = None,
) -> Optional[T]:
"""安全地从Redis获取对象验证失败时返回默认值
Args:
key: Redis键
model_class: 对象模型类,用于反序列化和验证
default: 验证失败时的默认返回值
Returns:
反序列化并验证后的对象验证失败返回default
"""
try:
return await self.get_object(key, model_class)
except (ValidationError, json.JSONDecodeError, Exception) as e:
logger.warning(f"Redis安全获取失败返回默认值 {key}: {e}")
return default
async def set_raw(
self,
key: str,
value: Union[str, bytes],
expire: Optional[int] = None,
) -> bool:
"""设置原始字符串值到Redis
Args:
key: Redis键
value: 要存储的值(字符串或字节)
expire: 过期时间(秒)
Returns:
是否成功设置
"""
try:
if expire:
await self.client.setex(key, expire, value)
else:
await self.client.set(key, value)
logger.debug(f"成功存储原始值到Redis: {key}")
return True
except Exception as e:
logger.error(f"Redis原始值存储失败 {key}: {e}")
raise
async def get_raw(self, key: str) -> Optional[Union[str, bytes]]:
"""获取原始字符串值
Args:
key: Redis键
Returns:
存储的值如果键不存在则返回None
"""
try:
data = await self.client.get(key)
if data is None:
logger.debug(f"Redis键不存在: {key}")
return None
logger.debug(f"成功获取原始值: {key}")
return data
except Exception as e:
logger.error(f"Redis获取失败 {key}: {e}")
raise
async def delete(self, key: str) -> int:
"""删除Redis键
Args:
key: 要删除的键
Returns:
删除的键数量
"""
try:
result = await self.client.delete(key)
logger.debug(f"成功删除Redis键: {key}")
return result
except Exception as e:
logger.error(f"Redis删除失败 {key}: {e}")
raise
async def exists(self, key: str) -> bool:
"""检查键是否存在
Args:
key: 要检查的键
Returns:
键是否存在
"""
try:
return await self.client.exists(key) > 0
except Exception as e:
logger.error(f"Redis检查失败 {key}: {e}")
raise
async def expire(self, key: str, seconds: int) -> bool:
"""设置键的过期时间
Args:
key: Redis键
seconds: 过期时间(秒)
Returns:
是否成功设置
"""
try:
result = await self.client.expire(key, seconds)
logger.debug(f"成功设置Redis键过期时间: {key}, {seconds}")
return result > 0
except Exception as e:
logger.error(f"Redis设置过期失败 {key}: {e}")
raise
async def ttl(self, key: str) -> int:
"""获取键的剩余生存时间
Args:
key: Redis键
Returns:
剩余生存时间(秒),-1表示永不过期-2表示键不存在
"""
try:
return await self.client.ttl(key)
except Exception as e:
logger.error(f"Redis获取TTL失败 {key}: {e}")
raise
async def increment(
self,
key: str,
amount: int = 1,
) -> int:
"""增加键的值
Args:
key: Redis键
amount: 增加的数量
Returns:
增加后的值
"""
try:
result = await self.client.incrby(key, amount)
logger.debug(f"成功增加Redis键: {key}")
return result
except Exception as e:
logger.error(f"Redis增加失败 {key}: {e}")
raise
async def decrement(
self,
key: str,
amount: int = 1,
) -> int:
"""减少键的值
Args:
key: Redis键
amount: 减少的数量
Returns:
减少后的值
"""
try:
result = await self.client.decrby(key, amount)
logger.debug(f"成功减少Redis键: {key}")
return result
except Exception as e:
logger.error(f"Redis减少失败 {key}: {e}")
raise
async def list_push(
self,
key: str,
values: list[Union[BaseModel, dict, str]],
model_class: Optional[Type[T]] = None,
) -> int:
"""向列表推入元素
Args:
key: Redis键
values: 要推入的值列表
model_class: 对象模型类,用于验证每个值
Returns:
推入后列表的长度
"""
try:
serialized_values = []
for value in values:
if model_class is not None:
if isinstance(value, model_class):
validated_value = value
else:
if isinstance(value, dict):
validated_value = model_class(**value)
else:
validated_value = value
else:
validated_value = value
if isinstance(validated_value, BaseModel):
serialized_values.append(validated_value.model_dump_json())
elif isinstance(validated_value, dict):
serialized_values.append(
json.dumps(validated_value, ensure_ascii=False)
)
else:
serialized_values.append(str(validated_value))
result: int = await self.client.rpush(key, *serialized_values) # type: ignore
logger.debug(f"成功推入Redis列表: {key}")
return result
except Exception as e:
logger.error(f"Redis列表推入失败 {key}: {e}")
raise
async def list_range(
self,
key: str,
start: int = 0,
end: int = -1,
model_class: Optional[Type[T]] = None,
) -> list[Union[T, str]]:
"""获取列表范围内的元素
Args:
key: Redis键
start: 开始索引
end: 结束索引
model_class: 对象模型类用于反序列化。如果为None则返回原始字符串
Returns:
列表中指定范围的元素
"""
try:
data: list[Any] = await self.client.lrange(key, start, end) # type: ignore
if model_class is None:
return data
result = []
for item in data:
if isinstance(item, bytes):
item = item.decode("utf-8")
try:
parsed = json.loads(item)
result.append(model_class(**parsed))
except (json.JSONDecodeError, ValidationError):
result.append(item)
return result
except Exception as e:
logger.error(f"Redis列表获取失败 {key}: {e}")
raise
async def hash_set(
self,
key: str,
mapping: dict[str, Union[BaseModel, dict, str, int]],
model_class: Optional[Type[T]] = None,
) -> int:
"""设置哈希表字段
Args:
key: Redis键
mapping: 字段值映射
model_class: 对象模型类,用于验证值
Returns:
新添加的字段数
"""
try:
serialized_mapping = {}
for field, value in mapping.items():
if model_class is not None and not isinstance(value, (str, int, float)):
if isinstance(value, dict):
validated_value = model_class(**value)
else:
validated_value = value
if isinstance(validated_value, BaseModel):
serialized_mapping[field] = validated_value.model_dump_json()
else:
serialized_mapping[field] = str(value)
else:
serialized_mapping[field] = str(value)
result: int = await self.client.hset(key, mapping=serialized_mapping) # type: ignore
logger.debug(f"成功设置Redis哈希表: {key}")
return result
except Exception as e:
logger.error(f"Redis哈希表设置失败 {key}: {e}")
raise
async def hash_get(
self,
key: str,
field: str,
model_class: Optional[Type[T]] = None,
) -> Optional[Union[T, str]]:
"""获取哈希表字段值
Args:
key: Redis键
field: 字段名
model_class: 对象模型类,用于反序列化
Returns:
字段值如果不存在则返回None
"""
try:
data: Optional[Any] = await self.client.hget(key, field) # type: ignore
if data is None:
return None
if isinstance(data, bytes):
data = data.decode("utf-8")
if model_class is None:
return data
try:
parsed = json.loads(data)
return model_class(**parsed)
except (json.JSONDecodeError, ValidationError):
return data
except Exception as e:
logger.error(f"Redis哈希表获取失败 {key}:{field}: {e}")
raise
async def hash_get_all(
self,
key: str,
model_class: Optional[Type[T]] = None,
) -> dict[str, Union[T, str]]:
"""获取所有哈希表字段
Args:
key: Redis键
model_class: 对象模型类,用于反序列化值
Returns:
哈希表中的所有字段值
"""
try:
data: dict[Any, Any] = await self.client.hgetall(key) # type: ignore
if model_class is None:
return data
result = {}
for field, value in data.items():
if isinstance(value, bytes):
value = value.decode("utf-8")
try:
parsed = json.loads(value)
result[field] = model_class(**parsed)
except (json.JSONDecodeError, ValidationError):
result[field] = value
return result
except Exception as e:
logger.error(f"Redis哈希表全量获取失败 {key}: {e}")
raise
async def hash_delete(
self,
key: str,
*fields: str,
) -> int:
"""删除哈希表字段
Args:
key: Redis键
fields: 要删除的字段名
Returns:
删除的字段数
"""
try:
result: int = await self.client.hdel(key, *fields) # type: ignore
logger.debug(f"成功删除Redis哈希表字段: {key}")
return result
except Exception as e:
logger.error(f"Redis哈希表删除失败 {key}: {e}")
raise
async def get_redis_client() -> RedisClient:
"""获取全局Redis客户端实例
Returns:
aioredis.Redis 实例
"""
redis_instance = await db_manager.get_redis_client()
redis_client = RedisClient(redis_instance)
return redis_client