Skip to content

Commit

Permalink
v4 protocol: ping/ping reversal in the client
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelgrinberg committed Dec 1, 2020
1 parent 38e20a3 commit 76a0615
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 125 deletions.
79 changes: 21 additions & 58 deletions engineio/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,16 @@ async def wait(self):
if self.read_loop_task:
await self.read_loop_task

async def send(self, data, binary=None):
async def send(self, data):
"""Send a message to a client.
:param data: The data to send to the client. Data can be of type
``str``, ``bytes``, ``list`` or ``dict``. If a ``list``
or ``dict``, the data will be serialized as JSON.
:param binary: ``True`` to send packet as binary, ``False`` to send
as text. If not given, unicode (Python 2) and str
(Python 3) are sent as text, and str (Python 2) and
bytes (Python 3) are sent as binary.
Note: this method is a coroutine.
"""
await self._send_packet(packet.Packet(packet.MESSAGE, data=data,
binary=binary))
await self._send_packet(packet.Packet(packet.MESSAGE, data=data))

async def disconnect(self, abort=False):
"""Disconnect from the server.
Expand Down Expand Up @@ -227,7 +222,8 @@ async def _connect_polling(self, url, headers, engineio_path):
'Unexpected status code {} in server response'.format(
r.status), arg)
try:
p = payload.Payload(encoded_payload=await r.read())
p = payload.Payload(encoded_payload=(await r.read()).decode(
'utf-8'))
except ValueError:
six.raise_from(exceptions.ConnectionError(
'Unexpected response from server'), None)
Expand Down Expand Up @@ -257,7 +253,6 @@ async def _connect_polling(self, url, headers, engineio_path):
# upgrade to websocket succeeded, we're done here
return

self.ping_loop_task = self.start_background_task(self._ping_loop)
self.write_loop_task = self.start_background_task(self._write_loop)
self.read_loop_task = self.start_background_task(
self._read_loop_polling)
Expand Down Expand Up @@ -316,8 +311,7 @@ async def _connect_websocket(self, url, headers, engineio_path):
else:
raise exceptions.ConnectionError('Connection error')
if upgrade:
p = packet.Packet(packet.PING, data='probe').encode(
always_bytes=False)
p = packet.Packet(packet.PING, data='probe').encode()
try:
await ws.send_str(p)
except Exception as e: # pragma: no cover
Expand All @@ -337,7 +331,7 @@ async def _connect_websocket(self, url, headers, engineio_path):
self.logger.warning(
'WebSocket upgrade failed: no PONG packet')
return False
p = packet.Packet(packet.UPGRADE).encode(always_bytes=False)
p = packet.Packet(packet.UPGRADE).encode()
try:
await ws.send_str(p)
except Exception as e: # pragma: no cover
Expand Down Expand Up @@ -369,7 +363,6 @@ async def _connect_websocket(self, url, headers, engineio_path):
await self._trigger_event('connect', run_async=False)

self.ws = ws
self.ping_loop_task = self.start_background_task(self._ping_loop)
self.write_loop_task = self.start_background_task(self._write_loop)
self.read_loop_task = self.start_background_task(
self._read_loop_websocket)
Expand All @@ -384,8 +377,8 @@ async def _receive_packet(self, pkt):
pkt.data if not isinstance(pkt.data, bytes) else '<binary>')
if pkt.packet_type == packet.MESSAGE:
await self._trigger_event('message', pkt.data, run_async=True)
elif pkt.packet_type == packet.PONG:
self.pong_received = True
elif pkt.packet_type == packet.PING:
await self._send_packet(packet.Packet(packet.PONG, pkt.data))
elif pkt.packet_type == packet.CLOSE:
await self.disconnect(abort=True)
elif pkt.packet_type == packet.NOOP:
Expand Down Expand Up @@ -462,33 +455,6 @@ async def async_handler():
return False
return ret

async def _ping_loop(self):
"""This background task sends a PING to the server at the requested
interval.
"""
self.pong_received = True
if self.ping_loop_event is None:
self.ping_loop_event = self.create_event()
else:
self.ping_loop_event.clear()
while self.state == 'connected':
if not self.pong_received:
self.logger.info(
'PONG response has not been received, aborting')
if self.ws:
await self.ws.close()
await self.queue.put(None)
break
self.pong_received = False
await self._send_packet(packet.Packet(packet.PING))
try:
await asyncio.wait_for(self.ping_loop_event.wait(),
self.ping_interval)
except (asyncio.TimeoutError,
asyncio.CancelledError): # pragma: no cover
pass
self.logger.info('Exiting ping task')

async def _read_loop_polling(self):
"""Read packets by polling the Engine.IO server."""
while self.state == 'connected':
Expand All @@ -508,7 +474,8 @@ async def _read_loop_polling(self):
await self.queue.put(None)
break
try:
p = payload.Payload(encoded_payload=await r.read())
p = payload.Payload(encoded_payload=(await r.read()).decode(
'utf-8'))
except ValueError:
self.logger.warning(
'Unexpected packet from server, aborting')
Expand All @@ -519,10 +486,6 @@ async def _read_loop_polling(self):

self.logger.info('Waiting for write loop task to end')
await self.write_loop_task
self.logger.info('Waiting for ping loop task to end')
if self.ping_loop_event: # pragma: no cover
self.ping_loop_event.set()
await self.ping_loop_task
if self.state == 'connected':
await self._trigger_event('disconnect', run_async=False)
try:
Expand All @@ -537,9 +500,17 @@ async def _read_loop_websocket(self):
while self.state == 'connected':
p = None
try:
p = (await self.ws.receive()).data
p = await asyncio.wait_for(
self.ws.receive(),
timeout=self.ping_interval + self.ping_timeout)
p = p.data
if p is None: # pragma: no cover
raise RuntimeError('WebSocket read returned None')
except asyncio.TimeoutError:
self.logger.warning(
'Server has stopped communicating, aborting')
await self.queue.put(None)
break
except aiohttp.client_exceptions.ServerDisconnectedError:
self.logger.info(
'Read loop: WebSocket connection was closed, aborting')
Expand All @@ -551,8 +522,6 @@ async def _read_loop_websocket(self):
str(e))
await self.queue.put(None)
break
if isinstance(p, six.text_type): # pragma: no cover
p = p.encode('utf-8')
try:
pkt = packet.Packet(encoded_packet=p)
except Exception as e: # pragma: no cover
Expand All @@ -564,10 +533,6 @@ async def _read_loop_websocket(self):

self.logger.info('Waiting for write loop task to end')
await self.write_loop_task
self.logger.info('Waiting for ping loop task to end')
if self.ping_loop_event: # pragma: no cover
self.ping_loop_event.set()
await self.ping_loop_task
if self.state == 'connected':
await self._trigger_event('disconnect', run_async=False)
try:
Expand Down Expand Up @@ -631,11 +596,9 @@ async def _write_loop(self):
try:
for pkt in packets:
if pkt.binary:
await self.ws.send_bytes(pkt.encode(
always_bytes=False))
await self.ws.send_bytes(pkt.encode())
else:
await self.ws.send_str(pkt.encode(
always_bytes=False))
await self.ws.send_str(pkt.encode())
self.queue.task_done()
except (aiohttp.client_exceptions.ServerDisconnectedError,
BrokenPipeError, OSError):
Expand Down
12 changes: 6 additions & 6 deletions engineio/asyncio_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class AsyncServer(server.Server):
"tornado", and finally "asgi". The first async mode that
has all its dependencies installed is the one that is
chosen.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting.
:param ping_interval: The interval in seconds at which the client pings
the server. The default is 25 seconds. For advanced
:param ping_interval: The interval in seconds at which the server pings
the client. The default is 25 seconds. For advanced
control, a two element tuple can be given, where
the first number is the ping interval and the second
is a grace period added by the server. The default
grace period is 5 seconds.
is a grace period added by the server.
:param ping_timeout: The time in seconds that the client waits for the
server to respond before disconnecting. The default
is 5 seconds.
:param max_http_buffer_size: The maximum size of a message when using the
polling transport. The default is 1,000,000
bytes.
Expand Down
62 changes: 15 additions & 47 deletions engineio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,10 @@ def __init__(self,
self.upgrades = None
self.ping_interval = None
self.ping_timeout = None
self.pong_received = True
self.http = http_session
self.ws = None
self.read_loop_task = None
self.write_loop_task = None
self.ping_loop_task = None
self.ping_loop_event = None
self.queue = None
self.state = 'disconnected'
self.ssl_verify = ssl_verify
Expand Down Expand Up @@ -203,7 +200,7 @@ def wait(self):
if self.read_loop_task:
self.read_loop_task.join()

def send(self, data, binary=None):
def send(self, data):
"""Send a message to a client.
:param data: The data to send to the client. Data can be of type
Expand All @@ -214,8 +211,7 @@ def send(self, data, binary=None):
(Python 3) are sent as text, and str (Python 2) and
bytes (Python 3) are sent as binary.
"""
self._send_packet(packet.Packet(packet.MESSAGE, data=data,
binary=binary))
self._send_packet(packet.Packet(packet.MESSAGE, data=data))

def disconnect(self, abort=False):
"""Disconnect from the server.
Expand Down Expand Up @@ -309,7 +305,7 @@ def _connect_polling(self, url, headers, engineio_path):
'Unexpected status code {} in server response'.format(
r.status_code), arg)
try:
p = payload.Payload(encoded_payload=r.content)
p = payload.Payload(encoded_payload=r.content.decode('utf-8'))
except ValueError:
six.raise_from(exceptions.ConnectionError(
'Unexpected response from server'), None)
Expand Down Expand Up @@ -340,7 +336,6 @@ def _connect_polling(self, url, headers, engineio_path):
return

