Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth/filters: return sender in pending tx subscription #26034

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions accounts/abi/bind/backends/simulated.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,8 @@ type filterBackend struct {

func (fb *filterBackend) ChainDb() ethdb.Database { return fb.db }

func (fb *filterBackend) ChainConfig() *params.ChainConfig { return fb.backend.config }

func (fb *filterBackend) EventMux() *event.TypeMux { panic("not supported") }

func (fb *filterBackend) HeaderByNumber(ctx context.Context, number rpc.BlockNumber) (*types.Header, error) {
Expand Down
11 changes: 6 additions & 5 deletions eth/filters/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/rpc"
)

Expand All @@ -38,7 +39,7 @@ type filter struct {
typ Type
deadline *time.Timer // filter is inactive when deadline triggers
hashes []common.Hash
txs []*types.Transaction
txs []*ethapi.RPCTransaction
crit FilterCriteria
logs []*types.Log
s *Subscription // associated subscription in event system
Expand Down Expand Up @@ -104,12 +105,12 @@ func (api *FilterAPI) timeoutLoop(timeout time.Duration) {
// `eth_getFilterChanges` polling method that is also used for log filters.
func (api *FilterAPI) NewPendingTransactionFilter() rpc.ID {
var (
pendingTxs = make(chan []*types.Transaction)
pendingTxs = make(chan []*ethapi.RPCTransaction)
pendingTxSub = api.events.SubscribePendingTxs(pendingTxs)
)

api.filtersMu.Lock()
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*types.Transaction, 0), s: pendingTxSub}
api.filters[pendingTxSub.ID] = &filter{typ: PendingTransactionsSubscription, deadline: time.NewTimer(api.timeout), txs: make([]*ethapi.RPCTransaction, 0), s: pendingTxSub}
api.filtersMu.Unlock()

go func() {
Expand Down Expand Up @@ -145,7 +146,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
rpcSub := notifier.CreateSubscription()

go func() {
txs := make(chan []*types.Transaction, 128)
txs := make(chan []*ethapi.RPCTransaction, 128)
pendingTxSub := api.events.SubscribePendingTxs(txs)

for {
Expand All @@ -157,7 +158,7 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool)
if fullTx != nil && *fullTx {
notifier.Notify(rpcSub.ID, tx)
} else {
notifier.Notify(rpcSub.ID, tx.Hash())
notifier.Notify(rpcSub.ID, tx.Hash)
}
}
case <-rpcSub.Err():
Expand Down
3 changes: 2 additions & 1 deletion eth/filters/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
)

func BenchmarkBloomBits512(b *testing.B) {
Expand Down Expand Up @@ -176,7 +177,7 @@ func BenchmarkNoBloomBits(b *testing.B) {

clearBloomBits(db)

_, sys := newTestFilterSystem(b, db, Config{})
_, sys := newTestFilterSystem(b, db, Config{}, params.TestChainConfig)

b.Log("Running filter benchmarks...")
start := time.Now()
Expand Down
27 changes: 20 additions & 7 deletions eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
lru "github.com/hashicorp/golang-lru"
)
Expand All @@ -55,6 +57,7 @@ func (cfg Config) withDefaults() Config {

type Backend interface {
ChainDb() ethdb.Database
ChainConfig() *params.ChainConfig
HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error)
HeaderByHash(ctx context.Context, blockHash common.Hash) (*types.Header, error)
GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error)
Expand Down Expand Up @@ -151,7 +154,7 @@ type subscription struct {
created time.Time
logsCrit ethereum.FilterQuery
logs chan []*types.Log
txs chan []*types.Transaction
txs chan []*ethapi.RPCTransaction
headers chan *types.Header
installed chan struct{} // closed when the filter is installed
err chan error // closed when the filter is uninstalled
Expand Down Expand Up @@ -311,7 +314,7 @@ func (es *EventSystem) subscribeMinedPendingLogs(crit ethereum.FilterQuery, logs
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
txs: make(chan []*ethapi.RPCTransaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -328,7 +331,7 @@ func (es *EventSystem) subscribeLogs(crit ethereum.FilterQuery, logs chan []*typ
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
txs: make(chan []*ethapi.RPCTransaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -345,7 +348,7 @@ func (es *EventSystem) subscribePendingLogs(crit ethereum.FilterQuery, logs chan
logsCrit: crit,
created: time.Now(),
logs: logs,
txs: make(chan []*types.Transaction),
txs: make(chan []*ethapi.RPCTransaction),
headers: make(chan *types.Header),
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -361,7 +364,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti
typ: BlocksSubscription,
created: time.Now(),
logs: make(chan []*types.Log),
txs: make(chan []*types.Transaction),
txs: make(chan []*ethapi.RPCTransaction),
headers: headers,
installed: make(chan struct{}),
err: make(chan error),
Expand All @@ -371,7 +374,7 @@ func (es *EventSystem) SubscribeNewHeads(headers chan *types.Header) *Subscripti

// SubscribePendingTxs creates a subscription that writes transactions for
// transactions that enter the transaction pool.
func (es *EventSystem) SubscribePendingTxs(txs chan []*types.Transaction) *Subscription {
func (es *EventSystem) SubscribePendingTxs(txs chan []*ethapi.RPCTransaction) *Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel that at this point - internally - we should not shuffle RPCTransaction objects. It's fine to convert to these types at the last step before returning to the user, but it's an RPC output nicety, Geth internally should not pass around RPC related types.

sub := &subscription{
id: rpc.NewID(),
typ: PendingTransactionsSubscription,
Expand Down Expand Up @@ -422,7 +425,17 @@ func (es *EventSystem) handleRemovedLogs(filters filterIndex, ev core.RemovedLog

func (es *EventSystem) handleTxsEvent(filters filterIndex, ev core.NewTxsEvent) {
for _, f := range filters[PendingTransactionsSubscription] {
f.txs <- ev.Txs
header, err := es.backend.HeaderByNumber(context.Background(), rpc.LatestBlockNumber)
if err != nil {
log.Warn("failed to get latest header when processing new pending tx event", "err", err)
return
}
config := es.backend.ChainConfig()
txs := make([]*ethapi.RPCTransaction, 0, len(ev.Txs))
for _, tx := range ev.Txs {
txs = append(txs, ethapi.NewRPCPendingTransaction(tx, header, config))
}
f.txs <- txs
}
}

Expand Down
63 changes: 43 additions & 20 deletions eth/filters/filter_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@ import (
"github.com/ethereum/go-ethereum/core/bloombits"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rpc"
)

type testBackend struct {
db ethdb.Database
chainConfig *params.ChainConfig
sections uint64
txFeed event.Feed
logsFeed event.Feed
Expand All @@ -54,6 +57,10 @@ func (b *testBackend) ChainDb() ethdb.Database {
return b.db
}

func (b *testBackend) ChainConfig() *params.ChainConfig {
return b.chainConfig
}

func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) {
var (
hash common.Hash
Expand Down Expand Up @@ -158,8 +165,8 @@ func (b *testBackend) ServiceFilter(ctx context.Context, session *bloombits.Matc
}()
}

func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db}
func newTestFilterSystem(t testing.TB, db ethdb.Database, cfg Config, chainConfig *params.ChainConfig) (*testBackend, *FilterSystem) {
backend := &testBackend{db: db, chainConfig: chainConfig}
sys := NewFilterSystem(backend, cfg)
return backend, sys
}
Expand All @@ -173,15 +180,15 @@ func TestBlockSubscription(t *testing.T) {
t.Parallel()

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
api = NewFilterAPI(sys, false)
genesis = &core.Genesis{
db = rawdb.NewMemoryDatabase()
genesis = &core.Genesis{
Config: params.TestChainConfig,
BaseFee: big.NewInt(params.InitialBaseFee),
}
_, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
backend, sys = newTestFilterSystem(t, db, Config{}, genesis.Config)
api = NewFilterAPI(sys, false)
_, chain, _ = core.GenerateChainWithGenesis(genesis, ethash.NewFaker(), 10, func(i int, gen *core.BlockGen) {})
chainEvents = []core.ChainEvent{}
)

for _, blk := range chain {
Expand Down Expand Up @@ -229,8 +236,12 @@ func TestPendingTxFilter(t *testing.T) {

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
chainConfig = params.TestChainConfig
backend, sys = newTestFilterSystem(t, db, Config{}, chainConfig)
api = NewFilterAPI(sys, false)
from = "0x682a80a6f560eeC50d54E63CBeDa1c324C5F8d1b"
privkey, err = crypto.HexToECDSA("0000000000000000deadbeef00000000000000000000000000000000deadbeef")
signer = types.LatestSigner(chainConfig)

transactions = []*types.Transaction{
types.NewTransaction(0, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
Expand All @@ -240,9 +251,18 @@ func TestPendingTxFilter(t *testing.T) {
types.NewTransaction(4, common.HexToAddress("0xb794f5ea0ba39494ce83a213fffba74279579268"), new(big.Int), 0, new(big.Int), nil),
}

txs []*types.Transaction
txs []*ethapi.RPCTransaction
)
if err != nil {
t.Fatalf("failed to parse private key: %v", err)
}

for i, tx := range transactions {
transactions[i], err = types.SignTx(tx, signer, privkey)
if err != nil {
t.Fatalf("failed to sign tx: %v", err)
}
}
fid0 := api.NewPendingTransactionFilter()

time.Sleep(1 * time.Second)
Expand All @@ -255,7 +275,7 @@ func TestPendingTxFilter(t *testing.T) {
t.Fatalf("Unable to retrieve logs: %v", err)
}

tx := results.([]*types.Transaction)
tx := results.([]*ethapi.RPCTransaction)
txs = append(txs, tx...)
if len(txs) >= len(transactions) {
break
Expand All @@ -273,8 +293,11 @@ func TestPendingTxFilter(t *testing.T) {
return
}
for i := range txs {
if txs[i].Hash() != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash())
if txs[i].Hash != transactions[i].Hash() {
t.Errorf("hashes[%d] invalid, want %x, got %x", i, transactions[i].Hash(), txs[i].Hash)
}
if txs[i].From.Hex() != from {
t.Errorf("sender[%d] invalid, want %x, got %x", i, from, txs[i].From)
}
}
}
Expand All @@ -284,7 +307,7 @@ func TestPendingTxFilter(t *testing.T) {
func TestLogFilterCreation(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
_, sys = newTestFilterSystem(t, db, Config{}, params.TestChainConfig)
api = NewFilterAPI(sys, false)

testCases = []struct {
Expand Down Expand Up @@ -331,7 +354,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {

var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
_, sys = newTestFilterSystem(t, db, Config{}, params.TestChainConfig)
api = NewFilterAPI(sys, false)
)

Expand All @@ -353,7 +376,7 @@ func TestInvalidLogFilterCreation(t *testing.T) {
func TestInvalidGetLogsRequest(t *testing.T) {
var (
db = rawdb.NewMemoryDatabase()
_, sys = newTestFilterSystem(t, db, Config{})
_, sys = newTestFilterSystem(t, db, Config{}, params.TestChainConfig)
api = NewFilterAPI(sys, false)
blockHash = common.HexToHash("0x1111111111111111111111111111111111111111111111111111111111111111")
)
Expand All @@ -378,7 +401,7 @@ func TestLogFilter(t *testing.T) {

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
backend, sys = newTestFilterSystem(t, db, Config{}, params.TestChainConfig)
api = NewFilterAPI(sys, false)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
Expand Down Expand Up @@ -492,7 +515,7 @@ func TestPendingLogsSubscription(t *testing.T) {

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{})
backend, sys = newTestFilterSystem(t, db, Config{}, params.TestChainConfig)
api = NewFilterAPI(sys, false)

firstAddr = common.HexToAddress("0x1111111111111111111111111111111111111111")
Expand Down Expand Up @@ -676,7 +699,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {

var (
db = rawdb.NewMemoryDatabase()
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout})
backend, sys = newTestFilterSystem(t, db, Config{Timeout: timeout}, params.TestChainConfig)
api = NewFilterAPI(sys, false)
done = make(chan struct{})
)
Expand Down Expand Up @@ -709,7 +732,7 @@ func TestPendingTxFilterDeadlock(t *testing.T) {
if err != nil {
t.Fatalf("Filter should exist: %v\n", err)
}
if len(txs.([]*types.Transaction)) > 0 {
if len(txs.([]*ethapi.RPCTransaction)) > 0 {
break
}
runtime.Gosched()
Expand Down
15 changes: 7 additions & 8 deletions eth/filters/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func makeReceipt(addr common.Address) *types.Receipt {
func BenchmarkFilters(b *testing.B) {
var (
db, _ = rawdb.NewLevelDBDatabase(b.TempDir(), 0, 0, "", false)
_, sys = newTestFilterSystem(b, db, Config{})
_, sys = newTestFilterSystem(b, db, Config{}, params.TestChainConfig)
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
addr2 = common.BytesToAddress([]byte("jeff"))
Expand Down Expand Up @@ -98,20 +98,19 @@ func BenchmarkFilters(b *testing.B) {
func TestFilters(t *testing.T) {
var (
db, _ = rawdb.NewLevelDBDatabase(t.TempDir(), 0, 0, "", false)
_, sys = newTestFilterSystem(t, db, Config{})
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
addr = crypto.PubkeyToAddress(key1.PublicKey)
gspec = &core.Genesis{
Config: params.TestChainConfig,
Alloc: core.GenesisAlloc{addr: {Balance: big.NewInt(1000000)}},
BaseFee: big.NewInt(params.InitialBaseFee),
}
_, sys = newTestFilterSystem(t, db, Config{}, gspec.Config)

hash1 = common.BytesToHash([]byte("topic1"))
hash2 = common.BytesToHash([]byte("topic2"))
hash3 = common.BytesToHash([]byte("topic3"))
hash4 = common.BytesToHash([]byte("topic4"))

gspec = &core.Genesis{
Config: params.TestChainConfig,
Alloc: core.GenesisAlloc{addr: {Balance: big.NewInt(1000000)}},
BaseFee: big.NewInt(params.InitialBaseFee),
}
)
defer db.Close()

Expand Down
Loading