valberg.dk

Projects

Writing a chat application in Django 4.2 using async StreamingHttpResponse, Server-Sent Events and PostgreSQL LISTEN/NOTIFY


With the release of Django 4.2 we got the following 1:

StreamingHttpResponse now supports async iterators when Django is served via ASGI.

And the documentation has been expanded with the following 2:

When serving under ASGI, however, a StreamingHttpResponse need not stop other requests from being served whilst waiting for I/O. This opens up the possibility of long-lived requests for streaming content and implementing patterns such as long-polling, and server-sent events.

Being a sucker for simplicity I got quite intrigued by the possibility to serve server-sent events (also known as SSE) from Django in an asynchronous manner.

So I set out to write a small, drumroll please, chat application!

This blog post documents my process of writing this application and how the bits and pieces fit together.

The code for the chat application can be found at github.com/valberg/django-sse.

Table of contents

What are server-sent events and why do we want to use them?

Server-sent events is "old tech", as in that is has been supported in major browser since around 2010-2011 3. The idea is that the client "subscribes" to an HTTP endpoint, and the server can then issue data to the client as long as the connection is open. This is a great performance boost compared to other techniques as for instance polling the server.

But wait, isn't websockets "shinier"?

It depends. In many situations when it comes to developing web applications, we just want a way to push data to the client, and here a bidirectional connection like websockets feel like an overkill. Also, I would argue that using POST/PUT requests from the client and SSE to the client might be "just enough" compared to websockets.

SSE also has the added benefit of having a built-in reconnection mechanism, which is something we would have to implement ourselves with websockets.

All in all SSE is a much simpler solution than websockets, and in many (most?) cases that is all we need.

A simple implementation of an SSE endpoint

So lets get to some code!

First we need our model for storing the chat messages:

class ChatMessage(models.Model):
    user = models.CharField(max_length=255)
    text = models.CharField(max_length=255)

With the model defined we can write our view to stream the messages.

The following is something along the lines of my initial attempt. First we have to define the view, which in fact will not change for the remainder of this blog post. The juicy bits are in the stream_messages() function. Note that the view is an async view, denoted by the async keyword.

async def stream_messages_view(request: HttpRequest) -> StreamingHttpResponse:
    return StreamingHttpResponse(
        streaming_content=stream_messages(),
        content_type="text/event-stream",
    )

We tell the StreamingHttpResponse class to get its streaming content from the stream_messages function. The following is my first initial implementation of stream_messages:

async def stream_messages() -> AsyncGenerator[str, None]:
    latest_message = None

    while True:
        current_message = await ChatMessage.objects.order_by("-id").afirst()

        # If we have a new message yield that
        if latest_message != current_message:
            yield "data: {current_message.text}\n\n"
            latest_message = current_message

        await asyncio.sleep(5)

So we've gotten rid of the HTTP overhead of polling by not having to do a request from the client every 5 seconds. But we are still doing a query to the database every 5 seconds, and that for each client. This is not ideal and is probably something we could have done with a synchronous view.

Let's see if we can do better.

More old tech to the rescue: PostgreSQL LISTEN/NOTIFY

This is where we could reach for more infrastructure which could help us giving the database a break. This could be listening for data in Redis (this is what django-channels does), or even having a queue in RabbitMQ. No matter what, it is means more infrastructure.

But I use PostgreSQL - and PostgreSQL is, like Django, "batteries included".

PostgreSQL has a mechanism called "LISTEN/NOTIFY" where a client can LISTEN to a "channel" and then other clients can NOTIFY on that same channel which will be broadcasted to all listeners .

This seems like something we can use, but the good ol' psycopg2 doesn't have async support, and I'm not even sure if asgiref's sync_to_async4 would help us here.

Enter psycopg 3

I had put the whole thing on ice until I realized that another big thing (maybe a bit bigger than StreamingHttpResponse) in Django 4.2 is the support for psycopg 3 - and psycopg 3 is very much async!

So I went for a stroll in the psycopg 3 documentation and struck gold5:

import psycopg
conn = psycopg.connect("", autocommit=True)
conn.execute("LISTEN mychan")
gen = conn.notifies()
for notify in gen:
    print(notify)
    if notify.payload == "stop":
        gen.close()
print("there, I stopped")

This does almost what we want! It just isn't async and isn't getting connection info from Django.

So by combining the snippet from the psycopg 3 documentation and my previous stream_messages I came up with this:

from collections.abc import AsyncGenerator
import psycopg
from django.db import connection

async def stream_messages() -> AsyncGenerator[str, None]:

    # Get the connection params from Django
    connection_params = connection.get_connection_params()

    # Somehow Django 4.2.1 sets the cursor_factory to 
    # django.db.backends.postgresql.base.Cursor
    # which causes problems. Read more about it in the 
    # "Differences between 4.2 and 4.2.1" section in the Appendix.
    # Removing it from the connection parameters works around this.
    connection_params.pop('cursor_factory')

    aconnection = await psycopg.AsyncConnection.connect(
        **connection_params,
        autocommit=True,
    )

    channel_name = "lobby"

    async with aconnection.cursor() as acursor:
        await acursor.execute(f"LISTEN {channel_name}")
        gen = aconnection.notifies()
        async for notify in gen:
            yield f"data: {notify.payload}\n\n"

