未验证 提交 75685800 编写于 作者: Y Yanjie Zhou 提交者: GitHub

add test test for CommandExecutorTask (#7053)

上级 4d7a8bd8
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.proxy.frontend.command;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import lombok.SneakyThrows;
import org.apache.shardingsphere.db.protocol.codec.DatabasePacketCodecEngine;
import org.apache.shardingsphere.db.protocol.packet.CommandPacket;
import org.apache.shardingsphere.db.protocol.packet.CommandPacketType;
import org.apache.shardingsphere.db.protocol.packet.DatabasePacket;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.ConnectionStateHandler;
import org.apache.shardingsphere.proxy.frontend.api.CommandExecutor;
import org.apache.shardingsphere.proxy.frontend.api.QueryCommandExecutor;
import org.apache.shardingsphere.proxy.frontend.context.FrontendContext;
import org.apache.shardingsphere.proxy.frontend.engine.CommandExecuteEngine;
import org.apache.shardingsphere.proxy.frontend.spi.DatabaseProtocolFrontendEngine;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import java.util.Optional;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public final class CommandExecutorTaskTest {
@Mock
private DatabaseProtocolFrontendEngine engine;
@Mock
private DatabasePacketCodecEngine codecEngine;
@Mock
private PacketPayload payload;
@Mock
private BackendConnection backendConnection;
@Mock
private ChannelHandlerContext handlerContext;
@Mock
private ConnectionStateHandler stateHandler;
@Mock
private CommandExecuteEngine executeEngine;
@Mock
private ByteBuf message;
@Mock
private CommandPacketType commandPacketType;
@Mock
private CommandPacket commandPacket;
@Mock
private QueryCommandExecutor queryCommandExecutor;
@Mock
private CommandExecutor commandExecutor;
@Mock
private DatabasePacket databasePacket;
@Mock
private FrontendContext frontendContext;
@Test
@SneakyThrows
public void assertRunNeedFlushByFalse() {
when(backendConnection.getConnectionSize()).thenReturn(1);
when(queryCommandExecutor.execute()).thenReturn(Collections.EMPTY_LIST);
when(executeEngine.getCommandPacket(eq(payload), eq(commandPacketType), eq(backendConnection))).thenReturn(commandPacket);
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(queryCommandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStateHandler()).thenReturn(stateHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(stateHandler).waitUntilConnectionReleasedIfNecessary();
verify(stateHandler).setRunningStatusIfNecessary();
}
@Test
@SneakyThrows
public void assertRunNeedFlushByTrue() {
when(backendConnection.getConnectionSize()).thenReturn(1);
when(queryCommandExecutor.execute()).thenReturn(Collections.singletonList(databasePacket));
when(executeEngine.getCommandPacket(eq(payload), eq(commandPacketType), eq(backendConnection))).thenReturn(commandPacket);
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(queryCommandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStateHandler()).thenReturn(stateHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(stateHandler).waitUntilConnectionReleasedIfNecessary();
verify(stateHandler).setRunningStatusIfNecessary();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
verify(executeEngine).writeQueryData(handlerContext, backendConnection, queryCommandExecutor, 1);
}
@Test
@SneakyThrows
public void assertRunByCommandExecutor() {
when(frontendContext.isFlushForPerCommandPacket()).thenReturn(true);
when(engine.getFrontendContext()).thenReturn(frontendContext);
when(backendConnection.getConnectionSize()).thenReturn(1);
when(commandExecutor.execute()).thenReturn(Collections.singletonList(databasePacket));
when(executeEngine.getCommandPacket(eq(payload), eq(commandPacketType), eq(backendConnection))).thenReturn(commandPacket);
when(executeEngine.getCommandExecutor(eq(commandPacketType), eq(commandPacket), eq(backendConnection))).thenReturn(commandExecutor);
when(executeEngine.getCommandPacketType(eq(payload))).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
when(backendConnection.getStateHandler()).thenReturn(stateHandler);
when(codecEngine.createPacketPayload(eq(message))).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(stateHandler).waitUntilConnectionReleasedIfNecessary();
verify(stateHandler).setRunningStatusIfNecessary();
verify(handlerContext).write(databasePacket);
verify(handlerContext).flush();
}
@Test
public void assertRunWithError() {
RuntimeException mockException = new RuntimeException("mock");
when(backendConnection.getStateHandler()).thenThrow(mockException);
when(codecEngine.createPacketPayload(message)).thenReturn(payload);
when(engine.getCodecEngine()).thenReturn(codecEngine);
when(executeEngine.getErrorPacket(eq(mockException))).thenReturn(databasePacket);
when(executeEngine.getOtherPacket()).thenReturn(Optional.of(databasePacket));
when(engine.getCommandExecuteEngine()).thenReturn(executeEngine);
CommandExecutorTask actual = new CommandExecutorTask(engine, backendConnection, handlerContext, message);
actual.run();
verify(handlerContext, atLeast(2)).writeAndFlush(databasePacket);
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册