WebSocket编程
WebSocket编程
WebSocket协议解读
websocket和http协议的关联:
- 都是应用层协议,都基于tcp传输协议。
- 跟http有良好的兼容性,ws和http的默认端口都是80,wss和https的默认端口都是443。
- websocket在握手阶段采用http发送数据。
websocket和http协议的差异:
- http是半双工,而websocket通过多路复用实现了全双工。
- http只能由client主动发起数据请求,而websocket还可以由server主动向client推送数据。在需要及时刷新的场景中,http只能靠client高频地轮询,浪费严重。
- http是短连接(也可以实现长连接, HTTP1.1 的连接默认使用长连接),每次数据请求都得经过三次握手重新建立连接,而websocket是长连接。
- http长连接中每次请求都要带上header,而websocket在传输数据阶段不需要带header。
WebSocket是HTML5下的产物,能更好的节省服务器资源和带宽,
websocket应用场景举例
- html5多人游戏
- 聊天室
- 协同编辑
- 基于实时位置的应用
- 股票实时报价
- 弹幕
- 视频会议
websocket握手协议:
Request Header
Sec-Websocket-Version:13
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Key:duR0pUQxNgBJsRQKj2Jxsw==
Response Header
Upgrade:websocket
Connection:Upgrade
Sec-Websocket-Accept:a1y2oy1zvgHsVyHMx+hZ1AYrEHI=
- Upgrade:websocket和Connection:Upgrade指明使用WebSocket协议。
- Sec-WebSocket-Version 指定Websocket协议版本。
- Sec-WebSocket-Key是一个Base64 encode的值,是浏览器随机生成的。
- 服务端收到Sec-WebSocket-Key后拼接上一个固定的GUID,进行一次SHA-1摘要,再转成Base64编码,得到Sec-WebSocket-Accept返回给客户端。客户端对本地的Sec-WebSocket-Key执行同样的操作跟服务端返回的结果进行对比,如果不一致会返回错误关闭连接。如此操作是为了把websocket header跟http header区分开。
WebSocket CS架构实现
首先需要安装gorilla的websocket包。
go get github.com/gorilla/websocket
- 将http升级到WebSocket协议。
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*websocket.Conn, error)
- 客户端发起握手,请求建立连接。
func (*websocket.Dialer) Dial(urlStr string, requestHeader http.Header) (*websocket.Conn, *http.Response, error)
- 基于connection进行read和write。
go web socket API
- websocket发送的消息类型有5种:TextMessag,BinaryMessage, CloseMessag,PingMessage,PongMessage。
- TextMessag和BinaryMessage分别表示发送文本消息和二进制消息。
- CloseMessage关闭帧,接收方收到这个消息就关闭连接
- PingMessage和PongMessage是保持心跳的帧,发送方接收方是PingMessage,接收方发送方是PongMessage,目前浏览器没有相关api发送ping给服务器,只能由服务器发ping给浏览器,浏览器返回pong消息。
目录
websocket/client/client.go
package main
import (
"fmt"
"github.com/gorilla/websocket"
"go_mashi/websocket/common"
"io"
"net/http"
"strconv"
)
func main() {
dialer := &websocket.Dialer{}
header := http.Header{
"Name": []string{"yama"},
}
//Dial:握手阶段,会发送一条http请求。请求一个不存在的路径试试看
conn, resp, err := dialer.Dial("ws://localhost:5657", header)
if err != nil {
fmt.Printf("dial 失败%s\n", err.Error())
return
}
defer resp.Body.Close()
fmt.Println("statusCode" + strconv.Itoa(resp.StatusCode))
msg, _ := io.ReadAll(resp.Body)
fmt.Print(string(msg))
fmt.Println("response header")
for k, v := range resp.Header {
fmt.Printf("key=%s value=%s\n", k, v)
}
defer conn.Close()
for i := 0; i < 5; i++ {
request := common.Request{A: 3, B: 5}
conn.WriteJSON(request) //websocket.Conn直接提供发json序列化和反序列化方法
var response common.Response
conn.ReadJSON(&response)
fmt.Println(response.Sum)
}
}
websocket/common/model.go
package common
type Request struct {
A int
B int
}
type Response struct {
Sum int
}
websocket/server/server.go
package main
import (
"fmt"
"github.com/gorilla/websocket"
"go_mashi/websocket/common"
"net"
"net/http"
"strconv"
"time"
)
type WsServer struct {
//listener net.Listener
addr string
upgrade *websocket.Upgrader
}
func NewWsServer(port int) *WsServer {
ws := new(WsServer)
ws.addr = "0.0.0.0:" + strconv.Itoa(port)
ws.upgrade = &websocket.Upgrader{
HandshakeTimeout: 5 * time.Second, //握手超时时间
ReadBufferSize: 4096, //读缓冲大小
WriteBufferSize: 4096, //写缓冲大小
}
//没有给listener赋值
return ws
}
// httpHandler必须实现ServeHTTP接口
func (ws *WsServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
//if r.URL.Path != "/Add" {
// fmt.Println("Path error")
// http.Error(w, "请求的路径不存在", 222) //把出错的话术写到ResponseWriter里
// return
//}
conn, err := ws.upgrade.Upgrade(w, r, nil) //将http协议升级到websocket协议
if err != nil {
fmt.Printf("升级失败%s\n", err.Error())
return
}
fmt.Printf("跟客户端%s建立好了websocket连接\n", r.RemoteAddr)
go ws.handleOneConnection(conn)
}
// 处理连接里发来的请求数据
func (ws *WsServer) handleOneConnection(conn *websocket.Conn) {
defer func() {
conn.Close()
}()
//长连接
for {
conn.SetReadDeadline(time.Now().Add(20 * time.Second))
var request common.Request
if err := conn.ReadJSON(&request); err != nil {
//判断是不是超时
if netError, ok := err.(net.Error); ok {
if netError.Timeout() {
fmt.Printf("发生了读超时")
return
}
}
fmt.Println(err)
return
}
response := common.Response{
Sum: request.A + request.B,
}
if err := conn.WriteJSON(response); err != nil {
fmt.Println(err)
}
}
}
func (ws *WsServer) start() error {
//var err error
//ws.listener, err = net.Listen("tcp", ws.addr)
//if err != nil {
// fmt.Printf("listen 失败 %s\n", err.Error())
// return err
//}
//if err = http.Serve(ws.listener, ws); err != nil {
// fmt.Printf("serve失败 %s\n", err.Error())
// return err
//}
if err := http.ListenAndServe(ws.addr, ws); err != nil {
fmt.Printf("ListenAndServe失败 %s\n", err.Error())
return err
} else {
return nil
}
}
func main() {
ws := NewWsServer(5657)
ws.start()
}
聊于室实现
gorilla的websocket项目中有一个聊天室的demo,此处讲一下它的设计思路。我们的代码基于原代码进行了简化和修改,并加上中文注释。总体架构如下图所示
Hub
- Hub持有每一个Client的指针,broadcast管道里有数据时把它写入每一个Client的send管道中。
- 注销Client时关闭Client的send管道
Client
- 前端(browser)请求建立websocket连接时,为这条websocket连接专门启一个协程,创建一个client。
- client把前端发过来的数据写入hub的broadcast管道。
- client把自身send管道里的数据写给前端。
- client跟前端的连接断开时请求从hub那儿注销自己。
Front
- 当打开浏览器页面时,前端会请求建立websocket连接。
- 关闭浏览器页面时会主动关闭websocket连接。
存活监测
- 当hub发现client的send管道写不进数据时,把client注销掉。
- client给websocket连接设置一个读超时,并周期性地给前端发ping消息,如果没有收到pong消息则下一次的conn.read()会报出超时错误,此时client关闭websocket连接。
案例展示
目录:
websocket/chatWebsocket/client.go
package main
import (
"bytes"
"fmt"
"github.com/gorilla/websocket"
"net/http"
)
var (
newLine = []byte{'\n'}
space = []byte{' '}
)
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
type Client struct {
hub *Hub
conn *websocket.Conn
send chan []byte
frontName []byte //前端的名字,用于展示在消息前面
}
// 从websocket连接里读出数据,发给hub
func (client *Client) read() {
defer func() { //收尾工作
client.hub.unregister <- client //从hub那注销client
fmt.Printf("%s offine \n", client.frontName)
fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
client.conn.Close() //关闭websocket管道
}()
for {
_, message, err := client.conn.ReadMessage() //如果前端主动断开连接,该航就会报错,for循环会退出。注销client时,hub那会关闭client.send管道
if err != nil {
fmt.Println(err)
break //只要ReadMessage失败,就关闭websocket管道,注销client,退出
} else {
//换行符用空格替代,bytes.TrimSpace把首位连接的空格去掉
message = bytes.TrimSpace(bytes.Replace(message, newLine, space, -1))
if len(client.frontName) == 0 {
client.frontName = message //约定:从浏览器读到的第一条消息代表前端的身份表示。该信息不进行广播
fmt.Printf("%s online\n", string(client.frontName))
} else {
//要广播的内容前面加上front的名字
client.hub.broadcast <- bytes.Join([][]byte{client.frontName, message}, []byte(": "))
fmt.Printf("%s:%s\n", string(client.frontName), message)
}
}
}
}
// 从hub的broadcast那儿读取数据,写到websocket连接里面去
func (client *Client) write() {
defer func() {
fmt.Printf("close connection to %s\n", client.conn.RemoteAddr().String())
client.conn.Close() //给前端写数据失败,就可以关闭连接
}()
for {
msg, ok := <-client.send
if !ok {
fmt.Println("client.send管道已关闭")
client.conn.WriteMessage(websocket.CloseMessage, []byte{}) //写一条关闭信息就可以结束一切了
return
}
/*
消息类型有5种:TextMessage BinaryMessage CloseMessage PingMessage PongMessage
*/
if writer, err := client.conn.NextWriter(websocket.TextMessage); err != nil { //通过NextWriter创建一个新的writer,主要是为了确保上一个writer已经被关闭,即它想写的内容已经flush到conn里去了
return
} else {
writer.Write(msg)
writer.Write(newLine) //没发送一条消息,都加一个换行符
//为了提升性能,如果client.send里还有消息,则趁这一次都写给前端
n := len(client.send)
for i := 0; i < n; i++ {
writer.Write(<-client.send)
writer.Write(newLine)
}
if err := writer.Close(); err != nil { //必须调close,否则下次调用client.conn.NextWriter时本条消息才会发送给浏览器
return // 结束一切
}
}
}
}
func ServeWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil) //http升级为websocket协议
if err != nil {
fmt.Printf("upgrade error : %v\n", err)
return
}
fmt.Printf("connect to client %s\n", conn.RemoteAddr().String())
//每来一个前端请求,就会创建一个client
client := &Client{
hub: hub,
conn: conn,
send: make(chan []byte, 256),
}
//向hub注册client
client.hub.register <- client
//启动子协程,运行ServeWs的协程退出后子协程也不会能出
//websocket是全双工模式,可以同时read和write
go client.read()
go client.write()
}
websocket/chatWebsocket/home.html
<!DOCTYPE html>
<html lang="en">
<head>
<title>聊天室</title>
<meta charset="UTF-8">
<script type="text/javascript">
window.onload = function () {//页面打开时执行以下初始化内容
var conn;
var msg = document.getElementById("msg");
var log = document.getElementById("log");
function appendLog(item) {
var doScroll = log.scrollTop > log.scrollHeight - log.clientHeight - 1;
log.appendChild(item);
if (doScroll) {
log.scrollTop = log.scrollHeight - log.clientHeight;
}
}
document.getElementById("form").onsubmit = function () {
if (!conn) {
return false;
}
if (!msg.value) {
return false;
}
conn.send(msg.value);
msg.value = "";
return false;
};
if (window["WebSocket"]) {//如果支持websockte就尝试连接
//从浏览器的开发者工具里看一下ws的请求头
conn = new WebSocket("ws://127.0.0.1:5657/chat");//请求跟websocket服务端建立连接(注意端口要一致)。关闭浏览器页面时会自动断开连接
conn.onclose = function (evt) {
var item = document.createElement("div")
item.innerHTML = "<b>Connection closed.</b>";//连接关闭时打印一条信息
appendLog(item);
};
conn.onmessage = function (evt) {//如果conn里有消息
var messages = evt.data.split('\n');//用换行符分隔每条消息
for (var i = 0; i < messages.length; i++) {
var item = document.createElement("div");
item.innerText = messages[i];//把消息逐条显示在屏幕上
appendLog(item);
}
};
} else {
var item = document.createElement("div");
item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
appendLog(item);
}
};
</script>
<style type="text/css">
html {
overflow: hidden;
}
body {
overflow: hidden;
padding: 0;
margin: 0;
width: 100%;
height: 100%;
background: gray;
}
#log {
background: white;
margin: 0;
padding: 0.5em 0.5em 0.5em 0.5em;
position: absolute;
top: 0.5em;
left: 0.5em;
right: 0.5em;
bottom: 3em;
overflow: auto;
}
#form {
padding: 0 0.5em 0 0.5em;
margin: 0;
position: absolute;
bottom: 1em;
left: 0px;
width: 100%;
overflow: hidden;
}
</style>
</head>
<body>
<div id="log"></div>
<form id="form">
<input type="submit" value="发送" />
<input type="text" id="msg" size="100" autofocus />
</form>
</body>
</html>
websocket/chatWebsocket/hub.go
package main
type Hub struct {
clients map[*Client]bool //维护所有的client
register chan *Client //client注册请求通道来接收
unregister chan *Client //client注销请求通道来接收
broadcast chan []byte //需要广播的消息
}
func NewHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan []byte), //同步管道,确保hub这里消息不会堆积,如果同时有多个client想给hub发数据就阻塞
}
}
func (hub *Hub) Run() {
for {
select {
case client := <-hub.register:
hub.clients[client] = true // 注册client
//注销
case client := <-hub.unregister:
//防止重复注销
if _, ok := hub.clients[client]; ok {
delete(hub.clients, client) //注销client
close(client.send) //hub从此以后不需要再向该client广播消息了
}
case msg := <-hub.broadcast:
for client := range hub.clients {
client.send <- msg
}
}
}
}
websocket/chatWebsocket/main.go
package main
import (
"flag"
"fmt"
"net/http"
)
func serveHome(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, "websocket/chatWebsocket/home.html") //请求根目录直接返回一个html页面
}
func main() {
port := flag.String("port", "5657", "http service port") //如果命令行不指定port参数,则默认为5657
flag.Parse() //解析命令行输入的port参数
hub := NewHub()
go hub.Run()
http.HandleFunc("/", serveHome)
http.HandleFunc("/chat", func(writer http.ResponseWriter, request *http.Request) {
ServeWs(hub, writer, request)
})
fmt.Printf("http serve on port%s\n", *port)
if err := http.ListenAndServe(":"+*port, nil); err != nil { //如果启动成功,该行会一直阻塞,hub.run()会一直运行。由于上面已经定义了路由,所以这里handler参数可以传nil
fmt.Printf("start http service error:%s\n", err)
}
}