Golang + Socket.io

在 Go 中使用 Socket.IO

Websocket

Websocket是全双工的基于TCP层的通信协议,为浏览器及网站服务器提供处理流式推送消息的方式。它不同于HTTP协议,但仍依赖HTTP的Upgrade头部进行协议的转换。
websocket 协议通信分为两个部分,先是握手,再是数据传输。
如下就是一个基本的websocket握手的请求与回包。
websocket handshake请求

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
Sec-WebSocket-Version: 13

websocket handshake返回

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

根据 RFC 6455 定义,websocket 消息统称为 messages, 一个message可以由多个frame构成,其中frame可以为文本数据,二进制数据或者控制帧等,websocket官方有6种类型并预留了10种类型用于未来的扩展。

Websocket协议中如何确保客户端与服务端接收到握手请求呢? 这里就要说到HTTP的两个头部字段, Sec-Websocket-Key
Sec-Websocket-Accept

  • 首先客户端发起请求,在头部 Sec-Websocket-Key
    中随机生成 base64 字符串;

    Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
  • 服务端收到请求后,根据头部 Sec-Websocket-Key
    与约定的 GUID, [RFC4122]) 258EAFA5-E914-47DA-95CA-C5AB0DC85B11
    拼接;

    dGhlIHNhbXBsZSBub25jZQ==258EAFA5-E914-47DA-95CA-C5AB0DC85B11
  • 使用 SHA-1 算法得到拼接的字符串的摘要 hash,最后用 base64 编码放入头部 Sec-Websocket-Accept
    返回客户端做认证。

    SHA1= b37a4f2cc0624f1690f64606cf385945b2bec4ea
    
      Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

更详细的说明可以看 RFC 6455 说明,服务端与客户端都有更详细的入参限制。

Data Framing 数据帧

了解完 websocket 握手的大致过程后,这个部分介绍下 websocket 数据帧(这比理解TCP/IP数据帧看着简单很多吧)与分片传输的方式。

0                   1                   2                   3
  0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
 +-+-+-+-+-------+-+-------------+-------------------------------+
 |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
 |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
 |N|V|V|V|       |S|             |   (if payload len==126/127)   |
 | |1|2|3|       |K|             |                               |
 +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
 |     Extended payload length continued, if payload len == 127  |
 + - - - - - - - - - - - - - - - +-------------------------------+
 |                               |Masking-key, if MASK set to 1  |
 +-------------------------------+-------------------------------+
 | Masking-key (continued)       |          Payload Data         |
 +-------------------------------- - - - - - - - - - - - - - - - +
 :                     Payload Data continued ...                :
 + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
 |                     Payload Data continued ...                |
 +---------------------------------------------------------------+
  • FIN: 表示是否为最后一个数据帧的标记位
  • opcode: 表示传输的 Payload 数据格式,如1表示纯文本(utf8)数据帧,2表示二进制数据帧

    %x0 denotes a continuation frame
      %x1 denotes a text frame
      %x2 denotes a binary frame
      %x3-7 are reserved for further non-control frames
      %x8 denotes a connection close
      %x9 denotes a ping
      %xA denotes a pong
      %xB-F are reserved for further control frames
  • MASK: 表示 Payload 数据是否标记,从客户端发送给服务端的包需要通过 Masking-key 与 Payload 数据进行异或操作,防止一些恶意程序直接获取传输内容内容。
  • Payload len:传输数据内容的长度
  • Payload Data: 传输数据

当一个完整消息体大小不可知时,websocket支持分片传输 Fragmentation。这样可以方便服务端使用可控大小的 buffer 来传输分段数据,减少带宽压力,同时可以有效控制服务器内存。
同时在多路传输的场景下,可以利用分片技术使不同的 namespace 的数据能共享对外传输通道。不用等待某个大的 message 传输完成,进入等待状态。
对于控制数据帧 Control Frames 不能使用分片方式,并且 Playload 数据不大于 125 bytes,但可以在分片帧中插队传输。通过 opcodes 最高位置位来标记控制帧,0x8 (Close), 0x9 (Ping), 0xA (Pong),Opcodes 0xB-0xF 保留。

go-socket.io 参考

package socketio
import "github.com/googollee/go-socket.io"

