Skip to content

Commit

Permalink
WebSocket support for threading mode
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Apr 18, 2021
1 parent 7c6b05c commit b84537a
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 21 deletions.
24 changes: 15 additions & 9 deletions docs/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -591,8 +591,7 @@ Standard Threads
While not comparable to eventlet and gevent in terms of performance,
the Engine.IO server can also be configured to work with multi-threaded web
servers that use standard Python threads. This is an ideal setup to use with
development servers such as `Werkzeug <http://werkzeug.pocoo.org>`_. Only the
long-polling transport is currently available when using standard threads.
development servers such as `Werkzeug <http://werkzeug.pocoo.org>`_.

Instances of class ``engineio.Server`` will automatically use the threading
mode if neither eventlet nor gevent are not installed. To request the
Expand All @@ -613,15 +612,22 @@ development web server based on Werkzeug::
# ... Engine.IO and Flask handler functions ...

if __name__ == '__main__':
app.run(threaded=True)
app.run()

The example that follows shows how to start an Engine.IO application using
Gunicorn's threaded worker class::

When using the threading mode, it is important to ensure that the WSGI server
can handle multiple concurrent requests using threads, since a client can have
up to two outstanding requests at any given time. The Werkzeug server is
single-threaded by default, so the ``threaded=True`` option is required.
$ gunicorn -w 1 --threads 100 module:app

With the above configuration the server will be able to handle up to 100
concurrent clients.

Note that servers that use worker processes instead of threads, such as
gunicorn, do not support an Engine.IO server configured in threading mode.
When using standard threads, WebSocket is supported through the
`simple-websocket <https://github.com/miguelgrinberg/simple-websocket>`_
package, which must be installed separately. This package provides a
multi-threaded WebSocket server that is compatible with Werkzeug and Gunicorn's
threaded worker. Other multi-threaded web servers are not supported and will
not enable the WebSocket transport.

Scalability Notes
~~~~~~~~~~~~~~~~~
Expand Down
37 changes: 34 additions & 3 deletions engineio/async_drivers/threading.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,48 @@
from __future__ import absolute_import
import queue
import threading
import time

try:
import queue
from simple_websocket import Server, ConnectionClosed
_websocket_available = True
except ImportError: # pragma: no cover
import Queue as queue
_websocket_available = False


class WebSocketWSGI(object): # pragma: no cover
"""
This wrapper class provides a threading WebSocket interface that is
compatible with eventlet's implementation.
"""
def __init__(self, app):
self.app = app

def __call__(self, environ, start_response):
self.ws = Server(environ)
return self.app(self)

def close(self):
return self.ws.close()

def send(self, message):
try:
return self.ws.send(message)
except ConnectionClosed:
raise IOError()

def wait(self):
try:
return self.ws.receive()
except ConnectionClosed:
raise IOError()


_async = {
'thread': threading.Thread,
'queue': queue.Queue,
'queue_empty': queue.Empty,
'event': threading.Event,
'websocket': None,
'websocket': WebSocketWSGI if _websocket_available else None,
'sleep': time.sleep,
}
5 changes: 1 addition & 4 deletions engineio/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
from base64 import b64encode
from engineio.json import JSONDecodeError
import logging
try:
import queue
except ImportError: # pragma: no cover
import Queue as queue
import queue
import signal
import ssl
import threading
Expand Down
9 changes: 8 additions & 1 deletion engineio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,14 @@ def _handle_connect(self, environ, start_response, transport,
def _upgrades(self, sid, transport):
"""Return the list of possible upgrades for a client connection."""
if not self.allow_upgrades or self._get_socket(sid).upgraded or \
self._async['websocket'] is None or transport == 'websocket':
transport == 'websocket':
return []
if self._async['websocket'] is None: # pragma: no cover
self._log_error_once(
'The WebSocket transport is not available, you must install a '
'WebSocket server that is compatible with your async mode to '
'enable it. See the documentation for details.',
'no-websocket')
return []
return ['websocket']

Expand Down
20 changes: 16 additions & 4 deletions tests/common/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,19 +97,31 @@ def test_create_ignores_kwargs(self):
server.Server(foo='bar') # this should not raise

def test_async_mode_threading(self):
sys.modules['simple_websocket'] = mock.MagicMock()
s = server.Server(async_mode='threading')
assert s.async_mode == 'threading'

import threading
from engineio.async_drivers import threading as async_threading
import queue

try:
import queue
except ImportError:
import Queue as queue
assert s._async['thread'] == threading.Thread
assert s._async['queue'] == queue.Queue
assert s._async['websocket'] == async_threading.WebSocketWSGI
del sys.modules['simple_websocket']
del sys.modules['engineio.async_drivers.threading']

def test_async_mode_threading_without_websocket(self):
s = server.Server(async_mode='threading')
assert s.async_mode == 'threading'

import threading
import queue

assert s._async['thread'] == threading.Thread
assert s._async['queue'] == queue.Queue
assert s._async['websocket'] is None
del sys.modules['engineio.async_drivers.threading']

def test_async_mode_eventlet(self):
s = server.Server(async_mode='eventlet')
Expand Down

0 comments on commit b84537a

Please sign in to comment.