Web3漫游记——MEV机器人实践1

今天开始进入技术同学最喜欢的代码环节,虽然前一篇文章《Web3漫游记——MEV套利技能树》主推Rust作为首选,但后续文章会以Golang为主。主要因为个人技术栈是Golang+Python,用熟悉的语言可以更专注于MEV相关逻辑实现。而使用Golang而非Python则是为了顺便熟悉一下Geth这个库,为了后面做私有节点优化提前做一些准备。有其他语言编程基础的小伙伴看懂应该都不是问题,只不过用到的库以及语法可能略有差别而已。

另外提醒一下Python技术栈的小伙伴,截止到发文日期,Python3.11web3.py-v6.9web3-flashbots-v1.1.1的SDK目前有兼容性问题,使用python3.8可正常测试。

首先简单看一下目录结构:


├── README.md
├── .env
├── abi
│   ├── ERC20.json
│   ├── UniswapV2Factory.json
│   ├── UniswapV2Pair.json
│   ├── V2ArbBot.json
│   ├── WETH.json
│   └── erc20.go
├── go.mod
├── go.sum
└── main.go

这里为了简(tou)单(lan)就都写到main.go里了,并且没考虑各种异常情况和优化,大家真正写工程级别代码时注意一下细节。

如何建立链接

package main

import (
	"context"
	"fmt"
	erc20 "gevm/abi"
	"log"
	"math/big"
	"math/rand"
	"os"
	"strings"

	"github.com/ethereum/go-ethereum"
	"github.com/ethereum/go-ethereum/accounts/abi"
	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/ethclient"
	_ "github.com/joho/godotenv/autoload"
)

func main() {
	client, err := ethclient.Dial(os.Getenv("WSS_URL"))
	if err != nil {
		log.Fatal(err)
	}
	log.Println("connected!")
}

其中autoload库会自动加载当前目录下.env文件到环境变量中,.env内容就是节点服务的http和ws连接地址,比如:

HTTP_URL=https://eth-mainnet.g.alchemy.com/v2/{APIKEY}
WSS_URL=wss://eth-mainnet.g.alchemy.com/v2/{APIKEY}

不要把任何APIKEY、私钥相关的硬编码到程序中,是一个非常重要的习惯。

如何监控最新的区块

func subcribeNewBlock(client *ethclient.Client) {
	headers := make(chan *types.Header)
	sub, err := client.SubscribeNewHead(context.Background(), headers)
	if err != nil {
		log.Fatal(err)
	}
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case header := <-headers:
			fmt.Println(header.Hash())
			calcNextBlockBaseFee(header.GasUsed, header.GasLimit, header.BaseFee.Uint64())
		}
	}
}

由于我们的client使用的是ws连接,所以当有新的区块生成,headers这个channel就会收到数据,其定义如下:

// Header represents a block header in the Ethereum blockchain.
type Header struct {
	ParentHash  common.Hash    `json:"parentHash"       gencodec:"required"`
	UncleHash   common.Hash    `json:"sha3Uncles"       gencodec:"required"`
	Coinbase    common.Address `json:"miner"`
	Root        common.Hash    `json:"stateRoot"        gencodec:"required"`
	TxHash      common.Hash    `json:"transactionsRoot" gencodec:"required"`
	ReceiptHash common.Hash    `json:"receiptsRoot"     gencodec:"required"`
	Bloom       Bloom          `json:"logsBloom"        gencodec:"required"`
	Difficulty  *big.Int       `json:"difficulty"       gencodec:"required"`
	Number      *big.Int       `json:"number"           gencodec:"required"`
	GasLimit    uint64         `json:"gasLimit"         gencodec:"required"`
	GasUsed     uint64         `json:"gasUsed"          gencodec:"required"`
	Time        uint64         `json:"timestamp"        gencodec:"required"`
	Extra       []byte         `json:"extraData"        gencodec:"required"`
	MixDigest   common.Hash    `json:"mixHash"`
	Nonce       BlockNonce     `json:"nonce"`

	// BaseFee was added by EIP-1559 and is ignored in legacy headers.
	BaseFee *big.Int `json:"baseFeePerGas" rlp:"optional"`

	// WithdrawalsHash was added by EIP-4895 and is ignored in legacy headers.
	WithdrawalsHash *common.Hash `json:"withdrawalsRoot" rlp:"optional"`

	// BlobGasUsed was added by EIP-4844 and is ignored in legacy headers.
	BlobGasUsed *uint64 `json:"blobGasUsed" rlp:"optional"`

	// ExcessBlobGas was added by EIP-4844 and is ignored in legacy headers.
	ExcessBlobGas *uint64 `json:"excessBlobGas" rlp:"optional"`
}

上面代码中我们简单的输出了一下区块的哈希值,然后去预估下一个区块的BaseGasFee。如果需要当前区块的所有交易信息,可以调用client.BlockByHash(context.Background(), header.Hash())函数,这里就不展开了。

如何估算下一个区块的基础GasFee

