Noë Flatreaud

Real-time notifications with Flask and Server-Sent Events (SSE)

Server-Sent Events (SSE) offer a straightforward way to push real-time updates from a server to a client. Unlike WebSockets, SSE is one-way only, from the server to the client, which makes it an excellent choice for applications that need to send notifications, updates, or messages without a full-duplex connection. In this post, we'll implement SSE in Python with Flask. Hope you'll find it useful.

Isn't flask-sse already around?

Meh. The flask-sse package seemed promising, but it relies on Redis. Moreover, who-tf needs extra dependencies for such a simple feature? Please keep your application simple and as dependency-free as possible. In a perfectly optimised world, you'd even ditch Python and Flask for C and assembly.

Setting Up the Flask Application

Alright, with this settled, let's create a basic Flask application. Create a file named app.py and add the following code:

from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Hello, World!'
 
if __name__ == "__main__":
    app.run(debug=True)

Server-Sent Events (SSE)

SSE format

To implement Server-Sent Events (SSE), messages must follow a specific format:

event: MessageEvent\ndata: {"message": "Hello World!"}\n\n

Newlines (\n) are important because they delimit fields and messages in the event stream: a single \n ends a field, and a blank line (\n\n) ends the message. Here's a helper function to format messages correctly:

# [...]
def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event: msg = f'event: {event}\n{msg}'
    return msg
# [...]

The event parameter is optional. It names the event type, which lets clients subscribe to specific topics on a single stream. This avoids having to define one message queue per topic.
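To make the format concrete, here is what the helper produces with and without an event name. The second function, format_sse_multiline, is a hypothetical variant that is not part of the original helper: it is a minimal sketch assuming you may want to send payloads containing newlines, in which case each line needs its own data: prefix.

```python
def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event: msg = f'event: {event}\n{msg}'
    return msg


def format_sse_multiline(data: str, event=None) -> str:
    # Hypothetical variant: one "data:" field per line of the payload,
    # so that embedded newlines do not end the message early.
    lines = [f'data: {line}' for line in data.split('\n')]
    if event:
        lines.insert(0, f'event: {event}')
    return '\n'.join(lines) + '\n\n'


# Only the data field is emitted when no event name is given.
print(repr(format_sse('Hey!')))
# The event field comes first when a name is given.
print(repr(format_sse('{"message": "Hello World!"}', event='MessageEvent')))
# A two-line payload becomes two data fields in a single message.
print(repr(format_sse_multiline('line one\nline two')))
```

The client joins consecutive data fields back together with a newline, so the two-line payload arrives intact.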

Implementing the PubSub Pattern

To handle the pub/sub pattern, we'll create an EventBouncer class that manages subscribers and broadcasts messages to them.

from queue import Queue, Full

# [...]

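class EventBouncer:

    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []   # one Queue per connected client
        self.maxsize = maxsize  # capacity of each subscriber's queue

    def subscribe(self) -> Queue:
        # Register a new bounded queue and hand it to the caller.
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q

    def broadcast(self, msg: str) -> None:
        # Iterate backwards so that deleting an entry does not shift
        # the indices that are still to be visited.
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                # A full queue means the client stopped reading: drop it.
                del self.subscribers[i]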

# [...]

The EventBouncer class has two methods.

The first is subscribe. When a client wants to receive notifications, this method appends a new Queue to the list of subscribers and returns it. The queue is thread-safe and holds at most maxsize messages (16 by default). The queue module is part of Python's standard library.

The second is broadcast. We could just as well call it announce or dispatch, since it takes a message and dispatches it to all subscribers. If a subscriber's queue is full, broadcast removes that subscriber, assuming it is no longer active.
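The eviction behaviour is easier to see in isolation. The sketch below repeats the class and simulates two clients outside of Flask; the names slow and fast and the small maxsize of 2 are chosen purely for illustration.

```python
from queue import Queue, Full


class EventBouncer:

    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []
        self.maxsize = maxsize

    def subscribe(self) -> Queue:
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q

    def broadcast(self, msg: str) -> None:
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                del self.subscribers[i]


bouncer = EventBouncer(maxsize=2)
slow = bouncer.subscribe()  # never reads its queue
fast = bouncer.subscribe()  # reads every message right away

for n in range(3):
    bouncer.broadcast(f'msg {n}')
    fast.get_nowait()

# The third broadcast found the slow queue full and dropped it.
print(len(bouncer.subscribers))      # 1
print(bouncer.subscribers[0] is fast)  # True
print(slow.qsize())                  # 2
```

Note that the dropped client still holds its queue and the two messages already in it; it simply stops receiving new ones.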

Using the EventBouncer

Instantiate the EventBouncer and use it to send messages:

# [...]
bouncer = EventBouncer()
# [...]

Creating the /subscribe Route

Create a route for clients to subscribe to messages.

from flask import Response

# [...]

@app.route('/subscribe', methods=['GET'])
def subscribe():
    def stream():
        messages = bouncer.subscribe()
        while True:
            msg = messages.get()
            yield msg
    return Response(stream(), mimetype='text/event-stream')

# [...]

Creating the /publish Route

Create a route for clients to publish messages to the bouncer.

from flask import request
# [...]

@app.route('/publish', methods=["POST"])
def publish():
    data = request.get_data().decode('utf-8')
    msg = format_sse(data=data)
    bouncer.broadcast(msg=msg)
    return {}, 200
# [...]

In the /subscribe route, the stream function blocks until a new message arrives and then yields it. The response never terminates, so make sure Flask is running in threaded mode (the default for the development server since Flask 1.0); otherwise a single subscriber would block every other request. The mimetype='text/event-stream' tells the client to treat the response as an SSE stream.
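Because get() blocks indefinitely, an idle stream sends nothing at all, and some proxies close connections that stay silent for too long. One possible variation, sketched here as an assumption rather than as part of the original app, is to wait with a timeout and emit an SSE comment line when nothing arrives. Lines starting with a colon are ignored by SSE clients. The name stream_with_heartbeat and the 15-second default are hypothetical.

```python
from queue import Queue, Empty


def stream_with_heartbeat(messages: Queue, timeout: float = 15.0):
    # Yield queued messages as they arrive; when the queue stays
    # empty for `timeout` seconds, yield an SSE comment instead.
    while True:
        try:
            yield messages.get(timeout=timeout)
        except Empty:
            yield ': keep-alive\n\n'


# Quick check outside of Flask, with a very short timeout.
q = Queue()
q.put('data: Hey!\n\n')
gen = stream_with_heartbeat(q, timeout=0.01)
print(repr(next(gen)))  # the queued message
print(repr(next(gen)))  # the keep-alive comment, since the queue is now empty
```

Inside the route, the generator returned by stream_with_heartbeat(bouncer.subscribe()) could be passed to Response in place of stream().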

Full app.py script

from flask import Flask, Response, request
from queue import Queue, Full

# --------------------------------------------

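class EventBouncer:

    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []   # one Queue per connected client
        self.maxsize = maxsize  # capacity of each subscriber's queue

    def subscribe(self) -> Queue:
        # Register a new bounded queue and hand it to the caller.
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q

    def broadcast(self, msg: str) -> None:
        # Iterate backwards so that deleting an entry does not shift
        # the indices that are still to be visited.
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                # A full queue means the client stopped reading: drop it.
                del self.subscribers[i]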

def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event: msg = f'event: {event}\n{msg}'
    return msg

# --------------------------------------------

app = Flask(__name__)
bouncer = EventBouncer()

# --------------------------------------------

@app.route('/subscribe', methods=['GET'])
def subscribe():
    def stream():
        messages = bouncer.subscribe()
        while True:
            msg = messages.get()
            yield msg
    return Response(stream(), mimetype='text/event-stream')

@app.route('/publish', methods=["POST"])
def publish():
    data = request.get_data().decode('utf-8')
    msg = format_sse(data=data)
    bouncer.broadcast(msg=msg)
    return {}, 200

# --------------------------------------------

if __name__ == "__main__":
    app.run(debug=True)

Running the Server

Everything is in place. You can now run the Flask server:

$ python ./app.py

or

$ export FLASK_APP=app.py
$ export FLASK_DEBUG=1
$ flask run

To test for notifications, open a new terminal and run the following command:

$ curl -N http://127.0.0.1:5000/subscribe

To bounce messages, use a loop in yet another terminal to send a POST request every second:

$ while true; do curl -X POST http://127.0.0.1:5000/publish -d "Hey!"; sleep 1; done

You should now see a steady stream of Hey! messages in the listening terminal.
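If you'd rather listen from Python than from curl, the stream is simple enough to parse by hand. The following is a minimal sketch using only the standard library; parse_sse and listen are hypothetical helper names, and the parser covers only the event and data fields used in this post, not the id or retry fields of the full format.

```python
import urllib.request
from typing import Iterable, Iterator, Tuple


def _value(line: str, field: str) -> str:
    # Strip the field name, the colon, and at most one leading space.
    value = line[len(field) + 1:]
    return value[1:] if value.startswith(' ') else value


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    # Yield (event, data) pairs; "message" is the default event type.
    event, data = 'message', []
    for raw in lines:
        line = raw.rstrip('\r\n')
        if line == '':
            # A blank line ends the current message.
            if data:
                yield event, '\n'.join(data)
            event, data = 'message', []
        elif line.startswith(':'):
            continue  # comment line, ignored
        elif line.startswith('event:'):
            event = _value(line, 'event')
        elif line.startswith('data:'):
            data.append(_value(line, 'data'))


def listen(url: str = 'http://127.0.0.1:5000/subscribe') -> None:
    # Blocks forever, printing each message; needs the server running.
    with urllib.request.urlopen(url) as resp:
        decoded = (line.decode('utf-8') for line in resp)
        for event, data in parse_sse(decoded):
            print(f'[{event}] {data}')


# The parser can be tried without a server by feeding it raw lines.
sample = ['event: MessageEvent\n', 'data: {"a": 1}\n', '\n', 'data: Hey!\n', '\n']
print(list(parse_sse(sample)))
```

With the server and the publishing loop running, calling listen() should print one line per received message.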

#blog #flask #python #ss