worker.go 35.3 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17 18 19
package miner

import (
20
	"bytes"
21
	"errors"
O
obscuren 已提交
22
	"math/big"
O
obscuren 已提交
23
	"sync"
O
obscuren 已提交
24
	"sync/atomic"
25
	"time"
O
obscuren 已提交
26

27
	mapset "github.com/deckarep/golang-set"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/common"
29 30
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/misc"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/core"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33 34
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
35
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/params"
37
	"github.com/ethereum/go-ethereum/trie"
O
obscuren 已提交
38 39
)

J
Jeffrey Wilcke 已提交
40
const (
41 42
	// resultQueueSize is the size of channel listening to sealing result.
	resultQueueSize = 10
43

44
	// txChanSize is the size of channel listening to NewTxsEvent.
45 46
	// The number is referenced from the size of tx pool.
	txChanSize = 4096
47

48 49
	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
	chainHeadChanSize = 10
50

51 52
	// chainSideChanSize is the size of channel listening to ChainSideEvent.
	chainSideChanSize = 10
53

54 55 56
	// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
	resubmitAdjustChanSize = 10

57
	// miningLogAtDepth is the number of confirmations before logging successful mining.
58
	miningLogAtDepth = 7
59

60 61 62 63 64
	// minRecommitInterval is the minimal time interval to recreate the mining block with
	// any newly arrived transactions.
	minRecommitInterval = 1 * time.Second

	// maxRecommitInterval is the maximum time interval to recreate the mining block with
65
	// any newly arrived transactions.
66 67 68 69 70 71 72 73 74
	maxRecommitInterval = 15 * time.Second

	// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
	// resubmitting interval.
	intervalAdjustRatio = 0.1

	// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
	// increasing upper limit or decreasing lower limit so that the limit can be reachable.
	intervalAdjustBias = 200 * 1000.0 * 1000.0
75

76
	// staleThreshold is the maximum depth of the acceptable stale block.
77
	staleThreshold = 7
J
Jeffrey Wilcke 已提交
78
)
79

80 81
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
J
Jeffrey Wilcke 已提交
82 83
	signer types.Signer

84
	state     *state.StateDB // apply state changes here
85 86 87
	ancestors mapset.Set     // ancestor set (used for checking uncle parent validity)
	family    mapset.Set     // family set (used for checking uncle invalidity)
	uncles    mapset.Set     // uncle set
88
	tcount    int            // tx count in cycle
89
	gasPool   *core.GasPool  // available gas used to pack transactions
O
obscuren 已提交
90

F
Felix Lange 已提交
91 92 93
	header   *types.Header
	txs      []*types.Transaction
	receipts []*types.Receipt
94
}
J
Jeffrey Wilcke 已提交
95

96 97 98 99 100 101 102 103
// task contains all information for consensus engine sealing and result submitting.
type task struct {
	receipts  []*types.Receipt
	state     *state.StateDB
	block     *types.Block
	createdAt time.Time
}

104 105 106 107 108 109
const (
	commitInterruptNone int32 = iota
	commitInterruptNewHead
	commitInterruptResubmit
)

110
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
111 112 113
type newWorkReq struct {
	interrupt *int32
	noempty   bool
114
	timestamp int64
115 116
}

117 118 119 120 121 122
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
	ratio float64
	inc   bool
}

