Skip to content

Commit

Permalink
Merge branch 'efficiosoft-uwsgi-gevent-support'
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Aug 21, 2016
2 parents 298310a + 8f92f4e commit ad64b54
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 4 deletions.
153 changes: 153 additions & 0 deletions engineio/async_gevent_uwsgi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import importlib
import sys
import six

import gevent
try:
import uwsgi
_websocket_available = hasattr(uwsgi, 'websocket_handshake')
except ImportError:
_websocket_available = False


class Thread(gevent.Greenlet): # pragma: no cover
"""
This wrapper class provides gevent Greenlet interface that is compatible
with the standard library's Thread class.
"""
def __init__(self, target, args=[], kwargs={}):
super(Thread, self).__init__(target, *args, **kwargs)

def _run(self):
return self.run()


class uWSGIWebSocket(object): # pragma: no cover
"""
This wrapper class provides a uWSGI WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
self.environ = environ

uwsgi.websocket_handshake()

self._req_ctx = None
if hasattr(uwsgi, 'request_context'):
# uWSGI >= 2.1.x with support for api access across-greenlets
self._req_ctx = uwsgi.request_context()
else:
# use event and queue for sending messages
from gevent.event import Event
from gevent.queue import Queue
from gevent.select import select
self._event = Event()
self._send_queue = Queue()

# spawn a select greenlet
def select_greenlet_runner(fd, event):
"""Sets event when data becomes available to read on fd."""
while True:
event.set()
select([fd], [], [])[0]
self._select_greenlet = gevent.spawn(
select_greenlet_runner,
uwsgi.connection_fd(),
self._event)

return self.app(self)

def close(self):
"""Disconnects uWSGI from the client."""
uwsgi.disconnect()
if self._req_ctx is None:
# better kill it here in case wait() is not called again
self._select_greenlet.kill()
self._event.set()

def _send(self, msg):
"""Transmits message either in binary or UTF-8 text mode,
depending on its type."""
if isinstance(msg, six.binary_type):
method = uwsgi.websocket_send_binary
else:
method = uwsgi.websocket_send
if self._req_ctx is not None:
method(msg, request_context=self._req_ctx)
else:
method(msg)

def _decode_received(self, msg):
"""Returns either bytes or str, depending on message type."""
if not isinstance(msg, six.binary_type):
# already decoded - do nothing
return msg
# only decode from utf-8 if message is not binary data
type = six.byte2int(msg[0:1])
if type >= 48: # no binary
return msg.decode('utf-8')
# binary message, don't try to decode
return msg

def send(self, msg):
"""Queues a message for sending. Real transmission is done in
wait method.
Sends directly if uWSGI version is new enough."""
if self._req_ctx is not None:
self._send(msg)
else:
self._send_queue.put(msg)
self._event.set()

def wait(self):
"""Waits and returns received messages.
If running in compatibility mode for older uWSGI versions,
it also sends messages that have been queued by send().
A return value of None means that connection was closed.
This must be called repeatedly. For uWSGI < 2.1.x it must
be called from the main greenlet."""
while True:
if self._req_ctx is not None:
try:
msg = uwsgi.websocket_recv(request_context=self._req_ctx)
except IOError: # connection closed
return None
return self._decode_received(msg)
else:
# we wake up at least every 3 seconds to let uWSGI
# do its ping/ponging
event_set = self._event.wait(timeout=3)
if event_set:
self._event.clear()
# maybe there is something to send
msgs = []
while True:
try:
msgs.append(self._send_queue.get(block=False))
except gevent.queue.Empty:
break
for msg in msgs:
self._send(msg)
# maybe there is something to receive, if not, at least
# ensure uWSGI does its ping/ponging
try:
msg = uwsgi.websocket_recv_nb()
except IOError: # connection closed
self._select_greenlet.kill()
return None
if msg: # message available
return self._decode_received(msg)


async = {
'threading': sys.modules[__name__],
'thread_class': 'Thread',
'queue': importlib.import_module('gevent.queue'),
'queue_class': 'JoinableQueue',
'websocket': sys.modules[__name__] if _websocket_available else None,
'websocket_class': 'uWSGIWebSocket' if _websocket_available else None,
'sleep': gevent.sleep
}
2 changes: 1 addition & 1 deletion engineio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def __init__(self, async_mode=None, ping_timeout=60, ping_interval=25,
self.logger.setLevel(logging.ERROR)
self.logger.addHandler(logging.StreamHandler())
if async_mode is None:
modes = ['eventlet', 'gevent', 'threading']
modes = ['eventlet', 'gevent_uwsgi', 'gevent', 'threading']
else:
modes = [async_mode]
self.async = None
Expand Down
13 changes: 10 additions & 3 deletions engineio/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def _websocket_handler(self, ws):
always_bytes=False):
self.server.logger.info(
'%s: Failed websocket upgrade, no PING packet', self.sid)
return
return []
ws.send(packet.Packet(
packet.PONG,
data=six.text_type('probe')).encode(always_bytes=False))
Expand All @@ -144,24 +144,27 @@ def _websocket_handler(self, ws):
('%s: Failed websocket upgrade, expected UPGRADE packet, '
'received %s instead.'),
self.sid, pkt)
return
return []
self.upgraded = True
else:
self.connected = True
self.upgraded = True

# start separate writer thread
def writer():
while True:
try:
packets = self.poll()
except IOError:
break
if not packets:
# empty packet list returned -> connection closed
break
try:
for pkt in packets:
ws.send(pkt.encode(always_bytes=False))
except:
break

self.server.start_background_task(writer)

self.server.logger.info(
Expand All @@ -173,6 +176,7 @@ def writer():
except:
break
if p is None:
# connection closed by client
break
if isinstance(p, six.text_type): # pragma: no cover
p = p.encode('utf-8')
Expand All @@ -181,5 +185,8 @@ def writer():
self.receive(pkt)
except ValueError:
pass

self.close(wait=True, abort=True)
self.queue.put(None) # unlock the writer task so that it can exit

return []
66 changes: 66 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,64 @@ def test_async_mode_eventlet(self):
self.assertEqual(s.async['websocket'], async_eventlet)
self.assertEqual(s.async['websocket_class'], 'WebSocketWSGI')

@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['uwsgi'] = mock.MagicMock()
s = server.Server(async_mode='gevent_uwsgi')
self.assertEqual(s.async_mode, 'gevent_uwsgi')

from engineio import async_gevent_uwsgi

self.assertEqual(s.async['threading'], async_gevent_uwsgi)
self.assertEqual(s.async['thread_class'], 'Thread')
self.assertEqual(s.async['queue'], 'gevent.queue')
self.assertEqual(s.async['queue_class'], 'JoinableQueue')
self.assertEqual(s.async['websocket'], async_gevent_uwsgi)
self.assertEqual(s.async['websocket_class'], 'uWSGIWebSocket')
del sys.modules['gevent']
del sys.modules['uwsgi']
del sys.modules['engineio.async_gevent_uwsgi']

@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi_without_uwsgi(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['uwsgi'] = None
s = server.Server(async_mode='gevent_uwsgi')
self.assertEqual(s.async_mode, 'gevent_uwsgi')

from engineio import async_gevent_uwsgi

self.assertEqual(s.async['threading'], async_gevent_uwsgi)
self.assertEqual(s.async['thread_class'], 'Thread')
self.assertEqual(s.async['queue'], 'gevent.queue')
self.assertEqual(s.async['queue_class'], 'JoinableQueue')
self.assertEqual(s.async['websocket'], None)
self.assertEqual(s.async['websocket_class'], None)
del sys.modules['gevent']
del sys.modules['uwsgi']
del sys.modules['engineio.async_gevent_uwsgi']

@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent_uwsgi_without_websocket(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
sys.modules['uwsgi'] = mock.MagicMock()
del sys.modules['uwsgi'].websocket_handshake
s = server.Server(async_mode='gevent_uwsgi')
self.assertEqual(s.async_mode, 'gevent_uwsgi')

from engineio import async_gevent_uwsgi

self.assertEqual(s.async['threading'], async_gevent_uwsgi)
self.assertEqual(s.async['thread_class'], 'Thread')
self.assertEqual(s.async['queue'], 'gevent.queue')
self.assertEqual(s.async['queue_class'], 'JoinableQueue')
self.assertEqual(s.async['websocket'], None)
self.assertEqual(s.async['websocket_class'], None)
del sys.modules['gevent']
del sys.modules['uwsgi']
del sys.modules['engineio.async_gevent_uwsgi']

@mock.patch('importlib.import_module', side_effect=_mock_import)
def test_async_mode_gevent(self, import_module):
sys.modules['gevent'] = mock.MagicMock()
Expand Down Expand Up @@ -149,11 +207,19 @@ def test_async_mode_auto_eventlet(self, import_module):

@mock.patch('importlib.import_module', side_effect=[ImportError,
_mock_async])
def test_async_mode_auto_gevent_uwsgi(self, import_module):
s = server.Server()
self.assertEqual(s.async_mode, 'gevent_uwsgi')

@mock.patch('importlib.import_module', side_effect=[ImportError,
ImportError,
_mock_async])
def test_async_mode_auto_gevent(self, import_module):
s = server.Server()
self.assertEqual(s.async_mode, 'gevent')

@mock.patch('importlib.import_module', side_effect=[ImportError,
ImportError,
ImportError,
_mock_async])
def test_async_mode_auto_threading(self, import_module):
Expand Down

0 comments on commit ad64b54

Please sign in to comment.