// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

// Package downloader contains the manual full chain synchronisation.
package downloader

import (
21
	"errors"
22
	"fmt"
23
	"math/big"
24 25 26 27
	"sync"
	"sync/atomic"
	"time"

28
	"github.com/ethereum/go-ethereum"
29
	"github.com/ethereum/go-ethereum/common"
30
	"github.com/ethereum/go-ethereum/core/rawdb"
31
	"github.com/ethereum/go-ethereum/core/state/snapshot"
32
	"github.com/ethereum/go-ethereum/core/types"
33
	"github.com/ethereum/go-ethereum/eth/protocols/eth"
34
	"github.com/ethereum/go-ethereum/eth/protocols/snap"
35
	"github.com/ethereum/go-ethereum/ethdb"
36
	"github.com/ethereum/go-ethereum/event"
37
	"github.com/ethereum/go-ethereum/log"
38
	"github.com/ethereum/go-ethereum/metrics"
39
	"github.com/ethereum/go-ethereum/params"
40
	"github.com/ethereum/go-ethereum/trie"
41 42
)

43
var (
44 45
	MaxBlockFetch   = 128 // Amount of blocks to be fetched per retrieval request
	MaxHeaderFetch  = 192 // Amount of block headers to be fetched per retrieval request
46
	MaxSkeletonSize = 128 // Number of header fetches to need for a skeleton assembly
47
	MaxReceiptFetch = 256 // Amount of transaction receipts to allow fetching per request
48
	MaxStateFetch   = 384 // Amount of node state values to allow fetching per request
49

50 51 52 53 54
	maxQueuedHeaders            = 32 * 1024                         // [eth/62] Maximum number of headers to queue for import (DOS protection)
	maxHeadersProcess           = 2048                              // Number of header download results to import at once into the chain
	maxResultsProcess           = 2048                              // Number of content download results to import at once into the chain
	fullMaxForkAncestry  uint64 = params.FullImmutabilityThreshold  // Maximum chain reorganisation (locally redeclared so tests can reduce it)
	lightMaxForkAncestry uint64 = params.LightImmutabilityThreshold // Maximum chain reorganisation (locally redeclared so tests can reduce it)
55

56 57 58
	reorgProtThreshold   = 48 // Threshold number of recent blocks to disable mini reorg protection
	reorgProtHeaderDelay = 2  // Number of headers to delay delivering to cover mini reorgs

59 60 61 62 63
	fsHeaderCheckFrequency = 100             // Verification frequency of the downloaded headers during fast sync
	fsHeaderSafetyNet      = 2048            // Number of headers to discard in case a chain violation is detected
	fsHeaderForceVerify    = 24              // Number of headers to verify before and after the pivot to accept it
	fsHeaderContCheck      = 3 * time.Second // Time interval to check for header continuations during state download
	fsMinFullBlocks        = 64              // Number of blocks to retrieve fully even in fast sync
64
)
65

// Sync lifecycle errors: concurrent runs and requested cancellations.
var (
	errBusy                    = errors.New("busy")
	errCanceled                = errors.New("syncing canceled (requested)")
	errNoSyncActive            = errors.New("no sync active")
	errCancelStateFetch        = errors.New("state data download canceled (requested)")
	errCancelContentProcessing = errors.New("content processing canceled (requested)")
)

// Peer errors: the remote side is unknown, misbehaving, slow or outdated.
var (
	errUnknownPeer      = errors.New("peer is unknown or unhealthy")
	errBadPeer          = errors.New("action from bad peer ignored")
	errStallingPeer     = errors.New("peer is stalling")
	errUnsyncedPeer     = errors.New("unsynced peer")
	errNoPeers          = errors.New("no peers to keep download active")
	errTimeout          = errors.New("timeout")
	errPeersUnavailable = errors.New("no peers available or all tried for download")
	errTooOld           = errors.New("peer's protocol version too old")
)

// Data errors: retrieved chain data failed validation.
var (
	errEmptyHeaderSet  = errors.New("empty header set by peer")
	errInvalidAncestor = errors.New("retrieved ancestor is invalid")
	errNoAncestorFound = errors.New("no common ancestor found")
	errInvalidChain    = errors.New("retrieved hash chain is invalid")
	errInvalidBody     = errors.New("retrieved block body is invalid")
	errInvalidReceipt  = errors.New("retrieved receipt is invalid")
)

88
type Downloader struct {
89
	mode uint32         // Synchronisation mode defining the strategy used (per sync cycle), use d.getMode() to get the SyncMode
90
	mux  *event.TypeMux // Event multiplexer to announce sync operation events
91

92 93 94 95
	checkpoint uint64   // Checkpoint block number to enforce head against (e.g. fast sync)
	genesis    uint64   // Genesis block number to limit sync to (e.g. light client CHT)
	queue      *queue   // Scheduler for selecting the hashes to download
	peers      *peerSet // Set of active peers from which download can proceed
96 97

	stateDB    ethdb.Database  // Database to state sync into (and deduplicate via)
98
	stateBloom *trie.SyncBloom // Bloom filter for fast trie node and contract code existence checks
99

100
	// Statistics
101 102 103
	syncStatsChainOrigin uint64 // Origin block number where syncing started at
	syncStatsChainHeight uint64 // Highest block number known when syncing started
	syncStatsState       stateSyncStats
104
	syncStatsLock        sync.RWMutex // Lock protecting the sync stats fields
105

106
	lightchain LightChain
107
	blockchain BlockChain
108

109
	// Callbacks
110
	dropPeer peerDropFn // Drops a peer for misbehaving
111

112
	// Status
113 114 115
	synchroniseMock func(id string, hash common.Hash) error // Replacement for synchronise during testing
	synchronising   int32
	notified        int32
116
	committed       int32
117
	ancientLimit    uint64 // The maximum block number which can be regarded as ancient data.
118 119

	// Channels
120 121 122 123 124 125
	headerCh      chan dataPack        // Channel receiving inbound block headers
	bodyCh        chan dataPack        // Channel receiving inbound block bodies
	receiptCh     chan dataPack        // Channel receiving inbound receipts
	bodyWakeCh    chan bool            // Channel to signal the block body fetcher of new tasks
	receiptWakeCh chan bool            // Channel to signal the receipt fetcher of new tasks
	headerProcCh  chan []*types.Header // Channel to feed the header processor new tasks
126

127 128 129 130
	// State sync
	pivotHeader *types.Header // Pivot block header to dynamically push the syncing state root
	pivotLock   sync.RWMutex  // Lock protecting pivot header reads from updates

131 132
	snapSync       bool         // Whether to run state sync over the snap protocol
	SnapSyncer     *snap.Syncer // TODO(karalabe): make private! hack for now
133 134
	stateSyncStart chan *stateSync
	trackStateReq  chan *stateReq
135
	stateCh        chan dataPack // Channel receiving inbound node state data
136

137
	// Cancellation and termination
138 139 140 141
	cancelPeer string         // Identifier of the peer currently being used as the master (cancel on drop)
	cancelCh   chan struct{}  // Channel to cancel mid-flight syncs
	cancelLock sync.RWMutex   // Lock to protect the cancel channel and peer in delivers
	cancelWg   sync.WaitGroup // Make sure all fetcher goroutines have exited.
142

143
	quitCh   chan struct{} // Quit channel to signal termination
144
	quitLock sync.Mutex    // Lock to prevent double closes
145

146
	// Testing hooks
147 148 149 150
	syncInitHook     func(uint64, uint64)  // Method to call upon initiating a new sync run
	bodyFetchHook    func([]*types.Header) // Method to call upon starting a block body fetch
	receiptFetchHook func([]*types.Header) // Method to call upon starting a receipt fetch
	chainInsertHook  func([]*fetchResult)  // Method to call upon inserting a chain of blocks (possibly in multiple invocations)
151 152
}

N
Nick Johnson 已提交
153
// LightChain encapsulates functions required to synchronise a light chain.
154 155
type LightChain interface {
	// HasHeader verifies a header's presence in the local chain.
156
	HasHeader(common.Hash, uint64) bool
157 158 159 160 161 162 163

	// GetHeaderByHash retrieves a header from the local chain.
	GetHeaderByHash(common.Hash) *types.Header

	// CurrentHeader retrieves the head header from the local chain.
	CurrentHeader() *types.Header

164 165
	// GetTd returns the total difficulty of a local block.
	GetTd(common.Hash, uint64) *big.Int
166 167 168 169

	// InsertHeaderChain inserts a batch of headers into the local chain.
	InsertHeaderChain([]*types.Header, int) (int, error)

170 171
	// SetHead rewinds the local chain to a new head.
	SetHead(uint64) error
172 173
}

N
Nick Johnson 已提交
174
// BlockChain encapsulates functions required to sync a (full or fast) blockchain.
175 176 177
type BlockChain interface {
	LightChain

178 179
	// HasBlock verifies a block's presence in the local chain.
	HasBlock(common.Hash, uint64) bool
180

181 182 183
	// HasFastBlock verifies a fast block's presence in the local chain.
	HasFastBlock(common.Hash, uint64) bool

184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199
	// GetBlockByHash retrieves a block from the local chain.
	GetBlockByHash(common.Hash) *types.Block

	// CurrentBlock retrieves the head block from the local chain.
	CurrentBlock() *types.Block

	// CurrentFastBlock retrieves the head fast block from the local chain.
	CurrentFastBlock() *types.Block

	// FastSyncCommitHead directly commits the head block to a certain entity.
	FastSyncCommitHead(common.Hash) error

	// InsertChain inserts a batch of blocks into the local chain.
	InsertChain(types.Blocks) (int, error)

	// InsertReceiptChain inserts a batch of receipts into the local chain.
200
	InsertReceiptChain(types.Blocks, []types.Receipts, uint64) (int, error)
201 202 203

	// Snapshots returns the blockchain snapshot tree to paused it during sync.
	Snapshots() *snapshot.Tree
204 205
}

