Skip to content

Commit

Permalink
fix: events: address sqlite index selection performance regressions
Browse files Browse the repository at this point in the history
Fixes: #12255

SQLite was found to be avoiding the intended initial indexes for some types of
complex queries and opting for minor indexes which don't narrow down the search
space enough. Specifically we want queries to first either narrow by height
or tipset_key_cid and then apply other criteria. Having alternative indexes
when a query such as `height>=X AND height<=Y` is encountered causes SQLite to
avoid the height index entirely. By removing additional indexes that could be
used during the main query path (prefillFilter), we force SQLite to use the
intended indexes first and then narrow the results without relying on the additional indexes.
  • Loading branch information
rvagg committed Jul 18, 2024
1 parent 43b7a78 commit cd53adf
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

## ☢️ Upgrade Warnings ☢️

- This Lotus release includes some correctness improvements to the events subsystem, impacting RPC APIs including `GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs` and the `eth` filter APIs. Part of these improvements involve an events database migration that may take some time to complete on nodes with extensive event databases. See [filecoin-project/lotus#12080](https:/filecoin-project/lotus/pull/12080) for details.
- This Lotus release includes some performance improvements to the events subsystem, impacting RPC APIs including `GetActorEventsRaw`, `SubscribeActorEventsRaw`, `eth_getLogs` and the `eth` filter APIs. Part of these improvements involve an events database migration that may take some time to complete on nodes with extensive event databases. See [filecoin-project/lotus#12258](https:/filecoin-project/lotus/pull/12258) for details.

- Breaking change in public APIs `storage/pipeline.NewPreCommitBatcher`, `sealing.NewCommitBatcher` and `storage/pipeline.New`. They now have an additional error return to deal with errors arising from fetching the sealing config.

Expand Down
112 changes: 75 additions & 37 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ var ddls = []string{
reverted INTEGER NOT NULL
)`,

createIndexEventEmitterAddr,
createIndexEventTipsetKeyCid,
createIndexEventHeight,
createIndexEventReverted,

`CREATE TABLE IF NOT EXISTS event_entry (
event_id INTEGER,
Expand All @@ -55,8 +53,6 @@ var ddls = []string{

createTableEventsSeen,

createIndexEventEntryIndexedKey,
createIndexEventEntryCodecValue,
createIndexEventEntryEventId,
createIndexEventsSeenHeight,
createIndexEventsSeenTipsetKeyCid,
Expand All @@ -67,34 +63,67 @@ var (
)

const (
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL,
UNIQUE(height, tipset_key_cid)
)`

insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)`
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
revertEventsInTipset = `UPDATE event SET reverted=true WHERE height=? AND tipset_key=?`
restoreEvent = `UPDATE event SET reverted=false WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
revertEventSeen = `UPDATE events_seen SET reverted=true WHERE height=? AND tipset_key_cid=?`
restoreEventSeen = `UPDATE events_seen SET reverted=false WHERE height=? AND tipset_key_cid=?`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`
isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`

createIndexEventEmitterAddr = `CREATE INDEX IF NOT EXISTS event_emitter_addr ON event (emitter_addr)`
eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?`
// QUERY PLAN
// `--SEARCH event USING INDEX event_height (height=?)
isTipsetProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE tipset_key_cid=?`
// QUERY PLAN
// `--SEARCH events_seen USING COVERING INDEX events_seen_tipset_key_cid (tipset_key_cid=?)
getMaxHeightInIndex = `SELECT MAX(height) FROM events_seen`
// QUERY PLAN
// `--SEARCH events_seen USING COVERING INDEX events_seen_height
isHeightProcessed = `SELECT COUNT(*) > 0 FROM events_seen WHERE height=?`
// QUERY PLAN
// `--SEARCH events_seen USING COVERING INDEX events_seen_height (height=?)

// When modifying indexes in this file, it is critical to test the query plan (EXPLAIN QUERY PLAN)
// of all the variations of queries built by prefillFilter to ensure that the query first hits
// an index that narrows down results to an epoch or a reasonable range of epochs. Specifically,
// event_tipset_key_cid or event_height should be the first index. Then further narrowing can take
// place within the small subset of results.
// Unfortunately SQLite has some quirks in index selection that mean that certain query types will
// bypass these indexes if alternatives are available. This has been observed specifically on
// queries with height ranges: `height>=X AND height<=Y`.
//
// e.g.:
// EXPLAIN QUERY PLAN
// SELECT
// event.height, event.tipset_key_cid, event_entry.indexed, event_entry.codec, event_entry.key, event_entry.value
// FROM event
// JOIN
// event_entry ON event.id=event_entry.event_id,
// event_entry ee2 ON event.id=ee2.event_id
// WHERE event.height>=? AND event.height<=? AND event.reverted=? AND event.emitter_addr=? AND ee2.indexed=1 AND ee2.key=?
// ORDER BY event.height DESC, event_entry._rowid_ ASC
//
// ->
//
// QUERY PLAN
// |--SEARCH event USING INDEX event_height (height>? AND height<?)
// |--SEARCH ee2 USING INDEX event_entry_event_id (event_id=?)
// |--SEARCH event_entry USING INDEX event_entry_event_id (event_id=?)
// `--USE TEMP B-TREE FOR RIGHT PART OF ORDER BY

createIndexEventTipsetKeyCid = `CREATE INDEX IF NOT EXISTS event_tipset_key_cid ON event (tipset_key_cid);`
createIndexEventHeight = `CREATE INDEX IF NOT EXISTS event_height ON event (height);`
createIndexEventReverted = `CREATE INDEX IF NOT EXISTS event_reverted ON event (reverted);`

createIndexEventEntryIndexedKey = `CREATE INDEX IF NOT EXISTS event_entry_indexed_key ON event_entry (indexed, key);`
createIndexEventEntryCodecValue = `CREATE INDEX IF NOT EXISTS event_entry_codec_value ON event_entry (codec, value);`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createTableEventsSeen = `CREATE TABLE IF NOT EXISTS events_seen (
id INTEGER PRIMARY KEY,
height INTEGER NOT NULL,
tipset_key_cid BLOB NOT NULL,
reverted INTEGER NOT NULL,
UNIQUE(height, tipset_key_cid)
)`
createIndexEventEntryEventId = `CREATE INDEX IF NOT EXISTS event_entry_event_id ON event_entry(event_id);`

createIndexEventsSeenHeight = `CREATE INDEX IF NOT EXISTS events_seen_height ON events_seen (height);`
createIndexEventsSeenTipsetKeyCid = `CREATE INDEX IF NOT EXISTS events_seen_tipset_key_cid ON events_seen (tipset_key_cid);`
Expand Down Expand Up @@ -200,6 +229,7 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
migrationVersion4,
migrationVersion5,
migrationVersion6,
migrationVersion7,
})
if err != nil {
_ = db.Close()
Expand Down Expand Up @@ -470,7 +500,7 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

// PrefillFilter fills a filter's collection of events from the historic index
// prefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, excludeReverted bool) error {
clauses := []string{}
values := []any{}
Expand All @@ -480,13 +510,20 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
clauses = append(clauses, "event.tipset_key_cid=?")
values = append(values, f.tipsetCid.Bytes())
} else {
if f.minHeight >= 0 {
clauses = append(clauses, "event.height>=?")
if f.minHeight >= 0 && f.minHeight == f.maxHeight {
clauses = append(clauses, "event.height=?")
values = append(values, f.minHeight)
}
if f.maxHeight >= 0 {
clauses = append(clauses, "event.height<=?")
values = append(values, f.maxHeight)
} else {
if f.maxHeight >= 0 && f.minHeight >= 0 {
clauses = append(clauses, "event.height BETWEEN ? AND ?")
values = append(values, f.minHeight, f.maxHeight)
} else if f.minHeight >= 0 {
clauses = append(clauses, "event.height >= ?")
values = append(values, f.minHeight)
} else if f.maxHeight >= 0 {
clauses = append(clauses, "event.height <= ?")
values = append(values, f.maxHeight)
}
}
}

Expand All @@ -496,12 +533,11 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
}

if len(f.addresses) > 0 {
subclauses := make([]string, 0, len(f.addresses))
for _, addr := range f.addresses {
subclauses = append(subclauses, "emitter_addr=?")
values = append(values, addr.Bytes())
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")

clauses = append(clauses, "event.emitter_addr IN ("+strings.Repeat("?,", len(f.addresses)-1)+"?)")
}

if len(f.keysWithCodec) > 0 {
Expand All @@ -510,15 +546,17 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
if len(vals) > 0 {
join++
joinAlias := fmt.Sprintf("ee%d", join)
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
joins = append(joins, fmt.Sprintf("event_entry %s ON event.id=%[1]s.event_id", joinAlias))
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
values = append(values, key)
subclauses := make([]string, 0, len(vals))
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias))
values = append(values, val.Value, val.Codec)
if len(vals) > 1 {
subclauses := make([]string, 0, len(vals))
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("(%s.value=? AND %[1]s.codec=?)", joinAlias))
values = append(values, val.Value, val.Codec)
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
}
}
Expand Down
57 changes: 39 additions & 18 deletions chain/events/filter/index_migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,19 +135,11 @@ func migrationVersion2(db *sql.DB, chainStore *store.ChainStore) sqlite.Migratio
}
}

