文章

WebSocket编程

WebSocket编程

WebSocket协议解读

avatar

websocket和http协议的关联:

  • 都是应用层协议,都基于tcp传输协议。
  • 跟http有良好的兼容性,ws和http的默认端口都是80,wss和https的默认端口都是443。
  • websocket在握手阶段采用http发送数据。

websocket和http协议的差异:

  • http是半双工,而websocket通过多路复用实现了全双工。
  • http只能由client主动发起数据请求,而websocket还可以由server主动向client推送数据。在需要及时刷新的场景中,http只能靠client高频地轮询,浪费严重。
  • http是短连接(也可以实现长连接, HTTP1.1 的连接默认使用长连接),每次数据请求都得经过三次握手重新建立连接,而websocket是长连接。
  • http长连接中每次请求都要带上header,而websocket在传输数据阶段不需要带header。

WebSocket是HTML5下的产物,能更好的节省服务器资源和带宽,

websocket应用场景举例

  • html5多人游戏
  • 聊天室
  • 协同编辑
  • 基于实时位置的应用
  • 股票实时报价
  • 弹幕
  • 视频会议

websocket握手协议:

Request Header

Sec-Websocket-Version:13
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Key:duR0pUQxNgBJsRQKj2Jxsw==

Response Header

Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Accept:a1y2oy1zvgHsVyHMx+hZ1AYrEHI=
  • Upgrade:websocket和Connection:Upgrade指明使用WebSocket协议。
  • Sec-WebSocket-Version 指定Websocket协议版本。
  • Sec-WebSocket-Key是一个Base64 encode的值,是浏览器随机生成的。
  • 服务端收到Sec-WebSocket-Key后拼接上一个固定的GUID,进行一次SHA-1摘要,再转成Base64编码,得到Sec-WebSocket-Accept返回给客户端。客户端对本地的Sec-WebSocket-Key执行同样的操作跟服务端返回的结果进行对比,如果不一致会返回错误关闭连接。如此操作是为了把websocket header跟http header区分开。

WebSocket CS架构实现

  首先需要安装gorilla的websocket包。

go get github.com/gorilla/websocket
  1. 将http升级到WebSocket协议。
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*websocket.Conn, error)
  1. 客户端发起握手,请求建立连接。
func (*websocket.Dialer) Dial(urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error)
  1. 基于connection进行read和write。

go web socket API

  • websocket发送的消息类型有5种:TextMessag,BinaryMessage, CloseMessag,PingMessage,PongMessage。
  • TextMessag和BinaryMessage分别表示发送文本消息和二进制消息。
  • CloseMessage关闭帧,接收方收到这个消息就关闭连接
  • PingMessage和PongMessage是保持心跳的帧,发送方接收方是PingMessage,接收方发送方是PongMessage,目前浏览器没有相关api发送ping给服务器,只能由服务器发ping给浏览器,浏览器返回pong消息。

目录

image-20230330122155421

websocket/client/client.go

package main

import (
   "fmt"
   "github.com/gorilla/websocket"
   "go_mashi/websocket/common"
   "io"
   "net/http"
   "strconv"
)

func main() {
   dialer := &websocket.Dialer{}
   header := http.Header{
      "Name": []string{"yama"},
   }
   //Dial:握手阶段,会发送一条http请求。请求一个不存在的路径试试看
   conn, resp, err := dialer.Dial("ws://localhost:5657", header)
   if err != nil {
      fmt.Printf("dial 失败%s\n", err.Error())

      return
   }
   defer resp.Body.Close()
   fmt.Println("statusCode" + strconv.Itoa(resp.StatusCode))
   msg, _ := io.ReadAll(resp.Body)
   fmt.Print(string(msg))
   fmt.Println("response header")
   for k, v := range resp.Header {
      fmt.Printf("key=%s value=%s\n", k, v)
   }
   defer conn.Close()
   for i := 0; i < 5; i++ {

      request := common.Request{A: 3, B: 5}
      conn.WriteJSON(request) //websocket.Conn直接提供发json序列化和反序列化方法
      var response common.Response
      conn.ReadJSON(&response)
      fmt.Println(response.Sum)
   }
}

websocket/common/model.go

