So I've been migrating my server that was using Django Channels 1.x -> 2.x+

The original design would send a task to celery using getAFTreeTask.delay(message.reply_channel.name) and by having access to the channel_name it could haply reply asynchronously

from celery import task
from channels import Channel

def getAFTreeTask(channel_name):
    tree = Request().cache_af_tree()
        "text": json.dumps({
            "channel": "AF_INIT",
            "payload": tree

Now I've migrated my server to Channels 2.x+ for various reasons. According to the docs

class Consumer(JsonWebsocketConsumer):

     def connect(self):
        print("Client Connected: ", self.channel_name)

    def receive_json(self, content, **kwargs):
        parse_request(self.channel_name, content)

    def disconnect(self, content):

    def chat_message(self, event):
        print("Entered reply channel")

A consumer set out like this should receive request via the channel layer providing I use the right channel_name, now the consumer works as a send-receive websocket correctly if the response has access to self.send_json() or self.send() for the other generic consumers, so I'm assuming all my settings are correct, my problem is when I try to use the channel layer to send something, like this (according to https://channels.readthedocs.io/en/latest/topics/channel_layers.html#single-channels)

from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync

def parse_request(channel_name, content):

    print("parsed ", channel_name, content)
    channel_layer = get_channel_layer()
    AsyncToSync(channel_layer.send)(channel_name, {
        "type": "chat.message",
        "text": "Hello there!",

I get

edit (Full stack trace):

    2018-02-02 18:28:35,984 ERROR    Exception inside application: There is no current event loop in thread 'Thread-3'.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 51, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/utils.py", line 48, in await_many_dispatch
    await dispatch(result)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 81, in inner
    return await async_func(*args, **kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 65, in __call__
    return await asyncio.wait_for(future, timeout=None)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 373, in wait_for
    return (yield from fut)
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 74, in thread_handler
    raise e
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 72, in thread_handler
    self.func(*args, **kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 93, in dispatch
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 40, in websocket_receive
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 104, in receive
    self.receive_json(self.decode_json(text_data), **kwargs)
  File "./MYAPP/API/consumers.py", line 13, in receive_json
    parse_api_request(self.channel_name, content)
  File "./MYAPP/API/api_request.py", line 16, in parse_api_request
    AsyncToSync(channel_layer.send)(channel_name, {
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 17, in __init__
    self.main_event_loop = asyncio.get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 632, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 578, in get_event_loop
    % threading.current_thread().name)
  There is no current event loop in thread 'Thread-3'.

and if i don't use AsyncToSync i get (which according to docs I'm not supposed to do, just to check)

2018-02-02 18:34:27,965 WARNING  ./MYAPP/API/api_request.py:18: builtins.RuntimeWarning: coroutine 'RedisChannelLayer.send' was never awaited

which I don't understand as I followed the the guide exactly, I also tried replying from the celery task (a separate thread) and didn't get the same Error but nothing happens, the celery log just says the task completed, but I didn't get a reply.

Also, tried sending the response directly over

AsyncToSync(channel_layer.send)(channel_name, {
            "type": "websocket.send",
            "text": "Hello there!",

from within and out of the thread and got the same non results....

Has anyone been able to send via the channel_layers outside the Consumers object.

FYI my settings.py

ASGI_APPLICATION = "myapp.routing.application"

    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
After a reply from Andres Godwin:

I turns out it was a bug in asgiref < 2.1.3, by upgrading the return values from SyncToAsync/AsyncToSync have been fixed!

So my working implementation, for anyone who's interested:


from channels.consumer import AsyncConsumer

class My_Consumer(AsyncConsumer):

async def websocket_connect(self, event):
    await self.send({
        "type": "websocket.accept",

async def websocket_receive(self, event):
    parse_api_request(self.channel_name, json.loads(event['text']))

async def celery_message(self, event):
    print("Service Received")
    await self.send({
        "type": "websocket.send",
        "text": event["text"],


from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync

def async_send(channel_name, text):
    channel_layer = get_channel_layer()
            {"type": "celery.message",
             "text": json.dumps(text)

def getAFTree(channel_name, message):
    getAFTreeTask.delay(channel_name, message)

def getAFTreeTask(channel_name, message):
    tree = Request().cache_af_tree()
    async_send(channel_name, {
                "channel": "AF_INIT",
                "payload": tree