func NewBroadcast() Broadcast
type Broadcast interface {
    Join(room string, connection Conn)            // Join causes the connection to join a room
    Leave(room string, connection Conn)           // Leave causes the connection to leave a room
    LeaveAll(connection Conn)                     // LeaveAll causes given connection to leave all rooms
    Clear(room string)                            // Clear causes removal of all connections from the room
    Send(room, event string, args ...interface{}) // Send will send an event with args to the room
    SendAll(event string, args ...interface{})    // SendAll will send an event with args to all the rooms
    Len(room string) int                          // Len gives number of connections in the room
    Rooms(connection Conn) []string               // Gives list of all the rooms if no connection given, else list of all the rooms the connection joined
}
type Conn interface {
    ID() string // session id
    Close() error
    URL() url.URL
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    RemoteHeader() http.Header

    // Context of this connection. You can save one context for one
    // connection, and share it between all handlers. The handlers
    // is called in one goroutine, so no need to lock context if it
    // only be accessed in one connection.
    Context() interface{}
    SetContext(v interface{})
    Namespace() string
    Emit(msg string, v ...interface{})

    // Broadcast server side apis
    Join(room string)
    Leave(room string)
    LeaveAll()
    Rooms() []string
}
func NewServer(c *engineio.Options) (*Server, error)
type Server
    func (s *Server) BroadcastToRoom(room, event string, args ...interface{})
    func (s *Server) ClearRoom(room string)
    func (s *Server) Close() error
    func (s *Server) JoinRoom(room string, connection Conn)
    func (s *Server) LeaveAllRooms(connection Conn)
    func (s *Server) LeaveRoom(room string, connection Conn)
    func (s *Server) OnConnect(nsp string, f func(Conn) error)
    func (s *Server) OnDisconnect(nsp string, f func(Conn, string))
    func (s *Server) OnError(nsp string, f func(Conn, error))
    func (s *Server) OnEvent(nsp, event string, f interface{})
    func (s *Server) RoomLen(room string) int
    func (s *Server) Rooms() []string
    func (s *Server) Serve() error
    func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)


type net.Addr interface {
    Network() string // name of the network (for example, "tcp", "udp")
    String() string  // string form of address (for example, "192.0.2.1:25", "[2001:db8::1]:80")
}

go-socket.io demo

Install the package with:

go get github.com/googollee/go-socket.io

Import it with:

import "github.com/googollee/go-socket.io"

go-socket.io 提供个基本的实列,参考原代码中的 Example 目录。

package main

import (
    "fmt"
    "log"
    "net/http"

    "github.com/googollee/go-socket.io"
)

type Msg struct {
    UserId    string   `json:"userId"`
    Text      string   `json:"text"`
    State     string   `json:"state"`
    Namespace string   `json:"namespace"`
    Rooms     []string `json:"rooms"`
}

func main() {
    server, err := socketio.NewServer(nil)
    if err != nil {
        log.Fatal(err)
    }
    server.OnConnect("/", func(s socketio.Conn) error {
        msg := Msg{s.ID(), "connected!", "notice", "", nil}
        s.SetContext("")
        s.Emit("res", msg)
        fmt.Println("connected /:", s.ID())
        // fmt.Printf("URL: %#v \n", s.URL())
        // fmt.Printf("LocalAddr: %#+v \n", s.LocalAddr())
        // fmt.Printf("RemoteAddr: %#+v \n", s.RemoteAddr())
        // fmt.Printf("RemoteHeader: %#+v \n", s.RemoteHeader())
        // fmt.Printf("Cookies: %s \n", s.RemoteHeader().Get("Cookie"))
        return nil
    })

    server.OnEvent("/", "join", func(s socketio.Conn, room string) {
        s.Join(room)
        msg := Msg{s.ID(), "<= " + s.ID() + " join " + room, "state", s.Namespace(), s.Rooms()}
        fmt.Println("/:join", room, s.Namespace(), s.Rooms())
        server.BroadcastToRoom(room, "res", msg)
    })
    server.OnEvent("/", "leave", func(s socketio.Conn, room string) {
        s.Leave(room)
        msg := Msg{s.ID(), "<= " + s.ID() + " leave " + room, "state", s.Namespace(), s.Rooms()}
        fmt.Println("/:chat received", room, s.Namespace(), s.Rooms())
        server.BroadcastToRoom(room, "res", msg)
    })

    server.OnEvent("/", "chat", func(s socketio.Conn, msg string) {
        res := Msg{s.ID(), " 0 {
            fmt.Println("broadcast to", rooms)
            for i := range rooms {
                server.BroadcastToRoom(rooms[i], "res", res)
            }
            // server.BroadcastToRoom(s.Rooms()[0], "res", res)
        }
    })

    server.OnEvent("/", "notice", func(s socketio.Conn, msg string) {
        fmt.Println("/:notice:", msg)
        s.Emit("reply", "have "+msg)
    })
    server.OnEvent("/chat", "msg", func(s socketio.Conn, msg string) string {
        fmt.Println("/chat:msg received", msg)
        return "recv " + msg
    })
    server.OnEvent("/", "bye", func(s socketio.Conn) string {
        last := s.Context().(Msg)
        s.Emit("bye", last)
        res := Msg{s.ID(), " 0 {
            fmt.Println("broadcast to", rooms)
            for i := range rooms {
                server.BroadcastToRoom(rooms[i], "res", res)
            }
        }
        fmt.Printf("/:bye last context: %#+v \n", s.Context())
        return last.Text
    })

    server.OnError("/", func(s socketio.Conn, e error) {
        fmt.Println("/:error ", e)
    })
    server.OnDisconnect("/", func(s socketio.Conn, reason string) {
        fmt.Println("/:closed", s.ID(), reason)
    })

    go server.Serve()
    defer server.Close()

    http.Handle("/socket.io/", server)
    http.Handle("/", http.FileServer(http.Dir("./asset")))

    log.Println("Serving at localhost:8000...")
    log.Fatal(http.ListenAndServe(":8000", nil))
}

