• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

以太坊 — 交易池的特点 与 中断恢复

开发技术 开发技术 2周前 (01-11) 10次浏览

作者:林冠宏 / 指尖下的幽灵。转载者,请: 务必标明出处。

博客:http://www.cnblogs.com/linguanh/

掘金:https://juejin.im/user/1785262612681997

GitHub : https://github.com/af913337456/

出版的书籍:

  • 《1.0-区块链以太坊DApp开发实战》
  • 《2.0-区块链DApp开发:基于以太坊和比特币公链》

目录

  • 前序
  • 以太坊交易池知识点总结
  • 源码探秘
    • 本地交易
      • 本地钱包地址的初始化
      • 加载本地交易
      • pool.journal.load
      • pool.AddLocals
      • 本地交易文件的更新
    • 远程交易
      • P2P 通讯模块的初始化
      • 接收 P2P 消息
      • 添加远程交易到交易池
  • “ 彩蛋 ”

21年的第一篇文章,开源写作6年。

最近比特币以太坊的价格也已然起飞,现在一个 BTC 已能全款辆某斯拉 model 3汽车。离谱。

发布这篇文章:从区块链技术研发者的角度,说说我的区块链从业经历和对它的理解 的时候,是去年,现在回首去看最后那段话,一语成谶


言归正传。

一般做数据池之类的开发。比如:订单池,请求池…,传统的服务端思想会引导我们直接向消息中间件想去。使用各类消息组件去实现,比如 RocketMQ,Redis,Kafka…

然而,在区块链公链应用中,现已知的多条公链,每一条,都有交易池这么一个功能模块,且,它们的代码实现都没有引入消息中间件去实现。

早前在阅读以太坊公链源码的时候,我就对以太坊交易池这一块的实现思想感到新颖,今天总结下,分享给大家看看,区块链公链应用中不依赖消息中间件去实现交易池的做法及其特点。


以太坊交易池知识点总结 _(BTW:面试的时候可死记)

  1. 交易的分类:
    • 从本地文件存与不存的角度去看:
      1. 本地交易,若交易的发送者地址是配置变量指定的地址,则认为是本地交易:
        • 节点启动的时候,可以在配置文件指定,不开启本地交易的操作
      2. 远程交易,不满足 1 条件的交易。
    • 从内存存储的角度去看:
      1. Queue,待进入 Pending 的交易,结构是 map[addr]TxList
      2. Pending,待进入打包队列的交易,结构和 Queue 一样,由 1 转化而来。
  2. 交易的输入(产生):
    • 程序启动之初:
      1. 本地交易,从本地文件加载到内存,本地若没,自然是 0 输入;
      2. 远程交易,由 P2P 通讯模块,接收到交易数据,存储到内存。
    • 程序运行中:
      1. 自己接收交易的 RPC请求,SendTransaction 或 SendRawTransaction;
      2. 通过 P2P 通讯模块,接收其它节点的信息,包含的动作有:
        1. 旧交易的移除;
        2. 新交易的增加。
  3. 交易的持久化策略:
    • 本地交易:
      1. 定时从 Pending 和 Queue 中选出本地交易存储到本地文件
      2. 存储方式,文件替换,先 new 一个,再 rename 一波;
      3. 注意第 2 点,文件的替换,意味着即是更新也是删除操作;
      4. 编码方式,rlp 编码,不是 json。
    • 远程交易:
      1. 不存,不进行持久化,总是依赖由其它节点 P2P 通讯同步过来。
  4. 中断恢复:
    1. 本地交易,同上面 程序启动之初 的操作;
    2. 远程交易,没有恢复,内存中的交易丢了就是丢了,不影响。即使当初正在打包,即使当前节点挂了,其它节点还在工作。

上面第 4 点,中断恢复,对比于传统后端服务的消息中间件,对消息的不丢失保障性,区块链公链的做法,完全是靠分布式来维持的,单节点的数据丢失,可以从其它节点同步过来。所以,它们交易池的实现的实现,相对来说,更加灵活,编码难点在消息同步部分。


