How to Use Redis Pub/Sub for Scheduled Notifications with FastAPI, Redis, and WebSockets?

Scheduling Notifications with FastAPI, Redis, and WebSockets

In this blog post, we'll walk through setting up a FastAPI application to schedule notifications that are sent via WebSockets using Redis. We'll cover everything from setting up the project to explaining each method and function used. Let's dive in!

Prerequisites

Before we get started, ensure you have Python installed. You can download it from python.org. Also, ensure you have Redis installed and running on your local machine or a remote server.

Setting Up the Project

First, let's create a new directory for our project and navigate into it:

mkdir notification_scheduler
cd notification_scheduler

Create a virtual environment and activate it:

python -m venv venv
source venv/bin/activate  # On Windows use `venv\Scripts\activate`
Installing Required Packages

We'll use FastAPI to create our web server, Redis for our message broker, and Uvicorn as the ASGI server to run our FastAPI app.

Install the required packages:

pip install fastapi uvicorn redis
Creating the FastAPI Application

Let's create our FastAPI application. Create a new file named main.py and add the following code:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as redis
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Dict
import json
import asyncio

app = FastAPI()

# Initialize Redis connection
redis_url = "redis://localhost"
redis_client = redis.from_url(redis_url)

# Keep track of active WebSocket channels
active_channels = set()

# Data model for scheduling notifications
class Notification(BaseModel):
    channel: str
    message: Dict  # Adjusted to accept a dictionary
    send_at: str  # datetime string in the format "YYYY-MM-DD HH:MM:SS"

# Function to publish a message
async def publish_message(channel: str, message: dict):
    message_str = json.dumps(message)
    if channel in active_channels:
        await redis_client.publish(channel, message_str)
    else:
        await store_unsent_notification(channel, message_str)

# Function to store unsent notifications
async def store_unsent_notification(channel: str, message: str):
    unsent_key = f"unsent:{channel}"
    await redis_client.lpush(unsent_key, message)

# Function to retrieve and send unsent notifications
async def send_unsent_notifications(channel: str, websocket: WebSocket):
    unsent_key = f"unsent:{channel}"
    while True:
        message = await redis_client.rpop(unsent_key)
        if not message:
            break
        await websocket.send_text(message.decode('utf-8'))

# Schedule and run the notification at the specified time
async def schedule_publish_message(channel: str, message: dict, send_time: datetime):
    await asyncio.sleep((send_time - datetime.now()).total_seconds())
    await publish_message(channel, message)

# Endpoint to schedule a notification
@app.post("/schedule/")
async def schedule_notification(notification: Notification):
    send_time = datetime.strptime(notification.send_at, "%Y-%m-%d %H:%M:%S")
    asyncio.create_task(schedule_publish_message(notification.channel, notification.message, send_time))
    return {"status": "Notification scheduled", "send_at": notification.send_at}

# WebSocket endpoint for subscriber
@app.websocket("/ws/{channel}")
async def websocket_endpoint(websocket: WebSocket, channel: str):
    await websocket.accept()
    active_channels.add(channel)
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)

    try:
        # Send any unsent notifications
        await send_unsent_notifications(channel, websocket)

        async for message in pubsub.listen():
            if message['type'] == 'message':
                await websocket.send_text(message['data'].decode('utf-8'))
    except WebSocketDisconnect:
        active_channels.discard(channel)
        await pubsub.unsubscribe(channel)
    finally:
        await pubsub.unsubscribe(channel)
        await websocket.close()
        active_channels.discard(channel)

# Run the app with: uvicorn main:app --reload
Explanation of the Code
Initial Setup
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import redis.asyncio as redis
from datetime import datetime, timedelta
from pydantic import BaseModel
from typing import Dict
import json
import asyncio
  • FastAPI: A modern, fast web framework for building APIs with Python 3.6+ based on standard Python type hints.
  • redis.asyncio: Redis client library with asyncio support for asynchronous communication with Redis.
  • datetime: Module supplies classes for manipulating dates and times.
  • pydantic: Data validation and settings management using Python type annotations.
  • asyncio: Library to write concurrent code using the async/await syntax.
