worker.go 32.6 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17 18 19
package miner

import (
20
	"bytes"
21
	"errors"
O
obscuren 已提交
22
	"math/big"
O
obscuren 已提交
23
	"sync"
O
obscuren 已提交
24
	"sync/atomic"
25
	"time"
O
obscuren 已提交
26

27
	mapset "github.com/deckarep/golang-set"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/common"
29 30
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/misc"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/core"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33 34
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
35
	"github.com/ethereum/go-ethereum/log"
36
	"github.com/ethereum/go-ethereum/params"
O
obscuren 已提交
37 38
)

J
Jeffrey Wilcke 已提交
39
const (
40 41
	// resultQueueSize is the size of channel listening to sealing result.
	resultQueueSize = 10
42

43
	// txChanSize is the size of channel listening to NewTxsEvent.
44 45
	// The number is referenced from the size of tx pool.
	txChanSize = 4096
46

47 48
	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
	chainHeadChanSize = 10
49

50 51
	// chainSideChanSize is the size of channel listening to ChainSideEvent.
	chainSideChanSize = 10
52

53 54 55
	// resubmitAdjustChanSize is the size of resubmitting interval adjustment channel.
	resubmitAdjustChanSize = 10

56
	// miningLogAtDepth is the number of confirmations before logging successful mining.
57
	miningLogAtDepth = 7
58

59 60 61 62 63
	// minRecommitInterval is the minimal time interval to recreate the mining block with
	// any newly arrived transactions.
	minRecommitInterval = 1 * time.Second

	// maxRecommitInterval is the maximum time interval to recreate the mining block with
64
	// any newly arrived transactions.
65 66 67 68 69 70 71 72 73
	maxRecommitInterval = 15 * time.Second

	// intervalAdjustRatio is the impact a single interval adjustment has on sealing work
	// resubmitting interval.
	intervalAdjustRatio = 0.1

	// intervalAdjustBias is applied during the new resubmit interval calculation in favor of
	// increasing upper limit or decreasing lower limit so that the limit can be reachable.
	intervalAdjustBias = 200 * 1000.0 * 1000.0
74

75
	// staleThreshold is the maximum depth of the acceptable stale block.
76
	staleThreshold = 7
J
Jeffrey Wilcke 已提交
77
)
78

79 80
// environment is the worker's current environment and holds all of the current state information.
type environment struct {
J
Jeffrey Wilcke 已提交
81 82
	signer types.Signer

83
	state     *state.StateDB // apply state changes here
84 85 86
	ancestors mapset.Set     // ancestor set (used for checking uncle parent validity)
	family    mapset.Set     // family set (used for checking uncle invalidity)
	uncles    mapset.Set     // uncle set
87
	tcount    int            // tx count in cycle
88
	gasPool   *core.GasPool  // available gas used to pack transactions
O
obscuren 已提交
89

F
Felix Lange 已提交
90 91 92
	header   *types.Header
	txs      []*types.Transaction
	receipts []*types.Receipt
93
}
J
Jeffrey Wilcke 已提交
94

95 96 97 98 99 100 101 102
// task contains all information for consensus engine sealing and result submitting.
type task struct {
	receipts  []*types.Receipt
	state     *state.StateDB
	block     *types.Block
	createdAt time.Time
}

103 104 105 106 107 108
const (
	commitInterruptNone int32 = iota
	commitInterruptNewHead
	commitInterruptResubmit
)

109
// newWorkReq represents a request for new sealing work submitting with relative interrupt notifier.
110 111 112
type newWorkReq struct {
	interrupt *int32
	noempty   bool
113
	timestamp int64
114 115
}

116 117 118 119 120 121
// intervalAdjust represents a resubmitting interval adjustment.
type intervalAdjust struct {
	ratio float64
	inc   bool
}

