未验证 提交 2822a72b 编写于 作者: 邱鹿 Lucas 提交者: GitHub

Add shardingsphere-scaling-elasticjob module (#7310)

* add shardingsphere-scaling-elasticjob module

* add a blank line in the end of file

Co-authored-by: qiulu3 <Lucas209910>
上级 2737b51f
......@@ -132,7 +132,8 @@
<os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
<dockerfile-maven.version>1.4.6</dockerfile-maven.version>
<docker-compose-maven-plugin.version>2.3.0</docker-compose-maven-plugin.version>
<elasticjob.version>3.0.0-alpha</elasticjob.version>
<javadocExecutable>${java.home}/../bin/javadoc</javadocExecutable>
<maven.deploy.skip>false</maven.deploy.skip>
<argLine>-Xmx1024m -XX:MaxMetaspaceSize=256m</argLine>
......@@ -366,7 +367,23 @@
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-api</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-lifecycle</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
......
......@@ -33,5 +33,6 @@
<module>shardingsphere-scaling-bootstrap</module>
<module>shardingsphere-scaling-mysql</module>
<module>shardingsphere-scaling-postgresql</module>
<module>shardingsphere-scaling-elasticjob</module>
</modules>
</project>
......@@ -19,3 +19,8 @@ port: 8888
blockQueueSize: 10000
pushTimeout: 1000
workerThread: 30
#name: elasticjob
#registryCenter:
# type: zookeeper
# serverLists: localhost:2181
# props:
......@@ -35,6 +35,8 @@ public final class JobConfiguration {
private String jobName;
private boolean running = true;
private String[] shardingTables;
private int shardingItem;
......
......@@ -17,11 +17,13 @@
package org.apache.shardingsphere.scaling.core.config;
import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.apache.shardingsphere.governance.core.yaml.swapper.GovernanceCenterConfigurationYamlSwapper;
import org.apache.shardingsphere.scaling.core.execute.engine.ShardingScalingExecuteEngine;
import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntryLoader;
/**
* ShardingSphere-Scaling context.
......@@ -56,5 +58,12 @@ public final class ScalingContext {
this.serverConfiguration = serverConfiguration;
taskExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
importerExecuteEngine = new ShardingScalingExecuteEngine(serverConfiguration.getWorkerThread());
initElasticJobEntry(serverConfiguration);
}
private void initElasticJobEntry(final ServerConfiguration serverConfiguration) {
if (!Strings.isNullOrEmpty(serverConfiguration.getName()) && null != serverConfiguration.getRegistryCenter()) {
ElasticJobEntryLoader.init(serverConfiguration.getName(), new GovernanceCenterConfigurationYamlSwapper().swapToObject(serverConfiguration.getRegistryCenter()));
}
}
}
......@@ -56,9 +56,9 @@ public final class DataSourceManager implements AutoCloseable {
private void createSourceDatasources(final List<SyncConfiguration> syncConfigs) {
for (SyncConfiguration syncConfiguration : syncConfigs) {
DataSourceConfiguration dataSourceConfig = syncConfiguration.getDumperConfiguration().getDataSourceConfiguration();
DataSourceWrapper hikariDataSource = dataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, hikariDataSource);
sourceDatasources.put(dataSourceConfig, hikariDataSource);
DataSourceWrapper dataSource = dataSourceFactory.newInstance(dataSourceConfig);
cachedDataSources.put(dataSourceConfig, dataSource);
sourceDatasources.put(dataSourceConfig, dataSource);
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.spi;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
/**
* Elastic job entry.
*/
public interface ElasticJobEntry {
/**
* Init elastic job.
*
* @param namespace registry center namespace
* @param registryCenter registry center
*/
void init(String namespace, GovernanceCenterConfiguration registryCenter);
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.core.spi;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import java.util.Collection;
/**
* Elastic job entry loader.
*/
public final class ElasticJobEntryLoader {
/**
* Init elastic job entry.
*
* @param namespace registry center namespace
* @param registryCenter registry center
*/
public static void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
ShardingSphereServiceLoader.register(ElasticJobEntry.class);
Collection<ElasticJobEntry> elasticJobEntries = ShardingSphereServiceLoader.newServiceInstances(ElasticJobEntry.class);
for (ElasticJobEntry each : elasticJobEntries) {
each.init(namespace, registryCenter);
}
}
}
......@@ -19,22 +19,29 @@ package org.apache.shardingsphere.scaling.core.job.task.inventory;
import org.apache.shardingsphere.scaling.core.datasource.DataSourceManager;
import org.apache.shardingsphere.scaling.core.job.SyncProgress;
import org.apache.shardingsphere.scaling.core.job.position.InventoryPosition;
import org.apache.shardingsphere.scaling.core.job.task.ScalingTask;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.Collections;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@RunWith(MockitoJUnitRunner.class)
public final class InventoryDataScalingTaskGroupTest {
private DataSourceManager dataSourceManager;
@Mock
private ScalingTask<InventoryPosition> scalingTask;
@Before
public void setUp() {
dataSourceManager = new DataSourceManager();
......@@ -47,7 +54,6 @@ public final class InventoryDataScalingTaskGroupTest {
@Test
public void assertStart() {
ScalingTask scalingTask = mock(ScalingTask.class);
InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
inventoryDataSyncTaskGroup.start();
verify(scalingTask).start();
......@@ -55,7 +61,6 @@ public final class InventoryDataScalingTaskGroupTest {
@Test
public void assertStop() {
ScalingTask scalingTask = mock(ScalingTask.class);
InventoryDataScalingTaskGroup inventoryDataSyncTaskGroup = new InventoryDataScalingTaskGroup(Collections.singletonList(scalingTask));
inventoryDataSyncTaskGroup.stop();
verify(scalingTask).stop();
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-scaling</artifactId>
<version>5.0.0-RC1-SNAPSHOT</version>
</parent>
<artifactId>shardingsphere-scaling-elasticjob</artifactId>
<name>${project.artifactId}</name>
<dependencies>
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-scaling-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-lifecycle</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.elasticjob;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.internal.election.LeaderService;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobAPIFactory;
import org.apache.shardingsphere.elasticjob.lite.lifecycle.api.JobConfigurationAPI;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.governance.repository.api.config.GovernanceCenterConfiguration;
import org.apache.shardingsphere.governance.repository.api.listener.DataChangedEvent;
import org.apache.shardingsphere.governance.repository.zookeeper.CuratorZookeeperRepository;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.spi.ElasticJobEntry;
import org.apache.shardingsphere.scaling.elasticjob.job.ScalingElasticJob;
import java.util.Optional;
/**
* Scaling elastic job entry.
*/
@Slf4j
public final class ScalingElasticJobEntry implements ElasticJobEntry {
private static final String SCALING_JOB_NAME = "ScalingJob";
private static final String SCALING_JOB_CONFIG = "/__scalingjob_config";
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
private final CuratorZookeeperRepository curatorZookeeperRepository = new CuratorZookeeperRepository();
private OneOffJobBootstrap scalingJobBootstrap;
private boolean running;
private String namespace;
private GovernanceCenterConfiguration registryCenter;
@Override
public void init(final String namespace, final GovernanceCenterConfiguration registryCenter) {
log.info("Scaling elastic job start...");
this.namespace = namespace;
this.registryCenter = registryCenter;
initConfigurationRepository();
watchConfigurationRepository();
}
private void initConfigurationRepository() {
scalingJobBootstrap = new OneOffJobBootstrap(createRegistryCenter(), new ScalingElasticJob(), createJobConfiguration());
curatorZookeeperRepository.init(namespace, registryCenter);
}
private CoordinatorRegistryCenter createRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(registryCenter.getServerLists(), namespace);
zkConfig.setMaxSleepTimeMilliseconds(getProperty("max.sleep.time.milliseconds", zkConfig.getMaxSleepTimeMilliseconds()));
zkConfig.setBaseSleepTimeMilliseconds(getProperty("base.sleep.time.milliseconds", zkConfig.getBaseSleepTimeMilliseconds()));
zkConfig.setConnectionTimeoutMilliseconds(getProperty("connection.timeout.milliseconds", zkConfig.getConnectionTimeoutMilliseconds()));
zkConfig.setSessionTimeoutMilliseconds(getProperty("session.timeout.milliseconds", zkConfig.getSessionTimeoutMilliseconds()));
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zkConfig);
regCenter.init();
return regCenter;
}
private int getProperty(final String key, final int defaultValue) {
if (Strings.isNullOrEmpty(registryCenter.getProps().getProperty(key))) {
return defaultValue;
}
return Integer.parseInt(registryCenter.getProps().getProperty(key));
}
private JobConfiguration createJobConfiguration() {
return createJobConfiguration(1, null);
}
private JobConfiguration createJobConfiguration(final int shardingTotalCount, final String jobParameter) {
return JobConfiguration.newBuilder(SCALING_JOB_NAME, shardingTotalCount).jobParameter(jobParameter).build();
}
private void watchConfigurationRepository() {
curatorZookeeperRepository.watch(SCALING_JOB_CONFIG, event -> {
Optional<ScalingConfiguration> scalingConfiguration = getScalingConfiguration(event);
if (!scalingConfiguration.isPresent()) {
return;
}
switch (event.getChangedType()) {
case ADDED:
case UPDATED:
executeJob(scalingConfiguration.get());
break;
case DELETED:
deleteJob(scalingConfiguration.get());
break;
default:
break;
}
});
}
private Optional<ScalingConfiguration> getScalingConfiguration(final DataChangedEvent event) {
try {
log.info("{} scaling config: {}", event.getChangedType(), event.getValue());
return Optional.of(GSON.fromJson(event.getValue(), ScalingConfiguration.class));
} catch (JsonSyntaxException ex) {
log.error("analyze scaling config failed.", ex);
}
return Optional.empty();
}
private void executeJob(final ScalingConfiguration scalingConfiguration) {
if (running && scalingConfiguration.getJobConfiguration().isRunning()) {
log.warn("scaling elastic job has already running, ignore current config.");
return;
}
if (running == scalingConfiguration.getJobConfiguration().isRunning()) {
return;
}
if (new LeaderService(createRegistryCenter(), SCALING_JOB_NAME).isLeader()) {
log.info("leader worker update config.");
updateJobConfiguration(scalingConfiguration);
}
scalingJobBootstrap.execute();
running = scalingConfiguration.getJobConfiguration().isRunning();
}
private void deleteJob(final ScalingConfiguration scalingConfiguration) {
scalingConfiguration.getJobConfiguration().setRunning(false);
executeJob(scalingConfiguration);
}
private void updateJobConfiguration(final ScalingConfiguration scalingConfiguration) {
JobConfigurationAPI jobConfigurationAPI = JobAPIFactory.createJobConfigurationAPI(registryCenter.getServerLists(), namespace, null);
jobConfigurationAPI.updateJobConfiguration(
JobConfigurationPOJO.fromJobConfiguration(createJobConfiguration(scalingConfiguration.getJobConfiguration().getShardingTables().length, GSON.toJson(scalingConfiguration))));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.scaling.elasticjob.job;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.apache.shardingsphere.scaling.core.ScalingJobController;
import org.apache.shardingsphere.scaling.core.config.ScalingConfiguration;
import org.apache.shardingsphere.scaling.core.job.ShardingScalingJob;
import org.apache.shardingsphere.scaling.core.utils.SyncConfigurationUtil;
/**
* Scaling elastic job.
*/
@Slf4j
public final class ScalingElasticJob implements SimpleJob {
private static final Gson GSON = new GsonBuilder().disableHtmlEscaping().serializeNulls().create();
private static final ScalingJobController SCALING_JOB_CONTROLLER = new ScalingJobController();
private ShardingScalingJob shardingScalingJob;
@Override
public void execute(final ShardingContext shardingContext) {
log.info("execute job: {} - {}/{}", shardingContext.getTaskId(), shardingContext.getShardingItem(), shardingContext.getShardingTotalCount());
ScalingConfiguration scalingConfiguration = GSON.fromJson(shardingContext.getJobParameter(), ScalingConfiguration.class);
if (scalingConfiguration.getJobConfiguration().isRunning()) {
startJob(scalingConfiguration, shardingContext);
return;
}
stopJob(shardingContext);
}
private void startJob(final ScalingConfiguration scalingConfiguration, final ShardingContext shardingContext) {
log.info("start job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
scalingConfiguration.getJobConfiguration().setShardingItem(shardingContext.getShardingItem());
shardingScalingJob = new ShardingScalingJob(scalingConfiguration.getJobConfiguration().getJobName(), scalingConfiguration.getJobConfiguration().getShardingItem());
shardingScalingJob.getSyncConfigurations().addAll(SyncConfigurationUtil.toSyncConfigurations(scalingConfiguration));
SCALING_JOB_CONTROLLER.start(shardingScalingJob);
}
private void stopJob(final ShardingContext shardingContext) {
log.info("stop job: {} - {}", shardingContext.getJobName(), shardingContext.getShardingItem());
if (null != shardingScalingJob) {
SCALING_JOB_CONTROLLER.stop(shardingScalingJob.getJobId());
shardingScalingJob = null;
}
}
}
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
org.apache.shardingsphere.scaling.elasticjob.ScalingElasticJobEntry
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册