Skip to content

Commit

Permalink
async message events, sleep function, better client timeout
Browse files Browse the repository at this point in the history
Several improvements in this commit:

1. Message event handlers are invoked in a thread so that they are
   non-blocking.
2. Added a sleep function that is generic across async modes.
3. The timeout to declare a client gone has been extended to match the
    ping timeout setting.
  • Loading branch information
miguelgrinberg committed Jun 28, 2016
1 parent 767deb3 commit 6670627
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
4 changes: 3 additions & 1 deletion engineio/async_eventlet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import importlib
import sys

from eventlet import sleep
from eventlet.websocket import WebSocketWSGI as _WebSocketWSGI


Expand All @@ -19,5 +20,6 @@ def __call__(self, environ, start_response):
'queue': importlib.import_module('eventlet.queue'),
'queue_class': 'Queue',
'websocket': sys.modules[__name__],
'websocket_class': 'WebSocketWSGI'
'websocket_class': 'WebSocketWSGI',
'sleep': sleep
}
3 changes: 2 additions & 1 deletion engineio/async_gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,5 +58,6 @@ def wait(self):
'queue': importlib.import_module('gevent.queue'),
'queue_class': 'JoinableQueue',
'websocket': sys.modules[__name__] if _websocket_available else None,
'websocket_class': 'WebSocketWSGI' if _websocket_available else None
'websocket_class': 'WebSocketWSGI' if _websocket_available else None,
'sleep': gevent.sleep
}
4 changes: 3 additions & 1 deletion engineio/async_threading.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import importlib
import time

try:
queue = importlib.import_module('queue')
Expand All @@ -11,5 +12,6 @@
'queue': queue,
'queue_class': 'Queue',
'websocket': None,
'websocket_class': None
'websocket_class': None,
'sleep': time.sleep
}
20 changes: 17 additions & 3 deletions engineio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,16 @@ def start_background_task(self, target, *args, **kwargs):
th.start()
return th # pragma: no cover

def sleep(self, seconds=0):
"""Sleep for the requested amount of time using the appropriate async
model.
This is a utility function that applications can use to put a task to
sleep without having to worry about using the correct call for the
selected async mode.
"""
return self.async['sleep'](seconds)

def _generate_id(self):
"""Generate a unique session id."""
return uuid.uuid4().hex
Expand All @@ -307,7 +317,7 @@ def _handle_connect(self, environ, start_response, transport, b64=False):
'pingInterval': int(self.ping_interval * 1000)})
s.send(pkt)

if self._trigger_event('connect', sid, environ) is False:
if self._trigger_event('connect', sid, environ, async=False) is False:
self.logger.warning('Application rejected connection')
del self.sockets[sid]
return self._unauthorized()
Expand All @@ -329,10 +339,14 @@ def _upgrades(self, sid, transport):
return []
return ['websocket']

def _trigger_event(self, event, *args):
def _trigger_event(self, event, *args, **kwargs):
"""Invoke an event handler."""
async = kwargs.pop('async', False)
if event in self.handlers:
return self.handlers[event](*args)
if async:
return self.start_background_task(self.handlers[event], *args)
else:
return self.handlers[event](*args)

def _get_socket(self, sid):
"""Return the socket object for a given session."""
Expand Down
7 changes: 4 additions & 3 deletions engineio/socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def receive(self, pkt):
self.last_ping = time.time()
self.send(packet.Packet(packet.PONG, pkt.data))
elif pkt.packet_type == packet.MESSAGE:
self.server._trigger_event('message', self.sid, pkt.data)
self.server._trigger_event('message', self.sid, pkt.data,
async=True)
elif pkt.packet_type == packet.UPGRADE:
self.send(packet.Packet(packet.NOOP))
else:
Expand All @@ -55,7 +56,7 @@ def send(self, pkt):
"""Send a packet to the client."""
if self.closed:
raise IOError('Socket is closed')
if time.time() - self.last_ping > self.server.ping_interval * 5 / 4:
if time.time() - self.last_ping > self.server.ping_timeout:
self.server.logger.info('%s: Client is gone, closing socket',
self.sid)
self.close(wait=False, abort=True)
Expand Down Expand Up @@ -97,7 +98,7 @@ def handle_post_request(self, environ):

def close(self, wait=True, abort=False):
"""Close the socket connection."""
self.server._trigger_event('disconnect', self.sid)
self.server._trigger_event('disconnect', self.sid, async=False)
if not abort:
self.send(packet.Packet(packet.CLOSE))
self.closed = True
Expand Down
28 changes: 28 additions & 0 deletions tests/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import sys
import time
import unittest
import zlib

Expand Down Expand Up @@ -178,6 +179,27 @@ def test_on_event_invalid(self):
s = server.Server()
self.assertRaises(ValueError, s.on, 'invalid')

def test_trigger_event(self):
s = server.Server()
f = {}

@s.on('connect')
def foo(sid, environ):
return sid + environ

@s.on('message')
def bar(sid, data):
f['bar'] = sid + data
return 'bar'

r = s._trigger_event('connect', 1, 2, async=False)
self.assertEqual(r, 3)
r = s._trigger_event('message', 3, 4, async=True)
r.join()
self.assertEqual(f['bar'], 7)
r = s._trigger_event('message', 5, 6)
self.assertEqual(r, 'bar')

def test_close_one_socket(self):
s = server.Server()
mock_socket = self._get_mock_socket()
Expand Down Expand Up @@ -661,3 +683,9 @@ def bg_task():
task.join()
self.assertIn('task', flag)
self.assertTrue(flag['task'])

def test_sleep(self):
s = server.Server()
t = time.time()
s.sleep(0.1)
self.assertTrue(time.time() - t > 0.1)
12 changes: 6 additions & 6 deletions tests/test_socket.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,8 +245,8 @@ def test_websocket_read_write(self):
print(ws.send.call_args_list)
self.assertEqual(mock_server._trigger_event.call_count, 2)
mock_server._trigger_event.assert_has_calls([
mock.call('message', 'sid', 'foo'),
mock.call('disconnect', 'sid')])
mock.call('message', 'sid', 'foo', async=True),
mock.call('disconnect', 'sid', async=False)])
ws.send.assert_called_with('4bar')

def test_websocket_upgrade_read_write(self):
Expand Down Expand Up @@ -274,8 +274,8 @@ def test_websocket_upgrade_read_write(self):
print(ws.send.call_args_list)
self.assertEqual(mock_server._trigger_event.call_count, 2)
mock_server._trigger_event.assert_has_calls([
mock.call('message', 'sid', 'foo'),
mock.call('disconnect', 'sid')])
mock.call('message', 'sid', 'foo', async=True),
mock.call('disconnect', 'sid', async=False)])
ws.send.assert_called_with('4bar')

def test_websocket_upgrade_with_payload(self):
Expand Down Expand Up @@ -336,8 +336,8 @@ def test_websocket_ignore_invalid_packet(self):
print(ws.send.call_args_list)
self.assertEqual(mock_server._trigger_event.call_count, 2)
mock_server._trigger_event.assert_has_calls([
mock.call('message', 'sid', foo),
mock.call('disconnect', 'sid')])
mock.call('message', 'sid', foo, async=True),
mock.call('disconnect', 'sid', async=False)])
ws.send.assert_called_with('4bar')

def test_send_after_close(self):
Expand Down

0 comments on commit 6670627

Please sign in to comment.