1. Flink环境搭建
1.1 Flink版本列表:
https://archive.apache.org/dist/flink/
1.2 本文选择1.12.2版本(Scala 2.12构建)进行安装
wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.12.tgz
1.3 解压安装
tar -xzf flink-1.12.2-bin-scala_2.12.tgz
cd flink-1.12.2
./bin/start-cluster.sh
检查是否安装成功:jps -l|grep flink
Web UI页面地址:http://<服务器IP>:8081/#/overview(8081为Flink Web UI的默认端口)
2. wordCount例子
2.1 springboot项目目录结构:
2.2 添加maven依赖(Flink依赖的版本及Scala后缀应与集群保持一致,本文集群为1.12.2 / Scala 2.12):
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Flink artifacts must match the version and Scala suffix of the cluster the jobs
         are submitted to (flink-1.12.2-bin-scala_2.12). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>
    <!-- Since Flink 1.11, flink-streaming-java no longer pulls in flink-clients transitively.
         It is needed to execute jobs locally (e.g. running main() from the IDE). -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <!-- Tests are skipped during the build. -->
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
2.3 示例1:批处理wordCount
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * Batch word count: reads a text file with the DataSet API, splits every line into words and
 * prints how often each word occurs.
 *
 * <p>Usage: the input file path may be passed as the first program argument; when no argument
 * is given, the sample path {@link #DEFAULT_INPUT_PATH} is used.
 */
public class WordCountBatch {
    /** Sample input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "F:\\ttWork\\flink-demo\\src\\main\\resources\\hello.txt";

    public static void main(String[] args) throws Exception {
        // Create the batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the input file line by line
        String filePath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataSet<String> inputDataSet = env.readTextFile(filePath);
        // Tokenize, group by the word (tuple field 0) and sum the counts (tuple field 1)
        DataSet<Tuple2<String, Integer>> sum = inputDataSet.flatMap(new MyFlatMap())
                .groupBy(0)
                .sum(1);
        // In the DataSet API, print() itself triggers execution, so no env.execute() follows
        sum.print();
    }

    /** Splits a line on runs of whitespace and emits (word, 1) for every non-empty token. */
    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /** Compiled once instead of on every record. */
        private static final java.util.regex.Pattern WHITESPACE =
                java.util.regex.Pattern.compile("\\s+");

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // split(" ") yields empty tokens for repeated/leading spaces (and ignores tabs);
            // splitting on whitespace runs and skipping empties counts only real words
            for (String word : WHITESPACE.split(s)) {
                if (!word.isEmpty()) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
2.4 示例2:流处理wordCount
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Streaming word count: reads a text file as a DataStream, splits every line into words and
 * continuously prints the running count of each word.
 *
 * <p>Usage: the input file path may be passed as the first program argument; when no argument
 * is given, the sample path {@link #DEFAULT_INPUT_PATH} is used.
 */
public class WordCountStream {
    /** Sample input used when no path is passed on the command line. */
    private static final String DEFAULT_INPUT_PATH =
            "F:\\ttWork\\flink-demo\\src\\main\\resources\\hello.txt";

    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Default parallelism for all operators of this job
        env.setParallelism(4);
        // Read the input file line by line
        String filePath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        DataStream<String> inputDataStream = env.readTextFile(filePath);
        // Tokenize, key by the word and keep a running sum of tuple field 1 per word.
        // A KeySelector is used because keyBy(int...) is deprecated as of Flink 1.12.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();
        // A streaming job is only built and run when execute() is called
        env.execute();
    }

    /** Splits a line on runs of whitespace and emits (word, 1) for every non-empty token. */
    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /** Compiled once instead of on every record. */
        private static final java.util.regex.Pattern WHITESPACE =
                java.util.regex.Pattern.compile("\\s+");

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // split(" ") yields empty tokens for repeated/leading spaces (and ignores tabs);
            // splitting on whitespace runs and skipping empties counts only real words
            for (String word : WHITESPACE.split(s)) {
                if (!word.isEmpty()) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
2.5 示例3:socket流处理wordCount
先在服务器上执行 nc -lk <端口> 打开socket端口,程序启动后在该终端中逐行输入测试数据(单词之间以空格分隔):
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Socket word count: reads text lines from a TCP socket and continuously prints the running
 * count of each word.
 *
 * <p>Program arguments: {@code --host <hostname> --port <port>} — the address of the socket
 * server that produces the text (for example an {@code nc -lk <port>} session).
 */
public class WordCountSocketStream {
    public static void main(String[] args) throws Exception {
        // Create the streaming execution environment.
        // Parallelism is not set in code, so the environment's default applies
        // (e.g. the value chosen when submitting the job).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        // getRequired fails fast with a clear message when --host is missing,
        // instead of passing a null host on to the socket source
        String host = parameterTool.getRequired("host");
        int port = parameterTool.getInt("port");
        // Read lines from the socket text stream
        DataStream<String> inputDataStream = env.socketTextStream(host, port);
        // Tokenize, key by the word and keep a running sum of tuple field 1 per word.
        // A KeySelector is used because keyBy(int...) is deprecated as of Flink 1.12.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = inputDataStream
                .flatMap(new MyFlatMap())
                .keyBy(value -> value.f0)
                .sum(1);
        sum.print();
        // A streaming job is only built and run when execute() is called
        env.execute();
    }

    /** Splits a line on runs of whitespace and emits (word, 1) for every non-empty token. */
    public static class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {
        /** Compiled once instead of on every record. */
        private static final java.util.regex.Pattern WHITESPACE =
                java.util.regex.Pattern.compile("\\s+");

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // split(" ") yields empty tokens for repeated/leading spaces (and ignores tabs);
            // splitting on whitespace runs and skipping empties counts only real words
            for (String word : WHITESPACE.split(s)) {
                if (!word.isEmpty()) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
2.6 将示例3打包成jar,通过Flink Web UI提交到Flink集群上运行
在Submit New Job页面上传jar包后,填写要运行的入口类(Entry Class)以及程序参数,即socket的地址和端口,例如 --host <socket地址> --port <端口>:
在Running Jobs页上查看Flink程序的执行情况,如各个算子的并行度、接收数据的条数和字节数等
在Task Managers页上查看Flink的日志输出(print()的结果位于Stdout中):