worker.go 32.2 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17 18 19
package miner

import (
20
	"bytes"
21
	"errors"
O
obscuren 已提交
22
	"math/big"
O
obscuren 已提交
23
	"sync"
O
obscuren 已提交
24
	"sync/atomic"
25
	"time"
O
obscuren 已提交
26

27
	mapset "github.com/deckarep/golang-set"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/common"
29 30
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/misc"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/core"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33 34
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
35
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/params"
O
obscuren 已提交
37 38
)

J
Jeffrey Wilcke 已提交
39
// Tunables for the worker's channel capacities, confirmation logging and
// recommit pacing.
const (
	// resultQueueSize is the size of channel listening to sealing result.
	resultQueueSize = 10

	// txChanSize is the size of channel listening to NewTxsEvent.
	// The number is referenced from the size of tx pool.
	txChanSize = 4096

	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
	chainHeadChanSize = 10

	// chainSideChanSize is the size of channel listening to ChainSideEvent.
	chainSideChanSize = 10

	// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
	resubmitAdjustChanSize = 10

	// miningLogAtDepth is the number of confirmations before logging successful mining.
	miningLogAtDepth = 7

	// minRecommitInterval is the minimal time interval to recreate the mining block with
	// any newly arrived transactions.
	minRecommitInterval = 1 * time.Second

	// maxRecommitInterval is the maximum time interval to recreate the mining block with
	// any newly arrived transactions.
	maxRecommitInterval = 15 * time.Second

	// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
	// resubmitting interval.
	intervalAdjustRatio = 0.1

	// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
	// increasing upper limit or decreasing lower limit so that the limit can be reachable.
	// It is added to (or subtracted from) a target expressed in nanoseconds.
	intervalAdjustBias = 200 * 1000.0 * 1000.0

	// staleThreshold is the maximum depth of the acceptable stale block.
	staleThreshold = 7
)
78

79 80
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
J
Jeffrey Wilcke 已提交
81 82
	signer types.Signer

83
	state     *state.StateDB // apply state changes here
84 85 86
	ancestors mapset.Set     // ancestor set (used for checking uncle parent validity)
	family    mapset.Set     // family set (used for checking uncle invalidity)
	uncles    mapset.Set     // uncle set
87
	tcount    int            // tx count in cycle
88
	gasPool   *core.GasPool  // available gas used to pack transactions
O
obscuren 已提交
89

F
Felix Lange 已提交
90 91 92
	header   *types.Header
	txs      []*types.Transaction
	receipts []*types.Receipt
93
}
J
Jeffrey Wilcke 已提交
94

95 96 97 98 99 100 101 102
// task contains all information for consensus engine sealing and result submitting.
type task struct {
	receipts  []*types.Receipt // receipts written with the block once it is sealed
	state     *state.StateDB   // state written with the block once it is sealed
	block     *types.Block     // block handed to the consensus engine for sealing
	createdAt time.Time        // base time for the "elapsed" value logged after sealing
}

103 104 105 106 107 108
// Interrupt signals stored into a work request's interrupt flag and polled by
// commitTransactions while it applies transactions.
const (
	// commitInterruptNone means no interruption was requested.
	commitInterruptNone int32 = iota
	// commitInterruptNewHead is sent on worker start and on a new chain head.
	commitInterruptNewHead
	// commitInterruptResubmit is sent when the recommit timer fires.
	commitInterruptResubmit
)

109
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
type newWorkReq struct {
	interrupt *int32 // flag the requester may later set to one of the commitInterrupt* signals
	noempty   bool   // forwarded unchanged to commitNewWork
	timestamp int64  // unix timestamp (seconds) chosen for this round of mining
}

116 117 118 119 120 121
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
	ratio float64 // divisor applied to the current interval when increasing; unused when inc is false
	inc   bool    // true to lengthen the interval, false to shrink it towards the user minimum
}