123 124
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
O
obscuren 已提交
125
type worker struct {
126 127 128 129 130
	config      *Config
	chainConfig *params.ChainConfig
	engine      consensus.Engine
	eth         Backend
	chain       *core.BlockChain
131

132 133 134
	// Feeds
	pendingLogsFeed event.Feed

135
	// Subscriptions
136
	mux          *event.TypeMux
137
	txsCh        chan core.NewTxsEvent
138
	txsSub       event.Subscription
139 140 141 142
	chainHeadCh  chan core.ChainHeadEvent
	chainHeadSub event.Subscription
	chainSideCh  chan core.ChainSideEvent
	chainSideSub event.Subscription
143

144
	// Channels
145 146
	newWorkCh          chan *newWorkReq
	taskCh             chan *task
147
	resultCh           chan *types.Block
148 149 150 151
	startCh            chan struct{}
	exitCh             chan struct{}
	resubmitIntervalCh chan time.Duration
	resubmitAdjustCh   chan *intervalAdjust
O
obscuren 已提交
152

153 154 155 156
	current      *environment                 // An environment for current running cycle.
	localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
	remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
	unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations.
157

158
	mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
O
obscuren 已提交
159
	coinbase common.Address
160
	extra    []byte
O
obscuren 已提交
161

162 163 164
	pendingMu    sync.RWMutex
	pendingTasks map[common.Hash]*task

165
	snapshotMu       sync.RWMutex // The lock used to protect the snapshots below
166 167 168
	snapshotBlock    *types.Block
	snapshotReceipts types.Receipts
	snapshotState    *state.StateDB
169

F
Felix Lange 已提交
170
	// atomic status counters
171
	running int32 // The indicator whether the consensus engine is running or not.
172
	newTxs  int32 // New arrival transaction count since last sealing work submitting.
173

174 175 176 177 178 179 180
	// noempty is the flag used to control whether the feature of pre-seal empty
	// block is enabled. The default value is false(pre-seal is enabled by default).
	// But in some special scenario the consensus engine will seal blocks instantaneously,
	// in this case this feature will add all empty blocks into canonical chain
	// non-stop and no real transaction will be included.
	noempty uint32

181 182 183
	// External functions
	isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

184
	// Test hooks
185 186 187 188
	newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
	skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
	fullTaskHook func()                             // Method to call before pushing the full sealing task.
	resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
O
obscuren 已提交
189 190
}

G
gary rong 已提交
191
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool, init bool) *worker {
O
obscuren 已提交
192
	worker := &worker{
193
		config:             config,
194
		chainConfig:        chainConfig,
195 196 197 198
		engine:             engine,
		eth:                eth,
		mux:                mux,
		chain:              eth.BlockChain(),
199 200 201
		isLocalBlock:       isLocalBlock,
		localUncles:        make(map[common.Hash]*types.Block),
		remoteUncles:       make(map[common.Hash]*types.Block),
202
		unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
203
		pendingTasks:       make(map[common.Hash]*task),
204 205 206 207 208
		txsCh:              make(chan core.NewTxsEvent, txChanSize),
		chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
		newWorkCh:          make(chan *newWorkReq),
		taskCh:             make(chan *task),
209
		resultCh:           make(chan *types.Block, resultQueueSize),
210 211 212 213
		exitCh:             make(chan struct{}),
		startCh:            make(chan struct{}, 1),
		resubmitIntervalCh: make(chan time.Duration),
		resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
O
obscuren 已提交
214
	}
215 216
	// Subscribe NewTxsEvent for tx pool
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
217 218 219
	// Subscribe events for blockchain
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
O
obscuren 已提交
220

221
	// Sanitize recommit interval if the user-specified one is too short.
222
	recommit := worker.config.Recommit
223 224 225 226 227
	if recommit < minRecommitInterval {
		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
		recommit = minRecommitInterval
	}

228
	go worker.mainLoop()
229
	go worker.newWorkLoop(recommit)
230 231
	go worker.resultLoop()
	go worker.taskLoop()
O
obscuren 已提交
232

233
	// Submit first work to initialize pending state.
G
gary rong 已提交
234 235 236
	if init {
		worker.startCh <- struct{}{}
	}
O
obscuren 已提交
237
	return worker
O
obscuren 已提交
238 239
}

240 241 242 243 244
// setEtherbase sets the etherbase used to initialize the block coinbase field.
func (w *worker) setEtherbase(addr common.Address) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.coinbase = addr
J
Jeffrey Wilcke 已提交
245 246
}

247 248 249 250 251 252
func (w *worker) setGasCeil(ceil uint64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.config.GasCeil = ceil
}

253 254 255 256 257
// setExtra sets the content used to initialize the block extra field.
func (w *worker) setExtra(extra []byte) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.extra = extra
258 259
}

260 261 262 263 264
// setRecommitInterval updates the interval for miner sealing work recommitting.
func (w *worker) setRecommitInterval(interval time.Duration) {
	w.resubmitIntervalCh <- interval
}

265 266 267 268 269 270 271 272 273 274
// disablePreseal disables pre-sealing mining feature
func (w *worker) disablePreseal() {
	atomic.StoreUint32(&w.noempty, 1)
}

// enablePreseal enables pre-sealing mining feature
func (w *worker) enablePreseal() {
	atomic.StoreUint32(&w.noempty, 0)
}

