worker.go 18.3 KB
Newer Older
F
Felix Lange 已提交
1
// Copyright 2015 The go-ethereum Authors
2
// This file is part of the go-ethereum library.
F
Felix Lange 已提交
3
//
4
// The go-ethereum library is free software: you can redistribute it and/or modify
F
Felix Lange 已提交
5 6 7 8
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
9
// The go-ethereum library is distributed in the hope that it will be useful,
F
Felix Lange 已提交
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
F
Felix Lange 已提交
12 13 14
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
15
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
F
Felix Lange 已提交
16

O
obscuren 已提交
17 18 19
package miner

import (
20
	"bytes"
O
obscuren 已提交
21 22
	"fmt"
	"math/big"
O
obscuren 已提交
23
	"sync"
O
obscuren 已提交
24
	"sync/atomic"
25
	"time"
O
obscuren 已提交
26

27
	mapset "github.com/deckarep/golang-set"
O
obscuren 已提交
28
	"github.com/ethereum/go-ethereum/common"
29 30
	"github.com/ethereum/go-ethereum/consensus"
	"github.com/ethereum/go-ethereum/consensus/misc"
O
obscuren 已提交
31
	"github.com/ethereum/go-ethereum/core"
O
obscuren 已提交
32
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
33
	"github.com/ethereum/go-ethereum/core/types"
34
	"github.com/ethereum/go-ethereum/core/vm"
35
	"github.com/ethereum/go-ethereum/ethdb"
O
obscuren 已提交
36
	"github.com/ethereum/go-ethereum/event"
37
	"github.com/ethereum/go-ethereum/log"
38
	"github.com/ethereum/go-ethereum/params"
O
obscuren 已提交
39 40
)

J
Jeffrey Wilcke 已提交
41 42 43
const (
	resultQueueSize  = 10
	miningLogAtDepth = 5
44

45
	// txChanSize is the size of channel listening to NewTxsEvent.
46 47 48 49 50 51
	// The number is referenced from the size of tx pool.
	txChanSize = 4096
	// chainHeadChanSize is the size of channel listening to ChainHeadEvent.
	chainHeadChanSize = 10
	// chainSideChanSize is the size of channel listening to ChainSideEvent.
	chainSideChanSize = 10
J
Jeffrey Wilcke 已提交
52
)
53 54 55

// Agent can register themself with the worker
type Agent interface {
J
Jeffrey Wilcke 已提交
56 57
	Work() chan<- *Work
	SetReturnCh(chan<- *Result)
58 59 60 61 62
	Stop()
	Start()
	GetHashRate() int64
}

F
Felix Lange 已提交
63
// Work is the workers current environment and holds
64
// all of the current state information
J
Jeffrey Wilcke 已提交
65
type Work struct {
J
Jeffrey Wilcke 已提交
66 67 68
	config *params.ChainConfig
	signer types.Signer

69
	state     *state.StateDB // apply state changes here
70 71 72
	ancestors mapset.Set     // ancestor set (used for checking uncle parent validity)
	family    mapset.Set     // family set (used for checking uncle invalidity)
	uncles    mapset.Set     // uncle set
73
	tcount    int            // tx count in cycle
74
	gasPool   *core.GasPool  // available gas used to pack transactions
O
obscuren 已提交
75

J
Jeffrey Wilcke 已提交
76
	Block *types.Block // the new block
O
obscuren 已提交
77

F
Felix Lange 已提交
78 79 80
	header   *types.Header
	txs      []*types.Transaction
	receipts []*types.Receipt
J
Jeffrey Wilcke 已提交
81 82 83 84 85 86 87

	createdAt time.Time
}

type Result struct {
	Work  *Work
	Block *types.Block
O
obscuren 已提交
88 89
}