FastAPI Application Instance
app = FastAPI()

Creates an instance of the FastAPI class, which we will use to define our routes and WebSocket endpoints.

Redis Connection
redis_url = "redis://localhost"
redis_client = redis.from_url(redis_url)

Initializes the connection to Redis using the provided URL.

Active Channels
active_channels = set()

Keeps track of active WebSocket channels. This is useful to check if a channel is open when we need to send a message.

Notification Model
class Notification(BaseModel):
    channel: str
    message: Dict
    send_at: str

Defines the data model for scheduling notifications. The Notification class inherits from BaseModel, which automatically validates the incoming data.

Publish Message Function
async def publish_message(channel: str, message: dict):
    message_str = json.dumps(message)
    if channel in active_channels:
        await redis_client.publish(channel, message_str)
    else:
        await store_unsent_notification(channel, message_str)

Publishes a message to the specified Redis channel. If the channel is not active, the message is stored as unsent.

Store Unsent Notifications
async def store_unsent_notification(channel: str, message: str):
    unsent_key = f"unsent:{channel}"
    await redis_client.lpush(unsent_key, message)

Stores unsent notifications in a Redis list. This allows us to send them later when the channel becomes active.

Send Unsent Notifications
async def send_unsent_notifications(channel: str, websocket: WebSocket):
    unsent_key = f"unsent:{channel}"
    while True:
        message = await redis_client.rpop(unsent_key)
        if not message:
            break
        await websocket.send_text(message.decode('utf-8'))

Sends any unsent notifications to the WebSocket client when it connects.

Schedule and Run Notification
async def schedule_publish_message(channel: str, message: dict, send_time: datetime):
    await asyncio.sleep((send_time - datetime.now()).total_seconds())
    await publish_message(channel, message)

Schedules the notification to be sent at the specified time by using asyncio.sleep to delay execution until the send time.

Schedule Notification Endpoint
@app.post("/schedule/")
async def schedule_notification(notification: Notification):
    send_time = datetime.strptime(notification.send_at, "%Y-%m-%d %H:%M:%S")
    asyncio.create_task(schedule_publish_message(notification.channel, notification.message, send_time))
    return {"status": "Notification scheduled", "send_at": notification.send_at}

Endpoint to schedule a notification. This endpoint accepts a Notification object, parses the send time, and creates a task to schedule the notification.

WebSocket Endpoint
@app.websocket("/ws/{channel}")
async def websocket_endpoint(websocket: WebSocket, channel: str):
    await websocket.accept()
    active_channels.add(channel)
    pubsub = redis_client.pubsub()
    await pubsub.subscribe(channel)

    try:
        await send_unsent_notifications(channel, websocket)
        async for message in pubsub.listen():
            if message['type'] == 'message':
                await websocket.send_text(message['data'].decode('utf-8'))
    except WebSocketDisconnect:
        active_channels.discard(channel)
        await pubsub.unsubscribe(channel)
    finally:
        await pubsub.unsubscribe(channel)
        await websocket.close()
        active_channels.discard(channel)

WebSocket endpoint that accepts connections, subscribes to the specified Redis channel, and listens for messages. When a message is received, it is sent to the WebSocket client.

Running the Application

To run the application, use the following command:

uvicorn main:app --reload
Testing the Application

You can test the application using curl or Postman to schedule a notification. Here is an example using curl:

curl -X POST "http://127.0.0.1:8000/schedule/" \
     -H "Content-Type: application/json" \
     -d '{
           "channel": "ranvijay",
           "message": {
             "id": "123",
             "name": "abc",
             "status": 1
           },
           "send_at": "2024-06-05 18:30:00"
         }'
Conclusion

In this blog post, we covered how to set up a FastAPI application to schedule notifications using Redis and WebSockets. We explained each step, the methods used, and how to test the application. This setup ensures that notifications are handled properly and asynchronously, leveraging the power of FastAPI and Redis for efficient message handling.

Comments

Leave a Reply