提交 eae81465 编写于 作者: B Bas van Kervel

rpc: new RPC implementation with pub/sub support

上级 8db9d44c
......@@ -44,6 +44,10 @@ type Account struct {
Address common.Address
}
func (acc *Account) MarshalJSON() ([]byte, error) {
return []byte(`"` + acc.Address.Hex() + `"`), nil
}
type Manager struct {
keyStore crypto.KeyStore
unlocked map[common.Address]*unlocked
......@@ -92,6 +96,17 @@ func (am *Manager) Unlock(addr common.Address, keyAuth string) error {
return am.TimedUnlock(addr, keyAuth, 0)
}
func (am *Manager) Lock(addr common.Address) error {
am.mutex.Lock()
if unl, found := am.unlocked[addr]; found {
am.mutex.Unlock()
am.expire(addr, unl, time.Duration(0) * time.Nanosecond)
} else {
am.mutex.Unlock()
}
return nil
}
// TimedUnlock unlocks the account with the given address. The account
// stays unlocked for the duration of timeout. A timeout of 0 unlocks the account
// until the program exits.
......
......@@ -246,10 +246,10 @@ func (self *jsre) welcome() {
self.re.Run(`
(function () {
console.log('instance: ' + web3.version.node);
console.log(' datadir: ' + admin.datadir);
console.log("coinbase: " + eth.coinbase);
var ts = 1000 * eth.getBlock(eth.blockNumber).timestamp;
console.log("at block: " + eth.blockNumber + " (" + new Date(ts) + ")");
console.log(' datadir: ' + admin.datadir);
})();
`)
if modules, err := self.supportedApis(); err == nil {
......@@ -258,7 +258,7 @@ func (self *jsre) welcome() {
loadedModules = append(loadedModules, fmt.Sprintf("%s:%s", api, version))
}
sort.Strings(loadedModules)
fmt.Println("modules:", strings.Join(loadedModules, " "))
}
}
......@@ -325,12 +325,28 @@ func (js *jsre) apiBindings(f xeth.Frontend) error {
}
_, err = js.re.Run(shortcuts)
if err != nil {
utils.Fatalf("Error setting namespaces: %v", err)
}
js.re.Run(`var GlobalRegistrar = eth.contract(` + registrar.GlobalRegistrarAbi + `); registrar = GlobalRegistrar.at("` + registrar.GlobalRegistrarAddr + `");`)
// overrule some of the methods that require password as input and ask for it interactively
p, err := js.re.Get("personal")
if err != nil {
fmt.Println("Unable to overrule sensitive methods in personal module")
return nil
}
// Override the unlockAccount and newAccount methods on the personal object since these require user interaction.
// Assign the jeth.unlockAccount and jeth.newAccount in the jsre the original web3 callbacks. These will be called
// by the jeth.* methods after they got the password from the user and send the original web3 request to the backend.
persObj := p.Object()
js.re.Run(`jeth.unlockAccount = personal.unlockAccount;`)
persObj.Set("unlockAccount", jeth.UnlockAccount)
js.re.Run(`jeth.newAccount = personal.newAccount;`)
persObj.Set("newAccount", jeth.NewAccount)
return nil
}
......
......@@ -168,7 +168,7 @@ func TestAccounts(t *testing.T) {
checkEvalJSON(t, repl, `eth.accounts`, `["`+testAddress+`"]`)
checkEvalJSON(t, repl, `eth.coinbase`, `"`+testAddress+`"`)
val, err := repl.re.Run(`personal.newAccount("password")`)
val, err := repl.re.Run(`jeth.newAccount("password")`)
if err != nil {
t.Errorf("expected no error, got %v", err)
}
......
......@@ -313,6 +313,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.IPCDisabledFlag,
utils.IPCApiFlag,
utils.IPCPathFlag,
utils.IPCExperimental,
utils.ExecFlag,
utils.WhisperEnabledFlag,
utils.DevModeFlag,
......
......@@ -158,6 +158,7 @@ var AppHelpFlagGroups = []flagGroup{
Flags: []cli.Flag{
utils.WhisperEnabledFlag,
utils.NatspecEnabledFlag,
utils.IPCExperimental,
},
},
{
......
......@@ -23,15 +23,22 @@ import (
"log"
"os"
"os/signal"
"path/filepath"
"runtime"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc/api"
"github.com/ethereum/go-ethereum/rpc/codec"
"github.com/ethereum/go-ethereum/rpc/comms"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
"github.com/ethereum/go-ethereum/tests"
"github.com/ethereum/go-ethereum/whisper"
"github.com/ethereum/go-ethereum/xeth"
......@@ -81,9 +88,14 @@ func main() {
}
log.Println("Initial test suite passed...")
if err := StartIPC(stack); err != nil {
log.Fatalf("Failed to start IPC interface: %v\n", err)
}
log.Println("IPC Interface started, accepting requests...")
// Start the RPC interface and wait until terminated
if err := StartRPC(stack); err != nil {
log.Fatalf("Failed to start RPC instarface: %v", err)
log.Fatalf("Failed to start RPC interface: %v", err)
}
log.Println("RPC Interface started, accepting requests...")
......@@ -177,3 +189,54 @@ func StartRPC(stack *node.Node) error {
}
return comms.StartHttp(config, codec, api.Merge(apis...))
}
// StartRPC initializes an IPC interface to the given protocol stack.
func StartIPC(stack *node.Node) error {
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
return err
}
endpoint := `\\.\pipe\geth.ipc`
if runtime.GOOS != "windows" {
endpoint = filepath.Join(common.DefaultDataDir(), "geth.ipc")
}
config := comms.IpcConfig{
Endpoint: endpoint,
}
listener, err := comms.CreateListener(config)
if err != nil {
return err
}
server := rpc.NewServer()
// register package API's this node provides
offered := stack.RPCAPIs()
for _, api := range offered {
server.RegisterName(api.Namespace, api.Service)
glog.V(logger.Debug).Infof("Register %T@%s for IPC service\n", api.Service, api.Namespace)
}
web3 := utils.NewPublicWeb3API(stack)
server.RegisterName("web3", web3)
net := utils.NewPublicNetAPI(stack.Server(), ethereum.NetVersion())
server.RegisterName("net", net)
go func() {
glog.V(logger.Info).Infof("Start IPC server on %s\n", config.Endpoint)
for {
conn, err := listener.Accept()
if err != nil {
glog.V(logger.Error).Infof("Unable to accept connection - %v\n", err)
}
codec := rpc.NewJSONCodec(conn)
go server.ServeCodec(codec)
}
}()
return nil
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// PublicWeb3API offers helper utils
type PublicWeb3API struct {
stack *node.Node
}
// NewPublicWeb3API creates a new Web3Service instance
func NewPublicWeb3API(stack *node.Node) *PublicWeb3API {
return &PublicWeb3API{stack}
}
// ClientVersion returns the node name
func (s *PublicWeb3API) ClientVersion() string {
return s.stack.Server().Name
}
// Sha3 applies the ethereum sha3 implementation on the input.
// It assumes the input is hex encoded.
func (s *PublicWeb3API) Sha3(input string) string {
return common.ToHex(crypto.Sha3(common.FromHex(input)))
}
// PublicNetAPI offers network related RPC methods
type PublicNetAPI struct {
net *p2p.Server
networkVersion int
}
// NewPublicNetAPI creates a new net api instance.
func NewPublicNetAPI(net *p2p.Server, networkVersion int) *PublicNetAPI {
return &PublicNetAPI{net, networkVersion}
}
// Listening returns an indication if the node is listening for network connections.
func (s *PublicNetAPI) Listening() bool {
return true // always listening
}
// Peercount returns the number of connected peers
func (s *PublicNetAPI) PeerCount() *rpc.HexNumber {
return rpc.NewHexNumber(s.net.PeerCount())
}
// ProtocolVersion returns the current ethereum protocol version.
func (s *PublicNetAPI) Version() string {
return fmt.Sprintf("%d", s.networkVersion)
}
......@@ -54,6 +54,7 @@ import (
"github.com/ethereum/go-ethereum/rpc/comms"
"github.com/ethereum/go-ethereum/rpc/shared"
"github.com/ethereum/go-ethereum/rpc/useragent"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
"github.com/ethereum/go-ethereum/whisper"
"github.com/ethereum/go-ethereum/xeth"
)
......@@ -300,6 +301,10 @@ var (
Usage: "Filename for IPC socket/pipe",
Value: DirectoryString{common.DefaultIpcPath()},
}
IPCExperimental = cli.BoolFlag{
Name: "ipcexp",
Usage: "Enable the new RPC implementation",
}
ExecFlag = cli.StringFlag{
Name: "exec",
Usage: "Execute JavaScript statement (only in combination with console/attach)",
......@@ -690,6 +695,7 @@ func MakeSystemNode(name, version string, extra []byte, ctx *cli.Context) *node.
Fatalf("Failed to register the Whisper service: %v", err)
}
}
return stack
}
......@@ -773,17 +779,53 @@ func IpcSocketPath(ctx *cli.Context) (ipcpath string) {
return
}
// StartIPC starts a IPC JSON-RPC API server.
func StartIPC(stack *node.Node, ctx *cli.Context) error {
config := comms.IpcConfig{
Endpoint: IpcSocketPath(ctx),
}
initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) {
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
return nil, nil, err
var ethereum *eth.Ethereum
if err := stack.Service(&ethereum); err != nil {
return err
}
if ctx.GlobalIsSet(IPCExperimental.Name) {
listener, err := comms.CreateListener(config)
if err != nil {
return err
}
server := rpc.NewServer()
// register package API's this node provides
offered := stack.RPCAPIs()
for _, api := range offered {
server.RegisterName(api.Namespace, api.Service)
glog.V(logger.Debug).Infof("Register %T under namespace '%s' for IPC service\n", api.Service, api.Namespace)
}
web3 := NewPublicWeb3API(stack)
server.RegisterName("web3", web3)
net := NewPublicNetAPI(stack.Server(), ethereum.NetVersion())
server.RegisterName("net", net)
go func() {
glog.V(logger.Info).Infof("Start IPC server on %s\n", config.Endpoint)
for {
conn, err := listener.Accept()
if err != nil {
glog.V(logger.Error).Infof("Unable to accept connection - %v\n", err)
}
codec := rpc.NewJSONCodec(conn)
go server.ServeCodec(codec)
}
}()
return nil
}
initializer := func(conn net.Conn) (comms.Stopper, shared.EthereumApi, error) {
fe := useragent.NewRemoteFrontend(conn, ethereum.AccountManager())
xeth := xeth.New(stack, fe)
apis, err := api.ParseApiString(ctx.GlobalString(IPCApiFlag.Name), codec.JSON, xeth, stack)
......
......@@ -17,6 +17,8 @@
package common
import (
"encoding/hex"
"encoding/json"
"fmt"
"math/big"
"math/rand"
......@@ -50,6 +52,21 @@ func (h Hash) Bytes() []byte { return h[:] }
func (h Hash) Big() *big.Int { return Bytes2Big(h[:]) }
func (h Hash) Hex() string { return "0x" + Bytes2Hex(h[:]) }
// UnmarshalJSON parses a hash in its hex from to a hash.
func (h *Hash) UnmarshalJSON(input []byte) error {
length := len(input)
if length >= 2 && input[0] == '"' && input[length-1] == '"' {
input = input[1 : length-1]
}
h.SetBytes(FromHex(string(input)))
return nil
}
// Serialize given hash to JSON
func (h Hash) MarshalJSON() ([]byte, error) {
return json.Marshal(h.Hex())
}
// Sets the hash to the value of b. If b is larger than len(h) it will panic
func (h *Hash) SetBytes(b []byte) {
if len(b) > len(h) {
......@@ -129,6 +146,38 @@ func (a *Address) Set(other Address) {
}
}
// Serialize given address to JSON
func (a Address) MarshalJSON() ([]byte, error) {
return json.Marshal(a.Hex())
}
// Parse address from raw json data
func (a *Address) UnmarshalJSON(data []byte) error {
if len(data) > 2 && data[0] == '"' && data[len(data)-1] == '"' {
data = data[:len(data)-1][1:]
}
if len(data) > 2 && data[0] == '0' && data[1] == 'x' {
data = data[2:]
}
if len(data) != 2*AddressLength {
return fmt.Errorf("Invalid address length, expected %d got %d bytes", 2*AddressLength, len(data))
}
n, err := hex.Decode(a[:], data)
if err != nil {
return err
}
if n != AddressLength {
return fmt.Errorf("Invalid address")
}
a.Set(HexToAddress(string(data)))
return nil
}
// PP Pretty Prints a byte slice in the following format:
// hex(value[:4])...(hex[len(value)-4:])
func PP(value []byte) string {
......
......@@ -48,6 +48,10 @@ func (n BlockNonce) Uint64() uint64 {
return binary.BigEndian.Uint64(n[:])
}
func (n BlockNonce) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"0x%x"`, n)), nil
}
type Header struct {
ParentHash common.Hash // Hash to the previous block
UncleHash common.Hash // Uncles of this block
......
......@@ -69,6 +69,10 @@ func (b Bloom) TestBytes(test []byte) bool {
return b.Test(common.BytesToBig(test))
}
func (b Bloom) MarshalJSON() ([]byte, error) {
return []byte(fmt.Sprintf(`"%#x"`, b.Bytes())), nil
}
func CreateBloom(receipts Receipts) Bloom {
bin := new(big.Int)
for _, receipt := range receipts {
......
......@@ -17,6 +17,7 @@
package vm
import (
"encoding/json"
"fmt"
"io"
......@@ -63,6 +64,21 @@ func (l *Log) String() string {
return fmt.Sprintf(`log: %x %x %x %x %d %x %d`, l.Address, l.Topics, l.Data, l.TxHash, l.TxIndex, l.BlockHash, l.Index)
}
func (r *Log) MarshalJSON() ([]byte, error) {
fields := map[string]interface{}{
"address": r.Address,
"data": fmt.Sprintf("%#x", r.Data),
"blockNumber": fmt.Sprintf("%#x", r.BlockNumber),
"logIndex": fmt.Sprintf("%#x", r.Index),
"blockHash": r.BlockHash,
"transactionHash": r.TxHash,
"transactionIndex": fmt.Sprintf("%#x", r.TxIndex),
"topics": r.Topics,
}
return json.Marshal(fields)
}
type Logs []*Log
// LogForStorage is a wrapper around a Log that flattens and parses the entire
......
此差异已折叠。
......@@ -35,6 +35,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
......@@ -43,6 +44,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
const (
......@@ -239,6 +241,64 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
return eth, nil
}
// APIs returns the collection of RPC services the ethereum package offers.
// NOTE, some of these services probably need to be moved to somewhere else.
func (s *Ethereum) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "eth",
Version: "1.0",
Service: NewPublicEthereumAPI(s),
Public: true,
}, {
Namespace: "eth",
Version: "1.0",
Service: NewPublicAccountAPI(s.AccountManager()),
Public: true,
}, {
Namespace: "personal",
Version: "1.0",
Service: NewPrivateAccountAPI(s.AccountManager()),
Public: false,
}, {
Namespace: "eth",
Version: "1.0",
Service: NewPublicBlockChainAPI(s.BlockChain(), s.ChainDb(), s.EventMux(), s.AccountManager()),
Public: true,
}, {
Namespace: "eth",
Version: "1.0",
Service: NewPublicTransactionPoolAPI(s.TxPool(), s.ChainDb(), s.EventMux(), s.BlockChain(), s.AccountManager()),
Public: true,
}, {
Namespace: "eth",
Version: "1.0",
Service: miner.NewPublicMinerAPI(s.Miner()),
Public: true,
}, {
Namespace: "eth",
Version: "1.0",
Service: downloader.NewPublicDownloaderAPI(s.Downloader()),
Public: true,
}, {
Namespace: "miner",
Version: "1.0",
Service: NewPrivateMinerAPI(s),
Public: false,
}, {
Namespace: "txpool",
Version: "1.0",
Service: NewPublicTxPoolAPI(s),
Public: true,
}, {
Namespace: "eth",
Version: "1.0",
Service: filters.NewPublicFilterAPI(s.ChainDb(), s.EventMux()),
Public: true,
},
}
}
func (s *Ethereum) ResetWithGenesisBlock(gb *types.Block) {
s.blockchain.ResetWithGenesisBlock(gb)
}
......
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// PublicDownloaderAPI provides an API which gives informatoin about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
d *Downloader
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
func NewPublicDownloaderAPI(d *Downloader) *PublicDownloaderAPI {
return &PublicDownloaderAPI{d}
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
type Progress struct {
Origin uint64 `json:"startingBlock"`
Current uint64 `json:"currentBlock"`
Height uint64 `json:"highestBlock"`
}
// SyncingResult provides information about the current synchronisation status for this node.
type SyncingResult struct {
Syncing bool `json:"syncing"`
Status Progress `json:"status"`
}
// Syncing provides information when this nodes starts synchronising with the Ethereumn network and when it's finished.
func (s *PublicDownloaderAPI) Syncing() (rpc.Subscription, error) {
sub := s.d.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
output := func(event interface{}) interface{} {
switch event.(type) {
case StartEvent:
result := &SyncingResult{Syncing: true}
result.Status.Origin, result.Status.Current, result.Status.Height = s.d.Progress()
return result
case DoneEvent, FailedEvent:
return false
}
return nil
}
return rpc.NewSubscriptionWithOutputFormat(sub, output), nil
}
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"sync"
"time"
"crypto/rand"
"encoding/hex"
"errors"
"encoding/json"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
var (
filterTickerTime = 5 * time.Minute
)
// byte will be inferred
const (
unknownFilterTy = iota
blockFilterTy
transactionFilterTy
logFilterTy
)
// PublicFilterAPI offers support to create and manage filters. This will allow externa clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
mux *event.TypeMux
quit chan struct{}
chainDb ethdb.Database
filterManager *FilterSystem
filterMapMu sync.RWMutex
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
logMu sync.RWMutex
logQueue map[int]*logQueue
blockMu sync.RWMutex
blockQueue map[int]*hashQueue
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
transactMu sync.Mutex
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(chainDb ethdb.Database, mux *event.TypeMux) *PublicFilterAPI {
svc := &PublicFilterAPI{
mux: mux,
chainDb: chainDb,
filterManager: NewFilterSystem(mux),
filterMapping: make(map[string]int),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
}
go svc.start()
return svc
}
// Stop quits the work loop.
func (s *PublicFilterAPI) Stop() {
close(s.quit)
}
// start the work loop, wait and process events.
func (s *PublicFilterAPI) start() {
timer := time.NewTicker(2 * time.Second)
defer timer.Stop()
done:
for {
select {
case <-timer.C:
s.logMu.Lock()
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.logQueue, id)
}
}
s.logMu.Unlock()
s.blockMu.Lock()
for id, filter := range s.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.blockQueue, id)
}
}
s.blockMu.Unlock()
s.transactionMu.Lock()
for id, filter := range s.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.transactionQueue, id)
}
}
s.transactionMu.Unlock()
case <-s.quit:
break done
}
}
}
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.blockMu.Lock()
filter := New(s.chainDb)
id := s.filterManager.Add(filter)
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if queue := s.blockQueue[id]; queue != nil {
queue.add(block.Hash())
}
}
defer s.blockMu.Unlock()
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
filter := New(s.chainDb)
id := s.filterManager.Add(filter)
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
if queue := s.transactionQueue[id]; queue != nil {
queue.add(tx.Hash())
}
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash) int {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.chainDb)
id := s.filterManager.Add(filter)
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest)
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogsCallback = func(logs vm.Logs) {
s.logMu.Lock()
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(logs...)
}
}
return id
}
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
ToBlock rpc.BlockNumber
Addresses []common.Address
Topics [][]common.Hash
}
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics interface{} `json:"topics"`
}
var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
if raw.From == nil {
args.FromBlock = rpc.LatestBlockNumber
} else {
args.FromBlock = *raw.From
}
if raw.ToBlock == nil {
args.ToBlock = rpc.LatestBlockNumber
} else {
args.ToBlock = *raw.ToBlock
}
args.Addresses = []common.Address{}
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
var addresses []common.Address
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
for i, addr := range strAddrs {
if strAddr, ok := addr.(string); ok {
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
strAddr = strAddr[2:]
}
if decAddr, err := hex.DecodeString(strAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
}
} else {
return fmt.Errorf("invalid address on index %d", i)
}
}
} else if singleAddr, ok := raw.Addresses.(string); ok {
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
singleAddr = singleAddr[2:]
}
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
fmt.Errorf("invalid address given")
}
} else {
errors.New("invalid address(es) given")
}
args.Addresses = addresses
}
topicConverter := func(raw string) (common.Hash, error) {
if len(raw) == 0 {
return common.Hash{}, nil
}
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
raw = raw[2:]
}
if decAddr, err := hex.DecodeString(raw); err == nil {
return common.BytesToHash(decAddr), nil
}
return common.Hash{}, errors.New("invalid topic given")
}
// topics is an array consisting of strings or arrays of strings
if raw.Topics != nil {
topics, ok := raw.Topics.([]interface{})
if ok {
parsedTopics := make([][]common.Hash, len(topics))
for i, topic := range topics {
if topic == nil {
parsedTopics[i] = []common.Hash{common.StringToHash("")}
} else if strTopic, ok := topic.(string); ok {
if t, err := topicConverter(strTopic); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
}
} else if arrTopic, ok := topic.([]interface{}); ok {
parsedTopics[i] = make([]common.Hash, len(arrTopic))
for j := 0; j < len(parsedTopics[i]); i++ {
if arrTopic[j] == nil {
parsedTopics[i][j] = common.StringToHash("")
} else if str, ok := arrTopic[j].(string); ok {
if t, err := topicConverter(str); err != nil {
return fmt.Errorf("invalid topic on index %d", i)
} else {
parsedTopics[i] = []common.Hash{t}
}
} else {
fmt.Errorf("topic[%d][%d] not a string", i, j)
}
}
} else {
return fmt.Errorf("topic[%d] invalid", i)
}
}
args.Topics = parsedTopics
}
}
return nil
}
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
var id int
if len(args.Addresses) > 0 {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics)
} else {
id = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics)
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(args NewFilterArgs) vm.Logs {
filter := New(s.chainDb)
filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics)
return returnLogs(filter.Find())
}
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
s.filterMapMu.Lock()
defer s.filterMapMu.Unlock()
id, ok := s.filterMapping[filterId]
if !ok {
return false
}
defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId)
if _, ok := s.logQueue[id]; ok {
s.logMu.Lock()
defer s.logMu.Unlock()
delete(s.logQueue, id)
return true
}
if _, ok := s.blockQueue[id]; ok {
s.blockMu.Lock()
defer s.blockMu.Unlock()
delete(s.blockQueue, id)
return true
}
if _, ok := s.transactionQueue[id]; ok {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
delete(s.transactionQueue, id)
return true
}
return false
}
// getFilterType is a helper utility that determine the type of filter for the given filter id.
func (s *PublicFilterAPI) getFilterType(id int) byte {
if _, ok := s.blockQueue[id]; ok {
return blockFilterTy
} else if _, ok := s.transactionQueue[id]; ok {
return transactionFilterTy
} else if _, ok := s.logQueue[id]; ok {
return logFilterTy
}
return unknownFilterTy
}
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.blockQueue[id] != nil {
return s.blockQueue[id].get()
}
return nil
}
// transactionFilterChanged returns a collection of transaction hashes for the pending
// transaction filter with the given id.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.transactionQueue[id] != nil {
return s.transactionQueue[id].get()
}
return nil
}
// logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) vm.Logs {
s.logMu.Lock()
defer s.logMu.Unlock()
if s.logQueue[id] != nil {
return s.logQueue[id].get()
}
return nil
}
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(filterId string) vm.Logs {
id, ok := s.filterMapping[filterId]
if !ok {
return returnLogs(nil)
}
if filter := s.filterManager.Get(id); filter != nil {
return returnLogs(filter.Find())
}
return returnLogs(nil)
}
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId]
s.filterMapMu.Unlock()
if !ok { // filter not found
return []interface{}{}
}
switch s.getFilterType(id) {
case blockFilterTy:
return returnHashes(s.blockFilterChanged(id))
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
return returnLogs(s.logFilterChanged(id))
}
return []interface{}{}
}
type logQueue struct {
mu sync.Mutex
logs vm.Logs
timeout time.Time
id int
}
func (l *logQueue) add(logs ...*vm.Log) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() vm.Logs {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
}
type hashQueue struct {
mu sync.Mutex
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
l.mu.Lock()
defer l.mu.Unlock()
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
}
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
// causing the affected DApp to miss data.
func newFilterId() (string, error) {
var subid [16]byte
n, _ := rand.Read(subid[:])
if n != 16 {
return "", errors.New("Unable to generate filter id")
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// returnLogs is a helper that will return an empty logs array case the given logs is nil, otherwise is will return the
// given logs. The RPC interfaces defines that always an array is returned.
func returnLogs(logs vm.Logs) vm.Logs {
if logs == nil {
return vm.Logs{}
}
return logs
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
// return the given hashes. The RPC interfaces defines that always an array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package miner
import (
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/logger/glog"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// PublicMinerAPI provides an API to control the miner.
// It offers only methods that operate on data that pose no security risk when it is publicly accessible.
type PublicMinerAPI struct {
miner *Miner
agent *RemoteAgent
}
// NewPublicMinerAPI create a new PublicMinerAPI instance.
func NewPublicMinerAPI(miner *Miner) *PublicMinerAPI {
return &PublicMinerAPI{miner, NewRemoteAgent()}
}
// Mining returns an indication if this node is currently mining.
func (s *PublicMinerAPI) Mining() bool {
return s.miner.Mining()
}
// SubmitWork can be used by external miner to submit their POW solution. It returns an indication if the work was
// accepted. Note, this is not an indication if the provided work was valid!
func (s *PublicMinerAPI) SubmitWork(nonce rpc.HexNumber, solution, digest common.Hash) bool {
return s.agent.SubmitWork(nonce.Uint64(), digest, solution)
}
// GetWork returns a work package for external miner. The work package consists of 3 strings
// result[0], 32 bytes hex encoded current block header pow-hash
// result[1], 32 bytes hex encoded seed hash used for DAG
// result[2], 32 bytes hex encoded boundary condition ("target"), 2^256/difficulty
func (s *PublicMinerAPI) GetWork() ([]string, error) {
if !s.Mining() {
s.miner.Start(s.miner.coinbase, 0)
}
if work, err := s.agent.GetWork(); err == nil {
return work[:], nil
} else {
glog.Infof("%v\n", err)
}
return nil, fmt.Errorf("mining not ready")
}
// SubmitHashrate can be used for remote miners to submit their hash rate. This enables the node to report the combined
// hash rate of all miners which submit work through this node. It accepts the miner hash rate and an identifier which
// must be unique between nodes.
func (s *PublicMinerAPI) SubmitHashrate(hashrate rpc.HexNumber, id common.Hash) bool {
s.agent.SubmitHashrate(id, hashrate.Uint64())
return true
}
......@@ -327,6 +327,7 @@ func (self *worker) wait() {
go func(block *types.Block, logs vm.Logs, receipts []*types.Receipt) {
self.mux.Post(core.NewMinedBlockEvent{block})
self.mux.Post(core.ChainEvent{block, block.Hash(), logs})
if stat == core.CanonStatTy {
self.mux.Post(core.ChainHeadEvent{block})
self.mux.Post(logs)
......
......@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
var (
......@@ -264,3 +265,12 @@ func (n *Node) DataDir() string {
func (n *Node) EventMux() *event.TypeMux {
return n.eventmux
}
// RPCAPIs returns the collection of RPC descriptor this node offers
func (n *Node) RPCAPIs() []rpc.API {
var apis []rpc.API
for _, api := range n.services {
apis = append(apis, api.APIs()...)
}
return apis
}
......@@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// SampleService is a trivial network service that can be attached to a node for
......@@ -35,6 +36,7 @@ import (
type SampleService struct{}
func (s *SampleService) Protocols() []p2p.Protocol { return nil }
func (s *SampleService) APIs() []rpc.API { return nil }
func (s *SampleService) Start(*p2p.Server) error { fmt.Println("Service starting..."); return nil }
func (s *SampleService) Stop() error { fmt.Println("Service stopping..."); return nil }
......
......@@ -23,6 +23,7 @@ import (
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// ServiceContext is a collection of service independent options inherited from
......@@ -70,6 +71,9 @@ type Service interface {
// Protocol retrieves the P2P protocols the service wishes to start.
Protocols() []p2p.Protocol
// APIs retrieves the list of RPC descriptors the service provides
APIs() []rpc.API
// Start is called after all services have been constructed and the networking
// layer was also initialized to spawn any goroutines required by the service.
Start(server *p2p.Server) error
......
......@@ -23,12 +23,14 @@ import (
"reflect"
"github.com/ethereum/go-ethereum/p2p"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// NoopService is a trivial implementation of the Service interface.
type NoopService struct{}
func (s *NoopService) Protocols() []p2p.Protocol { return nil }
func (s *NoopService) APIs() []rpc.API { return nil }
func (s *NoopService) Start(*p2p.Server) error { return nil }
func (s *NoopService) Stop() error { return nil }
......@@ -67,6 +69,10 @@ func (s *InstrumentedService) Protocols() []p2p.Protocol {
return s.protocols
}
func (s *InstrumentedService) APIs() []rpc.API {
return nil
}
func (s *InstrumentedService) Start(server *p2p.Server) error {
if s.startHook != nil {
s.startHook(server)
......
......@@ -39,9 +39,11 @@ func newMergedApi(apis ...shared.EthereumApi) *MergedApi {
mergedApi.methods = make(map[string]shared.EthereumApi)
for _, api := range apis {
mergedApi.apis[api.Name()] = api.ApiVersion()
for _, method := range api.Methods() {
mergedApi.methods[method] = api
if api != nil {
mergedApi.apis[api.Name()] = api.ApiVersion()
for _, method := range api.Methods() {
mergedApi.methods[method] = api
}
}
}
return mergedApi
......
......@@ -33,6 +33,11 @@ web3._extend({
call: 'personal_unlockAccount',
params: 3,
inputFormatter: [null, null, null]
}),
new web3._extend.Method({
name: 'lockAccount',
call: 'personal_lockAccount',
params: 1
})
],
properties:
......
......@@ -191,6 +191,8 @@ func ParseApiString(apistr string, codec codec.Codec, xeth *xeth.XEth, stack *no
apis[i] = NewPersonalApi(xeth, eth, codec)
case shared.Web3ApiName:
apis[i] = NewWeb3Api(xeth, codec)
case "rpc": // gives information about the RPC interface
continue
default:
return nil, fmt.Errorf("Unknown API '%s'", name)
}
......
......@@ -69,13 +69,28 @@ func (self *ipcClient) SupportedModules() (map[string]string, error) {
req := shared.Request{
Id: 1,
Jsonrpc: "2.0",
Method: "modules",
Method: "rpc_modules",
}
if err := self.coder.WriteResponse(req); err != nil {
return nil, err
}
res, _ := self.coder.ReadResponse()
if sucRes, ok := res.(*shared.SuccessResponse); ok {
data, _ := json.Marshal(sucRes.Result)
modules := make(map[string]string)
if err := json.Unmarshal(data, &modules); err == nil {
return modules, nil
}
}
// old version uses modules instead of rpc_modules, this can be removed after full migration
req.Method = "modules"
if err := self.coder.WriteResponse(req); err != nil {
return nil, err
}
res, err := self.coder.ReadResponse()
if err != nil {
return nil, err
......@@ -108,6 +123,11 @@ func StartIpc(cfg IpcConfig, codec codec.Codec, initializer InitFunc) error {
return nil
}
// CreateListener creates an listener, on Unix platforms this is a unix socket, on Windows this is a named pipe
func CreateListener(cfg IpcConfig) (net.Listener, error) {
return ipcListen(cfg)
}
func ipcLoop(cfg IpcConfig, codec codec.Codec, initializer InitFunc, l net.Listener) {
glog.V(logger.Info).Infof("IPC service started (%s)\n", cfg.Endpoint)
defer os.Remove(cfg.Endpoint)
......
......@@ -54,6 +54,78 @@ func (self *Jeth) err(call otto.FunctionCall, code int, msg string, id interface
return res
}
// UnlockAccount asks the user for the password and than executes the jeth.UnlockAccount callback in the jsre
func (self *Jeth) UnlockAccount(call otto.FunctionCall) (response otto.Value) {
var cmd, account, passwd string
timeout := int64(300)
var ok bool
if len(call.ArgumentList) == 0 {
fmt.Println("expected address of account to unlock")
return otto.FalseValue()
}
if len(call.ArgumentList) >= 1 {
if accountExport, err := call.Argument(0).Export(); err == nil {
if account, ok = accountExport.(string); ok {
if len(call.ArgumentList) == 1 {
fmt.Printf("Unlock account %s\n", account)
passwd, err = utils.PromptPassword("Passphrase: ", true)
if err != nil {
return otto.FalseValue()
}
}
}
}
}
if len(call.ArgumentList) >= 2 {
if passwdExport, err := call.Argument(1).Export(); err == nil {
passwd, _ = passwdExport.(string)
}
}
if len(call.ArgumentList) >= 3 {
if timeoutExport, err := call.Argument(2).Export(); err == nil {
timeout, _ = timeoutExport.(int64)
}
}
cmd = fmt.Sprintf("jeth.unlockAccount('%s', '%s', %d)", account, passwd, timeout)
if val, err := call.Otto.Run(cmd); err == nil {
return val
}
return otto.FalseValue()
}
// NewAccount asks the user for the password and than executes the jeth.newAccount callback in the jsre
func (self *Jeth) NewAccount(call otto.FunctionCall) (response otto.Value) {
if len(call.ArgumentList) == 0 {
passwd, err := utils.PromptPassword("Passphrase: ", true)
if err != nil {
return otto.FalseValue()
}
passwd2, err := utils.PromptPassword("Repeat passphrase: ", true)
if err != nil {
return otto.FalseValue()
}
if passwd != passwd2 {
fmt.Println("Passphrases don't match")
return otto.FalseValue()
}
cmd := fmt.Sprintf("jeth.newAccount('%s')", passwd)
if val, err := call.Otto.Run(cmd); err == nil {
return val
}
} else {
fmt.Println("New account doesn't expect argument(s), you will be prompted for a password")
}
return otto.FalseValue()
}
func (self *Jeth) Send(call otto.FunctionCall) (response otto.Value) {
reqif, err := call.Argument(0).Export()
if err != nil {
......
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
/*
Package rpc provides access to the exported methods of an object across a network
or other I/O connection. After creating a server instance objects can be registered,
making it visible from the outside. Exported methods that follow specific
conventions can be called remotely. It also has support for the publish/subscribe
pattern.
Methods that satisfy the following criteria are made available for remote access:
- object must be exported
- method must be exported
- method returns 0, 1 (response or error) or 2 (response and error) values
- method argument(s) must be exported or builtin types
- method returned value(s) must be exported or builtin types
An example method:
func (s *CalcService) Div(a, b int) (int, error)
When the returned error isn't nil the returned integer is ignored and the error is
send back to the client. Otherwise the returned integer is send back to the client.
The server offers the ServeCodec method which accepts a ServerCodec instance. It will
read requests from the codec, process the request and sends the response back to the
client using the codec. The server can execute requests concurrently. Responses
can be send back to the client out of order.
An example server which uses the JSON codec:
type CalculatorService struct {}
func (s *CalculatorService) Add(a, b int) int {
return a + b
}
func (s *CalculatorService Div(a, b int) (int, error) {
if b == 0 {
return 0, errors.New("divide by zero")
}
return a/b, nil
}
calculator := new(CalculatorService)
server := NewServer()
server.RegisterName("calculator", calculator")
l, _ := net.ListenUnix("unix", &net.UnixAddr{Net: "unix", Name: "/tmp/calculator.sock"})
for {
c, _ := l.AcceptUnix()
codec := v2.NewJSONCodec(c)
go server.ServeCodec(codec)
}
The package also supports the publish subscribe pattern through the use of subscriptions.
A method that is considered eligible for notifications must satisfy the following criteria:
- object must be exported
- method must be exported
- method argument(s) must be exported or builtin types
- method must return the tuple Subscription, error
An example method:
func (s *BlockChainService) Head() (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{})
return v2.NewSubscription(sub), nil
}
This method will push all raised ChainHeadEvents to subscribed clients. If the client is only
interested in every N'th block it is possible to add a criteria.
func (s *BlockChainService) HeadFiltered(nth uint64) (Subscription, error) {
sub := s.bc.eventMux.Subscribe(ChainHeadEvent{})
criteria := func(event interface{}) bool {
chainHeadEvent := event.(ChainHeadEvent)
if chainHeadEvent.Block.NumberU64() % nth == 0 {
return true
}
return false
}
return v2.NewSubscriptionFiltered(sub, criteria), nil
}
Subscriptions are deleted when:
- the user sends an unsubscribe request
- the connection which was used to create the subscription is closed
*/
package v2
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v2
import "fmt"
// request is for an unknown service
type methodNotFoundError struct {
service string
method string
}
func (e *methodNotFoundError) Code() int {
return -32601
}
func (e *methodNotFoundError) Error() string {
return fmt.Sprintf("The method %s%s%s does not exist/is not available", e.service, serviceMethodSeparator, e.method)
}
// received message isn't a valid request
type invalidRequestError struct {
message string
}
func (e *invalidRequestError) Code() int {
return -32600
}
func (e *invalidRequestError) Error() string {
return e.message
}
// received message is invalid
type invalidMessageError struct {
message string
}
func (e *invalidMessageError) Code() int {
return -32700
}
func (e *invalidMessageError) Error() string {
return e.message
}
// unable to decode supplied params, or an invalid number of parameters
type invalidParamsError struct {
message string
}
func (e *invalidParamsError) Code() int {
return -32602
}
func (e *invalidParamsError) Error() string {
return e.message
}
// logic error, callback returned an error
type callbackError struct {
message string
}
func (e *callbackError) Code() int {
return -32000
}
func (e *callbackError) Error() string {
return e.message
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v2
import (
"encoding/json"
"fmt"
"io"
"reflect"
"strings"
"sync/atomic"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
const (
jsonRPCVersion = "2.0"
serviceMethodSeparator = "_"
subscribeMethod = "eth_subscribe"
unsubscribeMethod = "eth_unsubscribe"
notificationMethod = "eth_subscription"
)
// JSON-RPC request
type jsonRequest struct {
Method string `json:"method"`
Version string `json:"jsonrpc"`
Id *int64 `json:"id,omitempty"`
Payload json.RawMessage `json:"params"`
}
// JSON-RPC response
type jsonSuccessResponse struct {
Version string `json:"jsonrpc"`
Id int64 `json:"id"`
Result interface{} `json:"result,omitempty"`
}
// JSON-RPC error object
type jsonError struct {
Code int `json:"code"`
Message string `json:"message"`
Data interface{} `json:"data,omitempty"`
}
// JSON-RPC error response
type jsonErrResponse struct {
Version string `json:"jsonrpc"`
Id *int64 `json:"id,omitempty"`
Error jsonError `json:"error"`
}
// JSON-RPC notification payload
type jsonSubscription struct {
Subscription string `json:"subscription"`
Result interface{} `json:"result,omitempty"`
}
// JSON-RPC notification
type jsonNotification struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Params jsonSubscription `json:"params"`
}
// jsonCodec reads and writes JSON-RPC messages to the underlying connection. It also has support for parsing arguments
// and serializing (result) objects.
type jsonCodec struct {
closed chan interface{}
isClosed int32
d *json.Decoder
e *json.Encoder
req jsonRequest
rw io.ReadWriteCloser
}
// NewJSONCodec creates a new RPC server codec with support for JSON-RPC 2.0
func NewJSONCodec(rwc io.ReadWriteCloser) ServerCodec {
d := json.NewDecoder(rwc)
d.UseNumber()
return &jsonCodec{closed: make(chan interface{}), d: d, e: json.NewEncoder(rwc), rw: rwc, isClosed: 0}
}
// isBatch returns true when the first non-whitespace characters is '['
func isBatch(msg json.RawMessage) bool {
for _, c := range msg {
// skip insignificant whitespace (http://www.ietf.org/rfc/rfc4627.txt)
if c == 0x20 || c == 0x09 || c == 0x0a || c == 0x0d {
continue
}
return c == '['
}
return false
}
// ReadRequestHeaders will read new requests without parsing the arguments. It will return a collection of requests, an
// indication if these requests are in batch form or an error when the incoming message could not be read/parsed.
func (c *jsonCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
var incomingMsg json.RawMessage
if err := c.d.Decode(&incomingMsg); err != nil {
return nil, false, &invalidRequestError{err.Error()}
}
if isBatch(incomingMsg) {
return parseBatchRequest(incomingMsg)
}
return parseRequest(incomingMsg)
}
// parseRequest will parse a single request from the given RawMessage. It will return the parsed request, an indication
// if the request was a batch or an error when the request could not be parsed.
func parseRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
var in jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
if in.Id == nil {
return nil, false, &invalidMessageError{"Server cannot handle notifications"}
}
// subscribe are special, they will always use `subscribeMethod` as service method
if in.Method == subscribeMethod {
reqs := []rpcRequest{rpcRequest{id: *in.Id, isPubSub: true}}
if len(in.Payload) > 0 {
// first param must be subscription name
var subscribeMethod [1]string
if err := json.Unmarshal(in.Payload, &subscribeMethod); err != nil {
glog.V(logger.Debug).Infof("Unable to parse subscription method: %v\n", err)
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}
// all subscriptions are made on the eth service
reqs[0].service, reqs[0].method = "eth", subscribeMethod[0]
reqs[0].params = in.Payload
return reqs, false, nil
}
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}
if in.Method == unsubscribeMethod {
return []rpcRequest{rpcRequest{id: *in.Id, isPubSub: true,
method: unsubscribeMethod, params: in.Payload}}, false, nil
}
// regular RPC call
elems := strings.Split(in.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, false, &methodNotFoundError{in.Method, ""}
}
if len(in.Payload) == 0 {
return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: *in.Id}}, false, nil
}
return []rpcRequest{rpcRequest{service: elems[0], method: elems[1], id: *in.Id, params: in.Payload}}, false, nil
}
// parseBatchRequest will parse a batch request into a collection of requests from the given RawMessage, an indication
// if the request was a batch or an error when the request could not be read.
func parseBatchRequest(incomingMsg json.RawMessage) ([]rpcRequest, bool, RPCError) {
var in []jsonRequest
if err := json.Unmarshal(incomingMsg, &in); err != nil {
return nil, false, &invalidMessageError{err.Error()}
}
requests := make([]rpcRequest, len(in))
for i, r := range in {
if r.Id == nil {
return nil, true, &invalidMessageError{"Server cannot handle notifications"}
}
// (un)subscribe are special, they will always use the same service.method
if r.Method == subscribeMethod {
requests[i] = rpcRequest{id: *r.Id, isPubSub: true}
if len(r.Payload) > 0 {
var subscribeMethod [1]string
if err := json.Unmarshal(r.Payload, &subscribeMethod); err != nil {
glog.V(logger.Debug).Infof("Unable to parse subscription method: %v\n", err)
return nil, false, &invalidRequestError{"Unable to parse subscription request"}
}
// all subscriptions are made on the eth service
requests[i].service, requests[i].method = "eth", subscribeMethod[0]
requests[i].params = r.Payload
continue
}
return nil, true, &invalidRequestError{"Unable to parse (un)subscribe request arguments"}
}
if r.Method == unsubscribeMethod {
requests[i] = rpcRequest{id: *r.Id, isPubSub: true, method: unsubscribeMethod, params: r.Payload}
continue
}
elems := strings.Split(r.Method, serviceMethodSeparator)
if len(elems) != 2 {
return nil, true, &methodNotFoundError{r.Method, ""}
}
if len(r.Payload) == 0 {
requests[i] = rpcRequest{service: elems[0], method: elems[1], id: *r.Id, params: nil}
} else {
requests[i] = rpcRequest{service: elems[0], method: elems[1], id: *r.Id, params: r.Payload}
}
}
return requests, true, nil
}
// ParseRequestArguments tries to parse the given params (json.RawMessage) with the given types. It returns the parsed
// values or an error when the parsing failed.
func (c *jsonCodec) ParseRequestArguments(argTypes []reflect.Type, params interface{}) ([]reflect.Value, RPCError) {
if args, ok := params.(json.RawMessage); !ok {
return nil, &invalidParamsError{"Invalid params supplied"}
} else {
return parsePositionalArguments(args, argTypes)
}
}
func countArguments(args json.RawMessage) (int, error) {
var cnt []interface{}
if err := json.Unmarshal(args, &cnt); err != nil {
return -1, nil
}
return len(cnt), nil
}
// parsePositionalArguments tries to parse the given args to an array of values with the given types. It returns the
// parsed values or an error when the args could not be parsed.
func parsePositionalArguments(args json.RawMessage, argTypes []reflect.Type) ([]reflect.Value, RPCError) {
argValues := make([]reflect.Value, len(argTypes))
params := make([]interface{}, len(argTypes))
n, err := countArguments(args)
if err != nil {
return nil, &invalidParamsError{err.Error()}
}
if n != len(argTypes) {
return nil, &invalidParamsError{fmt.Sprintf("insufficient params, want %d have %d", len(argTypes), n)}
}
for i, t := range argTypes {
if t.Kind() == reflect.Ptr {
// values must be pointers for the Unmarshal method, reflect.
// Dereference otherwise reflect.New would create **SomeType
argValues[i] = reflect.New(t.Elem())
params[i] = argValues[i].Interface()
// when not specified blockNumbers are by default latest (-1)
if blockNumber, ok := params[i].(*BlockNumber); ok {
*blockNumber = BlockNumber(-1)
}
} else {
argValues[i] = reflect.New(t)
params[i] = argValues[i].Interface()
// when not specified blockNumbers are by default latest (-1)
if blockNumber, ok := params[i].(*BlockNumber); ok {
*blockNumber = BlockNumber(-1)
}
}
}
if err := json.Unmarshal(args, &params); err != nil {
return nil, &invalidParamsError{err.Error()}
}
// Convert pointers back to values where necessary
for i, a := range argValues {
if a.Kind() != argTypes[i].Kind() {
argValues[i] = reflect.Indirect(argValues[i])
}
}
return argValues, nil
}
// CreateResponse will create a JSON-RPC success response with the given id and reply as result.
func (c *jsonCodec) CreateResponse(id int64, reply interface{}) interface{} {
if isHexNum(reflect.TypeOf(reply)) {
return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: fmt.Sprintf(`%#x`, reply)}
}
return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: reply}
}
// CreateErrorResponse will create a JSON-RPC error response with the given id and error.
func (c *jsonCodec) CreateErrorResponse(id *int64, err RPCError) interface{} {
return &jsonErrResponse{Version: jsonRPCVersion, Id: id, Error: jsonError{Code: err.Code(), Message: err.Error()}}
}
// CreateErrorResponseWithInfo will create a JSON-RPC error response with the given id and error.
// info is optional and contains additional information about the error. When an empty string is passed it is ignored.
func (c *jsonCodec) CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{} {
return &jsonErrResponse{Version: jsonRPCVersion, Id: id,
Error: jsonError{Code: err.Code(), Message: err.Error(), Data: info}}
}
// CreateNotification will create a JSON-RPC notification with the given subscription id and event as params.
func (c *jsonCodec) CreateNotification(subid string, event interface{}) interface{} {
if isHexNum(reflect.TypeOf(event)) {
return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: fmt.Sprintf(`%#x`, event)}}
}
return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}}
}
// Write message to client
func (c *jsonCodec) Write(res interface{}) error {
return c.e.Encode(res)
}
// Close the underlying connection
func (c *jsonCodec) Close() {
if atomic.CompareAndSwapInt32(&c.isClosed, 0, 1) {
close(c.closed)
c.rw.Close()
}
}
// Closed returns a channel which will be closed when Close is called
func (c *jsonCodec) Closed() <-chan interface{} {
return c.closed
}
package v2
import (
"bufio"
"bytes"
"reflect"
"testing"
)
type RWC struct {
*bufio.ReadWriter
}
func (rwc *RWC) Close() error {
return nil
}
func TestJSONRequestParsing(t *testing.T) {
server := NewServer()
service := new(Service)
if err := server.RegisterName("calc", service); err != nil {
t.Fatalf("%v", err)
}
req := bytes.NewBufferString(`{"id": 1234, "jsonrpc": "2.0", "method": "calc_add", "params": [11, 22]}`)
var str string
reply := bytes.NewBufferString(str)
rw := &RWC{bufio.NewReadWriter(bufio.NewReader(req), bufio.NewWriter(reply))}
codec := NewJSONCodec(rw)
requests, batch, err := codec.ReadRequestHeaders()
if err != nil {
t.Fatalf("%v", err)
}
if batch {
t.Fatalf("Request isn't a batch")
}
if len(requests) != 1 {
t.Fatalf("Expected 1 request but got %d requests - %v", len(requests), requests)
}
if requests[0].service != "calc" {
t.Fatalf("Expected service 'calc' but got '%s'", requests[0].service)
}
if requests[0].method != "add" {
t.Fatalf("Expected method 'Add' but got '%s'", requests[0].method)
}
if requests[0].id != 1234 {
t.Fatalf("Expected id 1234 but got %d", requests[0].id)
}
var arg int
args := []reflect.Type{reflect.TypeOf(arg), reflect.TypeOf(arg)}
v, err := codec.ParseRequestArguments(args, requests[0].params)
if err != nil {
t.Fatalf("%v", err)
}
if len(v) != 2 {
t.Fatalf("Expected 2 argument values, got %d", len(v))
}
if v[0].Int() != 11 || v[1].Int() != 22 {
t.Fatalf("expected %d == 11 && %d == 22", v[0].Int(), v[1].Int())
}
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v2
import (
"fmt"
"reflect"
"runtime"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
// NewServer will create a new server instance with no registered handlers.
func NewServer() *Server {
server := &Server{services: make(serviceRegistry), subscriptions: make(subscriptionRegistry)}
// register a default service which will provide meta information about the RPC service such as the services and
// methods it offers.
rpcService := &RPCService{server}
server.RegisterName("rpc", rpcService)
return server
}
// RPCService gives meta information about the server.
// e.g. gives information about the loaded modules.
type RPCService struct {
server *Server
}
// Modules returns the list of RPC services with their version number
func (s *RPCService) Modules() map[string]string {
modules := make(map[string]string)
for name, _ := range s.server.services {
modules[name] = "1.0"
}
return modules
}
// RegisterName will create an service for the given rcvr type under the given name. When no methods on the given rcvr
// match the criteria to be either a RPC method or a subscription an error is returned. Otherwise a new service is
// created and added to the service collection this server instance serves.
func (s *Server) RegisterName(name string, rcvr interface{}) error {
if s.services == nil {
s.services = make(serviceRegistry)
}
svc := new(service)
svc.typ = reflect.TypeOf(rcvr)
rcvrVal := reflect.ValueOf(rcvr)
if name == "" {
return fmt.Errorf("no service name for type %s", svc.typ.String())
}
if !isExported(reflect.Indirect(rcvrVal).Type().Name()) {
return fmt.Errorf("%s is not exported", reflect.Indirect(rcvrVal).Type().Name())
}
// already a previous service register under given sname, merge methods/subscriptions
if regsvc, present := s.services[name]; present {
methods, subscriptions := suitableCallbacks(rcvrVal, svc.typ)
if len(methods) == 0 && len(subscriptions) == 0 {
return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose")
}
for _, m := range methods {
regsvc.callbacks[formatName(m.method.Name)] = m
}
for _, s := range subscriptions {
regsvc.subscriptions[formatName(s.method.Name)] = s
}
return nil
}
svc.name = name
svc.callbacks, svc.subscriptions = suitableCallbacks(rcvrVal, svc.typ)
if len(svc.callbacks) == 0 && len(svc.subscriptions) == 0 {
return fmt.Errorf("Service doesn't have any suitable methods/subscriptions to expose")
}
s.services[svc.name] = svc
return nil
}
// ServeCodec reads incoming requests from codec, calls the appropriate callback and writes the
// response back using the given codec. It will block until the codec is closed.
//
// This server will:
// 1. allow for asynchronous and parallel request execution
// 2. supports notifications (pub/sub)
// 3. supports request batches
func (s *Server) ServeCodec(codec ServerCodec) {
defer func() {
if err := recover(); err != nil {
const size = 64 << 10
buf := make([]byte, size)
buf = buf[:runtime.Stack(buf, false)]
glog.Errorln(string(buf))
}
codec.Close()
}()
for {
reqs, batch, err := s.readRequest(codec)
if err != nil {
glog.V(logger.Debug).Infof("%v\n", err)
codec.Write(codec.CreateErrorResponse(nil, err))
break
}
if batch {
go s.execBatch(codec, reqs)
} else {
go s.exec(codec, reqs[0])
}
}
}
// sendNotification will create a notification from the given event by serializing member fields of the event.
// It will then send the notification to the client, when it fails the codec is closed. When the event has multiple
// fields an array of values is returned.
func sendNotification(codec ServerCodec, subid string, event interface{}) {
notification := codec.CreateNotification(subid, event)
if err := codec.Write(notification); err != nil {
codec.Close()
}
}
// createSubscription will register a new subscription and waits for raised events. When an event is raised it will:
// 1. test if the event is raised matches the criteria the user has (optionally) specified
// 2. create a notification of the event and send it the client when it matches the criteria
// It will unsubscribe the subscription when the socket is closed or the subscription is unsubscribed by the user.
func (s *Server) createSubscription(c ServerCodec, req *serverRequest) (string, error) {
args := []reflect.Value{req.callb.rcvr}
if len(req.args) > 0 {
args = append(args, req.args...)
}
subid, err := newSubscriptionId()
if err != nil {
return "", err
}
reply := req.callb.method.Func.Call(args)
if reply[1].IsNil() { // no error
if subscription, ok := reply[0].Interface().(Subscription); ok {
s.muSubcriptions.Lock()
s.subscriptions[subid] = subscription
s.muSubcriptions.Unlock()
go func() {
cases := []reflect.SelectCase{
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(subscription.Chan())}, // new event
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c.Closed())}, // connection closed
}
for {
idx, notification, recvOk := reflect.Select(cases)
switch idx {
case 0: // new event, or channel closed
if recvOk { // send notification
if event, ok := notification.Interface().(*event.Event); ok {
if subscription.match == nil || subscription.match(event.Data) {
sendNotification(c, subid, subscription.format(event.Data))
}
}
} else { // user send an eth_unsubscribe request
return
}
case 1: // connection closed
s.unsubscribe(subid)
return
}
}
}()
} else { // unable to create subscription
s.muSubcriptions.Lock()
delete(s.subscriptions, subid)
s.muSubcriptions.Unlock()
}
} else {
return "", fmt.Errorf("Unable to create subscription")
}
return subid, nil
}
// unsubscribe calls the Unsubscribe method on the subscription and removes a subscription from the subscription
// registry.
func (s *Server) unsubscribe(subid string) bool {
s.muSubcriptions.Lock()
defer s.muSubcriptions.Unlock()
if sub, ok := s.subscriptions[subid]; ok {
sub.Unsubscribe()
delete(s.subscriptions, subid)
return true
}
return false
}
// handle executes a request and returns the response from the callback.
func (s *Server) handle(codec ServerCodec, req *serverRequest) interface{} {
if req.err != nil {
return codec.CreateErrorResponse(&req.id, req.err)
}
if req.isUnsubscribe { // first param must be the subscription id
if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
subid := req.args[0].String()
if s.unsubscribe(subid) {
return codec.CreateResponse(req.id, true)
} else {
return codec.CreateErrorResponse(&req.id,
&callbackError{fmt.Sprintf("subscription '%s' not found", subid)})
}
}
return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as argument"})
}
if req.callb.isSubscribe {
subid, err := s.createSubscription(codec, req)
if err != nil {
return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()})
}
return codec.CreateResponse(req.id, subid)
}
// regular RPC call
if len(req.args) != len(req.callb.argTypes) {
rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
req.svcname, serviceMethodSeparator, req.callb.method.Name,
len(req.callb.argTypes), len(req.args))}
return codec.CreateErrorResponse(&req.id, rpcErr)
}
arguments := []reflect.Value{req.callb.rcvr}
if len(req.args) > 0 {
arguments = append(arguments, req.args...)
}
reply := req.callb.method.Func.Call(arguments)
if len(reply) == 0 {
return codec.CreateResponse(req.id, nil)
}
if req.callb.errPos >= 0 { // test if method returned an error
if !reply[req.callb.errPos].IsNil() {
e := reply[req.callb.errPos].Interface().(error)
res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
return res
}
}
return codec.CreateResponse(req.id, reply[0].Interface())
}
// exec executes the given request and writes the result back using the codec.
func (s *Server) exec(codec ServerCodec, req *serverRequest) {
var response interface{}
if req.err != nil {
response = codec.CreateErrorResponse(&req.id, req.err)
} else {
response = s.handle(codec, req)
}
if err := codec.Write(response); err != nil {
glog.V(logger.Error).Infof("%v\n", err)
codec.Close()
}
}
// execBatch executes the given requests and writes the result back using the codec. It will only write the response
// back when the last request is processed.
func (s *Server) execBatch(codec ServerCodec, requests []*serverRequest) {
responses := make([]interface{}, len(requests))
for i, req := range requests {
if req.err != nil {
responses[i] = codec.CreateErrorResponse(&req.id, req.err)
} else {
responses[i] = s.handle(codec, req)
}
}
if err := codec.Write(responses); err != nil {
glog.V(logger.Error).Infof("%v\n", err)
codec.Close()
}
}
// readRequest requests the next (batch) request from the codec. It will return the collection of requests, an
// indication if the request was a batch, the invalid request identifier and an error when the request could not be
// read/parsed.
func (s *Server) readRequest(codec ServerCodec) ([]*serverRequest, bool, RPCError) {
reqs, batch, err := codec.ReadRequestHeaders()
if err != nil {
return nil, batch, err
}
requests := make([]*serverRequest, len(reqs))
// verify requests
for i, r := range reqs {
var ok bool
var svc *service
if r.isPubSub && r.method == unsubscribeMethod {
requests[i] = &serverRequest{id: r.id, isUnsubscribe: true}
argTypes := []reflect.Type{reflect.TypeOf("")}
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
requests[i].args = args
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
continue
}
if svc, ok = s.services[r.service]; !ok {
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
continue
}
if r.isPubSub { // eth_subscribe
if callb, ok := svc.subscriptions[r.method]; ok {
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 {
argTypes := []reflect.Type{reflect.TypeOf("")}
argTypes = append(argTypes, callb.argTypes...)
if args, err := codec.ParseRequestArguments(argTypes, r.params); err == nil {
requests[i].args = args[1:] // first one is service.method name which isn't an actual argument
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
}
} else {
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{subscribeMethod, r.method}}
}
continue
}
if callb, ok := svc.callbacks[r.method]; ok {
requests[i] = &serverRequest{id: r.id, svcname: svc.name, callb: callb}
if r.params != nil && len(callb.argTypes) > 0 {
if args, err := codec.ParseRequestArguments(callb.argTypes, r.params); err == nil {
requests[i].args = args
} else {
requests[i].err = &invalidParamsError{err.Error()}
}
}
continue
}
requests[i] = &serverRequest{id: r.id, err: &methodNotFoundError{r.service, r.method}}
}
return requests, batch, nil
}
package v2
import (
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
)
type Service struct{}
type Args struct {
S string
}
func (s *Service) NoArgsRets() {
}
type Result struct {
String string
Int int
Args *Args
}
func (s *Service) Echo(str string, i int, args *Args) Result {
return Result{str, i, args}
}
func (s *Service) Rets() (string, error) {
return "", nil
}
func (s *Service) InvalidRets1() (error, string) {
return nil, ""
}
func (s *Service) InvalidRets2() (string, string) {
return "", ""
}
func (s *Service) InvalidRets3() (string, string, error) {
return "", "", nil
}
func (s *Service) Subscription() (Subscription, error) {
return NewSubscription(nil), nil
}
func TestServerRegisterName(t *testing.T) {
server := NewServer()
service := new(Service)
if err := server.RegisterName("calc", service); err != nil {
t.Fatalf("%v", err)
}
if len(server.services) != 2 {
t.Fatalf("Expected 2 service entries, got %d", len(server.services))
}
svc, ok := server.services["calc"]
if !ok {
t.Fatalf("Expected service calc to be registered")
}
if len(svc.callbacks) != 3 {
t.Errorf("Expected 3 callbacks for service 'calc', got %d", len(svc.callbacks))
}
if len(svc.subscriptions) != 1 {
t.Errorf("Expected 1 subscription for service 'calc', got %d", len(svc.subscriptions))
}
}
// dummy codec used for testing RPC method execution
type ServerTestCodec struct {
counter int
input []byte
output string
closer chan interface{}
}
func (c *ServerTestCodec) ReadRequestHeaders() ([]rpcRequest, bool, RPCError) {
c.counter += 1
if c.counter == 1 {
var req jsonRequest
json.Unmarshal(c.input, &req)
return []rpcRequest{rpcRequest{id: *req.Id, isPubSub: false, service: "test", method: req.Method, params: req.Payload}}, false, nil
}
// requests are executes in parallel, wait a bit before returning an error so that the previous request has time to
// be executed
timer := time.NewTimer(time.Duration(2) * time.Second)
<-timer.C
return nil, false, &invalidRequestError{"connection closed"}
}
func (c *ServerTestCodec) ParseRequestArguments(argTypes []reflect.Type, payload interface{}) ([]reflect.Value, RPCError) {
args, _ := payload.(json.RawMessage)
argValues := make([]reflect.Value, len(argTypes))
params := make([]interface{}, len(argTypes))
n, err := countArguments(args)
if err != nil {
return nil, &invalidParamsError{err.Error()}
}
if n != len(argTypes) {
return nil, &invalidParamsError{fmt.Sprintf("insufficient params, want %d have %d", len(argTypes), n)}
}
for i, t := range argTypes {
if t.Kind() == reflect.Ptr {
// values must be pointers for the Unmarshal method, reflect.
// Dereference otherwise reflect.New would create **SomeType
argValues[i] = reflect.New(t.Elem())
params[i] = argValues[i].Interface()
// when not specified blockNumbers are by default latest (-1)
if blockNumber, ok := params[i].(*BlockNumber); ok {
*blockNumber = BlockNumber(-1)
}
} else {
argValues[i] = reflect.New(t)
params[i] = argValues[i].Interface()
// when not specified blockNumbers are by default latest (-1)
if blockNumber, ok := params[i].(*BlockNumber); ok {
*blockNumber = BlockNumber(-1)
}
}
}
if err := json.Unmarshal(args, &params); err != nil {
return nil, &invalidParamsError{err.Error()}
}
// Convert pointers back to values where necessary
for i, a := range argValues {
if a.Kind() != argTypes[i].Kind() {
argValues[i] = reflect.Indirect(argValues[i])
}
}
return argValues, nil
}
func (c *ServerTestCodec) CreateResponse(id int64, reply interface{}) interface{} {
return &jsonSuccessResponse{Version: jsonRPCVersion, Id: id, Result: reply}
}
func (c *ServerTestCodec) CreateErrorResponse(id *int64, err RPCError) interface{} {
return &jsonErrResponse{Version: jsonRPCVersion, Id: id, Error: jsonError{Code: err.Code(), Message: err.Error()}}
}
func (c *ServerTestCodec) CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{} {
return &jsonErrResponse{Version: jsonRPCVersion, Id: id,
Error: jsonError{Code: err.Code(), Message: err.Error(), Data: info}}
}
func (c *ServerTestCodec) CreateNotification(subid string, event interface{}) interface{} {
return &jsonNotification{Version: jsonRPCVersion, Method: notificationMethod,
Params: jsonSubscription{Subscription: subid, Result: event}}
}
func (c *ServerTestCodec) Write(msg interface{}) error {
if len(c.output) == 0 { // only capture first response
if o, err := json.Marshal(msg); err != nil {
return err
} else {
c.output = string(o)
}
}
return nil
}
func (c *ServerTestCodec) Close() {
close(c.closer)
}
func (c *ServerTestCodec) Closed() <-chan interface{} {
return c.closer
}
func TestServerMethodExecution(t *testing.T) {
server := NewServer()
service := new(Service)
if err := server.RegisterName("test", service); err != nil {
t.Fatalf("%v", err)
}
id := int64(12345)
req := jsonRequest{
Method: "echo",
Version: "2.0",
Id: &id,
}
args := []interface{}{"string arg", 1122, &Args{"qwerty"}}
req.Payload, _ = json.Marshal(&args)
input, _ := json.Marshal(&req)
codec := &ServerTestCodec{input: input, closer: make(chan interface{})}
go server.ServeCodec(codec)
<-codec.closer
expected := `{"jsonrpc":"2.0","id":12345,"result":{"String":"string arg","Int":1122,"Args":{"S":"qwerty"}}}`
if expected != codec.output {
t.Fatalf("expected %s, got %s\n", expected, codec.output)
}
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v2
import (
"fmt"
"math"
"math/big"
"reflect"
"strings"
"sync"
"github.com/ethereum/go-ethereum/event"
)
// API describes the set of methods offered over the RPC interface
type API struct {
Namespace string // namespace under which the rpc methods of Service are exposed
Version string // api version for DApp's
Service interface{} // receiver instance which holds the methods
Public bool // indication if the methods must be considered safe for public use
}
// callback is a method callback which was registered in the server
type callback struct {
rcvr reflect.Value // receiver of method
method reflect.Method // callback
argTypes []reflect.Type // input argument types
errPos int // err return idx, of -1 when method cannot return error
isSubscribe bool // indication if the callback is a subscription
}
// service represents a registered object
type service struct {
name string // name for service
rcvr reflect.Value // receiver of methods for the service
typ reflect.Type // receiver type
callbacks callbacks // registered handlers
subscriptions subscriptions // available subscriptions/notifications
}
// serverRequest is an incoming request
type serverRequest struct {
id int64
svcname string
rcvr reflect.Value
callb *callback
args []reflect.Value
isUnsubscribe bool
err RPCError
}
type serviceRegistry map[string]*service // collection of services
type callbacks map[string]*callback // collection of RPC callbacks
type subscriptions map[string]*callback // collection of subscription callbacks
type subscriptionRegistry map[string]Subscription // collection of subscriptions
// Server represents a RPC server
type Server struct {
services serviceRegistry
muSubcriptions sync.Mutex // protects subscriptions
subscriptions subscriptionRegistry
}
// rpcRequest represents a raw incoming RPC request
type rpcRequest struct {
service string
method string
id int64
isPubSub bool
params interface{}
}
// RPCError implements RPC error, is add support for error codec over regular go errors
type RPCError interface {
// RPC error code
Code() int
// Error message
Error() string
}
// ServerCodec implements reading, parsing and writing RPC messages for the server side of
// a RPC session. Implementations must be go-routine safe since the codec can be called in
// multiple go-routines concurrently.
type ServerCodec interface {
// Read next request
ReadRequestHeaders() ([]rpcRequest, bool, RPCError)
// Parse request argument to the given types
ParseRequestArguments([]reflect.Type, interface{}) ([]reflect.Value, RPCError)
// Assemble success response
CreateResponse(int64, interface{}) interface{}
// Assemble error response
CreateErrorResponse(*int64, RPCError) interface{}
// Assemble error response with extra information about the error through info
CreateErrorResponseWithInfo(id *int64, err RPCError, info interface{}) interface{}
// Create notification response
CreateNotification(string, interface{}) interface{}
// Write msg to client.
Write(interface{}) error
// Close underlying data stream
Close()
// Closed when underlying connection is closed
Closed() <-chan interface{}
}
// SubscriptionMatcher returns true if the given value matches the criteria specified by the user
type SubscriptionMatcher func(interface{}) bool
// SubscriptionOutputFormat accepts event data and has the ability to format the data before it is send to the client
type SubscriptionOutputFormat func(interface{}) interface{}
// defaultSubscriptionOutputFormatter returns data and is used as default output format for notifications
func defaultSubscriptionOutputFormatter(data interface{}) interface{} {
return data
}
// Subscription is used by the server to send notifications to the client
type Subscription struct {
sub event.Subscription
match SubscriptionMatcher
format SubscriptionOutputFormat
}
// NewSubscription create a new RPC subscription
func NewSubscription(sub event.Subscription) Subscription {
return Subscription{sub, nil, defaultSubscriptionOutputFormatter}
}
// NewSubscriptionWithOutputFormat create a new RPC subscription which a custom notification output format
func NewSubscriptionWithOutputFormat(sub event.Subscription, formatter SubscriptionOutputFormat) Subscription {
return Subscription{sub, nil, formatter}
}
// NewSubscriptionFiltered will create a new subscription. For each raised event the given matcher is
// called. If it returns true the event is send as notification to the client, otherwise it is ignored.
func NewSubscriptionFiltered(sub event.Subscription, match SubscriptionMatcher) Subscription {
return Subscription{sub, match, defaultSubscriptionOutputFormatter}
}
// Chan returns the channel where new events will be published. It's up the user to call the matcher to
// determine if the events are interesting for the client.
func (s *Subscription) Chan() <-chan *event.Event {
return s.sub.Chan()
}
// Unsubscribe will end the subscription and closes the event channel
func (s *Subscription) Unsubscribe() {
s.sub.Unsubscribe()
}
// HexNumber serializes a number to hex format using the "%#x" format
type HexNumber big.Int
// NewHexNumber creates a new hex number instance which will serialize the given val with `%#x` on marshal.
func NewHexNumber(val interface{}) *HexNumber {
if val == nil {
return nil
}
if v, ok := val.(*big.Int); ok && v != nil {
hn := new(big.Int).Set(v)
return (*HexNumber)(hn)
}
rval := reflect.ValueOf(val)
var unsigned uint64
utype := reflect.TypeOf(unsigned)
if t := rval.Type(); t.ConvertibleTo(utype) {
hn := new(big.Int).SetUint64(rval.Convert(utype).Uint())
return (*HexNumber)(hn)
}
var signed int64
stype := reflect.TypeOf(signed)
if t := rval.Type(); t.ConvertibleTo(stype) {
hn := new(big.Int).SetInt64(rval.Convert(stype).Int())
return (*HexNumber)(hn)
}
return nil
}
func (h *HexNumber) UnmarshalJSON(input []byte) error {
length := len(input)
if length >= 2 && input[0] == '"' && input[length-1] == '"' {
input = input[1 : length-1]
}
hn := (*big.Int)(h)
if _, ok := hn.SetString(string(input), 0); ok {
return nil
}
return fmt.Errorf("Unable to parse number")
}
// MarshalJSON serialize the hex number instance to a hex representation.
func (h *HexNumber) MarshalJSON() ([]byte, error) {
if h != nil {
hn := (*big.Int)(h)
if hn.BitLen() == 0 {
return []byte(`"0x0"`), nil
}
return []byte(fmt.Sprintf(`"0x%x"`, hn)), nil
}
return nil, nil
}
func (h *HexNumber) Int() int {
hn := (*big.Int)(h)
return int(hn.Int64())
}
func (h *HexNumber) Int64() int64 {
hn := (*big.Int)(h)
return hn.Int64()
}
func (h *HexNumber) Uint() uint {
hn := (*big.Int)(h)
return uint(hn.Uint64())
}
func (h *HexNumber) Uint64() uint64 {
hn := (*big.Int)(h)
return hn.Uint64()
}
func (h *HexNumber) BigInt() *big.Int {
return (*big.Int)(h)
}
type Number int64
func (n *Number) UnmarshalJSON(data []byte) error {
input := strings.TrimSpace(string(data))
if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' {
input = input[1 : len(input)-1]
}
if len(input) == 0 {
*n = Number(latestBlockNumber.Int64())
return nil
}
in := new(big.Int)
_, ok := in.SetString(input, 0)
if !ok { // test if user supplied string tag
return fmt.Errorf(`invalid number %s`, data)
}
if in.Cmp(earliestBlockNumber) >= 0 && in.Cmp(maxBlockNumber) <= 0 {
*n = Number(in.Int64())
return nil
}
return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber)
}
func (n *Number) Int64() int64 {
return *(*int64)(n)
}
func (n *Number) BigInt() *big.Int {
return big.NewInt(n.Int64())
}
var (
pendingBlockNumber = big.NewInt(-2)
latestBlockNumber = big.NewInt(-1)
earliestBlockNumber = big.NewInt(0)
maxBlockNumber = big.NewInt(math.MaxInt64)
)
type BlockNumber int64
const (
PendingBlockNumber = BlockNumber(-2)
LatestBlockNumber = BlockNumber(-1)
)
// UnmarshalJSON parses the given JSON fragement into a BlockNumber. It supports:
// - "latest" or "earliest" as string arguments
// - the block number
// Returned errors:
// - an unsupported error when "pending" is specified (not yet implemented)
// - an invalid block number error when the given argument isn't a known strings
// - an out of range error when the given block number is either too little or too large
func (bn *BlockNumber) UnmarshalJSON(data []byte) error {
input := strings.TrimSpace(string(data))
if len(input) >= 2 && input[0] == '"' && input[len(input)-1] == '"' {
input = input[1 : len(input)-1]
}
if len(input) == 0 {
*bn = BlockNumber(latestBlockNumber.Int64())
return nil
}
in := new(big.Int)
_, ok := in.SetString(input, 0)
if !ok { // test if user supplied string tag
strBlockNumber := input
if strBlockNumber == "latest" {
*bn = BlockNumber(latestBlockNumber.Int64())
return nil
}
if strBlockNumber == "earliest" {
*bn = BlockNumber(earliestBlockNumber.Int64())
return nil
}
if strBlockNumber == "pending" {
*bn = BlockNumber(pendingBlockNumber.Int64())
return nil
}
return fmt.Errorf(`invalid blocknumber %s`, data)
}
if in.Cmp(earliestBlockNumber) >= 0 && in.Cmp(maxBlockNumber) <= 0 {
*bn = BlockNumber(in.Int64())
return nil
}
return fmt.Errorf("blocknumber not in range [%d, %d]", earliestBlockNumber, maxBlockNumber)
}
func (bn *BlockNumber) Int64() int64 {
return (int64)(*bn)
}
package v2
import (
"bytes"
"encoding/json"
"math/big"
"testing"
)
func TestNewHexNumber(t *testing.T) {
tests := []interface{}{big.NewInt(123), int64(123), uint64(123), int8(123), uint8(123)}
for i, v := range tests {
hn := NewHexNumber(v)
if hn == nil {
t.Fatalf("Unable to create hex number instance for tests[%d]", i)
}
if hn.Int64() != 123 {
t.Fatalf("expected %d, got %d on value tests[%d]", 123, hn.Int64(), i)
}
}
failures := []interface{}{"", nil, []byte{1, 2, 3, 4}}
for i, v := range failures {
hn := NewHexNumber(v)
if hn != nil {
t.Fatalf("Creating a nex number instance of %T should fail (failures[%d])", failures[i], i)
}
}
}
func TestHexNumberUnmarshalJSON(t *testing.T) {
tests := []string{`"0x4d2"`, "1234", `"1234"`}
for i, v := range tests {
var hn HexNumber
if err := json.Unmarshal([]byte(v), &hn); err != nil {
t.Fatalf("Test %d failed - %s", i, err)
}
if hn.Int64() != 1234 {
t.Fatalf("Expected %d, got %d for test[%d]", 1234, hn.Int64(), i)
}
}
}
func TestHexNumberMarshalJSON(t *testing.T) {
hn := NewHexNumber(1234567890)
got, err := json.Marshal(hn)
if err != nil {
t.Fatalf("Unable to marshal hex number - %s", err)
}
exp := []byte(`"0x499602d2"`)
if bytes.Compare(exp, got) != 0 {
t.Fatalf("Invalid json.Marshal, expected '%s', got '%s'", exp, got)
}
}
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package v2
import (
"crypto/rand"
"encoding/hex"
"errors"
"math/big"
"reflect"
"unicode"
"unicode/utf8"
)
// Is this an exported - upper case - name?
func isExported(name string) bool {
rune, _ := utf8.DecodeRuneInString(name)
return unicode.IsUpper(rune)
}
// Is this type exported or a builtin?
func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
// PkgPath will be non-empty even for an exported type,
// so we need to check the type name as well.
return isExported(t.Name()) || t.PkgPath() == ""
}
var errorType = reflect.TypeOf((*error)(nil)).Elem()
// Implements this type the error interface
func isErrorType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.Implements(errorType)
}
var subscriptionType = reflect.TypeOf((*Subscription)(nil)).Elem()
func isSubscriptionType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == subscriptionType
}
// isPubSub tests whether the given method return the pair (v2.Subscription, error)
func isPubSub(methodType reflect.Type) bool {
if methodType.NumOut() != 2 {
return false
}
return isSubscriptionType(methodType.Out(0)) && isErrorType(methodType.Out(1))
}
// formatName will convert to first character to lower case
func formatName(name string) string {
ret := []rune(name)
if len(ret) > 0 {
ret[0] = unicode.ToLower(ret[0])
}
return string(ret)
}
var bigIntType = reflect.TypeOf((*big.Int)(nil)).Elem()
// Indication if this type should be serialized in hex
func isHexNum(t reflect.Type) bool {
if t == nil {
return false
}
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == bigIntType
}
var blockNumberType = reflect.TypeOf((*BlockNumber)(nil)).Elem()
// Indication if the given block is a BlockNumber
func isBlockNumber(t reflect.Type) bool {
if t == nil {
return false
}
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t == blockNumberType
}
// suitableCallbacks iterates over the methods of the given type. It will determine if a method satisfies the criteria
// for a RPC callback or a subscription callback and adds it to the collection of callbacks or subscriptions. See server
// documentation for a summary of these criteria.
func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
callbacks := make(callbacks)
subscriptions := make(subscriptions)
METHODS:
for m := 0; m < typ.NumMethod(); m++ {
method := typ.Method(m)
mtype := method.Type
mname := formatName(method.Name)
if method.PkgPath != "" { // method must be exported
continue
}
var h callback
h.isSubscribe = isPubSub(mtype)
h.rcvr = rcvr
h.method = method
h.errPos = -1
if h.isSubscribe {
h.argTypes = make([]reflect.Type, mtype.NumIn()-1) // skip rcvr type
for i := 1; i < mtype.NumIn(); i++ {
argType := mtype.In(i)
if isExportedOrBuiltinType(argType) {
h.argTypes[i-1] = argType
} else {
continue METHODS
}
}
subscriptions[mname] = &h
continue METHODS
}
numIn := mtype.NumIn()
// determine method arguments, ignore first arg since it's the receiver type
// Arguments must be exported or builtin types
h.argTypes = make([]reflect.Type, numIn-1)
for i := 1; i < numIn; i++ {
argType := mtype.In(i)
if !isExportedOrBuiltinType(argType) {
continue METHODS
}
h.argTypes[i-1] = argType
}
// check that all returned values are exported or builtin types
for i := 0; i < mtype.NumOut(); i++ {
if !isExportedOrBuiltinType(mtype.Out(i)) {
continue METHODS
}
}
// when a method returns an error it must be the last returned value
h.errPos = -1
for i := 0; i < mtype.NumOut(); i++ {
if isErrorType(mtype.Out(i)) {
h.errPos = i
break
}
}
if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
continue METHODS
}
switch mtype.NumOut() {
case 0, 1:
break
case 2:
if h.errPos == -1 { // method must one return value and 1 error
continue METHODS
}
break
default:
continue METHODS
}
callbacks[mname] = &h
}
return callbacks, subscriptions
}
func newSubscriptionId() (string, error) {
var subid [16]byte
n, _ := rand.Read(subid[:])
if n != 16 {
return "", errors.New("Unable to generate subscription id")
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package whisper
import (
"encoding/json"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
)
// PublicWhisperAPI provides the whisper RPC service.
type PublicWhisperAPI struct {
w *Whisper
messagesMu sync.RWMutex
messages map[int]*whisperFilter
}
type whisperOfflineError struct{}
func (e *whisperOfflineError) Error() string {
return "whisper is offline"
}
// whisperOffLineErr is returned when the node doesn't offer the shh service.
var whisperOffLineErr = new(whisperOfflineError)
// NewPublicWhisperAPI create a new RPC whisper service.
func NewPublicWhisperAPI(w *Whisper) *PublicWhisperAPI {
return &PublicWhisperAPI{w: w, messages: make(map[int]*whisperFilter)}
}
// Version returns the Whisper version this node offers.
func (s *PublicWhisperAPI) Version() (*rpc.HexNumber, error) {
if s.w == nil {
return rpc.NewHexNumber(0), whisperOffLineErr
}
return rpc.NewHexNumber(s.w.Version()), nil
}
// HasIdentity checks if the the whisper node is configured with the private key
// of the specified public pair.
func (s *PublicWhisperAPI) HasIdentity(identity string) (bool, error) {
if s.w == nil {
return false, whisperOffLineErr
}
return s.w.HasIdentity(crypto.ToECDSAPub(common.FromHex(identity))), nil
}
// NewIdentity generates a new cryptographic identity for the client, and injects
// it into the known identities for message decryption.
func (s *PublicWhisperAPI) NewIdentity() (string, error) {
if s.w == nil {
return "", whisperOffLineErr
}
identity := s.w.NewIdentity()
return common.ToHex(crypto.FromECDSAPub(&identity.PublicKey)), nil
}
type NewFilterArgs struct {
To string
From string
Topics [][][]byte
}
// NewWhisperFilter creates and registers a new message filter to watch for inbound whisper messages.
func (s *PublicWhisperAPI) NewFilter(args *NewFilterArgs) (*rpc.HexNumber, error) {
if s.w == nil {
return nil, whisperOffLineErr
}
var id int
filter := Filter{
To: crypto.ToECDSAPub(common.FromHex(args.To)),
From: crypto.ToECDSAPub(common.FromHex(args.From)),
Topics: NewFilterTopics(args.Topics...),
Fn: func(message *Message) {
wmsg := NewWhisperMessage(message)
s.messagesMu.RLock() // Only read lock to the filter pool
defer s.messagesMu.RUnlock()
if s.messages[id] != nil {
s.messages[id].insert(wmsg)
}
},
}
id = s.w.Watch(filter)
s.messagesMu.Lock()
s.messages[id] = newWhisperFilter(id, s.w)
s.messagesMu.Unlock()
return rpc.NewHexNumber(id), nil
}
// GetFilterChanges retrieves all the new messages matched by a filter since the last retrieval.
func (s *PublicWhisperAPI) GetFilterChanges(filterId rpc.HexNumber) []WhisperMessage {
s.messagesMu.RLock()
defer s.messagesMu.RUnlock()
if s.messages[filterId.Int()] != nil {
if changes := s.messages[filterId.Int()].retrieve(); changes != nil {
return changes
}
}
return returnWhisperMessages(nil)
}
// UninstallFilter disables and removes an existing filter.
func (s *PublicWhisperAPI) UninstallFilter(filterId rpc.HexNumber) bool {
s.messagesMu.Lock()
defer s.messagesMu.Unlock()
if _, ok := s.messages[filterId.Int()]; ok {
delete(s.messages, filterId.Int())
return true
}
return false
}
// GetMessages retrieves all the known messages that match a specific filter.
func (s *PublicWhisperAPI) GetMessages(filterId rpc.HexNumber) []WhisperMessage {
// Retrieve all the cached messages matching a specific, existing filter
s.messagesMu.RLock()
defer s.messagesMu.RUnlock()
var messages []*Message
if s.messages[filterId.Int()] != nil {
messages = s.messages[filterId.Int()].messages()
}
return returnWhisperMessages(messages)
}
// returnWhisperMessages converts aNhisper message to a RPC whisper message.
func returnWhisperMessages(messages []*Message) []WhisperMessage {
msgs := make([]WhisperMessage, len(messages))
for i, msg := range messages {
msgs[i] = NewWhisperMessage(msg)
}
return msgs
}
type PostArgs struct {
From string `json:"from"`
To string `json:"to"`
Topics [][]byte `json:"topics"`
Payload string `json:"payload"`
Priority int64 `json:"priority"`
TTL int64 `json:"ttl"`
}
// Post injects a message into the whisper network for distribution.
func (s *PublicWhisperAPI) Post(args *PostArgs) (bool, error) {
if s.w == nil {
return false, whisperOffLineErr
}
// construct whisper message with transmission options
message := NewMessage(common.FromHex(args.Payload))
options := Options{
To: crypto.ToECDSAPub(common.FromHex(args.To)),
TTL: time.Duration(args.TTL) * time.Second,
Topics: NewTopics(args.Topics...),
}
// set sender identity
if len(args.From) > 0 {
if key := s.w.GetIdentity(crypto.ToECDSAPub(common.FromHex(args.From))); key != nil {
options.From = key
} else {
return false, fmt.Errorf("unknown identity to send from: %s", args.From)
}
}
// Wrap and send the message
pow := time.Duration(args.Priority) * time.Millisecond
envelope, err := message.Wrap(pow, options)
if err != nil {
return false, err
}
return true, s.w.Send(envelope)
}
// WhisperMessage is the RPC representation of a whisper message.
type WhisperMessage struct {
ref *Message
Payload string `json:"payload"`
To string `json:"to"`
From string `json:"from"`
Sent int64 `json:"sent"`
TTL int64 `json:"ttl"`
Hash string `json:"hash"`
}
func (args *PostArgs) UnmarshalJSON(data []byte) (err error) {
var obj struct {
From string `json:"from"`
To string `json:"to"`
Topics []string `json:"topics"`
Payload string `json:"payload"`
Priority rpc.HexNumber `json:"priority"`
TTL rpc.HexNumber `json:"ttl"`
}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
args.From = obj.From
args.To = obj.To
args.Payload = obj.Payload
args.Priority = obj.Priority.Int64()
args.TTL = obj.TTL.Int64()
// decode topic strings
args.Topics = make([][]byte, len(obj.Topics))
for i, topic := range obj.Topics {
args.Topics[i] = common.FromHex(topic)
}
return nil
}
// UnmarshalJSON implements the json.Unmarshaler interface, invoked to convert a
// JSON message blob into a WhisperFilterArgs structure.
func (args *NewFilterArgs) UnmarshalJSON(b []byte) (err error) {
// Unmarshal the JSON message and sanity check
var obj struct {
To interface{} `json:"to"`
From interface{} `json:"from"`
Topics interface{} `json:"topics"`
}
if err := json.Unmarshal(b, &obj); err != nil {
return err
}
// Retrieve the simple data contents of the filter arguments
if obj.To == nil {
args.To = ""
} else {
argstr, ok := obj.To.(string)
if !ok {
return fmt.Errorf("to is not a string")
}
args.To = argstr
}
if obj.From == nil {
args.From = ""
} else {
argstr, ok := obj.From.(string)
if !ok {
return fmt.Errorf("from is not a string")
}
args.From = argstr
}
// Construct the nested topic array
if obj.Topics != nil {
// Make sure we have an actual topic array
list, ok := obj.Topics.([]interface{})
if !ok {
return fmt.Errorf("topics is not an array")
}
// Iterate over each topic and handle nil, string or array
topics := make([][]string, len(list))
for idx, field := range list {
switch value := field.(type) {
case nil:
topics[idx] = []string{}
case string:
topics[idx] = []string{value}
case []interface{}:
topics[idx] = make([]string, len(value))
for i, nested := range value {
switch value := nested.(type) {
case nil:
topics[idx][i] = ""
case string:
topics[idx][i] = value
default:
return fmt.Errorf("topic[%d][%d] is not a string", idx, i)
}
}
default:
return fmt.Errorf("topic[%d] not a string or array", idx)
}
}
topicsDecoded := make([][][]byte, len(topics))
for i, condition := range topics {
topicsDecoded[i] = make([][]byte, len(condition))
for j, topic := range condition {
topicsDecoded[i][j] = common.FromHex(topic)
}
}
args.Topics = topicsDecoded
}
return nil
}
// whisperFilter is the message cache matching a specific filter, accumulating
// inbound messages until the are requested by the client.
type whisperFilter struct {
id int // Filter identifier for old message retrieval
ref *Whisper // Whisper reference for old message retrieval
cache []WhisperMessage // Cache of messages not yet polled
skip map[common.Hash]struct{} // List of retrieved messages to avoid duplication
update time.Time // Time of the last message query
lock sync.RWMutex // Lock protecting the filter internals
}
// messages retrieves all the cached messages from the entire pool matching the
// filter, resetting the filter's change buffer.
func (w *whisperFilter) messages() []*Message {
w.lock.Lock()
defer w.lock.Unlock()
w.cache = nil
w.update = time.Now()
w.skip = make(map[common.Hash]struct{})
messages := w.ref.Messages(w.id)
for _, message := range messages {
w.skip[message.Hash] = struct{}{}
}
return messages
}
// insert injects a new batch of messages into the filter cache.
func (w *whisperFilter) insert(messages ...WhisperMessage) {
w.lock.Lock()
defer w.lock.Unlock()
for _, message := range messages {
if _, ok := w.skip[message.ref.Hash]; !ok {
w.cache = append(w.cache, messages...)
}
}
}
// retrieve fetches all the cached messages from the filter.
func (w *whisperFilter) retrieve() (messages []WhisperMessage) {
w.lock.Lock()
defer w.lock.Unlock()
messages, w.cache = w.cache, nil
w.update = time.Now()
return
}
// activity returns the last time instance when client requests were executed on
// the filter.
func (w *whisperFilter) activity() time.Time {
w.lock.RLock()
defer w.lock.RUnlock()
return w.update
}
// newWhisperFilter creates a new serialized, poll based whisper topic filter.
func newWhisperFilter(id int, ref *Whisper) *whisperFilter {
return &whisperFilter{
id: id,
ref: ref,
update: time.Now(),
skip: make(map[common.Hash]struct{}),
}
}
// NewWhisperMessage converts an internal message into an API version.
func NewWhisperMessage(message *Message) WhisperMessage {
return WhisperMessage{
ref: message,
Payload: common.ToHex(message.Payload),
From: common.ToHex(crypto.FromECDSAPub(message.Recover())),
To: common.ToHex(crypto.FromECDSAPub(message.To)),
Sent: message.Sent.Unix(),
TTL: int64(message.TTL / time.Second),
Hash: common.ToHex(message.Hash.Bytes()),
}
}
......@@ -28,6 +28,8 @@ import (
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p"
rpc "github.com/ethereum/go-ethereum/rpc/v2"
"gopkg.in/fatih/set.v0"
)
......@@ -98,6 +100,18 @@ func New() *Whisper {
return whisper
}
// APIs returns the RPC descriptors the Whisper implementation offers
func (s *Whisper) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "shh",
Version: "1.0",
Service: NewPublicWhisperAPI(s),
Public: true,
},
}
}
// Protocols returns the whisper sub-protocols ran by this particular client.
func (self *Whisper) Protocols() []p2p.Protocol {
return []p2p.Protocol{self.protocol}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册