122 123
// worker is the main object which takes care of submitting new work to consensus engine
// and gathering the sealing result.
O
obscuren 已提交
124
type worker struct {
125 126 127 128 129
	config      *Config
	chainConfig *params.ChainConfig
	engine      consensus.Engine
	eth         Backend
	chain       *core.BlockChain
130

131
	// Subscriptions
132
	mux          *event.TypeMux
133
	txsCh        chan core.NewTxsEvent
134
	txsSub       event.Subscription
135 136 137 138
	chainHeadCh  chan core.ChainHeadEvent
	chainHeadSub event.Subscription
	chainSideCh  chan core.ChainSideEvent
	chainSideSub event.Subscription
139

140
	// Channels
141 142
	newWorkCh          chan *newWorkReq
	taskCh             chan *task
143
	resultCh           chan *types.Block
144 145 146 147
	startCh            chan struct{}
	exitCh             chan struct{}
	resubmitIntervalCh chan time.Duration
	resubmitAdjustCh   chan *intervalAdjust
O
obscuren 已提交
148

149 150 151 152
	current      *environment                 // An environment for current running cycle.
	localUncles  map[common.Hash]*types.Block // A set of side blocks generated locally as the possible uncle blocks.
	remoteUncles map[common.Hash]*types.Block // A set of side blocks as the possible uncle blocks.
	unconfirmed  *unconfirmedBlocks           // A set of locally mined blocks pending canonicalness confirmations.
153

154
	mu       sync.RWMutex // The lock used to protect the coinbase and extra fields
O
obscuren 已提交
155
	coinbase common.Address
156
	extra    []byte
O
obscuren 已提交
157

158 159 160
	pendingMu    sync.RWMutex
	pendingTasks map[common.Hash]*task

161
	snapshotMu    sync.RWMutex // The lock used to protect the block snapshot and state snapshot
162 163 164
	snapshotBlock *types.Block
	snapshotState *state.StateDB

F
Felix Lange 已提交
165
	// atomic status counters
166
	running int32 // The indicator whether the consensus engine is running or not.
167
	newTxs  int32 // New arrival transaction count since last sealing work submitting.
168

169 170 171
	// External functions
	isLocalBlock func(block *types.Block) bool // Function used to determine whether the specified block is mined by local miner.

172
	// Test hooks
173 174 175 176
	newTaskHook  func(*task)                        // Method to call upon receiving a new sealing task.
	skipSealHook func(*task) bool                   // Method to decide whether skipping the sealing.
	fullTaskHook func()                             // Method to call before pushing the full sealing task.
	resubmitHook func(time.Duration, time.Duration) // Method to call upon updating resubmitting interval.
O
obscuren 已提交
177 178
}

179
func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus.Engine, eth Backend, mux *event.TypeMux, isLocalBlock func(*types.Block) bool) *worker {
O
obscuren 已提交
180
	worker := &worker{
181
		config:             config,
182
		chainConfig:        chainConfig,
183 184 185 186
		engine:             engine,
		eth:                eth,
		mux:                mux,
		chain:              eth.BlockChain(),
187 188 189
		isLocalBlock:       isLocalBlock,
		localUncles:        make(map[common.Hash]*types.Block),
		remoteUncles:       make(map[common.Hash]*types.Block),
190
		unconfirmed:        newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
191
		pendingTasks:       make(map[common.Hash]*task),
192 193 194 195 196
		txsCh:              make(chan core.NewTxsEvent, txChanSize),
		chainHeadCh:        make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:        make(chan core.ChainSideEvent, chainSideChanSize),
		newWorkCh:          make(chan *newWorkReq),
		taskCh:             make(chan *task),
197
		resultCh:           make(chan *types.Block, resultQueueSize),
198 199 200 201
		exitCh:             make(chan struct{}),
		startCh:            make(chan struct{}, 1),
		resubmitIntervalCh: make(chan time.Duration),
		resubmitAdjustCh:   make(chan *intervalAdjust, resubmitAdjustChanSize),
O
obscuren 已提交
202
	}
203 204
	// Subscribe NewTxsEvent for tx pool
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
205 206 207
	// Subscribe events for blockchain
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
O
obscuren 已提交
208

209
	// Sanitize recommit interval if the user-specified one is too short.
210
	recommit := worker.config.Recommit
211 212 213 214 215
	if recommit < minRecommitInterval {
		log.Warn("Sanitizing miner recommit interval", "provided", recommit, "updated", minRecommitInterval)
		recommit = minRecommitInterval
	}

216
	go worker.mainLoop()
217
	go worker.newWorkLoop(recommit)
218 219
	go worker.resultLoop()
	go worker.taskLoop()
O
obscuren 已提交
220

221
	// Submit first work to initialize pending state.
222 223
	worker.startCh <- struct{}{}

O
obscuren 已提交
224
	return worker
O
obscuren 已提交
225 226
}

227 228 229 230 231
// setEtherbase sets the etherbase used to initialize the block coinbase field.
func (w *worker) setEtherbase(addr common.Address) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.coinbase = addr
J
Jeffrey Wilcke 已提交
232 233
}

234 235 236 237 238
// setExtra sets the content used to initialize the block extra field.
func (w *worker) setExtra(extra []byte) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.extra = extra
239 240
}

241 242 243 244 245
// setRecommitInterval updates the interval for miner sealing work recommitting.
func (w *worker) setRecommitInterval(interval time.Duration) {
	w.resubmitIntervalCh <- interval
}

