worker.go 12.1 KB
Newer Older
O
obscuren 已提交
1 2 3 4 5 6
package miner

import (
	"fmt"
	"math/big"
	"sort"
O
obscuren 已提交
7
	"sync"
O
obscuren 已提交
8
	"sync/atomic"
O
obscuren 已提交
9

10
	"github.com/ethereum/go-ethereum/accounts"
O
obscuren 已提交
11
	"github.com/ethereum/go-ethereum/common"
O
obscuren 已提交
12
	"github.com/ethereum/go-ethereum/core"
O
obscuren 已提交
13
	"github.com/ethereum/go-ethereum/core/state"
O
obscuren 已提交
14 15
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/event"
16
	"github.com/ethereum/go-ethereum/logger"
O
obscuren 已提交
17
	"github.com/ethereum/go-ethereum/logger/glog"
O
obscuren 已提交
18 19 20 21
	"github.com/ethereum/go-ethereum/pow"
	"gopkg.in/fatih/set.v0"
)

22 23
// jsonlogger is the package-wide JSON logger; in this file it is used to
// emit a structured event whenever a mined block has been inserted.
var jsonlogger = logger.NewJsonLogger()

O
obscuren 已提交
24
type environment struct {
25 26 27 28 29 30 31 32 33 34 35 36
	totalUsedGas       *big.Int
	state              *state.StateDB
	coinbase           *state.StateObject
	block              *types.Block
	family             *set.Set
	uncles             *set.Set
	remove             *set.Set
	tcount             int
	ignoredTransactors *set.Set
	lowGasTransactors  *set.Set
	ownedAccounts      *set.Set
	lowGasTxs          types.Transactions
O
obscuren 已提交
37 38
}

39
func env(block *types.Block, eth core.Backend) *environment {
40
	state := state.New(block.Root(), eth.StateDb())
O
obscuren 已提交
41 42 43 44
	env := &environment{
		totalUsedGas: new(big.Int),
		state:        state,
		block:        block,
O
obscuren 已提交
45
		family:       set.New(),
O
obscuren 已提交
46 47 48 49 50 51 52
		uncles:       set.New(),
		coinbase:     state.GetOrNewStateObject(block.Coinbase()),
	}

	return env
}

53
// Work bundles a block number together with a nonce, mix digest and seed
// hash. It is not referenced by the rest of this file.
type Work struct {
	Number    uint64
	Nonce     uint64
	MixDigest []byte
	SeedHash  []byte
}

O
obscuren 已提交
60
type Agent interface {
O
obscuren 已提交
61
	Work() chan<- *types.Block
O
obscuren 已提交
62
	SetReturnCh(chan<- *types.Block)
O
obscuren 已提交
63
	Stop()
O
obscuren 已提交
64
	Start()
O
wip  
obscuren 已提交
65
	GetHashRate() int64
O
obscuren 已提交
66 67
}

O
obscuren 已提交
68
type worker struct {
O
obscuren 已提交
69 70
	mu sync.Mutex

O
obscuren 已提交
71
	agents []Agent
O
obscuren 已提交
72
	recv   chan *types.Block
O
obscuren 已提交
73 74 75 76
	mux    *event.TypeMux
	quit   chan struct{}
	pow    pow.PoW

77 78 79 80
	eth   core.Backend
	chain *core.ChainManager
	proc  *core.BlockProcessor

O
obscuren 已提交
81
	coinbase common.Address
82
	gasPrice *big.Int
83
	extra    []byte
O
obscuren 已提交
84

O
obscuren 已提交
85 86
	currentMu sync.Mutex
	current   *environment
87

O
obscuren 已提交
88 89 90
	uncleMu        sync.Mutex
	possibleUncles map[common.Hash]*types.Block

O
obscuren 已提交
91 92 93
	txQueueMu sync.Mutex
	txQueue   map[common.Hash]*types.Transaction

F
Felix Lange 已提交
94 95 96
	// atomic status counters
	mining int32
	atWork int32
O
obscuren 已提交
97 98
}

