提交 b179b261 编写于 作者: W Wang Huanyu 提交者: wanghuanyu3

fix: eth database write error for pg.

上级 09ad62fb
......@@ -255,6 +255,66 @@ max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/ltc-sql.log
debug = false
# =============================== eth ==================================
[eth]
#是否开启 eth 数据splitter
enable = true
#是否开启数据库
database_enable = true
#数据库worker缓存大小
database_worker_buffer = 8192
#数据库worker数量
database_worker_number = 1
#一次请求区块链数据的最大块数,400000块之前可以设置大一些,比如300
max_batch_block = 30
#ltc 全节点的地址
endpoint = http://[eth 全节点的ip/域名]:[eth 全节点运行端口]
#运行 eth 全节点设置的用户名
user = [rpc 访问账号]
#运行 eth 全节点设置的密码
password = [rpc 访问密码]
#eth 数据校验规则文件地址
json_schema_file = /etc/bds-splitter/schema/eth.json
#eth 数据校验是否开启
json_schema_validation_enable = false
#eth 定时任务配置
[cron.eth]
update_meta_expr = @every 1m
#eth kafka 配置
[kafka.eth]
enable = true
topic = eth
# kafka 客户端标示
client_id = eth-client-1
# kafka 消费组标示
group_id = eth-group
# kafka 服务的地址
broker_list = [kafka 服务的ip/域名]:[kafka 服务的运行端口]
buffer_size = 1000
return_errors = true
#eth 数据库配置
[database.eth]
#数据库类型,sql server为mssql,postgre为postgres
type = postgres
#数据库的访问地址
host = [数据库服务的ip/域名]
#数据库的端口信息
port = [数据库服务的端口]
#数据库的库名,需要初始化好,创建表和导入数据用
database = [数据库服务的库]
#数据库的访问账号
user = [数据库服务的账号]
#数据库的访问账号密码信息
password = [数据库服务的密码]
timezone = Asia/Shanghai
max_open_conns = 500
max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/eth-sql.log
debug = false
# =============================== log ==================================
#普通日志配置
[logging_normal]
......
......@@ -15,7 +15,7 @@ type TokenTransaction struct {
Timestamp int64 `xorm:"timestamp int notnull index"`
TokenAddress string `xorm:"token_address char(40) notnull"`
LogIndex int64 `xorm:"log_index int notnull"`
IsRemoved bool `xorm:"is_removed tinyint notnull"`
IsRemoved int `xorm:"is_removed tinyint notnull"`
}
func (t TokenTransaction) TableName() string {
......
......@@ -5,9 +5,11 @@ import (
"github.com/jdcloud-bds/bds/common/json"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/common/math"
"github.com/jdcloud-bds/bds/config"
"github.com/jdcloud-bds/bds/service"
model "github.com/jdcloud-bds/bds/service/model/eth"
"math/big"
"strconv"
"strings"
"time"
)
......@@ -372,7 +374,9 @@ func ParseBlock(data string) (*ETHBlockData, error) {
transaction.TokenAddress = removeHexPrefixAndToLower(json.Get(txItem.String(), "token_address").String())
transaction.LogIndex = json.Get(txItem.String(), "log_index").Int()
transaction.IsRemoved = json.Get(txItem.String(), "is_removed").Bool()
if json.Get(txItem.String(), "is_removed").Bool() {
transaction.IsRemoved = 1
}
fromAccount := new(model.TokenAccount)
fromAccount.TokenAddress = transaction.TokenAddress
......@@ -497,24 +501,32 @@ func ParseBlock(data string) (*ETHBlockData, error) {
func revertMiner(height int64, tx *service.Transaction) error {
startTime := time.Now()
index := "revert_miner"
sql := fmt.Sprintf("UPDATE a SET a.miner_count = a.miner_count - 1 FROM eth_account a"+
" JOIN (SELECT miner FROM eth_block WHERE height = '%d') b"+
" ON a.address = b.miner ", height)
affected1, err := tx.Execute(sql)
//sql := fmt.Sprintf("UPDATE a SET a.miner_count = a.miner_count - 1 FROM eth_account a"+
// " JOIN (SELECT miner FROM eth_block WHERE height = '%d') b"+
// " ON a.address = b.miner ", height)
//affected1, err := tx.Execute(sql)
sql := fmt.Sprintf("SELECT miner FROM eth_block WHERE height = %d", height)
result, err := tx.QueryString(sql)
if err != nil {
log.DetailError(err)
return err
}
sql = fmt.Sprintf("UPDATE a SET a.miner_uncle_count = a.miner_uncle_count - 1 FROM eth_account a"+
" JOIN (SELECT miner FROM eth_block WHERE height = '%d') b"+
" ON a.address = b.miner ", height)
affected2, err := tx.Execute(sql)
//sql = fmt.Sprintf("UPDATE a SET a.miner_uncle_count = a.miner_uncle_count - 1 FROM eth_account a"+
// " JOIN (SELECT miner FROM eth_block WHERE height = '%d') b"+
// " ON a.address = b.miner ", height)
//affected2, err := tx.Execute(sql)
if len(result) == 0 {
return nil
}
address := result[0]["miner"]
sql = fmt.Sprintf("UPDATE eth_account SET miner_count = miner_count - 1,miner_uncle_count = miner_uncle_count - 1 where address='%s'", address)
affected1, err := tx.Execute(sql)
if err != nil {
log.DetailError(err)
return err
}
elaspedTime := time.Now().Sub(startTime)
log.Debug("splitter eth index: %s affected %d %d elasped %s", index, affected1, affected2, elaspedTime.String())
log.Debug("splitter eth index: %s affected %d elasped %s", index, affected1, elaspedTime.String())
return nil
}
......@@ -523,20 +535,31 @@ func revertAccountBalance(height int64, tx *service.Transaction, handler *rpcHan
startTime := time.Now()
index := "revert_account"
sql := fmt.Sprintf("SELECT t.address FROM"+
" (SELECT DISTINCT(miner) AS address FROM eth_block WHERE height = '%d' "+
" UNION SELECT DISTINCT(miner) AS address FROM eth_uncle WHERE block_height = '%d'"+
" UNION SELECT DISTINCT([from]) AS adddress FROM eth_transaction WHERE block_height = '%d'"+
" UNION SELECT DISTINCT([to]) AS adddress FROM eth_transaction WHERE block_height = '%d'"+
" (SELECT DISTINCT(miner) AS address FROM eth_block WHERE height = %d "+
" UNION SELECT DISTINCT(miner) AS address FROM eth_uncle WHERE block_height = %d"+
" UNION SELECT DISTINCT([from]) AS adddress FROM eth_transaction WHERE block_height = %d"+
" UNION SELECT DISTINCT([to]) AS adddress FROM eth_transaction WHERE block_height = %d"+
" UNION SELECT DISTINCT([from]) AS adddress FROM eth_internal_transaction WHERE block_height = '%d'"+
" UNION SELECT DISTINCT([to]) AS adddress FROM eth_internal_transaction WHERE [to] != '' AND block_height = '%d'"+
" UNION SELECT contract_address AS adddress FROM eth_transaction WHERE block_height = '%d' AND contract_address != '') t ",
" UNION SELECT contract_address AS adddress FROM eth_transaction WHERE block_height = %d AND contract_address != '') t ",
height, height, height, height, height, height, height)
if config.SplitterConfig.DatabaseETHSetting.Type == "postgres" {
sql = fmt.Sprintf("SELECT t.address FROM"+
" (SELECT DISTINCT(miner) AS address FROM eth_block WHERE height = %d "+
" UNION SELECT DISTINCT(miner) AS address FROM eth_uncle WHERE block_height = %d"+
" UNION SELECT DISTINCT(\"from\") AS adddress FROM eth_transaction WHERE block_height = %d"+
" UNION SELECT DISTINCT(\"to\") AS adddress FROM eth_transaction WHERE block_height = %d"+
" UNION SELECT DISTINCT(\"from\") AS adddress FROM eth_internal_transaction WHERE block_height = '%d'"+
" UNION SELECT DISTINCT(\"to\") AS adddress FROM eth_internal_transaction WHERE \"to\" != '' AND block_height = '%d'"+
" UNION SELECT contract_address AS adddress FROM eth_transaction WHERE block_height = %d AND contract_address != '') t ",
height, height, height, height, height, height, height)
}
result, err := tx.QueryString(sql)
if err != nil {
log.DetailError(err)
return err
}
accountBalance := make(map[string]string, 0)
accountBalance := make(map[string]int64, 0)
if len(result) > 0 {
for _, v := range result {
......@@ -547,7 +570,8 @@ func revertAccountBalance(height int64, tx *service.Transaction, handler *rpcHan
log.DetailError(err)
return err
}
accountBalance[address] = balance.String()
//TODO: may panic, data overflow
accountBalance[address] = balance.Int64()
}
}
}
......@@ -560,7 +584,7 @@ func revertAccountBalance(height int64, tx *service.Transaction, handler *rpcHan
totalAffected1 := int64(0)
if len(accountBalance) > 0 {
for address, balance := range accountBalance {
b := fmt.Sprintf(" WHEN '%s' THEN '%s'", address, balance)
b := fmt.Sprintf(" WHEN '%s' THEN %d", address, balance)
b1 = b1 + b
in := fmt.Sprintf(" '%s',", address)
in1 = in1 + in
......@@ -608,6 +632,12 @@ func revertTokenAccount(height int64, tx *service.Transaction, handler *rpcHandl
" (SELECT [from] AS address, token_address FROM eth_token_transaction WHERE block_height = '%d'"+
" UNION SELECT [to] AS address, token_address FROM eth_token_transaction WHERE block_height = '%d') t ",
height, height)
if config.SplitterConfig.DatabaseETHSetting.Type == "postgres" {
notIN = fmt.Sprintf("SELECT t.address, t.token_address FROM"+
" (SELECT \"from\" AS address, token_address FROM eth_token_transaction WHERE block_height = '%d'"+
" UNION SELECT \"to\" AS address, token_address FROM eth_token_transaction WHERE block_height = '%d') t ",
height, height)
}
result, err := tx.QueryString(notIN)
if err != nil {
log.DetailError(err)
......@@ -762,8 +792,9 @@ func revertInternalTransaction(height int64, tx *service.Transaction) error {
//update token and toke account
func updateToken(data *ETHBlockData, tx *service.Transaction) error {
var totalInsertAffected, totalDeleteAffected int64
var totalInsertAffected int64
for _, v := range data.Tokens {
tokenList := make([]*model.Token, 0)
token := new(model.Token)
token.TokenAddress = v.TokenAddress
has, err := tx.Get(token)
......@@ -772,7 +803,8 @@ func updateToken(data *ETHBlockData, tx *service.Transaction) error {
return err
}
if !has {
affected, err := tx.Insert(v)
tokenList = append(tokenList, v)
affected, err := tx.Insert(tokenList)
if err != nil {
log.DetailError(err)
return err
......@@ -783,64 +815,148 @@ func updateToken(data *ETHBlockData, tx *service.Transaction) error {
log.Debug("splitter eth: block %d token affected %d", data.Block.Height, totalInsertAffected)
totalInsertAffected = int64(0)
for _, v := range data.TokenAccounts {
tokenAccount := new(model.TokenAccount)
tokenAccount.TokenAddress = v.TokenAddress
tokenAccount.Address = v.Address
has, err := tx.Get(tokenAccount)
tokenAccountList := make([]*model.TokenAccount, 0)
tokenAccountMap := make(map[string]*model.TokenAccount, 0)
if len(data.TokenAccounts) == 0 {
return nil
}
var sqlSelect string
var sqlDelete string
var addressList string
var result []map[string]string
var err error
if config.SplitterConfig.DatabaseETHSetting.Type != "postgres" {
for k, v := range data.TokenAccounts {
if k == 0 {
sqlSelect = fmt.Sprintf("select * from eth_token_account where address='%s' and token_address='%s' ", v.Address, v.TokenAddress)
sqlDelete = fmt.Sprintf("delete from eth_token_account where address='%s' and token_address='%s';", v.Address, v.TokenAddress)
} else {
sqlSelect += fmt.Sprintf("union select * from eth_token_account where address='%s' and token_address='%s'", v.Address, v.TokenAddress)
sqlDelete += fmt.Sprintf("delete from eth_token_account where address='%s' and token_address='%s';", v.Address, v.TokenAddress)
}
}
result, err = tx.QueryString(sqlSelect)
if err != nil {
log.DetailError(sqlSelect)
log.DetailError(err)
return err
}
if has {
affected, err := tx.Delete(tokenAccount)
if err != nil {
log.DetailError(err)
return err
_, err = tx.Exec(sqlDelete)
if err != nil {
log.DetailError(sqlDelete)
log.DetailError(err)
return err
}
} else {
for k, v := range data.TokenAccounts {
if k == 0 {
addressList = fmt.Sprintf("(('%s','%s')", v.Address, v.TokenAddress)
} else {
addressList += fmt.Sprintf(",('%s','%s')", v.Address, v.TokenAddress)
}
totalDeleteAffected += affected
} else {
tokenAccount.BirthTimestamp = v.BirthTimestamp
}
tokenAccount.ID = 0
tokenAccount.Balance = v.Balance
tokenAccount.LastActiveTimestamp = v.LastActiveTimestamp
affected, err := tx.Insert(tokenAccount)
addressList += ")"
sql := "select * from eth_token_account where (address,token_address) in " + addressList
result, err = tx.QueryString(sql)
if err != nil {
log.DetailError(err)
return err
}
sql = "delete from eth_token_account where (address,token_address) in " + addressList
_, err = tx.Exec(sql)
if err != nil {
log.DetailError(err)
return err
}
totalInsertAffected += affected
}
log.Debug("splitter eth: block %d token account affected %d:%d", data.Block.Height, totalInsertAffected, totalDeleteAffected)
for _, v := range result {
account := new(model.TokenAccount)
account.Address = v["address"]
account.TokenAddress = v["token_address"]
account.BirthTimestamp, _ = strconv.ParseInt(v["birth_timestamp"], 10, 64)
account.LastActiveTimestamp = data.Block.Timestamp
tokenAccountMap[v["address"]+v["token_address"]] = account
}
for _, v := range data.TokenAccounts {
var tokenAccount *model.TokenAccount
tokenAccount, ok := tokenAccountMap[v.Address+v.TokenAddress]
if !ok {
tokenAccount = new(model.TokenAccount)
tokenAccount.BirthTimestamp = data.Block.Timestamp
}
tokenAccount.TokenAddress = v.TokenAddress
tokenAccount.Address = v.Address
tokenAccount.Balance = v.Balance
tokenAccount.LastActiveTimestamp = v.LastActiveTimestamp
tokenAccountList = append(tokenAccountList, tokenAccount)
}
totalInsertAffected, err = tx.BatchInsert(tokenAccountList)
if err != nil {
log.DetailError(err)
return err
}
log.Debug("splitter eth: block %d token account affected %d:%d", data.Block.Height, totalInsertAffected)
return nil
}
//update account
func updateAccount(data *ETHBlockData, tx *service.Transaction) error {
var totalInsertAffected, totalDeleteAffected int64
for _, v := range data.Accounts {
account := new(model.Account)
account.Address = v.Address
has, err := tx.Get(account)
if err != nil {
log.DetailError(err)
return err
}
if has {
affected, err := tx.Delete(account)
if err != nil {
log.DetailError(err)
return err
}
totalDeleteAffected += affected
var totalInsertAffected int64
accountList := make([]*model.Account, 0)
accountMap := make(map[string]*model.Account, 0)
if len(data.Accounts) == 0 {
return nil
}
var addressList string
for k, v := range data.Accounts {
if k == 0 {
addressList = fmt.Sprintf("('%s'", v.Address)
} else {
account.BirthTimestamp = v.BirthTimestamp
addressList += fmt.Sprintf(",'%s'", v.Address)
}
}
addressList += ")"
sql := "select * from eth_account where address in " + addressList
result, err := tx.QueryString(sql)
if err != nil {
log.DetailError(err)
return err
}
sql = "delete from eth_account where address in " + addressList
_, err = tx.Exec(sql)
if err != nil {
log.DetailError(err)
return err
}
for _, v := range result {
account := new(model.Account)
account.Address = v["address"]
typeInt, _ := strconv.ParseInt(v["type"], 10, 64)
account.Type = int(typeInt)
hasInternalTransactionInt, _ := strconv.ParseInt(v["has_internal_tx"], 10, 64)
account.HasInternalTransaction = int(hasInternalTransactionInt)
minerCountInt, _ := strconv.ParseInt(v["miner_count"], 10, 64)
account.MinerCount = int(minerCountInt)
minerUncleCountInt, _ := strconv.ParseInt(v["miner_uncle_count"], 10, 64)
account.MinerUncleCount = int(minerUncleCountInt)
account.Creator = v["creator"]
account.BirthTimestamp, _ = strconv.ParseInt(v["birth_timestamp"], 10, 64)
account.LastActiveTimestamp = data.Block.Timestamp
accountMap[v["address"]] = account
}
account.ID = 0
for _, v := range data.Accounts {
if v.Address == "" {
continue
}
var account *model.Account
account, ok := accountMap[v.Address]
if !ok {
account = new(model.Account)
account.BirthTimestamp = data.Block.Timestamp
}
account.Address = v.Address
account.HasInternalTransaction = v.HasInternalTransaction
account.Balance = v.Balance
account.MinerCount += v.MinerCount
......@@ -854,33 +970,36 @@ func updateAccount(data *ETHBlockData, tx *service.Transaction) error {
} else {
account.Type = v.Type
}
affected, err := tx.Insert(account)
if err != nil {
log.DetailError(err)
return err
}
totalInsertAffected += affected
accountList = append(accountList, account)
}
log.Debug("splitter eth: block %d account update affected %d:%d", data.Block.Height, totalInsertAffected, totalDeleteAffected)
totalInsertAffected, err = tx.BatchInsert(accountList)
if err != nil {
log.DetailError(err)
return err
}
log.Debug("splitter eth: block %d account update affected %d", data.Block.Height, totalInsertAffected)
return nil
}
//update real difficulty
func updateRealDifficulty(data *ETHBlockData, tx *service.Transaction) error {
sql := fmt.Sprintf("UPDATE c SET real_difficulty=d.realdif"+
" FROM eth_block c"+
" JOIN (SELECT a.height AS height, (CAST(a.difficulty AS decimal(30,0)))*(a.timestamp-b.timestamp)/15 AS realdif"+
" FROM eth_block AS a"+
" JOIN eth_block AS b ON b.height+1=a.height WHERE a.height = '%d') d"+
" ON c.height = d.height", data.Block.Height)
affected, err := tx.Execute(sql)
sql := fmt.Sprintf("select d.realdif realdif FROM eth_block as c JOIN (SELECT a.height AS height, (CAST(a.difficulty AS decimal(30,0)))*(a.timestamp-b.timestamp)/15 AS realdif FROM eth_block AS a JOIN eth_block AS b ON b.height+1=a.height WHERE a.height = %d) d ON c.height = d.height", data.Block.Height)
result, err := tx.QueryString(sql)
if err != nil {
log.DetailError(err)
return err
}
log.Debug("splitter eth: block %d real difficulty affected %d", data.Block.Height, affected)
for _, v := range result {
difficulty, _ := strconv.ParseFloat(v["realdif"], 64)
sql = fmt.Sprintf(" UPDATE eth_block SET real_difficulty=%f where height=%d", difficulty, data.Block.Height)
_, err = tx.Exec(sql)
if err != nil {
log.DetailError(err)
return err
}
}
log.Debug("splitter eth: block %d real difficulty", data.Block.Height)
return nil
}
......
......@@ -7,6 +7,7 @@ import (
"github.com/jdcloud-bds/bds/common/jsonrpc"
"github.com/jdcloud-bds/bds/common/kafka"
"github.com/jdcloud-bds/bds/common/log"
"github.com/jdcloud-bds/bds/config"
"github.com/jdcloud-bds/bds/service"
model "github.com/jdcloud-bds/bds/service/model/eth"
"github.com/xeipuuv/gojsonschema"
......@@ -242,7 +243,8 @@ func (s *ETHSplitter) SaveBlock(data *ETHBlockData) error {
}
blockTemp := new(model.Block)
blockTemp.Height = data.Block.Height
has, err := tx.Get(blockTemp)
//has, err := tx.Get(blockTemp)
has, err := tx.Where("height=?", data.Block.Height).Get(blockTemp)
if err != nil {
_ = tx.Rollback()
log.DetailError(err)
......@@ -397,9 +399,11 @@ func (s *ETHSplitter) RevertBlock(height int64, tx *service.Transaction) error {
return err
}
//revert token account by rpc
err = revertTokenAccount(height, tx, s.remoteHandler)
if err != nil {
return err
if config.SplitterConfig.DatabaseETHSetting.Type != "postgres" {
err = revertTokenAccount(height, tx, s.remoteHandler)
if err != nil {
return err
}
}
//revert miner by height
......@@ -449,11 +453,11 @@ func (s *ETHSplitter) RevertBlock(height int64, tx *service.Transaction) error {
return nil
}
type TokenAccountBalance struct {
Address string
TokenAddress string
Balance *big.Int
}
//type TokenAccountBalance struct {
// Address string
// TokenAddress string
// Balance *big.Int
//}
//check json schema
func (s *ETHSplitter) jsonSchemaValid(data string) (bool, error) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册