mirror of
https://github.com/tanrax/SSE-Fake.git
synced 2024-11-14 03:25:42 +01:00
62 lines
1.8 KiB
Python
62 lines
1.8 KiB
Python
|
# sse_fake/asgi.py
|
||
|
import os
|
||
|
from django.core.asgi import get_asgi_application
|
||
|
from channels.auth import AuthMiddlewareStack
|
||
|
from channels.routing import ProtocolTypeRouter, URLRouter
|
||
|
from django.urls import re_path
|
||
|
import django_eventstream
|
||
|
from django_eventstream import send_event
|
||
|
from faker import Faker
|
||
|
from random import randint
|
||
|
from time import sleep
|
||
|
import threading
|
||
|
|
||
|
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sse_fake.settings")
|
||
|
|
||
|
application = ProtocolTypeRouter(
|
||
|
{
|
||
|
# Django's ASGI application to handle traditional HTTP requests
|
||
|
"http": URLRouter(
|
||
|
[
|
||
|
re_path(
|
||
|
r"^events/",
|
||
|
AuthMiddlewareStack(
|
||
|
URLRouter(django_eventstream.routing.urlpatterns)
|
||
|
),
|
||
|
{"channels": ["events"]},
|
||
|
),
|
||
|
re_path(r"", get_asgi_application()),
|
||
|
]
|
||
|
),
|
||
|
}
|
||
|
)
|
||
|
|
||
|
|
||
|
def send_random_event():
|
||
|
fake = Faker()
|
||
|
while True:
|
||
|
random_event = randint(0, 2)
|
||
|
random_data = ""
|
||
|
if random_event == 0:
|
||
|
# User connected
|
||
|
random_data = {"action": "User connected", "name": fake.first_name()}
|
||
|
elif random_event == 1:
|
||
|
# User disconnected
|
||
|
random_data = {"action": "User disconnected", "name": fake.first_name()}
|
||
|
elif random_event == 2:
|
||
|
# New message
|
||
|
random_data = {
|
||
|
"action": "New message",
|
||
|
"name": fake.first_name(),
|
||
|
"text": fake.text(),
|
||
|
}
|
||
|
send_event(
|
||
|
"events",
|
||
|
"message"
|
||
|
,
|
||
|
random_data,
|
||
|
)
|
||
|
sleep(randint(1, 5))
|
||
|
|
||
|
# Run in other thread send_random_event()
|
||
|
threading.Thread(target=send_random_event).start()
|