提交 102f4027 编写于 作者: W Wang Huanyu 提交者: wanghuanyu3

fix: tron database write error for pg.

上级 b179b261
......@@ -267,7 +267,7 @@ database_worker_buffer = 8192
database_worker_number = 1
#一次请求区块链数据的最大块数,400000块之前可以设置大一些,比如300
max_batch_block = 30
#ltc 全节点的地址
#eth 全节点的地址
endpoint = http://[eth 全节点的ip/域名]:[eth 全节点运行端口]
#运行 eth 全节点设置的用户名
user = [rpc 访问账号]
......@@ -315,6 +315,70 @@ max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/eth-sql.log
debug = false
# =============================== tron ==================================
[tron]
#是否开启 tron 数据splitter
enable = true
#是否开启数据库
database_enable = true
#数据库worker缓存大小
database_worker_buffer = 8192
#数据库worker数量
database_worker_number = 1
#并发落库的区块高度
concurrent_height = 12000000
#一次请求区块链数据的最大块数,400000块之前可以设置大一些,比如300
max_batch_block = 1000
skip_miss_block = false
skip_height = 0
#tron 全节点的地址
endpoint = http://[tron 全节点的ip/域名]:[tron 全节点运行端口]
#运行 tron 全节点设置的用户名
user = [rpc 访问账号]
#运行 tron 全节点设置的密码
password = [rpc 访问密码]
#tron 数据校验规则文件地址
json_schema_file = /etc/bds-splitter/schema/tron.json
#tron 数据校验是否开启
json_schema_validation_enable = false
#tron 定时任务配置
[cron.tron]
update_meta_expr = @every 1m
#tron kafka 配置
[kafka.tron]
enable = true
topic = trx
# kafka 客户端标示
client_id = tron-client-1
# kafka 消费组标示
group_id = tron-group
# kafka 服务的地址
broker_list = [kafka 服务的ip/域名]:[kafka 服务的运行端口]
buffer_size = 1000
return_errors = true
#tron 数据库配置
[database.tron]
#数据库类型,sql server为mssql,postgre为postgres
type = postgres
#数据库的访问地址
host = [数据库服务的ip/域名]
#数据库的端口信息
port = [数据库服务的端口]
#数据库的库名,需要初始化好,创建表和导入数据用
database = [数据库服务的库]
#数据库的访问账号
user = [数据库服务的账号]
#数据库的访问账号密码信息
password = [数据库服务的密码]
timezone = Asia/Shanghai
max_open_conns = 500
max_idle_conns = 100
sql_log_file = /var/log/bds-splitter/tron-sql.log
debug = false
# =============================== log ==================================
#普通日志配置
[logging_normal]
......
......@@ -366,12 +366,14 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
}
var affected int64
affected, err = tx.BatchInsert(data.Block)
blockList := make([]*model.Block, 0)
blockList = append(blockList, data.Block)
affected, err = tx.BatchInsert(blockList)
if err != nil {
rollbackFunc(err)
return err
}
log.Debug("splitter tron: block write %d rows", affected)
log.Debug("splitter tron: block %d write %d rows", data.Block.BlockNumber, affected)
affected, err = tx.BatchInsert(data.Transactions)
if err != nil {
......@@ -396,7 +398,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
accountCreateContract.AccountAddress = json.Get(contractValueJson, "account_address").String()
accountCreateContract.Type = json.Get(contractValueJson, "type").Int()
affected, err = tx.BatchInsert(accountCreateContract)
accountCreateContractList := make([]*model.AccountCreateContract, 0)
accountCreateContractList = append(accountCreateContractList, accountCreateContract)
affected, err = tx.BatchInsert(accountCreateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -415,7 +419,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
transferContract.ToAddress = json.Get(contractValueJson, "to_address").String()
transferContract.Amount = json.Get(contractValueJson, "amount").Int()
affected, err = tx.BatchInsert(transferContract)
transferContractList := make([]*model.TransferContract, 0)
transferContractList = append(transferContractList, transferContract)
affected, err = tx.BatchInsert(transferContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -435,7 +441,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
transferAssetContract.Amount = json.Get(contractValueJson, "amount").Int()
transferAssetContract.AssetName = json.Get(contractValueJson, "asset_name").String()
affected, err = tx.BatchInsert(transferAssetContract)
transferAssetContractList := make([]*model.TransferAssetContract, 0)
transferAssetContractList = append(transferAssetContractList, transferAssetContract)
affected, err = tx.BatchInsert(transferAssetContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -507,7 +515,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
witnessCreateContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
witnessCreateContract.Url = json.Get(contractValueJson, "url").String()
affected, err = tx.BatchInsert(witnessCreateContract)
witnessCreateContractList := make([]*model.WitnessCreateContract, 0)
witnessCreateContractList = append(witnessCreateContractList, witnessCreateContract)
affected, err = tx.BatchInsert(witnessCreateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -558,7 +568,7 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
}
log.Debug("splitter tron: %s transaction %s write %d.", contract.Type, transaction.Hash, affected)
case WitnessUpdateContract:
witnessUpdateContract := new(model.WitnessCreateContract)
witnessUpdateContract := new(model.WitnessUpdateContract)
witnessUpdateContract.ID = 0
witnessUpdateContract.BlockNumber = data.Block.BlockNumber
witnessUpdateContract.TransactionHash = transaction.Hash
......@@ -566,9 +576,11 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
contractValueJson := json.Parse(contract.Value).String()
witnessUpdateContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
witnessUpdateContract.Url = json.Get(contractValueJson, "update_url").String()
witnessUpdateContract.UpdateUrl = json.Get(contractValueJson, "update_url").String()
affected, err = tx.BatchInsert(witnessUpdateContract)
witnessUpdateContractList := make([]*model.WitnessUpdateContract, 0)
witnessUpdateContractList = append(witnessUpdateContractList, witnessUpdateContract)
affected, err = tx.BatchInsert(witnessUpdateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -588,7 +600,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
participateAssetIssueContract.Amount = json.Get(contractValueJson, "amount").Int()
participateAssetIssueContract.AssetName = json.Get(contractValueJson, "asset_name").String()
affected, err = tx.BatchInsert(participateAssetIssueContract)
participateAssetIssueContractList := make([]*model.ParticipateAssetIssueContract, 0)
participateAssetIssueContractList = append(participateAssetIssueContractList, participateAssetIssueContract)
affected, err = tx.BatchInsert(participateAssetIssueContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -606,7 +620,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
accountUpdateContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
accountUpdateContract.AccountName = json.Get(contractValueJson, "account_name").String() // can't get account_name instead of code
affected, err = tx.BatchInsert(accountUpdateContract)
accountUpdateContractList := make([]*model.AccountUpdateContract, 0)
accountUpdateContractList = append(accountUpdateContractList, accountUpdateContract)
affected, err = tx.BatchInsert(accountUpdateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -625,7 +641,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
freezeBalanceContract.FrozenBalance = json.Get(contractValueJson, "frozen_balance").Int()
freezeBalanceContract.FrozenDuration = json.Get(contractValueJson, "frozen_duration").Int()
affected, err = tx.BatchInsert(freezeBalanceContract)
freezeBalanceContractList := make([]*model.FreezeBalanceContract, 0)
freezeBalanceContractList = append(freezeBalanceContractList, freezeBalanceContract)
affected, err = tx.BatchInsert(freezeBalanceContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -642,7 +660,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
contractValueJson := json.Parse(contract.Value).String()
unfreezeBalanceContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
affected, err = tx.BatchInsert(unfreezeBalanceContract)
unfreezeBalanceContractList := make([]*model.UnfreezeBalanceContract, 0)
unfreezeBalanceContractList = append(unfreezeBalanceContractList, unfreezeBalanceContract)
affected, err = tx.BatchInsert(unfreezeBalanceContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -659,7 +679,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
contractValueJson := json.Parse(contract.Value).String()
unfreezeAssetContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
affected, err = tx.BatchInsert(unfreezeAssetContract)
unfreezeAssetContractList := make([]*model.UnfreezeAssetContract, 0)
unfreezeAssetContractList = append(unfreezeAssetContractList, unfreezeAssetContract)
affected, err = tx.BatchInsert(unfreezeAssetContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -667,7 +689,8 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
}
log.Debug("splitter tron: %s transaction %s write %d.", contract.Type, transaction.Hash, affected)
case WithdrawBalanceContract:
withdrawBalanceContract := new(model.UnfreezeAssetContract)
//withdrawBalanceContract := new(model.UnfreezeAssetContract)
withdrawBalanceContract := new(model.WithdrawBalanceContract)
withdrawBalanceContract.ID = 0
withdrawBalanceContract.BlockNumber = data.Block.BlockNumber
withdrawBalanceContract.TransactionHash = transaction.Hash
......@@ -676,7 +699,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
contractValueJson := json.Parse(contract.Value).String()
withdrawBalanceContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
affected, err = tx.BatchInsert(withdrawBalanceContract)
withdrawBalanceContractList := make([]*model.WithdrawBalanceContract, 0)
withdrawBalanceContractList = append(withdrawBalanceContractList, withdrawBalanceContract)
affected, err = tx.BatchInsert(withdrawBalanceContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -697,7 +722,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
updateAssetContract.NewLimit = json.Get(contractValueJson, "new_limit").Int()
updateAssetContract.NewPublicLimit = json.Get(contractValueJson, "new_public_limit").Int()
affected, err = tx.BatchInsert(updateAssetContract)
updateAssetContractList := make([]*model.UpdateAssetContract, 0)
updateAssetContractList = append(updateAssetContractList, updateAssetContract)
affected, err = tx.BatchInsert(updateAssetContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -718,7 +745,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
deployContract.TokenID = json.Get(contractValueJson, "token_id").Int()
deployContract.CallTokenValue = json.Get(contractValueJson, "call_token_value").Int()
affected, err = tx.BatchInsert(deployContract)
deployContractList := make([]*model.CreateSmartContract, 0)
deployContractList = append(deployContractList, deployContract)
affected, err = tx.BatchInsert(deployContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -740,7 +769,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
triggerSmartContract.CallTokenValue = json.Get(contractValueJson, "call_token_value").Int()
triggerSmartContract.CallValue = json.Get(contractValueJson, "call_value").Int()
affected, err = tx.BatchInsert(triggerSmartContract)
triggerSmartContractList := make([]*model.TriggerSmartContract, 0)
triggerSmartContractList = append(triggerSmartContractList, triggerSmartContract)
affected, err = tx.BatchInsert(triggerSmartContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -758,7 +789,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
proposalCreateContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
proposalCreateContract.Parameters = json.Get(contractValueJson, "parameters").String()
affected, err = tx.BatchInsert(proposalCreateContract)
proposalCreateContractList := make([]*model.ProposalCreateContract, 0)
proposalCreateContractList = append(proposalCreateContractList, proposalCreateContract)
affected, err = tx.BatchInsert(proposalCreateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -777,7 +810,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
proposalApproveContract.ProposalID = json.Get(contractValueJson, "proposal_id").Int()
proposalApproveContract.IsAddApproval = json.Get(contractValueJson, "is_add_approval").Bool()
affected, err = tx.BatchInsert(proposalApproveContract)
proposalApproveContractList := make([]*model.ProposalApproveContract, 0)
proposalApproveContractList = append(proposalApproveContractList, proposalApproveContract)
affected, err = tx.BatchInsert(proposalApproveContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -785,7 +820,8 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
}
log.Debug("splitter tron: %s transaction %s write %d.", contract.Type, transaction.Hash, affected)
case ProposalDeleteContract:
proposalDeleteContract := new(model.ProposalApproveContract)
//proposalDeleteContract := new(model.ProposalApproveContract)
proposalDeleteContract := new(model.ProposalDeleteContract)
proposalDeleteContract.ID = 0
proposalDeleteContract.BlockNumber = data.Block.BlockNumber
proposalDeleteContract.TransactionHash = transaction.Hash
......@@ -795,7 +831,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
proposalDeleteContract.OwnerAddress = json.Get(contractValueJson, "owner_address").String()
proposalDeleteContract.ProposalID = json.Get(contractValueJson, "proposal_id").Int()
affected, err = tx.BatchInsert(proposalDeleteContract)
proposalDeleteContractList := make([]*model.ProposalDeleteContract, 0)
proposalDeleteContractList = append(proposalDeleteContractList, proposalDeleteContract)
affected, err = tx.BatchInsert(proposalDeleteContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -816,7 +854,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
exchangeCreateContract.SecondTokenID = json.Get(contractValueJson, "second_token_id").String()
exchangeCreateContract.SecondTokenBalance = json.Get(contractValueJson, "second_token_balance").Int()
affected, err = tx.BatchInsert(exchangeCreateContract)
exchangeCreateContractList := make([]*model.ExchangeCreateContract, 0)
exchangeCreateContractList = append(exchangeCreateContractList, exchangeCreateContract)
affected, err = tx.BatchInsert(exchangeCreateContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -836,7 +876,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
exchangeInjectContract.ExchangeID = json.Get(contractValueJson, "exchagne_id").Int()
exchangeInjectContract.Quant = json.Get(contractValueJson, "quant").Int()
affected, err = tx.BatchInsert(exchangeInjectContract)
exchangeInjectContractList := make([]*model.ExchangeInjectContract, 0)
exchangeInjectContractList = append(exchangeInjectContractList, exchangeInjectContract)
affected, err = tx.BatchInsert(exchangeInjectContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -856,7 +898,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
exchangeWithdrawContract.ExchangeID = json.Get(contractValueJson, "exchagne_id").Int()
exchangeWithdrawContract.Quant = json.Get(contractValueJson, "quant").Int()
affected, err = tx.BatchInsert(exchangeWithdrawContract)
exchangeWithdrawContractList := make([]*model.ExchangeWithdrawContract, 0)
exchangeWithdrawContractList = append(exchangeWithdrawContractList, exchangeWithdrawContract)
affected, err = tx.BatchInsert(exchangeWithdrawContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -877,7 +921,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
exchangeTransactionContract.Quant = json.Get(contractValueJson, "quant").Int()
exchangeTransactionContract.Expected = json.Get(contractValueJson, "expected").Int()
affected, err = tx.BatchInsert(exchangeTransactionContract)
exchangeTransactionContractList := make([]*model.ExchangeTransactionContract, 0)
exchangeTransactionContractList = append(exchangeTransactionContractList, exchangeTransactionContract)
affected, err = tx.BatchInsert(exchangeTransactionContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -896,7 +942,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
updateSettingContract.ContractAddress = json.Get(contractValueJson, "contract_address").String()
updateSettingContract.ConsumeUserResourcePercent = json.Get(contractValueJson, "consume_user_resource_percent").Int()
affected, err = tx.BatchInsert(updateSettingContract)
updateSettingContractList := make([]*model.UpdateSettingContract, 0)
updateSettingContractList = append(updateSettingContractList, updateSettingContract)
affected, err = tx.BatchInsert(updateSettingContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......@@ -915,7 +963,9 @@ func (s *TRONSplitter) SaveBlock(data *TRONBlockData) error {
updateEnergyLimitContract.ContractAddress = json.Get(contractValueJson, "contract_address").String()
updateEnergyLimitContract.OriginEnergyLimit = json.Get(contractValueJson, "origin_energy_limit").Int()
affected, err = tx.BatchInsert(updateEnergyLimitContract)
updateEnergyLimitContractList := make([]*model.UpdateEnergyLimitContract, 0)
updateEnergyLimitContractList = append(updateEnergyLimitContractList, updateEnergyLimitContract)
affected, err = tx.BatchInsert(updateEnergyLimitContractList)
if err != nil {
log.Error("splitter tron: %s transaction %s write error.", contract.Type, transaction.Hash)
rollbackFunc(err)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册