• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

一个非侵入的Go事务管理库——工作原理

开发技术 开发技术 2周前 (06-21) 15次浏览

在上一篇文章“一个非侵入的Go事务管理库——如何使用”中,我讲述了如何使用事务库。有些读者可能读过"清晰架构(Clean Architecture)的Go微服务: 事物管理" ,其中描述了事务管理系统的旧版本。那篇文章和本文之间会有一些重叠。因为大多数人可能还没有读过那篇文章或者即使读了也忘记了它的内容。因此为了照顾多数读者,本文还是从头开始(假设你没有读过前文)。如果你读过,那你可以直接跳过熟悉的部分。

好的事务库对于使用它的应用程序是透明的。在Go的“sql”库中,有两种类型的数据库链接,“sql.DB”和“sql.Tx”。当你不需要事务支持时,使用“sql.DB”;否则使用“sql.Tx”。为了让这两种不同场景共享相同的持久层代码,我们需要对数据库链接进行一个封装来同时支持这两种场景。我从"db transaction in golang" 里得到了这个想法。

数据库层的接口

数据库层是事务管理库中处理数据库访问的最低层。应用程序不需要修改该层,只有事务管理库需要这样做。

数据库访问封装

下面是可同时支持事务和非事务操作的共享数据库访问接口, 它在“gdbc.go”中定义。

// SqlGdbc (SQL Go database connection) is a wrapper for SQL database handler ( can be *sql.DB or *sql.Tx)
// It should be able to work with all SQL data that follows SQL standard.
type SqlGdbc interface {
	Exec(query string, args ...interface{}) (sql.Result, error)
	Prepare(query string) (*sql.Stmt, error)
	Query(query string, args ...interface{}) (*sql.Rows, error)
	QueryRow(query string, args ...interface{}) *sql.Row
	// If need transaction support, add this interface
	Transactioner
}

// Transactioner is the transaction interface for database handler
// It should only be applicable to SQL database
type Transactioner interface {
	// Rollback a transaction
	Rollback() error
	// Commit a transaction
	Commit() error
	// TxEnd commits a transaction if no errors, otherwise rollback
	// txFunc is the operations wrapped in a transaction
	TxEnd(txFunc func() error) error

}

它有两部分。一个是数据库接口,它包含了常规的数据库操作,如查询表、更新表记录。另一个事务接口,它包含里支持事务所需要的函数,如“提交”和“回滚”。“SqlGdbc”接口是两者的结合。该接口将用于连接数据库。

数据库访问接口的实现

下面是数据库访问接口的代码实现。它在“sqlConnWrapper.go”文件中。它定义了两个结构体,“SqlDBTx”是对“sql.DB”的封装,将被非事务函数使用。“SqlConnTx”是对“sql.Tx”的封装,将被事务函数使用。

// SqlDBTx is the concrete implementation of sqlGdbc by using *sql.DB
type SqlDBTx struct {
	DB *sql.DB
}

// SqlConnTx is the concrete implementation of sqlGdbc by using *sql.Tx
type SqlConnTx struct {
	DB *sql.Tx
}

func (sdt *SqlDBTx) Exec(query string, args ...interface{}) (sql.Result, error) {
	return sdt.DB.Exec(query, args...)
}

func (sdt *SqlDBTx) Prepare(query string) (*sql.Stmt, error) {
	return sdt.DB.Prepare(query)
}

func (sdt *SqlDBTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
	return sdt.DB.Query(query, args...)
}

func (sdt *SqlDBTx) QueryRow(query string, args ...interface{}) *sql.Row {
	return sdt.DB.QueryRow(query, args...)
}

func (sdb *SqlConnTx) Exec(query string, args ...interface{}) (sql.Result, error) {
	return sdb.DB.Exec(query, args...)
}

func (sdb *SqlConnTx) Prepare(query string) (*sql.Stmt, error) {
	return sdb.DB.Prepare(query)
}

func (sdb *SqlConnTx) Query(query string, args ...interface{}) (*sql.Rows, error) {
	return sdb.DB.Query(query, args...)
}

func (sdb *SqlConnTx) QueryRow(query string, args ...interface{}) *sql.Row {
	return sdb.DB.QueryRow(query, args...)
}

事务接口的实现

下面是“Transactioner”接口的代码实现,它在文件 "txConn.go"中。我从"database/sql Tx — detecting Commit or Rollback"中得到这个想法。

因为“SqlDBTx”不支持事务,所以它的所有函数都返回“nil"。

// DB doesn't rollback, do nothing here
func (cdt *SqlDBTx) Rollback() error {
	return nil
}

//DB doesnt commit, do nothing here
func (cdt *SqlDBTx) Commit() error {
	return nil
}

// DB doesnt rollback, do nothing here
func (cdt *SqlDBTx) TxEnd(txFunc func() error) error {
	return nil
}

func (sct *SqlConnTx) TxEnd(txFunc func() error) error {
	var err error
	tx := sct.DB

	defer func() {
		if p := recover(); p != nil {
			log.Println("found p and rollback:", p)
			tx.Rollback()
			panic(p) // re-throw panic after Rollback
		} else if err != nil {
			log.Println("found error and rollback:", err)
			tx.Rollback() // err is non-nil; don't change it
		} else {
			log.Println("commit:")
			err = tx.Commit() // if Commit returns error update err with commit err
		}
	}()
	err = txFunc()
	return err
}

func (sct *SqlConnTx) Rollback() error {
	return sct.DB.Rollback()
}

func (sct *SqlConnTx) Commit() error {
	return sct.DB.Commit()
}

持久层的接口

在数据库层之上是持久层,应用程序使用持久层来访问数据库表中的记录。你需要定义一个函数在本层中实现对事务的支持。下面是持久层的事务接口,它位于“txDataService.go”文件中。