下面进入枯燥的源码分析阶段,读有余力的读者可以继续

要看注释。

本地交易

1. 本地钱包地址的初始化

源码文件:tx_pool.go,config.Locals 由配置文件指定,是以太坊钱包地址数组。

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	for _, addr := range config.Locals { // 从配置文件添加 本地地址
		log.Info("Setting new local account", "address", addr)
		// 添加到 locals 变量里面,后面会用它来过滤出一个地址是否是本地地址
		pool.locals.add(addr) 
	}
	...
}

2. 从本地文件,加载交易数据数据,即加载本地交易

func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain blockChain) *TxPool {
	...
	pool.locals = newAccountSet(pool.signer)
	for _, addr := range config.Locals {
		log.Info("Setting new local account", "address", addr)
		pool.locals.add(addr)
	}
	...	
 	// 上面添加完了
	// If local transactions and journaling is enabled, load from disk
	if !config.NoLocals && config.Journal != "" { // 如果配置开启了本地加载的需求
		pool.journal = newTxJournal(config.Journal)
   		// load 是加载函数,pool.AddLocals 是实际添加函数
		if err := pool.journal.load(pool.AddLocals); err != nil {
			log.Warn("Failed to load transaction journal", "err", err)
		}
		if err := pool.journal.rotate(pool.local()); err != nil {
			log.Warn("Failed to rotate transaction journal", "err", err)
		}
	}
	...
    go pool.loop() // 循环处理事件
}

3. pool.journal.load

源码文件:tx_journal.go

func (journal *txJournal) load(add func([]*types.Transaction) []error) error {
	// Skip the parsing if the journal file doesn't exist at all
	if _, err := os.Stat(journal.path); os.IsNotExist(err) {
		return nil
	}
	// Open the journal for loading any past transactions
	input, err := os.Open(journal.path) // 打开文件,读取流数据
	if err != nil {
		return err
	}
	...
	stream := rlp.NewStream(input, 0) // 使用 rlp 编码算法解码数据
	...
	loadBatch := func(txs types.Transactions) {
		for _, err := range add(txs) { // 调用 add 函数,进行添加
			if err != nil {
				log.Debug("Failed to add journaled transaction", "err", err)
				dropped++
			}
		}
	}
	// loadBatch 在下面会被调用
	...
}

4. pool.AddLocals

pool.AddLocals 是实际的添加函数。内部的一系列调用后,最终到 tx_pool.add 函数。pool 的 queue 都是 map 结构,能根据相同 key 去重。

func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err error) {
	...
 	// 下面的 if,如果已在 pool.pending 里面,那么证明之前已经添加过在 queue 里
	if list := pool.pending[from]; list != nil && list.Overlaps(tx) {
		...
 		pool.journalTx(from, tx) // 内部调用 journal.insert
		return old != nil, nil
	}
	replaced, err = pool.enqueueTx(hash, tx) // 这里,会添加到 pool.enqueue 里面
	if err != nil {
		return false, err
	}
	pool.journalTx(from, tx) // 内部调用 journal.insert
	...
}

func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
	// 本地钱包地址,没指定的话,就跳过
	if pool.journal == nil || !pool.locals.contains(from) {
		return
	}
 	// insert 会在造成重复添加,但是 load 出来的时候会根据 addr 去重
	if err := pool.journal.insert(tx); err != nil {
		log.Warn("Failed to journal local transaction", "err", err)
	}
}

截止到上面,本地交易已经被添加到 pool 的 queue 里面了。

节点启动之初,除了会从本地 load 交易到 queue 外,还会不停地监听链的事件,比如接收交易,再 add 交易 到 queue 里。

5. 本地交易文件的更新 ( 插入 / 删除 )

loop 是触发的入口。除了主动的 journal.insert 达到了插入本地交易的目的之外。