275 276
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
277
	// return a snapshot to avoid contention on currentMu mutex
278 279 280 281 282 283
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	if w.snapshotState == nil {
		return nil, nil
	}
	return w.snapshotBlock, w.snapshotState.Copy()
284
}
285

286 287
// pendingBlock returns pending block.
func (w *worker) pendingBlock() *types.Block {
288
	// return a snapshot to avoid contention on currentMu mutex
289 290 291
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	return w.snapshotBlock
292 293
}

294 295 296 297 298 299 300 301
// pendingBlockAndReceipts returns pending block and corresponding receipts.
func (w *worker) pendingBlockAndReceipts() (*types.Block, types.Receipts) {
	// return a snapshot to avoid contention on currentMu mutex
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	return w.snapshotBlock, w.snapshotReceipts
}

302 303 304
// start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() {
	atomic.StoreInt32(&w.running, 1)
305
	w.startCh <- struct{}{}
O
obscuren 已提交
306 307
}

308 309 310
// stop sets the running status as 0.
func (w *worker) stop() {
	atomic.StoreInt32(&w.running, 0)
O
obscuren 已提交
311 312
}

313 314 315
// isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool {
	return atomic.LoadInt32(&w.running) == 1
316 317
}

318
// close terminates all background threads maintained by the worker.
319 320
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
321 322 323
	if w.current != nil && w.current.state != nil {
		w.current.state.StopPrefetcher()
	}
324
	atomic.StoreInt32(&w.running, 0)
325
	close(w.exitCh)
O
obscuren 已提交
326 327
}

328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349
// recalcRecommit recalculates the resubmitting interval upon feedback.
func recalcRecommit(minRecommit, prev time.Duration, target float64, inc bool) time.Duration {
	var (
		prevF = float64(prev.Nanoseconds())
		next  float64
	)
	if inc {
		next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
		max := float64(maxRecommitInterval.Nanoseconds())
		if next > max {
			next = max
		}
	} else {
		next = prevF*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
		min := float64(minRecommit.Nanoseconds())
		if next < min {
			next = min
		}
	}
	return time.Duration(int64(next))
}

350
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
351 352 353 354
func (w *worker) newWorkLoop(recommit time.Duration) {
	var (
		interrupt   *int32
		minRecommit = recommit // minimal resubmit interval specified by user.
355
		timestamp   int64      // timestamp for each round of mining.
356
	)
357 358

	timer := time.NewTimer(0)
359
	defer timer.Stop()
360 361
	<-timer.C // discard the initial tick

362 363
	// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	commit := func(noempty bool, s int32) {
364 365 366 367
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		interrupt = new(int32)
368 369 370 371 372
		select {
		case w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}:
		case <-w.exitCh:
			return
		}
373 374 375
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}
376 377 378 379 380 381 382 383 384 385
	// clearPending cleans the stale pending tasks.
	clearPending := func(number uint64) {
		w.pendingMu.Lock()
		for h, t := range w.pendingTasks {
			if t.block.NumberU64()+staleThreshold <= number {
				delete(w.pendingTasks, h)
			}
		}
		w.pendingMu.Unlock()
	}
386 387 388 389

	for {
		select {
		case <-w.startCh:
390
			clearPending(w.chain.CurrentBlock().NumberU64())
391
			timestamp = time.Now().Unix()
392
			commit(false, commitInterruptNewHead)
393

394 395
		case head := <-w.chainHeadCh:
			clearPending(head.Block.NumberU64())
396
			timestamp = time.Now().Unix()
397
			commit(false, commitInterruptNewHead)
398 399 400 401

		case <-timer.C:
			// If mining is running resubmit a new work cycle periodically to pull in
			// higher priced transactions. Disable this overhead for pending blocks.
402
			if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427
				// Short circuit if no new transaction arrives.
				if atomic.LoadInt32(&w.newTxs) == 0 {
					timer.Reset(recommit)
					continue
				}
				commit(true, commitInterruptResubmit)
			}

		case interval := <-w.resubmitIntervalCh:
			// Adjust resubmit interval explicitly by user.
			if interval < minRecommitInterval {
				log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
				interval = minRecommitInterval
			}
			log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
			minRecommit, recommit = interval, interval

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case adjust := <-w.resubmitAdjustCh:
			// Adjust resubmit interval by feedback.
			if adjust.inc {
				before := recommit
428 429
				target := float64(recommit.Nanoseconds()) / adjust.ratio
				recommit = recalcRecommit(minRecommit, recommit, target, true)
430 431 432
				log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
			} else {
				before := recommit
433
				recommit = recalcRecommit(minRecommit, recommit, float64(minRecommit.Nanoseconds()), false)
434 435 436 437 438
				log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
			}

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
439 440 441 442 443 444 445 446
			}

		case <-w.exitCh:
			return
		}
	}
}

