/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.shardingsphere.proxy.frontend.postgresql.command.query.binary.bind; import lombok.Getter; import org.apache.shardingsphere.db.protocol.packet.DatabasePacket; import org.apache.shardingsphere.db.protocol.postgresql.constant.PostgreSQLColumnType; import org.apache.shardingsphere.db.protocol.postgresql.packet.PostgreSQLPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLColumnDescription; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.PostgreSQLRowDescriptionPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBinaryResultSetRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLBindCompletePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.binary.bind.PostgreSQLComBindPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.command.query.text.PostgreSQLDataRowPacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLCommandCompletePacket; import org.apache.shardingsphere.db.protocol.postgresql.packet.generic.PostgreSQLErrorResponsePacket; import org.apache.shardingsphere.infra.database.type.DatabaseTypeRegistry; import org.apache.shardingsphere.infra.executor.sql.QueryResult; import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.QueryHeader; import org.apache.shardingsphere.infra.parser.ShardingSphereSQLParserEngine; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine; import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngineFactory; import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection; import org.apache.shardingsphere.proxy.backend.context.ProxyContext; import org.apache.shardingsphere.proxy.backend.response.BackendResponse; import org.apache.shardingsphere.proxy.backend.response.query.QueryData; import org.apache.shardingsphere.proxy.backend.response.query.QueryResponse; import org.apache.shardingsphere.proxy.backend.response.update.UpdateResponse; import org.apache.shardingsphere.proxy.frontend.command.executor.QueryCommandExecutor; import org.apache.shardingsphere.proxy.frontend.command.executor.ResponseType; import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Optional; /** * Command bind executor for PostgreSQL. */ public final class PostgreSQLComBindExecutor implements QueryCommandExecutor { private final PostgreSQLComBindPacket packet; private final DatabaseCommunicationEngine databaseCommunicationEngine; @Getter private volatile ResponseType responseType; public PostgreSQLComBindExecutor(final PostgreSQLComBindPacket packet, final BackendConnection backendConnection) { this.packet = packet; if (null != packet.getSql()) { ShardingSphereSQLParserEngine sqlStatementParserEngine = new ShardingSphereSQLParserEngine( DatabaseTypeRegistry.getTrunkDatabaseTypeName(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType())); SQLStatement sqlStatement = sqlStatementParserEngine.parse(packet.getSql(), true); databaseCommunicationEngine = DatabaseCommunicationEngineFactory.getInstance().newBinaryProtocolInstance(sqlStatement, packet.getSql(), packet.getParameters(), backendConnection); } else { databaseCommunicationEngine = null; } } @Override public Collection> execute() throws SQLException { if (ProxyContext.getInstance().getMetaDataContexts().isCircuitBreak()) { return Collections.singletonList(new PostgreSQLErrorResponsePacket()); } List> result = new LinkedList<>(); result.add(new PostgreSQLBindCompletePacket()); if (null == databaseCommunicationEngine) { return result; } BackendResponse backendResponse = databaseCommunicationEngine.execute(); if (backendResponse instanceof QueryResponse) { createQueryPacket((QueryResponse) backendResponse).ifPresent(result::add); } if (backendResponse instanceof UpdateResponse) { responseType = ResponseType.UPDATE; result.add(createUpdatePacket((UpdateResponse) backendResponse)); } return result; } private Optional createQueryPacket(final QueryResponse queryResponse) { List columnDescriptions = getPostgreSQLColumnDescriptions(queryResponse); if (columnDescriptions.isEmpty()) { responseType = ResponseType.QUERY; } if (columnDescriptions.isEmpty() || packet.isBinaryRowData()) { return Optional.empty(); } return Optional.of(new PostgreSQLRowDescriptionPacket(columnDescriptions.size(), columnDescriptions)); } private List getPostgreSQLColumnDescriptions(final QueryResponse queryResponse) { List result = new LinkedList<>(); List queryResults = queryResponse.getQueryResults(); ResultSetMetaData resultSetMetaData = queryResults.isEmpty() ? null : queryResults.get(0).getResultSetMetaData(); int columnIndex = 0; for (QueryHeader each : queryResponse.getQueryHeaders()) { result.add(new PostgreSQLColumnDescription(each.getColumnName(), ++columnIndex, each.getColumnType(), each.getColumnLength(), resultSetMetaData)); } return result; } private PostgreSQLCommandCompletePacket createUpdatePacket(final UpdateResponse updateResponse) { return new PostgreSQLCommandCompletePacket(updateResponse.getType(), updateResponse.getUpdateCount()); } @Override public boolean next() throws SQLException { return null != databaseCommunicationEngine && databaseCommunicationEngine.next(); } @Override public PostgreSQLPacket getQueryData() throws SQLException { QueryData queryData = databaseCommunicationEngine.getQueryData(); return packet.isBinaryRowData() ? new PostgreSQLBinaryResultSetRowPacket(queryData.getData(), getPostgreSQLColumnTypes(queryData)) : new PostgreSQLDataRowPacket(queryData.getData()); } private List getPostgreSQLColumnTypes(final QueryData queryData) { List result = new ArrayList<>(queryData.getColumnTypes().size()); for (int i = 0; i < queryData.getColumnTypes().size(); i++) { result.add(PostgreSQLColumnType.valueOfJDBCType(queryData.getColumnTypes().get(i))); } return result; } }