206
// New creates a new downloader to fetch hashes and blocks from remote peers.
207
func New(checkpoint uint64, stateDb ethdb.Database, stateBloom *trie.SyncBloom, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn) *Downloader {
208 209 210
	if lightchain == nil {
		lightchain = chain
	}
211
	dl := &Downloader{
212
		stateDB:        stateDb,
213
		stateBloom:     stateBloom,
214
		mux:            mux,
215
		checkpoint:     checkpoint,
216
		queue:          newQueue(blockCacheMaxItems, blockCacheInitialItems),
217
		peers:          newPeerSet(),
218
		blockchain:     chain,
219 220 221 222 223 224 225 226 227 228
		lightchain:     lightchain,
		dropPeer:       dropPeer,
		headerCh:       make(chan dataPack, 1),
		bodyCh:         make(chan dataPack, 1),
		receiptCh:      make(chan dataPack, 1),
		bodyWakeCh:     make(chan bool, 1),
		receiptWakeCh:  make(chan bool, 1),
		headerProcCh:   make(chan []*types.Header, 1),
		quitCh:         make(chan struct{}),
		stateCh:        make(chan dataPack),
229
		SnapSyncer:     snap.NewSyncer(stateDb),
230
		stateSyncStart: make(chan *stateSync),
231
		syncStatsState: stateSyncStats{
232
			processed: rawdb.ReadFastTrieProgress(stateDb),
233 234
		},
		trackStateReq: make(chan *stateReq),
235
	}
236
	go dl.stateFetcher()
237
	return dl
238 239
}

240 241 242
// Progress retrieves the synchronisation boundaries, specifically the origin
// block where synchronisation started at (may have failed/suspended); the block
// or header sync is currently at; and the latest known block which the sync targets.
243
//
L
Leif Jurvetson 已提交
244
// In addition, during the state download phase of fast synchronisation the number
245 246
// of processed and the total number of known states are also returned. Otherwise
// these are zero.
247
func (d *Downloader) Progress() ethereum.SyncProgress {
248
	// Lock the current stats and return the progress
249 250
	d.syncStatsLock.RLock()
	defer d.syncStatsLock.RUnlock()
251

252
	current := uint64(0)
253
	mode := d.getMode()
254
	switch {
255
	case d.blockchain != nil && mode == FullSync:
256
		current = d.blockchain.CurrentBlock().NumberU64()
257
	case d.blockchain != nil && mode == FastSync:
258
		current = d.blockchain.CurrentFastBlock().NumberU64()
259
	case d.lightchain != nil:
260
		current = d.lightchain.CurrentHeader().Number.Uint64()
261
	default:
262
		log.Error("Unknown downloader chain/mode combo", "light", d.lightchain != nil, "full", d.blockchain != nil, "mode", mode)
263
	}
264 265 266 267
	return ethereum.SyncProgress{
		StartingBlock: d.syncStatsChainOrigin,
		CurrentBlock:  current,
		HighestBlock:  d.syncStatsChainHeight,
268 269
		PulledStates:  d.syncStatsState.processed,
		KnownStates:   d.syncStatsState.processed + d.syncStatsState.pending,
270
	}
O
obscuren 已提交
271 272
}

273
// Synchronising returns whether the downloader is currently retrieving blocks.
274
func (d *Downloader) Synchronising() bool {
275
	return atomic.LoadInt32(&d.synchronising) > 0
276 277
}

278 279
// RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from.
280 281 282 283 284 285
func (d *Downloader) RegisterPeer(id string, version uint, peer Peer) error {
	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
286
		logger = log.New("peer", id[:8])
287
	}
288
	logger.Trace("Registering sync peer")
289
	if err := d.peers.Register(newPeerConnection(id, version, peer, logger)); err != nil {
290
		logger.Error("Failed to register sync peer", "err", err)
291 292
		return err
	}
293 294 295
	return nil
}

N
Nick Johnson 已提交
296
// RegisterLightPeer injects a light client peer, wrapping it so it appears as a regular peer.
297
func (d *Downloader) RegisterLightPeer(id string, version uint, peer LightPeer) error {
298 299 300
	return d.RegisterPeer(id, version, &lightPeerWrapper{peer})
}

301
// UnregisterPeer remove a peer from the known list, preventing any action from
302 303
// the specified peer. An effort is also made to return any pending fetches into
// the queue.
304
func (d *Downloader) UnregisterPeer(id string) error {
305
	// Unregister the peer from the active peer set and revoke any fetch tasks
306 307 308 309 310
	var logger log.Logger
	if len(id) < 16 {
		// Tests use short IDs, don't choke on them
		logger = log.New("peer", id)
	} else {
311
		logger = log.New("peer", id[:8])
312
	}
313
	logger.Trace("Unregistering sync peer")
314
	if err := d.peers.Unregister(id); err != nil {
315
		logger.Error("Failed to unregister sync peer", "err", err)
316 317
		return err
	}
318
	d.queue.Revoke(id)
319

320
	return nil
321 322
}

323 324
// Synchronise tries to sync up our local block chain with a remote peer, both
// adding various sanity checks as well as wrapping it with various log entries.
325 326
func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode SyncMode) error {
	err := d.synchronise(id, head, td, mode)
327

328
	switch err {
329 330 331
	case nil, errBusy, errCanceled:
		return err
	}
332 333 334
	if errors.Is(err, errInvalidChain) || errors.Is(err, errBadPeer) || errors.Is(err, errTimeout) ||
		errors.Is(err, errStallingPeer) || errors.Is(err, errUnsyncedPeer) || errors.Is(err, errEmptyHeaderSet) ||
		errors.Is(err, errPeersUnavailable) || errors.Is(err, errTooOld) || errors.Is(err, errInvalidAncestor) {
335 336 337 338 339 340 341 342 343 344
		log.Warn("Synchronisation failed, dropping peer", "peer", id, "err", err)
		if d.dropPeer == nil {
			// The dropPeer method is nil when `--copydb` is used for a local copy.
			// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
			log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", id)
		} else {
			d.dropPeer(id)
		}
		return err
	}
345
	log.Warn("Synchronisation failed, retrying", "err", err)
346
	return err
347 348 349
}

// synchronise will select the peer and use it for synchronising. If an empty string is given
350
// it will use the best peer possible and synchronize if its TD is higher than our own. If any of the
351
// checks fail an error will be returned. This method is synchronous
352
func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode SyncMode) error {
L
Leif Jurvetson 已提交
353
	// Mock out the synchronisation if testing
354 355 356
	if d.synchroniseMock != nil {
		return d.synchroniseMock(id, hash)
	}
357
	// Make sure only one goroutine is ever allowed past this point at once
358
	if !atomic.CompareAndSwapInt32(&d.synchronising, 0, 1) {
359
		return errBusy
360
	}
361
	defer atomic.StoreInt32(&d.synchronising, 0)
362

363 364
	// Post a user notification of the sync (only once per session)
	if atomic.CompareAndSwapInt32(&d.notified, 0, 1) {
365
		log.Info("Block synchronisation started")
366
	}
367
	// If we are already full syncing, but have a fast-sync bloom filter laying
368
	// around, make sure it doesn't use memory any more. This is a special case
369 370 371 372
	// when the user attempts to fast sync a new empty network.
	if mode == FullSync && d.stateBloom != nil {
		d.stateBloom.Close()
	}
373 374 375 376 377
	// If snap sync was requested, create the snap scheduler and switch to fast
	// sync mode. Long term we could drop fast sync or merge the two together,
	// but until snap becomes prevalent, we should support both. TODO(karalabe).
	if mode == SnapSync {
		if !d.snapSync {
378 379 380 381 382 383
			// Snap sync uses the snapshot namespace to store potentially flakey data until
			// sync completely heals and finishes. Pause snapshot maintenance in the mean
			// time to prevent access.
			if snapshots := d.blockchain.Snapshots(); snapshots != nil { // Only nil in tests
				snapshots.Disable()
			}
384 385 386 387 388
			log.Warn("Enabling snapshot sync prototype")
			d.snapSync = true
		}
		mode = FastSync
	}
389
	// Reset the queue, peer set and wake channels to clean any internal leftover state
390
	d.queue.Reset(blockCacheMaxItems, blockCacheInitialItems)
391
	d.peers.Reset()
392

393
	for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
394 395 396 397
		select {
		case <-ch:
		default:
		}
398
	}
399
	for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh} {
400 401 402 403 404 405 406 407
		for empty := false; !empty; {
			select {
			case <-ch:
			default:
				empty = true
			}
		}
	}
408 409 410 411 412 413 414
	for empty := false; !empty; {
		select {
		case <-d.headerProcCh:
		default:
			empty = true
		}
	}
415
	// Create cancel channel for aborting mid-flight and mark the master peer
416 417
	d.cancelLock.Lock()
	d.cancelCh = make(chan struct{})
418
	d.cancelPeer = id
419 420
	d.cancelLock.Unlock()

421
	defer d.Cancel() // No matter what, we can't leave the cancel channel open
422

423 424
	// Atomically set the requested sync mode
	atomic.StoreUint32(&d.mode, uint32(mode))
425

426
	// Retrieve the origin peer and initiate the downloading process
427
	p := d.peers.Peer(id)
428
	if p == nil {
429
		return errUnknownPeer
430
	}
431
	return d.syncWithPeer(p, hash, td)
432 433
}

434 435 436 437
func (d *Downloader) getMode() SyncMode {
	return SyncMode(atomic.LoadUint32(&d.mode))
}

438 439
// syncWithPeer starts a block synchronization based on the hash chain from the
// specified peer and head hash.
440
func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.Int) (err error) {
O
obscuren 已提交
441
	d.mux.Post(StartEvent{})
442 443 444
	defer func() {
		// reset on error
		if err != nil {
445 446
			d.mux.Post(FailedEvent{err})
		} else {
447 448
			latest := d.lightchain.CurrentHeader()
			d.mux.Post(DoneEvent{latest})
449 450
		}
	}()
451 452
	if p.version < eth.ETH66 {
		return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66)
453
	}
454
	mode := d.getMode()
455

456
	log.Debug("Synchronising with the network", "peer", p.id, "eth", p.version, "head", hash, "td", td, "mode", mode)
457
	defer func(start time.Time) {
458
		log.Debug("Synchronisation terminated", "elapsed", common.PrettyDuration(time.Since(start)))
459
	}(time.Now())
460

461
	// Look up the sync boundaries: the common ancestor and the target block
462
	latest, pivot, err := d.fetchHead(p)
463 464 465
	if err != nil {
		return err
	}
466 467 468 469 470 471 472
	if mode == FastSync && pivot == nil {
		// If no pivot block was returned, the head is below the min full block
		// threshold (i.e. new chian). In that case we won't really fast sync
		// anyway, but still need a valid pivot block to avoid some code hitting
		// nil panics on an access.
		pivot = d.blockchain.CurrentBlock().Header()
	}
473
	height := latest.Number.Uint64()
474

475
	origin, err := d.findAncestor(p, latest)