90
// worker is the main object which takes care of applying messages to the new state
O
obscuren 已提交
91
type worker struct {
92
	config *params.ChainConfig
93
	engine consensus.Engine
94

O
obscuren 已提交
95 96
	mu sync.Mutex

97
	// update loop
98
	mux          *event.TypeMux
99
	txsCh        chan core.NewTxsEvent
100
	txsSub       event.Subscription
101 102 103 104 105
	chainHeadCh  chan core.ChainHeadEvent
	chainHeadSub event.Subscription
	chainSideCh  chan core.ChainSideEvent
	chainSideSub event.Subscription
	wg           sync.WaitGroup
106

107
	agents map[Agent]struct{}
J
Jeffrey Wilcke 已提交
108
	recv   chan *Result
O
obscuren 已提交
109

F
Felix Lange 已提交
110
	eth     Backend
111
	chain   *core.BlockChain
112
	proc    core.Validator
113
	chainDb ethdb.Database
114

O
obscuren 已提交
115
	coinbase common.Address
116
	extra    []byte
O
obscuren 已提交
117

O
obscuren 已提交
118
	currentMu sync.Mutex
J
Jeffrey Wilcke 已提交
119
	current   *Work
120

121 122 123 124
	snapshotMu    sync.RWMutex
	snapshotBlock *types.Block
	snapshotState *state.StateDB

O
obscuren 已提交
125 126 127
	uncleMu        sync.Mutex
	possibleUncles map[common.Hash]*types.Block

128
	unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations
129

F
Felix Lange 已提交
130 131 132
	// atomic status counters
	mining int32
	atWork int32
O
obscuren 已提交
133 134
}

135
func newWorker(config *params.ChainConfig, engine consensus.Engine, coinbase common.Address, eth Backend, mux *event.TypeMux) *worker {
O
obscuren 已提交
136
	worker := &worker{
137
		config:         config,
138
		engine:         engine,
O
obscuren 已提交
139
		eth:            eth,
F
Felix Lange 已提交
140
		mux:            mux,
141
		txsCh:          make(chan core.NewTxsEvent, txChanSize),
142 143
		chainHeadCh:    make(chan core.ChainHeadEvent, chainHeadChanSize),
		chainSideCh:    make(chan core.ChainSideEvent, chainSideChanSize),
144
		chainDb:        eth.ChainDb(),
J
Jeffrey Wilcke 已提交
145
		recv:           make(chan *Result, resultQueueSize),
146
		chain:          eth.BlockChain(),
147
		proc:           eth.BlockChain().Validator(),
O
obscuren 已提交
148 149
		possibleUncles: make(map[common.Hash]*types.Block),
		coinbase:       coinbase,
150
		agents:         make(map[Agent]struct{}),
E
Egon Elbre 已提交
151
		unconfirmed:    newUnconfirmedBlocks(eth.BlockChain(), miningLogAtDepth),
O
obscuren 已提交
152
	}
153 154
	// Subscribe NewTxsEvent for tx pool
	worker.txsSub = eth.TxPool().SubscribeNewTxsEvent(worker.txsCh)
155 156 157
	// Subscribe events for blockchain
	worker.chainHeadSub = eth.BlockChain().SubscribeChainHeadEvent(worker.chainHeadCh)
	worker.chainSideSub = eth.BlockChain().SubscribeChainSideEvent(worker.chainSideCh)
O
obscuren 已提交
158 159
	go worker.update()

160
	go worker.wait()
O
obscuren 已提交
161 162 163
	worker.commitNewWork()

	return worker
O
obscuren 已提交
164 165
}

J
Jeffrey Wilcke 已提交
166 167 168 169 170 171
func (self *worker) setEtherbase(addr common.Address) {
	self.mu.Lock()
	defer self.mu.Unlock()
	self.coinbase = addr
}

172 173 174 175 176 177
func (self *worker) setExtra(extra []byte) {
	self.mu.Lock()
	defer self.mu.Unlock()
	self.extra = extra
}

178
func (self *worker) pending() (*types.Block, *state.StateDB) {
F
Felix Lange 已提交
179
	if atomic.LoadInt32(&self.mining) == 0 {
180 181 182 183
		// return a snapshot to avoid contention on currentMu mutex
		self.snapshotMu.RLock()
		defer self.snapshotMu.RUnlock()
		return self.snapshotBlock, self.snapshotState.Copy()
F
Felix Lange 已提交
184
	}
O
obscuren 已提交
185

186 187
	self.currentMu.Lock()
	defer self.currentMu.Unlock()
188 189
	return self.current.Block, self.current.state.Copy()
}
190

191
func (self *worker) pendingBlock() *types.Block {
192
	if atomic.LoadInt32(&self.mining) == 0 {
193 194 195 196
		// return a snapshot to avoid contention on currentMu mutex
		self.snapshotMu.RLock()
		defer self.snapshotMu.RUnlock()
		return self.snapshotBlock
197
	}
198 199 200

	self.currentMu.Lock()
	defer self.currentMu.Unlock()
201 202 203
	return self.current.Block
}

