Skip to content

Commit

Permalink
Support for gevent and threading in addition to eventlet.
Browse files Browse the repository at this point in the history
Also improved example application.
  • Loading branch information
miguelgrinberg committed Aug 4, 2015
1 parent 07d3bc0 commit e72e488
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 78 deletions.
31 changes: 14 additions & 17 deletions README.rst
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,22 @@ Python implementation of the `Engine.IO`_ realtime server.
Features
--------

- Fully compatible with the Javascript `engine.io-client`_ library.
- Compatible with Python 2.7 and Python 3.3+.
- Based on `Eventlet`_, enabling large number of clients even on modest
hardware.
- Includes a WSGI middleware that integrates Engine.IO traffic with
standard WSGI applications.
- Uses an event-based architecture implemented with decorators that
hides the details of the protocol.
- Implements HTTP long-polling and WebSocket transports.
- Supports XHR2 and XHR browsers as clients.
- Supports text and binary messages.
- Supports gzip and deflate HTTP compression.
- Configurable CORS responses to avoid cross-origin problems with
browsers.
- Fully compatible with the Javascript `engine.io-client`_ library, versions 1.5.0 and up.
- Compatible with Python 2.7 and Python 3.3+.
- Supports large number of clients even on modest hardware when used with an asynchronous server based on `eventlet`_ or `gevent`_. For development and testing, any WSGI compliant multi-threaded server can be used.
- Includes a WSGI middleware that integrates Engine.IO traffic with standard WSGI applications.
- Uses an event-based architecture implemented with decorators that hides the details of the protocol.
- Implements HTTP long-polling and WebSocket transports.
- Supports XHR2 and XHR browsers as clients.
- Supports text and binary messages.
- Supports gzip and deflate HTTP compression.
- Configurable CORS responses to avoid cross-origin problems with browsers.

Example
-------

The following application uses Flask to serve the HTML/Javascript to the
client:
The following application uses the Eventlet asynchronous server, and includes a
small Flask application that serves the HTML/Javascript to the client:

::

Expand Down Expand Up @@ -72,6 +68,7 @@ Resources

.. _Engine.IO: https://github.com/Automattic/engine.io
.. _engine.io-client: https://github.com/Automattic/engine.io-client
.. _Eventlet: http://eventlet.net/
.. _eventlet: http://eventlet.net/
.. _gevent: http://gevent.org/
.. _Documentation: http://pythonhosted.org/python-engineio
.. _PyPI: https://pypi.python.org/pypi/python-engineio
153 changes: 142 additions & 11 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ integrated with a Python WSGI application. The following are some of its
features:

- Fully compatible with the Javascript
`engine.io-client <https://github.com/Automattic/engine.io-client>`_ library.
`engine.io-client <https://github.com/Automattic/engine.io-client>`_ library,
versions 1.5.0 and up.
- Compatible with Python 2.7 and Python 3.3+.
- Based on `Eventlet <http://eventlet.net/>`_, enabling large number of
clients even on modest hardware.
- Supports large number of clients even on modest hardware when used with
an asynchronous server based on `Eventlet <http://eventlet.net/>`_ or
`gevent <http://gevent.org>`_. For development and testing, any WSGI
compliant multi-threaded server can be used.
- Includes a WSGI middleware that integrates Engine.IO traffic with standard
WSGI applications.
- Uses an event-based architecture implemented with decorators that hides the
Expand All @@ -40,17 +43,17 @@ client-side Javascript code required to setup an Engine.IO connection to
a server::

var socket = eio('http://chat.example.com');
socket.on('open', function() {
alert('connected');
socket.on('message', function(data) { alert(data); });
socket.on('close', function() { alert('disconnected'); });
});
socket.on('open', function() { alert('connected'); });
socket.on('message', function(data) { alert(data); });
socket.on('close', function() { alert('disconnected'); });
socket.send('Hello from the client!');

Getting Started
---------------

The following is a basic example of an Engine.IO server that uses Flask to
deploy the client code to the browser::
The following application is a basic example that uses the Eventlet
asynchronous server and includes a small Flask application that serves the
HTML/Javascript to the client::

import engineio
import eventlet
Expand Down Expand Up @@ -86,7 +89,7 @@ deploy the client code to the browser::

The client-side application must include the
`engine.io-client <https://github.com/Automattic/engine.io-client>`_ library
(versions 1.5.0 or newer recommended).
(version 1.5.0 or newer recommended).

Each time a client connects to the server the ``connect`` event handler is
invoked with the ``sid`` (session ID) assigned to the connection and the WSGI
Expand All @@ -105,6 +108,134 @@ any connected client at any time. The ``engineio.Server.send()`` method takes
the client's ``sid`` and the message payload, which can be of type ``str``,
``bytes``, ``list`` or ``dict`` (the last two are JSON encoded).

Deployment
----------

The following sections describe a variaty of deployment strategies for
Engine.IO servers.