package common

type Request struct {
   A int
   B int
}
type Response struct {
   Sum int
}

websocket/server/server.go

package main

import (
   "fmt"
   "github.com/gorilla/websocket"
   "go_mashi/websocket/common"
   "net"
   "net/http"
   "strconv"
   "time"
)

type WsServer struct {
   //listener net.Listener
   addr    string
   upgrade *websocket.Upgrader
}

func NewWsServer(port int) *WsServer {
   ws := new(WsServer)
   ws.addr = "0.0.0.0:" + strconv.Itoa(port)
   ws.upgrade = &websocket.Upgrader{
      HandshakeTimeout: 5 * time.Second, //握手超时时间
      ReadBufferSize:   4096,            //读缓冲大小
      WriteBufferSize:  4096,            //写缓冲大小
   }
   //没有给listener赋值
   return ws
}

// httpHandler必须实现ServeHTTP接口
func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

   //if r.URL.Path != "/Add" {
   // fmt.Println("Path error")
   // http.Error(w, "请求的路径不存在", 222) //把出错的话术写到ResponseWriter里
   // return
   //}

   conn, err := ws.upgrade.Upgrade(w, r, nil) //将http协议升级到websocket协议
   if err != nil {
      fmt.Printf("升级失败%s\n", err.Error())
      return
   }
   fmt.Printf("跟客户端%s建立好了websocket连接\n", r.RemoteAddr)
   go ws.handleOneConnection(conn)
}

// 处理连接里发来的请求数据
func (ws *WsServer) handleOneConnection(conn *websocket.Conn) {
   defer func() {
      conn.Close()
   }()
   //长连接
   for {
      conn.SetReadDeadline(time.Now().Add(20 * time.Second))
      var request common.Request
      if err := conn.ReadJSON(&request); err != nil {
         //判断是不是超时
         if netError, ok := err.(net.Error); ok {
            if netError.Timeout() {
               fmt.Printf("发生了读超时")
               return
            }
         }
         fmt.Println(err)
         return
      }
      response := common.Response{
         Sum: request.A + request.B,
      }
      if err := conn.WriteJSON(response); err != nil {
         fmt.Println(err)
      }
   }
}
func (ws *WsServer) start() error {
   //var err error
   //ws.listener, err = net.Listen("tcp", ws.addr)
   //if err != nil {
   // fmt.Printf("listen 失败 %s\n", err.Error())
   // return err
   //}
   //if err = http.Serve(ws.listener, ws); err != nil {
   // fmt.Printf("serve失败 %s\n", err.Error())
   // return err
   //}
   if err := http.ListenAndServe(ws.addr, ws); err != nil {
      fmt.Printf("ListenAndServe失败 %s\n", err.Error())
      return err
   } else {
      return nil
   }
}
func main() {
   ws := NewWsServer(5657)
   ws.start()
}

聊于室实现

gorilla的websocket项目中有一个聊天室的demo,此处讲一下它的设计思路。我们的代码基于原代码进行了简化和修改,并加上中文注释。总体架构如下图所示

avatar

Hub

  • Hub持有每一个Client的指针,broadcast管道里有数据时把它写入每一个Client的send管道中。
  • 注销Client时关闭Client的send管道

Client

  • 前端(browser)请求建立websocket连接时,为这条websocket连接专门启一个协程,创建一个client。
  • client把前端发过来的数据写入hub的broadcast管道。
  • client把自身send管道里的数据写给前端。
  • client跟前端的连接断开时请求从hub那儿注销自己。

Front

  • 当打开浏览器页面时,前端会请求建立websocket连接。
  • 关闭浏览器页面时会主动关闭websocket连接。

存活监测

  • 当hub发现client的send管道写不进数据时,把client注销掉。
  • client给websocket连接设置一个读超时,并周期性地给前端发ping消息,如果没有收到pong消息则下一次的conn.read()会报出超时错误,此时client关闭websocket连接。

案例展示

目录:

image-20230330150352505

websocket/chatWebsocket/client.go

package main

import (
   "bytes"
   "fmt"
   "github.com/gorilla/websocket"
   "net/http"
)

