Skip to content

Commit

Permalink
fixed handling of queue empty exceptions
Browse files Browse the repository at this point in the history
Fixes #88
  • Loading branch information
miguelgrinberg committed Jan 10, 2019
1 parent c7e8a36 commit bc128c2
Show file tree
Hide file tree
Showing 14 changed files with 60 additions and 32 deletions.
1 change: 0 additions & 1 deletion engineio/async_drivers/aiohttp.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,5 @@ async def wait(self):
'create_route': create_route,
'translate_request': translate_request,
'make_response': make_response,
'event': asyncio.Event,
'websocket': WebSocket,
}
2 changes: 0 additions & 2 deletions engineio/async_drivers/asgi.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import sys


Expand Down Expand Up @@ -219,6 +218,5 @@ async def wait(self):
'asyncio': True,
'translate_request': translate_request,
'make_response': make_response,
'event': asyncio.Event,
'websocket': WebSocket,
}
5 changes: 3 additions & 2 deletions engineio/async_drivers/eventlet.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

from eventlet.green.threading import Thread, Event
from eventlet.queue import Queue
from eventlet import queue
from eventlet import sleep
from eventlet.websocket import WebSocketWSGI as _WebSocketWSGI

Expand All @@ -22,7 +22,8 @@ def __call__(self, environ, start_response):

