Colócate en la siguiente situación: cuando un usuario pulsa un botón de tu web, se ejecuta una funcionalidad que tarda mucho tiempo en dar el resultado (tal vez 10 minutos). Sin embargo, la experiencia sería nefasta si esperara todo ese tiempo sin poder hacer nada. Por ello, decidimos que la tarea se ejecute en segundo plano, permitiendo al usuario seguir navegando por toda la web sin restricciones. Cuando termine la funcionalidad, se le notificará el resultado con un mensaje emergente. Y no solo eso, además le daremos un feedback en tiempo real del progreso. Sencillo de decir y complejo de hacer. ¿Cómo podemos lograrlo?
A continuación puedes ver un ejemplo de lo que queremos conseguir. Las 2 ventanas superiores corresponden al mismo usuario, en 2 pestañas diferentes. Un usuario inicia una tarea en la primera pestaña y se le notifica el progreso en ambas a la vez. Por otro lado tenemos 2 cliente diferentes en la parte superior (por lo tanto hay 3 clientes en total). Cada uno ejecuta una tarea diferente y recibe el progreso de la misma. Además, aunque ellos cambien de página, recarguen o cierren el navegador y vuelvan a entrar, seguirán recibiendo el progreso de la tarea. ¡Espectacular!
Hay varios retos técnicos que debemos resolver. La dificultad está en orquestar los diferentes softwares, protocolos y tecnologías que intervienen en la solución.
- Ejecutar tareas en segundo plano: Para ello, utilizaremos Huey, que ya he hablado en un post anterior.
- Comunicación entre Django y el usuario: Usaremos el protocolo WebSockets, que nos permitirá enviar mensajes al cliente en tiempo real. Lo implementaremos porque es comunicar al usuario el progreso de la tarea en tiempo real. Django Channels es la impelentación oficial en nuestro querido framework.
- Comunicación entre la tarea y Django: Redis es perfecto para ello. Huey y Django Channels se comunicarán a través de esta maravillosa base de datos en memoria. Es necesario ya que la tarea debe comunicar a Django el progreso, o el porcentaje completado, de la tarea. Por si sola, si no hacemos nada, Django solo conocerá si esta trabajando o ha terminado.
- Renderizado de la barra de progreso y resultado: htmx será una solución idónea para simplificar el renderizado. Se encargará de gestionar la conexión al servidor de WebSockets, recibir el estado de la tarea, actualizar la barra de progreso y mostrar el resultado. En otras palabras, htmx se encargará del frontend.
Con todas las herramientas definidas, ya podemos empezar a trabajar en la implementación.
1. Docker (Opcional)
Hay muchas opciones para instalar Django y todos los servicios necesarios. Te dejo usar la que más te guste. En mi caso usaré Docker para simplificar la instalación de Redis, Huey y Django.
Creamos el archivo Dockerfile
con el siguiente contenido:
FROM python:3.12-slim
# Prevents Python from writing pyc files to disc (equivalent to python -B option)
ENV PYTHONDONTWRITEBYTECODE=1
# Prevents Python from buffering stdout and stderr (equivalent to python -u option)
ENV PYTHONUNBUFFERED=1
# set work directory
WORKDIR /usr/src/app/
# set time
RUN ln -fs /usr/share/zoneinfo/Europe/Madrid /etc/localtime
RUN dpkg-reconfigure -f noninteractive tzdata
# install software
RUN apt-get update && apt-get install -y \
build-essential \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
# install dependencies
RUN pip3 install --upgrade pip
COPY ./requirements.txt .
RUN pip3 install -r requirements.txt
EXPOSE 8000
# Debug
CMD ["python3", "manage.py", "runserver", "0.0.0.0:8000"]
# Production
#CMD ["daphne", "-b", "0.0.0.0", "-p", "8000", "myproject.asgi:application"]
Utiliza el archivo requirements.txt
para instalar las dependencias de Python que usaremos:
# Django
Django==5.1.2
daphne==4.1.2
asgiref==3.8.1
# Django Channels
channels==4.1.0
redis==5.2.0
# Task queue
huey==2.5.2
Ahora es el turno de compose.yaml
:
services:
redis:
image: redis:alpine
restart: "no"
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 30s
timeout: 60s
retries: 5
expose:
- 6379
django:
build:
context: .
restart: "no"
volumes:
- .:/usr/src/app/
ports:
- 8000:8000
depends_on:
- redis
huey:
build: .
restart: "no"
entrypoint: ./manage.py run_huey -f
volumes:
- .:/usr/src/app
depends_on:
- redis
Ya esta toda la configuración de Docker.
El siguiente paso será preparar el proyecto Django.
2. Django
Para crear los archivos necesarios de Django. Si estamos usando Docker, será sencillo con los siguientes comandos:
docker build -t django_example .
docker run -v $(pwd):/usr/src/app/ django_example django-admin startproject myproject .
Aparecerán nuevos archivos en el directorio actual.
myproject/
: Directorio del proyecto Django.manage.py
: Script para gestionar el proyecto.
Ahora creamos una aplicación llamada waiting room
:
docker run -v $(pwd):/usr/src/app/ django_example python manage.py startapp waiting_room
Si estas usando Linux, y tal vez MacOS, deberíamos cambiar los permisos de los archivos creados por Docker:
sudo chown $USER:$USER -R .
(No olvides el punto al final del comando)
Editamos el archivo myproject/settings.py
para añadir las aplicaciones necesarias:
INSTALLED_APPS = [
'daphne', # Servidor asíncrono
# ... Resto de apps
'waiting_room', # Nuestra aplicación
'huey.contrib.djhuey', # Colas de tareas
'daphne', # Servidor de WebSockets
'channels', # Django Channels
]
Indicamos que cualquier dominio pueda conectarse para poder probar la aplicación en local:
ALLOWED_HOSTS = ["*"]
Y añadimos la configuración de Django Channels:
ASGI_APPLICATION = 'myproject.asgi.application'
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [("redis", 6379)],
},
},
}
Levantamos el servidor de Django:
docker compose up -d
Y ejecutamos las migraciones:
docker compose exec django python manage.py migrate
Entra en http://localhost:8000 con tu navegador favorito y te encontrarás con la página de bienvenida de Django.
El siguiente paso será levantar un servidor de WebSockets con Django Channels, para comunicar el progreso de la tarea al cliente.
3. Django Channels
Vamos a crear un consumidor de WebSockets para comunicar el progreso de la tarea al cliente. No voy a explicar conceptos básicos, como que son las salas, grupos o consumidores. Si necesitas más información, te recomiendo la documentación oficial.
Creamos el archivo waiting_room/consumers.py
con el siguiente contenido:
import json
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
class MyConsumer(WebsocketConsumer):
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name)
self.close()
def receive(self, text_data):
# Echo
self.send(text_data=text_data)
connect
: Se ejecuta cuando un cliente se conecta al consumidor. Añade el canal del cliente al gruporoom_name
, que es el identificador de la sala y se lo proporcionaremos más adelante por medio de la URL.disconnect
: Se ejecuta cuando un cliente se desconecta del consumidor. Elimina el canal del cliente del gruporoom_name
.receive
: Se ejecuta cuando el consumidor recibe un mensaje del cliente. No lo usaremos en este caso ya que la comunicación será unidireccional, de servidor a cliente. Sin embargo, he añadido un sencillo echo para que puedas probar la conexión. Todo lo que se envíe al consumidor, lo devolverá al cliente.
A continuación sobrescribimos el archivo myproject/asgi.py
:
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
django.setup()
from django.core.asgi import get_asgi_application
from channels.security.websocket import AllowedHostsOriginValidator
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import re_path
from waiting_room.consumers import MyConsumer
application = ProtocolTypeRouter(
{
# Django's ASGI application to handle traditional HTTP requests
"http": get_asgi_application(),
# WebSocket handler
"websocket": AllowedHostsOriginValidator(
URLRouter([re_path(r"^ws/(?P<room_name>[a-zA-Z0-9_]+)/$", MyConsumer.as_asgi())])
),
}
)
Si recuerdas, configuramos en settings.py
la variable ASGI_APPLICATION
con el valor myproject.asgi.application
. ASGI significa Asynchronous Server Gateway Interface, y es el protocolo asíncrono que utiliza Django Channels para gestionar las conexiones WebSocket.
Entre los cambios que hemos añadido podemos encontrar:
http
para manejar las peticiones HTTP.websocket
para gestionar las peticiones WebSocket. La ruta de conexión seráws/<room_name>/
. Cada usuario deberá pedir una conexión con unroom_name
diferente.- Añadimos
AllowedHostsOriginValidator
para solo permitir conexiones de los hosts indicados enALLOWED_HOSTS
.
Detenemos el servidor de Django y lo volvemos a levantar:
docker compose down
docker compose up
Deberías visualizar un output similar al siguiente:
System check identified no issues (0 silenced).
Django version 5.1.2, using settings 'myproject.settings'
Starting ASGI/Daphne version 4.1.2 development server at http://0.0.0.0:8000/
Quit the server with CONTROL-C.
¡Ya tenemos el servidor de WebSockets funcionando!
Puedes comprobar la conexión en un terminal a parte con el comando websocat
:
websocat ws://localhost:8000/ws/nombre_temporal/
Escribe un texto, pulsa enter y lo verás repetido justo debajo. Como tenemos un sencillo echo, lo que ha ocurrido es que has enviado un mensaje y justo después lo has recibido. ¡Ya tienes una conexión WebSocket funcionando!
Ahora es el turno de Huey, que se encargará de ejecutar la tarea en segundo plano.
4. Huey
Es el momento de arrancar nuestra cola de tareas asíncrona. Para ello, debemos configurar Huey para que arranque en un proceso independiente. Su integración en Django es fantástica, por lo cual será sencillo de desplegar.
El primer paso será añadir la configuración de Huey en myproject/settings.py
:
HUEY = {
'huey_class': 'huey.RedisHuey',
'name': 'queue',
'results': True,
'store_none': False,
'immediate': False,
'utc': False,
'blocking': True,
'connection': {
'host': 'redis',
'port': 6379,
'db': 0,
'connection_pool': None,
'read_timeout': 1,
'url': None,
},
'consumer': {
'workers': 4,
'worker_type': 'thread',
'initial_delay': 0.1,
'backoff': 1.15,
'max_delay': 10.0,
'scheduler_interval': 1,
'periodic': True,
'check_worker_health': True,
'health_check_interval': 1,
},
}
¡Ya esta listo! Lo arrancaremos después de definir la tarea que queremos ejecutar.
Puedes revisar el siguiente artículo en caso de querer profundizar en la implementación de Huey.
5. Tarea
Con el sistema de colas de tareas preparado, el servidor de WebSockets funcionando y Django listo, solo nos queda definir la tarea que queremos ejecutar en segundo plano.
Para el ejemplo resolveremos por fuerza bruta el problema del viajante usando 10 ciudades. O, en otras palabras, calcular cual es el camino más corto pasando por todas las ciudades, donde cada ciudad solo se visite una vez. Es un problema NP-completo, por lo que no existe una solución eficiente. Sin embargo, es perfecto para nosotros. Se harán 10! cálculos (3.628.800), lo que nos permitirá ver el progreso de la tarea paulatinamente.
Creamos el archivo waiting_room/tasks.py
con el siguiente contenido:
from huey.contrib.djhuey import task
import operator
from itertools import permutations
from collections import Counter
from functools import reduce
from math import factorial
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def render_progress_bar(group_name, message):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'channel_message',
'message': message
}
)
@task()
def calculate_min_distance(group_name):
# Distance matrix between cities
distances = [
[0, 29, 20, 21, 16, 31, 100, 12, 5, 78],
[29, 0, 15, 29, 28, 40, 72, 21, 29, 41],
[20, 15, 0, 15, 14, 25, 81, 9, 23, 27],
[21, 29, 15, 0, 4, 12, 92, 12, 25, 13],
[16, 28, 14, 4, 0, 16, 94, 9, 20, 16],
[31, 40, 25, 12, 16, 0, 95, 24, 36, 3],
[100, 72, 81, 92, 94, 95, 0, 90, 101, 99],
[12, 21, 9, 12, 9, 24, 90, 0, 15, 25],
[5, 29, 23, 25, 20, 36, 101, 15, 0, 35],
[78, 41, 27, 13, 16, 3, 99, 25, 35, 0],
]
num_cities = len(distances)
cities = list(range(num_cities))
def count_permutations(sequence):
total = factorial(len(sequence))
duplicates = Counter(sequence).values()
divisor = reduce(operator.mul, (factorial(v) for v in duplicates), 1)
return total / divisor
def calculate_shortest_route(distances):
shortest_route = float("inf")
city_count = 0
# Calculate all possible permutations of cities (routes)
total_permutations = count_permutations(cities)
percentaje = 0
for perm in permutations(cities):
# Send progress to the group
temp_percentaje = int(city_count / total_permutations * 100)
if temp_percentaje != percentaje:
percentaje = temp_percentaje
render_progress_bar(group_name, percentaje)
city_count += 1
# Calculate the distance of the route
route_distance = 0
for i in range(num_cities - 1):
route_distance += distances[perm[i]][perm[i + 1]]
route_distance += distances[perm[-1]][perm[0]] # Back to the start city
shortest_route = min(shortest_route, route_distance)
render_progress_bar(group_name, 100)
return shortest_route
return calculate_shortest_route(distances)
Hemos creado la tarea calculate_min_distance
, usando el decorador @task
de Huey. Recibe un parámetro group_name
, que es el identificador de la sala (donde estará conectado el cliente) a la que queremos enviar el progreso de la tarea.
Por otro lado hemos definido la función render_progress_bar
, que usará el canal de la sala para enviar el progreso de la tarea al cliente. Se conectará al grupo group_name
y enviará un mensaje con el porcentaje completado. De momento puro texto, pero más adelante lo convertiremos en HTML.
En el consumer no existe ningún evento, o función, denominado channel_message
. Editamos el archivo waiting_room/consumers.py
para incluirlo:
# ...
class MyConsumer(WebsocketConsumer):
# ...
def channel_message(self, event):
message = event['message']
# Send message to WebSocket
self.send(text_data=str(message))
De momento no tenemos ningún sistema que ejecute la tarea. Temporalmente la ejecutaremos cuando un cliente se conecte.
# ...
class MyConsumer(WebsocketConsumer):
# ...
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
calculate_min_distance(self.room_group_name) # Nueva
# ...
Reinicia los contenedores de Docker:
docker compose down
docker compose up
Y ejecuta la tarea con websocat
:
websocat ws://localhost:8000/ws/nombre_temporal/
Deberías obtener el porcentaje que crece paulatinamente.
En estos momentos, el frontend puede lanzar la tarea en segundo plano e ir recibiendo la información del progreso en tiempo real. ¡Alucinante!
Lamentablemente aún no tenemos ningún frontend que pueda consumirlo. Pero no te agobies, en el siguiente paso lo solucionaremos.
6. HTML con htmx
Ya esta preparado el backend. Un cliente mediante WebSockets puede iniciar, consultar y recibir el resultado. Ahora es el turno del frontend.
- Página donde iniciaremos la tarea y podremos ver el resultado.
- Página en blanco, que usaremos para comprobar que podemos cambiar de página mientras se ejecuta la tarea en segundo plano.
- Componente para renderizar la barra de progreso y el enlace que nos llevará a la página de resultado. A su vez estará dividido en su estructura y en el contenido que iremos actualizando.
Empezaremos creando el componente. Primero creamos waiting_room/templates/components/tasks/update.html
:
<div id="component-notification__update">
<h1>Task</h1>
{% if result %}
<h2>Done!</h2>
<a href="{% url 'waiting_room:index' %}?result={{ result }}">Go to result</a>
{% elif progress %}
<progress value="{{ progress }}" max="100">{{ progress }}</progress>
{% else %}
<p>Nothing</p>
{% endif %}
</div>
Es el fragmento, o sección del componente, que mostrará el avance de la tarea.
- Si
result
tiene un valor, mostrará un mensaje de que la tarea ha terminado y un enlace para ver el resultado. - Si
progress
tiene un valor, mostrará una barra de progreso con el porcentaje completado. - Si no hay ningún valor de ninguna variable, mostrará un mensaje de que no hay nada que hacer.
La tarea será la encargada de renderizarlo con las variables adecuadas y enviarlo al cliente. Por ello debemos modificar waiting_room/tasks.py
:
# ...
from django.template.loader import render_to_string # Nueva
def render_progress_bar(group_name, progress, result=None):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'channel_message',
'message': render_to_string('components/tasks/update.html', { # Nueva
'progress': progress,
'result': result,
}),
}
)
# ...
Ya no enviamos un texto plano, sino que renderizamos y enviamos HTML. Incorporemos una conexión al servidor de WebSockets en el frontend usando htmx.
La librería htmx ha ido ganando popularidad entre los backends. Automatiza la comunicación entre el frontend y el backend gestionando las conexiones AJAX (fetch), Server Send Events y WebSockets con atributos de HTML. Puedes construir SPAs (Single Page Application) evitando la necesidad de escribir JavaScript. Si sientes curiosidad, puedes profundizar buscando información sobre el paradigma Hypermedia-Driven Applications.
En nuestro caso usaremos únicamente la funcionalidad de WebSockets. No aspiro a que aprendas todas las posibilidades que ofrece htmx, pero sí que te familiarices con su uso y evitemos la complejidad requerida en JavaScript.
Ahora creamos el componente waiting_room/templates/components/tasks/layout.html
:
<section
id="component-notification"
hx-ext="ws"
style="
position: fixed;
right: 0;
bottom: 0;
padding: 2rem;
background: lightgray;
">
{% include 'components/tasks/update.html' %}
</section>
<script>
document.querySelector('#component-notification').setAttribute('ws-connect', '/ws/' + localStorage.getItem('userId') + '/');
</script>
Hay pocas líneas de código, pero es lleno de funcionalidades. Vamos a explicarlas:
hx-ext="ws"
: Forma parte de la extensión de WebSockets de htmx. Indica que el componente se conectará a un servidor de WebSockets.- Se le ha dado estilos para que aparezca en la esquina inferior derecha.
- En su interior se incluye el fragmento
update.html
que hemos creado anteriormente. Te preguntarás por qué no hemos incluido el fragmento directamente en la página. Se debe a que es la única parte que iremos actualizando y enviando por WebSockets (usando htmx). Si no estuviera de esta forma, no sería posible enviar solo el fragmento y tendríamos que enviar toda la página. - Al final del componente, se añade un script para indicar, dinámicamente, la dirección a la que se conectará htmx al servidor de WebSocket. Más adelante guardarmos un identificador único en el localStorage para que cada cliente tenga su propia sala donde recibir el progreso de la tarea. De esta forma, si un cliente cambia de página, recarga o cierra el navegador, podremos volver a conectar con la misma sala y seguir pintando las actualizaciones.
Diseñamos una plantilla base para no repetir la estructura principal en las páginas. Creamos el archivo waiting_room/templates/base.html
:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0, shrink-to-fit=no">
<title>{% block title %}Waiting Room{% endblock %}</title>
<script src="https://unpkg.com/htmx.org@2.0.3" integrity="sha384-0895/pl2MU10Hqc6jd4RvrthNlDiE9U1tWmX7WRESftEDRosgxNsQG/Ze9YMRzHq" crossorigin="anonymous"></script>
<script src="https://unpkg.com/htmx-ext-ws@2.0.1/ws.js"></script>
<script>
if (!localStorage.getItem('userId')) {
localStorage.setItem('userId', '{{ user_id }}');
}
</script>
</head>
<body>
<main>
{% include 'components/tasks/layout.html' %}
{% block content %}{% endblock %}
</main>
</body>
</html>
- Se incluye el CDN de htmx y la extensión de WebSockets. Sería el equivalente a instalarlos con su extensión de WebSockets en el proyecto.
- Se añade un script para guardar un identificador único, recibido por Django, en el localStorage. Solo en el caso que no existe. El identificador será el nombre de la sala a la cual se conectará el cliente.
- Insertamos el componente
layout.html
en el<main>
. - El titulo de la página se añadirá en el bloque
title
, mientras que el contenido de la página se añadirá en el bloquecontent
.
Ahora creamos la página de bienvenida waiting_room/templates/index.html
, que poseerá el botón para iniciar la tarea y mostrar el resultado.
{% extends 'base.html' %}
{% block title %}Run task{% endblock %}
{% block content %}
<h1>Index</h1>
<section
id="form-start-task"
hx-ext="ws"
>
<form id="form" ws-send>
<input type="submit" name="task" value="calculate">
</form>
</section>
<script>
document.querySelector('#form-start-task').setAttribute('ws-connect', '/ws/' + localStorage.getItem('userId') + '/');
</script>
<section>
{% if result %}
<h2>The result is {{ result }}</h2>
{% endif %}
</section>
<a href="{% url 'waiting_room:about_us' %}">Go to about us</a>
{% endblock %}
Si queremos arrancar la tarea, necesitamos un botón. Cuando sea pulsado, enviaremos un mensaje al servidor de WebSockets. Como hemos dicho anteriormente, htmx nos gestionará todas las tareas de conexión. Solo hará falta un formulario con algunos atributos.
Incluimos el atributo hx-ext="ws"
en algún padre, que indica que su interior se conectará a un servidor de WebSockets. A continuación, incluimos un formulario con el atributo ws-send
. Cuando se envíe, todos sus campos serán enviados al servidor de WebSockets con un formato JSON. Por último, añadimos dinámicamente el atributo ws-connect
para conectarnos a la sala privada del cliente.
También puedes observar que hemos preparado un espacio para ver el futuro resultado y un enlace para ir a la página en blanco.
Es el turno de la página en blanco waiting_room/templates/about_us.html
:
{% extends 'base.html' %}
{% block title %}About us{% endblock %}
{% block content %}
<h1>About us</h1>
<a href="{% url 'waiting_room:index' %}">Go to index</a>
{% endblock %}
No tiene nada especial, solo un enlace para volver a la página de bienvenida. Te recuerdo que esta página la estamos creado para comprobar que podemos cambiar de página mientras se ejecuta la tarea en segundo plano.
El siguiente paso es configurar las vistas, que en nuestro caso serán muy sencillas. Editamos el archivo waiting_room/views.py
:
from django.shortcuts import render
import uuid
def make_user_id():
return str(uuid.uuid4()).replace('-', '')
def index(request):
result = request.GET.get('result', None)
return render(request, 'index.html', {
'result': result,
'user_id': make_user_id(),
})
def about_us(request):
return render(request, 'about_us.html', {
'user_id': make_user_id(),
})
user_id
es un identificador único que se generará en cada petición. Más adelante lo guardaremos en el frontend, en el localStorage del cliente, para que pueda conectarse a su sala.index
recibe el resultado de la tarea, si lo hubiera, y lo envía a la plantilla.about_us
no recibe ningún parámetro, pero genera unuser_id
para que el cliente pueda conectarse a su sala.
Y las rutas en waiting_room/urls.py
:
from django.urls import path
from . import views
app_name = 'waiting_room'
urlpatterns = [
path('', views.index, name='index'),
path('about_us/', views.about_us, name='about_us'),
]
Y, muy importante, añadimos las rutas en myproject/urls.py
:
from django.urls import path, include
urlpatterns = [
path('', include('waiting_room.urls')),
]
Al entrar en http://localhost:8000
deberías visualizar la página de bienvenida y un enlace para moverte a la página en blanco. Todo ello con el componente de la barra de progreso en la esquina inferior derecha.
Con la parte visual preparada, es el momento de añadir la lógica para iniciar la tarea.
7. Lanzar la tarea
Sabemos como recibir datos, hemos creado un echo que devuelve el mismo mensaje que recibimos en el consumer. Ahora debemos ser capaces de enviar un mensaje al servidor de WebSockets para iniciar la tarea.
Primero borramos la llamada cuando un cliente se conecta en waiting_room/consumers.py
:
# ...
class MyConsumer(WebsocketConsumer):
# ...
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
# calculate_min_distance(self.room_group_name) # Borrar
# ...
Ahora modificamos receive
. Cuando reciba un mensaje con el campo task
con el texto calculate
, se iniciará el trabajo.
import json
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
from waiting_room.tasks import calculate_min_distance
class MyConsumer(WebsocketConsumer):
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
def disconnect(self, close_code):
async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name)
self.close()
def receive(self, text_data):
json_data = json.loads(text_data)
# Run task
if json_data['task'] == 'calculate': # Nueva
calculate_min_distance(self.room_group_name) # Nueva
def channel_message(self, event):
message = event['message']
# Send message to WebSocket
self.send(text_data=str(message))
¿De donde venía task
y calculate
? Del formulario que gestiona htmx.
<form id="form" ws-send>
<input type="submit" name="task" value="calculate">
</form>
Y con esto ya estará todo terminado.
El trabajo ha dado sus frutos. Nuestros usuarios pueden ejecutar tareas largas sin que moleste en su navegación. Incluso pueden cerrar sus navegadores para volver cuando lo necesiten. ¡Espectacular!
Notas finales
Queda mucho margen de mejora. Puedes considerar que hemos creado una prueba de concepto, pero no un producto final.
A continuación te dejo algunas ideas para mejorar la aplicación:
- En lugar de llamar en cada vista
make_user_id
, podríamos hacerlo en un middleware. - Añadir un botón para cancelar la tarea.
- Incluir una cola de espera para evitar que todas las tareas se ejecuten al mismo tiempo.
- Retocar para mostrar varias tareas a la vez en el componente de progreso.
- Añadir un sistema de autenticación para que el usuario no dependa de un identificador único.
Y muchas más.
Todo el código fuente de este artículo está disponible en este repositorio. Clona, descarga y juega con él.
Happy hacking!
Extra: Tarea que no se puede paralelizar
Añadimos una situación muy común. Los usuarios tendrán que compartir un recurso que no se puede paralelizar. Podrías imaginar en una impresora. Si un usuario está imprimiendo, los demás tendrán que esperar.
Vamos a añadir nuevos requisitos.
- La tarea solo puede estar ejecutándose por un usuario a la vez.
- Si esta en marcha por un usuario, los demás deberán esperar.
- La cola se organizará por orden en que piden ejecutar la tarea. (first in, first out o FIFO)
- Se notificará a todos los usuario que esperan cual es su posición en la cola.
Como la cola debe leerse por Django, Django Channels y Huey, necesitamos un lugar común donde almacenarla y no tengamos problemas de concurrencia. Redis es es idóneo para ello, otra vez.
Primero prepararemos los nuevos elementos visuales de HTML. Al panel de tasks incluiremos un texto que indique la posición en la cola. Creamos el archivo waiting_room/templates/components/tasks/location.html
:
<div id="component-notification__location">
{% if location > 0 %}
<p>You have {{ location }} persons in front of you.</p>
{% endif %}
</div>
Si la variable location
es mayor que 0, mostrará un mensaje indicando cuantas personas están delante de ti. En caso contrario, no mostrará nada.
Ahora editamos waiting_room/templates/components/tasks/layout.html
. Incluiremos el HTML que acabamos de crear:
<section
id="component-notification"
hx-ext="ws"
style="
position: fixed;
right: 0;
bottom: 0;
padding: 2rem;
background: lightgray;
">
{% include 'components/tasks/update.html' %}
{% include 'components/tasks/location.html' %} # Nueva
</section>
<script>
document.querySelector('#component-notification').setAttribute('ws-connect', '/ws/' + localStorage.getItem('userId') + '/');
</script>
¡Preparado! Vamos a por la lógica.
Nos movemos al archivo de configuración de Django, myproject/settings.py
, y añadimos la configuración de Redis:
REDIS_HOST = "redis"
REDIS_PORT = 6379
Como vamos a conectar a Redis en varios lugares, es buena práctica dejar sus variables en un lugar común.
Crearemos la nueva tarea encargada de dibujar, a todos los usuarios que esperan, su posición en la cola. Creamos el archivo waiting_room/tasks.py
la siguiente tarea:
from django.conf import settings
import redis
def render_location_in_the_queue(group_name, location):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
group_name,
{
'type': 'channel_message',
'message': render_to_string('components/tasks/location.html', {
'location': location,
}),
}
)
@task()
def notify_of_new_position():
redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
for index, group_name in enumerate(redis_conn.lrange('enqueue', 0, -1)):
render_location_in_the_queue(group_name.decode('utf-8'), index)
render_location_in_the_queue
: Renderiza el HTML con la posición en la cola y lo envía al cliente.notify_of_new_position
: Recorre la cola y envía la posición a cada cliente.redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
creamos una conexión a Redis.redis_conn.lrange('enqueue', 0, -1)
: Leemos la listaenqueue
(que rellenaremos más adelante) de Redis y la iteramos.render_location_in_the_queue(group_name.decode('utf-8'), index)
: Llamamos a la primera función para enviar la posición en la cola. Como primer parámetro le damos el nombre de la sala del cliente (es el valor que guardaremos en la lista de Redis) y como segundo la posición en la cola (0, 1, 2, …).
Más adelante la invocaremos en ciertos eventos, como cuando un cliente pide ejecutar la tarea, cuando termina o pasa al siguiente. Pero antes, debemos añadir la lógica para gestionar la cola.
Continuamos editando waiting_room/tasks.py
.
from huey.contrib.djhuey import task, lock_task # Actualizada
@task()
@lock_task('run-queue-lock')
def run_tasks_from_queue():
notify_of_new_position()
# Get the first task from the queue
redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0)
task = redis_conn.lindex('enqueue', 0)
if task:
# Run task
r = calculate_min_distance(task.decode('utf-8'))
# Wait for task to finish
r(blocking=True)
# Run the next task
redis_conn.lpop('enqueue')
run_tasks_from_queue()
Es una función recursiva, que se llama a si misma, hasta que termina la cola. En cada iteración, notifica a todos los clientes de su posición en la cola, ejecuta la tarea y la elimina al cliente de la cola.
Podemos destacar algunas partes:
@lock_task('run-queue-lock')
: Añadimos un candado para que solo se ejecute una vez a la vez. Si varios clientes intentan ejecutar la tarea al mismo tiempo, solo uno podrá hacerlo.notify_of_new_position()
: Notifica a todos los clientes de su posición en la cola.redis_conn.lindex('enqueue', 0)
: Obtiene el primer elemento de la listaenqueue
. Más adelante la llenaremos con los nombres de las salas de los clientes.r = calculate_min_distance(task.decode('utf-8'))
: Ejecuta la tarea y la guarda enr
.r(blocking=True)
: Espera a que la tarea termine, haciendo que sea bloqueante o síncrona. O dicho de otra forma, no se ejecutará la siguiente tarea hasta que la actual haya finalizado.redis_conn.lpop('enqueue')
: Elimina el primer elemento de la listaenqueue
, o el cliente que acaba de terminar su turno.run_tasks_from_queue()
: Llama a si misma para ejecutar la siguiente tarea.
Cuando no haya más clientes esperando, la función terminará.
Es el momento de lanzar la tarea en los eventos adecuados. Modificamos waiting_room/consumers.py
:
import json
import redis # Nueva
from django.conf import settings
from channels.generic.websocket import WebsocketConsumer
from asgiref.sync import async_to_sync
from waiting_room.tasks import calculate_min_distance, run_tasks_from_queue, render_location_in_the_queue, notify_of_new_position # Actualizada
class MyConsumer(WebsocketConsumer):
redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) # Nueva
def connect(self):
self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"]
async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name)
self.accept()
notify_of_new_position() # Nueva
# ...
def receive(self, text_data):
json_data = json.loads(text_data)
# Run task
if json_data['task'] == 'calculate': # Nueva
# Check if the new task is already in the queue
if self.redis_conn.lrange('enqueue', 0, -1).count(self.room_group_name.encode()) == 0:
# Add the task to the queue
self.redis_conn.rpush('enqueue', self.room_group_name)
notify_of_new_position()
# Send accurate location in the queue
run_tasks_from_queue()
# ...
Cada vez un cliente se conecta, o un cliente pide ejecutar la tarea o pasa el turno al siguiente en la cola, notificamos a todos los clientes de su posición. Estos son los eventos que necesitamos.
También se ha añadido la lógica que gestiona la entrada de un nuevo cliente en la cola.
self.redis_conn.lrange('enqueue', 0, -1).count(self.room_group_name.encode()) == 0
: Comprobamos si el cliente ya está en la cola. Si esta presente, lo ignoramos.self.redis_conn.rpush('enqueue', self.room_group_name)
: Añadimos el cliente a la cola.run_tasks_from_queue()
: Lanzamos la tarea. No hay peligro de que se ejecute más de una vez, ya que hemos añadido una restricción de bloqueo.
Y con esto ya hemos terminado la actualización. Ahora los usuarios tendrán que esperar su turno, ¡como en la vida real!
{{ comments.length }} comentarios