diff --git a/packages/communication/src/SocketConnection.ts b/packages/communication/src/SocketConnection.ts index 87ab694..8fea6eb 100644 --- a/packages/communication/src/SocketConnection.ts +++ b/packages/communication/src/SocketConnection.ts @@ -14,6 +14,8 @@ export class SocketConnection implements IConnection private _messages: MessageClassManager = new MessageClassManager(); private _codec: ICodec = new EvaWireFormat(); private _dataBuffer: ArrayBuffer = null; + private _pendingChunks: Uint8Array[] = []; + private _pendingBytes: number = 0; private _isReady: boolean = false; private _pendingClientMessages: IMessageComposer[] = []; private _pendingServerMessages: IMessageDataWrapper[] = []; @@ -36,6 +38,7 @@ export class SocketConnection implements IConnection private _cryptoState: CryptoState = 'disabled'; private _sessionKey: CryptoKey = null; private _pendingEncryptedSends: ArrayBuffer[] = []; + private _decryptChain: Promise = Promise.resolve(); public init(socketUrl: string): void { @@ -50,6 +53,9 @@ export class SocketConnection implements IConnection private createSocket(socketUrl: string): void { this._dataBuffer = new ArrayBuffer(0); + this._pendingChunks = []; + this._pendingBytes = 0; + this._decryptChain = Promise.resolve(); const cryptoEnabled = !!GetConfiguration().getValue('crypto.ws.enabled', false); if(cryptoEnabled && !this.subtleCryptoAvailable()) { @@ -108,24 +114,106 @@ export class SocketConnection implements IConnection if(this._cryptoState === 'ready') { - this.decryptFrame(data) - .then(plain => + const frame = data; + + this._decryptChain = this._decryptChain.then(async () => + { + if((this._cryptoState !== 'ready') || this._intentionalClose) return; + + try { - this._dataBuffer = this.concatArrayBuffers(this._dataBuffer, plain); - this.processReceivedData(); - }) - .catch(err => + const plain = await this.decryptFrame(frame); + + this.enqueueAndProcess(plain); + } + catch (err) { NitroLogger.error('[ws-crypto] decrypt failed', err); this._cryptoState = 'error'; this._intentionalClose = true; if(this._socket) this._socket.close(); - }); + } + }); return; } - this._dataBuffer = this.concatArrayBuffers(this._dataBuffer, data); + this.enqueueAndProcess(data); + } + + private enqueueAndProcess(data: ArrayBuffer): void + { + if(data && data.byteLength) + { + this._pendingChunks.push(new Uint8Array(data)); + this._pendingBytes += data.byteLength; + } + + if(this._pendingBytes < 4) return; + + const messageLength = this.peekFirstMessageLength(); + + if(messageLength < 0 || (this._pendingBytes < (messageLength + 4))) return; + + this._dataBuffer = this.mergePendingChunks(); + this._pendingChunks = []; + this._pendingBytes = 0; + this.processReceivedData(); + + const remainder = this._dataBuffer; + this._dataBuffer = new ArrayBuffer(0); + + if(remainder && remainder.byteLength) + { + this._pendingChunks.push(new Uint8Array(remainder)); + this._pendingBytes = remainder.byteLength; + } + } + + private peekFirstMessageLength(): number + { + let value = 0; + let read = 0; + + for(const chunk of this._pendingChunks) + { + for(let i = 0; (i < chunk.byteLength) && (read < 4); i++) + { + value = ((value << 8) | chunk[i]) >>> 0; + read++; + } + + if(read >= 4) break; + } + + if(read < 4) return -1; + + return value | 0; + } + + private mergePendingChunks(): ArrayBuffer + { + if(!this._pendingChunks.length) return new ArrayBuffer(0); + + if(this._pendingChunks.length === 1) + { + const only = this._pendingChunks[0]; + + if((only.byteOffset === 0) && (only.byteLength === only.buffer.byteLength)) return only.buffer as ArrayBuffer; + + return only.buffer.slice(only.byteOffset, only.byteOffset + only.byteLength) as ArrayBuffer; + } + + const merged = new Uint8Array(this._pendingBytes); + let offset = 0; + + for(const chunk of this._pendingChunks) + { + merged.set(chunk, offset); + offset += chunk.byteLength; + } + + return merged.buffer; } private async handleServerHello(frame: ArrayBuffer): Promise @@ -339,7 +427,10 @@ export class SocketConnection implements IConnection this._pendingClientMessages = []; this._pendingServerMessages = []; + this._pendingChunks = []; + this._pendingBytes = 0; this._dataBuffer = null; + this._decryptChain = Promise.resolve(); } public ready(): void @@ -484,16 +575,6 @@ export class SocketConnection implements IConnection return this._codec.decode(this); } - private concatArrayBuffers(buffer1: ArrayBuffer, buffer2: ArrayBuffer): ArrayBuffer - { - const array = new Uint8Array(buffer1.byteLength + buffer2.byteLength); - - array.set(new Uint8Array(buffer1), 0); - array.set(new Uint8Array(buffer2), buffer1.byteLength); - - return array.buffer; - } - private getMessagesForWrapper(wrapper: IMessageDataWrapper): IMessageEvent[] { if(!wrapper) return null; diff --git a/packages/communication/src/codec/evawire/EvaWireFormat.ts b/packages/communication/src/codec/evawire/EvaWireFormat.ts index 1d3a72b..3ec5229 100644 --- a/packages/communication/src/codec/evawire/EvaWireFormat.ts +++ b/packages/communication/src/codec/evawire/EvaWireFormat.ts @@ -66,24 +66,28 @@ export class EvaWireFormat implements ICodec { if(!connection || !connection.dataBuffer || !connection.dataBuffer.byteLength) return null; + const buffer = connection.dataBuffer; + const totalLength = buffer.byteLength; + const dataView = new DataView(buffer); const wrappers: IMessageDataWrapper[] = []; + let offset = 0; - while(connection.dataBuffer.byteLength) + while(offset + 4 <= totalLength) { - if(connection.dataBuffer.byteLength < 4) break; + const length = dataView.getInt32(offset); - const container = new BinaryReader(connection.dataBuffer); - const length = container.readInt(); + if(length < 0 || (offset + 4 + length) > totalLength) break; - if(length > (connection.dataBuffer.byteLength - 4)) break; + const bodyStart = offset + 4; + const body = new BinaryReader(buffer.slice(bodyStart, bodyStart + length) as ArrayBuffer); - const extracted = container.readBytes(length); + wrappers.push(new EvaWireDataWrapper(body.readShort(), body)); - wrappers.push(new EvaWireDataWrapper(extracted.readShort(), extracted)); - - connection.dataBuffer = connection.dataBuffer.slice(length + 4); + offset = bodyStart + length; } + connection.dataBuffer = (offset >= totalLength) ? new ArrayBuffer(0) : buffer.slice(offset); + return wrappers; } }