447 448 449 450 451
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
	defer w.txsSub.Unsubscribe()
	defer w.chainHeadSub.Unsubscribe()
	defer w.chainSideSub.Unsubscribe()
452 453 454

	for {
		select {
455
		case req := <-w.newWorkCh:
456
			w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
457 458

		case ev := <-w.chainSideCh:
459 460 461 462 463
			// Short circuit for duplicate side blocks
			if _, exist := w.localUncles[ev.Block.Hash()]; exist {
				continue
			}
			if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
464 465
				continue
			}
466 467 468 469 470 471
			// Add side block to possible uncle block set depending on the author.
			if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
				w.localUncles[ev.Block.Hash()] = ev.Block
			} else {
				w.remoteUncles[ev.Block.Hash()] = ev.Block
			}
472 473 474 475 476 477 478 479 480 481 482
			// If our mining block contains less than 2 uncle blocks,
			// add the new uncle block if valid and regenerate a mining block.
			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
				start := time.Now()
				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
					var uncles []*types.Header
					w.current.uncles.Each(func(item interface{}) bool {
						hash, ok := item.(common.Hash)
						if !ok {
							return false
						}
483 484 485 486
						uncle, exist := w.localUncles[hash]
						if !exist {
							uncle, exist = w.remoteUncles[hash]
						}
487 488 489 490
						if !exist {
							return false
						}
						uncles = append(uncles, uncle.Header())
491
						return false
492 493 494 495
					})
					w.commit(uncles, nil, true, start)
				}
			}
496 497

		case ev := <-w.txsCh:
498 499 500 501 502
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
503
			if !w.isRunning() && w.current != nil {
504 505 506 507
				// If block is already full, abort
				if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
					continue
				}
508
				w.mu.RLock()
509
				coinbase := w.coinbase
510
				w.mu.RUnlock()
511

512 513
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
514
					acc, _ := types.Sender(w.current.signer, tx)
515 516
					txs[acc] = append(txs[acc], tx)
				}
517
				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, w.current.header.BaseFee)
518
				tcount := w.current.tcount
519
				w.commitTransactions(txset, coinbase, nil)
520 521 522 523 524
				// Only update the snapshot if any new transactons were added
				// to the pending block
				if tcount != w.current.tcount {
					w.updateSnapshot()
				}
525
			} else {
526 527 528
				// Special case, if the consensus engine is 0 period clique(dev mode),
				// submit mining work here since all empty submission will be rejected
				// by clique. Of course the advance sealing(empty submission) is disabled.
529 530
				if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
					w.commitNewWork(nil, true, time.Now().Unix())
531
				}
O
obscuren 已提交
532
			}
533
			atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
534 535

		// System stopped
536
		case <-w.exitCh:
537
			return
538
		case <-w.txsSub.Err():
539
			return
540 541 542
		case <-w.chainHeadSub.Err():
			return
		case <-w.chainSideSub.Err():
543
			return
O
obscuren 已提交
544 545 546 547
		}
	}
}

548 549 550
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
551 552 553 554
	var (
		stopCh chan struct{}
		prev   common.Hash
	)
555 556 557 558 559 560 561 562

	// interrupt aborts the in-flight sealing task.
	interrupt := func() {
		if stopCh != nil {
			close(stopCh)
			stopCh = nil
		}
	}
