提交 18cd725f 编写于 作者: 武汉红喜's avatar 武汉红喜

elastic-job

上级 c1459d9c
......@@ -12,7 +12,7 @@ whatsmars-javase-example Java基础学习<br />
whatsmars-spring spring原理、集成及新特性<br />
whatsmars-tomcat 模拟tomcat实现,嵌入式(embed)tomcat<br />
whatsmars-dbrouter 分库分表实现<br />
whatsmars-worker 多实例任务调度<br />
whatsmars-elasticjob 分布式调度<br />
whatsmars-dubbo Dubbo demo<br />
whatsmars-mq rocketmq,activemq<br />
rocketmq-console rocketmq管理后台<br />
......
......@@ -21,7 +21,7 @@
<module>whatsmars-spring</module>
<module>whatsmars-spring-boot</module>
<module>whatsmars-dbrouter</module>
<module>whatsmars-worker</module>
<module>whatsmars-elasticjob</module>
<module>whatsmars-rpc</module>
<module>whatsmars-dubbo</module>
<module>whatsmars-mq</module>
......
### 分布式调度
http://elasticjob.io
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>whatsmars-parent</artifactId>
<groupId>com.itlong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-elasticjob</artifactId>
<properties>
<java.version>1.7</java.version>
<elastic-job.version>2.1.5</elastic-job.version>
<curator.version>2.10.0</curator.version>
<springframework.version>4.3.7.RELEASE</springframework.version>
<slf4j.version>1.7.7</slf4j.version>
<logback.version>1.1.2</logback.version>
<commons-dbcp.version>1.4</commons-dbcp.version>
<h2.version>1.4.184</h2.version>
<mysql.version>5.1.30</mysql.version>
<maven-compiler-plugin.version>3.3</maven-compiler-plugin.version>
</properties>
<dependencies>
<dependency>
<artifactId>elastic-job-common-core</artifactId>
<groupId>com.dangdang</groupId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<artifactId>elastic-job-lite-core</artifactId>
<groupId>com.dangdang</groupId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<artifactId>elastic-job-lite-spring</artifactId>
<groupId>com.dangdang</groupId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<artifactId>elastic-job-cloud-executor</artifactId>
<groupId>com.dangdang</groupId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${springframework.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>
<dependency>
<groupId>commons-dbcp</groupId>
<artifactId>commons-dbcp</artifactId>
<version>${commons-dbcp.version}</version>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>${h2.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.itlong.whatsmars.job;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* Created by javahongxi on 2017/10/31.
*/
public class Main {
public static void main(String[] args) {
new ClassPathXmlApplicationContext("classpath:META-INF/applicationContext.xml");
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.dataflow;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.itlong.whatsmars.job.fixture.entity.Foo;
import com.itlong.whatsmars.job.fixture.repository.FooRepository;
import com.itlong.whatsmars.job.fixture.repository.FooRepositoryFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class JavaDataflowJob implements DataflowJob<Foo> {
private FooRepository fooRepository = FooRepositoryFactory.getFooRepository();
@Override
public List<Foo> fetchData(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH"));
return fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
}
@Override
public void processData(final ShardingContext shardingContext, final List<Foo> data) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS"));
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.dataflow;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import com.itlong.whatsmars.job.fixture.entity.Foo;
import com.itlong.whatsmars.job.fixture.repository.FooRepository;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class SpringDataflowJob implements DataflowJob<Foo> {
@Resource
private FooRepository fooRepository;
@Override
public List<Foo> fetchData(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH"));
return fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
}
@Override
public void processData(final ShardingContext shardingContext, final List<Foo> data) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS"));
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.fixture.entity;
import java.io.Serializable;
public final class Foo implements Serializable {
private static final long serialVersionUID = 2706842871078949451L;
private final long id;
private final String location;
private Status status;
public Foo(final long id, final String location, final Status status) {
this.id = id;
this.location = location;
this.status = status;
}
public long getId() {
return id;
}
public String getLocation() {
return location;
}
public Status getStatus() {
return status;
}
public void setStatus(final Status status) {
this.status = status;
}
public String toString() {
return String.format("id: %s, location: %s, status: %s", id, location, status);
}
public enum Status {
TODO,
COMPLETED
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.fixture.repository;
import com.itlong.whatsmars.job.fixture.entity.Foo;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Repository
public class FooRepository {
private Map<Long, Foo> data = new ConcurrentHashMap<Long, Foo>(300, 1);
public FooRepository() {
init();
}
private void init() {
addData(0L, 100L, "Beijing");
addData(100L, 200L, "Shanghai");
addData(200L, 300L, "Guangzhou");
}
private void addData(final long idFrom, final long idTo, final String location) {
for (long i = idFrom; i < idTo; i++) {
data.put(i, new Foo(i, location, Foo.Status.TODO));
}
}
public List<Foo> findTodoData(final String location, final int limit) {
List<Foo> result = new ArrayList<Foo>(limit);
int count = 0;
for (Map.Entry<Long, Foo> each : data.entrySet()) {
Foo foo = each.getValue();
if (foo.getLocation().equals(location) && foo.getStatus() == Foo.Status.TODO) {
result.add(foo);
count++;
if (count == limit) {
break;
}
}
}
return result;
}
public void setCompleted(final long id) {
data.get(id).setStatus(Foo.Status.COMPLETED);
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.fixture.repository;
public final class FooRepositoryFactory {
private static FooRepository fooRepository = new FooRepository();
public static FooRepository getFooRepository() {
return fooRepository;
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.simple;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.itlong.whatsmars.job.fixture.entity.Foo;
import com.itlong.whatsmars.job.fixture.repository.FooRepository;
import com.itlong.whatsmars.job.fixture.repository.FooRepositoryFactory;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class JavaSimpleJob implements SimpleJob {
private FooRepository fooRepository = FooRepositoryFactory.getFooRepository();
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package com.itlong.whatsmars.job.simple;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import com.itlong.whatsmars.job.fixture.entity.Foo;
import com.itlong.whatsmars.job.fixture.repository.FooRepository;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class SpringSimpleJob implements SimpleJob {
@Resource
private FooRepository fooRepository;
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<context:component-scan base-package="com.dangdang.ddframe.job.example" />
<context:property-placeholder location="classpath:conf/*.properties" />
<bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${event.rdb.driver}"/>
<property name="url" value="${event.rdb.url}"/>
<property name="username" value="${event.rdb.username}"/>
<property name="password" value="${event.rdb.password}"/>
</bean>
<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />
<job:simple id="${simple.id}" class="${simple.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}" event-trace-rdb-data-source="elasticJobLog" />
<job:dataflow id="${dataflow.id}" class="${dataflow.class}" registry-center-ref="regCenter" sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}" sharding-item-parameters="${dataflow.shardingItemParameters}" monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}" max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}" streaming-process="${dataflow.streamingProcess}" description="${dataflow.description}" disabled="${dataflow.disabled}" overwrite="${dataflow.overwrite}" />
<!-- use absolute path to run script job -->
<!--
<job:script id="${script.id}" registry-center-ref="regCenter" script-command-line="${script.scriptCommandLine}" sharding-total-count="${script.shardingTotalCount}" cron="${script.cron}" sharding-item-parameters="${script.shardingItemParameters}" description="${script.description}" overwrite="${script.overwrite}" />
-->
</beans>
event.rdb.driver=org.h2.Driver
event.rdb.url=jdbc:h2:mem:job_event_storage
event.rdb.username=sa
event.rdb.password=
listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000
simple.id=springSimpleJob
simple.class=com.itlong.whatsmars.job.simple.SpringSimpleJob
simple.cron=0/5 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
simple.monitorExecution=false
simple.failover=true
simple.description=\u53EA\u8FD0\u884C\u4E00\u6B21\u7684\u4F5C\u4E1A\u793A\u4F8B
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888
dataflow.id=springDataflowJob
dataflow.class=com.itlong.whatsmars.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=3
dataflow.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
dataflow.maxTimeDiffSeconds=-1
dataflow.monitorExecution=true
dataflow.failover=true
dataflow.streamingProcess=true
dataflow.description=\u6309\u987A\u5E8F\u4E0D\u505C\u6B62\u8FD0\u884C\u7684\u4F5C\u4E1A\u793A\u4F8B
dataflow.disabled=false
dataflow.overwrite=true
script.id=springScriptJob
# need absolute path
script.scriptCommandLine=your_path/elastic-job/elastic-job-example/elastic-job-example-lite-spring/src/main/resources/script/demo.sh
script.cron=0/5 * * * * ?
script.shardingTotalCount=3
script.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
script.description=Script Job
script.overwrite=true
serverLists=localhost:2181
namespace=elastic-job-example-lite-spring
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="log.context.name" value="elastic-job-example-lite" />
<property name="log.charset" value="UTF-8" />
<property name="log.pattern" value="[%-5level] %date --%thread-- [%logger] %msg %n" />
<contextName>${log.context.name}</contextName>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder charset="${log.charset}">
<pattern>${log.pattern}</pattern>
</encoder>
</appender>
<root>
<level value="INFO" />
<appender-ref ref="STDOUT" />
</root>
<logger name="org.apache.zookeeper" level="WARN" />
<logger name="org.apache.curator" level="WARN" />
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>whatsmars-parent</artifactId>
<groupId>com.itlong</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-worker</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>net.sf.json-lib</groupId>
<artifactId>json-lib</artifactId>
<version>2.4</version>
<classifier>jdk15</classifier>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${ibatis.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.11.1</version>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package com.itlong.whatsmars.worker.base;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/11.
*/
public abstract class AbstractJobExecutor implements JobExecutor {
protected TaskBaseService taskBaseService;
protected int batchSize;
protected String dbKey; // db1, db2
protected String tableIndex; // _0000, _0001
protected void execute(ThreadPoolTaskExecutor threadPoolTaskExecutor, StandardTaskService standardTaskService, List<Task> tasks) {
JobRunnable jobRunnable = new JobRunnable();
jobRunnable.setTasks(tasks);
jobRunnable.setStandardTaskService(standardTaskService);
jobRunnable.setTableIndex(tableIndex);
jobRunnable.setDbKey(dbKey);
threadPoolTaskExecutor.execute(jobRunnable);
}
protected void execute(ThreadPoolTaskExecutor threadPoolTaskExecutor, SimpleTaskService simpleTaskService, List<Task> tasks) {
SimpleJobRunnable jobRunnable = new SimpleJobRunnable();
jobRunnable.setTasks(tasks);
jobRunnable.setSimpleTaskService(simpleTaskService);
jobRunnable.setTableIndex(tableIndex);
jobRunnable.setDbKey(dbKey);
threadPoolTaskExecutor.execute(jobRunnable);
}
/**
* 计算批处理执行次数
* @param taskSize
* @param batchSize
* @return
*/
protected int computeBatchTimes(int taskSize, int batchSize){
int times = 0;
if (taskSize % batchSize > 0) {
times = taskSize / batchSize + 1;
}else{
times = taskSize / batchSize;
}
return times;
}
/**
* 计算批处开始
* @param taskSize
* @param batchSize
* @return
*/
protected int computeBatchFromIndex(int taskSize, int batchSize, int i){
int from = i * batchSize;
if(from > taskSize){
from = taskSize;
}
return from;
}
/**
* 计算批处理结束
* @param taskSize
* @param batchSize
* @return
*/
protected int computeBatchToIndex(int taskSize, int batchSize, int i){
int to = (i + 1) * batchSize;
if (to > taskSize){
to = taskSize;
}
return to;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public String getDbKey() {
return dbKey;
}
public void setDbKey(String dbKey) {
this.dbKey = dbKey;
DbContext.setDbKey(dbKey);
}
public String getTableIndex() {
return tableIndex;
}
public void setTableIndex(String tableIndex) {
this.tableIndex = tableIndex;
DbContext.setTableIndex(tableIndex);
}
public TaskBaseService getTaskBaseService() {
return taskBaseService;
}
public void setTaskBaseService(TaskBaseService taskBaseService) {
this.taskBaseService = taskBaseService;
}
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/11.
*/
public class BaseDO {
private String phone; // 手机号码
private String tableIndex;
public String getPhone() {
return phone;
}
public void setPhone(String phone) {
this.phone = phone;
}
public String getTableIndex() {
tableIndex = DbContext.getTableIndex();
return tableIndex;
}
public void setTableIndex(String tableIndex) {
this.tableIndex = tableIndex;
}
}
package com.itlong.whatsmars.worker.base;
import org.apache.ibatis.session.SqlSession;
/**
* Author: qing
* Date: 14-10-12
*/
public abstract class BaseDao {
protected SqlSession sqlSession;
public void setSqlSession(SqlSession sqlSession) {
this.sqlSession = sqlSession;
}
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/12.
*/
public interface CacheService {
void set(String key, String value, long seconds);
String get(String key);
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/11.
*/
public class DbContext {
private static final ThreadLocal<String> dbHolder = new ThreadLocal<String>();
private static final ThreadLocal<String> tableHolder = new ThreadLocal<String>();
public static void setDbKey(String dbKey) {
dbHolder.set(dbKey);
}
public static String getDbKey() {
return dbHolder.get();
}
public static void removeDbKey() {
dbHolder.remove();
}
public static void setTableIndex(String tableIndex) {
tableHolder.set(tableIndex);
}
public static String getTableIndex() {
return tableHolder.get();
}
public static void removeTableIndex() {
tableHolder.remove();
}
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/11.
* 1. 一个job的多个实例,谁先成功锁定任务,谁先处理任务,若处理失败则解锁任务
* 2. 对于1中解锁失败的,要利用另外的job来专门进行解锁
* 3. 将任务分成几批,并行处理
* 4. 这些任务的子任务分批串行处理,同样有锁定-处理-失败解锁
* 5. 对于4中解锁失败的,同样要利用另外的job来专门进行解锁
*/
public interface JobExecutor {
void execute() throws Exception;
}
package com.itlong.whatsmars.worker.base;
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* Created by shenhongxi on 2016/7/12.
*/
public class JobGenerateExecutor extends AbstractJobExecutor {
private static final Logger log = LoggerFactory.getLogger(JobGenerateExecutor.class);
private int dbs;
private int tables;
private String tableFormat = "0000";
@Autowired
private CacheService cacheService;
private static final int cacheTime = 60*60*24-60*20; // 缓存时间23小时40分
@Override
public void execute() throws Exception {
log.info("shard job start!");
String taskType = new Integer(tableIndex).toString();
String today = new SimpleDateFormat("yyyyMMdd").format(new Date());
String _tableIndex; // 0000
String taskId = "";
String successFlag = "y";
String shardAllCacheKey = "shardAllJob_" + today;
if (successFlag.equals(cacheService.get(shardAllCacheKey))) {
log.info("任务已经全部生成,date:" + today);
} else {
boolean allSuccess = true;
DbContext.setTableIndex(tableIndex);
for (int i = 1; i <= dbs; i++) {
String dbKey = "db" + i;
DbContext.setDbKey(dbKey);
for (int j = 0; j < tables; j++) {
try {
_tableIndex = format(j, tableFormat);
taskId = today + _tableIndex;
String shardSingleCacheKey = "shardSingleJob_" + dbKey + "_" + taskId;
if (successFlag.equals(cacheService.get(shardSingleCacheKey))) {
log.info("此任务已经生成,dbKey:" + dbKey + ",taskId:" + taskId);
} else {
Task task = new Task();
task.setTaskId(taskId);
task.setType(taskType);
JSONObject json = new JSONObject();
json.put("lastId", 0L);
task.setData(json.toString());
taskBaseService.add(task);
log.info("生成任务成功。dbKey:" + dbKey + ",tableIndex:" + tableIndex + ",taskId:" + taskId);
}
} catch (Exception e) {
if (e.getMessage() != null && (e.getMessage().indexOf("Duplicate") >= 0 || e.getMessage().indexOf("UNIQUE KEY") >= 0
|| e.getMessage().indexOf("PRIMARY KEY") >= 0)) { // 任务重复,视为成功
log.info("生成任务重复。taskType:" + taskType + ",taskId:" + taskId);
}else{
log.error("生成任务异常。taskType:" + taskType + ",taskId:" + taskId, e);
allSuccess = false;
}
}
}
}
if (allSuccess) {
cacheService.set(shardAllCacheKey, successFlag, cacheTime);
}
}
}
private String format(int num, String format){
DecimalFormat df = new DecimalFormat();
df.applyPattern(format);
return df.format(num);
}
public int getDbs() {
return dbs;
}
public void setDbs(int dbs) {
this.dbs = dbs;
}
public int getTables() {
return tables;
}
public void setTables(int tables) {
this.tables = tables;
}
public String getTableFormat() {
return tableFormat;
}
public void setTableFormat(String tableFormat) {
this.tableFormat = tableFormat;
}
}
package com.itlong.whatsmars.worker.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
public class JobRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(JobRunnable.class);
private StandardTaskService standardTaskService;
private List<Task> tasks;
private String dbKey;
private String tableIndex;
@Override
public void run() {
if (tasks != null) {
if (log.isDebugEnabled()) {
log.debug("Current thread: " + Thread.currentThread().getName());
}
try {
DbContext.setDbKey(dbKey);
DbContext.setTableIndex(tableIndex);
for (Task task : tasks) {
task.setTableIndex(tableIndex);
// 1. 一个job的多个实例,谁先成功锁定任务,谁先处理任务,若处理失败则解锁任务
// 2. 对于1中解锁失败的,要利用另外的job来专门进行解锁
// 3. 将任务分成几批,并行处理
// 4. 这些任务的子任务分批串行处理,同样有锁定-处理-失败解锁
// 5. 对于4中解锁失败的,同样要利用另外的job来专门进行解锁
boolean locked = standardTaskService.lock(task);
if (!locked) continue;
boolean result = standardTaskService.process(task);
standardTaskService.finished(result, task);
}
} catch (Exception e) {
log.error("Do task error", e);
throw new RuntimeException("Do task error");
}
}
}
public List<Task> getTasks() {
return tasks;
}
public void setTasks(List<Task> tasks) {
this.tasks = tasks;
}
public StandardTaskService getStandardTaskService() {
return standardTaskService;
}
public void setStandardTaskService(StandardTaskService standardTaskService) {
this.standardTaskService = standardTaskService;
}
public String getTableIndex() {
return tableIndex;
}
public void setTableIndex(String tableIndex) {
this.tableIndex = tableIndex;
}
public String getDbKey() {
return dbKey;
}
public void setDbKey(String dbKey) {
this.dbKey = dbKey;
}
}
package com.itlong.whatsmars.worker.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
public class JobUnlockExecutor extends AbstractJobExecutor {
private Logger log = LoggerFactory.getLogger(JobUnlockExecutor.class);
/** 配置任务最大锁定时间(秒)*/
private int splitSeconds = 60 * 5;
/** 配置失败重试次数 */
private int retries = 6;
/** 配置每次批处理个数 */
private int batchSize = 100;
/** 配置数据库个数 */
private int dbs;
@Autowired
private ThreadPoolTaskExecutor taskUnlockThreadPool;
@Resource(name="taskUnlockService")
private SimpleTaskService taskUnlockService;
@Override
public void execute() throws Exception {
try {
DbContext.setTableIndex(tableIndex);
for (int i = 1; i <= dbs; i++) {
String dbKey = "db" + i;
DbContext.setDbKey(dbKey);
List<Task> tasks = taskBaseService.findLocked(splitSeconds, retries);
if (tasks != null && tasks.size() > 0) {
int size = tasks.size();
log.info("取到被锁定任务条数:" + size + ",dbKey:" + dbKey);
int batchTimes = computeBatchTimes(size, batchSize);
for (int j = 0; j < batchTimes; j++) {
int from = computeBatchFromIndex(size, batchSize, j);
int to = computeBatchToIndex(size, batchSize, j);
this.execute(taskUnlockThreadPool, taskUnlockService, tasks.subList(from, to));
}
} else {
log.info("任务调度--没有被锁定的任务,dbkey:" + dbKey);
}
}
} catch (Exception e) {
log.error("任务调度--定期重置被锁定的任务异常", e);
}
}
public Integer getDbs() {
return dbs;
}
public void setDbs(Integer dbs) {
this.dbs = dbs;
}
public Integer getSplitSeconds() {
return splitSeconds;
}
public void setSplitSeconds(Integer splitSeconds) {
this.splitSeconds = splitSeconds;
}
public Integer getRetries() {
return retries;
}
public void setRetries(Integer retries) {
this.retries = retries;
}
public void setBatchSize(Integer batchSize) {
this.batchSize = batchSize;
}
}
package com.itlong.whatsmars.worker.base;
import java.util.HashMap;
import java.util.Map;
/**
* Created by shenhongxi on 2016/7/13.
*/
public class ParamHashMap<K,V> extends HashMap<K,V> {
private static final String TABLE_SUFFIX = "tableIndex";
@SuppressWarnings( "unchecked" )
public ParamHashMap(){
super();
this.put((K) TABLE_SUFFIX, (V) DbContext.getTableIndex());
}
@SuppressWarnings( "unchecked" )
public ParamHashMap(Map<? extends K, ? extends V> m){
super(m);
this.put((K)TABLE_SUFFIX, (V)DbContext.getTableIndex());
}
@SuppressWarnings( "unchecked" )
public ParamHashMap(int initialCapacity) {
super(initialCapacity);
this.put((K)TABLE_SUFFIX, (V)DbContext.getTableIndex());
}
@SuppressWarnings( "unchecked" )
public ParamHashMap(int initialCapacity, float loadFactor) {
super(initialCapacity,loadFactor);
this.put((K)TABLE_SUFFIX, (V) DbContext.getTableIndex());
}
private static final long serialVersionUID = 5541751367713832209L;
}
package com.itlong.whatsmars.worker.base;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
public class SimpleJobRunnable implements Runnable {
private static final Logger log = LoggerFactory.getLogger(SimpleJobRunnable.class);
private SimpleTaskService simpleTaskService;
private List<Task> tasks;
private String dbKey;
private String tableIndex;
@Override
public void run() {
if (tasks != null) {
if (log.isDebugEnabled()) {
log.debug("Current simple thread: " + Thread.currentThread().getName());
}
try {
DbContext.setDbKey(dbKey);
DbContext.setTableIndex(tableIndex);
for (Task task : tasks) {
task.setTableIndex(tableIndex);
simpleTaskService.process(task);
}
} catch (Exception e) {
log.error("Do simple task error", e);
throw new RuntimeException("Do simple task error");
}
}
}
public List<Task> getTasks() {
return tasks;
}
public void setTasks(List<Task> tasks) {
this.tasks = tasks;
}
public String getTableIndex() {
return tableIndex;
}
public void setTableIndex(String tableIndex) {
this.tableIndex = tableIndex;
}
public String getDbKey() {
return dbKey;
}
public void setDbKey(String dbKey) {
this.dbKey = dbKey;
}
public SimpleTaskService getSimpleTaskService() {
return simpleTaskService;
}
public void setSimpleTaskService(SimpleTaskService simpleTaskService) {
this.simpleTaskService = simpleTaskService;
}
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/11.
*/
public interface SimpleTaskService {
boolean process(Task task);
}
package com.itlong.whatsmars.worker.base;
/**
* Created by shenhongxi on 2016/7/11.
*/
public interface StandardTaskService {
boolean lock(Task task);
boolean process(Task task);
boolean finished(boolean processResult, Task task);
}
package com.itlong.whatsmars.worker.base;
import java.util.Date;
/**
* Created by shenhongxi on 2016/7/11.
*/
public class Task extends BaseDO {
private Long id;
private String taskId;
private String type;
private int status;
private String data; // json
private int retries;
private Date createDate;
private Date updateDate;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public int getStatus() {
return status;
}
public void setStatus(int status) {
this.status = status;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
public int getRetries() {
return retries;
}
public void setRetries(int retries) {
this.retries = retries;
}
public Date getCreateDate() {
return createDate;
}
public void setCreateDate(Date createDate) {
this.createDate = createDate;
}
public Date getUpdateDate() {
return updateDate;
}
public void setUpdateDate(Date updateDate) {
this.updateDate = updateDate;
}
}
package com.itlong.whatsmars.worker.base;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
public interface TaskBaseService {
void add(Task task);
boolean updateSuccess(Task task);
boolean lock(Task task);
boolean unlock(Task task);
List<Task> findLocked(int splitSeconds, int retries);
List<Task> findByType(String type);
}
package com.itlong.whatsmars.worker.base;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
public interface TaskDao {
void insert(Task task);
int updateSuccess(Task task);
int updateFailed(Task task);
int lock(Task task);
int unlock(Task task);
List<Task> findLocked(int splitSeconds, int retries);
List<Task> findByType(String type);
}
package com.itlong.whatsmars.worker.base;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/13.
*/
public class TaskDaoImpl extends BaseDao implements TaskDao {
@Override
public void insert(Task task) {
this.sqlSession.insert("Task.insert", task);
}
@Override
public int updateSuccess(Task task) {
return this.sqlSession.update("Task.updateSuccess", task);
}
@Override
public int updateFailed(Task task) {
return this.sqlSession.update("Task.updateFailed", task);
}
@Override
public int lock(Task task) {
return this.sqlSession.update("Task.lock", task);
}
@Override
public int unlock(Task task) {
return this.sqlSession.update("Task.unlock", task);
}
@Override
public List<Task> findLocked(int splitSeconds, int retries) {
ParamHashMap<String, Object> params = new ParamHashMap<String, Object>();
params.put("splitSeconds", splitSeconds);
params.put("retries", retries);
return this.sqlSession.selectList("Task.findLocked", params);
}
@Override
public List<Task> findByType(String type) {
ParamHashMap<String, Object> params = new ParamHashMap<String, Object>();
params.put("type", type);
return this.sqlSession.selectList("Task.findByType", params);
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.StandardTaskService;
import com.itlong.whatsmars.worker.base.Task;
import com.itlong.whatsmars.worker.base.TaskBaseService;
import org.springframework.beans.factory.annotation.Autowired;
/**
* Created by shenhongxi on 2016/7/12.
*/
public abstract class AbstractStandardTaskService implements StandardTaskService {
@Autowired
private TaskBaseService taskBaseService;
@Override
public boolean finished(boolean processResult, Task task) {
if(processResult){
return taskBaseService.updateSuccess(task);
} else {
taskBaseService.unlock(task);
}
return true;
}
@Override
public boolean lock(Task task) {
return taskBaseService.lock(task);
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.DbContext;
import com.itlong.whatsmars.worker.base.Task;
import net.sf.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import java.util.List;
/**
* Created by shenhongxi on 16/7/16.
*/
public class BizServiceImpl extends AbstractStandardTaskService {
private static final Logger log = LoggerFactory.getLogger(BizServiceImpl.class);
@Resource(name = "smsSender")
private JmsTemplate jmsTemplate;
@Resource(name = "taskMqExecutor")
private ThreadPoolTaskExecutor taskMqExecutor;
@Override
public boolean process(Task task) {
String taskTableIndex = DbContext.getTableIndex();
try {
String bizTableIndex = task.getTaskId().substring(6, task.getTaskId().length());
DbContext.setTableIndex(bizTableIndex);
JSONObject data = JSONObject.fromObject(task.getData());
final long lastId = data.getInt("lastId");
while (true) {
List list = null;
if (list == null || list.isEmpty()) {
log.info("任务全部处理完成,table:" + bizTableIndex);
return true;
}
for (final Object e : list) {
// 锁定
boolean lockResult = lock(e);
if (lockResult) {
taskMqExecutor.execute(new Runnable() {
@Override
public void run() {
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
JSONObject json = new JSONObject();
json.put("bizId", "");
return session.createTextMessage(json.toString());
}
});
}
});
}
}
}
} catch (Exception e) {
log.error("biz process error, taskId:" + task.getTaskId());
return false;
} finally {
DbContext.setTableIndex(taskTableIndex);
}
}
private boolean lock(Object e) {
// ...
return true;
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.SimpleTaskService;
import com.itlong.whatsmars.worker.base.Task;
/**
* Created by shenhongxi on 16/7/17.
*/
public class BizUnlockServiceImpl implements SimpleTaskService {
@Override
public boolean process(Task task) {
return false;
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.Task;
import com.itlong.whatsmars.worker.base.TaskBaseService;
import com.itlong.whatsmars.worker.base.TaskDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/12.
*/
@Service("taskBaseService")
public class TaskBaseServiceImpl implements TaskBaseService {
@Autowired
private TaskDao taskDao;
@Override
public void add(Task task) {
taskDao.insert(task);
}
@Override
public boolean updateSuccess(Task task) {
try {
return taskDao.updateSuccess(task) > 0;
} catch (Exception e) {
return false;
}
}
@Override
public boolean lock(Task task) {
try {
return taskDao.lock(task) > 0;
} catch (Exception e) {
return false;
}
}
@Override
public boolean unlock(Task task) {
try {
return taskDao.unlock(task) > 0;
} catch (Exception e) {
return false;
}
}
@Override
public List<Task> findLocked(int splitSeconds, int retries) {
return taskDao.findLocked(splitSeconds, retries);
}
@Override
public List<Task> findByType(String type) {
return taskDao.findByType(type);
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.SimpleTaskService;
import com.itlong.whatsmars.worker.base.Task;
import com.itlong.whatsmars.worker.base.TaskDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Created by shenhongxi on 2016/7/12.
*/
@Service("taskFailedService")
public class TaskFailedServiceImpl implements SimpleTaskService {
@Autowired
private TaskDao taskDao;
@Override
public boolean process(Task task) {
try {
return taskDao.updateFailed(task) > 0;
} catch (Exception e) {
return false;
}
}
}
package com.itlong.whatsmars.worker.demo;
import com.itlong.whatsmars.worker.base.SimpleTaskService;
import com.itlong.whatsmars.worker.base.Task;
import com.itlong.whatsmars.worker.base.TaskDao;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* Created by shenhongxi on 2016/7/12.
*/
@Service("taskUnlockService")
public class TaskUnlockServiceImpl implements SimpleTaskService {
@Autowired
private TaskDao taskDao;
@Override
public boolean process(Task task) {
try {
return taskDao.unlock(task) > 0;
} catch (Exception e) {
return false;
}
}
}
package com.itlong.whatsmars.worker.support;
import com.itlong.whatsmars.worker.base.AbstractJobExecutor;
import com.itlong.whatsmars.worker.base.DbContext;
import com.itlong.whatsmars.worker.base.StandardTaskService;
import com.itlong.whatsmars.worker.base.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.List;
/**
* Created by shenhongxi on 2016/7/14.
*/
public class BizJobProcessExecutor extends AbstractJobExecutor {
private static final Logger log = LoggerFactory.getLogger(BizJobProcessExecutor.class);
private ThreadPoolTaskExecutor commonThreadPool;
private StandardTaskService standardTaskService;
@Override
public void execute() throws Exception {
DbContext.setDbKey(dbKey);
DbContext.setTableIndex(tableIndex);
String taskType = new Integer(tableIndex).toString();
try {
List<Task> tasks = taskBaseService.findByType(taskType);
if (tasks != null && tasks.size() > 0) {
int taskSize = tasks.size();
int batchTimes = computeBatchTimes(taskSize, batchSize);
for (int i = 0; i < batchTimes; i++) {
int from = computeBatchFromIndex(taskSize, batchSize, i);
int to = computeBatchToIndex(taskSize, batchSize, i);
this.execute(commonThreadPool, standardTaskService, tasks.subList(from, to));
}
} else {
log.info("没有未处理的任务");
}
} catch (Exception e) {
}
}
public ThreadPoolTaskExecutor getCommonThreadPool() {
return commonThreadPool;
}
public void setCommonThreadPool(ThreadPoolTaskExecutor commonThreadPool) {
this.commonThreadPool = commonThreadPool;
}
public StandardTaskService getStandardTaskService() {
return standardTaskService;
}
public void setStandardTaskService(StandardTaskService standardTaskService) {
this.standardTaskService = standardTaskService;
}
}
package com.itlong.whatsmars.worker.support;
import com.itlong.whatsmars.worker.base.AbstractJobExecutor;
import com.itlong.whatsmars.worker.base.DbContext;
import com.itlong.whatsmars.worker.base.SimpleTaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.util.List;
/**
* Created by shenhongxi on 16/7/16.
* 参考TaskUnlockJobExecutor
*/
public class BizJobUnlockExecutor extends AbstractJobExecutor {
private Logger log = LoggerFactory.getLogger(BizJobUnlockExecutor.class);
private int tables;
private String tableFormat = "0000";
@Resource(name = "bizUnlockThreadPool")
private ThreadPoolTaskExecutor bizUnlockThreadPool;
@Resource(name = "bizUnlockService")
private SimpleTaskService bizUnlockService;
@Override
public void execute() throws Exception {
// 循环各表查询锁定任务
for (int i = 0; i < tables; i++) {
try {
tableIndex = format(i, tableFormat);
DbContext.setTableIndex(tableIndex);
List list = null;
if (list != null && list.size() > 0) {
int taskSize = list.size();
int batchTimes = computeBatchTimes(taskSize, batchSize);
for (int j = 0; j < batchTimes; j++) {
int from = computeBatchFromIndex(taskSize, batchSize, i);
int to = computeBatchToIndex(taskSize, batchSize, i);
this.execute(bizUnlockThreadPool, bizUnlockService, list.subList(from, to));
}
} else {
log.info("没有被锁定的任务");
}
} catch (Exception e) {
log.error("unlock error",e);
}
}
}
private String format(int num, String format){
DecimalFormat df = new DecimalFormat();
df.applyPattern(format);
String ret=df.format(num);
return ret;
}
public int getTables() {
return tables;
}
public void setTables(int tables) {
this.tables = tables;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册