// TxDataInterface represents operations needed for transaction support.
type TxDataInterface interface {
	// EnableTx is called at the end of a transaction and based on whether there is an error, it commits or rollback the
	// transaction.
	// txFunc is the business function wrapped in a transaction
	EnableTx(txFunc func() error) error
}

以下是它的实现代码。它只是调用下层数据库中的函数“TxEnd()”,该函数已在数据库层实现。下面的代码不是事务库的代码(它是本文中惟一的不是事务库中的代码),你需要在应用程序中实现它。

func (uds *UserDataSql) EnableTx(txFunc func() error) error {
	return uds.DB.TxEnd(txFunc)
}

获取数据库链接的代码

除了我们上面描述的调用接口之外,在应用程序中你还需要先获得数据库链接。事务库中有两个函数可以完成这个任务。

返回"SqlGdbc"接口的函数

函数"Build()"(在"factory.go"中)将返回"SqlGdbc"接口。根据传入的参数,它讲返回满足"SqlGdbc"接口的结构,如果需要事务支持就是“SqlConnTx”,不需要就是“SqlDBTx”。如果你不需要在应用程序中直接使用数据库链接,那么调用它是最好的。

// Build returns the SqlGdbc interface. This is the interface that you can use directly in your persistence layer
// If you don't need to cache sql.DB connection, you can call this function because you won't be able to get the sql.DB
// in SqlGdbc interface (if you need to do it, call BuildSqlDB()
func Build(dsc *config.DatabaseConfig) (gdbc.SqlGdbc, error) {
	db, err := sql.Open(dsc.DriverName, dsc.DataSourceName)
	if err != nil {
		return nil, errors.Wrap(err, "")
	}
	// check the connection
	err = db.Ping()
	if err != nil {
		return nil, errors.Wrap(err, "")
	}
	dt, err := buildGdbc(db, dsc)
	if err != nil {
		return nil, err
	}
	return dt, nil
}

func buildGdbc(sdb *sql.DB,dsc *config.DatabaseConfig) (gdbc.SqlGdbc, error){
	var sdt gdbc.SqlGdbc
	if dsc.Tx {
		tx, err := sdb.Begin()
		if err != nil {
			return nil, err
		}
		sdt = &gdbc.SqlConnTx{DB: tx}
		log.Println("buildGdbc(), create TX:")
	} else {
		sdt = &gdbc.SqlDBTx{sdb}
		log.Println("buildGdbc(), create DB:")
	}
	return sdt, nil
}

返回数据库链接的函数

函数"BuildSqlDB()"(在"factory.go"中)将返回"sql.DB"。它会忽略传入的事务标识参数。应用程序在调用这个函数获得数据库链接后,还需要根据事务标识自己生成“SqlConnTx”或“SqlDBTx”。如果你需要在应用程序里缓存"sql.DB",那么你必须调用这个函数。

// BuildSqlDB returns the sql.DB. The calling function need to generate corresponding gdbc.SqlGdbc struct based on
// sql.DB in order to use it in your persistence layer
// If you need to cache sql.DB connection, you need to call this function
func BuildSqlDB(dsc *config.DatabaseConfig) (*sql.DB, error) {
	db, err := sql.Open(dsc.DriverName, dsc.DataSourceName)
	if err != nil {
		return nil, errors.Wrap(err, "")
	}
	// check the connection
	err = db.Ping()
	if err != nil {
		return nil, errors.Wrap(err, "")
	}
	return db, nil

}

局限性

首先,它只支持SQL数据库的事务。如果你有一个NoSql数据库,那么它不支持(大多数NoSql数据库不支持事务)。

其次,如果你的事务跨越数据库(例如在不同的微服务之间),那么它将无法工作。常用的做法是使用“Saga Pattern”。你可以为事务中的每个操作编写一个补偿操作,并在回滚阶段逐个执行补偿操作。在应用程序中添加“Saga”解决方案并不困难。你可能会问,为什么不把“Saga”加到事务库中呢? 这是一个有趣的问题。我觉得还是单独为“Saga”建一个库比较合适。

第三,它不支持嵌套事务(Nested Transaction),因此你需要手动确保在代码中没有嵌套事务。如果代码库不是太复杂,这很容易做到。如果你有一个非常复杂的代码库,其中有很多事务和非事务代码混在一起,那么你需要一个支持嵌套事务的解决方案。我没有花时间研究如何添加嵌套事务,但它应该有一定的工作量。如果你对此感兴趣,可以从"database/sql: nested transaction or save point support"开始。到目前为止,对于大多数场景,当前的解决方案可能是在代价不大的情况下的最佳方案。

如何扩展库的功能

“SqlGdbc”接口没有列出“sql”包中的所有函数,只列出我的应用程序中需要的函数。你可以轻松地扩展该接口以包含其他函数。

例如,如果需要将全链路跟踪(详情请见"Go微服务全链路跟踪详解")扩展到数据库中,则可能需要在上下文中传递到数据库函数中。“sql”库已经支持具有上下文的数据库函数。你只需要找到它们并将它们添加到"SqlGdbc"接口中,然后在"sqlConnWrapper "中实现它们。然后在持久层中,需要使用上下文作为参数调用函数。

源码:

完整源码: "jfeng45/gtransaction"

索引:

1 "一个非侵入的Go事务管理库——如何使用"

2 "清晰架构(Clean Architecture)的Go微服务: 事物管理"

3 "db transaction in golang"

4 "database/sql Tx — detecting Commit or Rollback"

5 "Applying the Saga Pattern – GOTO Conference"

6 "database/sql: nested transaction or save point support"

7 "Go微服务全链路跟踪详解"


喜欢 (0)