SchemaChangedListener.java 8.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

18
package org.apache.shardingsphere.governance.core.config.listener;
19 20 21

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
22
import org.apache.commons.collections4.SetUtils;
23
import org.apache.shardingsphere.governance.core.config.ConfigCenterNode;
24
import org.apache.shardingsphere.governance.core.event.listener.PostGovernanceRepositoryEventListener;
L
Liang Zhang 已提交
25 26
import org.apache.shardingsphere.governance.core.event.model.GovernanceEvent;
import org.apache.shardingsphere.governance.core.event.model.datasource.DataSourceChangedEvent;
27 28
import org.apache.shardingsphere.governance.core.event.model.metadata.MetaDataAddedEvent;
import org.apache.shardingsphere.governance.core.event.model.metadata.MetaDataDeletedEvent;
29 30
import org.apache.shardingsphere.governance.core.event.model.rule.RuleConfigurationsChangedEvent;
import org.apache.shardingsphere.governance.core.event.model.schema.SchemaChangedEvent;
31
import org.apache.shardingsphere.governance.core.yaml.config.YamlDataSourceConfigurationWrap;
32
import org.apache.shardingsphere.governance.core.yaml.config.schema.YamlSchema;
33
import org.apache.shardingsphere.governance.core.yaml.swapper.DataSourceConfigurationYamlSwapper;
34
import org.apache.shardingsphere.governance.core.yaml.swapper.SchemaYamlSwapper;
35 36
import org.apache.shardingsphere.governance.repository.api.ConfigurationRepository;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
kimmking's avatar
kimmking 已提交
37
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent.Type;
38 39 40
import org.apache.shardingsphere.infra.yaml.config.YamlRootRuleConfigurations;
import org.apache.shardingsphere.infra.yaml.engine.YamlEngine;
import org.apache.shardingsphere.infra.yaml.swapper.YamlRuleConfigurationSwapperEngine;
41 42

import java.util.Collection;
43
import java.util.Collections;
44
import java.util.HashSet;
kimmking's avatar
kimmking 已提交
45
import java.util.LinkedHashMap;
46
import java.util.LinkedHashSet;
47
import java.util.Map.Entry;
48
import java.util.Optional;
49
import java.util.Set;
50
import java.util.stream.Collectors;
51 52 53 54

/**
 * Schema changed listener.
 */
55
public final class SchemaChangedListener extends PostGovernanceRepositoryEventListener {
56
    
kimmking's avatar
kimmking 已提交
57
    private final ConfigCenterNode configurationNode;
58
    
59
    private final Collection<String> existedSchemaNames;
60
    
M
menghaoranss 已提交
61 62 63
    public SchemaChangedListener(final ConfigurationRepository configurationRepository, final Collection<String> schemaNames) {
        super(configurationRepository, new ConfigCenterNode().getAllSchemaConfigPaths(schemaNames));
        configurationNode = new ConfigCenterNode();
64
        existedSchemaNames = new LinkedHashSet<>(schemaNames);
65 66 67
    }
    