246 247
// pending returns the pending state and corresponding block.
func (w *worker) pending() (*types.Block, *state.StateDB) {
248
	// return a snapshot to avoid contention on currentMu mutex
249 250 251 252 253 254
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	if w.snapshotState == nil {
		return nil, nil
	}
	return w.snapshotBlock, w.snapshotState.Copy()
255
}
256

257 258
// pendingBlock returns pending block.
func (w *worker) pendingBlock() *types.Block {
259
	// return a snapshot to avoid contention on currentMu mutex
260 261 262
	w.snapshotMu.RLock()
	defer w.snapshotMu.RUnlock()
	return w.snapshotBlock
263 264
}

265 266 267
// start sets the running status as 1 and triggers new work submitting.
func (w *worker) start() {
	atomic.StoreInt32(&w.running, 1)
268
	w.startCh <- struct{}{}
O
obscuren 已提交
269 270
}

271 272 273
// stop sets the running status as 0.
func (w *worker) stop() {
	atomic.StoreInt32(&w.running, 0)
O
obscuren 已提交
274 275
}

276 277 278
// isRunning returns an indicator whether worker is running or not.
func (w *worker) isRunning() bool {
	return atomic.LoadInt32(&w.running) == 1
279 280
}

281
// close terminates all background threads maintained by the worker.
282 283 284
// Note the worker does not support being closed multiple times.
func (w *worker) close() {
	close(w.exitCh)
O
obscuren 已提交
285 286
}

287
// newWorkLoop is a standalone goroutine to submit new mining work upon received events.
288 289 290 291
func (w *worker) newWorkLoop(recommit time.Duration) {
	var (
		interrupt   *int32
		minRecommit = recommit // minimal resubmit interval specified by user.
292
		timestamp   int64      // timestamp for each round of mining.
293
	)
294 295 296 297

	timer := time.NewTimer(0)
	<-timer.C // discard the initial tick

298 299
	// commit aborts in-flight transaction execution with given signal and resubmits a new one.
	commit := func(noempty bool, s int32) {
300 301 302 303
		if interrupt != nil {
			atomic.StoreInt32(interrupt, s)
		}
		interrupt = new(int32)
304
		w.newWorkCh <- &newWorkReq{interrupt: interrupt, noempty: noempty, timestamp: timestamp}
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327
		timer.Reset(recommit)
		atomic.StoreInt32(&w.newTxs, 0)
	}
	// recalcRecommit recalculates the resubmitting interval upon feedback.
	recalcRecommit := func(target float64, inc bool) {
		var (
			prev = float64(recommit.Nanoseconds())
			next float64
		)
		if inc {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target+intervalAdjustBias)
			// Recap if interval is larger than the maximum time interval
			if next > float64(maxRecommitInterval.Nanoseconds()) {
				next = float64(maxRecommitInterval.Nanoseconds())
			}
		} else {
			next = prev*(1-intervalAdjustRatio) + intervalAdjustRatio*(target-intervalAdjustBias)
			// Recap if interval is less than the user specified minimum
			if next < float64(minRecommit.Nanoseconds()) {
				next = float64(minRecommit.Nanoseconds())
			}
		}
		recommit = time.Duration(int64(next))
328
	}
329 330 331 332 333 334 335 336 337 338
	// clearPending cleans the stale pending tasks.
	clearPending := func(number uint64) {
		w.pendingMu.Lock()
		for h, t := range w.pendingTasks {
			if t.block.NumberU64()+staleThreshold <= number {
				delete(w.pendingTasks, h)
			}
		}
		w.pendingMu.Unlock()
	}
339 340 341 342

	for {
		select {
		case <-w.startCh:
343
			clearPending(w.chain.CurrentBlock().NumberU64())
344
			timestamp = time.Now().Unix()
345
			commit(false, commitInterruptNewHead)
346

347 348
		case head := <-w.chainHeadCh:
			clearPending(head.Block.NumberU64())
349
			timestamp = time.Now().Unix()
350
			commit(false, commitInterruptNewHead)
351 352 353 354

		case <-timer.C:
			// If mining is running resubmit a new work cycle periodically to pull in
			// higher priced transactions. Disable this overhead for pending blocks.
355
			if w.isRunning() && (w.chainConfig.Clique == nil || w.chainConfig.Clique.Period > 0) {
356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
				// Short circuit if no new transaction arrives.
				if atomic.LoadInt32(&w.newTxs) == 0 {
					timer.Reset(recommit)
					continue
				}
				commit(true, commitInterruptResubmit)
			}

		case interval := <-w.resubmitIntervalCh:
			// Adjust resubmit interval explicitly by user.
			if interval < minRecommitInterval {
				log.Warn("Sanitizing miner recommit interval", "provided", interval, "updated", minRecommitInterval)
				interval = minRecommitInterval
			}
			log.Info("Miner recommit interval update", "from", minRecommit, "to", interval)
			minRecommit, recommit = interval, interval

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
			}

		case adjust := <-w.resubmitAdjustCh:
			// Adjust resubmit interval by feedback.
			if adjust.inc {
				before := recommit
				recalcRecommit(float64(recommit.Nanoseconds())/adjust.ratio, true)
				log.Trace("Increase miner recommit interval", "from", before, "to", recommit)
			} else {
				before := recommit
				recalcRecommit(float64(minRecommit.Nanoseconds()), false)
				log.Trace("Decrease miner recommit interval", "from", before, "to", recommit)
			}

			if w.resubmitHook != nil {
				w.resubmitHook(minRecommit, recommit)
391 392 393 394 395 396 397 398
			}

		case <-w.exitCh:
			return
		}
	}
}