_async = {
'thread': Thread,
'queue': Queue,
'queue': queue.Queue,
'queue_empty': queue.Empty,
'event': Event,
'websocket': WebSocketWSGI,
'sleep': sleep,
Expand Down
5 changes: 3 additions & 2 deletions engineio/async_drivers/gevent.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import absolute_import

import gevent
from gevent.queue import JoinableQueue
from gevent import queue
from gevent.event import Event
try:
import geventwebsocket # noqa
Expand Down Expand Up @@ -55,7 +55,8 @@ def wait(self):

_async = {
'thread': Thread,
'queue': JoinableQueue,
'queue': queue.JoinableQueue,
'queue_empty': queue.Empty,
'event': Event,
'websocket': WebSocketWSGI if _websocket_available else None,
'sleep': gevent.sleep,
Expand Down
5 changes: 3 additions & 2 deletions engineio/async_drivers/gevent_uwsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import six

import gevent
from gevent.queue import JoinableQueue
from gevent import queue
from gevent.event import Event
import uwsgi
_websocket_available = hasattr(uwsgi, 'websocket_handshake')
Expand Down Expand Up @@ -148,7 +148,8 @@ def wait(self):

_async = {
'thread': Thread,
'queue': JoinableQueue,
'queue': queue.JoinableQueue,
'queue_empty': queue.Empty,
'event': Event,
'websocket': uWSGIWebSocket if _websocket_available else None,
'sleep': gevent.sleep,
Expand Down
2 changes: 0 additions & 2 deletions engineio/async_drivers/sanic.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import sys
from urllib.parse import urlsplit

Expand Down Expand Up @@ -141,6 +140,5 @@ async def wait(self):
'create_route': create_route,
'translate_request': translate_request,
'make_response': make_response,
'event': asyncio.Event,
'websocket': WebSocket if WebSocketProtocol else None,
}
1 change: 1 addition & 0 deletions engineio/async_drivers/threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
_async = {
'thread': threading.Thread,
'queue': queue.Queue,
'queue_empty': queue.Empty,
'event': threading.Event,
'websocket': None,
'sleep': time.sleep,
Expand Down
1 change: 0 additions & 1 deletion engineio/async_drivers/tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,5 @@ async def wait(self):
'asyncio': True,
'translate_request': translate_request,
'make_response': make_response,
'event': asyncio.Event,
'websocket': WebSocket,
}
14 changes: 11 additions & 3 deletions engineio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,9 +298,17 @@ def create_queue(self, *args, **kwargs):
async mode. For asyncio based async modes, this returns an instance of
``asyncio.Queue``.
"""
queue = asyncio.Queue(*args, **kwargs)
queue.Empty = asyncio.QueueEmpty
return queue
return asyncio.Queue(*args, **kwargs)

def get_queue_empty_exception(self):
"""Return the queue empty exception for the appropriate async model.
This is a utility function that applications can use to work with a
queue without having to worry about using the correct call for the
selected async mode. For asyncio based async modes, this returns an
instance of ``asyncio.QueueEmpty``.
"""
return asyncio.QueueEmpty

def create_event(self, *args, **kwargs):
"""Create an event object using the appropriate async model.
Expand Down
15 changes: 10 additions & 5 deletions engineio/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import gzip
import importlib
import logging
import sys
import uuid
import zlib

Expand Down Expand Up @@ -425,10 +424,16 @@ def create_queue(self, *args, **kwargs):
without having to worry about using the correct call for the selected
async mode.
"""
queue = self._async['queue'](*args, **kwargs)
queue.Empty = getattr(
sys.modules[queue.__class__.__module__], 'Empty')
return queue
return self._async['queue'](*args, **kwargs)

def get_queue_empty_exception(self):
"""Return the queue empty exception for the appropriate async model.
This is a utility function that applications can use to work with a
queue without having to worry about using the correct call for the
selected async mode.
"""
return self._async['queue_empty']

def create_event(self, *args, **kwargs):
"""Create an event object using the appropriate async model.
Expand Down
5 changes: 3 additions & 2 deletions engineio/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ def __init__(self, server, sid):

def poll(self):
"""Wait for packets to send to the client."""
queue_empty = self.server.get_queue_empty_exception()
try:
packets = [self.queue.get(timeout=self.server.ping_timeout)]
self.queue.task_done()
except self.queue.Empty:
except queue_empty:
raise exceptions.QueueEmpty()
if packets == [None]:
return []
while True:
try:
packets.append(self.queue.get(block=False))
self.queue.task_done()
except self.queue.Empty:
except queue_empty:
break
return packets

Expand Down
3 changes: 2 additions & 1 deletion tests/asyncio/test_asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -882,7 +882,8 @@ async def foo_handler(arg):
def test_create_queue(self):
s = asyncio_server.AsyncServer()
q = s.create_queue()
self.assertRaises(q.Empty, q.get_nowait)
empty = s.get_queue_empty_exception()
self.assertRaises(empty, q.get_nowait)

def test_create_event(self):
s = asyncio_server.AsyncServer()
Expand Down
28 changes: 22 additions & 6 deletions tests/common/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class TestServer(unittest.TestCase):
_mock_async._async = {
'thread': 't',
'queue': 'q',
'queue_empty': RuntimeError,
'websocket': 'w',
}

Expand Down Expand Up @@ -116,8 +117,10 @@ def test_async_mode_eventlet(self):
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent.queue'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['uwsgi'] = mock.MagicMock()
Expand All @@ -128,6 +131,7 @@ def test_async_mode_gevent_uwsgi(self, import_module):

self.assertEqual(s._async['thread'], async_gevent_uwsgi.Thread)
self.assertEqual(s._async['queue'], 'foo')
self.assertEqual(s._async['queue_empty'], RuntimeError)
self.assertEqual(s._async['event'], 'bar')
self.assertEqual(s._async['websocket'],
async_gevent_uwsgi.uWSGIWebSocket)
Expand All @@ -140,8 +144,10 @@ def test_async_mode_gevent_uwsgi(self, import_module):
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi_without_uwsgi(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent.queue'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['uwsgi'] = None
Expand All @@ -155,8 +161,10 @@ def test_async_mode_gevent_uwsgi_without_uwsgi(self, import_module):
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi_without_websocket(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent.queue'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['uwsgi'] = mock.MagicMock()
Expand All @@ -168,6 +176,7 @@ def test_async_mode_gevent_uwsgi_without_websocket(self, import_module):

self.assertEqual(s._async['thread'], async_gevent_uwsgi.Thread)
self.assertEqual(s._async['queue'], 'foo')
self.assertEqual(s._async['queue_empty'], RuntimeError)
self.assertEqual(s._async['event'], 'bar')
self.assertEqual(s._async['websocket'], None)
del sys.modules['gevent']
Expand All @@ -179,8 +188,10 @@ def test_async_mode_gevent_uwsgi_without_websocket(self, import_module):
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent.queue'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['geventwebsocket'] = 'geventwebsocket'
Expand All @@ -191,6 +202,7 @@ def test_async_mode_gevent(self, import_module):

self.assertEqual(s._async['thread'], async_gevent.Thread)
self.assertEqual(s._async['queue'], 'foo')
self.assertEqual(s._async['queue_empty'], RuntimeError)
self.assertEqual(s._async['event'], 'bar')
self.assertEqual(s._async['websocket'], async_gevent.WebSocketWSGI)
del sys.modules['gevent']
Expand All @@ -202,8 +214,10 @@ def test_async_mode_gevent(self, import_module):
@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_without_websocket(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['gevent.queue'] = mock.MagicMock()
sys.modules['gevent'].queue = mock.MagicMock()
sys.modules['gevent.queue'] = sys.modules['gevent'].queue
sys.modules['gevent.queue'].JoinableQueue = 'foo'
sys.modules['gevent.queue'].Empty = RuntimeError
sys.modules['gevent.event'] = mock.MagicMock()
sys.modules['gevent.event'].Event = 'bar'
sys.modules['geventwebsocket'] = None
Expand All @@ -214,6 +228,7 @@ def test_async_mode_gevent_without_websocket(self, import_module):

self.assertEqual(s._async['thread'], async_gevent.Thread)
self.assertEqual(s._async['queue'], 'foo')
self.assertEqual(s._async['queue_empty'], RuntimeError)
self.assertEqual(s._async['event'], 'bar')
self.assertEqual(s._async['websocket'], None)
del sys.modules['gevent']
Expand Down Expand Up @@ -908,7 +923,8 @@ def test_sleep(self):
def test_create_queue(self):
s = server.Server()
q = s.create_queue()
self.assertRaises(q.Empty, q.get, timeout=0.01)
empty = s.get_queue_empty_exception()
self.assertRaises(empty, q.get, timeout=0.01)

def test_create_event(self):
s = server.Server()
Expand Down
5 changes: 2 additions & 3 deletions tests/common/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ def bg_task(target, *args, **kwargs):
return th

def create_queue(*args, **kwargs):
q = queue.Queue(*args, **kwargs)
q.Empty = queue.Empty
return q
return queue.Queue(*args, **kwargs)

mock_server.start_background_task = bg_task
mock_server.create_queue = create_queue
mock_server.get_queue_empty_exception.return_value = queue.Empty
return mock_server

def _join_bg_tasks(self):
Expand Down

0 comments on commit bc128c2

Please sign in to comment.