O
obscuren 已提交
204
func (self *worker) start() {
205 206 207
	self.mu.Lock()
	defer self.mu.Unlock()

208 209
	atomic.StoreInt32(&self.mining, 1)

O
obscuren 已提交
210
	// spin up agents
211
	for agent := range self.agents {
O
obscuren 已提交
212 213
		agent.Start()
	}
O
obscuren 已提交
214 215 216
}

func (self *worker) stop() {
217 218
	self.wg.Wait()

219 220
	self.mu.Lock()
	defer self.mu.Unlock()
F
Felix Lange 已提交
221
	if atomic.LoadInt32(&self.mining) == 1 {
222
		for agent := range self.agents {
O
obscuren 已提交
223 224 225
			agent.Stop()
		}
	}
F
Felix Lange 已提交
226 227
	atomic.StoreInt32(&self.mining, 0)
	atomic.StoreInt32(&self.atWork, 0)
O
obscuren 已提交
228 229 230
}

func (self *worker) register(agent Agent) {
231 232
	self.mu.Lock()
	defer self.mu.Unlock()
233
	self.agents[agent] = struct{}{}
O
obscuren 已提交
234
	agent.SetReturnCh(self.recv)
O
obscuren 已提交
235 236
}

237 238 239 240 241 242 243
func (self *worker) unregister(agent Agent) {
	self.mu.Lock()
	defer self.mu.Unlock()
	delete(self.agents, agent)
	agent.Stop()
}

O
obscuren 已提交
244
func (self *worker) update() {
245
	defer self.txsSub.Unsubscribe()
246 247 248 249
	defer self.chainHeadSub.Unsubscribe()
	defer self.chainSideSub.Unsubscribe()

	for {
250
		// A real event arrived, process interesting content
251 252 253
		select {
		// Handle ChainHeadEvent
		case <-self.chainHeadCh:
254
			self.commitNewWork()
255 256 257

		// Handle ChainSideEvent
		case ev := <-self.chainSideCh:
258 259 260
			self.uncleMu.Lock()
			self.possibleUncles[ev.Block.Hash()] = ev.Block
			self.uncleMu.Unlock()
261

262
		// Handle NewTxsEvent
263 264 265 266 267 268
		case ev := <-self.txsCh:
			// Apply transactions to the pending state if we're not mining.
			//
			// Note all transactions received may not be continuous with transactions
			// already included in the current mining block. These transactions will
			// be automatically eliminated.
269 270
			if atomic.LoadInt32(&self.mining) == 0 {
				self.currentMu.Lock()
271 272 273 274 275
				txs := make(map[common.Address]types.Transactions)
				for _, tx := range ev.Txs {
					acc, _ := types.Sender(self.current.signer, tx)
					txs[acc] = append(txs[acc], tx)
				}
276
				txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
277
				self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
278
				self.updateSnapshot()
279
				self.currentMu.Unlock()
280 281 282 283 284
			} else {
				// If we're mining, but nothing is being processed, wake on new transactions
				if self.config.Clique != nil && self.config.Clique.Period == 0 {
					self.commitNewWork()
				}
O
obscuren 已提交
285
			}
286 287

		// System stopped
288
		case <-self.txsSub.Err():
289 290 291 292 293
			return
		case <-self.chainHeadSub.Err():
			return
		case <-self.chainSideSub.Err():
			return
O
obscuren 已提交
294 295 296 297
		}
	}
}