399 400 401 402 403
// mainLoop is a standalone goroutine to regenerate the sealing task based on the received event.
func (w *worker) mainLoop() {
	defer w.txsSub.Unsubscribe()
	defer w.chainHeadSub.Unsubscribe()
	defer w.chainSideSub.Unsubscribe()
404 405 406

	for {
		select {
407
		case req := <-w.newWorkCh:
408
			w.commitNewWork(req.interrupt, req.noempty, req.timestamp)
409 410

		case ev := <-w.chainSideCh:
411 412 413 414 415
			// Short circuit for duplicate side blocks
			if _, exist := w.localUncles[ev.Block.Hash()]; exist {
				continue
			}
			if _, exist := w.remoteUncles[ev.Block.Hash()]; exist {
416 417
				continue
			}
418 419 420 421 422 423
			// Add side block to possible uncle block set depending on the author.
			if w.isLocalBlock != nil && w.isLocalBlock(ev.Block) {
				w.localUncles[ev.Block.Hash()] = ev.Block
			} else {
				w.remoteUncles[ev.Block.Hash()] = ev.Block
			}
424 425 426 427 428 429 430 431 432 433 434
			// If our mining block contains less than 2 uncle blocks,
			// add the new uncle block if valid and regenerate a mining block.
			if w.isRunning() && w.current != nil && w.current.uncles.Cardinality() < 2 {
				start := time.Now()
				if err := w.commitUncle(w.current, ev.Block.Header()); err == nil {
					var uncles []*types.Header
					w.current.uncles.Each(func(item interface{}) bool {
						hash, ok := item.(common.Hash)
						if !ok {
							return false
						}
435 436 437 438
						uncle, exist := w.localUncles[hash]
						if !exist {
							uncle, exist = w.remoteUncles[hash]
						}
439 440 441 442
						if !exist {
							return false
						}
						uncles = append(uncles, uncle.Header())
443
						return false
444 445 446 447
					})
					w.commit(uncles, nil, true, start)
				}
			}
448 449

		case ev := <-w.txsCh:
450 451 452 453 454
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
455
			if !w.isRunning() && w.current != nil {
456 457 458 459
				// If block is already full, abort
				if gp := w.current.gasPool; gp != nil && gp.Gas() < params.TxGas {
					continue
				}
460
				w.mu.RLock()
461
				coinbase := w.coinbase
462
				w.mu.RUnlock()
463

464 465
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
466
					acc, _ := types.Sender(w.current.signer, tx)
467 468
					txs[acc] = append(txs[acc], tx)
				}
469
				txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs)
470
				tcount := w.current.tcount
471
				w.commitTransactions(txset, coinbase, nil)
472 473 474 475 476
				// Only update the snapshot if any new transactons were added
				// to the pending block
				if tcount != w.current.tcount {
					w.updateSnapshot()
				}
477
			} else {
478 479 480 481
				// If clique is running in dev mode(period is 0), disable
				// advance sealing here.
				if w.chainConfig.Clique != nil && w.chainConfig.Clique.Period == 0 {
					w.commitNewWork(nil, true, time.Now().Unix())
482
				}
O
obscuren 已提交
483
			}
484
			atomic.AddInt32(&w.newTxs, int32(len(ev.Txs)))
485 486

		// System stopped
487
		case <-w.exitCh:
488
			return
489
		case <-w.txsSub.Err():
490
			return
491 492 493
		case <-w.chainHeadSub.Err():
			return
		case <-w.chainSideSub.Err():
494
			return
O
obscuren 已提交
495 496 497 498
		}
	}
}

