From bbcdbafba00d95bbf592b2c94f532242b6e4b205 Mon Sep 17 00:00:00 2001 From: Andros Fenollosa Date: Thu, 7 Nov 2024 16:20:51 +0100 Subject: [PATCH] Add fifo --- myproject/settings.py | 9 +++-- waiting_room/consumers.py | 17 ++++++-- waiting_room/tasks.py | 39 ++++++++++++++++++- .../templates/components/tasks/layout.html | 1 + .../templates/components/tasks/location.html | 5 +++ 5 files changed, 63 insertions(+), 8 deletions(-) create mode 100644 waiting_room/templates/components/tasks/location.html diff --git a/myproject/settings.py b/myproject/settings.py index 576b9c1..bdc990c 100644 --- a/myproject/settings.py +++ b/myproject/settings.py @@ -43,11 +43,14 @@ INSTALLED_APPS = [ 'channels', # Servidor de WebSockets ] +REDIS_HOST = "redis" +REDIS_PORT = 6379 + CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { - "hosts": [("redis", 6379)], + "hosts": [(REDIS_HOST, REDIS_PORT)], }, }, } @@ -61,8 +64,8 @@ HUEY = { 'utc': False, 'blocking': True, 'connection': { - 'host': 'redis', - 'port': 6379, + 'host': REDIS_HOST, + 'port': REDIS_PORT, 'db': 0, 'connection_pool': None, 'read_timeout': 1, diff --git a/waiting_room/consumers.py b/waiting_room/consumers.py index 2152192..38648c4 100644 --- a/waiting_room/consumers.py +++ b/waiting_room/consumers.py @@ -1,14 +1,19 @@ import json +import redis +from django.conf import settings from channels.generic.websocket import WebsocketConsumer from asgiref.sync import async_to_sync -from waiting_room.tasks import calculate_min_distance +from waiting_room.tasks import calculate_min_distance, run_tasks_from_queue, render_location_in_the_queue, notify_of_new_position class MyConsumer(WebsocketConsumer): + redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + def connect(self): self.room_group_name = self.scope["url_route"]["kwargs"]["room_name"] async_to_sync(self.channel_layer.group_add)(self.room_group_name, self.channel_name) self.accept() + notify_of_new_position() def disconnect(self, close_code): async_to_sync(self.channel_layer.group_discard)(self.room_group_name, self.channel_name) @@ -16,11 +21,15 @@ class MyConsumer(WebsocketConsumer): def receive(self, text_data): json_data = json.loads(text_data) - # Echo - self.send(text_data=text_data) # Run task if json_data['task'] == 'calculate': - calculate_min_distance(self.room_group_name) + # Check if the new task is already in the queue + if self.redis_conn.lrange('enqueue', 0, -1).count(self.room_group_name.encode()) == 0: + # Add the task to the queue + self.redis_conn.rpush('enqueue', self.room_group_name) + notify_of_new_position() + # Send accurate location in the queue + run_tasks_from_queue() def channel_message(self, event): message = event['message'] diff --git a/waiting_room/tasks.py b/waiting_room/tasks.py index a7ccaa3..639c8ed 100644 --- a/waiting_room/tasks.py +++ b/waiting_room/tasks.py @@ -1,4 +1,6 @@ -from huey.contrib.djhuey import task +from django.conf import settings +import redis +from huey.contrib.djhuey import task, lock_task import operator from django.template.loader import render_to_string from itertools import permutations @@ -8,6 +10,7 @@ from math import factorial from channels.layers import get_channel_layer from asgiref.sync import async_to_sync + def render_progress_bar(group_name, progress, result=None): channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)( @@ -21,6 +24,18 @@ def render_progress_bar(group_name, progress, result=None): } ) +def render_location_in_the_queue(group_name, location): + channel_layer = get_channel_layer() + async_to_sync(channel_layer.group_send)( + group_name, + { + 'type': 'channel_message', + 'message': render_to_string('components/tasks/location.html', { + 'location': location, + }), + } + ) + @task() def calculate_min_distance(group_name): # Distance matrix between cities @@ -72,3 +87,25 @@ def calculate_min_distance(group_name): return shortest_route return calculate_shortest_route(distances) + +@task() +@lock_task('run-queue-lock') +def run_tasks_from_queue(): + notify_of_new_position() + # Get the first task from the queue + redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + task = redis_conn.lindex('enqueue', 0) + if task: + # Run task + r = calculate_min_distance(task.decode('utf-8')) + # Wait for task to finish + r(blocking=True) + # Run the next task + redis_conn.lpop('enqueue') + run_tasks_from_queue() + +@task() +def notify_of_new_position(): + redis_conn = redis.StrictRedis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0) + for index, group_name in enumerate(redis_conn.lrange('enqueue', 0, -1)): + render_location_in_the_queue(group_name.decode('utf-8'), index ) diff --git a/waiting_room/templates/components/tasks/layout.html b/waiting_room/templates/components/tasks/layout.html index 38d56f3..5ab77c6 100644 --- a/waiting_room/templates/components/tasks/layout.html +++ b/waiting_room/templates/components/tasks/layout.html @@ -9,6 +9,7 @@ background: lightgray; "> {% include 'components/tasks/update.html' %} + {% include 'components/tasks/location.html' %}