476 477 478 479 480 481 482 483 484
	if err != nil {
		return err
	}
	d.syncStatsLock.Lock()
	if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
		d.syncStatsChainOrigin = origin
	}
	d.syncStatsChainHeight = height
	d.syncStatsLock.Unlock()
485

486
	// Ensure our origin point is below any fast sync pivot point
487
	if mode == FastSync {
488 489
		if height <= uint64(fsMinFullBlocks) {
			origin = 0
490
		} else {
491 492 493
			pivotNumber := pivot.Number.Uint64()
			if pivotNumber <= origin {
				origin = pivotNumber - 1
494
			}
495 496
			// Write out the pivot into the database so a rollback beyond it will
			// reenable fast sync
497
			rawdb.WriteLastPivotNumber(d.stateDB, pivotNumber)
498
		}
499
	}
500
	d.committed = 1
501
	if mode == FastSync && pivot.Number.Uint64() != 0 {
502 503
		d.committed = 0
	}
504
	if mode == FastSync {
505
		// Set the ancient data limitation.
506 507 508
		// If we are running fast sync, all block data older than ancientLimit will be
		// written to the ancient store. More recent data will be written to the active
		// database and will wait for the freezer to migrate.
509
		//
510 511 512
		// If there is a checkpoint available, then calculate the ancientLimit through
		// that. Otherwise calculate the ancient limit through the advertised height
		// of the remote peer.
513
		//
514 515 516 517 518
		// The reason for picking checkpoint first is that a malicious peer can give us
		// a fake (very high) height, forcing the ancient limit to also be very high.
		// The peer would start to feed us valid blocks until head, resulting in all of
		// the blocks might be written into the ancient store. A following mini-reorg
		// could cause issues.
519
		if d.checkpoint != 0 && d.checkpoint > fullMaxForkAncestry+1 {
520
			d.ancientLimit = d.checkpoint
521 522
		} else if height > fullMaxForkAncestry+1 {
			d.ancientLimit = height - fullMaxForkAncestry - 1
523 524
		} else {
			d.ancientLimit = 0
525 526
		}
		frozen, _ := d.stateDB.Ancients() // Ignore the error here since light client can also hit here.
527

528 529 530 531 532 533 534 535 536 537
		// If a part of blockchain data has already been written into active store,
		// disable the ancient style insertion explicitly.
		if origin >= frozen && frozen != 0 {
			d.ancientLimit = 0
			log.Info("Disabling direct-ancient mode", "origin", origin, "ancient", frozen-1)
		} else if d.ancientLimit > 0 {
			log.Debug("Enabling direct-ancient mode", "ancient", d.ancientLimit)
		}
		// Rewind the ancient store and blockchain if reorg happens.
		if origin+1 < frozen {
538 539
			if err := d.lightchain.SetHead(origin + 1); err != nil {
				return err
540 541 542
			}
		}
	}
543
	// Initiate the sync using a concurrent header and content retrieval algorithm
544
	d.queue.Prepare(origin+1, mode)
545 546
	if d.syncInitHook != nil {
		d.syncInitHook(origin, height)
547
	}
548
	fetchers := []func() error{
549 550 551 552
		func() error { return d.fetchHeaders(p, origin+1) }, // Headers are always retrieved
		func() error { return d.fetchBodies(origin + 1) },   // Bodies are retrieved during normal and fast sync
		func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
		func() error { return d.processHeaders(origin+1, td) },
553
	}
554
	if mode == FastSync {
555 556 557 558 559
		d.pivotLock.Lock()
		d.pivotHeader = pivot
		d.pivotLock.Unlock()

		fetchers = append(fetchers, func() error { return d.processFastSyncContent() })
560
	} else if mode == FullSync {
561 562
		fetchers = append(fetchers, d.processFullSyncContent)
	}
563
	return d.spawnSync(fetchers)
564 565 566 567
}

// spawnSync runs d.process and all given fetcher functions to completion in
// separate goroutines, returning the first error that appears.
568 569
func (d *Downloader) spawnSync(fetchers []func() error) error {
	errc := make(chan error, len(fetchers))
570
	d.cancelWg.Add(len(fetchers))
571 572
	for _, fn := range fetchers {
		fn := fn
573
		go func() { defer d.cancelWg.Done(); errc <- fn() }()
574 575 576
	}
	// Wait for the first error, then terminate the others.
	var err error
577 578
	for i := 0; i < len(fetchers); i++ {
		if i == len(fetchers)-1 {
579 580 581 582 583
			// Close the queue when all fetchers have exited.
			// This will cause the block processor to end when
			// it has processed the queue.
			d.queue.Close()
		}
584
		if err = <-errc; err != nil && err != errCanceled {
585 586 587 588
			break
		}
	}
	d.queue.Close()
589
	d.Cancel()
590
	return err
591 592
}

593 594 595 596
// cancel aborts all of the operations and resets the queue. However, cancel does
// not wait for the running download goroutines to finish. This method should be
// used when cancelling the downloads from inside the downloader.
func (d *Downloader) cancel() {
597
	// Close the current cancel channel
598
	d.cancelLock.Lock()
599 600
	defer d.cancelLock.Unlock()

601 602 603 604 605 606 607
	if d.cancelCh != nil {
		select {
		case <-d.cancelCh:
			// Channel was already closed
		default:
			close(d.cancelCh)
		}
608
	}
609 610 611 612 613 614
}

// Cancel aborts all of the operations and waits for all download goroutines to
// finish before returning.
func (d *Downloader) Cancel() {
	d.cancel()
615
	d.cancelWg.Wait()
616 617
}

618
// Terminate interrupts the downloader, canceling all pending operations.
619
// The downloader cannot be reused after calling Terminate.
620
func (d *Downloader) Terminate() {
621 622 623 624 625 626 627
	// Close the termination channel (make sure double close is allowed)
	d.quitLock.Lock()
	select {
	case <-d.quitCh:
	default:
		close(d.quitCh)
	}
628 629 630
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
631 632 633
	d.quitLock.Unlock()

	// Cancel any pending download requests
634
	d.Cancel()
635 636
}

637 638 639 640 641
// fetchHead retrieves the head header and prior pivot block (if available) from
// a remote peer.
func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *types.Header, err error) {
	p.log.Debug("Retrieving remote chain head")
	mode := d.getMode()
642 643

	// Request the advertised remote head block and wait for the response
644 645 646 647 648 649
	latest, _ := p.peer.Head()
	fetch := 1
	if mode == FastSync {
		fetch = 2 // head + pivot headers
	}
	go p.peer.RequestHeadersByHash(latest, fetch, fsMinFullBlocks-1, true)
650

651
	ttl := d.peers.rates.TargetTimeout()
652
	timeout := time.After(ttl)
653 654 655
	for {
		select {
		case <-d.cancelCh:
656
			return nil, nil, errCanceled
657

658
		case packet := <-d.headerCh:
659
			// Discard anything not from the origin peer
660
			if packet.PeerId() != p.id {
661
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
662 663
				break
			}
664
			// Make sure the peer gave us at least one and at most the requested headers
665
			headers := packet.(*headerPack).headers
666 667
			if len(headers) == 0 || len(headers) > fetch {
				return nil, nil, fmt.Errorf("%w: returned headers %d != requested %d", errBadPeer, len(headers), fetch)
668
			}
669 670 671
			// The first header needs to be the head, validate against the checkpoint
			// and request. If only 1 header was returned, make sure there's no pivot
			// or there was not one requested.
672
			head := headers[0]
673
			if (mode == FastSync || mode == LightSync) && head.Number.Uint64() < d.checkpoint {
674 675 676 677 678 679 680 681 682 683 684 685 686 687
				return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
			}
			if len(headers) == 1 {
				if mode == FastSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
					return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
				}
				p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", head.Hash())
				return head, nil, nil
			}
			// At this point we have 2 headers in total and the first is the
			// validated head of the chian. Check the pivot number and return,
			pivot := headers[1]
			if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
				return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
688
			}
689
			return head, pivot, nil
690

691
		case <-timeout:
P
Péter Szilágyi 已提交
692
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
693
			return nil, nil, errTimeout
694

695
		case <-d.bodyCh:
696 697
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
698 699 700 701
		}
	}
}

702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
// calculateRequestSpan calculates what headers to request from a peer when trying to determine the
// common ancestor.
// It returns parameters to be used for peer.RequestHeadersByNumber:
//  from - starting block number
//  count - number of headers to request
//  skip - number of headers to skip
// and also returns 'max', the last block which is expected to be returned by the remote peers,
// given the (from,count,skip)
func calculateRequestSpan(remoteHeight, localHeight uint64) (int64, int, int, uint64) {
	var (
		from     int
		count    int
		MaxCount = MaxHeaderFetch / 16
	)
	// requestHead is the highest block that we will ask for. If requestHead is not offset,
	// the highest block that we will get is 16 blocks back from head, which means we
	// will fetch 14 or 15 blocks unnecessarily in the case the height difference
	// between us and the peer is 1-2 blocks, which is most common
	requestHead := int(remoteHeight) - 1
	if requestHead < 0 {
		requestHead = 0
	}
	// requestBottom is the lowest block we want included in the query
725
	// Ideally, we want to include the one just below our own head
726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753
	requestBottom := int(localHeight - 1)
	if requestBottom < 0 {
		requestBottom = 0
	}
	totalSpan := requestHead - requestBottom
	span := 1 + totalSpan/MaxCount
	if span < 2 {
		span = 2
	}
	if span > 16 {
		span = 16
	}

	count = 1 + totalSpan/span
	if count > MaxCount {
		count = MaxCount
	}
	if count < 2 {
		count = 2
	}
	from = requestHead - (count-1)*span
	if from < 0 {
		from = 0
	}
	max := from + (count-1)*span
	return int64(from), count, span - 1, uint64(max)
}

