文章目录
- 案例分析
- gorm源码解读
- gin context 生命周期
- context什么时候cancel的
- 什么时候context会被动cancel掉呢?
- 野生协程如何处理
案例分析
报错信息
{
"L":"ERROR",
"T":"2024-12-17T11:11:33.005+0800",
"file":"*/log.go:61",
"message":"sql_trace",
"__type":"sql",
"trace_id":"6ab69b5d333de40c8327d8572336fa2c",
"error":"context canceled; invalid connection",
"elapsed":"2.292ms",
"rows":0,
"sql":"UPDATE `logs` SET `response_time`=1734405092,`status`='success' WHERE id = 226081"
}
案发代码:
func Sync(c *gin.Context) {
var params services.Params
// 参数绑定
c.ShouldBindBodyWith(¶ms, binding.JSON)
// 参数效验
// 记录日志
...
// 开协程 更新日志
go func() {
defer helpers.Recovery(c)
models.Log{Ctx: c.Request.Context()}.UpdateLog(logId, res)
}()
c.JSON(200, response.Success(nil))
return
}
func UpdateLog(id uint, r *services.ResJson) bool {
exec := models.DefaultDB().WithContext(s.Ctx).Where("id = ?", id).Model(&Log{}).Updates(map[string]interface{}{
"status": StatusSuccess,
"response_time": time.Now().Unix(),
})
return exec.RowsAffected > 0
}
在更新数据库时,开了一个协程去更新
gorm源码解读
gorm Find、Update方法会触发GORM内部的处理器链,其中包括构建SQL语句、准备参数等。
最终,会调用到processor.Execute(db *DB)方法,这个方法会遍历并执行一系列注册的回调函数。
gorm.io/gorm@v1.25.11/finisher_api.go
// Update updates column with value using callbacks. Reference: https://gorm.io/docs/update.html#Update-Changed-Fields
func (db *DB) Update(column string, value interface{}) (tx *DB) {
tx = db.getInstance()
// The single column/value pair becomes the statement's destination.
tx.Statement.Dest = map[string]interface{}{column: value}
// Hand the statement to the update callbacks' processor.
return tx.callbacks.Update().Execute(tx)
}
// gorm.io/gorm@v1.25.11/callbacks.go
// Execute (excerpt; the surrounding code is elided) calls every function in
// p.fns, in slice order, passing db to each.
func (p *processor) Execute(db *DB) *DB {
...
for _, f := range p.fns {
f(db)
}
}
// 注册回调函数
gorm@v1.25.11/callbacks/callbacks.go
// RegisterDefaultCallbacks (excerpt) registers the named create, query,
// delete and update callbacks on db, and fills any empty clause list in
// config with the package defaults.
func RegisterDefaultCallbacks(db *gorm.DB, config *Config) {
// enableTransaction gates the begin/commit callbacks: they only match when
// SkipDefaultTransaction is false.
enableTransaction := func(db *gorm.DB) bool {
return !db.SkipDefaultTransaction
}
// Fall back to the default clause lists when config leaves them empty.
if len(config.CreateClauses) == 0 {
config.CreateClauses = createClauses
}
if len(config.QueryClauses) == 0 {
config.QueryClauses = queryClauses
}
if len(config.DeleteClauses) == 0 {
config.DeleteClauses = deleteClauses
}
if len(config.UpdateClauses) == 0 {
config.UpdateClauses = updateClauses
}
// Create callbacks.
createCallback := db.Callback().Create()
createCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
createCallback.Register("gorm:before_create", BeforeCreate)
createCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(true))
createCallback.Register("gorm:create", Create(config))
createCallback.Register("gorm:save_after_associations", SaveAfterAssociations(true))
createCallback.Register("gorm:after_create", AfterCreate)
createCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
createCallback.Clauses = config.CreateClauses
// Query callbacks.
queryCallback := db.Callback().Query()
queryCallback.Register("gorm:query", Query)
queryCallback.Register("gorm:preload", Preload)
queryCallback.Register("gorm:after_query", AfterQuery)
queryCallback.Clauses = config.QueryClauses
// Delete callbacks.
deleteCallback := db.Callback().Delete()
deleteCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
deleteCallback.Register("gorm:before_delete", BeforeDelete)
deleteCallback.Register("gorm:delete_before_associations", DeleteBeforeAssociations)
deleteCallback.Register("gorm:delete", Delete(config))
deleteCallback.Register("gorm:after_delete", AfterDelete)
deleteCallback.Match(enableTransaction).Register("gorm:commit_or_rollback_transaction", CommitOrRollbackTransaction)
deleteCallback.Clauses = config.DeleteClauses
// Update callbacks; "gorm:update" is the Update(config) hook that issues the SQL.
updateCallback := db.Callback().Update()
updateCallback.Match(enableTransaction).Register("gorm:begin_transaction", BeginTransaction)
updateCallback.Register("gorm:setup_reflect_value", SetupUpdateReflectValue)
updateCallback.Register("gorm:before_update", BeforeUpdate)
updateCallback.Register("gorm:save_before_associations", SaveBeforeAssociations(false))
updateCallback.Register("gorm:update", Update(config))
updateCallback.Register("gorm:save_after_associations", SaveAfterAssociations(false))
updateCallback.Register("gorm:after_update", AfterUpdate)
....
}
gorm.io/gorm@v1.25.11/callbacks/update.go
// Update update hook
//
// The returned callback builds the UPDATE statement if no SQL has been
// built yet, then runs it through db.Statement.ConnPool, passing
// db.Statement.Context to the call.
func Update(config *Config) func(db *gorm.DB) {
supportReturning := utils.Contains(config.UpdateClauses, "RETURNING")
return func(db *gorm.DB) {
// Skip when an earlier step already recorded an error.
if db.Error != nil {
return
}
if db.Statement.Schema != nil {
for _, c := range db.Statement.Schema.UpdateClauses {
db.Statement.AddClause(c)
}
}
// Build the SQL only when none has been built yet.
if db.Statement.SQL.Len() == 0 {
db.Statement.SQL.Grow(180)
db.Statement.AddClauseIfNotExists(clause.Update{})
if _, ok := db.Statement.Clauses["SET"]; !ok {
if set := ConvertToAssignments(db.Statement); len(set) != 0 {
// The generated SET clause is removed again when the callback returns.
defer delete(db.Statement.Clauses, "SET")
db.Statement.AddClause(set)
} else {
// Nothing to assign: return without executing anything.
return
}
}
db.Statement.Build(db.Statement.BuildClauses...)
}
checkMissingWhereConditions(db)
if !db.DryRun && db.Error == nil {
if ok, mode := hasReturning(db, supportReturning); ok {
// With RETURNING, the statement runs through ConnPool.QueryContext, which
// takes a context.Context (db.Statement.Context) as its first argument.
if rows, err := db.Statement.ConnPool.QueryContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...); db.AddError(err) == nil {
// Temporarily point Dest at the reflect value for scanning, then restore it.
dest := db.Statement.Dest
db.Statement.Dest = db.Statement.ReflectValue.Addr().Interface()
gorm.Scan(rows, db, mode)
db.Statement.Dest = dest
db.AddError(rows.Close())
}
} else {
// Otherwise the statement runs through ConnPool.ExecContext with the same context.
result, err := db.Statement.ConnPool.ExecContext(db.Statement.Context, db.Statement.SQL.String(), db.Statement.Vars...)
if db.AddError(err) == nil {
db.RowsAffected, _ = result.RowsAffected()
}
}
}
}
}
调用数据库驱动:
Update函数最终会调用到底层数据库驱动的QueryContext方法(带RETURNING子句时)或ExecContext方法(其余情况),这两个方法都接受一个context.Context对象作为参数。下面以QueryContext为例:
go1.22.3/src/database/sql/sql.go:1727
// QueryContext executes a query that returns rows, typically a SELECT.
// The args are for any placeholder parameters in the query.
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error) {
var rows *Rows
var err error
// db.retry invokes the closure with a connection reuse strategy; the closure
// forwards ctx to db.query and stores the result in rows and err.
err = db.retry(func(strategy connReuseStrategy) error {
rows, err = db.query(ctx, query, args, strategy)
return err
})
return rows, err
}
底层数据库连接:
QueryContext方法通过db.retry处理数据库连接的重试逻辑,并在重试回调中进一步调用query方法。
在query方法中,会调用conn方法来获取一个数据库连接,并在这个连接上执行查询。
conn方法会处理context的取消和超时信号,如果context被取消或超时,它会中断数据库连接操作并返回错误。
go1.22.3/src/database/sql/sql.go:1748
// query obtains a connection via db.conn using ctx and strategy, then runs
// the query on it with db.queryDC. An error from db.conn is returned as is.
func (db *DB) query(ctx context.Context, query string, args []any, strategy connReuseStrategy) (*Rows, error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
// dc.releaseConn is passed along as the release callback for the connection.
return db.queryDC(ctx, nil, dc, dc.releaseConn, query, args)
}
// conn returns a newly-opened or cached *driverConn.
// (Excerpt: only the opening checks of the function are shown.)
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
// The default case makes this a non-blocking check: if ctx is already done,
// the lock is released and ctx.Err() is returned before any connection is used.
select {
default:
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
那为什么会出现context canceled?
gin context 生命周期
大多数情况下,context一直能持续到请求结束
当请求发生错误的时候,context会立刻被cancel掉
context什么时候cancel的
server端接受新请求时会起一个协程go c.serve(connCtx)
// Serve (excerpt) accepts connections from l in a loop and starts one
// goroutine per accepted connection, passing it connCtx.
func (srv *Server) Serve(l net.Listener) error {
// ...
for {
rw, err := l.Accept()
connCtx := ctx
// ...
// Each connection is served on its own goroutine.
go c.serve(connCtx)
}
}
协程里面for循环从连接中读取请求,重点是这里每次读取到请求的时候都会启动后台协程(w.conn.r.startBackgroundRead())继续从连接中读取。
// Serve a new connection.
func (c *conn) serve(ctx context.Context) {
// ...
// HTTP/1.x from here on.
// Derive a cancelable context for this connection and keep its cancel func
// on the conn so that other code holding the conn can call it.
ctx, cancelCtx := context.WithCancel(ctx)
c.cancelCtx = cancelCtx
defer cancelCtx()
// ...
for {
// Read a request from the connection.
w, err := c.readRequest(ctx)
if c.r.remain != c.server.initialReadLimitSize() {
// If we read any bytes off the wire, we're active.
c.setState(c.rwc, StateActive, runHooks)
}
// ....
// Start the background read on the connection: right away when no request
// body remains, otherwise once the body hits EOF.
if requestBodyRemains(req.Body) {
registerOnHitEOF(req.Body, w.conn.r.startBackgroundRead)
} else {
w.conn.r.startBackgroundRead()
}
// ...
// This dispatches to the server's handler (gin's ServeHTTP in this setup).
serverHandler{c.server}.ServeHTTP(w, w.req)
// Cancel the request's context once the handler has returned.
w.cancelCtx()
// ...
}
}
gin中执行ServeHTTP方法
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// ...
// Run the handler functions we registered.
engine.handleHTTPRequest(c)
// ...
}
正常请求结束之后,net/http会调用w.cancelCtx()主动cancel掉request context;gin框架则把*gin.Context放回ctx pool中,下次从pool取出时再通过reset()清空后复用。
// github.com/gin-gonic/gin@v1.7.7/gin.go
// ServeHTTP conforms to the http.Handler interface.
func (engine *Engine) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// Take a *Context from the pool and bind it to this request/response pair.
c := engine.pool.Get().(*Context)
c.writermem.reset(w)
c.Request = req
// The reset happens here, after Get, not before Put.
c.reset()
engine.handleHTTPRequest(c)
// Return the *Context to the pool as soon as the handlers have returned.
engine.pool.Put(c)
}
// github.com/gin-gonic/gin@v1.7.7/context.go
// reset reinitializes the Context's per-request fields: slices are truncated
// to length zero, Keys and the caches are set to nil, and index is set to -1.
// It does not touch c.Request.
func (c *Context) reset() {
c.Writer = &c.writermem
c.Params = c.Params[0:0]
c.handlers = nil
c.index = -1
c.fullPath = ""
c.Keys = nil
c.Errors = c.Errors[0:0]
c.Accepted = nil
c.queryCache = nil
c.formCache = nil
*c.params = (*c.params)[:0]
*c.skippedNodes = (*c.skippedNodes)[:0]
}
什么时候context会被动cancel掉呢?
秘密就在w.conn.r.startBackgroundRead()
这个后台读取的协程里了。
// startBackgroundRead (excerpt) starts cr.backgroundRead on a new goroutine.
func (cr *connReader) startBackgroundRead() {
// ...
go cr.backgroundRead()
}
// backgroundRead (excerpt) performs a single Read on the underlying
// connection and passes any error to cr.handleReadError, except for a
// timeout seen while cr.aborted is set.
func (cr *connReader) backgroundRead() {
n, err := cr.conn.rwc.Read(cr.byteBuf[:])
// ...
if ne, ok := err.(net.Error); ok && cr.aborted && ne.Timeout() {
// Ignore this error. It's the expected error from
// another goroutine calling abortPendingRead.
} else if err != nil {
cr.handleReadError(err)
}
// ...
}
// handleReadError cancels the connection's context and then calls
// cr.closeNotify. The error value itself is ignored.
func (cr *connReader) handleReadError(_ error) {
// This is where the context gets canceled.
cr.conn.cancelCtx()
cr.closeNotify()
}
调用链:startBackgroundRead -> backgroundRead -> handleReadError。在handleReadError函数里面会把context cancel掉。
当服务端在处理业务的同时,后台有个协程监控连接的状态,如果连接有问题就会把context cancel掉。(cancel的目的就是快速失败——业务不用处理了,就算服务端返回结果了,客户端也不处理了)
野生协程如何处理
- http请求如有野生协程,不能使用request context(因为response之后context就会被cancel掉了),应当使用独立的context(比如 context.Background();Go 1.21+ 也可以使用 context.WithoutCancel(c.Request.Context()),既不受cancel影响,又能保留trace_id等value)
- 禁用野生协程,控制协程生命周期