Web3漫游记——MEV机器人实践1
今天开始进入技术同学最喜欢的代码环节,虽然前一篇文章《Web3漫游记——MEV套利技能树》主推Rust作为首选,但后续文章会以Golang为主。主要因为个人技术栈是Golang+Python,用熟悉的语言可以更专注于MEV相关逻辑实现。而使用Golang而非Python则是为了顺便熟悉一下Geth这个库,为了后面做私有节点优化提前做一些准备。有其他语言编程基础的小伙伴看懂应该都不是问题,只不过用到的库以及语法可能略有差别而已。
另外提醒一下Python技术栈的小伙伴,截止到发文日期,Python3.11
、web3.py-v6.9
和web3-flashbots-v1.1.1
的SDK目前有兼容性问题,使用python3.8可正常测试。
首先简单看一下目录结构:
├── README.md
├── .env
├── abi
│ ├── ERC20.json
│ ├── UniswapV2Factory.json
│ ├── UniswapV2Pair.json
│ ├── V2ArbBot.json
│ ├── WETH.json
│ └── erc20.go
├── go.mod
├── go.sum
└── main.go
这里为了简(tou)单(lan)就都写到main.go
里了,并且没考虑各种异常情况和优化,大家真正写工程级别代码时注意一下细节。
如何建立链接
package main
import (
"context"
"fmt"
erc20 "gevm/abi"
"log"
"math/big"
"math/rand"
"os"
"strings"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
_ "github.com/joho/godotenv/autoload"
)
func main() {
client, err := ethclient.Dial(os.Getenv("WSS_URL"))
if err != nil {
log.Fatal(err)
}
log.Println("connected!")
}
其中autoload
库会自动加载当前目录下.env
文件到环境变量中,.env
内容就是节点服务的http和ws连接地址,比如:
HTTP_URL=https://eth-mainnet.g.alchemy.com/v2/{APIKEY}
WSS_URL=wss://eth-mainnet.g.alchemy.com/v2/{APIKEY}
不要把任何APIKEY、私钥相关的硬编码到程序中,是一个非常重要的习惯。
如何监控最新的区块
func subcribeNewBlock(client *ethclient.Client) {
headers := make(chan *types.Header)
sub, err := client.SubscribeNewHead(context.Background(), headers)
if err != nil {
log.Fatal(err)
}
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case header := <-headers:
fmt.Println(header.Hash())
calcNextBlockBaseFee(header.GasUsed, header.GasLimit, header.BaseFee.Uint64())
}
}
}
由于我们的client使用的是ws连接,所以当有新的区块生成,headers
这个channel就会收到数据,其定义如下:
// Header represents a block header in the Ethereum blockchain.
type Header struct {
ParentHash common.Hash `json:"parentHash" gencodec:"required"`
UncleHash common.Hash `json:"sha3Uncles" gencodec:"required"`
Coinbase common.Address `json:"miner"`
Root common.Hash `json:"stateRoot" gencodec:"required"`
TxHash common.Hash `json:"transactionsRoot" gencodec:"required"`
ReceiptHash common.Hash `json:"receiptsRoot" gencodec:"required"`
Bloom Bloom `json:"logsBloom" gencodec:"required"`
Difficulty *big.Int `json:"difficulty" gencodec:"required"`
Number *big.Int `json:"number" gencodec:"required"`
GasLimit uint64 `json:"gasLimit" gencodec:"required"`
GasUsed uint64 `json:"gasUsed" gencodec:"required"`
Time uint64 `json:"timestamp" gencodec:"required"`
Extra []byte `json:"extraData" gencodec:"required"`
MixDigest common.Hash `json:"mixHash"`
Nonce BlockNonce `json:"nonce"`
// BaseFee was added by EIP-1559 and is ignored in legacy headers.
BaseFee *big.Int `json:"baseFeePerGas" rlp:"optional"`
// WithdrawalsHash was added by EIP-4895 and is ignored in legacy headers.
WithdrawalsHash *common.Hash `json:"withdrawalsRoot" rlp:"optional"`
// BlobGasUsed was added by EIP-4844 and is ignored in legacy headers.
BlobGasUsed *uint64 `json:"blobGasUsed" rlp:"optional"`
// ExcessBlobGas was added by EIP-4844 and is ignored in legacy headers.
ExcessBlobGas *uint64 `json:"excessBlobGas" rlp:"optional"`
}
上面代码中我们简单的输出了一下区块的哈希值,然后去预估下一个区块的BaseGasFee。如果需要当前区块的所有交易信息,可以调用client.BlockByHash(context.Background(), header.Hash())
函数,这里就不展开了。
如何估算下一个区块的基础GasFee
const (
EIP1559_ELASTICITY_MULTIPLIER = 2
BASE_FEE_CHANGE_DENOMINATOR = 8
)
func calcNextBlockBaseFee(gasUsed, gasLimit, baseFee uint64) (newBaseFee uint64) {
// https://github.com/ethereum/EIPs/blob/master/EIPS/eip-1559.md
// https://github.com/foundry-rs/foundry/blob/master/crates/anvil/src/eth/fees.rs#L141
targetGasUsed := gasLimit / EIP1559_ELASTICITY_MULTIPLIER
if targetGasUsed == gasUsed {
newBaseFee = baseFee
return
}
if gasUsed > targetGasUsed {
newBaseFee = baseFee + ((baseFee*(gasUsed-targetGasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
} else {
newBaseFee = baseFee - ((baseFee*(targetGasUsed-gasUsed))/targetGasUsed)/BASE_FEE_CHANGE_DENOMINATOR
}
newBaseFee += uint64(rand.Int63n(9))
return
}
还记得我们前面文章说的,面对同一个套利机会往往会有很多竞争者。给的Gas费越高,胜率越大。计算BaseFee的方法我这里参考了anvil库的算法,至于好奇那2个常量的可以去看看EIP1559协议,链接在注释里自取。
计算出来基础费用其实还是不够的,还需要加上优先费(PriorityFee,贿赂矿工的),不过这个优先费我问了大佬说是调用三方接口获取的,可能是要基于历史数据+机器学习+神经网络进行预测吧,暂时先不考虑。
如何监控内存池中Pending的交易
func subcribePendingTransaction(client *ethclient.Client) {
transactionsHash := make(chan string)
sub, _ := client.Client().EthSubscribe(context.Background(), transactionsHash, "newPendingTransactions")
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case txHash := <-transactionsHash:
fmt.Println(txHash)
}
}
}
这里的逻辑和订阅最新的区块逻辑差不多,这里想多说一句的就是对于使用其他语言的小伙伴,获取Pending交易本质上就是一个特定参数的网络请求,比如:
// initiate websocket stream first
wscat -c wss://eth-mainnet.g.alchemy.com/v2/demo
// then call subscription
{"jsonrpc":"2.0","id": 2, "method": "eth_subscribe", "params": ["newPendingTransactions"]}
如果没有对应的轮子自己造一个也不是很难。
如何解析交易中Data字段
当我们拿到一个请求的哈希之后,怎么知道这个交易在做什么呢?此时就需要ABI登场了,对于大多数开源合约来说,在对应链的区块链浏览器上都可以直接看到合约对应的ABI,通常是JSON格式,比如:
此外,还可以将开源合约代码下载到本地自行编译生成。
这里以ERC20
为例来解析:
func parseERC20Transaction(client *ethclient.Client, txHash string) {
ctx, _ := os.ReadFile("./abi/ERC20.json")
erc20Abi, _ := abi.JSON(strings.NewReader(string(ctx)))
tx, isPending, err := client.TransactionByHash(context.Background(), common.HexToHash(txHash))
if err != nil {
log.Fatalln(err)
}
if len(tx.Data()) > 0 {
// 使用前4字节去找对应的函数名字
method, err := erc20Abi.MethodById(tx.Data()[:4])
if err != nil {
fmt.Println("Tx is not erc20 protocol")
} else {
fmt.Println(method.Name, isPending)
// 后面的字节就是对应函数的参数
input, err := method.Inputs.Unpack(tx.Data()[4:])
fmt.Println(input, err)
}
}
}
代码也很清晰,首先根据json内容生成abi对象,然后获取交易内容并根据data字段进行解析,获取函数名和inputData参数——也就是区块链浏览器上显示的inputData字段:
如何监控Pair创建
其实这里应该分2个部分,首先是同步历史已经创建的Pair:
func syncPairCreate(client *ethclient.Client) {
// 以太坊UniswapV2-FactoryContrace地址
address := common.HexToAddress("0x5c69bee701ef814a2b6a3edd4b1652cb9cc5aa6f")
filter := ethereum.FilterQuery{
FromBlock: big.NewInt(10000835), // 合约在这个区块部署
ToBlock: big.NewInt(10091985), // 数据过多会报错,这里限制一下
Addresses: []common.Address{address}, // 过滤只要这个合约地址的数据
}
logs, err := client.FilterLogs(context.Background(), filter)
if err != nil {
log.Fatalln(err)
}
ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
for _, l := range logs {
fmt.Println(l.Data, l.TxHash)
data, err := factoryAbi.Unpack("PairCreated", l.Data)
if err != nil {
log.Fatalln(err)
}
token0 := common.HexToAddress(l.Topics[1].Hex())
token1 := common.HexToAddress(l.Topics[2].Hex())
pairAddrss := data[0]
pairNum := data[1]
fmt.Println(token0, token1, pairAddrss, pairNum)
}
}
简单说,首先我们需要找到链上的合约地址以及对应的ABI,然后根据实际需求构造FilterQuery
结构,最后调用接口获取日志。在实际使用中有个需要注意的地方就是ToBlock
的值,由于这个合约已经部署很久了,如果不进行限制的话日志量过大会报错,所以真正使用时候要切分并行获取。
关于数据解析部分,这里我们获取的是事件日志,和上面解析交易的数据不同。
还是以上面的为例,UniswapV2factory
合约对应的PairCreated
事件ABI如下(注意这里是PairCreated
事件,不是createPair
函数):
{
"anonymous": false,
"inputs": [
{
"indexed": true,
"internalType": "address",
"name": "token0",
"type": "address"
},
{
"indexed": true,
"internalType": "address",
"name": "token1",
"type": "address"
},
{
"indexed": false,
"internalType": "address",
"name": "pair",
"type": "address"
},
{
"indexed": false,
"internalType": "uint256",
"name": "",
"type": "uint256"
}
],
"name": "PairCreated",
"type": "event"
}
首先Topics
中第0个元素永远是名字+参数经过keccak计算后的值,然后indexed=true
的字段会按照顺序依次存在Topics
字段里,而值为false
的数据则会存储在Data
字段,需要使用对应的ABI进行解析。
当同步完历史数据,一般会记录在数据库或者文件里,然后开是监听新Pair创建事件:
func subcribeNewPairCreated(client *ethclient.Client) {
nameHash := crypto.Keccak256Hash([]byte("PairCreated(address,address,address,uint256)"))
filters := ethereum.FilterQuery{
FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
Topics: [][]common.Hash{
{nameHash},
},
}
ctx, _ := os.ReadFile("./abi/UniswapV2Factory.json")
factoryAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
logs := make(chan types.Log)
sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case l := <-logs:
fmt.Println(l.TxHash)
data, err := factoryAbi.Unpack("PairCreated", l.Data)
if err != nil {
log.Fatalln(err)
}
token0 := common.HexToAddress(l.Topics[1].Hex())
token1 := common.HexToAddress(l.Topics[2].Hex())
pairAddrss := data[0]
pairNum := data[1]
fmt.Println(token0, token1, pairAddrss, pairNum)
}
}
}
这里我们之所以能够使用Topics
作为过滤条件,就是因为上面说的所有事件日志的Topics[0]
都是函数名+参数的keccak哈希值,这样就能过滤出所有新创建的Pair了。其实使用上面基于合约地址的条件进行过滤也可以,这里主要想给出另一种过滤方式,毕竟技多不压身。
说到这,喜欢冲土狗的小伙伴是不是脑海中出现了什么想法?懂的都懂,但还是注意别有个新池子就冲,记得结合其他数据分析一下,比如下面的Reserves。
如何监控Reserves
有了上面的基础,监控Reservers就是换个事件日志和ABI的事。根据UniswapV2文档,监控UniswapV2Pair
合约中Sync
事件即可:
func subcribeSyncEvent(client *ethclient.Client) {
nameHash := crypto.Keccak256Hash([]byte("Sync(uint112,uint112)"))
filters := ethereum.FilterQuery{
FromBlock: big.NewInt(18069579), // 示例,以实际运行是获取的最新区块为准,怎么获得见上面
Topics: [][]common.Hash{
{nameHash},
},
}
ctx, _ := os.ReadFile("./abi/UniswapV2Pair.json")
pairAbi, _ := abi.JSON(strings.NewReader(string(ctx)))
logs := make(chan types.Log)
sub, _ := client.SubscribeFilterLogs(context.Background(), filters, logs)
for {
select {
case err := <-sub.Err():
log.Fatal(err)
case l := <-logs:
fmt.Println(l.TxHash)
data, err := pairAbi.Unpack("Sync", l.Data)
if err != nil {
log.Fatalln(err)
}
pairAddr := l.Address.Hex()
reserves0 := data[0]
reserves1 := data[1]
fmt.Println(pairAddr, reserves0, reserves1)
}
}
}
不多解释了,这里如果想针对于某个Pair来监控,添加Addresses
过滤条件即可。不过这里建议有兴趣的去看看ethereum.FilterQuery
支持的条件,后面可以高效的进行组合过滤。
如何调用合约
最后说说如何与合约进行交互,Python直接有ABI的JSON文件进行反序列化就可以了,而在Golang里则需要使用abigen
来生成对应的go文件。首先要找到go-ethereum
源码包的位置或者从github下载,然后进入cmd/abigen
目录去自己go build
,然后到abi
目录执行:
abigen --abi=ERC20.json --pkg=abi --out=erc20.go --alias _totalSupply=totalSupply1
注意这里的--alias
参数,由于这个程序不能自动处理下划线,不加这个参数会报错。另外不同的JSON文件生成go代码时,pkg
参数也不能一样,否则会报函数已存在一类的错误,这点真是比Python差远了。
生成go文件后就当普通的包使用,如果报错无法import一类的,执行go mod tidy
即可。
// import erc20 "gevm/abi"
func getTokenDecimals(client *ethclient.Client, tokenAddr string) {
tokenAddr := common.HexToAddress(tokenAddr)
e, _ := erc20.NewErc20(tokenAddr, client)
i, _ := e.Decimals(nil)
fmt.Println(i)
}
上面的例子就是传入一个token的合约地址,然后调用Decimals()
获取精度,其他的用法都差不多不多说了。
参考文档