O
obscuren 已提交
563
	for {
564 565 566 567 568
		select {
		case task := <-w.taskCh:
			if w.newTaskHook != nil {
				w.newTaskHook(task)
			}
569
			// Reject duplicate sealing work due to resubmitting.
570 571
			sealHash := w.engine.SealHash(task.block.Header())
			if sealHash == prev {
572 573
				continue
			}
574
			// Interrupt previous sealing operation
575
			interrupt()
576 577 578 579 580 581
			stopCh, prev = make(chan struct{}), sealHash

			if w.skipSealHook != nil && w.skipSealHook(task) {
				continue
			}
			w.pendingMu.Lock()
582
			w.pendingTasks[sealHash] = task
583 584 585 586 587
			w.pendingMu.Unlock()

			if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
				log.Warn("Block sealing failed", "err", err)
			}
588 589 590 591 592 593
		case <-w.exitCh:
			interrupt()
			return
		}
	}
}
O
obscuren 已提交
594

595 596 597 598 599
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
	for {
		select {
600
		case block := <-w.resultCh:
601
			// Short circuit when receiving empty result.
602
			if block == nil {
O
obscuren 已提交
603 604
				continue
			}
605 606 607 608
			// Short circuit when receiving duplicate result caused by resubmitting.
			if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
				continue
			}
609 610 611 612 613 614 615 616 617 618
			var (
				sealhash = w.engine.SealHash(block.Header())
				hash     = block.Hash()
			)
			w.pendingMu.RLock()
			task, exist := w.pendingTasks[sealhash]
			w.pendingMu.RUnlock()
			if !exist {
				log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
				continue
619
			}
620 621 622 623 624 625
			// Different block could share same sealhash, deep copy here to prevent write-write conflict.
			var (
				receipts = make([]*types.Receipt, len(task.receipts))
				logs     []*types.Log
			)
			for i, receipt := range task.receipts {
626 627 628 629 630
				// add block location fields
				receipt.BlockHash = hash
				receipt.BlockNumber = block.Number()
				receipt.TransactionIndex = uint(i)

631 632 633 634 635 636 637 638
				receipts[i] = new(types.Receipt)
				*receipts[i] = *receipt
				// Update the block hash in all logs since it is now available and not when the
				// receipt/log of individual transactions were created.
				for _, log := range receipt.Logs {
					log.BlockHash = hash
				}
				logs = append(logs, receipt.Logs...)
639
			}
640
			// Commit block and state to database.
641
			_, err := w.chain.WriteBlockWithState(block, receipts, logs, task.state, true)
642 643 644 645
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
646 647 648
			log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
				"elapsed", common.PrettyDuration(time.Since(task.createdAt)))

649
			// Broadcast the block and announce chain insertion event
650
			w.mux.Post(core.NewMinedBlockEvent{Block: block})
651

652 653
			// Insert the block into the set of pending ones to resultLoop for confirmations
			w.unconfirmed.Insert(block.NumberU64(), block.Hash())
O
obscuren 已提交
654

655 656 657
		case <-w.exitCh:
			return
		}
O
obscuren 已提交
658 659 660
	}
}

F
Felix Lange 已提交
661
// makeCurrent creates a new environment for the current cycle.
662
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
663 664
	// Retrieve the parent state to execute on top and start a prefetcher for
	// the miner to speed block sealing up a bit
665
	state, err := w.chain.StateAt(parent.Root())
666 667 668
	if err != nil {
		return err
	}
669 670
	state.StartPrefetcher("miner")

671
	env := &environment{
672
		signer:    types.MakeSigner(w.chainConfig, header.Number),
F
Felix Lange 已提交
673
		state:     state,
674 675 676
		ancestors: mapset.NewSet(),
		family:    mapset.NewSet(),
		uncles:    mapset.NewSet(),
F
Felix Lange 已提交
677
		header:    header,
678
	}
Z
zelig 已提交
679
	// when 08 is processed ancestors contain 07 (quick block)
680
	for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
681
		for _, uncle := range ancestor.Uncles() {
682
			env.family.Add(uncle.Hash())
683
		}
684 685
		env.family.Add(ancestor.Hash())
		env.ancestors.Add(ancestor.Hash())
O
obscuren 已提交
686
	}
687
	// Keep track of transactions which return errors so they can be removed
688
	env.tcount = 0
689 690 691 692 693 694

	// Swap out the old work with the new one, terminating any leftover prefetcher
	// processes in the mean time and starting a new one.
	if w.current != nil && w.current.state != nil {
		w.current.state.StopPrefetcher()
	}
695
	w.current = env
696
	return nil
O
obscuren 已提交
697 698
}

