Spaces:
Runtime error
Runtime error
| const { Writable } = require('stream') | |
| const diagnosticsChannel = require('diagnostics_channel') | |
| const { parserStates, opcodes, states, emptyBuffer } = require('./constants') | |
| const { kReadyState, kSentClose, kResponse, kReceivedClose } = require('./symbols') | |
| const { isValidStatusCode, failWebsocketConnection, websocketMessageReceived } = require('./util') | |
| const { WebsocketFrameSend } = require('./frame') | |
| // This code was influenced by ws released under the MIT license. | |
| // Copyright (c) 2011 Einar Otto Stangvik <einaros@gmail.com> | |
| // Copyright (c) 2013 Arnout Kazemier and contributors | |
| // Copyright (c) 2016 Luigi Pinca and contributors | |
// Diagnostics channels on which received ping/pong control frames are
// published (only when a channel has subscribers).
const channels = {
  ping: diagnosticsChannel.channel('undici:websocket:ping'),
  pong: diagnosticsChannel.channel('undici:websocket:pong')
}
/**
 * Incremental parser for incoming WebSocket frames.
 *
 * Chunks written to this stream are buffered and parsed with a small state
 * machine (INFO -> [PAYLOADLENGTH_16 | PAYLOADLENGTH_64] -> READ_DATA -> INFO).
 * Completed data messages are handed to `websocketMessageReceived`; ping, pong
 * and close control frames are answered/recorded here.
 */
class ByteParser extends Writable {
  // Chunks received but not yet consumed, oldest first.
  #buffers = []
  // Total number of unconsumed bytes across #buffers.
  #byteOffset = 0
  #state = parserStates.INFO
  // Header fields of the frame being parsed (fin, opcode, payloadLength),
  // plus `originalOpcode` of the data message in progress and, once a close
  // frame has been received, `closeInfo`.
  #info = {}
  // Payloads of the data frames that make up the message in progress.
  #fragments = []

  constructor (ws) {
    super()

    this.ws = ws
  }

  /**
   * @param {Buffer} chunk
   * @param {() => void} callback
   */
  _write (chunk, _, callback) {
    this.#buffers.push(chunk)
    this.#byteOffset += chunk.length

    this.run(callback)
  }