754
// findAncestor tries to locate the common ancestor link of the local chain and
755
// a remote peers blockchain. In the general case when our node was in sync and
756
// on the correct chain, checking the top N links should already get us a match.
757
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
758
// the head links match), we do a binary search to find the common ancestor.
759
func (d *Downloader) findAncestor(p *peerConnection, remoteHeader *types.Header) (uint64, error) {
760
	// Figure out the valid ancestor range to prevent rewrite attacks
761 762 763 764 765
	var (
		floor        = int64(-1)
		localHeight  uint64
		remoteHeight = remoteHeader.Number.Uint64()
	)
766 767
	mode := d.getMode()
	switch mode {
768 769 770 771 772 773
	case FullSync:
		localHeight = d.blockchain.CurrentBlock().NumberU64()
	case FastSync:
		localHeight = d.blockchain.CurrentFastBlock().NumberU64()
	default:
		localHeight = d.lightchain.CurrentHeader().Number.Uint64()
774
	}
775
	p.log.Debug("Looking for common ancestor", "local", localHeight, "remote", remoteHeight)
776 777

	// Recap floor value for binary search
778 779 780 781
	maxForkAncestry := fullMaxForkAncestry
	if d.getMode() == LightSync {
		maxForkAncestry = lightMaxForkAncestry
	}
782
	if localHeight >= maxForkAncestry {
783
		// We're above the max reorg threshold, find the earliest fork point
784
		floor = int64(localHeight - maxForkAncestry)
785 786 787
	}
	// If we're doing a light sync, ensure the floor doesn't go below the CHT, as
	// all headers before that point will be missing.
788
	if mode == LightSync {
789
		// If we don't know the current CHT position, find it
790 791 792 793 794 795
		if d.genesis == 0 {
			header := d.lightchain.CurrentHeader()
			for header != nil {
				d.genesis = header.Number.Uint64()
				if floor >= int64(d.genesis)-1 {
					break
796
				}
797
				header = d.lightchain.GetHeaderByHash(header.ParentHash)
798
			}
799 800 801 802
		}
		// We already know the "genesis" block number, cap floor to that
		if floor < int64(d.genesis)-1 {
			floor = int64(d.genesis) - 1
803
		}
804
	}
805

806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825
	ancestor, err := d.findAncestorSpanSearch(p, mode, remoteHeight, localHeight, floor)
	if err == nil {
		return ancestor, nil
	}
	// The returned error was not nil.
	// If the error returned does not reflect that a common ancestor was not found, return it.
	// If the error reflects that a common ancestor was not found, continue to binary search,
	// where the error value will be reassigned.
	if !errors.Is(err, errNoAncestorFound) {
		return 0, err
	}

	ancestor, err = d.findAncestorBinarySearch(p, mode, remoteHeight, floor)
	if err != nil {
		return 0, err
	}
	return ancestor, nil
}

func (d *Downloader) findAncestorSpanSearch(p *peerConnection, mode SyncMode, remoteHeight, localHeight uint64, floor int64) (commonAncestor uint64, err error) {
826
	from, count, skip, max := calculateRequestSpan(remoteHeight, localHeight)
827 828

	p.log.Trace("Span searching for common ancestor", "count", count, "from", from, "skip", skip)
829
	go p.peer.RequestHeadersByNumber(uint64(from), count, skip, false)
830 831 832

	// Wait for the remote response to the head fetch
	number, hash := uint64(0), common.Hash{}
833

834
	ttl := d.peers.rates.TargetTimeout()
835
	timeout := time.After(ttl)
836 837 838 839

	for finished := false; !finished; {
		select {
		case <-d.cancelCh:
840
			return 0, errCanceled
841

842
		case packet := <-d.headerCh:
843
			// Discard anything not from the origin peer
844
			if packet.PeerId() != p.id {
845
				log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
846 847 848
				break
			}
			// Make sure the peer actually gave something valid
849
			headers := packet.(*headerPack).headers
850
			if len(headers) == 0 {
P
Péter Szilágyi 已提交
851
				p.log.Warn("Empty head header set")
852 853
				return 0, errEmptyHeaderSet
			}
854
			// Make sure the peer's reply conforms to the request
855
			for i, header := range headers {
856
				expectNumber := from + int64(i)*int64(skip+1)
857 858
				if number := header.Number.Int64(); number != expectNumber {
					p.log.Warn("Head headers broke chain ordering", "index", i, "requested", expectNumber, "received", number)
859
					return 0, fmt.Errorf("%w: %v", errInvalidChain, errors.New("head headers broke chain ordering"))
860 861
				}
			}
862 863 864
			// Check if a common ancestor was found
			finished = true
			for i := len(headers) - 1; i >= 0; i-- {
865
				// Skip any headers that underflow/overflow our requested set
866
				if headers[i].Number.Int64() < from || headers[i].Number.Uint64() > max {
867 868 869
					continue
				}
				// Otherwise check if we already know the header or not
870 871
				h := headers[i].Hash()
				n := headers[i].Number.Uint64()
872 873

				var known bool
874
				switch mode {
875 876 877 878 879 880 881 882
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if known {
883
					number, hash = n, h
884 885 886 887
					break
				}
			}

888
		case <-timeout:
P
Péter Szilágyi 已提交
889
			p.log.Debug("Waiting for head header timed out", "elapsed", ttl)
890 891
			return 0, errTimeout

892
		case <-d.bodyCh:
893 894
		case <-d.receiptCh:
			// Out of bounds delivery, ignore
895 896 897
		}
	}
	// If the head fetch already found an ancestor, return
898
	if hash != (common.Hash{}) {
899
		if int64(number) <= floor {
P
Péter Szilágyi 已提交
900
			p.log.Warn("Ancestor below allowance", "number", number, "hash", hash, "allowance", floor)
901 902
			return 0, errInvalidAncestor
		}
P
Péter Szilágyi 已提交
903
		p.log.Debug("Found common ancestor", "number", number, "hash", hash)
904 905
		return number, nil
	}
906 907 908 909 910 911
	return 0, errNoAncestorFound
}

func (d *Downloader) findAncestorBinarySearch(p *peerConnection, mode SyncMode, remoteHeight uint64, floor int64) (commonAncestor uint64, err error) {
	hash := common.Hash{}

912
	// Ancestor not found, we need to binary search over our chain
913
	start, end := uint64(0), remoteHeight
914 915 916
	if floor > 0 {
		start = uint64(floor)
	}
917 918
	p.log.Trace("Binary searching for common ancestor", "start", start, "end", end)

919 920 921 922
	for start+1 < end {
		// Split our chain interval in two, and request the hash to cross check
		check := (start + end) / 2

923
		ttl := d.peers.rates.TargetTimeout()
924 925
		timeout := time.After(ttl)

926
		go p.peer.RequestHeadersByNumber(check, 1, 0, false)
927 928 929 930 931

		// Wait until a reply arrives to this request
		for arrived := false; !arrived; {
			select {
			case <-d.cancelCh:
932
				return 0, errCanceled
933

934
			case packet := <-d.headerCh:
935
				// Discard anything not from the origin peer
936 937
				if packet.PeerId() != p.id {
					log.Debug("Received headers from incorrect peer", "peer", packet.PeerId())
938 939 940
					break
				}
				// Make sure the peer actually gave something valid
941
				headers := packet.(*headerPack).headers
942
				if len(headers) != 1 {
943
					p.log.Warn("Multiple headers for single request", "headers", len(headers))
944
					return 0, fmt.Errorf("%w: multiple headers (%d) for single request", errBadPeer, len(headers))
945 946 947 948
				}
				arrived = true

				// Modify the search interval based on the response
949 950
				h := headers[0].Hash()
				n := headers[0].Number.Uint64()
951 952

				var known bool
953
				switch mode {
954 955 956 957 958 959 960 961
				case FullSync:
					known = d.blockchain.HasBlock(h, n)
				case FastSync:
					known = d.blockchain.HasFastBlock(h, n)
				default:
					known = d.lightchain.HasHeader(h, n)
				}
				if !known {
962 963 964
					end = check
					break
				}
965
				header := d.lightchain.GetHeaderByHash(h) // Independent of sync mode, header surely exists
966
				if header.Number.Uint64() != check {
967
					p.log.Warn("Received non requested header", "number", header.Number, "hash", header.Hash(), "request", check)
968
					return 0, fmt.Errorf("%w: non-requested header (%d)", errBadPeer, header.Number)
969 970
				}
				start = check
971
				hash = h
972

973
			case <-timeout:
P
Péter Szilágyi 已提交
974
				p.log.Debug("Waiting for search header timed out", "elapsed", ttl)
975 976
				return 0, errTimeout

977
			case <-d.bodyCh:
978 979
			case <-d.receiptCh:
				// Out of bounds delivery, ignore
980 981 982
			}
		}
	}
983 984
	// Ensure valid ancestry and return
	if int64(start) <= floor {
P
Péter Szilágyi 已提交
985
		p.log.Warn("Ancestor below allowance", "number", start, "hash", hash, "allowance", floor)
986 987
		return 0, errInvalidAncestor
	}
P
Péter Szilágyi 已提交
988
	p.log.Debug("Found common ancestor", "number", start, "hash", hash)
989 990 991
	return start, nil
}

992 993 994 995 996
// fetchHeaders keeps retrieving headers concurrently from the number
// requested, until no more are returned, potentially throttling on the way. To
// facilitate concurrency but still protect against malicious nodes sending bad
// headers, we construct a header chain skeleton using the "origin" peer we are
// syncing with, and fill in the missing headers using anyone else. Headers from
997
// other peers are only accepted if they map cleanly to the skeleton. If no one
998 999
// can fill in the skeleton - not even the origin peer - it's assumed invalid and
// the origin is dropped.
1000
func (d *Downloader) fetchHeaders(p *peerConnection, from uint64) error {
P
Péter Szilágyi 已提交
1001 1002
	p.log.Debug("Directing header downloads", "origin", from)
	defer p.log.Debug("Header download terminated")
1003

1004 1005
	// Create a timeout timer, and the associated header fetcher
	skeleton := true            // Skeleton assembly phase or finishing up
1006
	pivoting := false           // Whether the next request is pivot verification
1007
	request := time.Now()       // time of the last skeleton fetch request
1008 1009 1010 1011
	timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
	<-timeout.C                 // timeout channel should be initially empty
	defer timeout.Stop()

1012
	var ttl time.Duration
1013
	getHeaders := func(from uint64) {
1014
		request = time.Now()
1015

1016
		ttl = d.peers.rates.TargetTimeout()
1017
		timeout.Reset(ttl)
1018

1019
		if skeleton {
P
Péter Szilágyi 已提交
1020
			p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
1021
			go p.peer.RequestHeadersByNumber(from+uint64(MaxHeaderFetch)-1, MaxSkeletonSize, MaxHeaderFetch-1, false)
1022
		} else {
P
Péter Szilágyi 已提交
1023
			p.log.Trace("Fetching full headers", "count", MaxHeaderFetch, "from", from)
1024
			go p.peer.RequestHeadersByNumber(from, MaxHeaderFetch, 0, false)
1025
		}
1026
	}
1027 1028 1029 1030
	getNextPivot := func() {
		pivoting = true
		request = time.Now()

1031
		ttl = d.peers.rates.TargetTimeout()
1032 1033 1034 1035 1036 1037 1038 1039 1040
		timeout.Reset(ttl)

		d.pivotLock.RLock()
		pivot := d.pivotHeader.Number.Uint64()
		d.pivotLock.RUnlock()

		p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
		go p.peer.RequestHeadersByNumber(pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
	}
1041
	// Start pulling the header chain skeleton until all is done
1042
	ancestor := from
1043 1044
	getHeaders(from)

1045
	mode := d.getMode()
1046 1047 1048
	for {
		select {
		case <-d.cancelCh:
1049
			return errCanceled
1050

1051
		case packet := <-d.headerCh:
1052
			// Make sure the active peer is giving us the skeleton headers
1053
			if packet.PeerId() != p.id {
1054
				log.Debug("Received skeleton from incorrect peer", "peer", packet.PeerId())
1055 1056
				break
			}
1057
			headerReqTimer.UpdateSince(request)
1058 1059
			timeout.Stop()

1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097
			// If the pivot is being checked, move if it became stale and run the real retrieval
			var pivot uint64

			d.pivotLock.RLock()
			if d.pivotHeader != nil {
				pivot = d.pivotHeader.Number.Uint64()
			}
			d.pivotLock.RUnlock()

			if pivoting {
				if packet.Items() == 2 {
					// Retrieve the headers and do some sanity checks, just in case
					headers := packet.(*headerPack).headers

					if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
						log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
						return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
					}
					if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
						log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
						return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
					}
					log.Warn("Pivot seemingly stale, moving", "old", pivot, "new", headers[0].Number)
					pivot = headers[0].Number.Uint64()

					d.pivotLock.Lock()
					d.pivotHeader = headers[0]
					d.pivotLock.Unlock()

					// Write out the pivot into the database so a rollback beyond
					// it will reenable fast sync and update the state root that
					// the state syncer will be downloading.
					rawdb.WriteLastPivotNumber(d.stateDB, pivot)
				}
				pivoting = false
				getHeaders(from)
				continue
			}
1098
			// If the skeleton's finished, pull any remaining head headers directly from the origin
1099
			if skeleton && packet.Items() == 0 {
1100 1101 1102 1103
				skeleton = false
				getHeaders(from)
				continue
			}
1104
			// If no more headers are inbound, notify the content fetchers and return
1105
			if packet.Items() == 0 {
1106 1107 1108 1109 1110 1111 1112 1113
				// Don't abort header fetches while the pivot is downloading
				if atomic.LoadInt32(&d.committed) == 0 && pivot <= from {
					p.log.Debug("No headers, waiting for pivot commit")
					select {
					case <-time.After(fsHeaderContCheck):
						getHeaders(from)
						continue
					case <-d.cancelCh:
1114
						return errCanceled
1115 1116 1117
					}
				}
				// Pivot done (or not in fast sync) and no more headers, terminate the process
P
Péter Szilágyi 已提交
1118
				p.log.Debug("No more headers available")
1119 1120 1121 1122
				select {
				case d.headerProcCh <- nil:
					return nil
				case <-d.cancelCh:
1123
					return errCanceled
1124
				}
1125
			}
1126
			headers := packet.(*headerPack).headers
1127

1128 1129
			// If we received a skeleton batch, resolve internals concurrently
			if skeleton {
1130
				filled, proced, err := d.fillHeaderSkeleton(from, headers)
1131
				if err != nil {
P
Péter Szilágyi 已提交
1132
					p.log.Debug("Skeleton chain invalid", "err", err)
1133
					return fmt.Errorf("%w: %v", errInvalidChain, err)
1134
				}
1135 1136
				headers = filled[proced:]
				from += uint64(proced)
1137 1138 1139 1140 1141 1142
			} else {
				// If we're closing in on the chain head, but haven't yet reached it, delay
				// the last few headers so mini reorgs on the head don't cause invalid hash
				// chain errors.
				if n := len(headers); n > 0 {
					// Retrieve the current head we're at
1143
					var head uint64
1144
					if mode == LightSync {
1145 1146 1147 1148 1149 1150 1151
						head = d.lightchain.CurrentHeader().Number.Uint64()
					} else {
						head = d.blockchain.CurrentFastBlock().NumberU64()
						if full := d.blockchain.CurrentBlock().NumberU64(); head < full {
							head = full
						}
					}
1152 1153 1154 1155 1156 1157
					// If the head is below the common ancestor, we're actually deduplicating
					// already existing chain segments, so use the ancestor as the fake head.
					// Otherwise we might end up delaying header deliveries pointlessly.
					if head < ancestor {
						head = ancestor
					}
1158 1159 1160 1161 1162 1163 1164 1165 1166
					// If the head is way older than this batch, delay the last few headers
					if head+uint64(reorgProtThreshold) < headers[n-1].Number.Uint64() {
						delay := reorgProtHeaderDelay
						if delay > n {
							delay = n
						}
						headers = headers[:n-delay]
					}
				}
1167
			}
1168
			// Insert all the new headers and fetch the next batch
1169
			if len(headers) > 0 {
P
Péter Szilágyi 已提交
1170
				p.log.Trace("Scheduling new headers", "count", len(headers), "from", from)
1171 1172 1173
				select {
				case d.headerProcCh <- headers:
				case <-d.cancelCh:
1174
					return errCanceled
1175 1176
				}
				from += uint64(len(headers))
1177 1178 1179 1180 1181 1182 1183 1184

				// If we're still skeleton filling fast sync, check pivot staleness
				// before continuing to the next skeleton filling
				if skeleton && pivot > 0 {
					getNextPivot()
				} else {
					getHeaders(from)
				}
1185 1186 1187 1188 1189 1190 1191 1192
			} else {
				// No headers delivered, or all of them being delayed, sleep a bit and retry
				p.log.Trace("All headers delayed, waiting")
				select {
				case <-time.After(fsHeaderContCheck):
					getHeaders(from)
					continue
				case <-d.cancelCh:
1193
					return errCanceled
1194
				}
1195
			}
1196 1197

		case <-timeout.C:
1198 1199 1200 1201 1202 1203
			if d.dropPeer == nil {
				// The dropPeer method is nil when `--copydb` is used for a local copy.
				// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
				p.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", p.id)
				break
			}
1204
			// Header retrieval timed out, consider the peer bad and drop
P
Péter Szilágyi 已提交
1205
			p.log.Debug("Header request timed out", "elapsed", ttl)
1206
			headerTimeoutMeter.Mark(1)
1207 1208 1209
			d.dropPeer(p.id)

			// Finish the sync gracefully instead of dumping the gathered data though
1210
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1211 1212 1213 1214
				select {
				case ch <- false:
				case <-d.cancelCh:
				}
1215
			}
1216 1217 1218 1219
			select {
			case d.headerProcCh <- nil:
			case <-d.cancelCh:
			}
1220
			return fmt.Errorf("%w: header request timed out", errBadPeer)
1221 1222 1223 1224
		}
	}
}