499 500 501
// taskLoop is a standalone goroutine to fetch sealing task from the generator and
// push them to consensus engine.
func (w *worker) taskLoop() {
502 503 504 505
	var (
		stopCh chan struct{}
		prev   common.Hash
	)
506 507 508 509 510 511 512 513

	// interrupt aborts the in-flight sealing task.
	interrupt := func() {
		if stopCh != nil {
			close(stopCh)
			stopCh = nil
		}
	}
O
obscuren 已提交
514
	for {
515 516 517 518 519
		select {
		case task := <-w.taskCh:
			if w.newTaskHook != nil {
				w.newTaskHook(task)
			}
520
			// Reject duplicate sealing work due to resubmitting.
521 522
			sealHash := w.engine.SealHash(task.block.Header())
			if sealHash == prev {
523 524
				continue
			}
525
			// Interrupt previous sealing operation
526
			interrupt()
527 528 529 530 531 532 533 534 535 536 537 538
			stopCh, prev = make(chan struct{}), sealHash

			if w.skipSealHook != nil && w.skipSealHook(task) {
				continue
			}
			w.pendingMu.Lock()
			w.pendingTasks[w.engine.SealHash(task.block.Header())] = task
			w.pendingMu.Unlock()

			if err := w.engine.Seal(w.chain, task.block, w.resultCh, stopCh); err != nil {
				log.Warn("Block sealing failed", "err", err)
			}
539 540 541 542 543 544
		case <-w.exitCh:
			interrupt()
			return
		}
	}
}
O
obscuren 已提交
545

546 547 548 549 550
// resultLoop is a standalone goroutine to handle sealing result submitting
// and flush relative data to the database.
func (w *worker) resultLoop() {
	for {
		select {
551
		case block := <-w.resultCh:
552
			// Short circuit when receiving empty result.
553
			if block == nil {
O
obscuren 已提交
554 555
				continue
			}
556 557 558 559
			// Short circuit when receiving duplicate result caused by resubmitting.
			if w.chain.HasBlock(block.Hash(), block.NumberU64()) {
				continue
			}
560 561 562 563 564 565 566 567 568 569
			var (
				sealhash = w.engine.SealHash(block.Header())
				hash     = block.Hash()
			)
			w.pendingMu.RLock()
			task, exist := w.pendingTasks[sealhash]
			w.pendingMu.RUnlock()
			if !exist {
				log.Error("Block found but no relative pending task", "number", block.Number(), "sealhash", sealhash, "hash", hash)
				continue
570
			}
571 572 573 574 575 576
			// Different block could share same sealhash, deep copy here to prevent write-write conflict.
			var (
				receipts = make([]*types.Receipt, len(task.receipts))
				logs     []*types.Log
			)
			for i, receipt := range task.receipts {
577 578 579 580 581
				// add block location fields
				receipt.BlockHash = hash
				receipt.BlockNumber = block.Number()
				receipt.TransactionIndex = uint(i)

582 583 584 585 586 587 588 589
				receipts[i] = new(types.Receipt)
				*receipts[i] = *receipt
				// Update the block hash in all logs since it is now available and not when the
				// receipt/log of individual transactions were created.
				for _, log := range receipt.Logs {
					log.BlockHash = hash
				}
				logs = append(logs, receipt.Logs...)
590
			}
591
			// Commit block and state to database.
592
			stat, err := w.chain.WriteBlockWithState(block, receipts, task.state)
593 594 595 596
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
597 598 599
			log.Info("Successfully sealed new block", "number", block.Number(), "sealhash", sealhash, "hash", hash,
				"elapsed", common.PrettyDuration(time.Since(task.createdAt)))

600
			// Broadcast the block and announce chain insertion event
601
			w.mux.Post(core.NewMinedBlockEvent{Block: block})
602 603

			var events []interface{}
604 605 606
			switch stat {
			case core.CanonStatTy:
				events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
607
				events = append(events, core.ChainHeadEvent{Block: block})
608 609
			case core.SideStatTy:
				events = append(events, core.ChainSideEvent{Block: block})
610
			}
611
			w.chain.PostChainEvents(events, logs)
612

613 614
			// Insert the block into the set of pending ones to resultLoop for confirmations
			w.unconfirmed.Insert(block.NumberU64(), block.Hash())
O
obscuren 已提交
615

616 617 618
		case <-w.exitCh:
			return
		}
O
obscuren 已提交
619 620 621
	}
}