699
// commitUncle adds the given block to uncle block set, returns error if failed to add.
700
func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
701 702
	hash := uncle.Hash()
	if env.uncles.Contains(hash) {
703 704 705 706
		return errors.New("uncle not unique")
	}
	if env.header.ParentHash == uncle.ParentHash {
		return errors.New("uncle is sibling")
707 708
	}
	if !env.ancestors.Contains(uncle.ParentHash) {
709
		return errors.New("uncle's parent unknown")
710 711
	}
	if env.family.Contains(hash) {
712
		return errors.New("uncle already included")
713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729
	}
	env.uncles.Add(uncle.Hash())
	return nil
}

// updateSnapshot updates pending snapshot block and state.
// Note this function assumes the current variable is thread safe.
func (w *worker) updateSnapshot() {
	w.snapshotMu.Lock()
	defer w.snapshotMu.Unlock()

	var uncles []*types.Header
	w.current.uncles.Each(func(item interface{}) bool {
		hash, ok := item.(common.Hash)
		if !ok {
			return false
		}
730 731 732 733
		uncle, exist := w.localUncles[hash]
		if !exist {
			uncle, exist = w.remoteUncles[hash]
		}
734 735 736 737
		if !exist {
			return false
		}
		uncles = append(uncles, uncle.Header())
738
		return false
739 740 741 742 743 744 745
	})

	w.snapshotBlock = types.NewBlock(
		w.current.header,
		w.current.txs,
		uncles,
		w.current.receipts,
746
		trie.NewStackTrie(nil),
747
	)
748
	w.snapshotReceipts = copyReceipts(w.current.receipts)
749 750 751
	w.snapshotState = w.current.state.Copy()
}

752 753 754
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
	snap := w.current.state.Snapshot()

755
	receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771
	if err != nil {
		w.current.state.RevertToSnapshot(snap)
		return nil, err
	}
	w.current.txs = append(w.current.txs, tx)
	w.current.receipts = append(w.current.receipts, receipt)

	return receipt.Logs, nil
}

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
	// Short circuit if current is nil
	if w.current == nil {
		return true
	}

772
	gasLimit := w.current.header.GasLimit
773
	if w.current.gasPool == nil {
774
		w.current.gasPool = new(core.GasPool).AddGas(gasLimit)
775 776 777 778 779 780 781 782 783 784 785 786
	}

	var coalescedLogs []*types.Log

	for {
		// In the following three cases, we will interrupt the execution of the transaction.
		// (1) new head block event arrival, the interrupt signal is 1
		// (2) worker start or restart, the interrupt signal is 1
		// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
		// For the first two cases, the semi-finished work will be discarded.
		// For the third case, the semi-finished work will be submitted to the consensus engine.
		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
787 788
			// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
			if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
789
				ratio := float64(gasLimit-w.current.gasPool.Gas()) / float64(gasLimit)
790 791 792 793 794 795 796 797
				if ratio < 0.1 {
					ratio = 0.1
				}
				w.resubmitAdjustCh <- &intervalAdjust{
					ratio: ratio,
					inc:   true,
				}
			}
798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816
			return atomic.LoadInt32(interrupt) == commitInterruptNewHead
		}
		// If we don't have enough gas for any further transactions then we're done
		if w.current.gasPool.Gas() < params.TxGas {
			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
			break
		}
		// Retrieve the next transaction and abort if all done
		tx := txs.Peek()
		if tx == nil {
			break
		}
		// Error may be ignored here. The error has already been checked
		// during transaction acceptance is the transaction pool.
		//
		// We use the eip155 signer regardless of the current hf.
		from, _ := types.Sender(w.current.signer, tx)
		// Check whether the tx is replay protected. If we're not in the EIP155 hf
		// phase, start ignoring the sender until we do.
817 818
		if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
819 820 821 822 823

			txs.Pop()
			continue
		}
		// Start executing the transaction
824
		w.current.state.Prepare(tx.Hash(), w.current.tcount)
825 826

		logs, err := w.commitTransaction(tx, coinbase)
