SSE-Fake/sse_fake/asgi.py
2022-03-04 11:33:29 +01:00

62 lines
1.8 KiB
Python

# sse_fake/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.urls import re_path
import django_eventstream
from django_eventstream import send_event
from faker import Faker
from random import randint
from time import sleep
import threading
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sse_fake.settings")
application = ProtocolTypeRouter(
{
# Django's ASGI application to handle traditional HTTP requests
"http": URLRouter(
[
re_path(
r"^events/",
AuthMiddlewareStack(
URLRouter(django_eventstream.routing.urlpatterns)
),
{"channels": ["events"]},
),
re_path(r"", get_asgi_application()),
]
),
}
)
def send_random_event():
fake = Faker()
while True:
random_event = randint(0, 2)
random_data = ""
if random_event == 0:
# User connected
random_data = {"action": "User connected", "name": fake.first_name()}
elif random_event == 1:
# User disconnected
random_data = {"action": "User disconnected", "name": fake.first_name()}
elif random_event == 2:
# New message
random_data = {
"action": "New message",
"name": fake.first_name(),
"text": fake.text(),
}
send_event(
"events",
"message"
,
random_data,
)
sleep(randint(1, 5))
# Run in other thread send_random_event()
threading.Thread(target=send_random_event).start()