Skip to content

Commit

Permalink
minor refactor of clients for consistency with servers
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Jan 5, 2019
1 parent 0478110 commit d9c278f
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 74 deletions.
4 changes: 1 addition & 3 deletions engineio/async_drivers/eventlet.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
from __future__ import absolute_import

import importlib
import sys

from eventlet.green.threading import Thread, Event
from eventlet.queue import Queue
from eventlet import sleep
from eventlet.websocket import WebSocketWSGI as _WebSocketWSGI


class WebSocketWSGI(_WebSocketWSGI):
def __init__(self, *args, **kwargs):
super(WebSocketWSGI, self).__init__(*args, **kwargs)
Expand Down
3 changes: 0 additions & 3 deletions engineio/async_drivers/gevent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from __future__ import absolute_import

import importlib
import sys

import gevent
from gevent.queue import JoinableQueue
from gevent.event import Event
Expand Down
2 changes: 0 additions & 2 deletions engineio/async_drivers/gevent_uwsgi.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import absolute_import

import importlib
import sys
import six

import gevent
Expand Down
36 changes: 19 additions & 17 deletions engineio/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async def connect(self, url, headers={}, transports=None,
if not transports:
raise ValueError('No valid transports provided')
self.transports = transports or valid_transports
self.queue, self.queue_empty = self._create_queue()
self.queue = self.create_queue()
return await getattr(self, '_connect_' + self.transports[0])(
url, headers, engineio_path)

Expand Down Expand Up @@ -111,7 +111,7 @@ async def disconnect(self, abort=False):
await self._send_packet(packet.Packet(packet.CLOSE))
await self.queue.put(None)
self.state = 'disconnecting'
await self._trigger_event('disconnect')
await self._trigger_event('disconnect', run_async=False)
if self.current_transport == 'websocket':
await self.ws.close()
if not abort:
Expand Down Expand Up @@ -148,6 +148,16 @@ async def sleep(self, seconds=0):
"""
return await asyncio.sleep(seconds)

def create_queue(self):
"""Create a queue object."""
q = asyncio.Queue()
q.Empty = asyncio.QueueEmpty
return q

def create_event(self):
"""Create an event object."""
return asyncio.Event()

async def _connect_polling(self, url, headers, engineio_path):
"""Establish a long-polling connection to the Engine.IO server."""
if aiohttp is None: # pragma: no cover
Expand Down Expand Up @@ -185,7 +195,7 @@ async def _connect_polling(self, url, headers, engineio_path):

self.state = 'connected'
client.connected_clients.append(self)
await self._trigger_event('connect')
await self._trigger_event('connect', run_async=False)

for pkt in p.packets[1:]:
await self._receive_packet(pkt)
Expand Down Expand Up @@ -258,7 +268,7 @@ async def _connect_websocket(self, url, headers, engineio_path):

self.state = 'connected'
client.connected_clients.append(self)
await self._trigger_event('connect')
await self._trigger_event('connect', run_async=False)

self.ws = ws
self.ping_loop_task = self.start_background_task(self._ping_loop)
Expand All @@ -275,7 +285,7 @@ async def _receive_packet(self, pkt):
'Received packet %s data %s', packet_name,
pkt.data if not isinstance(pkt.data, bytes) else '<binary>')
if pkt.packet_type == packet.MESSAGE:
await self._trigger_event('message', pkt.data)
await self._trigger_event('message', pkt.data, run_async=True)
elif pkt.packet_type == packet.PONG:
self.pong_received = True
elif pkt.packet_type == packet.NOOP:
Expand Down Expand Up @@ -304,14 +314,6 @@ async def _send_request(
except aiohttp.ClientError:
return

def _create_queue(self):
"""Create the client's send queue."""
return asyncio.Queue(), asyncio.QueueEmpty

def _create_event(self):
"""Create an event."""
return asyncio.Event()

async def _trigger_event(self, event, *args, **kwargs):
"""Invoke an event handler."""
run_async = kwargs.pop('run_async', False)
Expand Down Expand Up @@ -407,7 +409,7 @@ async def _read_loop_polling(self):
self.ping_loop_event.set()
await self.ping_loop_task
if self.state == 'connected':
await self._trigger_event('disconnect')
await self._trigger_event('disconnect', run_async=False)
try:
client.connected_clients.remove(self)
except ValueError: # pragma: no cover
Expand Down Expand Up @@ -442,7 +444,7 @@ async def _read_loop_websocket(self):
self.ping_loop_event.set()
await self.ping_loop_task
if self.state == 'connected':
await self._trigger_event('disconnect')
await self._trigger_event('disconnect', run_async=False)
try:
client.connected_clients.remove(self)
except ValueError: # pragma: no cover
Expand All @@ -459,7 +461,7 @@ async def _write_loop(self):
timeout = max(self.ping_interval, self.ping_timeout)
try:
packets = [await asyncio.wait_for(self.queue.get(), timeout)]
except (self.queue_empty, asyncio.TimeoutError,
except (self.queue.Empty, asyncio.TimeoutError,
asyncio.CancelledError):
self.logger.error('packet queue is empty, aborting')
self._reset()
Expand All @@ -471,7 +473,7 @@ async def _write_loop(self):
while True:
try:
packets.append(self.queue.get_nowait())
except self.queue_empty:
except self.queue.Empty:
break
if packets[-1] is None:
packets = packets[:-1]
Expand Down
39 changes: 20 additions & 19 deletions engineio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,8 @@ def __init__(self, logger=False, json=None):
self.read_loop_task = None
self.write_loop_task = None
self.ping_loop_task = None
self.ping_loop_event = self._create_event()
self.ping_loop_event = self.create_event()
self.queue = None
self.queue_empty = None
self.state = 'disconnected'

if json is not None:
Expand Down Expand Up @@ -162,7 +161,7 @@ def connect(self, url, headers={}, transports=None,
if not transports:
raise ValueError('No valid transports provided')
self.transports = transports or valid_transports
self.queue, self.queue_empty = self._create_queue()
self.queue = self.create_queue()
return getattr(self, '_connect_' + self.transports[0])(
url, headers, engineio_path)

Expand Down Expand Up @@ -199,7 +198,7 @@ def disconnect(self, abort=False):
self._send_packet(packet.Packet(packet.CLOSE))
self.queue.put(None)
self.state = 'disconnecting'
self._trigger_event('disconnect')
self._trigger_event('disconnect', run_async=False)
if self.current_transport == 'websocket':
self.ws.close()
if not abort:
Expand Down Expand Up @@ -241,6 +240,16 @@ def sleep(self, seconds=0):
"""Sleep for the requested amount of time."""
return time.sleep(seconds)

def create_queue(self, *args, **kwargs):
"""Create a queue object."""
q = queue.Queue(*args, **kwargs)
q.Empty = queue.Empty
return q

def create_event(self, *args, **kwargs):
"""Create an event object."""
return threading.Event(*args, **kwargs)

def _reset(self):
self.state = 'disconnected'

Expand Down Expand Up @@ -282,7 +291,7 @@ def _connect_polling(self, url, headers, engineio_path):

self.state = 'connected'
connected_clients.append(self)
self._trigger_event('connect')
self._trigger_event('connect', run_async=False)

for pkt in p.packets[1:]:
self._receive_packet(pkt)
Expand Down Expand Up @@ -351,7 +360,7 @@ def _connect_websocket(self, url, headers, engineio_path):

self.state = 'connected'
connected_clients.append(self)
self._trigger_event('connect')
self._trigger_event('connect', run_async=False)
self.ws = ws

# start background tasks associated with this client
Expand All @@ -369,7 +378,7 @@ def _receive_packet(self, pkt):
'Received packet %s data %s', packet_name,
pkt.data if not isinstance(pkt.data, bytes) else '<binary>')
if pkt.packet_type == packet.MESSAGE:
self._trigger_event('message', pkt.data)
self._trigger_event('message', pkt.data, run_async=True)
elif pkt.packet_type == packet.PONG:
self.pong_received = True
elif pkt.packet_type == packet.NOOP:
Expand Down Expand Up @@ -397,14 +406,6 @@ def _send_request(
except urllib3.exceptions.MaxRetryError:
pass

def _create_queue(self):
"""Create the client's send queue."""
return queue.Queue(), queue.Empty

def _create_event(self):
"""Create an event."""
return threading.Event()

def _trigger_event(self, event, *args, **kwargs):
"""Invoke an event handler."""
run_async = kwargs.pop('run_async', False)
Expand Down Expand Up @@ -495,7 +496,7 @@ def _read_loop_polling(self):
self.ping_loop_event.set()
self.ping_loop_task.join()
if self.state == 'connected':
self._trigger_event('disconnect')
self._trigger_event('disconnect', run_async=False)
try:
connected_clients.remove(self)
except ValueError: # pragma: no cover
Expand Down Expand Up @@ -530,7 +531,7 @@ def _read_loop_websocket(self):
self.ping_loop_event.set()
self.ping_loop_task.join()
if self.state == 'connected':
self._trigger_event('disconnect')
self._trigger_event('disconnect', run_async=False)
try:
connected_clients.remove(self)
except ValueError: # pragma: no cover
Expand All @@ -547,7 +548,7 @@ def _write_loop(self):
timeout = max(self.ping_interval, self.ping_timeout)
try:
packets = [self.queue.get(timeout=timeout)]
except self.queue_empty:
except self.queue.Empty:
self.logger.error('packet queue is empty, aborting')
self._reset()
break
Expand All @@ -558,7 +559,7 @@ def _write_loop(self):
while True:
try:
packets.append(self.queue.get(block=False))
except self.queue_empty:
except self.queue.Empty:
break
if packets[-1] is None:
packets = packets[:-1]
Expand Down
Loading

0 comments on commit d9c278f

Please sign in to comment.