Skip to content

Commit

Permalink
minor refactor of the async drivers
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Jan 5, 2019
1 parent 70c59b0 commit 0478110
Show file tree
Hide file tree
Showing 16 changed files with 188 additions and 118 deletions.
4 changes: 2 additions & 2 deletions engineio/async_drivers/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,6 @@ async def wait(self):
'create_route': create_route,
'translate_request': translate_request,
'make_response': make_response,
'websocket': sys.modules[__name__],
'websocket_class': 'WebSocket'
'event': asyncio.Event,
'websocket': WebSocket,
}
5 changes: 3 additions & 2 deletions engineio/async_drivers/asgi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import sys


Expand Down Expand Up @@ -218,6 +219,6 @@ async def wait(self):
'asyncio': True,
'translate_request': translate_request,
'make_response': make_response,
'websocket': sys.modules[__name__],
'websocket_class': 'WebSocket'
'event': asyncio.Event,
'websocket': WebSocket,
}
15 changes: 7 additions & 8 deletions engineio/async_drivers/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import importlib
import sys

from eventlet.green.threading import Thread, Event
from eventlet.queue import Queue
from eventlet import sleep
from eventlet.websocket import WebSocketWSGI as _WebSocketWSGI


class WebSocketWSGI(_WebSocketWSGI):
def __init__(self, *args, **kwargs):
super(WebSocketWSGI, self).__init__(*args, **kwargs)
Expand All @@ -22,11 +23,9 @@ def __call__(self, environ, start_response):


_async = {
'threading': importlib.import_module('eventlet.green.threading'),
'thread_class': 'Thread',
'queue': importlib.import_module('eventlet.queue'),
'queue_class': 'Queue',
'websocket': sys.modules[__name__],
'websocket_class': 'WebSocketWSGI',
'sleep': sleep
'thread': Thread,
'queue': Queue,
'event': Event,
'websocket': WebSocketWSGI,
'sleep': sleep,
}
14 changes: 7 additions & 7 deletions engineio/async_drivers/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import sys

import gevent
from gevent.queue import JoinableQueue
from gevent.event import Event
try:
import geventwebsocket # noqa
_websocket_available = True
Expand Down Expand Up @@ -55,11 +57,9 @@ def wait(self):


_async = {
'threading': sys.modules[__name__],
'thread_class': 'Thread',
'queue': importlib.import_module('gevent.queue'),
'queue_class': 'JoinableQueue',
'websocket': sys.modules[__name__] if _websocket_available else None,
'websocket_class': 'WebSocketWSGI' if _websocket_available else None,
'sleep': gevent.sleep
'thread': Thread,
'queue': JoinableQueue,
'event': Event,
'websocket': WebSocketWSGI if _websocket_available else None,
'sleep': gevent.sleep,
}
14 changes: 7 additions & 7 deletions engineio/async_drivers/gevent_uwsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import six

import gevent
from gevent.queue import JoinableQueue
from gevent.event import Event
import uwsgi
_websocket_available = hasattr(uwsgi, 'websocket_handshake')

Expand Down Expand Up @@ -147,11 +149,9 @@ def wait(self):


_async = {
'threading': sys.modules[__name__],
'thread_class': 'Thread',
'queue': importlib.import_module('gevent.queue'),
'queue_class': 'JoinableQueue',
'websocket': sys.modules[__name__] if _websocket_available else None,
'websocket_class': 'uWSGIWebSocket' if _websocket_available else None,
'sleep': gevent.sleep
'thread': Thread,
'queue': JoinableQueue,
'event': Event,
'websocket': uWSGIWebSocket if _websocket_available else None,
'sleep': gevent.sleep,
}
5 changes: 3 additions & 2 deletions engineio/async_drivers/sanic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import sys
from urllib.parse import urlsplit

Expand Down Expand Up @@ -140,6 +141,6 @@ async def wait(self):
'create_route': create_route,
'translate_request': translate_request,
'make_response': make_response,
'websocket': sys.modules[__name__] if WebSocketProtocol else None,
'websocket_class': 'WebSocket' if WebSocketProtocol else None
'event': asyncio.Event,
'websocket': WebSocket if WebSocketProtocol else None,
}
17 changes: 8 additions & 9 deletions engineio/async_drivers/threading.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import importlib
from __future__ import absolute_import
import threading
import time

try:
queue = importlib.import_module('queue')
import queue
except ImportError: # pragma: no cover
queue = importlib.import_module('Queue') # pragma: no cover
import Queue as queue

