Skip to content

Commit

Permalink
[bloomFilter] fix FilterBlocksInRange (#3062)
Browse files Browse the repository at this point in the history
* fix FilterBlocksInRange
  • Loading branch information
Liuhaai authored Feb 9, 2022
1 parent d0062aa commit 8cce51c
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 41 deletions.
62 changes: 30 additions & 32 deletions blockindex/bloomfilterindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,24 +207,32 @@ func (bfx *bloomfilterIndexer) FilterBlocksInRange(l *filter.LogFilter, start, e
if start == 0 || end == 0 || end < start {
return nil, errors.New("start/end height should be bigger than zero")
}

br, err := bfx.getRangeFilters(start, end)
if err != nil {
var (
startIndex, endIndex uint64
blockNumbers = make([]uint64, 0)
err error
)
if startIndex, err = bfx.getIndexByHeight(start); err != nil {
return nil, err
}

blockNumbers := make([]uint64, 0)
for i := range br {
bigBloom := br[i].BloomFilter
if l.ExistInBloomFilterv2(bigBloom) {
searchStart, searchEnd := br[i].Start(), br[i].End()
if i == 0 {
if endIndex, err = bfx.getIndexByHeight(end); err != nil {
return nil, err
}
for ; startIndex <= endIndex; startIndex++ {
br, err := bfx.getBloomRangeByIndex(startIndex)
if err != nil {
return nil, err
}
if l.ExistInBloomFilterv2(br.BloomFilter) {
searchStart := br.Start()
if start > searchStart {
searchStart = start
}
if i == len(br)-1 {
searchEnd := br.End()
if end < searchEnd {
searchEnd = end
}
blockNumbers = append(blockNumbers, l.SelectBlocksFromRangeBloomFilter(bigBloom, searchStart, searchEnd)...)
blockNumbers = append(blockNumbers, l.SelectBlocksFromRangeBloomFilter(br.BloomFilter, searchStart, searchEnd)...)
}
}
return blockNumbers, nil
Expand Down Expand Up @@ -289,29 +297,19 @@ func (bfx *bloomfilterIndexer) addLogsToRangeBloomFilter(ctx context.Context, bl
}
}

func (bfx *bloomfilterIndexer) getRangeFilters(start, end uint64) ([]*bloomRange, error) {
b, err := bfx.totalRange.Get(start)
func (bfx *bloomfilterIndexer) getBloomRangeByIndex(idx uint64) (*bloomRange, error) {
bfKey := byteutil.Uint64ToBytesBigEndian(idx)
bfBytes, err := bfx.kvStore.Get(RangeBloomFilterNamespace, bfKey)
if err != nil {
return nil, err
}
startIndex := byteutil.BytesToUint64BigEndian(b)
if b, err = bfx.totalRange.Get(end); err != nil {
return nil, err
}
endIndex := byteutil.BytesToUint64BigEndian(b)
return bloomRangeFromBytes(bfBytes)
}

var br []*bloomRange
for ; startIndex <= endIndex; startIndex++ {
bfKey := byteutil.Uint64ToBytesBigEndian(startIndex)
bfBytes, err := bfx.kvStore.Get(RangeBloomFilterNamespace, bfKey)
if err != nil {
return nil, err
}
bf, err := bloomRangeFromBytes(bfBytes)
if err != nil {
return nil, err
}
br = append(br, bf)
func (bfx *bloomfilterIndexer) getIndexByHeight(height uint64) (uint64, error) {
val, err := bfx.totalRange.Get(height)
if err != nil {
return 0, err
}
return br, nil
return byteutil.BytesToUint64BigEndian(val), nil
}
81 changes: 73 additions & 8 deletions blockindex/bloomfilterindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"context"
"hash/fnv"
"math/big"
"runtime"
"sync"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -272,14 +274,6 @@ func TestBloomfilterIndexer(t *testing.T) {
require.NoError(err)
require.Equal(expectedRes4[i], res)
}

bfs, err := indexer.(*bloomfilterIndexer).getRangeFilters(1, 5)
require.NoError(err)
require.Equal(2, len(bfs))
require.EqualValues(1, bfs[0].Start())
require.EqualValues(3, bfs[0].End())
require.EqualValues(4, bfs[1].Start())
require.EqualValues(5, bfs[1].End())
}

path := "test-indexer"
Expand All @@ -294,3 +288,74 @@ func TestBloomfilterIndexer(t *testing.T) {
testIndexer(db.NewBoltDB(cfg), t)
})
}

func BenchmarkBloomfilterIndexer(b *testing.B) {
require := require.New(b)

indexerCfg := config.Default.Indexer
indexerCfg.RangeBloomFilterNumElements = 16
indexerCfg.RangeBloomFilterSize = 4096
indexerCfg.RangeBloomFilterNumHash = 4

testFilter := iotexapi.LogsFilter{
Address: []string{identityset.Address(28).String()},
Topics: []*iotexapi.Topics{
{
Topic: [][]byte{
data2[:],
},
},
},
}
testinglf := logfilter.NewLogFilter(&testFilter, nil, nil)

var (
blkNum = 2000
receiptNumPerBlk = 1000
blks = make([]block.Block, blkNum)
wg sync.WaitGroup
)
for i := 0; i < blkNum; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
receipts := make([]*action.Receipt, receiptNumPerBlk)
for j := 0; j < receiptNumPerBlk; j++ {
receipt := &action.Receipt{}
testLog := newTestLog(identityset.Address(28).String(), []hash.Hash256{data2})
receipt.AddLogs(testLog)
receipts[j] = receipt
}
blk, err := block.NewTestingBuilder().
SetHeight(uint64(i + 1)).
SetReceipts(receipts).
SignAndBuild(identityset.PrivateKey(27))
if err != nil {
panic("fail")
}
blks[i] = blk
}(i)
}
wg.Wait()

// for n := 0; n < b.N; n++ {
testPath, err := testutil.PathOfTempFile("test-indexer")
require.NoError(err)
dbCfg := db.DefaultConfig
dbCfg.DbPath = testPath
defer testutil.CleanupPathV2(testPath)
indexer, err := NewBloomfilterIndexer(db.NewBoltDB(dbCfg), indexerCfg)
require.NoError(err)
ctx := context.Background()
require.NoError(indexer.Start(ctx))
defer func() {
require.NoError(indexer.Stop(ctx))
}()
for i := 0; i < len(blks); i++ {
require.NoError(indexer.PutBlock(context.Background(), &blks[i]))
}
runtime.GC()
res, err := indexer.FilterBlocksInRange(testinglf, 1, uint64(blkNum-1))
require.NoError(err)
require.Equal(blkNum-1, len(res))
}
2 changes: 1 addition & 1 deletion ioctl/newcmd/account/accountdelete.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,4 @@ func NewAccountDelete(c ioctl.Client) *cobra.Command {
return nil
},
}
}
}

0 comments on commit 8cce51c

Please sign in to comment.