refactor: WebSocket 握手逻辑改进以及心跳包检测

This commit is contained in:
sjlleo
2022-08-09 03:25:35 -04:00
parent b09d4bab74
commit a4124b50aa

View File

@@ -12,6 +12,8 @@ import (
)
type WsConn struct {
Connecting bool
Connected bool // 连接状态
MsgSendCh chan string // 消息发送通道
MsgReceiveCh chan string // 消息接收通道
Done chan struct{} // 发送结束通道
@@ -23,14 +25,43 @@ type WsConn struct {
var wsconn *WsConn
func (c *WsConn) messageReceiveHandler() {
defer close(c.Done)
for {
_, msg, err := c.Conn.ReadMessage()
if err != nil {
return
func (c *WsConn) keepAlive() {
go func() {
// 开启一个定时器
for {
<-time.After(time.Second * 54)
if c.Connected {
c.Conn.WriteMessage(websocket.TextMessage, []byte("ping"))
}
}
}()
for {
if !c.Connected && !c.Connecting {
c.Connecting = true
c.recreateWsConn()
// log.Println("WebSocket 连接意外断开,正在尝试重连...")
// return
}
// 降低检测频率,优化 CPU 占用情况
<-time.After(200 * time.Millisecond)
}
}
func (c *WsConn) messageReceiveHandler() {
// defer close(c.Done)
for {
if c.Connected {
_, msg, err := c.Conn.ReadMessage()
if err != nil {
// 读取信息出错,连接已经意外断开
// log.Println(err)
c.Connected = false
return
}
if string(msg) != "pong" {
c.MsgReceiveCh <- string(msg)
}
}
c.MsgReceiveCh <- string(msg)
}
}
@@ -39,20 +70,26 @@ func (c *WsConn) messageSendHandler() {
// 循环监听发送
select {
case <-c.Done:
log.Println("发送协程已经退出")
return
case t := <-c.MsgSendCh:
err := c.Conn.WriteMessage(websocket.TextMessage, []byte(t))
if err != nil {
log.Println("write:", err)
return
// log.Println(t)
if !c.Connected {
c.MsgReceiveCh <- `{"ip":"` + t + `", "asnumber":"API服务端异常"}`
} else {
err := c.Conn.WriteMessage(websocket.TextMessage, []byte(t))
if err != nil {
log.Println("write:", err)
return
}
}
// 来自终端的中断运行请求
case <-c.Interrupt:
// 向 websocket 发起关闭连接任务
err := c.Conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
// log.Println("write close:", err)
os.Exit(1)
}
select {
// 等到了结果,直接退出
@@ -66,6 +103,27 @@ func (c *WsConn) messageSendHandler() {
}
}
func (c *WsConn) recreateWsConn() {
u := url.URL{Scheme: "wss", Host: "api.leo.moe", Path: "/v2/ipGeoWs"}
// log.Printf("connecting to %s", u.String())
ws, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
c.Conn = ws
if err != nil {
log.Println("dial:", err)
// <-time.After(time.Second * 1)
c.Connected = false
c.Connecting = false
return
} else {
c.Connected = true
}
c.Connecting = false
c.Done = make(chan struct{})
go c.messageReceiveHandler()
}
func createWsConn() *WsConn {
// 设置终端中断通道
interrupt := make(chan os.Signal, 1)
@@ -75,17 +133,28 @@ func createWsConn() *WsConn {
// log.Printf("connecting to %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
wsconn = &WsConn{
Conn: c,
Connected: true,
Connecting: false,
MsgSendCh: make(chan string, 10),
MsgReceiveCh: make(chan string, 10),
}
if err != nil {
log.Fatal("dial:", err)
log.Println("dial:", err)
// <-time.After(time.Second * 1)
wsconn.Connected = false
wsconn.Done = make(chan struct{})
go wsconn.keepAlive()
go wsconn.messageSendHandler()
return wsconn
}
// defer c.Close()
// 将连接写入WsConn方便随时可取
wsconn = &WsConn{
Conn: c,
MsgSendCh: make(chan string),
MsgReceiveCh: make(chan string),
}
wsconn.Done = make(chan struct{})
go wsconn.keepAlive()
go wsconn.messageReceiveHandler()
go wsconn.messageSendHandler()
return wsconn