如何用 Node.js 实现一个简单的 Websocket 服务?






 构造函数

class WebSocket extends EventEmitter {
constructor(req, socket, upgradeHead){
super(); // 调用 EventEmitter 构造函数


// 1. 构造响应头 resHeaders 部分


// 2. 监听 socket 的 data 事件,以及 error 事件


// 3. 初始化成员属性


}
}

注意,我们需要继承内置的 EventEmitter ,这样生成的实例才能监听、绑定事件;

Node.js 采用事件驱动、异步编程,天生就是为了网络服务而设计的,继承 EventEmitter 就能享受到非阻塞模式的 IO 处理;


讲一下其中  响应头的构造
和  事件监听
部分。

返回响应头(Response Header)

根据协议规范,我们能写出响应头的内容:

  1. 将 Sec-WebSocket-Key 跟 258EAFA5-E914-47DA-95CA-C5AB0DC85B11拼接。
  2. 通过 SHA1 计算出摘要,并转成 base64 字符串。

具体代码如下:

    var resKey = hashWebSocketKey(req.headers['sec-websocket-key']);


// 构造响应头
var resHeaders = [
'HTTP/1.1 101 Switching Protocols',
'Upgrade: websocket',
'Connection: Upgrade',
'Sec-WebSocket-Accept: ' + resKey
]
.concat('', '')
.join('\r\n');
socket.write(resHeaders);

当执行 socket.write(resHeaders); 到后就和客户端建立起 WebSocket 连接了,剩下去就是数据的处理。

监听事件

socket 就是 TCP 协议的抽象,直接在上面监听已有的 data 事件和 close 事件这两个事件。

还有其他事件,比如 error、end 等,详细参考
net.Socket
文档

    socket.on('data', data => {
this.buffer = Buffer.concat([this.buffer, data]);
while (this._processBuffer()) {} // 循环处理返回的 data 数据
});


socket.on('close', had_error => {
if (!this.closed) {
this.emit('close', 1006);
this.closed = true;
}
});

close 的事件逻辑比较简单,比较重要的是 data 的事件监听部分。核心就是 this._processBuffer() 这个方法,用于处理客户端传送过来的数据(即 Frame 数据)

注意该方法是放在 while 循环语句里,处理好边界情况,防止死循环。






 


Frame 帧数据的处理

WebSocket 客户端、服务端通信的最小单位是帧(frame),由1个或多个帧组成一条完整的消息(message)。

这 this._processBuffer() 部分代码逻辑就是用来解析帧数据的,所以它是实现 Websocket 代码的关键;(该方法里面用到了大量的位操作符以及
Buffer
类的操作)

帧数据结构详细定义可参考
RFC6455 5.2节
,上面罗列的参考文章都有详细的解读,我在这儿也不啰嗦讲细节了,直接看代码比听我用文字讲要好。

这里就其中两个细节需要铺垫一下,方便更好地理解代码。

操作码(Opcode)

Opcode 即 操作代码,Opcode 的值决定了应该如何解析后续的数据载荷(data payload)

根据 Opcode 我们可以大致将数据帧分成两大类:数据帧 和 控制帧。

数据帧
:目前只有 3 种,对应的 opcode 是:

  • 0x0:数据延续帧
  • 0x1:utf-8文本
  • 0x2:二进制数据;
  • 0x3 – 0x7:目前保留,用于后续定义的非控制帧。

控制帧:
除了上述 3 种数据帧之外,剩下的都是控制帧

  • 0x8:表示连接断开
  • 0x9:表示 ping 操作
  • 0xA:表示 pong 操作
  • 0xB – 0xF:目前保留,用于后续定义的控制帧

在代码里,我们会先从帧数据中提取操作码:

var opcode = byte1 & 0x0f; //截取第一个字节的后 4 位,即 opcode 码

然后根据协议获取到真正的数据载荷(data payload),然后将这两部分传给 _handleFrame 方法:

this._handleFrame(opcode, payload); // 处理操作码

该方法会根据不同的 opcode 做出不同的操作:

_handleFrame(opcode, buffer) {
var payload;
switch (opcode) {
case OPCODES.TEXT:
payload = buffer.toString('utf8'); //如果是文本需要转化为utf8的编码
this.emit('data', opcode, payload); //Buffer.toString()默认utf8 这里是故意指示的
break;
case OPCODES.BINARY: //二进制文件直接交付
payload = buffer;
this.emit('data', opcode, payload);
break;
case OPCODES.PING: // 发送 pong 做响应
this._doSend(OPCODES.PONG, buffer);
break;
case OPCODES.PONG: //不做处理
console.log('server receive pong');
break;
case OPCODES.CLOSE: // close有很多关闭码
let code, reason; // 用于获取关闭码和关闭原因
if (buffer.length >= 2) {
code = buffer.readUInt16BE(0);
reason = buffer.toString('utf8', 2);
}
this.close(code, reason);
this.emit('close', code, reason);
break;
default:
this.close(1002, 'unhandle opcode:' + opcode);
}
}