_async = {
'threading': importlib.import_module('threading'),
'thread_class': 'Thread',
'queue': queue,
'queue_class': 'Queue',
'thread': threading.Thread,
'queue': queue.Queue,
'event': threading.Event,
'websocket': None,
'websocket_class': None,
'sleep': time.sleep
'sleep': time.sleep,
}
4 changes: 2 additions & 2 deletions engineio/async_drivers/tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,6 @@ async def wait(self):
'asyncio': True,
'translate_request': translate_request,
'make_response': make_response,
'websocket': sys.modules[__name__],
'websocket_class': 'WebSocket'
'event': asyncio.Event,
'websocket': WebSocket,
}
22 changes: 22 additions & 0 deletions engineio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,28 @@ async def sleep(self, seconds=0):
"""
return await asyncio.sleep(seconds)

def create_queue(self, *args, **kwargs):
"""Create a queue object using the appropriate async model.
This is a utility function that applications can use to create a queue
without having to worry about using the correct call for the selected
async mode. For asyncio based async modes, this returns an instance of
``asyncio.Queue``.
"""
queue = asyncio.Queue(*args, **kwargs)
queue.Empty = asyncio.QueueEmpty
return queue

def create_event(self, *args, **kwargs):
"""Create an event object using the appropriate async model.
This is a utility function that applications can use to create an
event without having to worry about using the correct call for the
selected async mode. For asyncio based async modes, this returns
an instance of ``asyncio.Event``.
"""
return asyncio.Event(*args, **kwargs)

async def _handle_connect(self, environ, transport, b64=False):
"""Handle a client connection request."""
if self.start_service_task:
Expand Down
10 changes: 2 additions & 8 deletions engineio/asyncio_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@


class AsyncSocket(socket.Socket):
def create_queue(self):
return asyncio.Queue()

async def poll(self):
"""Wait for packets to send to the client."""
try:
Expand Down Expand Up @@ -124,13 +121,10 @@ async def _upgrade_websocket(self, environ):
"""Upgrade the connection from polling to websocket."""
if self.upgraded:
raise IOError('Socket has been upgraded already')
if self.server._async['websocket'] is None or \
self.server._async['websocket_class'] is None:
if self.server._async['websocket'] is None:
# the selected async mode does not support websocket
return self.server._bad_request()
websocket_class = getattr(self.server._async['websocket'],
self.server._async['websocket_class'])
ws = websocket_class(self._websocket_handler)
ws = self.server._async['websocket'](self._websocket_handler)
return await ws(environ)

async def _websocket_handler(self, ws):
Expand Down
29 changes: 24 additions & 5 deletions engineio/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import gzip
import importlib
import logging
import sys
import uuid
import zlib

Expand Down Expand Up @@ -403,9 +404,7 @@ def start_background_task(self, target, *args, **kwargs):
the Python standard library. The `start()` method on this object is
already called by this function.
"""
th = getattr(self._async['threading'],
self._async['thread_class'])(target=target, args=args,
kwargs=kwargs)
th = self._async['thread'](target=target, args=args, kwargs=kwargs)
th.start()
return th # pragma: no cover