122 123
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
O
obscuren 已提交
124
type worker struct {
125
	config *params.ChainConfig
126
	engine consensus.Engine
127 128
	eth    Backend
	chain  *core.BlockChain
129

130 131 132
	gasFloor uint64
	gasCeil  uint64

133
	// Subscriptions
134
	mux          *event.TypeMux
135
	txsCh        chan core.NewTxsEvent
136
	txsSub       event.Subscription
137 138 139 140
	chainHeadCh  chan core.ChainHeadEvent
	chainHeadSub event.Subscription
	chainSideCh  chan core.ChainSideEvent
	chainSideSub event.Subscription
141

142
	// Channels
143 144
	newWorkCh          chan *newWorkReq
	taskCh             chan *task
145
	resultCh           chan *types.Block
146 147 148 149
	startCh            chan struct{}
	exitCh             chan struct{}
	resubmitIntervalCh chan time.Duration
	resubmitAdjustCh   chan *intervalAdjust
O
obscuren 已提交
150

151 152 153 154
	current      *environment                 // An environment for current running cycle.
	localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
	remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
	unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations.
155

156
	mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
O
obscuren 已提交
157
	coinbase common.Address
158
	extra    []byte
O
obscuren 已提交
159

160 161 162
	pendingMu    sync.RWMutex
	pendingTasks map[common.Hash]*task

163
	snapshotMu    sync.RWMutex // The lock used to protect the block snapshot and state snapshot
164 165 166
	snapshotBlock *types.Block
	snapshotState *state.StateDB

F
Felix Lange 已提交
167
	// atomic status counters
168
	running int32 // The indicator whether the consensus engine is running or not.
169
	newTxs  int32 // New arrival transaction count since last sealing work submitting.
170

171 172 173
	// External functions
	isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

174
	// Test hooks
175 176 177 178
	newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
	skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
	fullTaskHook func()                             // Method to call before pushing the full sealing task.
	resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
O
obscuren 已提交
179 180
}

181
func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, recommit time.Duration, gasFloor, gasCeil uint64, isLocalBlock func(*types.Block) bool) *worker {
O
obscuren 已提交
182
	worker := &worker{
183 184 185 186 187
		config:             config,
		engine:             engine,
		eth:                eth,
		mux:                mux,
		chain:              eth.BlockChain(),
188 189
		gasFloor:           gasFloor,
		gasCeil:            gasCeil,
190 191 192
		isLocalBlock:       isLocalBlock,
		localUncles:        make(map[common.Hash]*types.Block),
		remoteUncles:       make(map[common.Hash]*types.Block),
193
		unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
194
		pendingTasks:       make(map[common.Hash]*task),
195 196 197 198 199
		txsCh:              make(chan core.NewTxsEvent, txChanSize),
		chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
		newWorkCh:          make(chan *newWorkReq),
		taskCh:             make(chan *task),
200
		resultCh:           make(chan *types.Block, resultQueueSize),
201 202 203 204
		exitCh:             make(chan struct{}),
		startCh:            make(chan struct{}, 1),
		resubmitIntervalCh: make(chan time.Duration),
		resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
O
obscuren 已提交
205
	}
206 207
	// Subscribe NewTxsEvent for tx pool
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
208 209 210
	// Subscribe events for blockchain
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
O
obscuren 已提交
211

212 213 214 215 216 217
	// Sanitize recommit interval if the user-specified one is too short.
	if recommit < minRecommitInterval {
		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
		recommit = minRecommitInterval
	}

218
	go worker.mainLoop()
219
	go worker.newWorkLoop(recommit)
220 221
	go worker.resultLoop()
	go worker.taskLoop()
O
obscuren 已提交
222

223
	// Submit first work to initialize pending state.
224 225
	worker.startCh <- struct{}{}

O
obscuren 已提交
226
	return worker
O
obscuren 已提交
227 228
}

229 230 231 232 233
// setEtherbase sets the etherbase used to initialize the block coinbase field.
func (w *worker) setEtherbase(addr common.Address) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.coinbase = addr
J
Jeffrey Wilcke 已提交
234 235
}

236 237 238 239 240
// setExtra sets the content used to initialize the block extra field.
func (w *worker) setExtra(extra []byte) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.extra = extra
241 242
}

243 244 245 246 247
// setRecommitInterval updates the interval for miner sealing work recommitting.
// The value is handed to newWorkLoop over an unbuffered channel, so the call
// blocks until that loop receives it.
func (w *worker) setRecommitInterval(interval time.Duration) {
	w.resubmitIntervalCh <- interval
}

248 249
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
250
	// return a snapshot to avoid contention on currentMu mutex
251 252 253 254 255 256
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	if w.snapshotState == nil {
		return nil, nil
	}
	return w.snapshotBlock, w.snapshotState.Copy()
257
}
258

