A Cython Redis client that provides FastAPI-style web application support with advanced features for distributed, real-time applications.
- High Performance: Cython-optimized Redis operations with native speed
- Connection Pooling: Efficient connection management with automatic failover
- Protocol Support: RESP2 and RESP3 protocol negotiation
- Async Support: Full async/await support with uvloop optimization
- Streams: Redis Streams with consumer groups
- JSON: RedisJSON operations
- Geospatial: Redis geospatial indexing
- Time Series: RedisTimeSeries support
- Bloom Filters: Probabilistic data structures
- Bitmaps: Bit-level operations
- Worker Coordination: Graceful scaling and recovery of crashed workers
- JWT Authentication: Token management with refresh tokens and blacklisting
- Session Management: Multi-session tracking with automatic cleanup
- 2FA Support: Time-based OTP with backup codes
- Rate Limiting: Built-in rate limiting capabilities
- Streaming Iterators: Real-time data streaming over SSE/WebSockets
- Distributed Locks: Redis-based distributed locking
- Shared State: Thread-safe shared dictionaries across processes
- Cluster Operations: Redis Cluster support with slot management
- Health Monitoring: Automatic dead worker detection and recovery
# Install from PyPI
pip install cyredis
# Or install from source
git clone https://github.com/yourusername/cyredis.git
cd cyredis
pip install -e .from cy_redis import HighPerformanceRedis
# Synchronous usage
with HighPerformanceRedis() as redis:
redis.set("key", "value")
value = redis.get("key")
print(value) # "value"
# Async usage
import asyncio
async def main():
async with HighPerformanceRedis() as redis:
await redis.set_async("key", "value")
value = await redis.get_async("key")
print(value)
asyncio.run(main())CyRedis provides enterprise-grade web application support that integrates seamlessly with FastAPI:
from fastapi import FastAPI
from cy_redis.web_app_support import WebApplicationSupport
app = FastAPI()
# Initialize CyRedis web app support
app_support = WebApplicationSupport()
@app.on_event("startup")
async def startup():
app_support.initialize()
@app.on_event("shutdown")
async def shutdown():
app_support.shutdown_graceful() # Graceful shutdown with workload yielding
@app.post("/login")
async def login(username: str, password: str):
result = await app_support.authenticate_user(username, password)
if not result['success']:
raise HTTPException(status_code=401, detail=result['error'])
return result
@app.get("/protected")
async def protected_route(token: str):
payload = app_support.verify_user_access(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid token")
return {"user_id": payload['user_id']}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)from fastapi.responses import StreamingResponse
from fastapi import WebSocket, WebSocketDisconnect
@app.get("/sse/chat/{user_id}")
async def sse_chat(user_id: str, token: str):
"""Real-time chat via Server-Sent Events"""
# Verify JWT token
payload = app_support.jwt_middleware(token, required_permissions=['stream_access'])
if not payload or payload['user_id'] != user_id:
raise HTTPException(status_code=403, detail="Access denied")
def generate_sse():
async def sse_generator():
async with app_support.stream_iterator(
"chat_messages",
consumer_group=f"chat_{user_id}",
consumer_name=f"user_{user_id}"
) as stream_iter:
async for messages in stream_iter:
for message in messages:
yield f"data: {json.dumps(message['data'])}\n\n"
return sse_generator()
return StreamingResponse(generate_sse(), media_type="text/event-stream")
@app.websocket("/ws/chat/{user_id}")
async def websocket_chat(websocket: WebSocket, user_id: str, token: str):
"""Real-time chat via WebSocket"""
# Verify websocket token
ws_payload = app_support.jwt_middleware(token, required_permissions=['stream_access', 'realtime_access'])
if not ws_payload or ws_payload['user_id'] != user_id:
await websocket.send_json({"error": "Invalid websocket token"})
await websocket.close()
return
await websocket.accept()
try:
async with app_support.stream_iterator(
"chat_messages",
consumer_group=f"ws_chat_{user_id}",
consumer_name=f"ws_{uuid.uuid4().hex[:8]}"
) as stream_iter:
async for messages in stream_iter:
for message in messages:
await websocket.send_json(message['data'])
except WebSocketDisconnect:
pass
except Exception as e:
await websocket.send_json({"error": str(e)})
finally:
await websocket.close()from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""JWT authentication dependency"""
token = credentials.credentials
payload = app_support.jwt_middleware(token)
if not payload:
raise HTTPException(status_code=401, detail="Invalid authentication token")
return payload
@app.get("/api/data")
async def get_data(current_user = Depends(get_current_user)):
"""Protected API endpoint"""
return {"user_id": current_user['user_id'], "data": "sensitive information"}
@app.post("/api/write")
async def write_data(data: dict, current_user = Depends(get_current_user)):
"""Protected write endpoint"""
# Check if user has write permissions
if not app_support.require_scopes(current_user['access_token'], ['write']):
raise HTTPException(status_code=403, detail="Insufficient permissions")
# Process the write operation
return {"status": "success"}CyRedis provides enterprise-grade worker coordination for handling scaling and recovery:
# Workers automatically register themselves
worker_info = app_support.get_worker_info()
print(f"Worker ID: {worker_info['worker_id']}")
print(f"Status: {worker_info['status']}")# Graceful shutdown yields workload to other workers
app_support.shutdown_graceful()
# Process:
# 1. Detect active sessions and tasks
# 2. Find available healthy workers
# 3. Redistribute sessions and tasks
# 4. Wait for active work to complete
# 5. Clean shutdown# Monitor worker health
cluster_info = app_support.get_cluster_info()
print(f"Healthy workers: {cluster_info['healthy_workers']}")
print(f"Dead workers: {cluster_info['dead_workers']}")
# Force recovery of specific worker
app_support.force_worker_recovery("worker_123")stats = app_support.get_system_stats()
print(json.dumps(stats, indent=2))
# Output includes:
# - User counts and session info
# - Worker cluster health
# - Queue statistics
# - Rate limiting stats
# - Shared dictionary stats# Monitor individual worker
worker_stats = app_support.get_worker_info()
print(f"Worker uptime: {worker_stats['uptime_seconds']}s")
print(f"Worker health: {worker_stats['status']}")
# Monitor entire cluster
cluster_stats = app_support.get_cluster_info()
print(f"Total workers: {cluster_stats['total_workers']}")
print(f"Healthy workers: {cluster_stats['healthy_workers']}")# Create different token types
access_token = app_support.create_access_token(user_id)
websocket_token = app_support.create_websocket_token(user_id)
api_token = app_support.create_api_token(user_id, scopes=['read', 'write'])
# Verify tokens
payload = app_support.verify_user_access(access_token)
ws_payload = app_support.verify_websocket_token(websocket_token)
api_payload = app_support.verify_api_token(api_token, ['read'])
# Token revocation
app_support.revoke_token(access_token)# Different authentication levels
@app.get("/public")
async def public_endpoint():
return {"message": "Public data"}
@app.get("/protected")
async def protected_endpoint(token: str):
payload = app_support.require_auth(token)
if not payload:
raise HTTPException(status_code=401)
return {"user_id": payload['user_id']}
@app.get("/admin")
async def admin_endpoint(token: str):
if not app_support.require_scopes(token, ['admin']):
raise HTTPException(status_code=403)
return {"message": "Admin data"}# Stream chat messages
async with app_support.stream_iterator(
"chat_messages",
consumer_group="chat_users",
consumer_name=f"user_{user_id}"
) as stream_iter:
async for messages in stream_iter:
for message in messages:
# Process each message
yield f"data: {json.dumps(message['data'])}\n\n"# Stream user notifications
async with app_support.list_iterator(f"notifications:{user_id}") as list_iter:
async for items in list_iter:
for item in items:
# Process each notification
yield f"data: {json.dumps({'message': item})}\n\n"# Listen to pub/sub channels
async with app_support.pubsub_iterator(["user_notifications", "global_news"]) as pubsub_iter:
async for message in pubsub_iter:
# Process broadcast message
yield f"data: {json.dumps(message)}\n\n"# Run all tests
pytest
# With coverage
pytest --cov=cy_redis --cov-report=html
# Specific test categories
pytest -m integration # Redis integration tests
pytest -m unit # Unit tests
pytest -m worker_coordination # Worker coordination tests# Run worker coordination tests
python tests/test_worker_coordination.py
# Run with main test runner
./scripts/run_tests.sh --worker-coord --coverage
# Run example applications
python examples/web_app_example.py
python examples/streaming_example.py# Start test environment
./scripts/run_tests.sh --docker-up --all
# Run tests with Docker
./scripts/run_tests.sh --docker-up --integrationexamples/web_app_example.py: Full web application with authentication, sessions, and shared stateexamples/streaming_example.py: Real-time streaming with SSE/WebSocket integrationexamples/enhanced_cyredis_demo.py: Advanced features demonstration
# 1. Initialize web app support
from cy_redis.web_app_support import WebApplicationSupport
app_support = WebApplicationSupport()
# 2. User authentication
tokens = app_support.authenticate_user(user_id, password)
access_token = tokens['access_token']
# 3. Session management
session_id = app_support.create_user_session(user_id)
session = app_support.get_session(session_id)
# 4. Real-time streaming
async with app_support.stream_iterator("chat_messages") as stream:
async for messages in stream:
for message in messages:
yield message
# 5. Shared state
shared_dict = app_support.get_shared_dict("user_data")
shared_dict[user_id] = {"preferences": {...}}
# 6. Worker coordination
cluster_info = app_support.get_cluster_info()
worker_info = app_support.get_worker_info()app_support = WebApplicationSupport(
host="redis-cluster.example.com",
port=6379,
max_connections=20,
max_workers=8
)# Custom token settings
token_manager = TokenManager(
redis_client,
secret_key="your-secret-key",
access_token_expiry=900, # 15 minutes
refresh_token_expiry=604800 # 7 days
)session_manager = SessionManager(
redis_client,
session_timeout=7200, # 2 hours
cleanup_interval=600 # 10 minutes
)-
High Memory Usage
# Adjust cache settings shared_dict = app_support.get_shared_dict('large_dataset') shared_dict.cache_ttl = 10 # Shorter cache for volatile data
-
Worker Lock Contention
# Increase lock timeouts lock = DistributedLock(redis, "my_lock", ttl_ms=10000)
-
Token Verification Failures
# Check Redis connectivity and token blacklisting payload = app_support.verify_user_access(token)
# Connection pooling
redis_client = CyRedisClient(max_connections=20)
# Caching strategy
fast_dict = app_support.get_shared_dict('fast_changing_data')
fast_dict.cache_ttl = 10 # Short cache
slow_dict = app_support.get_shared_dict('static_config')
slow_dict.cache_ttl = 300 # Long cache- Fork the repository
- Create a feature branch
- Add tests for new features
- Run the test suite
- Submit a pull request
MIT License - see LICENSE file for details.
- Built on top of the excellent hiredis C library
- Inspired by redis-py and FastAPI
- Uses uvloop for async performance
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: Full Documentation
CyRedis - Bringing enterprise-grade Redis functionality to Python with FastAPI-style web application support! π