Eventlet
~~~~~~~~

`Eventlet <http://eventlet.net/>`_ is a high performance concurrent networking
library for Python 2 and 3 that uses coroutines, enabling code to be written in
the same style used with the blocking standard library functions. An Engine.IO
server deployed with eventlet has access to the long-polling and WebSocket
transports.

Instances of class ``engineio.Server`` will automatically use eventlet for
asynchronous operations if the library is installed. To request its use
explicitly, the ``async_mode`` option can be given in the constructor::

eio = engineio.Server(async_mode='eventlet')

A server configured for eventlet is deployed as a regular WSGI application,
using the provided ``engineio.Middleware``::

import eventlet
app = engineio.Middleware(eio)
eventlet.wsgi.server(eventlet.listen(('', 8000)), app)

An alternative to running the eventlet WSGI server as above is to use
`gunicorn <gunicorn.org>`_, a fully featured pure Python web server. The
command to launch the application under gunicorn is shown below::

$ gunicorn -k eventlet -w 1 module:app

It is important to specify that only one worker process is used with gunicorn.
A single worker can handle a large number of clients when using eventlet.

Note that when using gunicorn the WebSocket transport is not available. To make
WebSocket work, eventlet uses its own extensions to the WSGI standard, which
gunicorn does not support.

Note: Eventlet provides a ``monkey_patch()`` function that replaces all the
blocking functions in the standard library with equivalent asynchronous
versions. While python-engineio does not require monkey patching, other
libraries such as database drivers are likely to require it.

Gevent
~~~~~~

`Gevent <http://gevent.org>`_ is another asynchronous framework based on
coroutines, very similar to eventlet. Only the long-polling transport is
currently available when using gevent.

Instances of class ``engineio.Server`` will automatically use gevent for
asynchronous operations if the library is installed and eventlet is not
installed. To request gevent to be selected explicitly, the ``async_mode``
option can be given in the constructor::

eio = engineio.Server(async_mode='gevent')

A server configured for gevent is deployed as a regular WSGI application,
using the provided ``engineio.Middleware``::

from gevent import pywsgi
app = engineio.Middleware(eio)
pywsgi.WSGIServer(('', 5000), app).serve_forever()

An alternative to running the eventlet WSGI server as above is to use
`gunicorn <gunicorn.org>`_, a fully featured pure Python web server. The
command to launch the application under gunicorn is shown below::

$ gunicorn -k gevent -w 1 module:app

It is important to specify that only one worker process is used with gunicorn.
A single worker can handle a large number of clients when using gevent.

Note: Gevent provides a ``monkey_patch()`` function that replaces all the
blocking functions in the standard library with equivalent asynchronous
versions. While python-engineio does not require monkey patching, other
libraries such as database drivers are likely to require it.

Standard Threading Library
~~~~~~~~~~~~~~~~~~~~~~~~~~

While not comparable to eventlet and gevent in terms of performance,
python-engineio can also be configured to work with multi-threaded web servers
that use standard Python threads. This is an ideal setup to use with
development servers such as `Werkzeug <http://werkzeug.pocoo.org>`_. Only the
long-polling transport is currently available when using gevent.

Instances of class ``engineio.Server`` will automatically use the threading
mode if eventlet and gevent are not installed. To request the threading
mode explicitly, the ``async_mode`` option can be given in the constructor::

eio = engineio.Server(async_mode='threading')

A server configured for threading is deployed as a regular web application,
using any WSGI complaint multi-threaded server. The example below deploys an
Engine.IO application combined with a Flask web application, using Flask's
development web server based on Werkzeug::

eio = engineio.Server(async_mode='threading')
app = Flask(__name__)
app.wsgi_app = engineio.Middleware(eio, app.wsgi_app)

# ... Engine.IO and Flask handler functions ...

if __name__ == '__main__':
app.run(threaded=True)

When using the threading mode, it is important to ensure that the WSGI server
can handle concurrent requests using threads, as a client can have up to two
outstanding requests at any given time. The Werkzeug server is single-threaded
by default, so the ``threaded=True`` option must be included.

Multi-process deployments
~~~~~~~~~~~~~~~~~~~~~~~~~

Engine.IO is a stateful protocol, which makes horizontal scaling more
difficult. To deploy a cluster of Engine.IO processes, possibly hosted in
multiple servers, the following conditions must be met:

- Each Engine.IO process must be able to handle multiple requests, either by
using eventlet, gevent, or standard threads.
- The load balancer must be configured to always forward requests from a client
to the same process. Load balancers call this *sticky sessions*, or
*session affinity*.

API Reference
-------------