  /**
   * Runs whenever a new chunk is received.
   * Callback is called whenever there are no more chunks buffering,
   * or not enough bytes are buffered to parse.
   * The callback is NOT called when the connection is failed or when a
   * close frame has been processed.
   * @param {() => void} callback
   */
  run (callback) {
    // Every branch either returns (not enough bytes buffered, failure, close)
    // or moves the state machine forward, so the loop needs no trailing
    // "bytes left?" check. That also lets zero-length frames that end a chunk
    // be processed immediately instead of waiting for the next chunk.
    while (true) {
      if (this.#state === parserStates.INFO) {
        // If there aren't enough bytes to parse the payload length, etc.
        if (this.#byteOffset < 2) {
          return callback()
        }

        const buffer = this.consume(2)
        const fin = (buffer[0] & 0x80) !== 0
        const opcode = buffer[0] & 0x0F
        const payloadLength = buffer[1] & 0x7F
        const isControl = this.#isControlFrame(opcode)
        // First frame of a fragmented message: FIN clear and a non-zero opcode.
        const fragmented = !fin && opcode !== opcodes.CONTINUATION

        if (fragmented && opcode !== opcodes.BINARY && opcode !== opcodes.TEXT) {
          // Only text and binary frames can be fragmented
          failWebsocketConnection(this.ws, 'Invalid frame type was fragmented.')
          return
        }

        if (isControl && payloadLength > 125) {
          // Control frames can have a payload length of 125 bytes MAX.
          // Data frames, fragmented or not, have no such limit.
          failWebsocketConnection(this.ws, 'Payload length for control frame exceeded 125 bytes.')
          return
        }

        if (opcode === opcodes.CLOSE && payloadLength === 1) {
          failWebsocketConnection(this.ws, 'Received close frame with a 1-byte body.')
          return
        }

        this.#info.fin = fin
        this.#info.opcode = opcode

        if (!isControl) {
          // If we receive a fragmented message, we use the type of the first
          // frame to parse the full message as binary/text, when it's
          // terminated. Control frames may be interleaved with the fragments
          // and must not affect the message type.
          this.#info.originalOpcode ??= opcode
        }

        if (payloadLength <= 125) {
          this.#info.payloadLength = payloadLength
          this.#state = parserStates.READ_DATA
        } else if (payloadLength === 126) {
          this.#state = parserStates.PAYLOADLENGTH_16
        } else {
          // payloadLength === 127
          this.#state = parserStates.PAYLOADLENGTH_64
        }
      } else if (this.#state === parserStates.PAYLOADLENGTH_16) {
        if (this.#byteOffset < 2) {
          return callback()
        }

        const buffer = this.consume(2)

        this.#info.payloadLength = buffer.readUInt16BE(0)
        this.#state = parserStates.READ_DATA
      } else if (this.#state === parserStates.PAYLOADLENGTH_64) {
        if (this.#byteOffset < 8) {
          return callback()
        }

        const buffer = this.consume(8)
        const upper = buffer.readUInt32BE(0)
        const lower = buffer.readUInt32BE(4)

        // 2^31 is the maximum bytes an arraybuffer can contain
        // on 32-bit systems. Although, on 64-bit systems, this is
        // 2^53-1 bytes.
        // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Errors/Invalid_array_length
        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/common/globals.h;drc=1946212ac0100668f14eb9e2843bdd846e510a1e;bpv=1;bpt=1;l=1275
        // https://source.chromium.org/chromium/chromium/src/+/main:v8/src/objects/js-array-buffer.h;l=34;drc=1946212ac0100668f14eb9e2843bdd846e510a1e
        //
        // The length is a 64-bit big-endian integer; any bit set in the upper
        // 32 bits already puts it far beyond that limit.
        if (upper !== 0 || lower > 2 ** 31 - 1) {
          failWebsocketConnection(this.ws, 'Received payload length > 2^31 bytes.')
          return
        }

        this.#info.payloadLength = lower
        this.#state = parserStates.READ_DATA
      } else if (this.#state === parserStates.READ_DATA) {
        if (this.#byteOffset < this.#info.payloadLength) {
          // The payload has not been fully received yet; wait for more chunks.
          return callback()
        }

        const body = this.consume(this.#info.payloadLength)
        this.#state = parserStates.INFO

        if (this.#isControlFrame(this.#info.opcode)) {
          if (!this.#handleControlFrame(this.#info.opcode, body)) {
            // Close frame received or connection failed: stop parsing.
            return
          }
        } else {
          this.#fragments.push(body)

          // A message is complete once a data frame with the FIN bit arrives:
          // either an unfragmented frame or the final continuation frame.
          if (this.#info.fin) {
            const fullMessage = Buffer.concat(this.#fragments)

            websocketMessageReceived(this.ws, this.#info.originalOpcode, fullMessage)

            this.#info = {}
            this.#fragments.length = 0
          }
        }
      } else {
        // Unknown state: nothing can be parsed, avoid spinning forever.
        return callback()
      }
    }
  }

  /**
   * @param {number} opcode
   * @returns {boolean} true for ping, pong and close frames
   */
  #isControlFrame (opcode) {
    return opcode === opcodes.PING || opcode === opcodes.PONG || opcode === opcodes.CLOSE
  }

  /**
   * Handles a fully buffered ping, pong or close frame.
   * @param {number} opcode
   * @param {Buffer} body the frame's payload
   * @returns {boolean} true if parsing should continue, false if it must stop
   */
  #handleControlFrame (opcode, body) {
    if (opcode === opcodes.CLOSE) {
      const closeInfo = this.parseCloseBody(false, body)

      if (closeInfo === null) {
        // Invalid status code or a reason that is not valid UTF-8.
        failWebsocketConnection(this.ws, 'Received close frame with an invalid status code or reason.')
        return false
      }

      this.#info.closeInfo = closeInfo

      if (!this.ws[kSentClose]) {
        // If an endpoint receives a Close frame and did not previously send a
        // Close frame, the endpoint MUST send a Close frame in response. (When
        // sending a Close frame in response, the endpoint typically echos the
        // status code it received.)
        let echoBody = emptyBuffer

        // A close frame may carry no status code at all; echo an empty body then.
        if (closeInfo.code !== undefined) {
          echoBody = Buffer.allocUnsafe(2)
          echoBody.writeUInt16BE(closeInfo.code, 0)
        }

        const closeFrame = new WebsocketFrameSend(echoBody)

        this.ws[kResponse].socket.write(
          closeFrame.createFrame(opcodes.CLOSE),
          (err) => {
            if (!err) {
              this.ws[kSentClose] = true
            }
          }
        )
      }

      // Upon either sending or receiving a Close control frame, it is said
      // that _The WebSocket Closing Handshake is Started_ and that the
      // WebSocket connection is in the CLOSING state.
      this.ws[kReadyState] = states.CLOSING
      this.ws[kReceivedClose] = true

      this.end()

      return false
    }

    if (opcode === opcodes.PING) {
      // Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in
      // response, unless it already received a Close frame.
      // A Pong frame sent in response to a Ping frame must have identical
      // "Application data"
      if (!this.ws[kReceivedClose]) {
        const frame = new WebsocketFrameSend(body)

        this.ws[kResponse].socket.write(frame.createFrame(opcodes.PONG))

        if (channels.ping.hasSubscribers) {
          channels.ping.publish({
            payload: body
          })
        }
      }

      return true
    }

    // A Pong frame MAY be sent unsolicited. This serves as a
    // unidirectional heartbeat. A response to an unsolicited Pong frame is
    // not expected.
    if (channels.pong.hasSubscribers) {
      channels.pong.publish({
        payload: body
      })
    }

    return true
  }

  /**
   * Take n bytes from the buffered Buffers
   * @param {number} n
   * @returns {Buffer|null} null when fewer than n bytes are buffered
   */
  consume (n) {
    if (n > this.#byteOffset) {
      return null
    } else if (n === 0) {
      return emptyBuffer
    }

    // Fast path: the first chunk is exactly what was asked for.
    if (this.#buffers[0].length === n) {
      this.#byteOffset -= this.#buffers[0].length
      return this.#buffers.shift()
    }

    const buffer = Buffer.allocUnsafe(n)
    let offset = 0

    while (offset !== n) {
      const next = this.#buffers[0]
      const { length } = next

      if (length + offset === n) {
        buffer.set(this.#buffers.shift(), offset)
        break
      } else if (length + offset > n) {
        // Take only the needed prefix and leave the rest queued.
        buffer.set(next.subarray(0, n - offset), offset)
        this.#buffers[0] = next.subarray(n - offset)
        break
      } else {
        buffer.set(this.#buffers.shift(), offset)
        offset += next.length
      }
    }

    this.#byteOffset -= n

    return buffer
  }

  /**
   * Parses the payload of a close frame.
   * @param {boolean} onlyCode when true, only the status code is parsed
   * @param {Buffer} data the close frame's payload
   * @returns {{ code: number|undefined, reason?: string }|null} null when the
   * status code is invalid or the reason is not valid UTF-8
   */
  parseCloseBody (onlyCode, data) {
    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.5
    /** @type {number|undefined} */
    let code

    if (data.length >= 2) {
      // _The WebSocket Connection Close Code_ is
      // defined as the status code (Section 7.4) contained in the first Close
      // control frame received by the application
      code = data.readUInt16BE(0)
    }

    if (onlyCode) {
      if (!isValidStatusCode(code)) {
        return null
      }

      return { code }
    }

    // https://datatracker.ietf.org/doc/html/rfc6455#section-7.1.6
    /** @type {Buffer} */
    let reason = data.subarray(2)

    // Remove BOM
    if (reason[0] === 0xEF && reason[1] === 0xBB && reason[2] === 0xBF) {
      reason = reason.subarray(3)
    }

    if (code !== undefined && !isValidStatusCode(code)) {
      return null
    }

    try {
      // TODO: optimize this
      reason = new TextDecoder('utf-8', { fatal: true }).decode(reason)
    } catch {
      return null
    }

    return { code, reason }
  }

  /**
   * The parsed body of the received close frame, or undefined if no valid
   * close frame has been parsed.
   */
  get closingInfo () {
    return this.#info.closeInfo
  }
}
| module.exports = { | |
| ByteParser | |
| } | |