259 260
// pendingBlock returns pending block.
func (w *worker) pendingBlock() *types.Block {
261
	// return a snapshot to avoid contention on currentMu mutex
262 263 264
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	return w.snapshotBlock
265 266
}

267 268 269
// start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() {
	atomic.StoreInt32(&w.running, 1)
270
	w.startCh <- struct{}{}
O
obscuren 已提交
271 272
}

273 274 275
// stop sets the running status as 0.
func (w *worker) stop() {
	atomic.StoreInt32(&w.running, 0)
O
obscuren 已提交
276 277
}

278 279 280
// isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool {
	return atomic.LoadInt32(&w.running) == 1
281 282
}

283
// close terminates all background threads maintained by the worker.
284 285 286
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
	close(w.exitCh)
O
obscuren 已提交
287 288
}

289
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
290 291 292 293
func (w *worker) newWorkLoop(recommit time.Duration) {
	var (
		interrupt   *int32
		minRecommit = recommit // minimal resubmit interval specified by user.
294
		timestamp   int64      // timestamp for each round of mining.
295
	)
296 297 298 299

	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick

300 301
	// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	commit := func(noempty bool, s int32) {
302 303 304 305
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		interrupt = new(int32)
306
		w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}
	// recalcRecommit recalculates the resubmitting interval upon feedback.
	recalcRecommit := func(target float64, inc bool) {
		var (
			prev = float64(recommit.Nanoseconds())
			next float64
		)
		if inc {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
			// Recap if interval is larger than the maximum time interval
			if next > float64(maxRecommitInterval.Nanoseconds()) {
				next = float64(maxRecommitInterval.Nanoseconds())
			}
		} else {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
			// Recap if interval is less than the user specified minimum
			if next < float64(minRecommit.Nanoseconds()) {
				next = float64(minRecommit.Nanoseconds())
			}
		}
		recommit = time.Duration(int64(next))
330
	}
331 332 333 334 335 336 337 338 339 340
	// clearPending cleans the stale pending tasks.
	clearPending := func(number uint64) {
		w.pendingMu.Lock()
		for h, t := range w.pendingTasks {
			if t.block.NumberU64()+staleThreshold <= number {
				delete(w.pendingTasks, h)
			}
		}
		w.pendingMu.Unlock()
	}
341 342 343 344

	for {
		select {
		case <-w.startCh:
345
			clearPending(w.chain.CurrentBlock().NumberU64())
346
			timestamp = time.Now().Unix()
347
			commit(false, commitInterruptNewHead)
348

349 350
		case head := <-w.chainHeadCh:
			clearPending(head.Block.NumberU64())
351
			timestamp = time.Now().Unix()
352
			commit(false, commitInterruptNewHead)
353 354 355 356 357

		case <-timer.C:
			// If mining is running resubmit a new work cycle periodically to pull in
			// higher priced transactions. Disable this overhead for pending blocks.
			if w.isRunning() && (w.config.Clique == nil || w.config.Clique.Period > 0) {
358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392
				// Short circuit if no new transaction arrives.
				if atomic.LoadInt32(&w.newTxs) == 0 {
					timer.Reset(recommit)
					continue
				}
				commit(true, commitInterruptResubmit)
			}

		case interval := <-w.resubmitIntervalCh:
			// Adjust resubmit interval explicitly by user.
			if interval < minRecommitInterval {
				log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
				interval = minRecommitInterval
			}
			log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
			minRecommit, recommit = interval, interval

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case adjust := <-w.resubmitAdjustCh:
			// Adjust resubmit interval by feedback.
			if adjust.inc {
				before := recommit
				recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
				log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
			} else {
				before := recommit
				recalcRecommit(float64(minRecommit.Nanoseconds()), false)
				log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
			}

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
393 394 395 396 397 398 399 400
			}

		case <-w.exitCh:
			return
		}
	}
}

401 402 403 404 405
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
	defer w.txsSub.Unsubscribe()
	defer w.chainHeadSub.Unsubscribe()
	defer w.chainSideSub.Unsubscribe()
406 407 408

	for {
		select {
409
		case req := <-w.newWorkCh:
410
			w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
411 412

		case ev := <-w.chainSideCh:
413 414 415 416 417
			// Short circuit for duplicate side blocks
			if _, exist := w.localUncles[ev.Block.Hash()]; exist {
				continue
			}
			if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
418 419
				continue
			}
420 421 422 423 424 425
			// Add side block to possible uncle block set depending on the author.
			if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
				w.localUncles[ev.Block.Hash()] = ev.Block
			} else {
				w.remoteUncles[ev.Block.Hash()] = ev.Block
			}
