Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(dht): Do not store ConnectionType #2049

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
6 changes: 2 additions & 4 deletions packages/autocertifier-server/src/StreamrChallenger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ import {
ManagedConnection,
RoutingRpcCommunicator,
createRandomDhtAddress,
getRawFromDhtAddress,
ConnectionType
getRawFromDhtAddress
} from '@streamr/dht'
import { toProtoRpcClient } from '@streamr/proto-rpc'
import { Logger } from '@streamr/utils'
Expand Down Expand Up @@ -42,8 +41,7 @@ export const runStreamrChallenge = (
const address = 'wss://' + remotePeerDescriptor.websocket!.host + ':' +
remotePeerDescriptor.websocket!.port

const managedConnection = new ManagedConnection(LOCAL_PEER_DESCRIPTOR,
ConnectionType.WEBSOCKET_CLIENT, socket, undefined)
const managedConnection = new ManagedConnection(LOCAL_PEER_DESCRIPTOR, socket, undefined)
managedConnection.setRemotePeerDescriptor(remotePeerDescriptor!)

const onDisconnected = () => {
Expand Down
7 changes: 3 additions & 4 deletions packages/dht/src/connection/Connection.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
import EventEmitter from 'eventemitter3'
import { PeerDescriptor } from '../proto/packages/dht/protos/DhtRpc'
import { ConnectionID, ConnectionType, ConnectionEvents } from './IConnection'
import { ConnectionID, ConnectionEvents } from './IConnection'
import { v4 as uuid } from 'uuid'

// TODO merge with SimulatorConnection?
export class Connection extends EventEmitter<ConnectionEvents> {
public connectionId: ConnectionID
public connectionType: ConnectionType
private peerDescriptor?: PeerDescriptor

constructor(connectionType: ConnectionType) {
constructor() {
super()
this.connectionId = createRandomConnectionId()
this.connectionType = connectionType
}

setPeerDescriptor(peerDescriptor: PeerDescriptor): void {
Expand Down
2 changes: 1 addition & 1 deletion packages/dht/src/connection/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ export class ConnectionManager extends EventEmitter<TransportEvents> implements
private onConnected(connection: ManagedConnection) {
const peerDescriptor = connection.getPeerDescriptor()!
this.emit('connected', peerDescriptor)
logger.trace(getNodeIdFromPeerDescriptor(peerDescriptor) + ' onConnected() ' + connection.connectionType)
logger.trace(getNodeIdFromPeerDescriptor(peerDescriptor) + ' onConnected()')
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would keep the connectionType in the logging statements when it is relevant.

this.onConnectionCountChange()
}

Expand Down
4 changes: 1 addition & 3 deletions packages/dht/src/connection/IConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ export interface ConnectionEvents {
export enum ConnectionType {
WEBSOCKET_SERVER = 'websocket-server',
WEBSOCKET_CLIENT = 'websocket-client',
WEBRTC = 'webrtc',
SIMULATOR_SERVER = 'simulator-server',
SIMULATOR_CLIENT = 'simulator-client',
WEBRTC = 'webrtc'
}

export type ConnectionID = BrandedString<'ConnectionID'>
Expand Down
8 changes: 2 additions & 6 deletions packages/dht/src/connection/ManagedConnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConnectionEvents, ConnectionID, ConnectionType, IConnection } from './IConnection'
import { ConnectionEvents, ConnectionID, IConnection } from './IConnection'
import * as Err from '../helpers/errors'
import { Handshaker } from './Handshaker'
import { HandshakeError, PeerDescriptor } from '../proto/packages/dht/protos/DhtRpc'
Expand Down Expand Up @@ -32,7 +32,6 @@ export class ManagedConnection extends EventEmitter<Events> {
private inputBuffer: Uint8Array[] = []
public connectionId: ConnectionID
private remotePeerDescriptor?: PeerDescriptor
public connectionType: ConnectionType
private handshaker?: Handshaker
private handshakeCompleted = false
private lastUsed: number = Date.now()
Expand All @@ -48,7 +47,6 @@ export class ManagedConnection extends EventEmitter<Events> {

constructor(
localPeerDescriptor: PeerDescriptor,
connectionType: ConnectionType,
outgoingConnection?: IConnection,
incomingConnection?: IConnection,
targetPeerDescriptor?: PeerDescriptor
Expand All @@ -58,13 +56,12 @@ export class ManagedConnection extends EventEmitter<Events> {
this.localPeerDescriptor = localPeerDescriptor
this.outgoingConnection = outgoingConnection
this.incomingConnection = incomingConnection
this.connectionType = connectionType
this.connectionId = createRandomConnectionId()

this.send = this.send.bind(this)
this.onDisconnected = this.onDisconnected.bind(this)

logger.trace('creating ManagedConnection of type: ' + connectionType)
logger.trace('creating ManagedConnection')
if (incomingConnection && outgoingConnection) {
throw new Err.IllegalArguments('Managed connection constructor only accepts either an incoming connection OR a outgoing connection')
}
Expand Down Expand Up @@ -247,7 +244,6 @@ export class ManagedConnection extends EventEmitter<Events> {
} catch (e) {
logger.debug(`Connection to ${getNodeIdOrUnknownFromPeerDescriptor(this.remotePeerDescriptor)} timed out`, {
peerDescriptor: this.remotePeerDescriptor,
type: this.connectionType,
lifetime: Date.now() - this.created
})
await this.close(false)
Expand Down
14 changes: 0 additions & 14 deletions packages/dht/src/connection/ManagedWebrtcConnection.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,8 @@
import { PeerDescriptor } from '../proto/packages/dht/protos/DhtRpc'
import { ConnectionType } from './IConnection'
import { ManagedConnection } from './ManagedConnection'
import { NodeWebrtcConnection } from './webrtc/NodeWebrtcConnection'

export class ManagedWebrtcConnection extends ManagedConnection {

constructor(localPeerDescriptor: PeerDescriptor,
connectingConnection?: NodeWebrtcConnection,
connectedConnection?: NodeWebrtcConnection
) {
super(
localPeerDescriptor,
ConnectionType.WEBRTC,
connectingConnection,
connectedConnection
)
}

public getWebrtcConnection(): NodeWebrtcConnection {
if (this.outgoingConnection) {
return this.outgoingConnection as unknown as NodeWebrtcConnection
Expand Down
6 changes: 2 additions & 4 deletions packages/dht/src/connection/simulator/SimulatorConnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConnectionType, IConnection } from '../IConnection'
import { IConnection } from '../IConnection'
import { Simulator } from './Simulator'
import { Message, PeerDescriptor } from '../../proto/packages/dht/protos/DhtRpc'
import { Connection } from '../Connection'
Expand All @@ -18,15 +18,13 @@ export class SimulatorConnection extends Connection implements IConnection {
constructor(
localPeerDescriptor: PeerDescriptor,
targetPeerDescriptor: PeerDescriptor,
connectionType: ConnectionType,
simulator: Simulator
) {
super(connectionType)
super()

this.localPeerDescriptor = localPeerDescriptor
this.setPeerDescriptor(targetPeerDescriptor)
this.targetPeerDescriptor = targetPeerDescriptor
this.connectionType = connectionType
this.simulator = simulator

this.send = this.send.bind(this)
Expand Down
10 changes: 4 additions & 6 deletions packages/dht/src/connection/simulator/SimulatorConnector.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { ConnectionType } from '../IConnection'

import {
HandshakeError,
PeerDescriptor,
Expand Down Expand Up @@ -38,9 +36,9 @@ export class SimulatorConnector {
return existingConnection
}

const connection = new SimulatorConnection(this.localPeerDescriptor, targetPeerDescriptor, ConnectionType.SIMULATOR_CLIENT, this.simulator)
const connection = new SimulatorConnection(this.localPeerDescriptor, targetPeerDescriptor, this.simulator)

const managedConnection = new ManagedConnection(this.localPeerDescriptor, ConnectionType.SIMULATOR_CLIENT, connection, undefined)
const managedConnection = new ManagedConnection(this.localPeerDescriptor, connection, undefined)
managedConnection.setRemotePeerDescriptor(targetPeerDescriptor)

this.connectingConnections.set(nodeId, managedConnection)
Expand All @@ -67,9 +65,9 @@ export class SimulatorConnector {
return
}
const connection = new SimulatorConnection(this.localPeerDescriptor,
sourceConnection.localPeerDescriptor, ConnectionType.SIMULATOR_SERVER, this.simulator)
sourceConnection.localPeerDescriptor, this.simulator)

const managedConnection = new ManagedConnection(this.localPeerDescriptor, ConnectionType.SIMULATOR_SERVER, undefined, connection)
const managedConnection = new ManagedConnection(this.localPeerDescriptor, undefined, connection)

logger.trace('connected')

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import EventEmitter from 'eventemitter3'
import { WebrtcConnectionEvents, IWebrtcConnection, RtcDescription } from './IWebrtcConnection'
import { IConnection, ConnectionID, ConnectionEvents, ConnectionType } from '../IConnection'
import { IConnection, ConnectionID, ConnectionEvents } from '../IConnection'
import { Logger } from '@streamr/utils'
import { IceServer } from './WebrtcConnector'
import { createRandomConnectionId } from '../Connection'
Expand All @@ -22,7 +22,6 @@ interface Params {
export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrtcConnection, IConnection {

public connectionId: ConnectionID
public readonly connectionType: ConnectionType = ConnectionType.WEBRTC
// We need to keep track of connection state ourselves because
// RTCPeerConnection.connectionState is not supported on Firefox
private lastState: RTCPeerConnectionState = 'connecting'
Expand Down
3 changes: 1 addition & 2 deletions packages/dht/src/connection/webrtc/NodeWebrtcConnection.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { IWebrtcConnection, WebrtcConnectionEvents } from './IWebrtcConnection'
import { ConnectionType, IConnection, ConnectionID, ConnectionEvents } from '../IConnection'
import { IConnection, ConnectionID, ConnectionEvents } from '../IConnection'
import { PeerDescriptor } from '../../proto/packages/dht/protos/DhtRpc'
import EventEmitter from 'eventemitter3'
import nodeDatachannel, { DataChannel, DescriptionType, PeerConnection } from 'node-datachannel'
Expand Down Expand Up @@ -56,7 +56,6 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IConne
private lastState: RtcPeerConnectionState = 'connecting'
private remoteDescriptionSet = false
private connectingTimeoutRef?: NodeJS.Timeout
public readonly connectionType: ConnectionType = ConnectionType.WEBRTC
private readonly iceServers: IceServer[]
private readonly _bufferThresholdHigh: number // TODO: buffer handling must be implemented before production use (NET-938)
private readonly bufferThresholdLow: number
Expand Down
3 changes: 1 addition & 2 deletions packages/dht/src/connection/websocket/ClientWebsocket.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Logger } from '@streamr/utils'
import EventEmitter from 'eventemitter3'
import { ICloseEvent, IMessageEvent, w3cwebsocket as Websocket } from 'websocket'
import { ConnectionEvents, ConnectionID, ConnectionType, IConnection } from '../IConnection'
import { ConnectionEvents, ConnectionID, IConnection } from '../IConnection'
import { createRandomConnectionId } from '../Connection'

const logger = new Logger(module)
Expand All @@ -19,7 +19,6 @@ export class ClientWebsocket extends EventEmitter<ConnectionEvents> implements I

public readonly connectionId: ConnectionID
private socket?: Websocket
public connectionType = ConnectionType.WEBSOCKET_CLIENT

private destroyed = false

Expand Down
3 changes: 1 addition & 2 deletions packages/dht/src/connection/websocket/ServerWebsocket.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import EventEmitter from 'eventemitter3'
import { IConnection, ConnectionID, ConnectionEvents, ConnectionType } from '../IConnection'
import { IConnection, ConnectionID, ConnectionEvents } from '../IConnection'
import { Message, connection as WsConnection } from 'websocket'
import { Logger } from '@streamr/utils'
import { Url } from 'url'
Expand All @@ -22,7 +22,6 @@ enum MessageType {
export class ServerWebsocket extends EventEmitter<ConnectionEvents> implements IConnection {

public readonly connectionId: ConnectionID
public readonly connectionType = ConnectionType.WEBSOCKET_SERVER
public readonly resourceURL: Url
private socket?: WsConnection
private stopped = false
Expand Down
6 changes: 2 additions & 4 deletions packages/dht/src/connection/websocket/WebsocketConnector.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { ClientWebsocket } from './ClientWebsocket'
import { IConnection, ConnectionType } from '../IConnection'
import { ConnectionType, IConnection } from '../IConnection'
import { ITransport } from '../../transport/ITransport'
import { ListeningRpcCommunicator } from '../../transport/ListeningRpcCommunicator'
import { WebsocketConnectorRpcLocal } from './WebsocketConnectorRpcLocal'
Expand Down Expand Up @@ -232,7 +232,7 @@ export class WebsocketConnector {

const managedConnection = new ManagedConnection(
this.localPeerDescriptor!,
ConnectionType.WEBSOCKET_CLIENT,

socket,
undefined,
targetPeerDescriptor
Expand Down Expand Up @@ -276,7 +276,6 @@ export class WebsocketConnector {
})
const managedConnection = new ManagedConnection(
this.localPeerDescriptor!,
ConnectionType.WEBSOCKET_SERVER,
undefined,
undefined,
targetPeerDescriptor
Expand Down Expand Up @@ -307,7 +306,6 @@ export class WebsocketConnector {
} else {
const managedConnection = new ManagedConnection(
this.localPeerDescriptor!,
ConnectionType.WEBSOCKET_SERVER,
undefined,
serverWebsocket,
targetPeerDescriptor
Expand Down
24 changes: 6 additions & 18 deletions packages/dht/test/end-to-end/Layer0Webrtc.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import { areEqualBinaries, waitForEvent3 } from '@streamr/utils'
import { ConnectionManager } from '../../src/connection/ConnectionManager'
import { ConnectionType } from '../../src/connection/IConnection'
import { DhtNode } from '../../src/dht/DhtNode'
import { PeerDescriptor } from '../../src/proto/packages/dht/protos/DhtRpc'
import { createMockPeerDescriptor } from '../utils/utils'
import { getNodeIdFromPeerDescriptor } from '../../src/exports'

describe('Layer0 with WebRTC connections', () => {

Expand Down Expand Up @@ -67,14 +65,9 @@ describe('Layer0 with WebRTC connections', () => {
node2.joinDht([epPeerDescriptor]),
node1.joinDht([epPeerDescriptor])
])
const nodeId1 = getNodeIdFromPeerDescriptor(node1.getLocalPeerDescriptor())
const nodeId2 = getNodeIdFromPeerDescriptor(node2.getLocalPeerDescriptor())
expect((node1.getTransport() as ConnectionManager).hasConnection(nodeId2)).toEqual(true)
expect((node2.getTransport() as ConnectionManager).hasConnection(nodeId1)).toEqual(true)
expect((node1.getTransport() as ConnectionManager).getConnection(nodeId2)!.connectionType)
.toEqual(ConnectionType.WEBRTC)
expect((node2.getTransport() as ConnectionManager).getConnection(nodeId1)!.connectionType)
.toEqual(ConnectionType.WEBRTC)

expect((node1.getTransport() as ConnectionManager).hasConnection(node2.getNodeId())).toEqual(true)
expect((node2.getTransport() as ConnectionManager).hasConnection(node1.getNodeId())).toEqual(true)

}, 60000)

Expand All @@ -85,13 +78,8 @@ describe('Layer0 with WebRTC connections', () => {
node3.joinDht([epPeerDescriptor]),
node4.joinDht([epPeerDescriptor])
])
const nodeId1 = getNodeIdFromPeerDescriptor(node1.getLocalPeerDescriptor())
const nodeId2 = getNodeIdFromPeerDescriptor(node2.getLocalPeerDescriptor())
expect((node1.getTransport() as ConnectionManager).hasConnection(nodeId2)).toEqual(true)
expect((node2.getTransport() as ConnectionManager).hasConnection(nodeId1)).toEqual(true)
expect((node1.getTransport() as ConnectionManager).getConnection(nodeId2)!.connectionType)
.toEqual(ConnectionType.WEBRTC)
expect((node2.getTransport() as ConnectionManager).getConnection(nodeId1)!.connectionType)
.toEqual(ConnectionType.WEBRTC)

expect((node1.getTransport() as ConnectionManager).hasConnection(node2.getNodeId())).toEqual(true)
expect((node2.getTransport() as ConnectionManager).hasConnection(node1.getNodeId())).toEqual(true)
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { ConnectionManager } from '../../src/connection/ConnectionManager'
import { LatencyType, Simulator } from '../../src/connection/simulator/Simulator'
import { Message, MessageType, NodeType, PeerDescriptor } from '../../src/proto/packages/dht/protos/DhtRpc'
import { RpcMessage } from '../../src/proto/packages/proto-rpc/protos/ProtoRpc'
import { ConnectionType } from '../../src/connection/IConnection'
import { ITransport } from '../../src/transport/ITransport'
import * as Err from '../../src/helpers/errors'
import { SimulatorTransport } from '../../src/connection/simulator/SimulatorTransport'
Expand Down Expand Up @@ -69,9 +68,6 @@ describe('WebRTC Connection Management', () => {

manager2.on('message', (message: Message) => {
expect(message.messageId).toEqual('mockerer')
expect(manager1.getConnection(getNodeIdFromPeerDescriptor(peerDescriptor2))!.connectionType).toEqual(ConnectionType.WEBRTC)
expect(manager2.getConnection(getNodeIdFromPeerDescriptor(peerDescriptor1))!.connectionType).toEqual(ConnectionType.WEBRTC)

done()
})
dummyMessage.targetDescriptor = peerDescriptor2
Expand All @@ -92,9 +88,6 @@ describe('WebRTC Connection Management', () => {
}
manager1.on('message', (message: Message) => {
expect(message.messageId).toEqual('mockerer')
expect(manager1.getConnection(getNodeIdFromPeerDescriptor(peerDescriptor2))!.connectionType).toEqual(ConnectionType.WEBRTC)
expect(manager2.getConnection(getNodeIdFromPeerDescriptor(peerDescriptor1))!.connectionType).toEqual(ConnectionType.WEBRTC)

done()
})
dummyMessage.targetDescriptor = peerDescriptor1
Expand Down
Loading