# start background tasks associated with this client
self.ping_loop_task = self.start_background_task(self._ping_loop)
self.write_loop_task = self.start_background_task(self._write_loop)
self.read_loop_task = self.start_background_task(
self._read_loop_polling)
Expand Down Expand Up @@ -411,7 +406,6 @@ def _connect_websocket(self, url, headers, engineio_path):
parsed_url = urllib.parse.urlparse(
proxy_url if '://' in proxy_url
else 'scheme://' + proxy_url)
print(parsed_url)
extra_options['http_proxy_host'] = parsed_url.hostname
extra_options['http_proxy_port'] = parsed_url.port
extra_options['http_proxy_auth'] = (
Expand All @@ -428,7 +422,9 @@ def _connect_websocket(self, url, headers, engineio_path):
try:
ws = websocket.create_connection(
websocket_url + self._get_url_timestamp(), header=headers,
cookie=cookies, enable_multithread=True, **extra_options)
cookie=cookies, enable_multithread=True,
timeout=self.ping_interval + self.ping_timeout,
**extra_options)
except (ConnectionError, IOError, websocket.WebSocketException):
if upgrade:
self.logger.warning(
Expand Down Expand Up @@ -491,7 +487,6 @@ def _connect_websocket(self, url, headers, engineio_path):
self.ws = ws

# start background tasks associated with this client
self.ping_loop_task = self.start_background_task(self._ping_loop)
self.write_loop_task = self.start_background_task(self._write_loop)
self.read_loop_task = self.start_background_task(
self._read_loop_websocket)
Expand All @@ -506,8 +501,8 @@ def _receive_packet(self, pkt):
pkt.data if not isinstance(pkt.data, bytes) else '<binary>')
if pkt.packet_type == packet.MESSAGE:
self._trigger_event('message', pkt.data, run_async=True)
elif pkt.packet_type == packet.PONG:
self.pong_received = True
elif pkt.packet_type == packet.PING:
self._send_packet(packet.Packet(packet.PONG, pkt.data))
elif pkt.packet_type == packet.CLOSE:
self.disconnect(abort=True)
elif pkt.packet_type == packet.NOOP:
Expand Down Expand Up @@ -575,28 +570,6 @@ def _get_url_timestamp(self):
"""Generate the Engine.IO query string timestamp."""
return '&t=' + str(time.time())