426 427 428 429 430 431 432 433 434 435 436
			// If our mining block contains less than 2 uncle blocks,
			// add the new uncle block if valid and regenerate a mining block.
			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
				start := time.Now()
				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
					var uncles []*types.Header
					w.current.uncles.Each(func(item interface{}) bool {
						hash, ok := item.(common.Hash)
						if !ok {
							return false
						}
437 438 439 440
						uncle, exist := w.localUncles[hash]
						if !exist {
							uncle, exist = w.remoteUncles[hash]
						}
441 442 443 444
						if !exist {
							return false
						}
						uncles = append(uncles, uncle.Header())
445
						return false
446 447 448 449
					})
					w.commit(uncles, nil, true, start)
				}
			}
450 451

		case ev := <-w.txsCh:
452 453 454 455 456
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
457
			if !w.isRunning() && w.current != nil {
458
				w.mu.RLock()
459
				coinbase := w.coinbase
460
				w.mu.RUnlock()
461

462 463
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
464
					acc, _ := types.Sender(w.current.signer, tx)
465 466
					txs[acc] = append(txs[acc], tx)
				}
467
				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
468
				w.commitTransactions(txset, coinbase, nil)
469
				w.updateSnapshot()
470 471
			} else {
				// If we're mining, but nothing is being processed, wake on new transactions
472
				if w.config.Clique != nil && w.config.Clique.Period == 0 {
473
					w.commitNewWork(nil, false, time.Now().Unix())
474
				}
O
obscuren 已提交
475
			}
476
			atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
477 478

		// System stopped
479
		case <-w.exitCh:
480
			return
481
		case <-w.txsSub.Err():
482
			return
483 484 485
		case <-w.chainHeadSub.Err():
			return
		case <-w.chainSideSub.Err():
486
			return
O
obscuren 已提交
487 488 489 490
		}
	}
}

491 492 493
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
494 495 496 497
	var (
		stopCh chan struct{}
		prev   common.Hash
	)
498 499 500 501 502 503 504 505

	// interrupt aborts the in-flight sealing task.
	interrupt := func() {
		if stopCh != nil {
			close(stopCh)
			stopCh = nil
		}
	}
O
obscuren 已提交
506
	for {
507 508 509 510 511
		select {
		case task := <-w.taskCh:
			if w.newTaskHook != nil {
				w.newTaskHook(task)
			}
512
			// Reject duplicate sealing work due to resubmitting.
513 514
			sealHash := w.engine.SealHash(task.block.Header())
			if sealHash == prev {
515 516
				continue
			}
517
			// Interrupt previous sealing operation
518
			interrupt()
519 520 521 522 523 524 525 526 527 528 529 530
			stopCh, prev = make(chan struct{}), sealHash

			if w.skipSealHook != nil && w.skipSealHook(task) {
				continue
			}
			w.pendingMu.Lock()
			w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
			w.pendingMu.Unlock()

			if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
				log.Warn("Block sealing failed", "err", err)
			}
531 532 533 534 535 536
		case <-w.exitCh:
			interrupt()
			return
		}
	}
}
O
obscuren 已提交
537

538 539 540 541 542
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
	for {
		select {
543
		case block := <-w.resultCh:
544
			// Short circuit when receiving empty result.
545
			if block == nil {
O
obscuren 已提交
546 547
				continue
			}
548 549 550 551
			// Short circuit when receiving duplicate result caused by resubmitting.
			if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
				continue
			}
552 553 554 555 556 557 558 559 560 561
			var (
				sealhash = w.engine.SealHash(block.Header())
				hash     = block.Hash()
			)
			w.pendingMu.RLock()
			task, exist := w.pendingTasks[sealhash]
			w.pendingMu.RUnlock()
			if !exist {
				log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
				continue
562
			}
563 564 565 566 567 568 569 570 571 572 573 574 575 576
			// Different block could share same sealhash, deep copy here to prevent write-write conflict.
			var (
				receipts = make([]*types.Receipt, len(task.receipts))
				logs     []*types.Log
			)
			for i, receipt := range task.receipts {
				receipts[i] = new(types.Receipt)
				*receipts[i] = *receipt
				// Update the block hash in all logs since it is now available and not when the
				// receipt/log of individual transactions were created.
				for _, log := range receipt.Logs {
					log.BlockHash = hash
				}
				logs = append(logs, receipt.Logs...)
577
			}
578
			// Commit block and state to database.
579
			stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
580 581 582 583
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
584 585 586
			log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
				"elapsed", common.PrettyDuration(time.Since(task.createdAt)))

