Real-time notifications with Flask and Server-Sent Events (SSE)
Server-Sent Events (SSE) offer a straightforward way to push real-time updates from a server to a client. Unlike WebSockets, SSE is one-way only, from the server to the client, which makes it an excellent choice for applications that need to send notifications, updates, or messages to clients without a full-duplex connection. In this post, we'll implement SSE with Python and Flask. Hope you'll find this relevant.
Isn't flask-sse already around?
Meh. The flask-sse package seemed promising, but it relies on Redis. Moreover, who-tf needs extra dependencies for such a simple feature? Please keep your application simple and dependency-free. In a perfectly optimised world, you'd even ditch Python and Flask for C and Assembly.
Setting Up the Flask Application
Alright, with this settled, let's create a basic Flask application. Create a file named app.py and add the following code:
from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
    return 'Hello, World!'
if __name__ == "__main__":
    app.run(debug=True)
Server-Sent Events (SSE)
SSE format
In order to implement Server-Sent Events (SSE), we need to follow a certain format:
event: MessageEvent\ndata: {"message": "Hello World!"}\n\n
Newlines (\n) are important: a single one ends each field, and a blank line (two newlines in a row) marks the end of a message in the stream. Here's a helper function to format messages correctly:
# [...]
def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event:
        msg = f'event: {event}\n{msg}'
    return msg
# [...]
The event parameter is optional. It names the type of the message, so clients can subscribe only to the topics they care about. This avoids having to define one message queue for each topic.
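To make the format a bit more concrete, here's a sketch of a variant of the helper that also copes with multi-line payloads. The name format_sse_multiline is made up for this example and isn't used in the rest of the post. It relies on two rules of the SSE format: each line of the payload needs its own data: line, and a blank line ends the message.

```python
from typing import Optional


def format_sse_multiline(data: str, event: Optional[str] = None) -> str:
    """Hypothetical variant of format_sse: one 'data:' line per payload line."""
    # A raw newline inside a single 'data:' line would cut the message short,
    # so every line of the payload gets its own 'data:' prefix.
    lines = [f'data: {line}' for line in data.split('\n')]
    if event:
        # The optional 'event:' field names the event type.
        lines.insert(0, f'event: {event}')
    # A blank line (hence the double '\n') marks the end of the message.
    return '\n'.join(lines) + '\n\n'


print(repr(format_sse_multiline('Hey!', event='greeting')))
print(repr(format_sse_multiline('line 1\nline 2')))
```

In a browser, named events like greeting are picked up with EventSource.addEventListener('greeting', ...) rather than the default onmessage handler.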
Implementing the PubSub Pattern
To handle the pubsub pattern, we'll create an EventBouncer class. It will manage subscribers and broadcast messages to them.
from queue import Queue, Full
# [...]
class EventBouncer:
    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []
        self.maxsize = maxsize
    def subscribe(self) -> Queue:
        # One bounded queue per subscriber.
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q
    def broadcast(self, msg: str) -> None:
        # Iterate backwards so that deleting an entry
        # doesn't shift the indices still to be visited.
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                del self.subscribers[i]
# [...]
The EventBouncer class has two methods.
The first is subscribe. When a client wants to receive notifications, this method appends a Queue to the list of subscribers and returns it. The queue is thread-safe and has a maximum size of 16 by default. The queue module is part of Python's standard library.
The second one is called broadcast. We could just as well call it announce or dispatch, because this method takes a message and dispatches it to all subscribers. If a queue is full, it removes the subscriber, assuming it's no longer active.
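To see the eviction in action, here's a small standalone sketch. The class is repeated so the snippet runs on its own, and the tiny maxsize=2 is an assumption made only to trigger the behaviour quickly. One subscriber keeps reading, the other never does.

```python
from queue import Queue, Full


class EventBouncer:
    # Same class as above, repeated so the snippet runs on its own.
    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []
        self.maxsize = maxsize

    def subscribe(self) -> Queue:
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q

    def broadcast(self, msg: str) -> None:
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                del self.subscribers[i]


# A tiny maxsize (demo-only assumption) makes the eviction easy to observe.
bouncer = EventBouncer(maxsize=2)
active = bouncer.subscribe()   # this client keeps reading
stalled = bouncer.subscribe()  # this client never reads

received = []
for n in range(3):
    bouncer.broadcast(f'message {n}')
    received.append(active.get_nowait())  # the active client drains its queue

print(received)                  # what the active client got
print(len(bouncer.subscribers))  # how many subscribers are left
```

The third broadcast finds the stalled queue full and drops it, while the active subscriber receives all three messages.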
Using the EventBouncer
Instantiate the EventBouncer and use it to send messages:
# [...]
bouncer = EventBouncer()
# [...]
Creating the /subscribe Route
Create a route for clients to subscribe to messages.
from flask import Response
# [...]
@app.route('/subscribe', methods=['GET'])
def subscribe():
    def stream():
        messages = bouncer.subscribe()
        while True:
            msg = messages.get()
            yield msg
    return Response(stream(), mimetype='text/event-stream')
# [...]
Creating the /publish Route
Create a route for clients to publish messages to the bouncer.
from flask import request
# [...]
@app.route('/publish', methods=["POST"])
def publish():
    data = request.get_data().decode('utf-8')
    msg = format_sse(data=data)
    bouncer.broadcast(msg=msg)
    return {}, 200
# [...]
The stream function in the /subscribe route blocks until a new message arrives and then yields it. The response never terminates, so make sure Flask is running in threaded mode (the default for the development server since Flask 1.0); otherwise a single subscriber would tie up the server and /publish would never be served. The mimetype='text/event-stream' ensures that the client understands the response as an SSE stream.
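Because stream() blocks on messages.get() for as long as nothing is published, a quiet connection sends no bytes at all, and some proxies may decide to close it. One possible tweak, sketched below, is to wake up periodically and send an SSE comment (a line starting with a colon, which clients ignore). The name stream_with_keepalive and the 15-second default are assumptions for this sketch, not part of the app above.

```python
from queue import Queue, Empty
from typing import Iterator


def stream_with_keepalive(messages: Queue, timeout: float = 15.0) -> Iterator[str]:
    """Hypothetical variant of stream(): yields messages, or a comment when idle."""
    while True:
        try:
            # Wait for the next message, but not forever.
            yield messages.get(timeout=timeout)
        except Empty:
            # A line starting with ':' is an SSE comment; clients ignore it.
            yield ': keep-alive\n\n'


# Quick check without Flask: one queued message, then an idle period.
q = Queue()
q.put('data: Hey!\n\n')
gen = stream_with_keepalive(q, timeout=0.1)
first = next(gen)   # the queued message
second = next(gen)  # nothing arrived within the timeout
print(repr(first))
print(repr(second))
```

Inside the route, this would replace the body of stream(), with bouncer.subscribe() passed in as the queue.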
Full app.py script
from flask import Flask, Response, request
from queue import Queue, Full
# --------------------------------------------
class EventBouncer:
    def __init__(self, maxsize: int = 16) -> None:
        self.subscribers = []
        self.maxsize = maxsize
    def subscribe(self) -> Queue:
        # One bounded queue per subscriber.
        q = Queue(maxsize=self.maxsize)
        self.subscribers.append(q)
        return q
    def broadcast(self, msg: str) -> None:
        # Iterate backwards so that deleting an entry
        # doesn't shift the indices still to be visited.
        for i in reversed(range(len(self.subscribers))):
            try:
                self.subscribers[i].put_nowait(msg)
            except Full:
                del self.subscribers[i]
def format_sse(data: str, event=None) -> str:
    msg = f'data: {data}\n\n'
    if event:
        msg = f'event: {event}\n{msg}'
    return msg
# --------------------------------------------
app = Flask(__name__)
bouncer = EventBouncer()
# --------------------------------------------
@app.route('/subscribe', methods=['GET'])
def subscribe():
    def stream():
        messages = bouncer.subscribe()
        while True:
            msg = messages.get()
            yield msg
    return Response(stream(), mimetype='text/event-stream')
@app.route('/publish', methods=["POST"])
def publish():
    data = request.get_data().decode('utf-8')
    msg = format_sse(data=data)
    bouncer.broadcast(msg=msg)
    return {}, 200
# --------------------------------------------
if __name__ == "__main__":
    app.run(debug=True)
Running the Server
Everything is in place. You can now run the Flask server:
$ python ./app.py
or
$ export FLASK_APP=app.py
$ export FLASK_DEBUG=1  # FLASK_ENV was removed in Flask 2.3
$ flask run
To test for notifications, open a new terminal and run the following command:
$ curl -N http://127.0.0.1:5000/subscribe
To bounce messages, you can use a loop in yet another terminal to send a POST request every second:
$ while true; do curl -X POST http://127.0.0.1:5000/publish -d "Hey!"; sleep 1; done
You should now see a steady stream of data: Hey! messages in the listening terminal.
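If you'd rather listen from Python than from curl, here's a rough, dependency-free sketch of a client. The names parse_sse and listen are made up for this example, and the parser is deliberately simplified: it only handles the event and data fields and skips comment lines. The URL is the one used in the curl command above.

```python
from typing import Dict, Iterable, Iterator, List
from urllib.request import urlopen


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group raw SSE lines into events (simplified: 'event' and 'data' only)."""
    event: Dict[str, str] = {}
    data: List[str] = []
    for raw in lines:
        line = raw.rstrip('\r\n')
        if not line:
            # Blank line: the current event is complete.
            if data:
                event['data'] = '\n'.join(data)
                yield event
            event, data = {}, []
            continue
        field, _, value = line.partition(':')
        if value.startswith(' '):
            value = value[1:]  # a single space after the colon is optional
        if field == 'data':
            data.append(value)
        elif field == 'event':
            event['event'] = value
        # Anything else (including ':' comment lines) is ignored.


def listen(url: str = 'http://127.0.0.1:5000/subscribe') -> None:
    """Print events from the /subscribe route until interrupted."""
    with urlopen(url) as response:
        # Iterating the response yields raw byte lines as they arrive.
        for event in parse_sse(line.decode('utf-8') for line in response):
            print(event)


# Offline check of the parser on a hand-written stream.
sample = ['event: greeting\n', 'data: Hey!\n', '\n', 'data: a\n', 'data: b\n', '\n']
events = list(parse_sse(sample))
print(events)
```

With the server running, calling listen() should print one dictionary per published message; stop it with Ctrl+C.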