下面的更新操作,也达到了包含插入的目的:以替换的手段,从文件删除旧交易,存储新交易到文件

func (pool *TxPool) loop() {
	...
	for {
		select {
		...
		// Handle local transaction journal rotation
 		// journal 定时器,定时执行下面的本地交易数据文件的更新 journal.rotate
		case <-journal.C:
			if pool.journal != nil {
				pool.mu.Lock()
				if err := pool.journal.rotate(pool.local()); err != nil {
					log.Warn("Failed to rotate local tx journal", "err", err)
				}
				pool.mu.Unlock()
			}
		}
	}
}

journal.rotate 的做法,使用文件替换的方式,来从 pool 的交易 pending 和 queue 中存储 locals 钱包地址相关的交易到文件。注意,只存本地钱包地址的,其它的,不存。

//输入
func (pool *TxPool) local() map[common.Address]types.Transactions {
	...
	for addr := range pool.locals.accounts {
		if pending := pool.pending[addr]; pending != nil {
 			// 添加 pending 的
			txs[addr] = append(txs[addr], pending.Flatten()...)
		}
		if queued := pool.queue[addr]; queued != nil {
 			// 添加 queue 的
			txs[addr] = append(txs[addr], queued.Flatten()...)
		}
	}
	return txs
}

// all 参数,来源于上面 local()
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
	...
	// journal.path+".new" 后缀 .new
	replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
	if err != nil {
		return err
	}
	journaled := 0
	for _, txs := range all {
		for _, tx := range txs {
			if err = rlp.Encode(replacement, tx); err != nil {
				replacement.Close()
				return err
			}
		}
		journaled += len(txs)
	}
	replacement.Close()
 	// rename,重命名文件到原始的 path,达到更新,替换目的
	if err = os.Rename(journal.path+".new", journal.path); err != nil {
		return err
	}
	sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
	if err != nil {
		return err
	}
	...
	return nil
}

远程交易

P2P 通讯模块的初始化

源码文件:eth/backend.go

func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
	...
	if config.TxPool.Journal != "" {
		config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal)
	}
 	// 初始化交易池
	eth.txPool = core.NewTxPool(config.TxPool, chainConfig, eth.blockchain)
	...
 	// 使用 交易池指针对象 作为参数初始化 protocolManager
	if eth.protocolManager, err = NewProtocolManager(
    		chainConfig, checkpoint, config.SyncMode, config.NetworkId, 
            	eth.eventMux, `eth.txPool`, eth.engine, 
                eth.blockchain, chainDb, cacheLimit, config.Whitelist); err != nil {
		return nil, err
	}
	...
	return eth, nil
}

func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCheckpoint, mode downloader.SyncMode, networkID uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database, cacheLimit int, whitelist map[uint64]common.Hash) (*ProtocolManager, error) {
	// 下面初始化 tx_fetcher,使用 txpool.AddRemotes 赋值给函数变量 addTxs
	manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, fetchTx)
}

接收 P2P 消息

源码文件:eth/handler.go

func (pm *ProtocolManager) handleMsg(p *peer) error {
	...
    switch {
    ...
    // 接收到其它节点的交易数据
    case msg.Code == TransactionMsg || (msg.Code == PooledTransactionsMsg && p.version >= eth65):
		...
 		// Enqueue 将交易添加到交易池
		pm.txFetcher.Enqueue(p.id, txs, msg.Code == PooledTransactionsMsg)

    }
    ...
}
// tx_fetcher.go 文件
func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) error {
 	...
	errs := f.addTxs(txs) // 执行添加,这个函数其实就是 tx_pool.go 的 AddRemotes
	...
}

添加远程交易到交易池

// tx_pool.go
// addTxs 内部就会把交易添加到 Pending 和 Queue 里面
func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error {
	return pool.addTxs(txs, false, false)
}

打完收工

更多以太坊的开发知识,见我的书籍:

《2.0-区块链DApp开发:基于以太坊和比特币公链》


喜欢 (0)