587
			// Broadcast the block and announce chain insertion event
588
			w.mux.Post(core.NewMinedBlockEvent{Block: block})
589 590

			var events []interface{}
591 592 593
			switch stat {
			case core.CanonStatTy:
				events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
594
				events = append(events, core.ChainHeadEvent{Block: block})
595 596
			case core.SideStatTy:
				events = append(events, core.ChainSideEvent{Block: block})
597
			}
598
			w.chain.PostChainEvents(events, logs)
599

600 601
			// Insert the block into the set of pending ones to resultLoop for confirmations
			w.unconfirmed.Insert(block.NumberU64(), block.Hash())
O
obscuren 已提交
602

603 604 605
		case <-w.exitCh:
			return
		}
O
obscuren 已提交
606 607 608
	}
}

F
Felix Lange 已提交
609
// makeCurrent creates a new environment for the current cycle.
610 611
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
	state, err := w.chain.StateAt(parent.Root())
612 613 614
	if err != nil {
		return err
	}
615
	env := &environment{
616
		signer:    types.NewEIP155Signer(w.config.ChainID),
F
Felix Lange 已提交
617
		state:     state,
618 619 620
		ancestors: mapset.NewSet(),
		family:    mapset.NewSet(),
		uncles:    mapset.NewSet(),
F
Felix Lange 已提交
621
		header:    header,
622
	}
623

Z
zelig 已提交
624
	// when 08 is processed ancestors contain 07 (quick block)
625
	for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
626
		for _, uncle := range ancestor.Uncles() {
627
			env.family.Add(uncle.Hash())
628
		}
629 630
		env.family.Add(ancestor.Hash())
		env.ancestors.Add(ancestor.Hash())
O
obscuren 已提交
631
	}
E
Egon Elbre 已提交
632

633
	// Keep track of transactions which return errors so they can be removed
634
	env.tcount = 0
635
	w.current = env
636
	return nil
O
obscuren 已提交
637 638
}

639
// commitUncle adds the given block to uncle block set, returns error if failed to add.
640
func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
641 642
	hash := uncle.Hash()
	if env.uncles.Contains(hash) {
643 644 645 646
		return errors.New("uncle not unique")
	}
	if env.header.ParentHash == uncle.ParentHash {
		return errors.New("uncle is sibling")
647 648
	}
	if !env.ancestors.Contains(uncle.ParentHash) {
649
		return errors.New("uncle's parent unknown")
650 651
	}
	if env.family.Contains(hash) {
652
		return errors.New("uncle already included")
653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669
	}
	env.uncles.Add(uncle.Hash())
	return nil
}

// updateSnapshot updates pending snapshot block and state.
// Note this function assumes the current variable is thread safe.
func (w *worker) updateSnapshot() {
	w.snapshotMu.Lock()
	defer w.snapshotMu.Unlock()

	var uncles []*types.Header
	w.current.uncles.Each(func(item interface{}) bool {
		hash, ok := item.(common.Hash)
		if !ok {
			return false
		}
670 671 672 673
		uncle, exist := w.localUncles[hash]
		if !exist {
			uncle, exist = w.remoteUncles[hash]
		}
674 675 676 677
		if !exist {
			return false
		}
		uncles = append(uncles, uncle.Header())
678
		return false
679 680 681 682 683 684 685 686 687 688 689 690
	})

	w.snapshotBlock = types.NewBlock(
		w.current.header,
		w.current.txs,
		uncles,
		w.current.receipts,
	)

	w.snapshotState = w.current.state.Copy()
}

691 692 693
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
	snap := w.current.state.Snapshot()