Expand Down
51 changes: 49 additions & 2 deletions engineio/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import gzip
import importlib
import logging
import uuid
import zlib
Expand All @@ -17,6 +18,11 @@ class Server(object):
This class implements a fully compliant Engine.IO web server with support
for websocket and long-polling transports.
:param async_mode: The library used for asynchronous operations. Valid
options are "threading", "eventlet" and "gevent". If
this argument is not given, "eventlet" is tried first,
then "gevent", and finally "threading". The websocket
transport is only supported in "eventlet" mode.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting.
:param ping_interval: The interval in seconds at which the client pings
Expand All @@ -41,7 +47,7 @@ class Server(object):
compression_methods = ['gzip', 'deflate']
event_names = ['connect', 'disconnect', 'message']

def __init__(self, ping_timeout=60, ping_interval=25,
def __init__(self, async_mode=None, ping_timeout=60, ping_interval=25,
max_http_buffer_size=100000000, allow_upgrades=True,
http_compression=True, compression_threshold=1024,
cookie='io', cors_allowed_origins=None,
Expand All @@ -67,6 +73,46 @@ def __init__(self, ping_timeout=60, ping_interval=25,
else:
self.logger.setLevel(logging.ERROR)

threading = None
queue = None
queue_class = None
websocket = None
if async_mode is None or async_mode == 'eventlet':
try:
threading = importlib.import_module('eventlet.green.threading')
queue = importlib.import_module('eventlet.queue')
queue_class = 'Queue'
websocket = importlib.import_module('eventlet.websocket')
async_mode = 'eventlet'
except ImportError:
pass
if async_mode is None or async_mode == 'gevent':
try:
threading = importlib.import_module('gevent.threading')
queue = importlib.import_module('gevent.queue')
queue_class = 'JoinableQueue'
websocket = None
async_mode = 'gevent'
except ImportError:
pass
if async_mode is None or async_mode == 'threading':
threading = importlib.import_module('threading')
try:
queue = importlib.import_module('queue')
except ImportError: # pragma: no cover
queue = importlib.import_module('Queue') # pragma: no cover
queue_class = 'Queue'
websocket = None
async_mode = 'threading'
if threading is None:
raise ValueError('Invalid async_mode specified')
self.async_mode = async_mode
self.async = {'threading': threading,
'queue': queue,
'queue_class': queue_class,
'websocket': websocket}
self.logger.info('Server initialized for %s.', self.async_mode)

def on(self, event, handler=None):
"""Register an event handler.
Expand Down Expand Up @@ -230,7 +276,8 @@ def _handle_connect(self, environ):

def _upgrades(self, sid):
"""Return the list of possible upgrades for a client connection."""
if not self.allow_upgrades or self._get_socket(sid).upgraded:
if not self.allow_upgrades or self._get_socket(sid).upgraded or \
self.async['websocket'] is None:
return []
return ['websocket']

Expand Down
22 changes: 11 additions & 11 deletions engineio/socket.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import time

import eventlet
from eventlet import websocket
import six

from . import packet
Expand All @@ -15,7 +12,8 @@ class Socket(object):
def __init__(self, server, sid):
self.server = server
self.sid = sid
self.queue = eventlet.queue.Queue()
self.queue = getattr(self.server.async['queue'],
self.server.async['queue_class'])()
self.last_ping = time.time()
self.upgraded = False
self.closed = False
Expand All @@ -25,12 +23,12 @@ def poll(self):
try:
packets = [self.queue.get(timeout=self.server.ping_timeout)]
self.queue.task_done()
except eventlet.queue.Empty:
except self.server.async['queue'].Empty:
raise IOError()
try:
packets.append(self.queue.get(block=False))
self.queue.task_done()
except eventlet.queue.Empty:
except self.server.async['queue'].Empty:
pass
return packets

Expand All @@ -53,7 +51,7 @@ def send(self, pkt):
"""Send a packet to the client."""
if self.closed:
raise IOError('Socket is closed')
if time.time() - self.last_ping > self.server.ping_interval:
if time.time() - self.last_ping > self.server.ping_interval * 5 / 4:
self.server.logger.info('%s: Client is gone, closing socket',
self.sid)
self.close(wait=False, abort=True)
Expand Down Expand Up @@ -103,7 +101,8 @@ def _upgrade_websocket(self, environ, start_response):
"""Upgrade the connection from polling to websocket."""
if self.upgraded:
raise IOError('Socket has been upgraded already')
ws = websocket.WebSocketWSGI(self._websocket_handler)
ws = self.server.async['websocket'].WebSocketWSGI(
self._websocket_handler)
return ws(environ, start_response)

def _websocket_handler(self, ws):
Expand Down Expand Up @@ -140,10 +139,11 @@ def writer():
except:
break

writer_task = eventlet.spawn(writer)
writer_task = self.server.async['threading'].Thread(target=writer)
writer_task.start()

self.server.logger.info(
'%s: Upgrade to websocket succesful', self.sid)
'%s: Upgrade to websocket successful', self.sid)

while True:
try:
Expand All @@ -160,4 +160,4 @@ def writer():
except ValueError:
pass
self.close(wait=False, abort=True)
writer_task.wait()
writer_task.join()
Loading

0 comments on commit e72e488

Please sign in to comment.