-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge branch 'streamr-1.0' into rm-connection-type
- Loading branch information
Showing
181 changed files
with
7,925 additions
and
3,762 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
node_modules/** | ||
dist/** | ||
.idea/** | ||
src/proto/** |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
{ | ||
"extends": "../../.eslintrc.js" | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
.DS_Store | ||
node_modules | ||
dist |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
{ | ||
"name": "@streamr/autocertifier-client", | ||
"version": "100.0.0-pretestnet.3", | ||
"description": "Autocertifier Client for Streamr Network", | ||
"repository": { | ||
"type": "git", | ||
"url": "git+https://github.com/streamr-dev/network.git", | ||
"directory": "packages/autocertifier-client" | ||
}, | ||
"main": "dist/src/exports.js", | ||
"types": "dist/src/exports.d.ts", | ||
"license": "STREAMR NETWORK OPEN SOURCE LICENSE", | ||
"author": "Streamr Network AG <[email protected]>", | ||
"scripts": { | ||
"generate-protoc-code": "./proto.sh", | ||
"build": "tsc -b tsconfig.node.json", | ||
"check": "tsc -p ./tsconfig.jest.json --noEmit", | ||
"clean": "jest --clearCache || true; rm -rf dist *.tsbuildinfo node_modules/.cache || true", | ||
"eslint": "eslint --cache --cache-location=node_modules/.cache/.eslintcache/ '*/**/*.{js,ts}'", | ||
"prepublishOnly": "npm run clean && NODE_ENV=production tsc -b tsconfig.node.json" | ||
}, | ||
"dependencies": { | ||
"@protobuf-ts/runtime-rpc": "^2.8.2", | ||
"@streamr/utils": "100.0.0-pretestnet.3", | ||
"eventemitter3": "^5.0.0", | ||
"node-forge": "^1.3.1", | ||
"request": "^2.88.2" | ||
}, | ||
"devDependencies": { | ||
"@types/node-forge": "^1.3.5", | ||
"@types/request": "^2.48.8", | ||
"ts-node": "^10.9.1" | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
mkdir -p ./src/proto | ||
npx protoc --ts_out ./src/proto --ts_opt server_generic,generate_dependencies,long_type_number --proto_path ../.. packages/autocertifier-client/protos/AutoCertifier.proto |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
syntax = "proto3"; | ||
option optimize_for = CODE_SIZE; | ||
|
||
message HasSessionRequest { | ||
string sessionId = 1; | ||
} | ||
|
||
message HasSessionResponse { | ||
} | ||
|
||
service AutoCertifierRpc { | ||
rpc hasSession (HasSessionRequest) returns (HasSessionResponse); | ||
} |
186 changes: 186 additions & 0 deletions
186
packages/autocertifier-client/src/AutoCertifierClient.ts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,186 @@ | ||
import { EventEmitter } from 'eventemitter3' | ||
import { IAutoCertifierRpc } from './proto/packages/autocertifier-client/protos/AutoCertifier.server' | ||
import { HasSessionRequest, HasSessionResponse } from './proto/packages/autocertifier-client/protos/AutoCertifier' | ||
import { ServerCallContext } from '@protobuf-ts/runtime-rpc' | ||
import { filePathToNodeFormat } from '@streamr/utils' | ||
import { RestClient } from './RestClient' | ||
import { CertifiedSubdomain } from './data/CertifiedSubdomain' | ||
import fs from 'fs' | ||
import path from 'path' | ||
import * as forge from 'node-forge' | ||
import { Logger } from '@streamr/utils' | ||
|
||
interface AutoCertifierClientEvents { | ||
updatedCertificate: (domain: CertifiedSubdomain) => void | ||
} | ||
|
||
export type HasSession = (request: HasSessionRequest, context: ServerCallContext) => Promise<HasSessionResponse> | ||
|
||
const logger = new Logger(module) | ||
|
||
export const SERVICE_ID = 'system/auto-certificer' | ||
const ONE_DAY = 1000 * 60 * 60 * 24 | ||
const MAX_INT_32 = 2147483647 | ||
|
||
// TODO: remove code duplication regarding ongoingSessions management | ||
// TODO: add logging and make logging consistent | ||
// TODO: validate CertifiedSubdomain when read from file and when received from server | ||
export class AutoCertifierClient extends EventEmitter<AutoCertifierClientEvents> implements IAutoCertifierRpc { | ||
|
||
private updateTimeout?: NodeJS.Timeout | ||
private readonly restClient: RestClient | ||
private readonly configFile: string | ||
private readonly streamrWebSocketPort: number | ||
private readonly ongoingSessions: Set<string> = new Set() | ||
|
||
constructor( | ||
configFile: string, | ||
streamrWebSocketPort: number, | ||
restApiUrl: string, | ||
registerRpcMethod: ( | ||
serviceId: string, | ||
rpcMethodName: string, | ||
method: HasSession | ||
) => void | ||
) { | ||
super() | ||
|
||
this.restClient = new RestClient(restApiUrl) | ||
this.configFile = filePathToNodeFormat(configFile) | ||
this.streamrWebSocketPort = streamrWebSocketPort | ||
registerRpcMethod(SERVICE_ID, 'hasSession', this.hasSession.bind(this)) | ||
} | ||
|
||
public async start(): Promise<void> { | ||
if (!fs.existsSync(this.configFile)) { | ||
await this.createCertificate() | ||
} else { | ||
await this.ensureCertificateValidity() | ||
} | ||
} | ||
|
||
private async ensureCertificateValidity(): Promise<void> { | ||
const certificate = this.loadCertificateFromDisk() | ||
const certObj = forge.pki.certificateFromPem(certificate.certificate) | ||
const expirationTimestamp = certObj.validity.notAfter.getTime() | ||
if (Date.now() >= expirationTimestamp - ONE_DAY) { | ||
await this.updateCertificate() | ||
} else { | ||
// TODO: most of the time the ip should not change. Calling this is important for whenever it does. | ||
// should avoid calling this.updateSubDomainIp in scheduled calls if certificate is not expiring. | ||
await this.updateSubdomainIp() | ||
this.scheduleCertificateUpdate(expirationTimestamp) | ||
this.emit('updatedCertificate', certificate) | ||
} | ||
} | ||
|
||
private loadCertificateFromDisk(): CertifiedSubdomain { | ||
const certificate = JSON.parse(fs.readFileSync(this.configFile, 'utf8')) as CertifiedSubdomain | ||
return certificate | ||
} | ||
|
||
public stop(): void { | ||
if (this.updateTimeout) { | ||
clearTimeout(this.updateTimeout) | ||
this.updateTimeout = undefined | ||
} | ||
} | ||
|
||
private scheduleCertificateUpdate(expirationTimestamp: number): void { | ||
if (this.updateTimeout) { | ||
clearTimeout(this.updateTimeout) | ||
this.updateTimeout = undefined | ||
} | ||
// update certificate 1 day before it expires | ||
let updateIn = expirationTimestamp - Date.now() | ||
if (updateIn > ONE_DAY) { | ||
updateIn = updateIn - ONE_DAY | ||
} | ||
// TODO: This sets the timeout to the maximum value of a 32-bit integer due to the limitation setTimeout has. | ||
// The original expirationTimestamp should be kept somewhere so that the certificate is not updated every 24 days. | ||
if (updateIn > MAX_INT_32) { | ||
updateIn = MAX_INT_32 | ||
} | ||
|
||
logger.info(updateIn + ' milliseconds until certificate update') | ||
// TODO: use tooling from @streamr/utils to set the timeout with an abortController. | ||
this.updateTimeout = setTimeout(this.ensureCertificateValidity, updateIn) | ||
} | ||
|
||
private createCertificate = async (): Promise<void> => { | ||
const sessionId = await this.restClient.createSession() | ||
let certifiedSubdomain: CertifiedSubdomain | ||
|
||
this.ongoingSessions.add(sessionId) | ||
|
||
try { | ||
certifiedSubdomain = await this.restClient.createSubdomainAndCertificate(this.streamrWebSocketPort, sessionId) | ||
} finally { | ||
this.ongoingSessions.delete(sessionId) | ||
} | ||
const dir = path.dirname(this.configFile) | ||
// TODO: use async fs methods? | ||
if (!fs.existsSync(dir)) { | ||
fs.mkdirSync(dir, { recursive: true }) | ||
} | ||
fs.writeFileSync(this.configFile, JSON.stringify(certifiedSubdomain)) | ||
const certObj = forge.pki.certificateFromPem(certifiedSubdomain.certificate) | ||
|
||
const expirationTimestamp = certObj.validity.notAfter.getTime() | ||
this.scheduleCertificateUpdate(expirationTimestamp) | ||
|
||
this.emit('updatedCertificate', certifiedSubdomain) | ||
} | ||
|
||
private updateCertificate = async (): Promise<void> => { | ||
const sessionId = await this.restClient.createSession() | ||
this.ongoingSessions.add(sessionId) | ||
|
||
const oldCertifiedSubdomain = JSON.parse(fs.readFileSync(this.configFile, 'utf8')) as CertifiedSubdomain | ||
const updatedCertifiedSubdomain = await this.restClient.updateCertificate(oldCertifiedSubdomain.fqdn.split('.')[0], | ||
this.streamrWebSocketPort, oldCertifiedSubdomain.authenticationToken, sessionId) | ||
|
||
this.ongoingSessions.delete(sessionId) | ||
|
||
// TODO: use async fs methods? | ||
fs.writeFileSync(this.configFile, JSON.stringify(updatedCertifiedSubdomain)) | ||
const certObj = forge.pki.certificateFromPem(updatedCertifiedSubdomain.certificate) | ||
|
||
const expirationTimestamp = certObj.validity.notAfter.getTime() | ||
this.scheduleCertificateUpdate(expirationTimestamp) | ||
|
||
// TODO: if the certificate was not updated there's no need to emit the event. Could compare certificates? | ||
this.emit('updatedCertificate', updatedCertifiedSubdomain) | ||
} | ||
|
||
// This method should be called whenever the IP address or port of the node changes | ||
public updateSubdomainIp = async (): Promise<void> => { | ||
if (!fs.existsSync(this.configFile)) { | ||
logger.warn('updateSubdomainIp() called while subdomain file does not exist') | ||
return | ||
} | ||
// TODO: use async fs methods? | ||
const oldSubdomain = JSON.parse(fs.readFileSync(this.configFile, 'utf8')) as CertifiedSubdomain | ||
logger.info('updateSubdomainIp() called for ' + JSON.stringify(oldSubdomain)) | ||
const sessionId = await this.restClient.createSession() | ||
this.ongoingSessions.add(sessionId) | ||
await this.restClient.updateSubdomainIp( | ||
oldSubdomain.fqdn.split('.')[0], | ||
this.streamrWebSocketPort, | ||
sessionId, | ||
oldSubdomain.authenticationToken | ||
) | ||
this.ongoingSessions.delete(sessionId) | ||
} | ||
|
||
// IAutoCertifierRpc implementation | ||
// TODO: could move to the DHT package or move all rpc related logic here from AutoCertifierClientFacade in DHT | ||
async hasSession(request: HasSessionRequest): Promise<HasSessionResponse> { | ||
logger.trace('hasSession() called ' + this.ongoingSessions.size + ' ongoing sessions') | ||
if (this.ongoingSessions.has(request.sessionId)) { | ||
return { sessionId: request.sessionId } | ||
} else { | ||
throw `Session not found ${request.sessionId}` | ||
} | ||
} | ||
} |
Oops, something went wrong.