694
	receipt, _, err := core.ApplyTransaction(w.config, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724
	if err != nil {
		w.current.state.RevertToSnapshot(snap)
		return nil, err
	}
	w.current.txs = append(w.current.txs, tx)
	w.current.receipts = append(w.current.receipts, receipt)

	return receipt.Logs, nil
}

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
	// Short circuit if current is nil
	if w.current == nil {
		return true
	}

	if w.current.gasPool == nil {
		w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
	}

	var coalescedLogs []*types.Log

	for {
		// In the following three cases, we will interrupt the execution of the transaction.
		// (1) new head block event arrival, the interrupt signal is 1
		// (2) worker start or restart, the interrupt signal is 1
		// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
		// For the first two cases, the semi-finished work will be discarded.
		// For the third case, the semi-finished work will be submitted to the consensus engine.
		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
725 726 727 728 729 730 731 732 733 734 735
			// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
			if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
				ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
				if ratio < 0.1 {
					ratio = 0.1
				}
				w.resubmitAdjustCh <- &intervalAdjust{
					ratio: ratio,
					inc:   true,
				}
			}
736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809
			return atomic.LoadInt32(interrupt) == commitInterruptNewHead
		}
		// If we don't have enough gas for any further transactions then we're done
		if w.current.gasPool.Gas() < params.TxGas {
			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
			break
		}
		// Retrieve the next transaction and abort if all done
		tx := txs.Peek()
		if tx == nil {
			break
		}
		// Error may be ignored here. The error has already been checked
		// during transaction acceptance is the transaction pool.
		//
		// We use the eip155 signer regardless of the current hf.
		from, _ := types.Sender(w.current.signer, tx)
		// Check whether the tx is replay protected. If we're not in the EIP155 hf
		// phase, start ignoring the sender until we do.
		if tx.Protected() && !w.config.IsEIP155(w.current.header.Number) {
			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.config.EIP155Block)

			txs.Pop()
			continue
		}
		// Start executing the transaction
		w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)

		logs, err := w.commitTransaction(tx, coinbase)
		switch err {
		case core.ErrGasLimitReached:
			// Pop the current out-of-gas transaction without shifting in the next from the account
			log.Trace("Gas limit exceeded for current block", "sender", from)
			txs.Pop()

		case core.ErrNonceTooLow:
			// New head notification data race between the transaction pool and miner, shift
			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
			txs.Shift()

		case core.ErrNonceTooHigh:
			// Reorg notification data race between the transaction pool and miner, skip account =
			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
			txs.Pop()

		case nil:
			// Everything ok, collect the logs and shift in the next transaction from the same account
			coalescedLogs = append(coalescedLogs, logs...)
			w.current.tcount++
			txs.Shift()

		default:
			// Strange error, discard the transaction and get the next in line (note, the
			// nonce-too-high clause will prevent us from executing in vain).
			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
			txs.Shift()
		}
	}

	if !w.isRunning() && len(coalescedLogs) > 0 {
		// We don't push the pendingLogsEvent while we are mining. The reason is that
		// when we are mining, the worker will regenerate a mining block every 3 seconds.
		// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.

		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
		// logs by filling in the block hash when the block was mined by the local miner. This can
		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
		cpy := make([]*types.Log, len(coalescedLogs))
		for i, l := range coalescedLogs {
			cpy[i] = new(types.Log)
			*cpy[i] = *l
		}
		go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
	}
810 811 812 813 814
	// Notify resubmit loop to decrease resubmitting interval if current interval is larger
	// than the user-specified one.
	if interrupt != nil {
		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
	}
815 816 817
	return false
}

818
// commitNewWork generates several new sealing tasks based on the parent block.
819
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
820 821
	w.mu.RLock()
	defer w.mu.RUnlock()
O
obscuren 已提交
822

823
	tstart := time.Now()
824
	parent := w.chain.CurrentBlock()
825

826 827
	if parent.Time().Cmp(new(big.Int).SetInt64(timestamp)) >= 0 {
		timestamp = parent.Time().Int64() + 1
F
Felix Lange 已提交
828
	}
829
	// this will ensure we're not going off too far in the future
830 831
	if now := time.Now().Unix(); timestamp > now+1 {
		wait := time.Duration(timestamp-now) * time.Second
832
		log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
833 834 835
		time.Sleep(wait)
	}

F
Felix Lange 已提交
836 837 838 839
	num := parent.Number()
	header := &types.Header{
		ParentHash: parent.Hash(),
		Number:     num.Add(num, common.Big1),
840
		GasLimit:   core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
841
		Extra:      w.extra,
842
		Time:       big.NewInt(timestamp),
F
Felix Lange 已提交
843
	}