注意格式 server.OnEvent(namespace, event, func...)
, OnEvent 可以直接返回字符串,也可以通过 socket.Emit 向客户端返回数据。 server.BroadcastToRoom 提供了广播方法,通过内部的 broadcast.Send 方法进行广播,但是没有公开内部的 SendAll 方法。用户连接直接通过 Join/Leave 加入或离开房间,可以加入多个房间,当前连接的 Rooms() 返回用户加入的房间号,server.Rooms() 则记录了当前系统所有的房间号。
当前连接提供了 SetContext/Context 方法来保存/读取当前用户会话中的上下文数据。
使用 Golang 后端的 go-socket.io 和 Node.js 的 socket.io 在前端的连接方式上有细小差别,socket.io 可以通过 query 参数来配置房间号、用户 ID:

const socket = io(namespace, {
  // Actual use can pass parameters here
  query: {
    room,
    userId,
  },
  transports: ['websocket']
});

在 go-socket.io 中就需要手动读取客户端传入的房间号、用户 ID 等信息:

s.URL().RawQuery
"room=tent-lodge&userId=client_57222..."

配套页面 index.html:



  Socket Test
  
  
  



  <!-- 

Socket Test

--> // browser const log = console.log; function onType(e){ console.log("onType", e.target); if(!socket) return; var msg = e.target.value; var action = msg.split(" ")[0]; var tend = msg.split(" ")[1]; if(msg=="bye"){ socket.emit('bye', msg); }else if(action=="join" && tend){ socket.emit("join", msg.split(" ")[1]); }else if(action=="leave" && tend){ socket.emit("leave", msg.split(" ")[1]); }else{ socket.emit("chat", msg); } addMessage("send", {userId, text: msg}); e.target.value = ""; } var userId = `client_${Math.random().toFixed(5).substr(2)}`; var it = document.createElement('ul'); it.className = "messages"; document.body.appendChild(it) function addMessage(type, msg){ var li = document.createElement('li'); li.innerHTML = ""+msg.text+""+""+msg.userId+""; li.className = type+" "+(msg.type || ""); if(userId==msg.userId){ li.className += " self"; } if((msg.text=="connected!" || msg.text=="disconnected!") && msg.rooms){ var rooms = Object.keys(msg.rooms).length; var s = []; for(var i in msg.rooms){ s.push("Room:"+i+" Numbers:"+msg.rooms[i].length+"") } li.innerHTML += "
"+s.join("
")+"
"; } it.appendChild(li); } window.onload = function() { // init var room = 'tent-lodge'; var namespace = '/'; // namespace = '/example'; // namespace = "http://127.0.0.1:7001/"; // var socket = io(); var socket = io(namespace); // const socket = io(namespace, { // // Actual use can pass parameters here // query: { // room, // userId, // }, // transports: ['websocket'] // }); socket.on('connect', () => { const id = socket.id; log('#connect,', id, socket); // receive online user information addMessage("connect", {userId, text: "connected "+id}); var msg = 'Hello Socket.io!'; socket.emit('chat', msg); addMessage("send", {userId, text: msg}); // listen for its own id to implement p2p communication socket.on(id, msg => { log('#receive,', msg); }); }); socket.on('res', msg => { addMessage("received", msg); console.log('res from server:', msg); }); socket.on('online', msg => { log('#online,', msg); }); // system events socket.on('disconnect', msg => { log('#disconnect', msg); }); socket.on('disconnecting', () => { log('#disconnecting'); }); socket.on('error', (res) => { log('#error', res); }); window.socket = socket; };