Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eth: Backfill blocks in batches #2489

Merged
merged 4 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#### General
- \#1333 Display git-sha in startup logging (@emranemran)
- \#2443 Add e2e tests for O configuration and resignation (@red-0ne)
- \#2489 Backfill blocks in batches (@leszko)

#### Broadcaster
- \#2462 cmd: Delete temporary env variable LP_IS_ORCH_TESTER (@leszko)
Expand Down
4 changes: 3 additions & 1 deletion cmd/livepeer/starter/starter.go
Original file line number Diff line number Diff line change
Expand Up @@ -879,10 +879,12 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {

// Backfill events that the node has missed since its last seen block. This method will block
// and the node will not continue setup until it finishes
if err := blockWatcher.BackfillEventsIfNeeded(blockWatchCtx); err != nil {
glog.Infof("Backfilling block events (this can take a while)...\n")
if err := blockWatcher.BackfillEvents(blockWatchCtx); err != nil {
glog.Errorf("Failed to backfill events: %v", err)
return
}
glog.Info("Done backfilling block events")

blockWatcherErr := make(chan error, 1)
go func() {
Expand Down
40 changes: 23 additions & 17 deletions eth/blockwatch/block_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,12 @@ func New(config Config) *Watcher {
return bs
}

// BackfillEventsIfNeeded finds missed events that might have occured while the
// node was offline and sends them to event subscribers. It blocks until
// it is done backfilling or the given context is canceled.
func (w *Watcher) BackfillEventsIfNeeded(ctx context.Context) error {
// BackfillEvents finds missed events and sends them to event subscribers.
// It blocks until it is done backfilling or the given context is canceled.
// Note that the latest block is never backfilled here from logs. It will be polled separately in syncToLatestBlock().
// The reason for that is that we always need to propagate events from the latest block even if it does not contain
// events which are filtered out during the backfilling process.
func (w *Watcher) BackfillEvents(ctx context.Context) error {
events, err := w.getMissedEventsToBackfill(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -117,7 +119,7 @@ func (w *Watcher) Watch(ctx context.Context) error {
ticker.Stop()
return nil
case <-ticker.C:
if err := w.syncToLatestBlock(); err != nil {
if err := w.syncToLatestBlock(ctx); err != nil {
glog.Errorf("blockwatch.Watcher error encountered - trying again on next polling interval err=%q", err)
}
}
Expand Down Expand Up @@ -147,9 +149,14 @@ func (w *Watcher) InspectRetainedBlocks() ([]*MiniHeader, error) {
return w.stack.Inspect()
}

func (w *Watcher) syncToLatestBlock() error {
func (w *Watcher) syncToLatestBlock(ctx context.Context) error {
w.Lock()
defer w.Unlock()

if err := w.BackfillEvents(ctx); err != nil {
return err
}

newestHeader, err := w.client.HeaderByNumber(nil)
if err != nil {
return err
Expand Down Expand Up @@ -303,10 +310,11 @@ func (w *Watcher) addLogs(header *MiniHeader) (*MiniHeader, error) {
return header, nil
}

// getMissedEventsToBackfill finds missed events that might have occured while the node was
// offline. It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older then the latest block, it batch fetches the events for missing blocks,
// getMissedEventsToBackfill finds missed events that might have occurred since the last block polling.
// It does this by comparing the last block stored with the latest block discoverable via RPC.
// If the stored block is older than the latest block, it batch-fetches the events for missing blocks,
// re-sets the stored blocks and returns the block events found.
// Note that the latest block is never backfilled, and will be polled separately in syncToLatestBlock().
func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, error) {
events := []*Event{}

Expand All @@ -325,7 +333,9 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
if err != nil {
return events, err
}
latestBlockNum := int(latestBlock.Number.Int64())

// Latest block will be polled separately in syncToLatestBlock(), so it's not backfilled.
preLatestBlockNum := int(latestBlock.Number.Int64()) - 1
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

if latestRetainedBlock != nil {
latestRetainedBlockNum = int(latestRetainedBlock.Number.Int64())
Expand All @@ -335,14 +345,11 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
return events, nil
}

if blocksElapsed = latestBlockNum - startBlockNum; blocksElapsed <= 0 {
if blocksElapsed = preLatestBlockNum - startBlockNum; blocksElapsed <= 0 {
return events, nil
}

glog.Infof("Backfilling block events (this can take a while)...\n")
glog.Infof("Start block: %v End block: %v Blocks elapsed: %v\n", startBlockNum, startBlockNum+blocksElapsed, blocksElapsed)

logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, latestBlockNum)
logs, furthestBlockProcessed := w.getLogsInBlockRange(ctx, startBlockNum, preLatestBlockNum)
if furthestBlockProcessed > latestRetainedBlockNum {
// If we have processed blocks further than the latestRetainedBlock in the DB, we
// want to remove all blocks from the DB and insert the furthestBlockProcessed
Expand Down Expand Up @@ -396,7 +403,6 @@ func (w *Watcher) getMissedEventsToBackfill(ctx context.Context) ([]*Event, erro
BlockHeader: blockHeader,
})
}
glog.Info("Done backfilling block events")
return events, nil
}
return events, nil
Expand Down Expand Up @@ -558,7 +564,7 @@ func (w *Watcher) getSubBlockRanges(from, to, rangeSize int) []*blockRange {
const infuraTooManyResultsErrMsg = "query returned more than 10000 results"

func (w *Watcher) filterLogsRecursively(from, to int, allLogs []types.Log) ([]types.Log, error) {
glog.Infof("fetching block logs from=%v to=%v", from, to)
glog.V(6).Infof("Polling blocks from=%v to=%v", from, to)
numBlocks := to - from
topics := [][]common.Hash{}
if len(w.topics) > 0 {
Expand Down
7 changes: 4 additions & 3 deletions eth/blockwatch/block_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,13 @@ func TestGetMissedEventsToBackfillSomeMissed(t *testing.T) {

store := &stubMiniHeaderStore{}
// Add block number 5 as the last block seen by BlockWatcher
lastBlockSeen := &MiniHeader{
preLastBlockSeen := &MiniHeader{
Number: big.NewInt(5),
Hash: common.HexToHash("0x293b9ea024055a3e9eddbf9b9383dc7731744111894af6aa038594dc1b61f87f"),
Parent: common.HexToHash("0x26b13ac89500f7fcdd141b7d1b30f3a82178431eca325d1cf10998f9d68ff5ba"),
}
err = store.InsertMiniHeader(lastBlockSeen)
err = store.InsertMiniHeader(preLastBlockSeen)

require.NoError(t, err)

config.Store = store
Expand All @@ -211,7 +212,7 @@ func TestGetMissedEventsToBackfillSomeMissed(t *testing.T) {
headers, err := store.FindAllMiniHeadersSortedByNumber()
require.NoError(t, err)
require.Len(t, headers, 1)
assert.Equal(t, big.NewInt(30), headers[0].Number)
assert.Equal(t, big.NewInt(29), headers[0].Number)
}

func TestGetMissedEventsToBackfillNoneMissed(t *testing.T) {
Expand Down
10 changes: 10 additions & 0 deletions eth/blockwatch/testdata/fake_client_fast_sync_fixture.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
"hash": "0x382b25dd926322722bba2575b5092be661a99f26472e2b7c2bb3d51e6bd2b09c",
"parent": "0xb57233b65a0da953da19d685d10a65e3124207fd1cf4694e3fc0f7279373bf5f",
"number": 30
},
"29": {
"hash": "0xb57233b65a0da953da19d685d10a65e3124207fd1cf4694e3fc0f7279373bf5f",
"parent": "0x0f52679a1ff257072b0588234c9cf9727cc4fe3a973e5fa528474d2be638253b",
"number": 29
}
},
"getBlockByHash": {
Expand All @@ -27,6 +32,11 @@
"hash": "0x382b25dd926322722bba2575b5092be661a99f26472e2b7c2bb3d51e6bd2b09c",
"parent": "0xb57233b65a0da953da19d685d10a65e3124207fd1cf4694e3fc0f7279373bf5f",
"number": 30
},
"0xb57233b65a0da953da19d685d10a65e3124207fd1cf4694e3fc0f7279373bf5f": {
"hash": "0xb57233b65a0da953da19d685d10a65e3124207fd1cf4694e3fc0f7279373bf5f",
"parent": "0x0f52679a1ff257072b0588234c9cf9727cc4fe3a973e5fa528474d2be638253b",
"number": 29
}
},
"getCorrectChain": [],
Expand Down