F
Felix Lange 已提交
622
// makeCurrent creates a new environment for the current cycle.
623 624
func (w *worker) makeCurrent(parent *types.Block, header *types.Header) error {
	state, err := w.chain.StateAt(parent.Root())
625 626 627
	if err != nil {
		return err
	}
628
	env := &environment{
629
		signer:    types.NewEIP155Signer(w.chainConfig.ChainID),
F
Felix Lange 已提交
630
		state:     state,
631 632 633
		ancestors: mapset.NewSet(),
		family:    mapset.NewSet(),
		uncles:    mapset.NewSet(),
F
Felix Lange 已提交
634
		header:    header,
635
	}
636

Z
zelig 已提交
637
	// when 08 is processed ancestors contain 07 (quick block)
638
	for _, ancestor := range w.chain.GetBlocksFromHash(parent.Hash(), 7) {
639
		for _, uncle := range ancestor.Uncles() {
640
			env.family.Add(uncle.Hash())
641
		}
642 643
		env.family.Add(ancestor.Hash())
		env.ancestors.Add(ancestor.Hash())
O
obscuren 已提交
644
	}
E
Egon Elbre 已提交
645

646
	// Keep track of transactions which return errors so they can be removed
647
	env.tcount = 0
648
	w.current = env
649
	return nil
O
obscuren 已提交
650 651
}

652
// commitUncle adds the given block to uncle block set, returns error if failed to add.
653
func (w *worker) commitUncle(env *environment, uncle *types.Header) error {
654 655
	hash := uncle.Hash()
	if env.uncles.Contains(hash) {
656 657 658 659
		return errors.New("uncle not unique")
	}
	if env.header.ParentHash == uncle.ParentHash {
		return errors.New("uncle is sibling")
660 661
	}
	if !env.ancestors.Contains(uncle.ParentHash) {
662
		return errors.New("uncle's parent unknown")
663 664
	}
	if env.family.Contains(hash) {
665
		return errors.New("uncle already included")
666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682
	}
	env.uncles.Add(uncle.Hash())
	return nil
}

// updateSnapshot updates pending snapshot block and state.
// Note this function assumes the current variable is thread safe.
func (w *worker) updateSnapshot() {
	w.snapshotMu.Lock()
	defer w.snapshotMu.Unlock()

	var uncles []*types.Header
	w.current.uncles.Each(func(item interface{}) bool {
		hash, ok := item.(common.Hash)
		if !ok {
			return false
		}
683 684 685 686
		uncle, exist := w.localUncles[hash]
		if !exist {
			uncle, exist = w.remoteUncles[hash]
		}
687 688 689 690
		if !exist {
			return false
		}
		uncles = append(uncles, uncle.Header())
691
		return false
692 693 694 695 696 697 698 699 700 701 702 703
	})

	w.snapshotBlock = types.NewBlock(
		w.current.header,
		w.current.txs,
		uncles,
		w.current.receipts,
	)

	w.snapshotState = w.current.state.Copy()
}

704 705 706
func (w *worker) commitTransaction(tx *types.Transaction, coinbase common.Address) ([]*types.Log, error) {
	snap := w.current.state.Snapshot()

707
	receipt, err := core.ApplyTransaction(w.chainConfig, w.chain, &coinbase, w.current.gasPool, w.current.state, w.current.header, tx, &w.current.header.GasUsed, *w.chain.GetVMConfig())
708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737
	if err != nil {
		w.current.state.RevertToSnapshot(snap)
		return nil, err
	}
	w.current.txs = append(w.current.txs, tx)
	w.current.receipts = append(w.current.receipts, receipt)

	return receipt.Logs, nil
}