O
obscuren 已提交
298 299
func (self *worker) wait() {
	for {
J
Jeffrey Wilcke 已提交
300
		for result := range self.recv {
F
Felix Lange 已提交
301
			atomic.AddInt32(&self.atWork, -1)
O
obscuren 已提交
302

J
Jeffrey Wilcke 已提交
303
			if result == nil {
O
obscuren 已提交
304 305
				continue
			}
J
Jeffrey Wilcke 已提交
306
			block := result.Block
307
			work := result.Work
O
obscuren 已提交
308

309 310 311 312 313
			// Update the block hash in all logs since it is now available and not when the
			// receipt/log of individual transactions were created.
			for _, r := range work.receipts {
				for _, l := range r.Logs {
					l.BlockHash = block.Hash()
J
Jeffrey Wilcke 已提交
314
				}
315
			}
316 317 318
			for _, log := range work.state.Logs() {
				log.BlockHash = block.Hash()
			}
319
			self.currentMu.Lock()
320
			stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state)
321
			self.currentMu.Unlock()
322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337
			if err != nil {
				log.Error("Failed writing block to chain", "err", err)
				continue
			}
			// Broadcast the block and announce chain insertion event
			self.mux.Post(core.NewMinedBlockEvent{Block: block})
			var (
				events []interface{}
				logs   = work.state.Logs()
			)
			events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs})
			if stat == core.CanonStatTy {
				events = append(events, core.ChainHeadEvent{Block: block})
			}
			self.chain.PostChainEvents(events, logs)

338
			// Insert the block into the set of pending ones to wait for confirmations
339
			self.unconfirmed.Insert(block.NumberU64(), block.Hash())
O
obscuren 已提交
340 341 342 343
		}
	}
}

344
// push sends a new work task to currently live miner agents.
345
func (self *worker) push(work *Work) {
346 347 348 349 350 351 352
	if atomic.LoadInt32(&self.mining) != 1 {
		return
	}
	for agent := range self.agents {
		atomic.AddInt32(&self.atWork, 1)
		if ch := agent.Work(); ch != nil {
			ch <- work
353
		}
O
obscuren 已提交
354 355 356
	}
}

F
Felix Lange 已提交
357
// makeCurrent creates a new environment for the current cycle.
358
func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error {
359
	state, err := self.chain.StateAt(parent.Root())
360 361 362
	if err != nil {
		return err
	}
363
	work := &Work{
364
		config:    self.config,
365
		signer:    types.NewEIP155Signer(self.config.ChainID),
F
Felix Lange 已提交
366
		state:     state,
367 368 369
		ancestors: mapset.NewSet(),
		family:    mapset.NewSet(),
		uncles:    mapset.NewSet(),
F
Felix Lange 已提交
370
		header:    header,
J
Jeffrey Wilcke 已提交
371
		createdAt: time.Now(),
372
	}
373

Z
zelig 已提交
374
	// when 08 is processed ancestors contain 07 (quick block)
F
Felix Lange 已提交
375
	for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) {
376
		for _, uncle := range ancestor.Uncles() {
377
			work.family.Add(uncle.Hash())
378
		}
379 380
		work.family.Add(ancestor.Hash())
		work.ancestors.Add(ancestor.Hash())
O
obscuren 已提交
381
	}
E
Egon Elbre 已提交
382

383
	// Keep track of transactions which return errors so they can be removed
384 385
	work.tcount = 0
	self.current = work
386
	return nil
O
obscuren 已提交
387 388 389 390 391 392 393 394 395 396
}

