文章目录
- 异常信息
- 原由
- 代码
- 错误点
- 解决办法
异常信息
panic: concurrent write to websocket connection
原由
golang 编写 websocket
go版本:1.19
使用了第三方框架: https://github.com/gorilla/websocket/tree/main
代码
server.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"html/template"
"log"
"net/http"
"github.com/gorilla/websocket"
)
// Package-level configuration for the echo server.
var (
	// addr is the value of the -addr flag: the host:port handed to
	// http.ListenAndServe.
	addr = flag.String("addr", "localhost:8080", "http service address")

	// upgrader turns HTTP requests into WebSocket connections; every option
	// is left at its zero value.
	upgrader = websocket.Upgrader{}
)
// echo upgrades the request to a WebSocket connection and then sends every
// message it reads back to the peer unchanged, using the same message type.
// Each payload and its numeric type are also printed to stdout. The handler
// returns, closing the connection, on the first read or write error.
func echo(w http.ResponseWriter, r *http.Request) {
	conn, upgradeErr := upgrader.Upgrade(w, r, nil)
	if upgradeErr != nil {
		log.Print("upgrade:", upgradeErr)
		return
	}
	defer conn.Close()

	for {
		msgType, payload, readErr := conn.ReadMessage()
		if readErr != nil {
			log.Println("read:", readErr)
			return
		}

		fmt.Println(string(payload))
		fmt.Println(msgType)

		// Echo the message back exactly as it was received.
		if writeErr := conn.WriteMessage(msgType, payload); writeErr != nil {
			log.Println("write:", writeErr)
			return
		}
	}
}
// home renders the demo page. The template receives the ws:// URL of the
// /echo endpoint on the host the request was addressed to.
func home(w http.ResponseWriter, r *http.Request) {
	if err := homeTemplate.Execute(w, "ws://"+r.Host+"/echo"); err != nil {
		// Part of the page may already have been written to w, so the
		// failure can only be reported in the log.
		log.Println("template:", err)
	}
}
func main() {
flag.Parse()
log.SetFlags(0)
http.HandleFunc("/echo", echo)
http.HandleFunc("/", home)
log.Fatal(http.ListenAndServe(*addr, nil))
}
// homeTemplate is the HTML page rendered by home. It is parsed once when the
// package is initialized; template.Must panics if parsing fails.
//
// The template's only action, {{.}}, supplies the URL given to the browser's
// WebSocket constructor. The embedded script wires the Open, Send and Close
// buttons to that socket and appends a line to the "output" element for every
// open, close, message and error event.
var homeTemplate = template.Must(template.New("").Parse(`
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script>
window.addEventListener("load", function(evt) {
var output = document.getElementById("output");
var input = document.getElementById("input");
var ws;
var print = function(message) {
var d = document.createElement("div");
d.textContent = message;
output.appendChild(d);
output.scroll(0, output.scrollHeight);
};
document.getElementById("open").onclick = function(evt) {
if (ws) {
return false;
}
ws = new WebSocket("{{.}}");
ws.onopen = function(evt) {
print("OPEN");
}
ws.onclose = function(evt) {
print("CLOSE");
ws = null;
}
ws.onmessage = function(evt) {
print("RESPONSE: " + evt.data);
}
ws.onerror = function(evt) {
print("ERROR: " + evt.data);
}
return false;
};
document.getElementById("send").onclick = function(evt) {
if (!ws) {
return false;
}
print("SEND: " + input.value);
ws.send(input.value);
return false;
};
document.getElementById("close").onclick = function(evt) {
if (!ws) {
return false;
}
ws.close();
return false;
};
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server,
"Send" to send a message to the server and "Close" to close the connection.
You can change the message and send multiple times.
<p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<p><input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output" style="max-height: 70vh;overflow-y: scroll;"></div>
</td></tr></table>
</body>
</html>
`))
client.go
// Copyright 2015 The Gorilla WebSocket Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"flag"
"fmt"
"log"
"net/url"
"os"
"os/signal"
"time"
"github.com/gorilla/websocket"
)
// addr is the value of the -addr flag: the host:port of the WebSocket server
// that main dials.
var addr = flag.String("addr", "localhost:8080", "http service address")
// main dials ws://<addr>/echo and exercises the connection until it fails or
// the process receives an interrupt signal.
//
// Three goroutines share the single connection c:
//   - a ping goroutine that writes a Ping frame and then sleeps 10 seconds;
//   - a reader goroutine that prints every received message and closes done
//     once ReadMessage returns an error;
//   - the main goroutine, which writes the tick time as a text message once
//     per second and writes a Close frame on interrupt.
//
// NOTE(review): the ping goroutine and the main goroutine both call
// c.WriteMessage with no synchronization between them. This is the listing
// the article reports as failing with
// "panic: concurrent write to websocket connection".
func main() {
flag.Parse()
log.SetFlags(0)
// Buffered so a signal delivered while main is busy is not dropped.
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
u := url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
log.Printf("connecting to %s", u.String())
c, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
if err != nil {
log.Fatal("dial:", err)
}
defer c.Close()
// done is closed by the reader goroutine when it exits.
done := make(chan struct{})
go func() {
// Send Ping frames to check whether the connection is still alive.
// The loop only ends when a write fails; nothing else signals it to stop.
for {
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Println("Failed to send Ping: ", err)
return
}
fmt.Println("Ping success")
time.Sleep(10 * time.Second)
}
}()
go func() {
defer close(done)
for {
mt, message, err := c.ReadMessage()
if err != nil {
log.Println("read:", err)
return
}
fmt.Println(mt)
fmt.Println(string(message)) // the time string
//log.Printf("recv: %s, type: %s", message, websocket.FormatMessageType(mt))
}
}()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case t := <-ticker.C:
// Second writer on c: runs on the main goroutine while the ping
// goroutine above may be writing at the same moment.
err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
if err != nil {
log.Println("write:", err)
return
}
case <-interrupt:
log.Println("interrupt")
// Cleanly close the connection by sending a close message and then
// waiting (with timeout) for the server to close the connection.
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
if err != nil {
log.Println("write close:", err)
return
}
select {
case <-done:
case <-time.After(time.Second):
}
return
}
}
}
错误点
我希望在连接过程中通信双方能够持续检测对方是否存活,因此在客户端中增加了一个定时发送 PING 帧的 goroutine:
// Ping goroutine: writes a Ping frame on c, then sleeps 10 seconds, until a
// write fails. It takes no lock and does not coordinate with any other
// goroutine that writes to c.
go func() {
// Send Ping frames to check whether the connection is still alive.
for {
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Println("Failed to send Ping: ", err)
return
}
fmt.Println("Ping success")
time.Sleep(10 * time.Second)
}
}()
运行后出现了开头的错误信息:panic: concurrent write to websocket connection。这条错误信息的意思是:不能并发地向同一个 WebSocket 连接写入消息。
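错误 “concurrent write to websocket connection” 指的是有多个 goroutine 同时向同一个 WebSocket 连接写入数据,这是不被允许的。gorilla/websocket 的一个连接在同一时刻只支持一个并发读和一个并发写;而上面的 client.go 中,发送 Ping 的 goroutine 与主 goroutine 里的定时器分支都会调用 c.WriteMessage,两者同时写入时就触发了这个 panic。因此你需要确保对每个 WebSocket 连接的写操作在任何时候只由一个 goroutine 执行。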
解决这个问题的方法是使用同步机制,比如互斥锁(sync.Mutex),来同步对WebSocket连接的写操作。下面是一个修改后的示例,展示如何使用互斥锁来避免并发写的问题:
解决办法
在这个示例中,我们定义了一个WebSocketConnection结构体,它包含一个websocket.Conn和一个sync.Mutex。在发送Ping消息的goroutine中,我们在写操作之前获取互斥锁,并在写操作完成后释放锁。这样可以确保在任何时候只有一个goroutine能够执行写操作。
请注意,如果还有其他goroutine需要写入WebSocket连接,它们也需要在执行写操作前获取互斥锁,并在完成后释放锁。这样可以避免并发写入的问题,并确保WebSocket连接的正确使用。
示例代码
package main
import (
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)
// upgrader upgrades incoming HTTP requests to WebSocket connections.
// Its CheckOrigin callback returns true for every request without inspecting
// it.
// NOTE(review): presumably this is only acceptable for a demo — confirm
// before exposing the endpoint to browsers on other origins.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}
// WebSocketConnection bundles a WebSocket connection with the mutex used to
// serialize writes to it.
type WebSocketConnection struct {
// Conn is the underlying WebSocket connection.
Conn *websocket.Conn
// Lock is held by a goroutine for the duration of each write to Conn.
// It is a field, so it is acquired with Lock.Lock() and released with
// Lock.Unlock().
Lock sync.Mutex
}
// handleConnections services one WebSocket connection until it fails or the
// peer closes it, then closes ws.
//
// A background goroutine sends a Ping frame every 10 seconds. Every write to
// the connection is made while holding conn.Lock, so only one goroutine
// writes at a time. The calling goroutine runs the read loop and blocks in
// it, which keeps the connection open for as long as it is in use.
func handleConnections(ws *websocket.Conn) {
	defer ws.Close()
	log.Println("Connection established")

	// The zero value of sync.Mutex is ready to use, so Lock needs no
	// explicit initialization.
	conn := &WebSocketConnection{Conn: ws}

	// done is closed when the read loop exits so the ping goroutine stops
	// instead of leaking.
	done := make(chan struct{})
	defer close(done)

	// Ping goroutine.
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		defer ticker.Stop()
		for {
			// Hold the mutex only for the write itself, and release it
			// before inspecting the error so no path returns while locked.
			conn.Lock.Lock()
			err := conn.Conn.WriteMessage(websocket.PingMessage, nil)
			conn.Lock.Unlock()
			if err != nil {
				log.Println("Failed to send Ping: ", err)
				// Closing the connection unblocks the read loop below.
				conn.Conn.Close()
				return
			}

			select {
			case <-done:
				return
			case <-ticker.C:
			}
		}
	}()

	// Read loop. Incoming messages are discarded here; handle them in this
	// loop as needed. Any reply written from here, or from any additional
	// goroutine, must also be wrapped in conn.Lock.Lock()/conn.Lock.Unlock().
	for {
		if _, _, err := conn.Conn.ReadMessage(); err != nil {
			log.Println("read:", err)
			return
		}
	}
}
// main serves the /ws endpoint on :8080. Each request is upgraded to a
// WebSocket connection and handed to handleConnections on its own goroutine.
// The process exits through log.Fatal when the server stops.
func main() {
	serveWS := func(w http.ResponseWriter, r *http.Request) {
		ws, err := upgrader.Upgrade(w, r, nil)
		if err != nil {
			log.Println("upgrade:", err)
			return
		}
		go handleConnections(ws)
	}

	http.HandleFunc("/ws", serveWS)
	log.Fatal(http.ListenAndServe(":8080", nil))
}