分片(Fragment)

规范文档:
5.4 –  Fragmentation

一旦 WebSocket 客户端、服务端建立连接后,后续的操作都是基于数据帧的传递。理论上来说,每个帧(Frame)的大小是没有限制的。

对于大块的数据,Websocket 协议建议对数据进行分片(Fragment)操作。

分片的意义主要是两方面:

  • 主要目的是允许当消息开始但不必缓冲该消息时发送一个未知大小的消息。如果消息不能被分片,那么端点将不得不缓冲整个消息以便在首字节发生之前统计出它的长度。对于分片,服务器或中间件可以选择一个合适大小的缓冲,当缓冲满时,再写一个片段到网络。

  • 另一方面分片传输也能更高效地利用多路复用提高带宽利用率,一个逻辑通道上的一个大消息独占输出通道是不可取的,因此多路复用需要可以分割消息为更小的分段来更好的共享输出通道。参考文档
    I/O多路复用技术(multiplexing)是什么?

WebSocket 协议提供的分片方法,是将原本一个大的帧拆分成数个小的帧。下面是把一个大的Frame分片的图示:

分片图示

根据 FIN 的值来判断,是否已经收到消息的最后一个数据帧。由图可知,第一个分片的 FIN 为 0,Opcode 为非0值(0x1 或 0x2),最后一个分片的FIN为1,Opcode为 0。中间分片的 FIN 和 opcode 二者均为 0。

    • `FIN=1` 表示当前数据帧为消息的最后一个数据帧,此时接收方已经收到完整的消息,可以对消息进行处理。

    • `FIN=0`,则接收方还需要继续监听接收其余的数据帧。

  • opcode在数据交换的场景下,表示的是数据的类型。

    • `0x01` 表示文本,永远是 `utf8` 编码的

    • `0x02` 表示二进制

    • 而 `0x00` 比较特殊,表示 延续帧(continuation frame),顾名思义,就是完整消息对应的数据帧还没接收完。

代码里,我们需要检测 FIN 的值,如果为 0 说明有分片,需要记录第一个 FIN 为 0 时的 opcode 值,缓存到 this.frameOpcode 属性中,将载荷缓存到 this.frames 属性中:

    var FIN = byte1 & 0x80; // 如果为0x80,则标志传输结束,获取高位 bit
// 如果是 0 的话,说明是延续帧,需要保存好 opCode
if (!FIN) {
this.frameOpcode = opcode || this.frameOpcode; // 确保不为 0;
}


//....
// 有可能是分帧,需要拼接数据
this.frames = Buffer.concat([this.frames, payload]); // 保存到 frames 中


当接收到最后一个 FIN 帧的时候,就可以组装后给 _handleFrame 方法:

    if (FIN) {
payload = this.frames.slice(0); // 获取所有拼接完整的数据
opcode = opcode || this.frameOpcode; // 如果是 0 ,则保持获取之前保存的 code
this.frames = Buffer.alloc(0); // 清空 frames
this.frameOpcode = 0; // 清空 opcode
this._handleFrame(opcode, payload); // 处理操作码
}

发送数据帧

上面讲的都是接收并解析来自客户端的数据帧,当我们想给客户端发送数据帧的时候,也得按协议来。

这部分操作相当于是上述 _processBuffer 方法的逆向操作,在代码里我们使用 encodeMessage 方法(为了简单起见,我们发送给客户端的数据没有经过掩码处理)将发送的数据分装成数据帧的格式,然后调用 socket.write 方法发送给客户端;

  _doSend(opcode, payload) {
// 1. 考虑数据分片
this.socket.write(
encodeMessage(count > 0 ? OPCODES.CONTINUE : opcode, payload)
); //编码后直接通过socket发送

为了考虑分片场景,特意设置 MAX_FRAME_SIZE 来对每次发送的数据长度做截断做分片:

    // ...
var len = Buffer.byteLength(payload);
// 分片的距离逻辑
var count = 0;
// 这里可以针对 payload 的长度做分片
while (len > MAX_FRAME_SIZE) {
var framePayload = payload.slice(0, MAX_FRAME_SIZE);
payload = payload.slice(MAX_FRAME_SIZE);
this.socket.write(
encodeMessage(
count > 0 ? OPCODES.CONTINUE : opcode,
framePayload,
false
)
); //编码后直接通过socket发送
count++;
len = Buffer.byteLength(payload);
}
// ...

至此已经实现 Websocket 协议的关键部分,所组装起来的代码就能和客户端建立 Websocket 连接并进行数据交互了。