提交 fc243a9b 编写于 作者: 武汉红喜's avatar 武汉红喜

shardingsphere-elasticjob

上级 63d62ddb
......@@ -45,7 +45,7 @@
<commons-io.version>2.7</commons-io.version>
<commons-lang.version>2.6</commons-lang.version>
<commons-logging.version>1.2</commons-logging.version>
<curator.version>2.12.0</curator.version>
<curator.version>5.1.0</curator.version>
<disruptor.version>3.4.2</disruptor.version>
<dubbo.version>2.7.3</dubbo.version>
<fastjson.version>1.2.70</fastjson.version>
......@@ -175,7 +175,6 @@
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>${curator.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
......
### 分布式调度
http://elasticjob.io
https://shardingsphere.apache.org/elasticjob/
先启动zookeeper
......
......@@ -15,24 +15,28 @@
<description>elasticjob demo</description>
<properties>
<elastic-job.version>2.1.5</elastic-job.version>
<elasticjob.version>3.0.0-alpha</elasticjob.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
<groupId>org.apache.shardingsphere.elasticjob</groupId>
<artifactId>elasticjob-lite-spring-boot-starter</artifactId>
<version>${elasticjob.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hongxi.whatsmars.job;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
/**
* Created by javahongxi on 2017/10/31.
*/
@SpringBootApplication
@ImportResource({"classpath:esjob.xml"})
public class Application {
public static void main(String[] args) {
public static void main(final String[] args) {
EmbedZookeeperServer.start(2181);
SpringApplication.run(Application.class, args);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hongxi.whatsmars.job;
import org.apache.curator.test.TestingServer;
import java.io.File;
import java.io.IOException;
/**
* Embed ZooKeeper.
*
* <p>
* Only used for examples
* </p>
*/
public final class EmbedZookeeperServer {
private static TestingServer testingServer;
/**
* Embed ZooKeeper.
*
* @param port ZooKeeper port
*/
public static void start(final int port) {
try {
testingServer = new TestingServer(port, new File(String.format("target/test_zk_data/%s/", System.nanoTime())));
// CHECKSTYLE:OFF
} catch (final Exception ex) {
// CHECKSTYLE:ON
ex.printStackTrace();
} finally {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
Thread.sleep(1000L);
testingServer.close();
} catch (final InterruptedException | IOException ignore) {
}
}));
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.hongxi.whatsmars.job.controller;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.OneOffJobBootstrap;
import org.springframework.context.annotation.DependsOn;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@DependsOn("org.apache.shardingsphere.elasticjob.lite.spring.boot.job.ElasticJobLiteAutoConfiguration")
public class OneOffJobController {
@Resource(name = "manualScriptJobBean")
private OneOffJobBootstrap manualScriptJob;
@GetMapping("/execute")
public String executeOneOffJob() {
manualScriptJob.execute();
return "{\"msg\":\"OK\"}";
}
}
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
......@@ -12,10 +13,9 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package org.hongxi.whatsmars.job.fixture.entity;
package org.hongxi.whatsmars.job.entity;
import java.io.Serializable;
......@@ -51,6 +51,7 @@ public final class Foo implements Serializable {
this.status = status;
}
@Override
public String toString() {
return String.format("id: %s, location: %s, status: %s", id, location, status);
}
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
......@@ -12,37 +13,42 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package org.hongxi.whatsmars.job.dataflow;
package org.hongxi.whatsmars.job.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.dataflow.DataflowJob;
import org.hongxi.whatsmars.job.fixture.entity.Foo;
import org.hongxi.whatsmars.job.fixture.repository.FooRepository;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob;
import org.hongxi.whatsmars.job.entity.Foo;
import org.hongxi.whatsmars.job.repository.FooRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class SpringDataflowJob implements DataflowJob<Foo> {
@Component
public class SpringBootDataflowJob implements DataflowJob<Foo> {
private final Logger logger = LoggerFactory.getLogger(SpringBootDataflowJob.class);
@Resource
private FooRepository fooRepository;
@Override
public List<Foo> fetchData(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH"));
logger.info("Item: {} | Time: {} | Thread: {} | {}",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW FETCH");
return fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
}
@Override
public void processData(final ShardingContext shardingContext, final List<Foo> data) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS"));
logger.info("Item: {} | Time: {} | Thread: {} | {}",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "DATAFLOW PROCESS");
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
}
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
......@@ -12,30 +13,35 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package org.hongxi.whatsmars.job.simple;
package org.hongxi.whatsmars.job.job;
import com.dangdang.ddframe.job.api.ShardingContext;
import com.dangdang.ddframe.job.api.simple.SimpleJob;
import org.hongxi.whatsmars.job.fixture.entity.Foo;
import org.hongxi.whatsmars.job.fixture.repository.FooRepository;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import org.hongxi.whatsmars.job.entity.Foo;
import org.hongxi.whatsmars.job.repository.FooRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
public class SpringSimpleJob implements SimpleJob {
@Component
public class SpringBootSimpleJob implements SimpleJob {
@Resource
private final Logger logger = LoggerFactory.getLogger(SpringBootSimpleJob.class);
@Autowired
private FooRepository fooRepository;
@Override
public void execute(final ShardingContext shardingContext) {
System.out.println(String.format("Item: %s | Time: %s | Thread: %s | %s",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE"));
public void execute(ShardingContext shardingContext) {
logger.info("Item: {} | Time: {} | Thread: {} | {}",
shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()), Thread.currentThread().getId(), "SIMPLE");
List<Foo> data = fooRepository.findTodoData(shardingContext.getShardingParameter(), 10);
for (Foo each : data) {
fooRepository.setCompleted(each.getId());
......
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
......@@ -12,12 +13,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package org.hongxi.whatsmars.job.fixture.repository;
package org.hongxi.whatsmars.job.repository;
import org.hongxi.whatsmars.job.fixture.entity.Foo;
import org.hongxi.whatsmars.job.entity.Foo;
import org.springframework.stereotype.Repository;
import java.util.ArrayList;
......@@ -28,7 +28,7 @@ import java.util.concurrent.ConcurrentHashMap;
@Repository
public class FooRepository {
private Map<Long, Foo> data = new ConcurrentHashMap<Long, Foo>(300, 1);
private final Map<Long, Foo> data = new ConcurrentHashMap<>(300, 1);
public FooRepository() {
init();
......@@ -47,7 +47,7 @@ public class FooRepository {
}
public List<Foo> findTodoData(final String location, final int limit) {
List<Foo> result = new ArrayList<Foo>(limit);
List<Foo> result = new ArrayList<>(limit);
int count = 0;
for (Map.Entry<Long, Foo> each : data.entrySet()) {
Foo foo = each.getValue();
......
spring:
datasource:
url: jdbc:h2:mem:job_event_storage
driver-class-name: org.h2.Driver
username: sa
password:
spring:
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=false
driver-class-name: com.mysql.jdbc.Driver
username: test
password: test
tomcat:
max-wait: 10000
min-idle: 0
initial-size: 25
validation-query: SELECT 1
test-on-borrow: false
test-while-idle: true
time-between-eviction-runs-millis: 18800
remove-abandoned: true
remove-abandoned-timeout: 180
serverLists=127.0.0.1:2181
namespace=elastic-job-example-lite-spring
baseSleepTimeMilliseconds=1000
maxSleepTimeMilliseconds=3000
maxRetries=3
event.rdb.driver=org.h2.Driver
event.rdb.url=jdbc:h2:mem:job_event_storage
event.rdb.username=sa
event.rdb.password=
listener.simple=com.dangdang.ddframe.job.example.listener.SpringSimpleListener
listener.distributed=com.dangdang.ddframe.job.example.listener.SpringSimpleDistributeListener
listener.distributed.startedTimeoutMilliseconds=1000
listener.distributed.completedTimeoutMilliseconds=3000
simple.id=springSimpleJob
simple.class=org.hongxi.whatsmars.job.simple.SpringSimpleJob
simple.cron=0/5 * * * * ?
simple.shardingTotalCount=3
simple.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
simple.monitorExecution=false
simple.failover=true
simple.description=\u53EA\u8FD0\u884C\u4E00\u6B21\u7684\u4F5C\u4E1A\u793A\u4F8B
simple.disabled=false
simple.overwrite=true
simple.monitorPort=9888
dataflow.id=springDataflowJob
dataflow.class=org.hongxi.whatsmars.job.dataflow.SpringDataflowJob
dataflow.cron=0/5 * * * * ?
dataflow.shardingTotalCount=3
dataflow.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
dataflow.maxTimeDiffSeconds=-1
dataflow.monitorExecution=true
dataflow.failover=true
dataflow.streamingProcess=true
dataflow.description=\u6309\u987A\u5E8F\u4E0D\u505C\u6B62\u8FD0\u884C\u7684\u4F5C\u4E1A\u793A\u4F8B
dataflow.disabled=false
dataflow.overwrite=true
script.id=springScriptJob
# need absolute path
script.scriptCommandLine=your_path/elastic-job/elastic-job-example/elastic-job-example-lite-spring/src/main/resources/script/demo.sh
script.cron=0/5 * * * * ?
script.shardingTotalCount=3
script.shardingItemParameters=0=Beijing,1=Shanghai,2=Guangzhou
script.description=Script Job
script.overwrite=true
spring:
profiles:
active: dev
elasticjob:
tracing:
type: RDB
regCenter:
serverLists: localhost:2181
namespace: elasticjob-lite-springboot
jobs:
simpleJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.simple.job.SimpleJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
dataflowJob:
elasticJobClass: org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob
cron: 0/5 * * * * ?
shardingTotalCount: 3
shardingItemParameters: 0=Beijing,1=Shanghai,2=Guangzhou
scriptJob:
elasticJobType: SCRIPT
cron: 0/10 * * * * ?
shardingTotalCount: 3
props:
script.command.line: "echo SCRIPT Job: "
manualScriptJob:
elasticJobType: SCRIPT
jobBootstrapBeanName: manualScriptJobBean
shardingTotalCount: 9
props:
script.command.line: "echo Manual SCRIPT Job: "
dump:
port: 9888
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:reg="http://www.dangdang.com/schema/ddframe/reg"
xmlns:job="http://www.dangdang.com/schema/ddframe/job"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.dangdang.com/schema/ddframe/reg
http://www.dangdang.com/schema/ddframe/reg/reg.xsd
http://www.dangdang.com/schema/ddframe/job
http://www.dangdang.com/schema/ddframe/job/job.xsd
">
<bean id="elasticJobLog" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="${event.rdb.driver}"/>
<property name="url" value="${event.rdb.url}"/>
<property name="username" value="${event.rdb.username}"/>
<property name="password" value="${event.rdb.password}"/>
</bean>
<reg:zookeeper id="regCenter" server-lists="${serverLists}" namespace="${namespace}" base-sleep-time-milliseconds="${baseSleepTimeMilliseconds}" max-sleep-time-milliseconds="${maxSleepTimeMilliseconds}" max-retries="${maxRetries}" />
<job:simple id="springSimpleJob" class="${simple.class}" registry-center-ref="regCenter" sharding-total-count="${simple.shardingTotalCount}" cron="${simple.cron}" sharding-item-parameters="${simple.shardingItemParameters}" monitor-execution="${simple.monitorExecution}" monitor-port="${simple.monitorPort}" failover="${simple.failover}" description="${simple.description}" disabled="${simple.disabled}" overwrite="${simple.overwrite}" event-trace-rdb-data-source="elasticJobLog" />
<job:dataflow id="springDataflowJob" class="${dataflow.class}" registry-center-ref="regCenter" sharding-total-count="${dataflow.shardingTotalCount}" cron="${dataflow.cron}" sharding-item-parameters="${dataflow.shardingItemParameters}" monitor-execution="${dataflow.monitorExecution}" failover="${dataflow.failover}" max-time-diff-seconds="${dataflow.maxTimeDiffSeconds}" streaming-process="${dataflow.streamingProcess}" description="${dataflow.description}" disabled="${dataflow.disabled}" overwrite="${dataflow.overwrite}" />
<!-- use absolute path to run script job -->
<!--
<job:script id="${script.id}" registry-center-ref="regCenter" script-command-line="${script.scriptCommandLine}" sharding-total-count="${script.shardingTotalCount}" cron="${script.cron}" sharding-item-parameters="${script.shardingItemParameters}" description="${script.description}" overwrite="${script.overwrite}" />
-->
</beans>
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property name="log.context.name" value="elastic-job-example-lite" />
<property name="log.context.name" value="elasticjob-example-lite" />
<property name="log.charset" value="UTF-8" />
<property name="log.pattern" value="[%-5level] %date --%thread-- [%logger] %msg %n" />
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册