func (self *worker) commitNewWork() {
	self.mu.Lock()
	defer self.mu.Unlock()
	self.uncleMu.Lock()
	defer self.uncleMu.Unlock()
	self.currentMu.Lock()
	defer self.currentMu.Unlock()

397
	tstart := time.Now()
F
Felix Lange 已提交
398
	parent := self.chain.CurrentBlock()
399

F
Felix Lange 已提交
400
	tstamp := tstart.Unix()
C
Christoph Jentzsch 已提交
401
	if parent.Time().Cmp(new(big.Int).SetInt64(tstamp)) >= 0 {
402
		tstamp = parent.Time().Int64() + 1
F
Felix Lange 已提交
403
	}
404
	// this will ensure we're not going off too far in the future
405
	if now := time.Now().Unix(); tstamp > now+1 {
406
		wait := time.Duration(tstamp-now) * time.Second
407
		log.Info("Mining too far in the future", "wait", common.PrettyDuration(wait))
408 409 410
		time.Sleep(wait)
	}

F
Felix Lange 已提交
411 412 413 414 415 416
	num := parent.Number()
	header := &types.Header{
		ParentHash: parent.Hash(),
		Number:     num.Add(num, common.Big1),
		GasLimit:   core.CalcGasLimit(parent),
		Extra:      self.extra,
417
		Time:       big.NewInt(tstamp),
F
Felix Lange 已提交
418
	}
419 420 421 422 423 424 425 426
	// Only set the coinbase if we are mining (avoid spurious block rewards)
	if atomic.LoadInt32(&self.mining) == 1 {
		header.Coinbase = self.coinbase
	}
	if err := self.engine.Prepare(self.chain, header); err != nil {
		log.Error("Failed to prepare header for mining", "err", err)
		return
	}
427
	// If we are care about TheDAO hard-fork check whether to override the extra-data or not
428 429 430
	if daoBlock := self.config.DAOForkBlock; daoBlock != nil {
		// Check whether the block is among the fork extra-override range
		limit := new(big.Int).Add(daoBlock, params.DAOForkExtraRange)
431
		if header.Number.Cmp(daoBlock) >= 0 && header.Number.Cmp(limit) < 0 {
432 433 434
			// Depending whether we support or oppose the fork, override differently
			if self.config.DAOForkSupport {
				header.Extra = common.CopyBytes(params.DAOForkBlockExtra)
435
			} else if bytes.Equal(header.Extra, params.DAOForkBlockExtra) {
436 437
				header.Extra = []byte{} // If miner opposes, don't let it use the reserved extra-data
			}
438 439
		}
	}
440 441 442
	// Could potentially happen if starting to mine in an odd state.
	err := self.makeCurrent(parent, header)
	if err != nil {
443
		log.Error("Failed to create mining context", "err", err)
444 445
		return
	}
446
	// Create the current work task and check any fork transitions needed
447
	work := self.current
448
	if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 {
449
		misc.ApplyDAOHardFork(work.state)
450
	}
451 452
	pending, err := self.eth.TxPool().Pending()
	if err != nil {
453
		log.Error("Failed to fetch pending transactions", "err", err)
454 455
		return
	}
456
	txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending)
457
	work.commitTransactions(self.mux, txs, self.chain, self.coinbase)
458

F
Felix Lange 已提交
459
	// compute uncles for the new block.
O
obscuren 已提交
460 461 462 463
	var (
		uncles    []*types.Header
		badUncles []common.Hash
	)
O
obscuren 已提交
464
	for hash, uncle := range self.possibleUncles {
O
obscuren 已提交
465
		if len(uncles) == 2 {
O
obscuren 已提交
466 467
			break
		}
468
		if err := self.commitUncle(work, uncle.Header()); err != nil {
469
			log.Trace("Bad uncle found and will be removed", "hash", hash)
470 471
			log.Trace(fmt.Sprint(uncle))

O
obscuren 已提交
472
			badUncles = append(badUncles, hash)
O
obscuren 已提交
473
		} else {
474
			log.Debug("Committing new uncle to block", "hash", hash)
O
obscuren 已提交
475
			uncles = append(uncles, uncle.Header())
O
obscuren 已提交
476 477
		}
	}
O
obscuren 已提交
478 479 480
	for _, hash := range badUncles {
		delete(self.possibleUncles, hash)
	}
481 482 483 484
	// Create the new block to seal with the consensus engine
	if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil {
		log.Error("Failed to finalize block for sealing", "err", err)
		return
485
	}
F
Felix Lange 已提交
486 487
	// We only care about logging if we're actually mining.
	if atomic.LoadInt32(&self.mining) == 1 {
488
		log.Info("Commit new mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart)))
489
		self.unconfirmed.Shift(work.Block.NumberU64() - 1)
F
Felix Lange 已提交
490
	}
491
	self.push(work)
492
	self.updateSnapshot()
O
obscuren 已提交
493 494
}

495
func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
F
Felix Lange 已提交
496
	hash := uncle.Hash()
497
	if work.uncles.Contains(hash) {
498
		return fmt.Errorf("uncle not unique")
O
obscuren 已提交
499
	}
500
	if !work.ancestors.Contains(uncle.ParentHash) {
501
		return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4])
O
obscuren 已提交
502
	}
503
	if work.family.Contains(hash) {
504
		return fmt.Errorf("uncle already in family (%x)", hash)
O
obscuren 已提交
505
	}
506
	work.uncles.Add(uncle.Hash())
O
obscuren 已提交
507 508 509
	return nil
}

510 511 512 513 514 515 516 517 518 519 520 521 522
func (self *worker) updateSnapshot() {
	self.snapshotMu.Lock()
	defer self.snapshotMu.Unlock()

	self.snapshotBlock = types.NewBlock(
		self.current.header,
		self.current.txs,
		nil,
		self.current.receipts,
	)
	self.snapshotState = self.current.state.Copy()
}