var (
   newLine = []byte{'\n'}
   space   = []byte{' '}
)
var upgrader = websocket.Upgrader{
   CheckOrigin: func(r *http.Request) bool {
      return true
   },
}

type Client struct {
   hub       *Hub
   conn      *websocket.Conn
   send      chan []byte
   frontName []byte //前端的名字,用于展示在消息前面

}

// 从websocket连接里读出数据,发给hub
func (client *Client) read() {

   defer func() { //收尾工作
      client.hub.unregister <- client //从hub那注销client
      fmt.Printf("%s offine \n", client.frontName)
      fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
      client.conn.Close() //关闭websocket管道
   }()
   for {
      _, message, err := client.conn.ReadMessage() //如果前端主动断开连接,该航就会报错,for循环会退出。注销client时,hub那会关闭client.send管道
      if err != nil {
         fmt.Println(err)
         break //只要ReadMessage失败,就关闭websocket管道,注销client,退出
      } else {
         //换行符用空格替代,bytes.TrimSpace把首位连接的空格去掉
         message = bytes.TrimSpace(bytes.Replace(message, newLine, space, -1))
         if len(client.frontName) == 0 {
            client.frontName = message //约定:从浏览器读到的第一条消息代表前端的身份表示。该信息不进行广播
            fmt.Printf("%s online\n", string(client.frontName))
         } else {
            //要广播的内容前面加上front的名字
            client.hub.broadcast <- bytes.Join([][]byte{client.frontName, message}, []byte(": "))
            fmt.Printf("%s:%s\n", string(client.frontName), message)
         }
      }
   }
}

// 从hub的broadcast那儿读取数据,写到websocket连接里面去
func (client *Client) write() {
   defer func() {
      fmt.Printf("close connection to  %s\n", client.conn.RemoteAddr().String())
      client.conn.Close() //给前端写数据失败,就可以关闭连接
   }()
   for {
      msg, ok := <-client.send
      if !ok {
         fmt.Println("client.send管道已关闭")
         client.conn.WriteMessage(websocket.CloseMessage, []byte{}) //写一条关闭信息就可以结束一切了
         return
      }
      /*
         消息类型有5种:TextMessage BinaryMessage CloseMessage PingMessage PongMessage
      */
      if writer, err := client.conn.NextWriter(websocket.TextMessage); err != nil { //通过NextWriter创建一个新的writer,主要是为了确保上一个writer已经被关闭,即它想写的内容已经flush到conn里去了
         return
      } else {
         writer.Write(msg)
         writer.Write(newLine) //没发送一条消息,都加一个换行符
         //为了提升性能,如果client.send里还有消息,则趁这一次都写给前端
         n := len(client.send)
         for i := 0; i < n; i++ {
            writer.Write(<-client.send)
            writer.Write(newLine)
         }
         if err := writer.Close(); err != nil { //必须调close,否则下次调用client.conn.NextWriter时本条消息才会发送给浏览器
            return // 结束一切
         }
      }
   }

}
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
   conn, err := upgrader.Upgrade(w, r, nil) //http升级为websocket协议
   if err != nil {
      fmt.Printf("upgrade error : %v\n", err)
      return
   }
   fmt.Printf("connect to client %s\n", conn.RemoteAddr().String())
   //每来一个前端请求,就会创建一个client
   client := &Client{
      hub:  hub,
      conn: conn,
      send: make(chan []byte, 256),
   }
   //向hub注册client
   client.hub.register <- client
   //启动子协程,运行ServeWs的协程退出后子协程也不会能出
   //websocket是全双工模式,可以同时read和write
   go client.read()
   go client.write()
}

websocket/chatWebsocket/home.html

<!DOCTYPE html>
<html lang="en">


