提交 a20574f8 编写于 作者: U ujued

rollback

上级 67f62c91
...@@ -127,7 +127,7 @@ ...@@ -127,7 +127,7 @@
reproduction, and distribution of the Work otherwise complies with reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License. the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise, 5. Submission of Contributions. Unless You explicitly transactionState otherwise,
any Contribution intentionally submitted for inclusion in the Work any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions. this License, without any additional terms or conditions.
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package com.codingapi.txlcn.client.aspect.weave; package com.codingapi.txlcn.client.aspect.weave;
import com.codingapi.txlcn.client.bean.DTXLocal; import com.codingapi.txlcn.client.bean.DTXLocal;
import com.codingapi.txlcn.client.support.LCNTransactionBeanHelper; import com.codingapi.txlcn.client.support.TXLCNTransactionBeanHelper;
import com.codingapi.txlcn.client.support.resouce.TransactionResourceExecutor; import com.codingapi.txlcn.client.support.resouce.TransactionResourceExecutor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.ProceedingJoinPoint;
...@@ -37,10 +37,10 @@ import java.util.Objects; ...@@ -37,10 +37,10 @@ import java.util.Objects;
@Slf4j @Slf4j
public class DTXResourceWeaver { public class DTXResourceWeaver {
private final LCNTransactionBeanHelper transactionBeanHelper; private final TXLCNTransactionBeanHelper transactionBeanHelper;
@Autowired @Autowired
public DTXResourceWeaver(LCNTransactionBeanHelper transactionBeanHelper) { public DTXResourceWeaver(TXLCNTransactionBeanHelper transactionBeanHelper) {
this.transactionBeanHelper = transactionBeanHelper; this.transactionBeanHelper = transactionBeanHelper;
} }
......
...@@ -56,7 +56,7 @@ public class LcnTransactionCleanService implements TransactionCleanService { ...@@ -56,7 +56,7 @@ public class LcnTransactionCleanService implements TransactionCleanService {
log.error("本地事务通知失败"); log.error("本地事务通知失败");
throw new TransactionClearException("通知资源时出错"); throw new TransactionClearException("通知资源时出错");
} }
log.error("local non transaction, but notified. probably net message timeout . groupId: {}, state: {}", groupId, state); log.error("local non transaction, but notified. probably net message timeout . groupId: {}, transactionState: {}", groupId, state);
throw new TransactionClearException("local non transaction, but notified. probably net message timeout ."); throw new TransactionClearException("local non transaction, but notified. probably net message timeout .");
} }
} }
...@@ -38,7 +38,7 @@ public class LcnConnectionProxy implements Connection { ...@@ -38,7 +38,7 @@ public class LcnConnectionProxy implements Connection {
/** /**
* 通知事务 * 通知事务
* *
* @param state state * @param state transactionState
* @return RpcResponseState RpcResponseState * @return RpcResponseState RpcResponseState
*/ */
public RpcResponseState notify(int state) { public RpcResponseState notify(int state) {
......
...@@ -69,7 +69,7 @@ public class TxcStartingTransaction implements TXLCNTransactionControl { ...@@ -69,7 +69,7 @@ public class TxcStartingTransaction implements TXLCNTransactionControl {
@Override @Override
public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) { public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) {
// set state equ 1 // set transactionState equ 1
DTXLocal.cur().setState(1); DTXLocal.cur().setState(1);
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package com.codingapi.txlcn.client.initializer; package com.codingapi.txlcn.client.initializer;
import com.codingapi.txlcn.client.aspectlog.AspectLogHelper; import com.codingapi.txlcn.client.aspectlog.AspectLogHelper;
import com.codingapi.txlcn.client.message.TxLcnClientMessageServer; import com.codingapi.txlcn.client.message.TXLCNClientMessageServer;
import com.codingapi.txlcn.commons.runner.TxLcnInitializer; import com.codingapi.txlcn.commons.runner.TxLcnInitializer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
...@@ -35,7 +35,7 @@ public class TxClientInitializer implements TxLcnInitializer { ...@@ -35,7 +35,7 @@ public class TxClientInitializer implements TxLcnInitializer {
private AspectLogHelper aspectLogHelper; private AspectLogHelper aspectLogHelper;
@Autowired @Autowired
private TxLcnClientMessageServer txLcnClientMessageServer; private TXLCNClientMessageServer txLcnClientMessageServer;
@Override @Override
public void init() throws Exception { public void init() throws Exception {
......
...@@ -29,14 +29,14 @@ import org.springframework.stereotype.Component; ...@@ -29,14 +29,14 @@ import org.springframework.stereotype.Component;
* @author lorne * @author lorne
*/ */
@Component @Component
public class TxLcnClientMessageServer{ public class TXLCNClientMessageServer {
private final RpcClientInitializer rpcClientInitializer; private final RpcClientInitializer rpcClientInitializer;
private final TxClientConfig txClientConfig; private final TxClientConfig txClientConfig;
@Autowired @Autowired
public TxLcnClientMessageServer(RpcClientInitializer rpcClientInitializer, TxClientConfig txClientConfig) { public TXLCNClientMessageServer(RpcClientInitializer rpcClientInitializer, TxClientConfig txClientConfig) {
this.rpcClientInitializer = rpcClientInitializer; this.rpcClientInitializer = rpcClientInitializer;
this.txClientConfig = txClientConfig; this.txClientConfig = txClientConfig;
} }
......
...@@ -20,7 +20,7 @@ import com.codingapi.txlcn.spi.message.RpcClient; ...@@ -20,7 +20,7 @@ import com.codingapi.txlcn.spi.message.RpcClient;
import com.codingapi.txlcn.spi.message.dto.MessageDto; import com.codingapi.txlcn.spi.message.dto.MessageDto;
import com.codingapi.txlcn.spi.message.dto.RpcCmd; import com.codingapi.txlcn.spi.message.dto.RpcCmd;
import com.codingapi.txlcn.spi.message.exception.RpcException; import com.codingapi.txlcn.spi.message.exception.RpcException;
import com.codingapi.txlcn.client.support.LCNTransactionBeanHelper; import com.codingapi.txlcn.client.support.TXLCNTransactionBeanHelper;
import com.codingapi.txlcn.commons.exception.TxClientException; import com.codingapi.txlcn.commons.exception.TxClientException;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -39,12 +39,12 @@ import java.util.Objects; ...@@ -39,12 +39,12 @@ import java.util.Objects;
@Slf4j @Slf4j
public class ClientRpcAnswer implements RpcAnswer { public class ClientRpcAnswer implements RpcAnswer {
private final LCNTransactionBeanHelper transactionBeanHelper; private final TXLCNTransactionBeanHelper transactionBeanHelper;
private final RpcClient rpcClient; private final RpcClient rpcClient;
@Autowired @Autowired
public ClientRpcAnswer(LCNTransactionBeanHelper transactionBeanHelper, RpcClient rpcClient) { public ClientRpcAnswer(TXLCNTransactionBeanHelper transactionBeanHelper, RpcClient rpcClient) {
this.transactionBeanHelper = transactionBeanHelper; this.transactionBeanHelper = transactionBeanHelper;
this.rpcClient = rpcClient; this.rpcClient = rpcClient;
} }
......
...@@ -45,7 +45,7 @@ public class TxMangerReporter { ...@@ -45,7 +45,7 @@ public class TxMangerReporter {
* @param groupId groupId * @param groupId groupId
* @param unitId unitId * @param unitId unitId
* @param registrar registrar * @param registrar registrar
* @param state state * @param state transactionState
*/ */
public void reportTransactionState(String groupId, String unitId, Short registrar, int state) { public void reportTransactionState(String groupId, String unitId, Short registrar, int state) {
TxExceptionParams txExceptionParams = new TxExceptionParams(); TxExceptionParams txExceptionParams = new TxExceptionParams();
...@@ -78,7 +78,7 @@ public class TxMangerReporter { ...@@ -78,7 +78,7 @@ public class TxMangerReporter {
break; break;
} catch (RpcException e) { } catch (RpcException e) {
if (e.getCode() == RpcException.NON_TX_MANAGER) { if (e.getCode() == RpcException.NON_TX_MANAGER) {
log.error("report transaction state error. non tx-manager is alive."); log.error("report transaction transactionState error. non tx-manager is alive.");
break; break;
} }
} }
......
...@@ -33,7 +33,7 @@ import org.springframework.stereotype.Component; ...@@ -33,7 +33,7 @@ import org.springframework.stereotype.Component;
*/ */
@Component @Component
@Slf4j @Slf4j
public class LCNTransactionBeanHelper { public class TXLCNTransactionBeanHelper {
/** /**
...@@ -60,11 +60,11 @@ public class LCNTransactionBeanHelper { ...@@ -60,11 +60,11 @@ public class LCNTransactionBeanHelper {
private static final String TRANSACTION_BEAN_NAME_FORMAT = "transaction_%s"; private static final String TRANSACTION_BEAN_NAME_FORMAT = "transaction_%s";
/** /**
* transaction state resolver * transaction transactionState resolver
* transaction_state_resolver_%s * transaction_state_resolver_%s
* %s:transaction type. lcn, tcc, txc so on. * %s:transaction type. lcn, tcc, txc so on.
*/ */
private static final String TRANSACTION_STATE_RESOLVER_BEAN_NAME_FARMOT = "transaction_state_resolver_%s"; private static final String TRANSACTION_STATE_RESOLVER_BEAN_NAME_FORMAT = "transaction_state_resolver_%s";
/** /**
* Transaction Clean Service * Transaction Clean Service
...@@ -76,7 +76,7 @@ public class LCNTransactionBeanHelper { ...@@ -76,7 +76,7 @@ public class LCNTransactionBeanHelper {
private final ApplicationContext spring; private final ApplicationContext spring;
@Autowired @Autowired
public LCNTransactionBeanHelper(ApplicationContext spring) { public TXLCNTransactionBeanHelper(ApplicationContext spring) {
this.spring = spring; this.spring = spring;
} }
...@@ -131,10 +131,10 @@ public class LCNTransactionBeanHelper { ...@@ -131,10 +131,10 @@ public class LCNTransactionBeanHelper {
*/ */
public TXLCNTransactionSeparator loadLCNTransactionStateResolver(String transactionType) { public TXLCNTransactionSeparator loadLCNTransactionStateResolver(String transactionType) {
try { try {
String name = String.format(TRANSACTION_STATE_RESOLVER_BEAN_NAME_FARMOT, transactionType); String name = String.format(TRANSACTION_STATE_RESOLVER_BEAN_NAME_FORMAT, transactionType);
return spring.getBean(name, TXLCNTransactionSeparator.class); return spring.getBean(name, TXLCNTransactionSeparator.class);
} catch (Exception e) { } catch (Exception e) {
return spring.getBean(String.format(TRANSACTION_STATE_RESOLVER_BEAN_NAME_FARMOT, "default"), TXLCNTransactionSeparator.class); return spring.getBean(String.format(TRANSACTION_STATE_RESOLVER_BEAN_NAME_FORMAT, "default"), TXLCNTransactionSeparator.class);
} }
} }
......
...@@ -36,7 +36,7 @@ public class TXLCNTransactionServiceExecutor { ...@@ -36,7 +36,7 @@ public class TXLCNTransactionServiceExecutor {
@Autowired @Autowired
private LCNTransactionBeanHelper lcnTransactionBeanHelper; private TXLCNTransactionBeanHelper TXLCNTransactionBeanHelper;
@Autowired @Autowired
private TransactionAttachmentCache transactionAttachmentCache; private TransactionAttachmentCache transactionAttachmentCache;
...@@ -58,7 +58,7 @@ public class TXLCNTransactionServiceExecutor { ...@@ -58,7 +58,7 @@ public class TXLCNTransactionServiceExecutor {
// 2. 事务状态抉择器 // 2. 事务状态抉择器
TXLCNTransactionSeparator lcnTransactionSeparator = TXLCNTransactionSeparator lcnTransactionSeparator =
lcnTransactionBeanHelper.loadLCNTransactionStateResolver(transactionType); TXLCNTransactionBeanHelper.loadLCNTransactionStateResolver(transactionType);
// 3. 获取事务状态 // 3. 获取事务状态
TXLCNTransactionState lcnTransactionState = lcnTransactionSeparator.loadTransactionState(info); TXLCNTransactionState lcnTransactionState = lcnTransactionSeparator.loadTransactionState(info);
...@@ -69,7 +69,7 @@ public class TXLCNTransactionServiceExecutor { ...@@ -69,7 +69,7 @@ public class TXLCNTransactionServiceExecutor {
// 4. 获取bean // 4. 获取bean
TXLCNTransactionControl lcnTransactionControl = TXLCNTransactionControl lcnTransactionControl =
lcnTransactionBeanHelper.loadLCNTransactionControl(transactionType, lcnTransactionState); TXLCNTransactionBeanHelper.loadLCNTransactionControl(transactionType, lcnTransactionState);
// 5. 织入事务操作 // 5. 织入事务操作
......
...@@ -106,8 +106,8 @@ public class SimpleDTXChecking implements DTXChecking { ...@@ -106,8 +106,8 @@ public class SimpleDTXChecking implements DTXChecking {
String channel = rpcClient.loadRemoteKey(); String channel = rpcClient.loadRemoteKey();
MessageDto messageDto = rpcClient.request(channel, MessageCreator.askTransactionState(groupId, unitId)); MessageDto messageDto = rpcClient.request(channel, MessageCreator.askTransactionState(groupId, unitId));
int state = SerializerContext.getInstance().deSerialize(messageDto.getBytes(), Short.class); int state = SerializerContext.getInstance().deSerialize(messageDto.getBytes(), Short.class);
log.info("support > ask transaction state:{}", state); log.info("support > ask transaction transactionState:{}", state);
txLogger.trace(groupId, unitId, Transactions.TAG_TASK, "ask transaction state " + state); txLogger.trace(groupId, unitId, Transactions.TAG_TASK, "ask transaction transactionState " + state);
if (state == -1) { if (state == -1) {
log.error("delay clean transaction error."); log.error("delay clean transaction error.");
onAskTransactionStateException(groupId, unitId, transactionType); onAskTransactionStateException(groupId, unitId, transactionType);
...@@ -119,7 +119,7 @@ public class SimpleDTXChecking implements DTXChecking { ...@@ -119,7 +119,7 @@ public class SimpleDTXChecking implements DTXChecking {
} catch (RpcException e) { } catch (RpcException e) {
onAskTransactionStateException(groupId, unitId, transactionType); onAskTransactionStateException(groupId, unitId, transactionType);
} catch (TransactionClearException | SerializerException | InterruptedException e) { } catch (TransactionClearException | SerializerException | InterruptedException e) {
log.error("{} > [transaction state message] error or [clean transaction] error.", transactionType); log.error("{} > [transaction transactionState message] error or [clean transaction] error.", transactionType);
} }
}, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS); }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);
delayTasks.put(groupId + unitId, scheduledFuture); delayTasks.put(groupId + unitId, scheduledFuture);
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
package com.codingapi.txlcn.client.support.common.template; package com.codingapi.txlcn.client.support.common.template;
import com.codingapi.txlcn.client.aspectlog.AspectLogger; import com.codingapi.txlcn.client.aspectlog.AspectLogger;
import com.codingapi.txlcn.client.support.LCNTransactionBeanHelper; import com.codingapi.txlcn.client.support.TXLCNTransactionBeanHelper;
import com.codingapi.txlcn.client.support.checking.DTXChecking; import com.codingapi.txlcn.client.support.checking.DTXChecking;
import com.codingapi.txlcn.commons.exception.TransactionClearException; import com.codingapi.txlcn.commons.exception.TransactionClearException;
import com.codingapi.txlcn.commons.util.Transactions; import com.codingapi.txlcn.commons.util.Transactions;
...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component; ...@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
@Slf4j @Slf4j
public class TransactionCleanTemplate { public class TransactionCleanTemplate {
private final LCNTransactionBeanHelper transactionBeanHelper; private final TXLCNTransactionBeanHelper transactionBeanHelper;
private final DTXChecking dtxChecking; private final DTXChecking dtxChecking;
...@@ -44,7 +44,7 @@ public class TransactionCleanTemplate { ...@@ -44,7 +44,7 @@ public class TransactionCleanTemplate {
private final TxLogger txLogger; private final TxLogger txLogger;
@Autowired @Autowired
public TransactionCleanTemplate(LCNTransactionBeanHelper transactionBeanHelper, public TransactionCleanTemplate(TXLCNTransactionBeanHelper transactionBeanHelper,
DTXChecking dtxChecking, DTXChecking dtxChecking,
AspectLogger aspectLogger, AspectLogger aspectLogger,
TxLogger txLogger) { TxLogger txLogger) {
...@@ -60,7 +60,7 @@ public class TransactionCleanTemplate { ...@@ -60,7 +60,7 @@ public class TransactionCleanTemplate {
* @param groupId groupId * @param groupId groupId
* @param unitId unitId * @param unitId unitId
* @param unitType unitType * @param unitType unitType
* @param state state * @param state transactionState
* @throws TransactionClearException TransactionClearException * @throws TransactionClearException TransactionClearException
*/ */
public void clean(String groupId, String unitId, String unitType, int state) throws TransactionClearException { public void clean(String groupId, String unitId, String unitType, int state) throws TransactionClearException {
...@@ -85,7 +85,7 @@ public class TransactionCleanTemplate { ...@@ -85,7 +85,7 @@ public class TransactionCleanTemplate {
* @param groupId groupId * @param groupId groupId
* @param unitId unitId * @param unitId unitId
* @param unitType unitType * @param unitType unitType
* @param state state * @param state transactionState
* @throws TransactionClearException TransactionClearException * @throws TransactionClearException TransactionClearException
*/ */
public void compensationClean(String groupId, String unitId, String unitType, int state) throws TransactionClearException { public void compensationClean(String groupId, String unitId, String unitType, int state) throws TransactionClearException {
......
...@@ -170,7 +170,7 @@ public class TransactionControlTemplate { ...@@ -170,7 +170,7 @@ public class TransactionControlTemplate {
* @param groupId groupId * @param groupId groupId
* @param unitId unitId * @param unitId unitId
* @param transactionType transactionType * @param transactionType transactionType
* @param state state * @param state transactionState
*/ */
public void notifyGroup(String groupId, String unitId, String transactionType, int state) { public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
txLogger.trace(groupId, unitId, Transactions.TAG_TRANSACTION, "notify group " + state); txLogger.trace(groupId, unitId, Transactions.TAG_TRANSACTION, "notify group " + state);
......
...@@ -88,6 +88,10 @@ public class SimpleTransactionManager implements TransactionManager { ...@@ -88,6 +88,10 @@ public class SimpleTransactionManager implements TransactionManager {
transUnit.setUnitId(transactionUnit.unitId()); transUnit.setUnitId(transactionUnit.unitId());
log.info("unit:{} joined group:{}", transactionUnit.unitId(), dtxTransaction.groupId()); log.info("unit:{} joined group:{}", transactionUnit.unitId(), dtxTransaction.groupId());
try { try {
//手动回滚时设置状态为回滚状态 0
if(transactionUnit.getTranscationState()==0){
groupRelationship.setTransactionState(dtxTransaction.groupId(),0);
}
groupRelationship.joinGroup(dtxTransaction.groupId(), transUnit); groupRelationship.joinGroup(dtxTransaction.groupId(), transUnit);
} catch (JoinGroupException e) { } catch (JoinGroupException e) {
throw new TransactionException(e); throw new TransactionException(e);
...@@ -113,6 +117,7 @@ public class SimpleTransactionManager implements TransactionManager { ...@@ -113,6 +117,7 @@ public class SimpleTransactionManager implements TransactionManager {
@Override @Override
public int transactionState(DTXTransaction groupTransaction) { public int transactionState(DTXTransaction groupTransaction) {
int state = exceptionService.transactionState(groupTransaction.groupId()); int state = exceptionService.transactionState(groupTransaction.groupId());
//存在数据时返回数据状态
if (state != -1) { if (state != -1) {
return state; return state;
} }
......
...@@ -29,10 +29,13 @@ public class TransactionUnit { ...@@ -29,10 +29,13 @@ public class TransactionUnit {
private String messageContextId; private String messageContextId;
public TransactionUnit(String unitId, String unitType, String messageContextId) { private int transcationState;
public TransactionUnit(String unitId, String unitType, int transcationState,String messageContextId) {
this.unitId = unitId; this.unitId = unitId;
this.unitType = unitType; this.unitType = unitType;
this.messageContextId = messageContextId; this.messageContextId = messageContextId;
this.transcationState = transcationState;
} }
public String unitId() { public String unitId() {
...@@ -46,4 +49,8 @@ public class TransactionUnit { ...@@ -46,4 +49,8 @@ public class TransactionUnit {
public String unitType() { public String unitType() {
return unitType; return unitType;
} }
public int getTranscationState() {
return transcationState;
}
} }
...@@ -45,6 +45,7 @@ public class AskTransactionStateExecuteService implements RpcExecuteService { ...@@ -45,6 +45,7 @@ public class AskTransactionStateExecuteService implements RpcExecuteService {
@Override @Override
public Object execute(TransactionCmd transactionCmd) { public Object execute(TransactionCmd transactionCmd) {
return transactionManager.transactionState(transactionContext.getTransaction(transactionCmd.getGroupId())); int state = transactionManager.transactionState(transactionContext.getTransaction(transactionCmd.getGroupId()));
return state == -1 ? 0 : state;
} }
} }
...@@ -65,7 +65,7 @@ public class JoinGroupExecuteService implements RpcExecuteService { ...@@ -65,7 +65,7 @@ public class JoinGroupExecuteService implements RpcExecuteService {
txLogger.trace( txLogger.trace(
transactionCmd.getGroupId(), joinGroupParams.getUnitId(), Transactions.TAG_TRANSACTION, "start join group"); transactionCmd.getGroupId(), joinGroupParams.getUnitId(), Transactions.TAG_TRANSACTION, "start join group");
TransactionUnit transactionUnit = TransactionUnit transactionUnit =
new TransactionUnit(joinGroupParams.getUnitId(), joinGroupParams.getUnitType(), transactionCmd.getRemoteKey()); new TransactionUnit(joinGroupParams.getUnitId(), joinGroupParams.getUnitType(),joinGroupParams.getTransactionState(), transactionCmd.getRemoteKey());
transactionManager.join(dtxTransaction, transactionUnit); transactionManager.join(dtxTransaction, transactionUnit);
txLogger.trace( txLogger.trace(
......
...@@ -61,19 +61,26 @@ public class NotifyGroupExecuteService implements RpcExecuteService { ...@@ -61,19 +61,26 @@ public class NotifyGroupExecuteService implements RpcExecuteService {
NotifyGroupParams notifyGroupParams = transactionCmd.getMsg().loadData(NotifyGroupParams.class); NotifyGroupParams notifyGroupParams = transactionCmd.getMsg().loadData(NotifyGroupParams.class);
log.debug("notify group params: {}", JSON.toJSONString(notifyGroupParams)); log.debug("notify group params: {}", JSON.toJSONString(notifyGroupParams));
int commitState = notifyGroupParams.getState();
//获取事务状态(当手动回滚时会先设置状态)
int transactionState = transactionManager.transactionState(dtxTransaction);
if (transactionState == 0) {
commitState = 0;
}
// 系统日志 // 系统日志
txLogger.trace( txLogger.trace(
transactionCmd.getGroupId(), "", transactionCmd.getGroupId(), "",
Transactions.TAG_TRANSACTION, "notify group " + notifyGroupParams.getState()); Transactions.TAG_TRANSACTION, "notify group " + notifyGroupParams.getState());
if (notifyGroupParams.getState() == 1) { if (commitState == 1) {
transactionManager.commit(dtxTransaction); transactionManager.commit(dtxTransaction);
return null; return null;
} else if (notifyGroupParams.getState() == 0) { } else if (commitState == 0) {
transactionManager.rollback(dtxTransaction); transactionManager.rollback(dtxTransaction);
return null; return null;
} }
log.error("ignored transaction state:{}", notifyGroupParams.getState()); log.error("ignored transaction transactionState:{}", notifyGroupParams.getState());
} catch (SerializerException e) { } catch (SerializerException e) {
throw new TxManagerException(e.getMessage()); throw new TxManagerException(e.getMessage());
} finally { } finally {
......
...@@ -17,6 +17,8 @@ package com.codingapi.txlcn.manager.core.transaction; ...@@ -17,6 +17,8 @@ package com.codingapi.txlcn.manager.core.transaction;
import com.codingapi.txlcn.commons.exception.SerializerException; import com.codingapi.txlcn.commons.exception.SerializerException;
import com.codingapi.txlcn.commons.exception.TxManagerException; import com.codingapi.txlcn.commons.exception.TxManagerException;
import com.codingapi.txlcn.manager.core.context.DTXTransactionContext;
import com.codingapi.txlcn.manager.core.context.TransactionManager;
import com.codingapi.txlcn.manager.support.service.TxExceptionService; import com.codingapi.txlcn.manager.support.service.TxExceptionService;
import com.codingapi.txlcn.manager.support.service.WriteTxExceptionDTO; import com.codingapi.txlcn.manager.support.service.WriteTxExceptionDTO;
import com.codingapi.txlcn.manager.core.message.RpcExecuteService; import com.codingapi.txlcn.manager.core.message.RpcExecuteService;
...@@ -43,10 +45,16 @@ public class WriteTxExceptionExecuteService implements RpcExecuteService { ...@@ -43,10 +45,16 @@ public class WriteTxExceptionExecuteService implements RpcExecuteService {
private final RpcClient rpcClient; private final RpcClient rpcClient;
private final TransactionManager transactionManager;
private final DTXTransactionContext transactionContext;
@Autowired @Autowired
public WriteTxExceptionExecuteService(TxExceptionService compensationService, RpcClient rpcClient) { public WriteTxExceptionExecuteService(TxExceptionService compensationService, RpcClient rpcClient,TransactionManager transactionManager, DTXTransactionContext transactionContext) {
this.compensationService = compensationService; this.compensationService = compensationService;
this.rpcClient = rpcClient; this.rpcClient = rpcClient;
this.transactionManager = transactionManager;
this.transactionContext =transactionContext;
} }
@Override @Override
...@@ -55,7 +63,11 @@ public class WriteTxExceptionExecuteService implements RpcExecuteService { ...@@ -55,7 +63,11 @@ public class WriteTxExceptionExecuteService implements RpcExecuteService {
TxExceptionParams txExceptionParams = transactionCmd.getMsg().loadData(TxExceptionParams.class); TxExceptionParams txExceptionParams = transactionCmd.getMsg().loadData(TxExceptionParams.class);
WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(); WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO();
writeTxExceptionReq.setModId(rpcClient.getAppName(transactionCmd.getRemoteKey())); writeTxExceptionReq.setModId(rpcClient.getAppName(transactionCmd.getRemoteKey()));
writeTxExceptionReq.setTransactionState(txExceptionParams.getTransactionState());
//获取事务状态(可能存在设置了手动回滚)
int transactionState = transactionManager.transactionState(transactionContext.getTransaction(txExceptionParams.getGroupId()));
writeTxExceptionReq.setTransactionState(transactionState==-1?txExceptionParams.getTransactionState():transactionState);
writeTxExceptionReq.setGroupId(txExceptionParams.getGroupId()); writeTxExceptionReq.setGroupId(txExceptionParams.getGroupId());
writeTxExceptionReq.setUnitId(txExceptionParams.getUnitId()); writeTxExceptionReq.setUnitId(txExceptionParams.getUnitId());
writeTxExceptionReq.setRegistrar(Objects.isNull(txExceptionParams.getRegistrar()) ? -1 : txExceptionParams.getRegistrar()); writeTxExceptionReq.setRegistrar(Objects.isNull(txExceptionParams.getRegistrar()) ? -1 : txExceptionParams.getRegistrar());
......
...@@ -39,8 +39,8 @@ public interface TxExceptionMapper { ...@@ -39,8 +39,8 @@ public interface TxExceptionMapper {
@Select("select * from t_tx_exception") @Select("select * from t_tx_exception")
List<TxException> findAll(); List<TxException> findAll();
@Update("update t_tx_exception set ex_state=#{state} where id=#{id}") @Update("update t_tx_exception set ex_state=#{transactionState} where id=#{id}")
void changeExState(@Param("id") Long id, @Param("state") short state); void changeExState(@Param("id") Long id, @Param("transactionState") short state);
@Select("select * from t_tx_exception where group_id=#{groupId}") @Select("select * from t_tx_exception where group_id=#{groupId}")
TxException getByGroupId(String groupId); TxException getByGroupId(String groupId);
......
...@@ -44,7 +44,7 @@ public class RedisGroupRelationship implements GroupRelationship { ...@@ -44,7 +44,7 @@ public class RedisGroupRelationship implements GroupRelationship {
private static final String REDIS_PREFIX = "tx.manager:group:"; private static final String REDIS_PREFIX = "tx.manager:group:";
private static final String REDIS_GROUP_STATE = REDIS_PREFIX + ":state"; private static final String REDIS_GROUP_STATE = REDIS_PREFIX + ":transactionState";
private final RedisTemplate<String, String> redisTemplate; private final RedisTemplate<String, String> redisTemplate;
...@@ -104,7 +104,7 @@ public class RedisGroupRelationship implements GroupRelationship { ...@@ -104,7 +104,7 @@ public class RedisGroupRelationship implements GroupRelationship {
public Short transactionState(String groupId) { public Short transactionState(String groupId) {
String state = redisTemplate.opsForValue().get(REDIS_GROUP_STATE + groupId); String state = redisTemplate.opsForValue().get(REDIS_GROUP_STATE + groupId);
if (Objects.isNull(state)) { if (Objects.isNull(state)) {
return 0; return -1;
} }
try { try {
return Short.valueOf(state); return Short.valueOf(state);
......
...@@ -84,7 +84,7 @@ public class NettyRpcClientInitializer implements RpcClientInitializer, Disposab ...@@ -84,7 +84,7 @@ public class NettyRpcClientInitializer implements RpcClientInitializer, Disposab
b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000); b.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000);
b.handler(nettyRpcClientHandlerInitHandler); b.handler(nettyRpcClientHandlerInitHandler);
ChannelFuture channelFuture = b.connect(socketAddress).syncUninterruptibly(); ChannelFuture channelFuture = b.connect(socketAddress).syncUninterruptibly();
log.info("client -> {} , state:{}", socketAddress, channelFuture.isSuccess()); log.info("client -> {} , transactionState:{}", socketAddress, channelFuture.isSuccess());
connected = true; connected = true;
break; break;
......
...@@ -53,7 +53,7 @@ public enum LCNCmdType { ...@@ -53,7 +53,7 @@ public enum LCNCmdType {
* 响应事务状态 * 响应事务状态
* 间写 ats * 间写 ats
*/ */
askTransactionState("ask-transaction-state", MessageConstants.ACTION_ASK_TRANSACTION_STATE), askTransactionState("ask-transaction-transactionState", MessageConstants.ACTION_ASK_TRANSACTION_STATE),
/** /**
* 记录补偿 * 记录补偿
......
...@@ -47,4 +47,10 @@ public class JoinGroupParams implements Serializable { ...@@ -47,4 +47,10 @@ public class JoinGroupParams implements Serializable {
* 通讯标识 * 通讯标识
*/ */
private String remoteKey; private String remoteKey;
/**
* 事务状态
* 0 回滚 1提交
*/
private int transactionState = 1;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册