提交 435050f3 编写于 作者: 武汉红喜's avatar 武汉红喜

remove storm

上级 752464dc
......@@ -30,7 +30,6 @@
<module>whatsmars-spring-boot-samples</module>
<module>whatsmars-elasticsearch</module>
<module>whatsmars-zk</module>
<module>whatsmars-storm</module>
<module>whatsmars-flink</module>
<module>whatsmars-spring-data</module>
</modules>
......
https://github.com/alibaba/jstorm
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>whatsmars-parent</artifactId>
<groupId>org.hongxi</groupId>
<version>Rocket.S7</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>whatsmars-storm</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<!-- in a real topology that will be deploy to a real cluster, scope should be set to provided
as shown below. we're keeping it in default 'compile' scope to allow for easy execution with
maven in local cluster mode -->
<!-- <scope>provided</scope> -->
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>org.hongxi.whatsmars.storm.LocalTopologyRunner</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
<configuration>
<environmentVariables>
<STORM_TEST_TIMEOUT_MS>60000</STORM_TEST_TIMEOUT_MS>
</environmentVariables>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
package org.hongxi.whatsmars.storm;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
import org.hongxi.whatsmars.storm.topology.CommitFeedListener;
import org.hongxi.whatsmars.storm.topology.EmailCounter;
import org.hongxi.whatsmars.storm.topology.EmailExtractor;
public class LocalTopologyRunner {
private static final int TEN_MINUTES = 600000;
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
builder
.setBolt("email-extractor", new EmailExtractor())
.shuffleGrouping("commit-feed-listener");
builder
.setBolt("email-counter", new EmailCounter())
.fieldsGrouping("email-extractor", new Fields("email"));
Config config = new Config();
config.setDebug(true);
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("github-commit-count-topology",
config,
topology);
Utils.sleep(TEN_MINUTES);
cluster.killTopology("github-commit-count-topology");
cluster.shutdown();
}
}
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.commons.io.IOUtils;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
/**
* This spout simulates reading commits from a live stream by doing two things:
* <p/>
* 1) Reading a file containing commit data into a list of strings (one string per commit).
* 2) When nextTuple() is called, emit a tuple for each string in the list.
*/
public class CommitFeedListener extends BaseRichSpout {
private SpoutOutputCollector outputCollector;
private List<String> commits;
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("commit"));
}
@Override
public void open(Map map,
TopologyContext context,
SpoutOutputCollector outputCollector) {
this.outputCollector = outputCollector;
try {
commits = IOUtils.readLines(ClassLoader.getSystemResourceAsStream("changelog.txt"),
Charset.defaultCharset().name());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void nextTuple() {
for (String commit : commits) {
outputCollector.emit(new Values(commit));
}
}
}
\ No newline at end of file
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
public class EmailCounter extends BaseBasicBolt {
private Map<String, Integer> counts;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// This bolt does not emit any values and therefore does not define any output fields.
}
@Override
public void prepare(Map stormConf,
TopologyContext context) {
counts = new HashMap<String, Integer>();
}
@Override
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String email = tuple.getStringByField("email");
counts.put(email, countFor(email) + 1);
printCounts();
}
private Integer countFor(String email) {
Integer count = counts.get(email);
return count == null ? 0 : count;
}
/**
* Print the counts to System.out so you can easily see what's happening.
*/
private void printCounts() {
for (String email : counts.keySet()) {
System.out.println(String.format("%s has count of %s", email, counts.get(email)));
}
}
}
\ No newline at end of file
package org.hongxi.whatsmars.storm.topology;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
public class EmailExtractor extends BaseBasicBolt {
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("email"));
}
@Override
public void execute(Tuple tuple,
BasicOutputCollector outputCollector) {
String commit = tuple.getStringByField("commit");
String[] parts = commit.split(" ");
outputCollector.emit(new Values(parts[1]));
}
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册