<head>
    <title>聊天室</title>
    <meta charset="UTF-8">
    <script type="text/javascript">
        window.onload = function () {//页面打开时执行以下初始化内容
            var conn;
            var msg = document.getElementById("msg");
            var log = document.getElementById("log");

            function appendLog(item) {
                var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
                log.appendChild(item);
                if (doScroll) {
                    log.scrollTop = log.scrollHeight - log.clientHeight;
                }
            }

            document.getElementById("form").onsubmit = function () {
                if (!conn) {
                    return false;
                }
                if (!msg.value) {
                    return false;
                }
                conn.send(msg.value);
                msg.value = "";
                return false;
            };

            if (window["WebSocket"]) {//如果支持websockte就尝试连接
                //从浏览器的开发者工具里看一下ws的请求头
                conn = new WebSocket("ws://127.0.0.1:5657/chat");//请求跟websocket服务端建立连接(注意端口要一致)。关闭浏览器页面时会自动断开连接
                conn.onclose = function (evt) {
                    var item = document.createElement("div")
                    item.innerHTML = "<b>Connection closed.</b>";//连接关闭时打印一条信息
                    appendLog(item);
                };
                conn.onmessage = function (evt) {//如果conn里有消息
                    var messages = evt.data.split('\n');//用换行符分隔每条消息
                    for (var i = 0; i < messages.length; i++) {
                        var item = document.createElement("div");
                        item.innerText = messages[i];//把消息逐条显示在屏幕上
                        appendLog(item);
                    }
                };
            } else {
                var item = document.createElement("div");
                item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
                appendLog(item);
            }
        };
    </script>
    <style type="text/css">
        html {
            overflow: hidden;
        }

        body {
            overflow: hidden;
            padding: 0;
            margin: 0;
            width: 100%;
            height: 100%;
            background: gray;
        }

        #log {
            background: white;
            margin: 0;
            padding: 0.5em 0.5em 0.5em 0.5em;
            position: absolute;
            top: 0.5em;
            left: 0.5em;
            right: 0.5em;
            bottom: 3em;
            overflow: auto;
        }

        #form {
            padding: 0 0.5em 0 0.5em;
            margin: 0;
            position: absolute;
            bottom: 1em;
            left: 0px;
            width: 100%;
            overflow: hidden;
        }
    </style>
</head>

<body>
<div id="log"></div>
<form id="form">
    <input type="submit" value="发送" />
    <input type="text" id="msg" size="100" autofocus />
</form>
</body>

</html>

websocket/chatWebsocket/hub.go

package main

type Hub struct {
   clients    map[*Client]bool //维护所有的client
   register   chan *Client     //client注册请求通道来接收
   unregister chan *Client     //client注销请求通道来接收
   broadcast  chan []byte      //需要广播的消息
}

func NewHub() *Hub {
   return &Hub{
      clients:    make(map[*Client]bool),
      register:   make(chan *Client),
      unregister: make(chan *Client),
      broadcast:  make(chan []byte), //同步管道,确保hub这里消息不会堆积,如果同时有多个client想给hub发数据就阻塞
   }
}

func (hub *Hub) Run() {
   for {
      select {
      case client := <-hub.register:
         hub.clients[client] = true // 注册client
         //注销
      case client := <-hub.unregister:
         //防止重复注销
         if _, ok := hub.clients[client]; ok {
            delete(hub.clients, client) //注销client
            close(client.send)          //hub从此以后不需要再向该client广播消息了
         }
      case msg := <-hub.broadcast:
         for client := range hub.clients {
            client.send <- msg
         }
      }

   }
}

websocket/chatWebsocket/main.go

package main

import (
   "flag"
   "fmt"
   "net/http"
)

func serveHome(w http.ResponseWriter, r *http.Request) {
   http.ServeFile(w, r, "websocket/chatWebsocket/home.html") //请求根目录直接返回一个html页面

}
func main() {
   port := flag.String("port", "5657", "http service port") //如果命令行不指定port参数,则默认为5657
   flag.Parse()                                             //解析命令行输入的port参数
   hub := NewHub()
   go hub.Run()

   http.HandleFunc("/", serveHome)
   http.HandleFunc("/chat", func(writer http.ResponseWriter, request *http.Request) {
      ServeWs(hub, writer, request)

   })
   fmt.Printf("http serve on port%s\n", *port)
   if err := http.ListenAndServe(":"+*port, nil); err != nil { //如果启动成功,该行会一直阻塞,hub.run()会一直运行。由于上面已经定义了路由,所以这里handler参数可以传nil
      fmt.Printf("start http service error:%s\n", err)
   }
}
License:  CC BY 4.0 test