1225 1226
// fillHeaderSkeleton concurrently retrieves headers from all our available peers
// and maps them to the provided skeleton header chain.
1227 1228 1229 1230 1231
//
// Any partial results from the beginning of the skeleton is (if possible) forwarded
// immediately to the header processor to keep the rest of the pipeline full even
// in the case of header stalls.
//
Y
Yusup 已提交
1232
// The method returns the entire filled skeleton and also the number of headers
1233 1234
// already forwarded for processing.
func (d *Downloader) fillHeaderSkeleton(from uint64, skeleton []*types.Header) ([]*types.Header, int, error) {
1235
	log.Debug("Filling up skeleton", "from", from)
1236 1237 1238 1239 1240
	d.queue.ScheduleSkeleton(from, skeleton)

	var (
		deliver = func(packet dataPack) (int, error) {
			pack := packet.(*headerPack)
1241
			return d.queue.DeliverHeaders(pack.peerID, pack.headers, d.headerProcCh)
1242
		}
1243
		expire  = func() map[string]int { return d.queue.ExpireHeaders(d.peers.rates.TargetTimeout()) }
1244 1245
		reserve = func(p *peerConnection, count int) (*fetchRequest, bool, bool) {
			return d.queue.ReserveHeaders(p, count), false, false
1246
		}
1247
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchHeaders(req.From, MaxHeaderFetch) }
1248
		capacity = func(p *peerConnection) int { return p.HeaderCapacity(d.peers.rates.TargetRoundTrip()) }
1249 1250 1251
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetHeadersIdle(accepted, deliveryTime)
		}
1252
	)
1253
	err := d.fetchParts(d.headerCh, deliver, d.queue.headerContCh, expire,
1254
		d.queue.PendingHeaders, d.queue.InFlightHeaders, reserve,
1255
		nil, fetch, d.queue.CancelHeaders, capacity, d.peers.HeaderIdlePeers, setIdle, "headers")
1256

1257
	log.Debug("Skeleton fill terminated", "err", err)
1258 1259 1260

	filled, proced := d.queue.RetrieveHeaders()
	return filled, proced, err
1261 1262
}

1263 1264 1265 1266
// fetchBodies iteratively downloads the scheduled block bodies, taking any
// available peers, reserving a chunk of blocks for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchBodies(from uint64) error {
1267
	log.Debug("Downloading block bodies", "origin", from)
1268

1269
	var (
1270
		deliver = func(packet dataPack) (int, error) {
1271
			pack := packet.(*bodyPack)
1272
			return d.queue.DeliverBodies(pack.peerID, pack.transactions, pack.uncles)
1273
		}
1274
		expire   = func() map[string]int { return d.queue.ExpireBodies(d.peers.rates.TargetTimeout()) }
1275
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchBodies(req) }
1276
		capacity = func(p *peerConnection) int { return p.BlockCapacity(d.peers.rates.TargetRoundTrip()) }
1277
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) { p.SetBodiesIdle(accepted, deliveryTime) }
1278
	)
1279
	err := d.fetchParts(d.bodyCh, deliver, d.bodyWakeCh, expire,
1280
		d.queue.PendingBlocks, d.queue.InFlightBlocks, d.queue.ReserveBodies,
1281
		d.bodyFetchHook, fetch, d.queue.CancelBodies, capacity, d.peers.BodyIdlePeers, setIdle, "bodies")
1282

1283
	log.Debug("Block body download terminated", "err", err)
1284 1285 1286 1287 1288 1289 1290
	return err
}

