提交 08c31d68 编写于 作者: 武汉红喜's avatar 武汉红喜


上级 8fff2f60
...@@ -26,6 +26,7 @@ whatsmars-spring | Spring Framework ...@@ -26,6 +26,7 @@ whatsmars-spring | Spring Framework
whatsmars-spring-boot | Spring Boot 实战 whatsmars-spring-boot | Spring Boot 实战
whatsmars-spring-boot-samples | Spring Boot Samples whatsmars-spring-boot-samples | Spring Boot Samples
whatsmars-spring-cloud | Spring Cloud 微服务生态 whatsmars-spring-cloud | Spring Cloud 微服务生态
whatsmars-storm | 分布式实时计算系统
whatsmars-zk | zookeeper remoting 封装 whatsmars-zk | zookeeper remoting 封装
### Rocket Stack ### Rocket Stack
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
<module>whatsmars-spring-boot-samples</module> <module>whatsmars-spring-boot-samples</module>
<module>whatsmars-elasticsearch</module> <module>whatsmars-elasticsearch</module>
<module>whatsmars-zk</module> <module>whatsmars-zk</module>
</modules> </modules>
<!--这里的properties会覆盖父pom里的重名的配置--> <!--这里的properties会覆盖父pom里的重名的配置-->
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- in a real topology that will be deploy to a real cluster, scope should be set to provided
as shown below. we're keeping it in default 'compile' scope to allow for easy execution with
maven in local cluster mode -->
<!-- <scope>provided</scope> -->
\ No newline at end of file
package org.hongxi.whatsmars.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.hongxi.whatsmars.storm.topology.CommitFeedListener;
import org.hongxi.whatsmars.storm.topology.EmailCounter;
import org.hongxi.whatsmars.storm.topology.EmailExtractor;
public class LocalTopologyRunner {
private static final int TEN_MINUTES = 600000;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
.setBolt("email-extractor", new EmailExtractor())
.setBolt("email-counter", new EmailCounter())
.fieldsGrouping("email-extractor", new Fields("email"));
Config config = new Config();
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
* This spout simulates reading commits from a live stream by doing two things:
* <p/>
* 1) Reading a file containing commit data into a list of strings (one string per commit).
* 2) When nextTuple() is called, emit a tuple for each string in the list.
public class CommitFeedListener extends BaseRichSpout {
private SpoutOutputCollector outputCollector;
private List<String> commits;
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("commit"));
public void open(Map map,
TopologyContext context,
SpoutOutputCollector outputCollector) {
this.outputCollector = outputCollector;
try {
commits = IOUtils.readLines(ClassLoader.getSystemResourceAsStream("changelog.txt"),
} catch (IOException e) {
throw new RuntimeException(e);
public void nextTuple() {
for (String commit : commits) {
outputCollector.emit(new Values(commit));
\ No newline at end of file
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class EmailCounter extends BaseBasicBolt {
private Map<String, Integer> counts;
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// This bolt does not emit any values and therefore does not define any output fields.
public void prepare(Map stormConf,
TopologyContext context) {
counts = new HashMap<String, Integer>();
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String email = tuple.getStringByField("email");
counts.put(email, countFor(email) + 1);
private Integer countFor(String email) {
Integer count = counts.get(email);
return count == null ? 0 : count;
* Print the counts to System.out so you can easily see what's happening.
private void printCounts() {
for (String email : counts.keySet()) {
System.out.println(String.format("%s has count of %s", email, counts.get(email)));
\ No newline at end of file
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class EmailExtractor extends BaseBasicBolt {
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("email"));
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String commit = tuple.getStringByField("commit");
String[] parts = commit.split(" ");
outputCollector.emit(new Values(parts[1]));
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册