827 828
		switch {
		case errors.Is(err, core.ErrGasLimitReached):
829 830 831 832
			// Pop the current out-of-gas transaction without shifting in the next from the account
			log.Trace("Gas limit exceeded for current block", "sender", from)
			txs.Pop()

833
		case errors.Is(err, core.ErrNonceTooLow):
834 835 836 837
			// New head notification data race between the transaction pool and miner, shift
			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
			txs.Shift()

838
		case errors.Is(err, core.ErrNonceTooHigh):
839 840 841 842
			// Reorg notification data race between the transaction pool and miner, skip account =
			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
			txs.Pop()

843
		case errors.Is(err, nil):
844 845 846 847 848
			// Everything ok, collect the logs and shift in the next transaction from the same account
			coalescedLogs = append(coalescedLogs, logs...)
			w.current.tcount++
			txs.Shift()

849 850 851 852 853
		case errors.Is(err, core.ErrTxTypeNotSupported):
			// Pop the unsupported transaction without shifting in the next from the account
			log.Trace("Skipping unsupported transaction type", "sender", from, "type", tx.Type())
			txs.Pop()

854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874
		default:
			// Strange error, discard the transaction and get the next in line (note, the
			// nonce-too-high clause will prevent us from executing in vain).
			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
			txs.Shift()
		}
	}

	if !w.isRunning() && len(coalescedLogs) > 0 {
		// We don't push the pendingLogsEvent while we are mining. The reason is that
		// when we are mining, the worker will regenerate a mining block every 3 seconds.
		// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.

		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
		// logs by filling in the block hash when the block was mined by the local miner. This can
		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
		cpy := make([]*types.Log, len(coalescedLogs))
		for i, l := range coalescedLogs {
			cpy[i] = new(types.Log)
			*cpy[i] = *l
		}
875
		w.pendingLogsFeed.Send(cpy)
876
	}
877 878 879 880 881
	// Notify resubmit loop to decrease resubmitting interval if current interval is larger
	// than the user-specified one.
	if interrupt != nil {
		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
	}
882 883 884
	return false
}

885
// commitNewWork generates several new sealing tasks based on the parent block.
886
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
887 888
	w.mu.RLock()
	defer w.mu.RUnlock()
O
obscuren 已提交
889

890
	tstart := time.Now()
891
	parent := w.chain.CurrentBlock()
892

893 894
	if parent.Time() >= uint64(timestamp) {
		timestamp = int64(parent.Time() + 1)
F
Felix Lange 已提交
895 896 897 898 899
	}
	num := parent.Number()
	header := &types.Header{
		ParentHash: parent.Hash(),
		Number:     num.Add(num, common.Big1),
900
		GasLimit:   core.CalcGasLimit(parent.GasLimit(), w.config.GasCeil),
901
		Extra:      w.extra,
902
		Time:       uint64(timestamp),
F
Felix Lange 已提交
903
	}
904 905 906 907
	// Set baseFee and GasLimit if we are on an EIP-1559 chain
	if w.chainConfig.IsLondon(header.Number) {
		header.BaseFee = misc.CalcBaseFee(w.chainConfig, parent.Header())
		if !w.chainConfig.IsLondon(parent.Number()) {
908 909
			parentGasLimit := parent.GasLimit() * params.ElasticityMultiplier
			header.GasLimit = core.CalcGasLimit(parentGasLimit, w.config.GasCeil)
910 911
		}
	}
912
	// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
913 914
	if w.isRunning() {
		if w.coinbase == (common.Address{}) {
915 916 917
			log.Error("Refusing to mine without etherbase")
			return
		}
918
		header.Coinbase = w.coinbase
919
	}
920
	if err := w.engine.Prepare(w.chain, header); err != nil {
921 922 923
		log.Error("Failed to prepare header for mining", "err", err)
		return
	}
924
	// If we are care about TheDAO hard-fork check whether to override the extra-data or not
925
	if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil {
926 927
		// Check whether the block is among the fork extra-override range
		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
928
		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
929
			// Depending whether we support or oppose the fork, override differently
930
			if w.chainConfig.DAOForkSupport {
931
				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
932
			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
933 934
				header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
			}
935 936
		}
	}
937
	// Could potentially happen if starting to mine in an odd state.
938
	err := w.makeCurrent(parent, header)
939
	if err != nil {
940
		log.Error("Failed to create mining context", "err", err)
941 942
		return
	}
943
	// Create the current work task and check any fork transitions needed
944
	env := w.current
945
	if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 {
946
		misc.ApplyDAOHardFork(env.state)
947
	}
948 949
	// Accumulate the uncles for the current block
	uncles := make([]*types.Header, 0, 2)
