0

im trying to initialize a queue rabbit consumer, process those message and send it to a websocket in django. But i got the title error.

Im new at this kind of stuuf. Can someone explain me how can i make this work. This is my code:

asgi.py

import threading
import django
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from apps.websocket import routing
import asyncio
from apps.websocket.services.message_processor import start_rabbitmq_consumer

django.setup()

application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
    }
)


def start_consumer_thread():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(start_rabbitmq_consumer())

consumer_thread = threading.Thread(target=start_consumer_thread, daemon=True)
consumer_thread.start()

consumer.py

import asyncio
import json
import logging
from aio_pika import connect, exceptions as aio_pika_exceptions
import httpx
from channels.layers import get_channel_layer
from apps.websocket.services.data_extractor import extract_notification_data
from core.settings import RABBIT_INTERSECTION_QUEUE, RABBIT_URL, API_TRACKER

logger = logging.getLogger(__name__)


async def start_rabbitmq_consumer():
    """Inicializa el consumidor"""
    while True:
        try:
            connection = await connect(RABBIT_URL)
            await consume_messages(connection)
        except aio_pika_exceptions.AMQPConnectionError as e:
            await asyncio.sleep(5)
        except Exception as e:
            await asyncio.sleep(5)


async def consume_messages(connection):
    """Iniciamos el consumo de mensajes de la cola de rabbit"""
    channel = await connection.channel()
    queue = await channel.declare_queue(RABBIT_INTERSECTION_QUEUE, durable=True)
    async for message in queue:
        async with message.process():
            data = json.loads(message.body.decode())
            await process_message(data)


async def process_message(data):
    """Procesamos los mensajes provenientes de la cola de Intersection"""
    notifications = extract_notification_data(data)
    for notification_payload in notifications:
        response_payload = await create_notification(notification_payload)
        if response_payload:
            await send_websocket_notification(response_payload, data['route_pre']['empresa_generadora'])


async def create_notification(notification_payload):
    """Enviamos una solicitud para crear la nueva notificación"""
    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                f"{API_TRACKER}/tracker/notification/create/",
                json=notification_payload,
            )
            if response.status_code == 201:
                return response.json()
            else:
                return None
        except httpx.RequestError as exc:
            return None


async def send_websocket_notification(notification_response, company_id):
    """Enviamos al websocket la notifiación"""
    channel_layer = get_channel_layer()
    room_group_name = f"chat_{company_id}"
    await channel_layer.group_send(
        room_group_name,
        {"type": "send_notification", "message": notification_response},
    )

websocket.py

import asyncio
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from apps.websocket.services.message_processor import start_rabbitmq_consumer

logger = logging.getLogger(__name__)


class WebSocketConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.company_id = self.scope['url_route']['kwargs']['company_id']
        self.room_group_name = f"chat_{self.company_id}"
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        logger.info(f"WebSocket connected: {self.channel_name}")

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        logger.info(
            f"WebSocket disconnected: {self.channel_name} with code {close_code}"
        )

    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]
        await self.send(text_data=json.dumps({"message": message}))
        logger.info(f"Message received and sent back to client: {message}")

    async def send_notification(self, event):
        message = event["message"]
        await self.send(text_data=json.dumps({"message": message}))
        logger.info(f"WebSocket sending notification: {message}")

i find this non elegant temporaly solution:

class WebSocketConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.company_id = self.scope['url_route']['kwargs']['company_id']
        self.room_group_name = f"chat_{self.company_id}"
        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()
        logger.info(f"WebSocket connected: {self.channel_name}")
        asyncio.create_task(
            start_rabbitmq_consumer()
        )  # Starting de consumer inside the ws

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
        logger.info(
            f"WebSocket disconnected: {self.channel_name} with code {close_code}"
        )

but there must be a better solution.

1
  • Please edit the question to limit it to a specific problem with enough detail to identify an adequate answer.
    – Community Bot
    Commented Jun 7 at 21:34

0