844
	// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
845 846
	if w.isRunning() {
		if w.coinbase == (common.Address{}) {
847 848 849
			log.Error("Refusing to mine without etherbase")
			return
		}
850
		header.Coinbase = w.coinbase
851
	}
852
	if err := w.engine.Prepare(w.chain, header); err != nil {
853 854 855
		log.Error("Failed to prepare header for mining", "err", err)
		return
	}
856
	// If we are care about TheDAO hard-fork check whether to override the extra-data or not
857
	if daoBlock := w.config.DAOForkBlock; daoBlock != nil {
858 859
		// Check whether the block is among the fork extra-override range
		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
860
		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
861
			// Depending whether we support or oppose the fork, override differently
862
			if w.config.DAOForkSupport {
863
				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
864
			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
865 866
				header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
			}
867 868
		}
	}
869
	// Could potentially happen if starting to mine in an odd state.
870
	err := w.makeCurrent(parent, header)
871
	if err != nil {
872
		log.Error("Failed to create mining context", "err", err)
873 874
		return
	}
875
	// Create the current work task and check any fork transitions needed
876 877
	env := w.current
	if w.config.DAOForkSupport && w.config.DAOForkBlock != nil && w.config.DAOForkBlock.Cmp(header.Number) == 0 {
878
		misc.ApplyDAOHardFork(env.state)
879
	}
880 881
	// Accumulate the uncles for the current block
	uncles := make([]*types.Header, 0, 2)
882 883 884 885 886 887
	commitUncles := func(blocks map[common.Hash]*types.Block) {
		// Clean up stale uncle blocks first
		for hash, uncle := range blocks {
			if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
				delete(blocks, hash)
			}
O
obscuren 已提交
888
		}
889 890 891 892 893 894 895 896 897 898
		for hash, uncle := range blocks {
			if len(uncles) == 2 {
				break
			}
			if err := w.commitUncle(env, uncle.Header()); err != nil {
				log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
			} else {
				log.Debug("Committing new uncle to block", "hash", hash)
				uncles = append(uncles, uncle.Header())
			}
O
obscuren 已提交
899 900
		}
	}
901 902 903
	// Prefer to locally generated uncle
	commitUncles(w.localUncles)
	commitUncles(w.remoteUncles)
904

905 906 907 908 909
	if !noempty {
		// Create an empty block based on temporary copied state for sealing in advance without waiting block
		// execution finished.
		w.commit(uncles, nil, false, tstart)
	}
910 911

	// Fill the block with all available pending transactions.
912
	pending, err := w.eth.TxPool().Pending()
913 914 915 916
	if err != nil {
		log.Error("Failed to fetch pending transactions", "err", err)
		return
	}
917 918 919 920 921
	// Short circuit if there is no available pending transactions
	if len(pending) == 0 {
		w.updateSnapshot()
		return
	}
922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940
	// Split the pending transactions into locals and remotes
	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
	for _, account := range w.eth.TxPool().Locals() {
		if txs := remoteTxs[account]; len(txs) > 0 {
			delete(remoteTxs, account)
			localTxs[account] = txs
		}
	}
	if len(localTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
	}
	if len(remoteTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
941
	}
942 943 944 945 946 947
	w.commit(uncles, w.fullTaskHook, true, tstart)
}

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
948
	// Deep copy receipts here to avoid interaction between different tasks.
949 950 951 952 953 954 955 956 957
	receipts := make([]*types.Receipt, len(w.current.receipts))
	for i, l := range w.current.receipts {
		receipts[i] = new(types.Receipt)
		*receipts[i] = *l
	}
	s := w.current.state.Copy()
	block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
	if err != nil {
		return err
958
	}
959
	if w.isRunning() {
960 961
		if interval != nil {
			interval()
962
		}
963
		select {
964 965
		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
			w.unconfirmed.Shift(block.NumberU64() - 1)
966 967

			feesWei := new(big.Int)
968 969
			for i, tx := range block.Transactions() {
				feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
970 971 972
			}
			feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))

973 974
			log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
				"uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
975

976 977
		case <-w.exitCh:
			log.Info("Worker has exited")
978
		}
O
obscuren 已提交
979
	}
980 981 982 983
	if update {
		w.updateSnapshot()
	}
	return nil
O
obscuren 已提交
984
}