// fetchReceipts iteratively downloads the scheduled block receipts, taking any
// available peers, reserving a chunk of receipts for each, waiting for delivery
// and also periodically checking for timeouts.
func (d *Downloader) fetchReceipts(from uint64) error {
1291
	log.Debug("Downloading transaction receipts", "origin", from)
1292 1293

	var (
1294
		deliver = func(packet dataPack) (int, error) {
1295
			pack := packet.(*receiptPack)
1296
			return d.queue.DeliverReceipts(pack.peerID, pack.receipts)
1297
		}
1298
		expire   = func() map[string]int { return d.queue.ExpireReceipts(d.peers.rates.TargetTimeout()) }
1299
		fetch    = func(p *peerConnection, req *fetchRequest) error { return p.FetchReceipts(req) }
1300
		capacity = func(p *peerConnection) int { return p.ReceiptCapacity(d.peers.rates.TargetRoundTrip()) }
1301 1302 1303
		setIdle  = func(p *peerConnection, accepted int, deliveryTime time.Time) {
			p.SetReceiptsIdle(accepted, deliveryTime)
		}
1304
	)
1305
	err := d.fetchParts(d.receiptCh, deliver, d.receiptWakeCh, expire,
1306
		d.queue.PendingReceipts, d.queue.InFlightReceipts, d.queue.ReserveReceipts,
1307
		d.receiptFetchHook, fetch, d.queue.CancelReceipts, capacity, d.peers.ReceiptIdlePeers, setIdle, "receipts")
1308

1309
	log.Debug("Transaction receipt download terminated", "err", err)
1310 1311 1312 1313 1314 1315
	return err
}

// fetchParts iteratively downloads scheduled block parts, taking any available
// peers, reserving a chunk of fetch requests for each, waiting for delivery and
// also periodically checking for timeouts.
1316 1317 1318 1319 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333
//
// As the scheduling/timeout logic mostly is the same for all downloaded data
// types, this method is used by each for data gathering and is instrumented with
// various callbacks to handle the slight differences between processing them.
//
// The instrumentation parameters:
//  - errCancel:   error type to return if the fetch operation is cancelled (mostly makes logging nicer)
//  - deliveryCh:  channel from which to retrieve downloaded data packets (merged from all concurrent peers)
//  - deliver:     processing callback to deliver data packets into type specific download queues (usually within `queue`)
//  - wakeCh:      notification channel for waking the fetcher when new tasks are available (or sync completed)
//  - expire:      task callback method to abort requests that took too long and return the faulty peers (traffic shaping)
//  - pending:     task callback for the number of requests still needing download (detect completion/non-completability)
//  - inFlight:    task callback for the number of in-progress requests (wait for all active downloads to finish)
//  - throttle:    task callback to check if the processing queue is full and activate throttling (bound memory use)
//  - reserve:     task callback to reserve new download tasks to a particular peer (also signals partial completions)
//  - fetchHook:   tester callback to notify of new tasks being initiated (allows testing the scheduling logic)
//  - fetch:       network callback to actually send a particular download request to a physical remote peer
//  - cancel:      task callback to abort an in-flight download request and allow rescheduling it (in case of lost peer)
P
Péter Szilágyi 已提交
1334
//  - capacity:    network callback to retrieve the estimated type-specific bandwidth capacity of a peer (traffic shaping)
1335 1336
//  - idle:        network callback to retrieve the currently (type specific) idle peers that can be assigned tasks
//  - setIdle:     network callback to set a peer back to idle and update its estimated capacity (traffic shaping)
1337
//  - kind:        textual label of the type being downloaded to display in log messages
1338
func (d *Downloader) fetchParts(deliveryCh chan dataPack, deliver func(dataPack) (int, error), wakeCh chan bool,
1339
	expire func() map[string]int, pending func() int, inFlight func() bool, reserve func(*peerConnection, int) (*fetchRequest, bool, bool),
1340
	fetchHook func([]*types.Header), fetch func(*peerConnection, *fetchRequest) error, cancel func(*fetchRequest), capacity func(*peerConnection) int,
1341
	idle func() ([]*peerConnection, int), setIdle func(*peerConnection, int, time.Time), kind string) error {
1342

1343
	// Create a ticker to detect expired retrieval tasks
1344 1345 1346 1347 1348
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	update := make(chan struct{}, 1)

1349
	// Prepare the queue and fetch block parts until the block header fetcher's done
1350 1351 1352 1353
	finished := false
	for {
		select {
		case <-d.cancelCh:
1354
			return errCanceled
1355

1356
		case packet := <-deliveryCh:
1357
			deliveryTime := time.Now()
1358 1359
			// If the peer was previously banned and failed to deliver its pack
			// in a reasonable time frame, ignore its message.
1360
			if peer := d.peers.Peer(packet.PeerId()); peer != nil {
1361 1362
				// Deliver the received chunk of data and check chain validity
				accepted, err := deliver(packet)
1363
				if errors.Is(err, errInvalidChain) {
1364
					return err
1365 1366 1367 1368
				}
				// Unless a peer delivered something completely else than requested (usually
				// caused by a timed out request which came through in the end), set it to
				// idle. If the delivery's stale, the peer should have already been idled.
1369
				if !errors.Is(err, errStaleDelivery) {
1370
					setIdle(peer, accepted, deliveryTime)
1371 1372 1373 1374
				}
				// Issue a log to the user to see what's going on
				switch {
				case err == nil && packet.Items() == 0:
P
Péter Szilágyi 已提交
1375
					peer.log.Trace("Requested data not delivered", "type", kind)
1376
				case err == nil:
P
Péter Szilágyi 已提交
1377
					peer.log.Trace("Delivered new batch of data", "type", kind, "count", packet.Stats())
1378
				default:
1379
					peer.log.Debug("Failed to deliver retrieved data", "type", kind, "err", err)
1380 1381 1382 1383 1384 1385 1386 1387
				}
			}
			// Blocks assembled, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

1388
		case cont := <-wakeCh:
1389 1390 1391 1392 1393 1394 1395 1396 1397 1398 1399 1400 1401 1402 1403 1404 1405 1406 1407 1408 1409 1410
			// The header fetcher sent a continuation flag, check if it's done
			if !cont {
				finished = true
			}
			// Headers arrive, try to update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-ticker.C:
			// Sanity check update the progress
			select {
			case update <- struct{}{}:
			default:
			}

		case <-update:
			// Short circuit if we lost all our peers
			if d.peers.Len() == 0 {
				return errNoPeers
			}
1411
			// Check for fetch request timeouts and demote the responsible peers
1412
			for pid, fails := range expire() {
1413
				if peer := d.peers.Peer(pid); peer != nil {
1414 1415 1416 1417 1418 1419 1420 1421
					// If a lot of retrieval elements expired, we might have overestimated the remote peer or perhaps
					// ourselves. Only reset to minimal throughput but don't drop just yet. If even the minimal times
					// out that sync wise we need to get rid of the peer.
					//
					// The reason the minimum threshold is 2 is because the downloader tries to estimate the bandwidth
					// and latency of a peer separately, which requires pushing the measures capacity a bit and seeing
					// how response times reacts, to it always requests one more than the minimum (i.e. min 2).
					if fails > 2 {
P
Péter Szilágyi 已提交
1422
						peer.log.Trace("Data delivery timed out", "type", kind)
1423
						setIdle(peer, 0, time.Now())
1424
					} else {
P
Péter Szilágyi 已提交
1425
						peer.log.Debug("Stalling delivery, dropping", "type", kind)
1426

1427 1428 1429 1430 1431 1432
						if d.dropPeer == nil {
							// The dropPeer method is nil when `--copydb` is used for a local copy.
							// Timeouts can occur if e.g. compaction hits at the wrong time, and can be ignored
							peer.log.Warn("Downloader wants to drop peer, but peerdrop-function is not set", "peer", pid)
						} else {
							d.dropPeer(pid)
1433 1434 1435 1436 1437 1438 1439 1440 1441 1442

							// If this peer was the master peer, abort sync immediately
							d.cancelLock.RLock()
							master := pid == d.cancelPeer
							d.cancelLock.RUnlock()

							if master {
								d.cancel()
								return errTimeout
							}
1443
						}
1444
					}
1445 1446
				}
			}
1447 1448
			// If there's nothing more to fetch, wait or terminate
			if pending() == 0 {
1449
				if !inFlight() && finished {
1450
					log.Debug("Data fetching completed", "type", kind)
1451 1452 1453 1454 1455
					return nil
				}
				break
			}
			// Send a download request to all idle peers, until throttled
1456
			progressed, throttled, running := false, false, inFlight()
1457
			idles, total := idle()
1458
			pendCount := pending()
1459
			for _, peer := range idles {
1460
				// Short circuit if throttling activated
1461
				if throttled {
1462 1463
					break
				}
1464
				// Short circuit if there is no more available task.
1465
				if pendCount = pending(); pendCount == 0 {
1466 1467
					break
				}
1468 1469
				// Reserve a chunk of fetches for a peer. A nil can mean either that
				// no more headers are available, or that the peer is known not to
1470
				// have them.
1471
				request, progress, throttle := reserve(peer, capacity(peer))
1472 1473
				if progress {
					progressed = true
1474
				}
1475 1476 1477 1478
				if throttle {
					throttled = true
					throttleCounter.Inc(1)
				}
1479 1480 1481
				if request == nil {
					continue
				}
1482
				if request.From > 0 {
P
Péter Szilágyi 已提交
1483
					peer.log.Trace("Requesting new batch of data", "type", kind, "from", request.From)
1484
				} else {
1485
					peer.log.Trace("Requesting new batch of data", "type", kind, "count", len(request.Headers), "from", request.Headers[0].Number)
1486
				}
1487
				// Fetch the chunk and make sure any errors return the hashes to the queue
1488 1489
				if fetchHook != nil {
					fetchHook(request.Headers)
1490
				}
1491
				if err := fetch(peer, request); err != nil {
1492 1493 1494 1495 1496
					// Although we could try and make an attempt to fix this, this error really
					// means that we've double allocated a fetch task to a peer. If that is the
					// case, the internal state of the downloader and the queue is very wrong so
					// better hard crash and note the error instead of silently accumulating into
					// a much bigger issue.
1497
					panic(fmt.Sprintf("%v: %s fetch assignment failed", peer, kind))
1498
				}
1499
				running = true
1500 1501 1502
			}
			// Make sure that we have peers available for fetching. If all peers have been tried
			// and all failed throw an error
1503
			if !progressed && !throttled && !running && len(idles) == total && pendCount > 0 {
1504 1505 1506 1507 1508 1509
				return errPeersUnavailable
			}
		}
	}
}