O
obscuren 已提交
99
func newWorker(coinbase common.Address, eth core.Backend) *worker {
O
obscuren 已提交
100
	worker := &worker{
O
obscuren 已提交
101 102
		eth:            eth,
		mux:            eth.EventMux(),
O
obscuren 已提交
103
		recv:           make(chan *types.Block),
104
		gasPrice:       new(big.Int),
O
obscuren 已提交
105 106 107 108
		chain:          eth.ChainManager(),
		proc:           eth.BlockProcessor(),
		possibleUncles: make(map[common.Hash]*types.Block),
		coinbase:       coinbase,
O
obscuren 已提交
109
		txQueue:        make(map[common.Hash]*types.Transaction),
O
obscuren 已提交
110
		quit:           make(chan struct{}),
O
obscuren 已提交
111
	}
O
obscuren 已提交
112 113 114 115 116 117
	go worker.update()
	go worker.wait()

	worker.commitNewWork()

	return worker
O
obscuren 已提交
118 119
}

O
obscuren 已提交
120 121 122 123 124 125
// pendingState returns the state of the environment that is currently being
// worked on. The environment pointer is read under currentMu.
func (self *worker) pendingState() *state.StateDB {
	self.currentMu.Lock()
	defer self.currentMu.Unlock()

	current := self.current
	return current.state
}
126

O
obscuren 已提交
127 128 129
func (self *worker) pendingBlock() *types.Block {
	self.currentMu.Lock()
	defer self.currentMu.Unlock()
130

O
obscuren 已提交
131 132 133 134
	return self.current.block
}

func (self *worker) start() {
135 136 137
	self.mu.Lock()
	defer self.mu.Unlock()

138 139
	atomic.StoreInt32(&self.mining, 1)

O
obscuren 已提交
140 141 142 143
	// spin up agents
	for _, agent := range self.agents {
		agent.Start()
	}
O
obscuren 已提交
144 145 146
}

func (self *worker) stop() {
147 148 149
	self.mu.Lock()
	defer self.mu.Unlock()

F
Felix Lange 已提交
150
	if atomic.LoadInt32(&self.mining) == 1 {
151
		var keep []Agent
O
obscuren 已提交
152 153 154
		// stop all agents
		for _, agent := range self.agents {
			agent.Stop()
155 156 157 158
			// keep all that's not a cpu agent
			if _, ok := agent.(*CpuAgent); !ok {
				keep = append(keep, agent)
			}
O
obscuren 已提交
159
		}
160
		self.agents = keep
O
obscuren 已提交
161
	}
162

F
Felix Lange 已提交
163 164
	atomic.StoreInt32(&self.mining, 0)
	atomic.StoreInt32(&self.atWork, 0)
O
obscuren 已提交
165 166 167
}

func (self *worker) register(agent Agent) {
168 169 170
	self.mu.Lock()
	defer self.mu.Unlock()

O
obscuren 已提交
171
	self.agents = append(self.agents, agent)
O
obscuren 已提交
172
	agent.SetReturnCh(self.recv)
O
obscuren 已提交
173 174 175
}

func (self *worker) update() {
O
obscuren 已提交
176
	events := self.mux.Subscribe(core.ChainHeadEvent{}, core.ChainSideEvent{}, core.TxPreEvent{})
O
obscuren 已提交
177 178 179 180 181

out:
	for {
		select {
		case event := <-events.Chan():
182
			switch ev := event.(type) {
183
			case core.ChainHeadEvent:
O
obscuren 已提交
184
				self.commitNewWork()
O
obscuren 已提交
185
			case core.ChainSideEvent:
O
obscuren 已提交
186 187 188
				self.uncleMu.Lock()
				self.possibleUncles[ev.Block.Hash()] = ev.Block
				self.uncleMu.Unlock()
O
obscuren 已提交
189
			case core.TxPreEvent:
190
				// Apply transaction to the pending state if we're not mining
F
Felix Lange 已提交
191
				if atomic.LoadInt32(&self.mining) == 0 {
192 193 194
					self.mu.Lock()
					self.commitTransactions(types.Transactions{ev.Tx})
					self.mu.Unlock()
O
obscuren 已提交
195
				}
O
obscuren 已提交
196 197 198 199 200
			}
		case <-self.quit:
			break out
		}
	}
201 202

	events.Unsubscribe()
O
obscuren 已提交
203 204
}