Appart from problems with the cursor_factory (which I'll get back to in the appendix), this code is pretty straight forward and, most importantly, works!

Whenever a NOTIFY lobby, '<message>' is issued, the stream_messages function will yield the message to the listener.

Test the endpoint with curl

So now we've got the LISTEN part in place.

If we connect to the endpoint using curl (-N disables buffering and is a way to consume streming content with curl):

$ curl -N http://localhost:8000/messages/

And connect to our database and run:

NOTIFY lobby, 'Hello, world!';

We, excitingly, get the following result :

data: Hello, world!

Amazing!

Issuing the NOTIFY command from Django

But we want the NOTIFY command to be issued when a new chat message is submitted.

For this we'll have a small utility function which does the heavy lifting. Note that this is just a very simple synchronous function since everything is just happening within a single request-response cycle.

from django.db import connection


def notify(*, channel: str, event: str, payload: str) -> None:
    payload = json.dumps({
        "event": event,
        "content": payload,
    })
    with connection.cursor() as cursor:
        cursor.execute(
            f"NOTIFY {channel}, '{payload}'",
        )

And then we can use this in our view (I'm using @csrf_exempt here since this is just a quick proof of concept):

@csrf_exempt
@require_POST
def post_message_view(request: HttpRequest) -> HttpResponse:
    message = request.POST.get("message")
    user = request.POST.get("user")
    message = ChatMessage.objects.create(user=user, text=message)
    notify(
        channel="lobby",
        event="message_created",
        content=json.dumps({
            "text": message.text,
            "user": message.user,
        })
    )
    return HttpResponse("OK")

The keen observer will notice that we are storing the payload content as a JSON string within a JSON string.

This is because we have two recipients of the payload. The first is the stream_messages function which is going to send the payload to the client with a event, and the second is the browser which is going to parse the payload and use the event to determine what to do with the payload.

For this we'll have to update our stream_messages function as follows:

async def stream_messages() -> AsyncGenerator[str, None]:
    connection_params = connection.get_connection_params()

    # Remove the cursor_factory parameter since I can't get
    # the default from Django 4.2.1 to work.
    # Django 4.2 didn't have the parameter and that worked.
    connection_params.pop('cursor_factory')

    aconnection = await psycopg.AsyncConnection.connect(
        **connection_params,
        autocommit=True,
    )
    channel_name = "lobby"
    async with aconnection.cursor() as acursor:
        await acursor.execute(f"LISTEN {channel_name}")
        gen = aconnection.notifies()
        async for notify in gen:
            payload = json.loads(notify.payload)
            event = payload.pop("event")
            data = payload.pop("data")
            yield f"event: {event}\ndata: {data}\n\n"

Everything is the same except that we now parse the payload from the NOTIFY command and construct the SSE payload with an event and a data field. This will come in handy when dealing with the frontend.

Another way to do this would be to use Django's signals or event writing a PostgreSQL trigger which issues the NOTIFY command.

Hooking up the frontend

Now that we've got the backend in place, we can get something up and running on the frontend.

We could use HTMX's SSE extension but for this example we'll just use the EventSource API directly.

<template id="message">
    <div style="border: 1px solid black; margin: 5px; padding: 5px;">
        <strong class="user"></strong>: <span class="message"></span>
    </div>
</template>

<div id="messages"></div>

<script>
    const source = new EventSource("/messages/");

    // Note that the event we gave our notify utility function is called "message_created"
    // so that's what we listen for here.
    source.addEventListener("message_created", function(evt) {
        // Parse the payload
        let payload = JSON.parse(evt.data);

        // Get and clone our template
        let template = document.getElementById('message');
        let clone = template.content.cloneNode(true);

        // Update our cloned template
        clone.querySelector('.user').innerText = payload.user;
        clone.querySelector('.message').innerText = payload.text;

        // Append the cloned template to our list of messages
        document.getElementById('messages').appendChild(clone);
    });
</script>

And that's it! We can now open two browser windows and see the messages appear in real time.

Check out the repo for the full code where I've also added a simple form for submitting new messages.

Dealing with reconnections

One of the nice things about SSE is that it will automatically reconnect if the connection is lost. It even has a mechanism for dealing with the fact that the client might have missed some events while it was disconnected.

This is done by sending a Last-Event-ID header with the request. The value of this header is the id of the last event that the client received. The server can then use this to determine which events to send to the client.

To deal with this we can expand on our stream_messages function and view as follows:

async def stream_messages(last_id: int | None = None) -> AsyncGenerator[str, None]:
    connection_params = connection.get_connection_params()
    connection_params.pop('cursor_factory')
    aconnection = await psycopg.AsyncConnection.connect(
        **connection_params,
        autocommit=True,
    )
    channel_name = "lobby"

    if last_id:
        messages = ChatMessage.objects.filter(id__gt=last_id)
        async for message in messages:
            yield f"id: {message.id}\nevent: message_created\ndata: {message.as_json()}\n\n"

    async with aconnection.cursor() as acursor:
        await acursor.execute(f"LISTEN {channel_name}")
        gen = aconnection.notifies()
        async for notify in gen:
            payload = json.loads(notify.payload)
            event = payload.get("event")
            event_id = payload.get("event_id")
            data = payload.get("data")
            yield f"id: {event_id}\nevent: {event}\ndata: {data}\n\n"


async def stream_messages_view(
    request: HttpRequest,
) -> StreamingHttpResponse:
    last_id = request.headers.get("Last-Event-ID")
    return StreamingHttpResponse(
        streaming_content=stream_messages(last_id=last_id),
        content_type="text/event-stream",
    )

We now send the id of each message, and whenever a (re)connection is made we check if the client sent a Last-Event-ID and if so we send all messages with an id greater than that.

This change does also require some changes to our utility functions and model. Those are to be found in the git repo.

Conclusion

Django is boring, which is a good thing, to the degree where it is always the safe option. But with the advances in async support it is becoming a viable, and shiny, option for doing real time stuff. Mix in some other solid and boring tech like PostgreSQL and SSE, and you end up with a very solid foundation for building real time applications.

Appendix

How to run ASGI applications in development

One thing that took me some time to realise is that the Django runserver is not capable of running async views returning StreamingHttpResponse.

Running the view with the builtin runserver results in the following error:

.../django/http/response.py:514: Warning: StreamingHttpResponse must
consume asynchronous iterators in order to serve them synchronously. 
Use a synchronous iterator instead.

Fortunately Daphne, the ASGI server which was developed to power Django Channels, has an async runserver which we can use:

To set this up we'll have to install the daphne package, add daphne to the top of our installed apps, and set the ASGI_APPLICATION setting to point to our ASGI application.

INSTALLED_APPS = [
    "daphne",
    ...
    "chat",  # Our app
]

ASGI_APPLICATION = "project.asgi.application"

Now we can just run ./manage.py runserver as before and we are async ready!

Difference between 4.2 and 4.2.1

The code worked initially in 4.2, but 4.2.1 fixed a regression regarding setting a custom cursor in the database configuration.

In 4.2 we get this from connection.get_connection_params():

{
    'dbname': 'postgres',
    'user': 'postgres',
    'password': 'postgres',
    'host': 'localhost',
    'port': 5432,
    'context': <psycopg.adapt.AdaptersMap object at 0x7f019cda7a60>,
    'prepare_threshold': None
}

in 4.2.1 we get this:

{
    'dbname': 'postgres',
    'client_encoding': 'UTF8',
    'cursor_factory': <class 'django.db.backends.postgresql.base.Cursor'>,
    'user': 'postgres',
    'password': 'postgres',
    'host': 'localhost',
    'port': '5432',
    'context': <psycopg.adapt.AdaptersMap object at 0x7f56464bcdd0>,
    'prepare_threshold': None
}

django.db.backends.postgresql.base.Cursor is not async iterable.

So we can probably try to set our own cursor_factory in settings:

from psycopg import AsyncCursor

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'postgres',
        'USER': 'postgres',
        'PASSWORD': 'postgres',
        'HOST': 'localhost',
        'PORT': '5432',
        'OPTIONS': {
            "cursor_factory": AsyncCursor
        }
    }
}

But alas. For some reason this does not work. I guess that Django does some wrapping of the cursor - or maybe I've just encountered a bug. The cursor is at least not treated as an async cursor and thus we get the following error:

.../django-sse/venv/lib/python3.11/site-packages/django/db/backends/utils.py:41: 
RuntimeWarning: coroutine 'AsyncCursor.close' was never awaited

  self.close()

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

.../django-sse/venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py:1560:
RuntimeWarning: coroutine 'AsyncCursor.execute' was never awaited

  cursor.execute(sql, params)

RuntimeWarning: Enable tracemalloc to get the object allocation traceback

So instead I opted for removing the cursor_factory in the streaming function. So that now looks like so:

async def stream_messages() -> AsyncGenerator[str, None]:
    connection_params = connection.get_connection_params()
    connection_params.pop('cursor_factory')
    aconnection = await psycopg.AsyncConnection.connect(
        **connection_params,
        autocommit=True,
    )
    channel_name = "lobby"
    async with aconnection.cursor() as acursor:
        print(type(acursor))
        await acursor.execute(f"LISTEN {channel_name}")
        gen = aconnection.notifies()
        async for notify in gen:
            yield f"data: {notify.payload}\n\n"