523
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
524 525 526
	if env.gasPool == nil {
		env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit)
	}
527

F
Felix Lange 已提交
528
	var coalescedLogs []*types.Log
B
Bas van Kervel 已提交
529

530
	for {
531
		// If we don't have enough gas for any further transactions then we're done
532
		if env.gasPool.Gas() < params.TxGas {
533
			log.Trace("Not enough gas for further transactions", "have", env.gasPool, "want", params.TxGas)
534 535
			break
		}
536 537 538 539 540
		// Retrieve the next transaction and abort if all done
		tx := txs.Peek()
		if tx == nil {
			break
		}
541 542
		// Error may be ignored here. The error has already been checked
		// during transaction acceptance is the transaction pool.
J
Jeffrey Wilcke 已提交
543 544 545 546 547 548
		//
		// We use the eip155 signer regardless of the current hf.
		from, _ := types.Sender(env.signer, tx)
		// Check whether the tx is replay protected. If we're not in the EIP155 hf
		// phase, start ignoring the sender until we do.
		if tx.Protected() && !env.config.IsEIP155(env.header.Number) {
549
			log.Trace("Ignoring reply protected transaction", "hash", tx.Hash(), "eip155", env.config.EIP155Block)
J
Jeffrey Wilcke 已提交
550 551 552 553

			txs.Pop()
			continue
		}
554
		// Start executing the transaction
555
		env.state.Prepare(tx.Hash(), common.Hash{}, env.tcount)
556

557
		err, logs := env.commitTransaction(tx, bc, coinbase, env.gasPool)
558 559
		switch err {
		case core.ErrGasLimitReached:
560
			// Pop the current out-of-gas transaction without shifting in the next from the account
561
			log.Trace("Gas limit exceeded for current block", "sender", from)
562
			txs.Pop()
563

564 565 566 567 568 569 570 571 572 573
		case core.ErrNonceTooLow:
			// New head notification data race between the transaction pool and miner, shift
			log.Trace("Skipping transaction with low nonce", "sender", from, "nonce", tx.Nonce())
			txs.Shift()

		case core.ErrNonceTooHigh:
			// Reorg notification data race between the transaction pool and miner, skip account =
			log.Trace("Skipping account with hight nonce", "sender", from, "nonce", tx.Nonce())
			txs.Pop()

574
		case nil:
575
			// Everything ok, collect the logs and shift in the next transaction from the same account
576
			coalescedLogs = append(coalescedLogs, logs...)
577 578
			env.tcount++
			txs.Shift()
579 580

		default:
581 582 583 584
			// Strange error, discard the transaction and get the next in line (note, the
			// nonce-too-high clause will prevent us from executing in vain).
			log.Debug("Transaction failed, account skipped", "hash", tx.Hash(), "err", err)
			txs.Shift()
585 586
		}
	}
587

588
	if len(coalescedLogs) > 0 || env.tcount > 0 {
589 590 591
		// make a copy, the state caches the logs and these logs get "upgraded" from pending to mined
		// logs by filling in the block hash when the block was mined by the local miner. This can
		// cause a race condition if a log was "upgraded" before the PendingLogsEvent is processed.
F
Felix Lange 已提交
592
		cpy := make([]*types.Log, len(coalescedLogs))
593
		for i, l := range coalescedLogs {
F
Felix Lange 已提交
594
			cpy[i] = new(types.Log)
595 596
			*cpy[i] = *l
		}
F
Felix Lange 已提交
597
		go func(logs []*types.Log, tcount int) {
598 599 600 601 602 603
			if len(logs) > 0 {
				mux.Post(core.PendingLogsEvent{Logs: logs})
			}
			if tcount > 0 {
				mux.Post(core.PendingStateEvent{})
			}
604
		}(cpy, env.tcount)
605
	}
606 607
}

608
func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) {
609
	snap := env.state.Snapshot()
610

611
	receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})
612
	if err != nil {
613
		env.state.RevertToSnapshot(snap)
614
		return err, nil
O
obscuren 已提交
615
	}
F
Felix Lange 已提交
616 617
	env.txs = append(env.txs, tx)
	env.receipts = append(env.receipts, receipt)
618

619
	return nil, receipt.Logs
O
obscuren 已提交
620
}