O
obscuren 已提交
205 206
func (self *worker) wait() {
	for {
O
obscuren 已提交
207
		for block := range self.recv {
F
Felix Lange 已提交
208
			atomic.AddInt32(&self.atWork, -1)
O
obscuren 已提交
209 210 211 212 213

			if block == nil {
				continue
			}

214
			if _, err := self.chain.InsertChain(types.Blocks{block}); err == nil {
O
obscuren 已提交
215 216
				for _, uncle := range block.Uncles() {
					delete(self.possibleUncles, uncle.Hash())
O
obscuren 已提交
217
				}
O
obscuren 已提交
218
				self.mux.Post(core.NewMinedBlockEvent{block})
O
obscuren 已提交
219

220
				glog.V(logger.Info).Infof("🔨  Mined block #%v", block.Number())
O
obscuren 已提交
221

O
obscuren 已提交
222 223 224 225 226 227
				jsonlogger.LogJson(&logger.EthMinerNewBlock{
					BlockHash:     block.Hash().Hex(),
					BlockNumber:   block.Number(),
					ChainHeadHash: block.ParentHeaderHash.Hex(),
					BlockPrevHash: block.ParentHeaderHash.Hex(),
				})
O
obscuren 已提交
228 229
			} else {
				self.commitNewWork()
230
			}
O
obscuren 已提交
231 232 233 234 235
		}
	}
}

func (self *worker) push() {
F
Felix Lange 已提交
236
	if atomic.LoadInt32(&self.mining) == 1 {
O
obscuren 已提交
237
		self.current.block.Header().GasUsed = self.current.totalUsedGas
238
		self.current.block.SetRoot(self.current.state.Root())
O
obscuren 已提交
239

O
obscuren 已提交
240
		// push new work to agents
241
		for _, agent := range self.agents {
F
Felix Lange 已提交
242
			atomic.AddInt32(&self.atWork, 1)
O
obscuren 已提交
243

244 245 246 247 248
			if agent.Work() != nil {
				agent.Work() <- self.current.block.Copy()
			} else {
				common.Report(fmt.Sprintf("%v %T\n", agent, agent))
			}
249
		}
O
obscuren 已提交
250 251 252
	}
}

O
obscuren 已提交
253
func (self *worker) makeCurrent() {
254
	block := self.chain.NewBlock(self.coinbase)
255 256 257
	if block.Time() == self.chain.CurrentBlock().Time() {
		block.Header().Time++
	}
258
	block.Header().Extra = self.extra
259

260
	current := env(block, self.eth)
O
obscuren 已提交
261
	for _, ancestor := range self.chain.GetAncestors(block, 7) {
262
		current.family.Add(ancestor.Hash())
O
obscuren 已提交
263
	}
264 265 266 267 268 269 270 271 272 273
	accounts, _ := self.eth.AccountManager().Accounts()
	// Keep track of transactions which return errors so they can be removed
	current.remove = set.New()
	current.tcount = 0
	current.ignoredTransactors = set.New()
	current.lowGasTransactors = set.New()
	current.ownedAccounts = accountAddressesSet(accounts)

	parent := self.chain.GetBlock(current.block.ParentHash())
	current.coinbase.SetGasPool(core.CalcGasLimit(parent))
O
obscuren 已提交
274

275
	self.current = current
O
obscuren 已提交
276 277
}

278 279 280
func (w *worker) setGasPrice(p *big.Int) {
	w.mu.Lock()
	defer w.mu.Unlock()
281 282 283 284 285 286

	// calculate the minimal gas price the miner accepts when sorting out transactions.
	const pct = int64(90)
	w.gasPrice = gasprice(p, pct)

	w.mux.Post(core.GasPriceChanged{w.gasPrice})
287 288
}