    @Override
68
    protected Optional<GovernanceEvent> createGovernanceEvent(final DataChangedEvent event) {
69
        // TODO Consider removing the following one.
70
        if (configurationNode.getMetadataNodePath().equals(event.getKey())) {
71 72
            return createSchemaNamesUpdatedEvent(event.getValue());
        }
73 74
        String schemaName = configurationNode.getSchemaName(event.getKey());
        if (Strings.isNullOrEmpty(schemaName) || !isValidNodeChangedEvent(schemaName, event.getKey())) {
75
            return Optional.empty();
76
        }
kimmking's avatar
kimmking 已提交
77
        if (Type.ADDED == event.getType()) {
78
            return Optional.of(createAddedEvent(schemaName));
79
        }
kimmking's avatar
kimmking 已提交
80
        if (Type.UPDATED == event.getType()) {
81
            return Optional.of(createUpdatedEvent(schemaName, event));
82
        }
kimmking's avatar
kimmking 已提交
83
        if (Type.DELETED == event.getType()) {
84
            existedSchemaNames.remove(schemaName);
85
            return Optional.of(new MetaDataDeletedEvent(schemaName));
86
        }
87
        return Optional.empty();
88 89
    }
    
90 91 92
    private Optional<GovernanceEvent> createSchemaNamesUpdatedEvent(final String schemaNames) {
        Collection<String> persistedSchemaNames = configurationNode.splitSchemaName(schemaNames);
        Set<String> addedSchemaNames = SetUtils.difference(new HashSet<>(persistedSchemaNames), new HashSet<>(existedSchemaNames));
L
Liang Zhang 已提交
93
        if (!addedSchemaNames.isEmpty()) {
94
            return Optional.of(createAddedEvent(addedSchemaNames.iterator().next()));
95
        }
96
        Set<String> deletedSchemaNames = SetUtils.difference(new HashSet<>(existedSchemaNames), new HashSet<>(persistedSchemaNames));
L
Liang Zhang 已提交
97
        if (!deletedSchemaNames.isEmpty()) {
98 99
            String schemaName = deletedSchemaNames.iterator().next();
            existedSchemaNames.remove(schemaName);
100
            return Optional.of(new MetaDataDeletedEvent(schemaName));
101
        }
102
        return Optional.empty();
103 104
    }
    
105
    private boolean isValidNodeChangedEvent(final String schemaName, final String nodeFullPath) {
106 107
        return !existedSchemaNames.contains(schemaName) || configurationNode.getDataSourcePath(schemaName).equals(nodeFullPath) 
                || configurationNode.getRulePath(schemaName).equals(nodeFullPath)
108
                || configurationNode.getSchemaPath(schemaName).equals(nodeFullPath);
109 110
    }
    
111 112
    private GovernanceEvent createAddedEvent(final String schemaName) {
        existedSchemaNames.add(schemaName);
113
        return new MetaDataAddedEvent(schemaName, Collections.emptyMap(), Collections.emptyList());
114 115
    }
    
116
    private GovernanceEvent createUpdatedEvent(final String schemaName, final DataChangedEvent event) {
117
        // TODO Consider remove judgement.
118
        return existedSchemaNames.contains(schemaName) ? createUpdatedEventForExistedSchema(schemaName, event) : createAddedEvent(schemaName);
119 120
    }
    
121
    private GovernanceEvent createUpdatedEventForExistedSchema(final String schemaName, final DataChangedEvent event) {
122 123 124 125 126
        if (event.getKey().equals(configurationNode.getDataSourcePath(schemaName))) {
            return createDataSourceChangedEvent(schemaName, event);
        } else if (event.getKey().equals(configurationNode.getRulePath(schemaName))) {
            return createRuleChangedEvent(schemaName, event);
        }
L
Liang Zhang 已提交
127
        return createSchemaChangedEvent(schemaName, event);
128 129
    }
    
130
    private DataSourceChangedEvent createDataSourceChangedEvent(final String schemaName, final DataChangedEvent event) {
131 132 133
        YamlDataSourceConfigurationWrap result = YamlEngine.unmarshal(event.getValue(), YamlDataSourceConfigurationWrap.class);
        Preconditions.checkState(null != result && !result.getDataSources().isEmpty(), "No available data sources to load for governance.");
        return new DataSourceChangedEvent(schemaName, result.getDataSources().entrySet().stream()
kimmking's avatar
kimmking 已提交
134
                .collect(Collectors.toMap(Entry::getKey, entry -> new DataSourceConfigurationYamlSwapper().swapToObject(entry.getValue()), (oldValue, currentValue) -> oldValue, LinkedHashMap::new)));
135 136
    }
    
137
    private GovernanceEvent createRuleChangedEvent(final String schemaName, final DataChangedEvent event) {
T
tristaZero 已提交
138
        YamlRootRuleConfigurations configurations = YamlEngine.unmarshal(event.getValue(), YamlRootRuleConfigurations.class);
139
        Preconditions.checkState(null != configurations, "No available rule to load for governance.");
140
        return new RuleConfigurationsChangedEvent(schemaName, new YamlRuleConfigurationSwapperEngine().swapToRuleConfigurations(configurations.getRules()));
141 142
    }
    
L
Liang Zhang 已提交
143
    private GovernanceEvent createSchemaChangedEvent(final String schemaName, final DataChangedEvent event) {
144
        return new SchemaChangedEvent(schemaName, new SchemaYamlSwapper().swapToObject(YamlEngine.unmarshal(event.getValue(), YamlSchema.class)));
145
    }
146
}