未验证 提交 86c5a512 编写于 作者: L Liang Zhang 提交者: GitHub

Rename refreshSchema (#8166)

上级 674d5980
......@@ -33,7 +33,6 @@ import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMate
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.rule.ShardingSphereRule;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
......@@ -76,30 +75,30 @@ public abstract class AbstractStatementExecutor {
}
@SuppressWarnings({"unchecked", "rawtypes"})
protected final void refreshTableMetaData(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
protected final void refreshSchema(final ShardingSphereMetaData metaData, final SQLStatement sqlStatement, final Collection<RouteUnit> routeUnits) throws SQLException {
if (null == sqlStatement) {
return;
}
Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
if (schemaRefresher.isPresent()) {
Collection<String> routeDataSourceNames = routeUnits.stream().map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement,
new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps()));
notifyPersistSchema(DefaultSchema.LOGIC_NAME, metaData.getSchema());
Collection<String> routeDataSourceNames = routeUnits.stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
SchemaBuilderMaterials materials = new SchemaBuilderMaterials(metaDataContexts.getDatabaseType(), dataSourceMap, metaData.getRuleMetaData().getRules(), metaDataContexts.getProps());
schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
notifySchemaChanged(DefaultSchema.LOGIC_NAME, metaData.getSchema());
}
}
private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
}
protected final boolean executeAndRefreshMetaData(final Collection<InputGroup<StatementExecuteUnit>> inputGroups, final SQLStatement sqlStatement,
final Collection<RouteUnit> routeUnits, final SQLExecutorCallback<Boolean> sqlExecutorCallback) throws SQLException {
List<Boolean> result = sqlExecutor.execute(inputGroups, sqlExecutorCallback);
refreshTableMetaData(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
refreshSchema(metaDataContexts.getDefaultMetaData(), sqlStatement, routeUnits);
return null != result && !result.isEmpty() && null != result.get(0) && result.get(0);
}
private void notifyPersistSchema(final String schemaName, final ShardingSphereSchema schema) {
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
}
/**
* Execute SQL.
*
......
......@@ -81,7 +81,7 @@ public final class PreparedStatementExecutor extends AbstractStatementExecutor {
boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
SQLExecutorCallback<Integer> sqlExecutorCallback = createDefaultSQLExecutorCallbackWithInteger(isExceptionThrown);
List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
return isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext) ? accumulate(results) : results.get(0);
}
......
......@@ -132,7 +132,7 @@ public final class StatementExecutor extends AbstractStatementExecutor {
}
};
List<Integer> results = getSqlExecutor().execute(inputGroups, sqlExecutorCallback);
refreshTableMetaData(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
refreshSchema(getMetaDataContexts().getDefaultMetaData(), sqlStatementContext.getSqlStatement(), routeUnits);
if (isNeedAccumulate(getMetaDataContexts().getDefaultMetaData().getRuleMetaData().getRules().stream().filter(
rule -> rule instanceof DataNodeContainedRule).collect(Collectors.toList()), sqlStatementContext)) {
return accumulate(results);
......
......@@ -18,8 +18,6 @@
package org.apache.shardingsphere.proxy.backend.communication.jdbc;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.governance.core.event.GovernanceEventBus;
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaPersistEvent;
import org.apache.shardingsphere.infra.binder.LogicSQL;
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.config.properties.ConfigurationPropertyKey;
......@@ -31,12 +29,13 @@ import org.apache.shardingsphere.infra.executor.sql.raw.execute.result.query.Que
import org.apache.shardingsphere.infra.merge.MergeEngine;
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.schema.ShardingSphereSchema;
import org.apache.shardingsphere.infra.metadata.schema.builder.SchemaBuilderMaterials;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresher;
import org.apache.shardingsphere.infra.metadata.schema.refresher.SchemaRefresherFactory;
import org.apache.shardingsphere.infra.route.context.RouteMapper;
import org.apache.shardingsphere.infra.route.context.RouteUnit;
import org.apache.shardingsphere.infra.metadata.schema.refresher.spi.SchemaChangedNotifier;
import org.apache.shardingsphere.infra.rule.type.DataNodeContainedRule;
import org.apache.shardingsphere.infra.spi.ordered.OrderedSPIRegistry;
import org.apache.shardingsphere.proxy.backend.communication.DatabaseCommunicationEngine;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.execute.SQLExecuteEngine;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
......@@ -49,6 +48,7 @@ import org.apache.shardingsphere.sql.parser.sql.common.statement.SQLStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
......@@ -90,26 +90,30 @@ public final class JDBCDatabaseCommunicationEngine implements DatabaseCommunicat
}
sqlExecuteEngine.checkExecutePrerequisites(executionContext);
response = sqlExecuteEngine.execute(executionContext);
Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream()
.map(RouteUnit::getDataSourceMapper).map(RouteMapper::getLogicName).collect(Collectors.toList());
refreshTableMetaData(executionContext.getSqlStatementContext().getSqlStatement(), routeDataSourceNames);
refreshSchema(executionContext);
return merge(executionContext.getSqlStatementContext());
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void refreshTableMetaData(final SQLStatement sqlStatement, final Collection<String> routeDataSourceNames) throws SQLException {
private void refreshSchema(final ExecutionContext executionContext) throws SQLException {
SQLStatement sqlStatement = executionContext.getSqlStatementContext().getSqlStatement();
if (null == sqlStatement) {
return;
}
Optional<SchemaRefresher> schemaRefresher = SchemaRefresherFactory.newInstance(sqlStatement);
if (schemaRefresher.isPresent()) {
Collection<String> routeDataSourceNames = executionContext.getRouteContext().getRouteUnits().stream().map(each -> each.getDataSourceMapper().getLogicName()).collect(Collectors.toList());
SchemaBuilderMaterials materials = new SchemaBuilderMaterials(ProxyContext.getInstance().getMetaDataContexts().getDatabaseType(),
metaData.getResource().getDataSources(), metaData.getRuleMetaData().getRules(), ProxyContext.getInstance().getMetaDataContexts().getProps());
schemaRefresher.get().refresh(metaData.getSchema(), routeDataSourceNames, sqlStatement, materials);
GovernanceEventBus.getInstance().post(new SchemaPersistEvent(metaData.getName(), metaData.getSchema()));
notifySchemaChanged(metaData.getName(), metaData.getSchema());
}
}
private void notifySchemaChanged(final String schemaName, final ShardingSphereSchema schema) {
OrderedSPIRegistry.getRegisteredServices(Collections.singletonList(schema), SchemaChangedNotifier.class).values().forEach(each -> each.notify(schemaName, schema));
}
private BackendResponse merge(final SQLStatementContext<?> sqlStatementContext) throws SQLException {
if (response instanceof UpdateResponse) {
mergeUpdateCount(sqlStatementContext);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册