def _ping_loop(self):
"""This background task sends a PING to the server at the requested
interval.
"""
self.pong_received = True
if self.ping_loop_event is None:
self.ping_loop_event = self.create_event()
else:
self.ping_loop_event.clear()
while self.state == 'connected':
if not self.pong_received:
self.logger.info(
'PONG response has not been received, aborting')
if self.ws:
self.ws.close(timeout=0)
self.queue.put(None)
break
self.pong_received = False
self._send_packet(packet.Packet(packet.PING))
self.ping_loop_event.wait(timeout=self.ping_interval)
self.logger.info('Exiting ping task')

def _read_loop_polling(self):
"""Read packets by polling the Engine.IO server."""
while self.state == 'connected':
Expand All @@ -616,7 +589,7 @@ def _read_loop_polling(self):
self.queue.put(None)
break
try:
p = payload.Payload(encoded_payload=r.content)
p = payload.Payload(encoded_payload=r.content.decode('utf-8'))
except ValueError:
self.logger.warning(
'Unexpected packet from server, aborting')
Expand All @@ -627,10 +600,6 @@ def _read_loop_polling(self):

self.logger.info('Waiting for write loop task to end')
self.write_loop_task.join()
self.logger.info('Waiting for ping loop task to end')
if self.ping_loop_event: # pragma: no cover
self.ping_loop_event.set()
self.ping_loop_task.join()
if self.state == 'connected':
self._trigger_event('disconnect', run_async=False)
try:
Expand All @@ -646,6 +615,11 @@ def _read_loop_websocket(self):
p = None
try:
p = self.ws.recv()
except websocket.WebSocketTimeoutException:
self.logger.warning(
'Server has stopped communicating, aborting')
self.queue.put(None)
break
except websocket.WebSocketConnectionClosedException:
self.logger.warning(
'WebSocket connection was closed, aborting')
Expand All @@ -657,8 +631,6 @@ def _read_loop_websocket(self):
str(e))
self.queue.put(None)
break
if isinstance(p, six.text_type): # pragma: no cover
p = p.encode('utf-8')
try:
pkt = packet.Packet(encoded_packet=p)
except Exception as e: # pragma: no cover
Expand All @@ -670,10 +642,6 @@ def _read_loop_websocket(self):

self.logger.info('Waiting for write loop task to end')
self.write_loop_task.join()
self.logger.info('Waiting for ping loop task to end')
if self.ping_loop_event: # pragma: no cover
self.ping_loop_event.set()
self.ping_loop_task.join()
if self.state == 'connected':
self._trigger_event('disconnect', run_async=False)
try:
Expand Down Expand Up @@ -735,7 +703,7 @@ def _write_loop(self):
# websocket
try:
for pkt in packets:
encoded_packet = pkt.encode(always_bytes=False)
encoded_packet = pkt.encode()
if pkt.binary:
self.ws.send_binary(encoded_packet)
else:
Expand Down
Loading

0 comments on commit 76a0615

Please sign in to comment.