Expand All @@ -419,6 +418,27 @@ def sleep(self, seconds=0):
"""
return self._async['sleep'](seconds)

def create_queue(self, *args, **kwargs):
"""Create a queue object using the appropriate async model.
This is a utility function that applications can use to create a queue
without having to worry about using the correct call for the selected
async mode.
"""
queue = self._async['queue'](*args, **kwargs)
queue.Empty = getattr(
sys.modules[queue.__class__.__module__], 'Empty')
return queue

def create_event(self, *args, **kwargs):
"""Create an event object using the appropriate async model.
This is a utility function that applications can use to create an
event without having to worry about using the correct call for the
selected async mode.
"""
return self._async['event'](*args, **kwargs)

def _generate_id(self):
"""Generate a unique session id."""
return uuid.uuid4().hex
Expand Down Expand Up @@ -466,8 +486,7 @@ def _handle_connect(self, environ, start_response, transport, b64=False):
def _upgrades(self, sid, transport):
"""Return the list of possible upgrades for a client connection."""
if not self.allow_upgrades or self._get_socket(sid).upgraded or \
self._async['websocket_class'] is None or \
transport == 'websocket':
self._async['websocket'] is None or transport == 'websocket':
return []
return ['websocket']

Expand Down
17 changes: 5 additions & 12 deletions engineio/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class Socket(object):
def __init__(self, server, sid):
self.server = server
self.sid = sid
self.queue = self.create_queue()
self.queue = self.server.create_queue()
self.last_ping = time.time()
self.connected = False
self.upgrading = False
Expand All @@ -24,24 +24,20 @@ def __init__(self, server, sid):
self.closed = False
self.session = {}

def create_queue(self):
return getattr(self.server._async['queue'],
self.server._async['queue_class'])()

def poll(self):
"""Wait for packets to send to the client."""
try:
packets = [self.queue.get(timeout=self.server.ping_timeout)]
self.queue.task_done()
except self.server._async['queue'].Empty:
except self.queue.Empty:
raise exceptions.QueueEmpty()
if packets == [None]:
return []
while True:
try:
packets.append(self.queue.get(block=False))
self.queue.task_done()
except self.server._async['queue'].Empty:
except self.queue.Empty:
break
return packets

Expand Down Expand Up @@ -142,13 +138,10 @@ def _upgrade_websocket(self, environ, start_response):
"""Upgrade the connection from polling to websocket."""
if self.upgraded:
raise IOError('Socket has been upgraded already')
if self.server._async['websocket'] is None or \
self.server._async['websocket_class'] is None:
if self.server._async['websocket'] is None:
# the selected async mode does not support websocket
return self.server._bad_request()
websocket_class = getattr(self.server._async['websocket'],
self.server._async['websocket_class'])
ws = websocket_class(self._websocket_handler)
ws = self.server._async['websocket'](self._websocket_handler)
return ws(environ, start_response)

def _websocket_handler(self, ws):
Expand Down
16 changes: 13 additions & 3 deletions tests/asyncio/test_asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ def get_async_mock(environ={'REQUEST_METHOD': 'GET', 'QUERY_STRING': ''}):
'translate_request': mock.MagicMock(),
'make_response': mock.MagicMock(),
'websocket': 'w',
'websocket_class': 'wc'
}
a._async['translate_request'].return_value = environ
a._async['make_response'].return_value = 'response'
Expand Down Expand Up @@ -99,8 +98,7 @@ def test_async_mode_aiohttp(self):
async_aiohttp.translate_request)
self.assertEqual(s._async['make_response'],
async_aiohttp.make_response)
self.assertEqual(s._async['websocket'], async_aiohttp)
self.assertEqual(s._async['websocket_class'], 'WebSocket')
self.assertEqual(s._async['websocket'].__name__, 'WebSocket')

@mock.patch('importlib.import_module')
def test_async_mode_auto_aiohttp(self, import_module):
Expand Down Expand Up @@ -881,6 +879,18 @@ async def foo_handler(arg):
fut)
self.assertEqual(result, ['bar'])

def test_create_queue(self):
s = asyncio_server.AsyncServer()
q = s.create_queue()
self.assertRaises(q.Empty, q.get_nowait)

def test_create_event(self):
s = asyncio_server.AsyncServer()
e = s.create_event()
self.assertFalse(e.is_set())
e.set()
self.assertTrue(e.is_set())

@mock.patch('importlib.import_module')
def test_service_task_started(self, import_module):
a = self.get_async_mock()
Expand Down
16 changes: 10 additions & 6 deletions tests/asyncio/test_asyncio_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,17 @@ def _get_mock_server(self):
'create_route': mock.MagicMock(),
'translate_request': mock.MagicMock(),
'make_response': mock.MagicMock(),
'websocket': 'w',
'websocket_class': 'wc'}
'websocket': 'w'}
mock_server._async['translate_request'].return_value = 'request'
mock_server._async['make_response'].return_value = 'response'
mock_server._trigger_event = AsyncMock()

def create_queue(*args, **kwargs):
queue = asyncio.Queue(*args, **kwargs)
queue.Empty = asyncio.QueueEmpty
return queue

mock_server.create_queue = create_queue
return mock_server

def test_create(self):
Expand Down Expand Up @@ -184,14 +190,13 @@ def test_upgrade_handshake(self):
def test_upgrade(self):
mock_server = self._get_mock_server()
mock_server._async['websocket'] = mock.MagicMock()
mock_server._async['websocket_class'] = 'WebSocket'
mock_ws = AsyncMock()
mock_server._async['websocket'].WebSocket.return_value = mock_ws
mock_server._async['websocket'].return_value = mock_ws
s = asyncio_socket.AsyncSocket(mock_server, 'sid')
s.connected = True
environ = "foo"
_run(s._upgrade_websocket(environ))
mock_server._async['websocket'].WebSocket.assert_called_once_with(
mock_server._async['websocket'].assert_called_once_with(
s._websocket_handler)
mock_ws.mock.assert_called_once_with(environ)

Expand Down Expand Up @@ -246,7 +251,6 @@ def test_upgrade_no_upgrade_packet(self):
def test_upgrade_not_supported(self):
mock_server = self._get_mock_server()
mock_server._async['websocket'] = None
mock_server._async['websocket_class'] = None
s = asyncio_socket.AsyncSocket(mock_server, 'sid')
s.connected = True
environ = "foo"
Expand Down
Loading

0 comments on commit 0478110

Please sign in to comment.