func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coinbase common.Address, interrupt *int32) bool {
	// Short circuit if current is nil
	if w.current == nil {
		return true
	}

	if w.current.gasPool == nil {
		w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
	}

	var coalescedLogs []*types.Log

	for {
		// In the following three cases, we will interrupt the execution of the transaction.
		// (1) new head block event arrival, the interrupt signal is 1
		// (2) worker start or restart, the interrupt signal is 1
		// (3) worker recreate the mining block with any newly arrived transactions, the interrupt signal is 2.
		// For the first two cases, the semi-finished work will be discarded.
		// For the third case, the semi-finished work will be submitted to the consensus engine.
		if interrupt != nil && atomic.LoadInt32(interrupt) != commitInterruptNone {
738 739 740 741 742 743 744 745 746 747 748
			// Notify resubmit loop to increase resubmitting interval due to too frequent commits.
			if atomic.LoadInt32(interrupt) == commitInterruptResubmit {
				ratio := float64(w.current.header.GasLimit-w.current.gasPool.Gas()) / float64(w.current.header.GasLimit)
				if ratio < 0.1 {
					ratio = 0.1
				}
				w.resubmitAdjustCh <- &intervalAdjust{
					ratio: ratio,
					inc:   true,
				}
			}
749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767
			return atomic.LoadInt32(interrupt) == commitInterruptNewHead
		}
		// If we don't have enough gas for any further transactions then we're done
		if w.current.gasPool.Gas() < params.TxGas {
			log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas)
			break
		}
		// Retrieve the next transaction and abort if all done
		tx := txs.Peek()
		if tx == nil {
			break
		}
		// Error may be ignored here. The error has already been checked
		// during transaction acceptance is the transaction pool.
		//
		// We use the eip155 signer regardless of the current hf.
		from, _ := types.Sender(w.current.signer, tx)
		// Check whether the tx is replay protected. If we're not in the EIP155 hf
		// phase, start ignoring the sender until we do.
768 769
		if tx.Protected() && !w.chainConfig.IsEIP155(w.current.header.Number) {
			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", w.chainConfig.EIP155Block)
770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822

			txs.Pop()
			continue
		}
		// Start executing the transaction
		w.current.state.Prepare(tx.Hash(), common.Hash{}, w.current.tcount)

		logs, err := w.commitTransaction(tx, coinbase)
		switch err {
		case core.ErrGasLimitReached:
			// Pop the current out-of-gas transaction without shifting in the next from the account
			log.Trace("Gas limit exceeded for current block", "sender", from)
			txs.Pop()

		case core.ErrNonceTooLow:
			// New head notification data race between the transaction pool and miner, shift
			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
			txs.Shift()

		case core.ErrNonceTooHigh:
			// Reorg notification data race between the transaction pool and miner, skip account =
			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
			txs.Pop()

		case nil:
			// Everything ok, collect the logs and shift in the next transaction from the same account
			coalescedLogs = append(coalescedLogs, logs...)
			w.current.tcount++
			txs.Shift()

		default:
			// Strange error, discard the transaction and get the next in line (note, the
			// nonce-too-high clause will prevent us from executing in vain).
			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
			txs.Shift()
		}
	}

	if !w.isRunning() && len(coalescedLogs) > 0 {
		// We don't push the pendingLogsEvent while we are mining. The reason is that
		// when we are mining, the worker will regenerate a mining block every 3 seconds.
		// In order to avoid pushing the repeated pendingLog, we disable the pending log pushing.

		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
		// logs by filling in the block hash when the block was mined by the local miner. This can
		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
		cpy := make([]*types.Log, len(coalescedLogs))
		for i, l := range coalescedLogs {
			cpy[i] = new(types.Log)
			*cpy[i] = *l
		}
		go w.mux.Post(core.PendingLogsEvent{Logs: cpy})
	}
823 824 825 826 827
	// Notify resubmit loop to decrease resubmitting interval if current interval is larger
	// than the user-specified one.
	if interrupt != nil {
		w.resubmitAdjustCh <- &intervalAdjust{inc: false}
	}
828 829 830
	return false
}

831
// commitNewWork generates several new sealing tasks based on the parent block.
832
func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) {
833 834
	w.mu.RLock()
	defer w.mu.RUnlock()
O
obscuren 已提交
835

836
	tstart := time.Now()
837
	parent := w.chain.CurrentBlock()
838

839 840
	if parent.Time() >= uint64(timestamp) {
		timestamp = int64(parent.Time() + 1)
F
Felix Lange 已提交
841
	}
842
	// this will ensure we're not going off too far in the future
843 844
	if now := time.Now().Unix(); timestamp > now+1 {
		wait := time.Duration(timestamp-now) * time.Second
845
		log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
846 847 848
		time.Sleep(wait)
	}

F
Felix Lange 已提交
849 850 851 852
	num := parent.Number()
	header := &types.Header{
		ParentHash: parent.Hash(),
		Number:     num.Add(num, common.Big1),
853
		GasLimit:   core.CalcGasLimit(parent, w.config.GasFloor, w.config.GasCeil),
854
		Extra:      w.extra,
855
		Time:       uint64(timestamp),
F
Felix Lange 已提交
856
	}
857
	// Only set the coinbase if our consensus engine is running (avoid spurious block rewards)
858 859
	if w.isRunning() {
		if w.coinbase == (common.Address{}) {
860 861 862
			log.Error("Refusing to mine without etherbase")
			return
		}
863
		header.Coinbase = w.coinbase
864
	}
865
	if err := w.engine.Prepare(w.chain, header); err != nil {
866 867 868
		log.Error("Failed to prepare header for mining", "err", err)
		return
	}
869
	// If we are care about TheDAO hard-fork check whether to override the extra-data or not
870
	if daoBlock := w.chainConfig.DAOForkBlock; daoBlock != nil {
871 872
		// Check whether the block is among the fork extra-override range
		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
873
		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
874
			// Depending whether we support or oppose the fork, override differently
875
			if w.chainConfig.DAOForkSupport {
876
				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
877
			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
878 879
				header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
			}
880 881
		}
	}