const (
	EIP1559_ELASTICITY_MULTIPLIER = 2
	BASE_FEE_CHANGE_DENOMINATOR   = 8
)

func calcNextBlockBaseFee(gasUsed, gasLimit, baseFee uint64) (newBaseFee uint64) {
	// https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md
	// https://github.com/foundry-rs/foundry/blob/master/crates/anvil/src/eth/fees.rs#L141
	targetGasUsed := gasLimit / EIP1559_ELASTICITY_MULTIPLIER
	if targetGasUsed == gasUsed {
		newBaseFee = baseFee
		return
	}
	if gasUsed > targetGasUsed {
		newBaseFee = baseFee + ((baseFee*(gasUsed-targetGasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
	} else {
		newBaseFee = baseFee - ((baseFee*(targetGasUsed-gasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
	}
	newBaseFee += uint64(rand.Int63n(9))
	return
}

还记得我们前面文章说的,面对同一个套利机会往往会有很多竞争者。给的Gas费越高,胜率越大。计算BaseFee的方法我这里参考了anvil库的算法,至于好奇那2个常量的可以去看看EIP1559协议,链接在注释里自取。

计算出来基础费用其实还是不够的,还需要加上优先费(PriorityFee,贿赂矿工的),不过这个优先费我问了大佬说是调用三方接口获取的,可能是要基于历史数据+机器学习+神经网络进行预测吧,暂时先不考虑。

如何监控内存池中Pending的交易

func subcribePendingTransaction(client *ethclient.Client) {
	transactionsHash := make(chan string)
	sub, _ := client.Client().EthSubscribe(context.Background(), transactionsHash, "newPendingTransactions")
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case txHash := <-transactionsHash:
			fmt.Println(txHash)
		}
	}
}

这里的逻辑和订阅最新的区块逻辑差不多,这里想多说一句的就是对于使用其他语言的小伙伴,获取Pending交易本质上就是一个特定参数的网络请求,比如:

// initiate websocket stream first
wscat -c wss://eth-mainnet.g.alchemy.com/v2/demo

// then call subscription
{"jsonrpc":"2.0","id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}

如果没有对应的轮子自己造一个也不是很难。

如何解析交易中Data字段

当我们拿到一个请求的哈希之后,怎么知道这个交易在做什么呢?此时就需要ABI登场了,对于大多数开源合约来说,在对应链的区块链浏览器上都可以直接看到合约对应的ABI,通常是JSON格式,比如: abi

此外,还可以将开源合约代码下载到本地自行编译生成。

这里以ERC20为例来解析:

func parseERC20Transaction(client *ethclient.Client, txHash string) {
	ctx, _ := os.ReadFile("./abi/ERC20.json")
	erc20Abi, _ := abi.JSON(strings.NewReader(string(ctx)))
	tx, isPending, err := client.TransactionByHash(context.Background(), common.HexToHash(txHash))
	if err != nil {
		log.Fatalln(err)
	}
	if len(tx.Data()) > 0 {
		// 使用前4字节去找对应的函数名字
		method, err := erc20Abi.MethodById(tx.Data()[:4])
		if err != nil {
			fmt.Println("Tx is not erc20 protocol")
		} else {
			fmt.Println(method.Name, isPending)
			// 后面的字节就是对应函数的参数
			input, err := method.Inputs.Unpack(tx.Data()[4:])
			fmt.Println(input, err)
		}
	}
}

代码也很清晰,首先根据json内容生成abi对象,然后获取交易内容并根据data字段进行解析,获取函数名和inputData参数——也就是区块链浏览器上显示的inputData字段: inputdata

如何监控Pair创建

其实这里应该分2个部分,首先是同步历史已经创建的Pair:

func syncPairCreate(client *ethclient.Client) {
	// 以太坊UniswapV2-FactoryContrace地址
	address := common.HexToAddress("0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f")
	filter := ethereum.FilterQuery{
		FromBlock: big.NewInt(10000835),      // 合约在这个区块部署
		ToBlock:   big.NewInt(10091985),      // 数据过多会报错,这里限制一下
		Addresses: []common.Address{address}, // 过滤只要这个合约地址的数据
	}
	logs, err := client.FilterLogs(context.Background(), filter)
	if err != nil {
		log.Fatalln(err)
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
	factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	for _, l := range logs {
		fmt.Println(l.Data, l.TxHash)
		data, err := factoryAbi.Unpack("PairCreated", l.Data)
		if err != nil {
			log.Fatalln(err)
		}
		token0 := common.HexToAddress(l.Topics[1].Hex())
		token1 := common.HexToAddress(l.Topics[2].Hex())
		pairAddrss := data[0]
		pairNum := data[1]
		fmt.Println(token0, token1, pairAddrss, pairNum)
	}
}

简单说,首先我们需要找到链上的合约地址以及对应的ABI,然后根据实际需求构造FilterQuery结构,最后调用接口获取日志。在实际使用中有个需要注意的地方就是ToBlock的值,由于这个合约已经部署很久了,如果不进行限制的话日志量过大会报错,所以真正使用时候要切分并行获取。

关于数据解析部分,这里我们获取的是事件日志,和上面解析交易的数据不同。

还是以上面的为例,UniswapV2factory合约对应的PairCreated事件ABI如下(注意这里是PairCreated事件,不是createPair函数):

{
	"anonymous": false,
	"inputs": [
		{
			"indexed": true,
			"internalType": "address",
			"name": "token0",
			"type": "address"
		},
		{
			"indexed": true,
			"internalType": "address",
			"name": "token1",
			"type": "address"
		},
		{
			"indexed": false,
			"internalType": "address",
			"name": "pair",
			"type": "address"
		},
		{
			"indexed": false,
			"internalType": "uint256",
			"name": "",
			"type": "uint256"
		}
	],
	"name": "PairCreated",
	"type": "event"
}

首先Topics中第0个元素永远是名字+参数经过keccak计算后的值,然后indexed=true的字段会按照顺序依次存在Topics字段里,而值为false的数据则会存储在Data字段,需要使用对应的ABI进行解析。

当同步完历史数据,一般会记录在数据库或者文件里,然后开是监听新Pair创建事件:

func subcribeNewPairCreated(client *ethclient.Client) {
	nameHash := crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
	filters := ethereum.FilterQuery{
		FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
		Topics: [][]common.Hash{
			{nameHash},
		},
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
	factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	logs := make(chan types.Log)
	sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case l := <-logs:
			fmt.Println(l.TxHash)
			data, err := factoryAbi.Unpack("PairCreated", l.Data)
			if err != nil {
				log.Fatalln(err)
			}
			token0 := common.HexToAddress(l.Topics[1].Hex())
			token1 := common.HexToAddress(l.Topics[2].Hex())
			pairAddrss := data[0]
			pairNum := data[1]
			fmt.Println(token0, token1, pairAddrss, pairNum)
		}
	}
}

这里我们之所以能够使用Topics作为过滤条件,就是因为上面说的所有事件日志的Topics[0]都是函数名+参数的keccak哈希值,这样就能过滤出所有新创建的Pair了。其实使用上面基于合约地址的条件进行过滤也可以,这里主要想给出另一种过滤方式,毕竟技多不压身。

说到这,喜欢冲土狗的小伙伴是不是脑海中出现了什么想法?懂的都懂,但还是注意别有个新池子就冲,记得结合其他数据分析一下,比如下面的Reserves。

如何监控Reserves

有了上面的基础,监控Reservers就是换个事件日志和ABI的事。根据UniswapV2文档,监控UniswapV2Pair合约中Sync事件即可:

func subcribeSyncEvent(client *ethclient.Client) {
	nameHash := crypto.Keccak256Hash([]byte("Sync(uint112,uint112)"))
	filters := ethereum.FilterQuery{
		FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
		Topics: [][]common.Hash{
			{nameHash},
		},
	}
	ctx, _ := os.ReadFile("./abi/UniswapV2Pair.json")
	pairAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
	logs := make(chan types.Log)
	sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
	for {
		select {
		case err := <-sub.Err():
			log.Fatal(err)
		case l := <-logs:
			fmt.Println(l.TxHash)
			data, err := pairAbi.Unpack("Sync", l.Data)
			if err != nil {
				log.Fatalln(err)
			}
			pairAddr := l.Address.Hex()
			reserves0 := data[0]
			reserves1 := data[1]
			fmt.Println(pairAddr, reserves0, reserves1)
		}
	}
}

不多解释了,这里如果想针对于某个Pair来监控,添加Addresses过滤条件即可。不过这里建议有兴趣的去看看ethereum.FilterQuery支持的条件,后面可以高效的进行组合过滤。

如何调用合约

最后说说如何与合约进行交互,Python直接有ABI的JSON文件进行反序列化就可以了,而在Golang里则需要使用abigen来生成对应的go文件。首先要找到go-ethereum源码包的位置或者从github下载,然后进入cmd/abigen目录去自己go build,然后到abi目录执行:

abigen --abi=ERC20.json  --pkg=abi --out=erc20.go --alias _totalSupply=totalSupply1

注意这里的--alias参数,由于这个程序不能自动处理下划线,不加这个参数会报错。另外不同的JSON文件生成go代码时,pkg参数也不能一样,否则会报函数已存在一类的错误,这点真是比Python差远了。

生成go文件后就当普通的包使用,如果报错无法import一类的,执行go mod tidy即可。

// import erc20 "gevm/abi"
func getTokenDecimals(client *ethclient.Client, tokenAddr string) {
	tokenAddr := common.HexToAddress(tokenAddr)
	e, _ := erc20.NewErc20(tokenAddr, client)
	i, _ := e.Decimals(nil)
	fmt.Println(i)
}

上面的例子就是传入一个token的合约地址,然后调用Decimals()获取精度,其他的用法都差不多不多说了。

参考文档


wechat