// migrationVersion3 migrates the schema from version 2 to version 3. It originally added two
// indices: 1) an index on the event.emitter_addr column, and 2) an index on the event_entry.key
// column. However, as of version 7, these indices have been removed as they were found to be a
// performance hindrance. This migration is now a no-op.
func migrationVersion3(ctx context.Context, tx *sql.Tx) error {
	// Intentionally empty: the indices this migration used to create are dropped again by
	// migrationVersion7, so freshly-migrating databases skip creating them altogether.
	return nil
}

Expand All @@ -161,10 +153,13 @@ func migrationVersion3(ctx context.Context, tx *sql.Tx) error {
// And then creating the following indices:
// 1. an index on the event.tipset_key_cid column
// 2. an index on the event.height column
// 3. an index on the event.reverted column
// 4. an index on the event_entry.indexed and event_entry.key columns
// 5. an index on the event_entry.codec and event_entry.value columns
// 3. an index on the event.reverted column (removed in version 7)
// 4. an index on the event_entry.indexed and event_entry.key columns (removed in version 7)
// 5. an index on the event_entry.codec and event_entry.value columns (removed in version 7)
// 6. an index on the event_entry.event_id column
//
// Indexes 3, 4, and 5 were removed in version 7 as they were found to be a performance hindrance so
// are omitted here.
func migrationVersion4(ctx context.Context, tx *sql.Tx) error {
for _, create := range []struct {
desc string
Expand All @@ -174,9 +169,6 @@ func migrationVersion4(ctx context.Context, tx *sql.Tx) error {
{"drop index event_entry_key_index", "DROP INDEX IF EXISTS event_entry_key_index;"},
{"create index event_tipset_key_cid", createIndexEventTipsetKeyCid},
{"create index event_height", createIndexEventHeight},
{"create index event_reverted", createIndexEventReverted},
{"create index event_entry_indexed_key", createIndexEventEntryIndexedKey},
{"create index event_entry_codec_value", createIndexEventEntryCodecValue},
{"create index event_entry_event_id", createIndexEventEntryEventId},
} {
if _, err := tx.ExecContext(ctx, create.query); err != nil {
Expand Down Expand Up @@ -236,3 +228,32 @@ func migrationVersion6(ctx context.Context, tx *sql.Tx) error {

return nil
}

// migrationVersion7 migrates the schema from version 6 to version 7 by dropping the following
// indices:
//  1. the index on the event.emitter_addr column
//  2. the index on the event.reverted column
//  3. the index on the event_entry.indexed and event_entry.key columns
//  4. the index on the event_entry.codec and event_entry.value columns
//
// These indices were found to be a performance hindrance: their presence allowed SQLite to pick
// them over the intended initial indexes on height or tipset_key_cid for many query variations.
// With no alternative indexes to fall back on, SQLite must narrow each query via height or
// tipset_key_cid first, which is the desired behavior.
func migrationVersion7(ctx context.Context, tx *sql.Tx) error {
	drops := []struct {
		desc  string
		query string
	}{
		{"drop index event_emitter_addr", "DROP INDEX IF EXISTS event_emitter_addr;"},
		{"drop index event_reverted", "DROP INDEX IF EXISTS event_reverted;"},
		{"drop index event_entry_indexed_key", "DROP INDEX IF EXISTS event_entry_indexed_key;"},
		{"drop index event_entry_codec_value", "DROP INDEX IF EXISTS event_entry_codec_value;"},
	}

	for _, drop := range drops {
		if _, err := tx.ExecContext(ctx, drop.query); err != nil {
			return xerrors.Errorf("%s: %w", drop.desc, err)
		}
	}

	return nil
}

0 comments on commit cd53adf

Please sign in to comment.