882
	// Could potentially happen if starting to mine in an odd state.
883
	err := w.makeCurrent(parent, header)
884
	if err != nil {
885
		log.Error("Failed to create mining context", "err", err)
886 887
		return
	}
888
	// Create the current work task and check any fork transitions needed
889
	env := w.current
890
	if w.chainConfig.DAOForkSupport && w.chainConfig.DAOForkBlock != nil && w.chainConfig.DAOForkBlock.Cmp(header.Number) == 0 {
891
		misc.ApplyDAOHardFork(env.state)
892
	}
893 894
	// Accumulate the uncles for the current block
	uncles := make([]*types.Header, 0, 2)
895 896 897 898 899 900
	commitUncles := func(blocks map[common.Hash]*types.Block) {
		// Clean up stale uncle blocks first
		for hash, uncle := range blocks {
			if uncle.NumberU64()+staleThreshold <= header.Number.Uint64() {
				delete(blocks, hash)
			}
O
obscuren 已提交
901
		}
902 903 904 905 906 907 908 909 910 911
		for hash, uncle := range blocks {
			if len(uncles) == 2 {
				break
			}
			if err := w.commitUncle(env, uncle.Header()); err != nil {
				log.Trace("Possible uncle rejected", "hash", hash, "reason", err)
			} else {
				log.Debug("Committing new uncle to block", "hash", hash)
				uncles = append(uncles, uncle.Header())
			}
O
obscuren 已提交
912 913
		}
	}
914 915 916
	// Prefer to locally generated uncle
	commitUncles(w.localUncles)
	commitUncles(w.remoteUncles)
917

918 919 920 921 922
	if !noempty {
		// Create an empty block based on temporary copied state for sealing in advance without waiting block
		// execution finished.
		w.commit(uncles, nil, false, tstart)
	}
923 924

	// Fill the block with all available pending transactions.
925
	pending, err := w.eth.TxPool().Pending()
926 927 928 929
	if err != nil {
		log.Error("Failed to fetch pending transactions", "err", err)
		return
	}
930 931 932 933 934
	// Short circuit if there is no available pending transactions
	if len(pending) == 0 {
		w.updateSnapshot()
		return
	}
935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953
	// Split the pending transactions into locals and remotes
	localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending
	for _, account := range w.eth.TxPool().Locals() {
		if txs := remoteTxs[account]; len(txs) > 0 {
			delete(remoteTxs, account)
			localTxs[account] = txs
		}
	}
	if len(localTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
	}
	if len(remoteTxs) > 0 {
		txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs)
		if w.commitTransactions(txs, w.coinbase, interrupt) {
			return
		}
954
	}
955 956 957 958 959 960
	w.commit(uncles, w.fullTaskHook, true, tstart)
}

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
func (w *worker) commit(uncles []*types.Header, interval func(), update bool, start time.Time) error {
961
	// Deep copy receipts here to avoid interaction between different tasks.
962 963 964 965 966 967
	receipts := make([]*types.Receipt, len(w.current.receipts))
	for i, l := range w.current.receipts {
		receipts[i] = new(types.Receipt)
		*receipts[i] = *l
	}
	s := w.current.state.Copy()
968
	block, err := w.engine.FinalizeAndAssemble(w.chain, w.current.header, s, w.current.txs, uncles, w.current.receipts)
969 970
	if err != nil {
		return err
971
	}
972
	if w.isRunning() {
973 974
		if interval != nil {
			interval()
975
		}
976
		select {
977 978
		case w.taskCh <- &task{receipts: receipts, state: s, block: block, createdAt: time.Now()}:
			w.unconfirmed.Shift(block.NumberU64() - 1)
979 980

			feesWei := new(big.Int)
981 982
			for i, tx := range block.Transactions() {
				feesWei.Add(feesWei, new(big.Int).Mul(new(big.Int).SetUint64(receipts[i].GasUsed), tx.GasPrice()))
983 984 985
			}
			feesEth := new(big.Float).Quo(new(big.Float).SetInt(feesWei), new(big.Float).SetInt(big.NewInt(params.Ether)))

986 987
			log.Info("Commit new mining work", "number", block.Number(), "sealhash", w.engine.SealHash(block.Header()),
				"uncles", len(uncles), "txs", w.current.tcount, "gas", block.GasUsed(), "fees", feesEth, "elapsed", common.PrettyDuration(time.Since(start)))
988

989 990
		case <-w.exitCh:
			log.Info("Worker has exited")
991
		}
O
obscuren 已提交
992
	}
993 994 995 996
	if update {
		w.updateSnapshot()
	}
	return nil
O
obscuren 已提交
997
}