Skip to content

Commit

Permalink
Node/Hack: Add tool to test go-ethereum subscriptions (#4214)
Browse files Browse the repository at this point in the history
* Node/Hack: SeiEVM watcher test app

* Make tool general purpose
  • Loading branch information
bruce-riley authored Jan 27, 2025
1 parent 1863142 commit 7d008ed
Showing 1 changed file with 146 additions and 0 deletions.
146 changes: 146 additions & 0 deletions node/hack/evm_test/wstest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// This tool can be used to verify that an EVM endpoint works properly with go-ethereum websocket subscriptions.
// It can subscribe to latest blocks as well as log events from the Wormhole core contract and just logs them out.
//
// To run this tool, do:
// go run wstest.go --rpc <websocketEndpoint> [--contract <wormholeCodeContractAddress>] [--blocks]
//
// where
// --contract` subscribes to log events from the specified Wormhole core contract
// --blocks subscribes to the latest blocks.
//
// To listen to log events from the SeiEVM test endpoint (what this was originally written for) do:
// go run wstest.go --rpc wss://evm-ws-testnet.sei-apis.com --contract 0xBB73cB66C26740F31d1FabDC6b7A46a038A300dd

package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"

"go.uber.org/zap"

ethAbi "github.com/certusone/wormhole/node/pkg/watchers/evm/connectors/ethabi"
ethBind "github.com/ethereum/go-ethereum/accounts/abi/bind"
ethCommon "github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
ethClient "github.com/ethereum/go-ethereum/ethclient"
ethRpc "github.com/ethereum/go-ethereum/rpc"
)

var (
rpc = flag.String("rpc", "", "Websocket URL, this parameter is required")
contract = flag.String("contract", "", "Core contract address, leave blank to not subscribe to the core contract")
blocks = flag.Bool("blocks", false, "Also subscribe to new blocks, default is false")
)

func main() {
flag.Parse()
logger, _ := zap.NewDevelopment()
if *rpc == "" {
logger.Fatal(`The "--rpc" parameter is required`)
}
if *contract == "" && !*blocks {
logger.Fatal(`Must specify either "--contract" or "--blocks" or both`)
}

logger.Info("Connecting to websocket endpoint", zap.String("webSocket", *rpc))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

rawClient, err := ethRpc.DialContext(ctx, *rpc)
if err != nil {
logger.Fatal("Failed to connect to RPC", zap.Error(err))
}

client := ethClient.NewClient(rawClient)

errC := make(chan error)

if *blocks {
logger.Info("Subscribing for latest blocks")
headSink := make(chan *ethTypes.Header, 2)
headerSubscription, err := client.SubscribeNewHead(ctx, headSink)
if err != nil {
logger.Fatal("Failed to subscribe to latest blocks", zap.Error(err))
}

go func() {
logger.Info("Waiting for latest block events")
defer headerSubscription.Unsubscribe()
for {
select {
case <-ctx.Done():
return
case err := <-headerSubscription.Err():
errC <- fmt.Errorf("block subscription failed: %w", err)
return
case block := <-headSink:
// These two pointers should have been checked before the event was placed on the channel, but just being safe.
if block == nil {
logger.Error("New header event is nil")
continue
}
logger.Info("Received a new block", zap.Any("block", block))
}
}
}()
}

if *contract != "" {
logger.Info("Subscribing to log events from contract", zap.String("contractAddr", *contract))
filterer, err := ethAbi.NewAbiFilterer(ethCommon.BytesToAddress(ethCommon.HexToAddress(*contract).Bytes()), client)
if err != nil {
logger.Fatal("Failed to create filter", zap.Error(err))
}

timeout, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
messageC := make(chan *ethAbi.AbiLogMessagePublished, 2)
messageSub, err := filterer.WatchLogMessagePublished(&ethBind.WatchOpts{Context: timeout}, messageC, nil)
if err != nil {
logger.Fatal("Failed to subscribe to events", zap.Error(err))
}
defer messageSub.Unsubscribe()

logger.Info("Waiting for log events from contract")
go func() {
for {
select {
case <-ctx.Done():
return
case err := <-messageSub.Err():
errC <- fmt.Errorf("message subscription failed: %w", err)
return
case ev := <-messageC:
logger.Info("Received a log event from the contract", zap.Any("ev", ev))
}
}
}()
}

// Wait for SIGTERM.
logger.Info("Waiting for sigterm.")
sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
go func() {
<-sigterm
logger.Info("Received sigterm. exiting.")
cancel()
}()

// Wait for either a shutdown or a fatal error from the permissions watcher.
select {
case <-ctx.Done():
logger.Info("Context cancelled, exiting...")
break
case err := <-errC:
logger.Error("Encountered an error, exiting", zap.Error(err))
break
}

}

0 comments on commit 7d008ed

Please sign in to comment.