O
obscuren 已提交
289 290 291 292 293 294 295 296 297
func (self *worker) commitNewWork() {
	self.mu.Lock()
	defer self.mu.Unlock()
	self.uncleMu.Lock()
	defer self.uncleMu.Unlock()
	self.currentMu.Lock()
	defer self.currentMu.Unlock()

	self.makeCurrent()
298
	current := self.current
O
obscuren 已提交
299 300 301 302

	transactions := self.eth.TxPool().GetTransactions()
	sort.Sort(types.TxByNonce{transactions})

303 304 305
	// commit transactions for this run
	self.commitTransactions(transactions)
	self.eth.TxPool().RemoveTransactions(current.lowGasTxs)
O
obscuren 已提交
306

O
obscuren 已提交
307 308 309 310
	var (
		uncles    []*types.Header
		badUncles []common.Hash
	)
O
obscuren 已提交
311
	for hash, uncle := range self.possibleUncles {
O
obscuren 已提交
312
		if len(uncles) == 2 {
O
obscuren 已提交
313 314 315 316
			break
		}

		if err := self.commitUncle(uncle.Header()); err != nil {
317 318 319 320 321
			if glog.V(logger.Ridiculousness) {
				glog.V(logger.Detail).Infof("Bad uncle found and will be removed (%x)\n", hash[:4])
				glog.V(logger.Detail).Infoln(uncle)
			}

O
obscuren 已提交
322
			badUncles = append(badUncles, hash)
O
obscuren 已提交
323
		} else {
O
obscuren 已提交
324
			glog.V(logger.Debug).Infof("commiting %x as uncle\n", hash[:4])
O
obscuren 已提交
325
			uncles = append(uncles, uncle.Header())
O
obscuren 已提交
326 327
		}
	}
O
obscuren 已提交
328 329

	// We only care about logging if we're actually mining
F
Felix Lange 已提交
330
	if atomic.LoadInt32(&self.mining) == 1 {
331
		glog.V(logger.Info).Infof("commit new work on block %v with %d txs & %d uncles\n", current.block.Number(), current.tcount, len(uncles))
O
obscuren 已提交
332 333
	}

O
obscuren 已提交
334 335 336
	for _, hash := range badUncles {
		delete(self.possibleUncles, hash)
	}
337

O
obscuren 已提交
338
	self.current.block.SetUncles(uncles)
O
obscuren 已提交
339

O
obscuren 已提交
340
	core.AccumulateRewards(self.current.state, self.current.block)
O
obscuren 已提交
341

O
obscuren 已提交
342
	self.current.state.Update()
O
obscuren 已提交
343

O
obscuren 已提交
344
	self.push()
O
obscuren 已提交
345 346 347 348 349 350 351 352 353
}

// Reward amounts derived from core.BlockReward. None of them is referenced by
// the rest of this file.
var (
	inclusionReward = new(big.Int).Div(core.BlockReward, big.NewInt(32)) // BlockReward / 32
	_uncleReward    = new(big.Int).Mul(core.BlockReward, big.NewInt(15)) // intermediate: BlockReward * 15
	uncleReward     = new(big.Int).Div(_uncleReward, big.NewInt(16))     // BlockReward * 15 / 16
)

func (self *worker) commitUncle(uncle *types.Header) error {
O
obscuren 已提交
354
	if self.current.uncles.Has(uncle.Hash()) {
O
obscuren 已提交
355 356 357
		// Error not unique
		return core.UncleError("Uncle not unique")
	}
O
obscuren 已提交
358
	self.current.uncles.Add(uncle.Hash())
O
obscuren 已提交
359

O
obscuren 已提交
360
	if !self.current.family.Has(uncle.ParentHash) {
O
obscuren 已提交
361 362 363
		return core.UncleError(fmt.Sprintf("Uncle's parent unknown (%x)", uncle.ParentHash[0:4]))
	}

O
obscuren 已提交
364 365
	if self.current.family.Has(uncle.Hash()) {
		return core.UncleError(fmt.Sprintf("Uncle already in family (%x)", uncle.Hash()))
O
obscuren 已提交
366 367 368 369 370
	}

	return nil
}

