📜  Apache Flink-API概念

📅  最后修改于: 2020-10-30 10:08:57             🧑  作者: Mango


Flink具有丰富的API集,开发人员可以使用它们对批处理和实时数据进行转换。各种转换包括映射,过滤,排序,联接,分组和聚合。 Apache Flink的这些转换是在分布式数据上执行的。让我们讨论Apache Flink提供的不同API。

数据集API

Apache Flink中的数据集API用于在一段时间内对数据执行批处理操作。该API可以在Java,Scala和Python。它可以对数据集应用不同类型的转换,例如过滤,映射,聚合,联接和分组。

可以从诸如本地文件之类的源中创建数据集,也可以通过从特定的源中读取文件来创建数据集,并且可以将结果数据写入不同的接收器(如分布式文件或命令行终端)中。 Java和Scala编程语言均支持此API。

这是Dataset API的Wordcount程序-

public class WordCountProg {
   public static void main(String[] args) throws Exception {
      final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet text = env.fromElements(
      "Hello",
      "My Dataset API Flink Program");

      DataSet> wordCounts = text
      .flatMap(new LineSplitter())
      .groupBy(0)
      .sum(1);

      wordCounts.print();
   }

   public static class LineSplitter implements FlatMapFunction> {
      @Override
      public void flatMap(String line, Collector> out) {
         for (String word : line.split(" ")) {
            out.collect(new Tuple2(word, 1));
         }
      }
   }
}

DataStream API

该API用于处理连续流中的数据。您可以对流数据执行各种操作,如过滤,映射,窗口化,聚合。此数据流上有各种来源,例如消息队列,文件,套接字流,并且结果数据可以写入不同的接收器(如命令行终端)中。 Java和Scala编程语言均支持此API。

这是DataStream API的流式Wordcount程序,您可以在其中连续不断地进行字数统计并将数据分组在第二个窗口中。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WindowWordCountProg {
   public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      DataStream> dataStream = env
      .socketTextStream("localhost", 9999)
      .flatMap(new Splitter())
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1);
      dataStream.print();
      env.execute("Streaming WordCount Example");
   }
   public static class Splitter implements FlatMapFunction> {
      @Override
      public void flatMap(String sentence, Collector> out) throws Exception {
         for (String word: sentence.split(" ")) {
            out.collect(new Tuple2(word, 1));
         }
      }
   }
}