Skip to content

Commit

Permalink
detect lost connections (eventlet/gevent)
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed May 6, 2017
1 parent 55f5be8 commit e9a3161
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 4 deletions.
4 changes: 4 additions & 0 deletions engineio/async_eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@


class WebSocketWSGI(_WebSocketWSGI):
def __init__(self, *args, **kwargs):
self._sock = None

def __call__(self, environ, start_response):
if 'eventlet.input' not in environ:
raise RuntimeError('You need to use the eventlet server. '
'See the Deployment section of the '
'documentation for more information.')
self._sock = environ['eventlet.input'].get_socket()
return super(WebSocketWSGI, self).__call__(environ, start_response)


Expand Down
4 changes: 3 additions & 1 deletion engineio/async_gevent_uwsgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ class uWSGIWebSocket(object): # pragma: no cover
"""
def __init__(self, app):
self.app = app
self._sock = None

def __call__(self, environ, start_response):
self._sock = uwsgi.connection_fd()
self.environ = environ

uwsgi.websocket_handshake()
Expand Down Expand Up @@ -55,7 +57,7 @@ def select_greenlet_runner(fd, event):
break
self._select_greenlet = gevent.spawn(
select_greenlet_runner,
uwsgi.connection_fd(),
self._sock,
self._event)

self.app(self)
Expand Down
19 changes: 17 additions & 2 deletions engineio/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ def _upgrade_websocket(self, environ, start_response):

def _websocket_handler(self, ws):
"""Engine.IO handler for websocket transport."""
if hasattr(ws, '_sock') and ws._sock is not None: # pragma: no cover
ws._sock.settimeout(self.server.ping_interval)

if self.connected:
# the socket was already connected, so this is an upgrade
self.queue.join() # flush the queue first
Expand Down Expand Up @@ -170,17 +173,22 @@ def _websocket_handler(self, ws):
# start separate writer thread
def writer():
while True:
packets = None
try:
packets = self.poll()
except exceptions.QueueEmpty:
break
if not packets:
# empty packet list returned -> connection closed
if not self.closed: # pragma: no cover
self.close(wait=True, abort=True)
break
try:
for pkt in packets:
ws.send(pkt.encode(always_bytes=False))
except:
if not self.closed: # pragma: no cover
self.close(wait=True, abort=True)
break
writer_task = self.server.start_background_task(writer)

Expand All @@ -192,7 +200,13 @@ def writer():
p = None
try:
p = ws.wait()
except:
except Exception as e:
# if the socket is already closed, we can assume this is a
# downstream error of that
if not self.closed:
self.server.logger.info(
'%s: Unexpected error "%s", closing connection',
self.sid, str(e))
break
if p is None:
# connection closed by client
Expand All @@ -213,7 +227,8 @@ def writer():

self.queue.put(None) # unlock the writer task so that it can exit
writer_task.join()
self.close(wait=True, abort=True)
if not self.closed:
self.close(wait=True, abort=True)
if reraise_exc:
raise reraise_exc

Expand Down
2 changes: 1 addition & 1 deletion tests/test_async_eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def test_bad_environ(self):
return_value='data')
def test_wsgi_call(self, _WebSocketWSGI):
_WebSocketWSGI.__call__ = lambda e, s: 'data'
environ = {'eventlet.input': None}
environ = {'eventlet.input': mock.MagicMock()}
start_response = 'bar'
wsgi = async_eventlet.WebSocketWSGI(None)
self.assertEqual(wsgi(environ, start_response), 'data')

0 comments on commit e9a3161

Please sign in to comment.