950 951 952 953 954 955
	commitUncles := func(blocks map[common.Hash]*types.Block) {
		// Clean up stale uncle blocks first
		for hash, uncle := range blocks {
			if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
				delete(blocks, hash)
			}
O
obscuren 已提交
956
		}
957 958 959 960 961 962 963 964 965 966
		for hash, uncle := range blocks {
			if len(uncles) == 2 {
				break
			}
			if err := w.commitUncle(env, uncle.Header()); err != nil {
				log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
			} else {
				log.Debug("Committing new uncle to block", "hash", hash)
				uncles = append(uncles, uncle.Header())
			}
O
obscuren 已提交
967 968
		}
	}
969 970 971
	// Prefer to locally generated uncle
	commitUncles(w.localUncles)
	commitUncles(w.remoteUncles)
972

973 974 975
	// Create an empty block based on temporary copied state for
	// sealing in advance without waiting block execution finished.
	if !noempty && atomic.LoadUint32(&w.noempty) == 0 {
976 977
		w.commit(uncles, nil, false, tstart)
	}
978 979

	// Fill the block with all available pending transactions.
980
	pending, err := w.eth.TxPool().Pending(true)
981 982 983 984
	if err != nil {
		log.Error("Failed to fetch pending transactions", "err", err)
		return
	}
985 986 987 988
	// Short circuit if there is no available pending transactions.
	// But if we disable empty precommit already, ignore it. Since
	// empty block is necessary to keep the liveness of the network.
	if len(pending) == 0 && atomic.LoadUint32(&w.noempty) == 0 {
989 990 991
		w.updateSnapshot()
		return
	}
992 993 994 995 996 997 998 999 1000
	// Split the pending transactions into locals and remotes
	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
	for _, account := range w.eth.TxPool().Locals() {
		if txs := remoteTxs[account]; len(txs) > 0 {
			delete(remoteTxs, account)
			localTxs[account] = txs
		}
	}
	if len(localTxs) > 0 {
1001
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs, header.BaseFee)
1002 1003 1004 1005 1006
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
	}
	if len(remoteTxs) > 0 {
1007
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs, header.BaseFee)
1008 1009 1010
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
1011
	}
1012 1013 1014 1015 1016 1017
	w.commit(uncles, w.fullTaskHook, true, tstart)
}

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
1018
	// Deep copy receipts here to avoid interaction between different tasks.
1019
	receipts := copyReceipts(w.current.receipts)
1020
	s := w.current.state.Copy()
1021
	block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, receipts)
1022 1023
	if err != nil {
		return err
1024
	}
1025
	if w.isRunning() {
1026 1027
		if interval != nil {
			interval()
1028
		}
1029
		select {
1030 1031
		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
			w.unconfirmed.Shift(block.NumberU64() - 1)
1032
			log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
1033 1034 1035
				"uncles", len(uncles), "txs", w.current.tcount,
				"gas", block.GasUsed(), "fees", totalFees(block, receipts),
				"elapsed", common.PrettyDuration(time.Since(start)))
1036

1037 1038
		case <-w.exitCh:
			log.Info("Worker has exited")
1039
		}
O
obscuren 已提交
1040
	}
1041 1042 1043 1044
	if update {
		w.updateSnapshot()
	}
	return nil
O
obscuren 已提交
1045
}
1046

1047 1048 1049 1050 1051 1052 1053 1054 1055 1056
// copyReceipts makes a deep copy of the given receipts.
func copyReceipts(receipts []*types.Receipt) []*types.Receipt {
	result := make([]*types.Receipt, len(receipts))
	for i, l := range receipts {
		cpy := *l
		result[i] = &cpy
	}
	return result
}

1057 1058 1059 1060 1061 1062 1063
// postSideBlock fires a side chain event, only use it for testing.
func (w *worker) postSideBlock(event core.ChainSideEvent) {
	select {
	case w.chainSideCh <- event:
	case <-w.exitCh:
	}
}
1064

1065
// totalFees computes total consumed miner fees in ETH. Block transactions and receipts have to have the same order.
1066 1067 1068
func totalFees(block *types.Block, receipts []*types.Receipt) *big.Float {
	feesWei := new(big.Int)
	for i, tx := range block.Transactions() {
1069
		minerFee, _ := tx.EffectiveGasTip(block.BaseFee())
1070
		feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), minerFee))
1071 1072 1073
	}
	return new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))
}