371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435
// commitTransactions applies transactions, in the given order, to the current
// environment.
//
// A transaction priced below the worker's minimum gas price marks its sender
// as a low-gas transactor; every transaction from such a sender is skipped
// and, unless the sender is an owned account, queued in lowGasTxs. Senders
// whose transaction hit the block gas limit are skipped for the rest of the
// run. Transactions failing with a nonce or invalid-tx error have their nonce
// removed from the chain's tx state and are added to the remove set.
// Everything else counts as committed.
func (self *worker) commitTransactions(transactions types.Transactions) {
	current := self.current

	for _, tx := range transactions {
		// We can skip err. It has already been validated in the tx pool
		from, _ := tx.From()

		// check if it falls within margin
		if tx.GasPrice().Cmp(self.gasPrice) < 0 {
			// ignore the transaction and transactor. We ignore the transactor
			// because nonce will fail after ignoring this transaction so there's
			// no point
			current.lowGasTransactors.Add(from)

			glog.V(logger.Info).Infof("transaction(%x) below gas price (tx=%v ask=%v). All sequential txs from this address(%x) will be ignored\n", tx.Hash().Bytes()[:4], common.CurrencyToString(tx.GasPrice()), common.CurrencyToString(self.gasPrice), from[:4])
		}

		// Continue with the next transaction if the transaction sender is included in
		// the low gas tx set. This will also remove the tx and all sequential transaction
		// from this transactor
		if current.lowGasTransactors.Has(from) {
			// add tx to the low gas set. This will be removed at the end of the run
			// owned accounts are ignored
			if !current.ownedAccounts.Has(from) {
				current.lowGasTxs = append(current.lowGasTxs, tx)
			}
			continue
		}

		// Move on to the next transaction when the transactor is in the ignored
		// transactors set. This may occur when a transaction hits the gas limit:
		// any later transaction from the same sender would fail with a nonce
		// error because the previous one hasn't been processed. Therefore we
		// need to ignore any transaction after the ignored one.
		if current.ignoredTransactors.Has(from) {
			continue
		}

		current.state.StartRecord(tx.Hash(), common.Hash{}, 0)

		// The sender resolved at the top of the loop is reused below instead of
		// being derived again in each case.
		err := self.commitTransaction(tx)
		switch {
		case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
			// Remove invalid transactions
			self.chain.TxState().RemoveNonce(from, tx.Nonce())
			current.remove.Add(tx.Hash())

			if glog.V(logger.Detail) {
				glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
			}
		case state.IsGasLimitErr(err):
			// ignore the transactor so no nonce errors will be thrown for this account
			// next time the worker is run, they'll be picked up again.
			current.ignoredTransactors.Add(from)

			glog.V(logger.Detail).Infof("Gas limit reached for (%x) in this block. Continue to try smaller txs\n", from[:4])
		default:
			current.tcount++
		}
	}
}

O
obscuren 已提交
436
func (self *worker) commitTransaction(tx *types.Transaction) error {
437
	snap := self.current.state.Copy()
O
obscuren 已提交
438
	receipt, _, err := self.proc.ApplyTransaction(self.current.coinbase, self.current.state, self.current.block, tx, self.current.totalUsedGas, true)
439 440
	if err != nil && (core.IsNonceErr(err) || state.IsGasLimitErr(err) || core.IsInvalidTxErr(err)) {
		self.current.state.Set(snap)
O
obscuren 已提交
441 442 443 444 445 446 447 448
		return err
	}

	self.current.block.AddTransaction(tx)
	self.current.block.AddReceipt(receipt)

	return nil
}
O
obscuren 已提交
449 450 451 452

func (self *worker) HashRate() int64 {
	var tot int64
	for _, agent := range self.agents {
O
wip  
obscuren 已提交
453
		tot += agent.GetHashRate()
O
obscuren 已提交
454 455 456 457
	}

	return tot
}
O
obscuren 已提交
458 459 460 461 462 463 464 465 466

// gasprice returns pct percent of price using integer arithmetic.
//
// The multiplication is carried out before the division so that no precision
// is lost for prices that are not a multiple of 100 (90% of 150 is 135, and
// 90% of 50 is 45 rather than 0). price is not modified; a new *big.Int is
// returned.
func gasprice(price *big.Int, pct int64) *big.Int {
	p := new(big.Int).Mul(price, big.NewInt(pct))
	return p.Div(p, big.NewInt(100))
}
467 468 469 470 471 472 473 474

// accountAddressesSet converts every account's address bytes into a
// common.Address and returns the resulting addresses as a set.
func accountAddressesSet(accounts []accounts.Account) *set.Set {
	addrs := set.New()
	for i := range accounts {
		addrs.Add(common.BytesToAddress(accounts[i].Address))
	}
	return addrs
}