1510 1511 1512
// processHeaders takes batches of retrieved headers from an input channel and
// keeps processing and scheduling them into the header chain and downloader's
// queue until the stream ends or a failure occurs.
1513
func (d *Downloader) processHeaders(origin uint64, td *big.Int) error {
1514
	// Keep a count of uncertain headers to roll back
1515
	var (
1516
		rollback    uint64 // Zero means no rollback (fine as you can't unroll the genesis)
1517 1518 1519
		rollbackErr error
		mode        = d.getMode()
	)
1520
	defer func() {
1521
		if rollback > 0 {
1522
			lastHeader, lastFastBlock, lastBlock := d.lightchain.CurrentHeader().Number, common.Big0, common.Big0
1523
			if mode != LightSync {
1524 1525
				lastFastBlock = d.blockchain.CurrentFastBlock().Number()
				lastBlock = d.blockchain.CurrentBlock().Number()
1526
			}
1527 1528 1529 1530
			if err := d.lightchain.SetHead(rollback - 1); err != nil { // -1 to target the parent of the first uncertain block
				// We're already unwinding the stack, only print the error to make it more visible
				log.Error("Failed to roll back chain segment", "head", rollback-1, "err", err)
			}
1531
			curFastBlock, curBlock := common.Big0, common.Big0
1532
			if mode != LightSync {
1533 1534
				curFastBlock = d.blockchain.CurrentFastBlock().Number()
				curBlock = d.blockchain.CurrentBlock().Number()
1535
			}
1536
			log.Warn("Rolled back chain segment",
1537
				"header", fmt.Sprintf("%d->%d", lastHeader, d.lightchain.CurrentHeader().Number),
1538
				"fast", fmt.Sprintf("%d->%d", lastFastBlock, curFastBlock),
1539
				"block", fmt.Sprintf("%d->%d", lastBlock, curBlock), "reason", rollbackErr)
1540 1541 1542 1543 1544 1545 1546 1547
		}
	}()
	// Wait for batches of headers to process
	gotHeaders := false

	for {
		select {
		case <-d.cancelCh:
1548
			rollbackErr = errCanceled
1549
			return errCanceled
1550 1551 1552 1553 1554

		case headers := <-d.headerProcCh:
			// Terminate header processing if we synced up
			if len(headers) == 0 {
				// Notify everyone that headers are fully processed
1555
				for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1556 1557 1558 1559 1560
					select {
					case ch <- false:
					case <-d.cancelCh:
					}
				}
1561 1562
				// If no headers were retrieved at all, the peer violated its TD promise that it had a
				// better chain compared to ours. The only exception is if its promised blocks were
1563
				// already imported by other means (e.g. fetcher):
1564 1565 1566 1567 1568 1569 1570 1571 1572
				//
				// R <remote peer>, L <local node>: Both at block 10
				// R: Mine block 11, and propagate it to L
				// L: Queue block 11 for import
				// L: Notice that R's head and TD increased compared to ours, start sync
				// L: Import of block 11 finishes
				// L: Sync begins, and finds common ancestor at 11
				// L: Request new headers up from 11 (R's TD was higher, it must have something)
				// R: Nothing to give
1573
				if mode != LightSync {
1574 1575
					head := d.blockchain.CurrentBlock()
					if !gotHeaders && td.Cmp(d.blockchain.GetTd(head.Hash(), head.NumberU64())) > 0 {
1576 1577
						return errStallingPeer
					}
1578 1579 1580 1581 1582 1583 1584 1585
				}
				// If fast or light syncing, ensure promised headers are indeed delivered. This is
				// needed to detect scenarios where an attacker feeds a bad pivot and then bails out
				// of delivering the post-pivot blocks that would flag the invalid content.
				//
				// This check cannot be executed "as is" for full imports, since blocks may still be
				// queued for processing when the header download completes. However, as long as the
				// peer gave us something useful, we're already happy/progressed (above check).
1586
				if mode == FastSync || mode == LightSync {
1587 1588
					head := d.lightchain.CurrentHeader()
					if td.Cmp(d.lightchain.GetTd(head.Hash(), head.Number.Uint64())) > 0 {
1589 1590 1591 1592
						return errStallingPeer
					}
				}
				// Disable any rollback and return
1593
				rollback = 0
1594 1595 1596 1597 1598 1599 1600 1601
				return nil
			}
			// Otherwise split the chunk of headers into batches and process them
			gotHeaders = true
			for len(headers) > 0 {
				// Terminate if something failed in between processing chunks
				select {
				case <-d.cancelCh:
1602
					rollbackErr = errCanceled
1603
					return errCanceled
1604 1605 1606 1607 1608 1609 1610 1611
				default:
				}
				// Select the next chunk of headers to import
				limit := maxHeadersProcess
				if limit > len(headers) {
					limit = len(headers)
				}
				chunk := headers[:limit]
1612

1613
				// In case of header only syncing, validate the chunk immediately
1614
				if mode == FastSync || mode == LightSync {
1615
					// If we're importing pure headers, verify based on their recentness
1616 1617 1618 1619 1620 1621 1622 1623
					var pivot uint64

					d.pivotLock.RLock()
					if d.pivotHeader != nil {
						pivot = d.pivotHeader.Number.Uint64()
					}
					d.pivotLock.RUnlock()

1624 1625 1626 1627
					frequency := fsHeaderCheckFrequency
					if chunk[len(chunk)-1].Number.Uint64()+uint64(fsHeaderForceVerify) > pivot {
						frequency = 1
					}
1628
					if n, err := d.lightchain.InsertHeaderChain(chunk, frequency); err != nil {
1629
						rollbackErr = err
1630 1631

						// If some headers were inserted, track them as uncertain
1632
						if (mode == FastSync || frequency > 1) && n > 0 && rollback == 0 {
1633
							rollback = chunk[0].Number.Uint64()
1634
						}
1635
						log.Warn("Invalid header encountered", "number", chunk[n].Number, "hash", chunk[n].Hash(), "parent", chunk[n].ParentHash, "err", err)
1636
						return fmt.Errorf("%w: %v", errInvalidChain, err)
1637
					}
1638
					// All verifications passed, track all headers within the alloted limits
1639 1640 1641 1642 1643 1644 1645
					if mode == FastSync {
						head := chunk[len(chunk)-1].Number.Uint64()
						if head-rollback > uint64(fsHeaderSafetyNet) {
							rollback = head - uint64(fsHeaderSafetyNet)
						} else {
							rollback = 1
						}
1646 1647 1648
					}
				}
				// Unless we're doing light chains, schedule the headers for associated content retrieval
1649
				if mode == FullSync || mode == FastSync {
1650 1651 1652 1653
					// If we've reached the allowed number of pending headers, stall a bit
					for d.queue.PendingBlocks() >= maxQueuedHeaders || d.queue.PendingReceipts() >= maxQueuedHeaders {
						select {
						case <-d.cancelCh:
1654
							rollbackErr = errCanceled
1655
							return errCanceled
1656 1657 1658 1659 1660 1661
						case <-time.After(time.Second):
						}
					}
					// Otherwise insert the headers for content retrieval
					inserts := d.queue.Schedule(chunk, origin)
					if len(inserts) != len(chunk) {
1662
						rollbackErr = fmt.Errorf("stale headers: len inserts %v len(chunk) %v", len(inserts), len(chunk))
1663
						return fmt.Errorf("%w: stale headers", errBadPeer)
1664 1665 1666 1667 1668
					}
				}
				headers = headers[limit:]
				origin += uint64(limit)
			}
1669 1670 1671 1672 1673 1674 1675
			// Update the highest block number we know if a higher one is found.
			d.syncStatsLock.Lock()
			if d.syncStatsChainHeight < origin {
				d.syncStatsChainHeight = origin - 1
			}
			d.syncStatsLock.Unlock()

1676
			// Signal the content downloaders of the availablility of new tasks
1677
			for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh} {
1678 1679 1680 1681 1682 1683 1684 1685 1686
				select {
				case ch <- true:
				default:
				}
			}
		}
	}
}

1687 1688
// processFullSyncContent takes fetch results from the queue and imports them into the chain.
func (d *Downloader) processFullSyncContent() error {
1689
	for {
1690
		results := d.queue.Results(true)
1691
		if len(results) == 0 {
1692
			return nil
1693
		}
1694
		if d.chainInsertHook != nil {
1695
			d.chainInsertHook(results)
1696
		}
1697 1698 1699 1700 1701 1702 1703
		if err := d.importBlockResults(results); err != nil {
			return err
		}
	}
}

func (d *Downloader) importBlockResults(results []*fetchResult) error {
1704 1705 1706 1707 1708 1709 1710 1711 1712 1713 1714 1715 1716 1717 1718 1719 1720 1721 1722 1723
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting downloaded chain", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnum", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	}
	if index, err := d.blockchain.InsertChain(blocks); err != nil {
1724 1725 1726 1727 1728 1729 1730 1731 1732
		if index < len(results) {
			log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
		} else {
			// The InsertChain method in blockchain.go will sometimes return an out-of-bounds index,
			// when it needs to preprocess blocks to import a sidechain.
			// The importer will put together a new list of blocks to import, which is a superset
			// of the blocks delivered from the downloader, and the indexing will be off.
			log.Debug("Downloaded item processing failed on sidechain import", "index", index, "err", err)
		}
1733
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1734 1735 1736 1737 1738 1739
	}
	return nil
}

