

# Local Setup Tutorial

Get a Flink example program up and running in a few simple steps.

## Setup: Download and Start Flink

Flink runs on **Linux, Mac OS X, and Windows**. To be able to run Flink, the only requirement is to have a working **Java 8.x** installation. Windows users, please take a look at the [Flink on Windows](//ci.apache.org/projects/flink/flink-docs-release-1.7/tutorials/flink_on_windows.html) guide which describes how to run Flink on Windows for local setups.

You can check the correct installation of Java by issuing the following command:

<figure class="highlight">

```
java -version
```

</figure>

If you have Java 8, the output will look something like this:

<figure class="highlight">

```
java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
```

</figure>

1.  Download a binary from the [downloads page](http://flink.apache.org/downloads.html). You can pick any Hadoop/Scala combination you like. If you plan to just use the local file system, any Hadoop version will work fine.
2.  Go to the download directory.
3.  Unpack the downloaded archive.

<figure class="highlight">

```
$ cd ~/Downloads        # Go to download directory
$ tar xzf flink-*.tgz   # Unpack the downloaded archive
$ cd flink-1.7.1
```

</figure>

For macOS users, Flink can also be installed through [Homebrew](https://brew.sh/):

<figure class="highlight">

```
$ brew install apache-flink
...
$ flink --version
Version: 1.2.0, Commit ID: 1c659cf
```

</figure>

### Start a Local Flink Cluster

<figure class="highlight">

```
$ ./bin/start-cluster.sh  # Start Flink
```

</figure>

Check the **Dispatcher’s web frontend** at [http://localhost:8081](http://localhost:8081) and make sure everything is up and running. The web frontend should report a single available TaskManager instance.

[![Dispatcher: Overview](https://ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-1.png)](//ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-1.png)

You can also verify that the system is running by checking the log files in the `logs` directory:

<figure class="highlight">

```
$ tail log/flink-*-standalonesession-*.log
INFO ... - Rest endpoint listening at localhost:8081
INFO ... - http://localhost:8081 was granted leadership ...
INFO ... - Web frontend listening at http://localhost:8081.
INFO ... - Starting RPC endpoint for StandaloneResourceManager at akka://flink/user/resourcemanager .
INFO ... - Starting RPC endpoint for StandaloneDispatcher at akka://flink/user/dispatcher .
INFO ... - ResourceManager akka.tcp://flink@localhost:6123/user/resourcemanager was granted leadership ...
INFO ... - Starting the SlotManager.
INFO ... - Dispatcher akka.tcp://flink@localhost:6123/user/dispatcher was granted leadership ...
INFO ... - Recovering all persisted jobs.
INFO ... - Registering TaskManager ... under ... at the SlotManager.
```

</figure>

## Read the Code

You can find the complete source code for this SocketWindowWordCount example in [Scala](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/socket/SocketWindowWordCount.scala) and [Java](https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/socket/SocketWindowWordCount.java) on GitHub.

<figure class="highlight">

```
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

    def main(args: Array[String]) : Unit = {

        // the port to connect to
        val port: Int = try {
            ParameterTool.fromArgs(args).getInt("port")
        } catch {
            case e: Exception => {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                return
            }
        }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", port, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
            .flatMap { w => w.split("\\s") }
            .map { w => WordWithCount(w, 1) }
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
    }

    // Data type for words with count
    case class WordWithCount(word: String, count: Long)
}
```

</figure>

<figure class="highlight">

```
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {

        // the port to connect to
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream("localhost", port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                @Override
                public void flatMap(String value, Collector<WordWithCount> out) {
                    for (String word : value.split("\\s")) {
                        out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                @Override
                public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                    return new WordWithCount(a.word, a.count + b.count);
                }
            });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // Data type for words with count
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```

</figure>

## Run the Example

Now, we are going to run this Flink application. It will read text from a socket and, once every second, print the number of occurrences of each distinct word seen during the previous 5 seconds, i.e. a sliding window of processing time (the job uses a 5-second window that slides every second), as long as words keep arriving.

*   First of all, we use **netcat** to start a local server:

<figure class="highlight">

```
$ nc -l 9000
```

</figure>

*   Submit the Flink program:

<figure class="highlight">

```
$ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
Starting execution of program
```

</figure>

The program connects to the socket and waits for input. You can check the web interface to verify that the job is running as expected:

[![Dispatcher: Overview (cont'd)](https://ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-2.png)](//ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-2.png)
[![Dispatcher: Running Jobs](https://ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-3.png)](//ci.apache.org/projects/flink/flink-docs-release-1.7/page/img/quickstart-setup/jobmanager-3.png)

*   Words are counted in sliding windows of 5 seconds (processing time) and are printed to `stdout`. Monitor the TaskManager’s output file and write some text in `nc` (input is sent to Flink line by line after hitting `<return>`):

<figure class="highlight">

```
$ nc -l 9000
lorem ipsum
ipsum ipsum ipsum
bye
```

</figure>

The `.out` file will print the counts at the end of each time window as long as words are arriving, e.g.:

<figure class="highlight">

```
$ tail -f log/flink-*-taskexecutor-*.out
lorem : 1
bye : 1
ipsum : 4
```

</figure>
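For intuition, the per-window aggregation can be sketched in plain Java, outside Flink (`WindowCountSketch` is a hypothetical helper, not part of the Flink API); assuming all three input lines land in the same window, it reproduces the counts shown above:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WindowCountSketch {

    // Mirror the flatMap + reduce steps of the example job for the
    // lines that fall into a single window: split on whitespace and
    // sum a count of 1 per occurrence of each word.
    static Map<String, Long> countWindow(String... lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            for (String word : line.split("\\s")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // The three lines typed into nc above
        countWindow("lorem ipsum", "ipsum ipsum ipsum", "bye")
            .forEach((word, count) -> System.out.println(word + " : " + count));
        // prints:
        // lorem : 1
        // ipsum : 4
        // bye : 1
    }
}
```

In the real job, Flink performs this reduction incrementally per window and per key, distributed across the TaskManagers.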

To **stop** Flink when you’re done, type:

<figure class="highlight">

```
$ ./bin/stop-cluster.sh
```

</figure>

## Next Steps

Check out some more [examples](//ci.apache.org/projects/flink/flink-docs-release-1.7/examples) to get a better feel for Flink’s programming APIs. When you are done with that, go ahead and read the [streaming guide](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/datastream_api.html).