// processFastSyncContent takes fetch results from the queue and writes them to the
// database. It also controls the synchronisation of state nodes of the pivot block.
//
// Results below the pivot are committed via commitFastSyncData together with
// their receipts. The pivot block itself is committed only after its state sync
// completes. Results above the pivot are imported via importBlockResults. While
// the pivot is not yet committed, the pivot is moved forward if the downloaded
// chain outgrows it, and the state sync is restarted for the new root.
//
// It returns the result of cancelling the final state sync once the queue is
// drained with no pivot pending. It returns errCanceled if the sync is cancelled
// while waiting on a pivot. Otherwise it returns the first commit or import error.
func (d *Downloader) processFastSyncContent() error {
	// Start syncing state of the reported head block. This should get us most of
	// the state of the pivot block.
	d.pivotLock.RLock()
	sync := d.syncState(d.pivotHeader.Root)
	d.pivotLock.RUnlock()

	defer func() {
		// The `sync` object is replaced every time the pivot moves. We need to
		// defer close the very last active one, hence the lazy evaluation vs.
		// calling defer sync.Cancel() !!!
		sync.Cancel()
	}()

	// closeOnErr waits for a state sync to finish. If it failed for any reason
	// other than cancellation, it closes the queue so that a blocked Results
	// call returns.
	closeOnErr := func(s *stateSync) {
		if err := s.Wait(); err != nil && err != errCancelStateFetch && err != errCanceled && err != snap.ErrCancelled {
			d.queue.Close() // wake up Results
		}
	}
	go closeOnErr(sync)

	// To cater for moving pivot points, track the pivot block and subsequently
	// accumulated download results separately.
	var (
		oldPivot *fetchResult   // Locked in pivot block, might change eventually
		oldTail  []*fetchResult // Downloaded content after the pivot
	)
	for {
		// Wait for the next batch of downloaded data to be available, and if the pivot
		// block became stale, move the goalpost
		results := d.queue.Results(oldPivot == nil) // Block if we're not monitoring pivot staleness
		if len(results) == 0 {
			// If pivot sync is done, stop
			if oldPivot == nil {
				return sync.Cancel()
			}
			// If sync failed, stop
			select {
			case <-d.cancelCh:
				sync.Cancel()
				return errCanceled
			default:
			}
		}
		if d.chainInsertHook != nil {
			d.chainInsertHook(results)
		}
		// If we haven't downloaded the pivot block yet, check pivot staleness
		// notifications from the header downloader
		d.pivotLock.RLock()
		pivot := d.pivotHeader
		d.pivotLock.RUnlock()

		if oldPivot == nil {
			// The pivot header may have been replaced since the state sync was
			// started; restart the state sync against the new root.
			if pivot.Root != sync.root {
				sync.Cancel()
				sync = d.syncState(pivot.Root)

				go closeOnErr(sync)
			}
		} else {
			// Re-queue the pivot and its tail held back in the previous iteration
			// in front of the newly arrived results.
			results = append(append([]*fetchResult{oldPivot}, oldTail...), results...)
		}
		// Split around the pivot block and process the two sides via fast/full sync
		if atomic.LoadInt32(&d.committed) == 0 {
			latest := results[len(results)-1].Header
			// If the height is above the pivot block by 2 sets, it means the pivot
			// become stale in the network and it was garbage collected, move to a
			// new pivot.
			//
			// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
			// need to be taken into account, otherwise we're detecting the pivot move
			// late and will drop peers due to unavailable state!!!
			if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
				log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
				pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

				d.pivotLock.Lock()
				d.pivotHeader = pivot
				d.pivotLock.Unlock()

				// Write out the pivot into the database so a rollback beyond it will
				// reenable fast sync
				rawdb.WriteLastPivotNumber(d.stateDB, pivot.Number.Uint64())
			}
		}
		P, beforeP, afterP := splitAroundPivot(pivot.Number.Uint64(), results)
		if err := d.commitFastSyncData(beforeP, sync); err != nil {
			return err
		}
		if P != nil {
			// If new pivot block found, cancel old state retrieval and restart
			if oldPivot != P {
				sync.Cancel()
				sync = d.syncState(P.Header.Root)

				go closeOnErr(sync)
				oldPivot = P
			}
			// Wait for completion, occasionally checking for pivot staleness
			select {
			case <-sync.done:
				if sync.err != nil {
					return sync.err
				}
				if err := d.commitPivotBlock(P); err != nil {
					return err
				}
				oldPivot = nil

			case <-time.After(time.Second):
				// State not ready yet: stash the post-pivot results and loop to
				// pick up more data (and re-check staleness) before retrying.
				oldTail = afterP
				continue
			}
		}
		// Fast sync done, pivot commit done, full import
		if err := d.importBlockResults(afterP); err != nil {
			return err
		}
	}
}

func splitAroundPivot(pivot uint64, results []*fetchResult) (p *fetchResult, before, after []*fetchResult) {
1863 1864 1865 1866 1867 1868 1869 1870
	if len(results) == 0 {
		return nil, nil, nil
	}
	if lastNum := results[len(results)-1].Header.Number.Uint64(); lastNum < pivot {
		// the pivot is somewhere in the future
		return nil, results, nil
	}
	// This can also be optimized, but only happens very seldom
1871 1872 1873 1874 1875 1876 1877 1878 1879 1880 1881 1882 1883 1884 1885
	for _, result := range results {
		num := result.Header.Number.Uint64()
		switch {
		case num < pivot:
			before = append(before, result)
		case num == pivot:
			p = result
		default:
			after = append(after, result)
		}
	}
	return p, before, after
}

func (d *Downloader) commitFastSyncData(results []*fetchResult, stateSync *stateSync) error {
1886 1887 1888 1889 1890 1891 1892 1893 1894 1895
	// Check for any early termination requests
	if len(results) == 0 {
		return nil
	}
	select {
	case <-d.quitCh:
		return errCancelContentProcessing
	case <-stateSync.done:
		if err := stateSync.Wait(); err != nil {
			return err
1896
		}
1897 1898 1899 1900 1901 1902 1903 1904 1905 1906 1907 1908 1909 1910
	default:
	}
	// Retrieve the a batch of results to import
	first, last := results[0].Header, results[len(results)-1].Header
	log.Debug("Inserting fast-sync blocks", "items", len(results),
		"firstnum", first.Number, "firsthash", first.Hash(),
		"lastnumn", last.Number, "lasthash", last.Hash(),
	)
	blocks := make([]*types.Block, len(results))
	receipts := make([]types.Receipts, len(results))
	for i, result := range results {
		blocks[i] = types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
		receipts[i] = result.Receipts
	}
1911
	if index, err := d.blockchain.InsertReceiptChain(blocks, receipts, d.ancientLimit); err != nil {
1912
		log.Debug("Downloaded item processing failed", "number", results[index].Header.Number, "hash", results[index].Header.Hash(), "err", err)
1913
		return fmt.Errorf("%w: %v", errInvalidChain, err)
1914 1915 1916 1917 1918
	}
	return nil
}

func (d *Downloader) commitPivotBlock(result *fetchResult) error {
1919 1920
	block := types.NewBlockWithHeader(result.Header).WithBody(result.Transactions, result.Uncles)
	log.Debug("Committing fast sync pivot as new head", "number", block.Number(), "hash", block.Hash())
1921 1922

	// Commit the pivot block as the new head, will require full sync from here on
1923
	if _, err := d.blockchain.InsertReceiptChain([]*types.Block{block}, []types.Receipts{result.Receipts}, d.ancientLimit); err != nil {
1924 1925
		return err
	}
1926
	if err := d.blockchain.FastSyncCommitHead(block.Hash()); err != nil {
1927
		return err
1928
	}
1929
	atomic.StoreInt32(&d.committed, 1)
1930 1931 1932 1933 1934 1935 1936 1937 1938

	// If we had a bloom filter for the state sync, deallocate it now. Note, we only
	// deallocate internally, but keep the empty wrapper. This ensures that if we do
	// a rollback after committing the pivot and restarting fast sync, we don't end
	// up using a nil bloom. Empty bloom is fine, it just returns that it does not
	// have the info we need, so reach down to the database instead.
	if d.stateBloom != nil {
		d.stateBloom.Close()
	}
1939
	return nil
1940 1941
}

L
Leif Jurvetson 已提交
1942
// DeliverHeaders injects a new batch of block headers received from a remote
1943
// node into the download schedule.
1944 1945
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) error {
	return d.deliver(d.headerCh, &headerPack{id, headers}, headerInMeter, headerDropMeter)
1946 1947 1948
}

// DeliverBodies injects a new batch of block bodies received from a remote node.
1949 1950
func (d *Downloader) DeliverBodies(id string, transactions [][]*types.Transaction, uncles [][]*types.Header) error {
	return d.deliver(d.bodyCh, &bodyPack{id, transactions, uncles}, bodyInMeter, bodyDropMeter)
1951 1952 1953
}

// DeliverReceipts injects a new batch of receipts received from a remote node.
1954 1955
func (d *Downloader) DeliverReceipts(id string, receipts [][]*types.Receipt) error {
	return d.deliver(d.receiptCh, &receiptPack{id, receipts}, receiptInMeter, receiptDropMeter)
1956 1957 1958
}

// DeliverNodeData injects a new batch of node state data received from a remote node.
1959 1960 1961 1962 1963 1964 1965 1966 1967 1968 1969 1970 1971 1972 1973 1974 1975 1976 1977 1978 1979 1980 1981 1982 1983 1984 1985 1986
func (d *Downloader) DeliverNodeData(id string, data [][]byte) error {
	return d.deliver(d.stateCh, &statePack{id, data}, stateInMeter, stateDropMeter)
}

// DeliverSnapPacket is invoked from a peer's message handler when it transmits a
// data packet for the local node to consume.
func (d *Downloader) DeliverSnapPacket(peer *snap.Peer, packet snap.Packet) error {
	switch packet := packet.(type) {
	case *snap.AccountRangePacket:
		hashes, accounts, err := packet.Unpack()
		if err != nil {
			return err
		}
		return d.SnapSyncer.OnAccounts(peer, packet.ID, hashes, accounts, packet.Proof)

	case *snap.StorageRangesPacket:
		hashset, slotset := packet.Unpack()
		return d.SnapSyncer.OnStorage(peer, packet.ID, hashset, slotset, packet.Proof)

	case *snap.ByteCodesPacket:
		return d.SnapSyncer.OnByteCodes(peer, packet.ID, packet.Codes)

	case *snap.TrieNodesPacket:
		return d.SnapSyncer.OnTrieNodes(peer, packet.ID, packet.Nodes)

	default:
		return fmt.Errorf("unexpected snap packet type: %T", packet)
	}
1987 1988 1989
}

// deliver injects a new batch of data received from a remote node.
1990
func (d *Downloader) deliver(destCh chan dataPack, packet dataPack, inMeter, dropMeter metrics.Meter) (err error) {
1991
	// Update the delivery metrics for both good and failed deliveries
1992
	inMeter.Mark(int64(packet.Items()))
1993 1994
	defer func() {
		if err != nil {
1995
			dropMeter.Mark(int64(packet.Items()))
1996 1997 1998 1999 2000 2001
		}
	}()
	// Deliver or abort if the sync is canceled while queuing
	d.cancelLock.RLock()
	cancel := d.cancelCh
	d.cancelLock.RUnlock()
2002 2003 2004
	if cancel == nil {
		return errNoSyncActive
	}
2005
	select {
2006
	case destCh <- packet:
2007 2008 2009 2010
		return nil
	case <-cancel:
		return errNoSyncActive
	}
2011
}