Flink快速入门教程:从零开始掌握大数据流处理技术

2024-12-26 0 891

初涉Flink领域,你是否觉得安装、设置、编写代码的过程让人头疼?许多人遇到这些入门难题,但实际上,只要掌握了正确的方法,就能轻松入门。

Flink基础回顾

 aliMaven
 aliyun Maven
 http://Maven.aliyun.com/nexus/content/groups/public/
 central 

Flink有其独特的基本原理、架构和众多组成部分。这些构成了掌握Flink的关键。比如,掌握其架构有助于理解数据的流动和处理过程。有位朋友在学习Flink时,跳过基础原理直接尝试案例,结果遇到了不少难题。只有对基础知识有深入理解,后续操作才会更加得心应手。在真实的开发环境中,许多团队都要求开发者先掌握Flink的基础知识。

在大数据领域,Flink的相关知识同样至关重要。众多大数据企业在招聘过程中,往往会涉及这一话题。熟练掌握Flink,有助于求职者在竞争激烈的市场中脱颖而出。

 
 org.apache.flink 
 flink-java 
 1.6.1 
provided 
 
 
 org.apache.flink 
 flink-streaming-java_2.11 
 1.6.1 
 provided 

选择开发语言

 
 org.apache.flink 
 flink-scala_2.11 
 1.6.1 
provided 
 
 
 org.apache.flink 
 flink-streaming-scala_2.11 
 1.6.1 
provided 

在编写Flink程序时,可以选择Java或Scala。Scala在实现函数式编程时显得更为精炼。我的一位同事起初用Java编写Flink程序,代码冗长,维护起来颇为不易。后来他转而使用Scala,程序状况得到了显著提升。尽管如此,Java也有其独特优势,比如众多开发者对Java更为熟悉。开发语言的选择受到多种因素影响,包括开发环境和个人偏好。在有些项目里,团队成员普遍擅长Java,那么选用Java来开发Flink程序也是合情合理的。

配置国内镜像与依赖管理

Flink快速入门教程:从零开始掌握大数据流处理技术

使用阿里云的Maven仓库镜像,需要调整.conf/.xml文件。在Maven中管理依赖项,必须先完成配置。我遇到一个项目,由于Maven仓库镜像配置不当,依赖项下载变得极其缓慢。同样,在Maven项目的pom.xml文件中也需要进行配置。不同版本需要去Maven仓库寻找相应的配置。例如,之前的一个小项目使用的是旧版本,配置过程颇为复杂。

不同语言的配置差异

package xuwei.tech.streaming;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.utils.ParameterTool;
import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.Flink.runtime.state.filesystem.FsStateBackend;
import org.apache.Flink.runtime.state.memory.MemoryStateBackend;
import org.apache.Flink.streaming.api.DataStream.DataStream;
import org.apache.Flink.streaming.api.DataStream.DataStreamSource;
import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.Flink.streaming.api.windowing.time.Time;
import org.apache.Flink.util.Collector;
/**
 * 单词计数之滑动窗口计算
 *
 * Created by xuwei.tech
 */
public class SocketWindowWordCountJava {
 public static void main(String[] args) throws Exception{
 //获取需要的端口号
 int port;
 try {
 ParameterTool parameterTool = ParameterTool.fromArgs(args);
 port = parameterTool.getInt("port");
 }catch (Exception e){
 System.err.println("No port set. use default port 9000--Java");
 port = 9000;
 }
 //获取Flink的运行环境
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 String hostname = "hadoop100";
 String delimiter = "n";
 //连接Socket获取输入的数据
 DataStreamSource text = env.socketTextStream(hostname, port, delimiter);
 // a a c
 // a 1
 // a 1
 // c 1
 DataStream windowCounts = text.flatMap(new FlatMapFunction 
() {
 public void flatMap(String value, Collector out) throws 
Exception {
 String[] splits = value.split("\s");
 for (String word : splits) {
 out.collect(new WordWithCount(word, 1L));
 }
 }
 }).keyBy("word")
 .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2s,指定时间间隔为1s
 .sum("count");//在这里使用sum或者reduce都可以
 /*.reduce(new ReduceFunction() {
 public WordWithCount reduce(WordWithCount a, 
WordWithCount b) throws Exception {
 return new WordWithCount(a.word,a.count+b.count);
 }
 })*/
 //把数据打印到控制台并且设置并行度
 windowCounts.print().setParallelism(1);
 //这一行代码一定要实现,否则程序不执行
 env.execute("Socket window count");
 }
 public static class WordWithCount{
 public String word;
 public long count;
 public WordWithCount(){}
 public WordWithCount(String word,long count){
 this.word = word;
 this.count = count;
 }
 @Override
 public String toString() {
 return "WordWithCount{" +
 "word='" + word + ''' +
 ", count=" + count +
 '}';
 }
 }
}

在Java编程中需要加入特定的Java配置,Scala编程同样需要相应的配置。我之前曾将这两种语言的配置搞混。一旦配置错误,程序便可能无法正常运作。此外,不同版本的配置要求各异,有时为了与旧版本兼容,还需付出额外努力。

在开发工具中运行和打包的要点

package xuwei.tech.streaming
import org.apache.Flink.api.Java.utils.ParameterTool
import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment
import org.apache.Flink.streaming.api.windowing.time.Time
/**
 * 单词计数之滑动窗口计算
 *
 * Created by xuwei.tech
 */
object SocketWindowWordCountScala {
 def main(args: Array[String]): Unit = {
 //获取Socket端口号
 val port: Int = try {
 ParameterTool.fromArgs(args).getInt("port")
 }catch {
 case e: Exception => {
 System.err.println("No port set. use default port 9000--Scala")
 }
 9000
 }
 //获取运行环境
 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
 //连接Socket获取输入数据
 val text = env.socketTextStream("hadoop100",port,'n')
 //解析数据(把数据打平),分组,窗口计算,并且聚合求sum
 //注意:必须要添加这一行隐式转行,否则下面的FlatMap方法执行会报错
 import org.apache.Flink.api.Scala._
 val windowCounts = text.flatMap(line => line.split("\s"))//打平,把每一行单词都切开
 .map(w => WordWithCount(w,1))//把单词转成word , 1这种形式
 .keyBy("word")//分组
 .timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间
 .sum("count");// sum或者reduce都可以
 //.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))
 //打印到控制台
 windowCounts.print().setParallelism(1);
 //执行任务
 env.execute("Socket window count");
 }
 case class WordWithCount(word: String,count: Long)
}

[root@hadoop100 soft]# nc -l 9000
a
b
a

使用IDEA等开发环境执行代码时,记得将依赖配置中的scope属性注释掉。而在制作JAR包时,则需确保这一属性被启用。举例来说,若之前某个项目在打包时遗漏了这一步骤,结果生成的JAR包体积庞大,还附带了许多不必要的依赖。Flink具备延迟计算的特性,只有调用特定方法才会启动执行。在调试过程中,我为此特性耗费了不少精力,不过它也确实简化了复杂程序的编写。

案例需求分析与数据处理

WordWithCount{word='a', count=1}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=2}
WordWithCount{word='b', count=1}
WordWithCount{word='a', count=1}

手工生成单词,Flink实时获取数据,并在特定窗口内进行汇总计算。需要为多种语言添加Maven依赖。之前曾遇到过类似需求,仅配置依赖就耗费了大量时间。按照要求完成依赖添加后,在IDEA中执行代码即可获得结果。此外,之后还可以尝试使用Flink的Batch离线批处理功能。

package xuwei.tech.batch;
import org.apache.Flink.api.common.functions.FlatMapFunction;
import org.apache.Flink.api.Java.DataSet;
import org.apache.Flink.api.Java.ExecutionEnvironment;
import org.apache.Flink.api.Java.operators.DataSource;
import org.apache.Flink.api.Java.tuple.Tuple2;
import org.apache.Flink.util.Collector;
/**
 *单词计数之离线计算
 *
 * Created by xuwei.tech
 */
public class BatchWordCountJava {
 public static void main(String[] args) throws Exception{
 String inputPath = "D:\data\file";
 String outPath = "D:\data\result";
 //获取运行环境
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 //获取文件中的内容
 DataSource text = env.readTextFile(inputPath);
 DataSet<Tuple2> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
 counts.writeAsCsv(outPath,"n"," ").setParallelism(1);
 env.execute("batch word count");
 }
 public static class Tokenizer implements FlatMapFunction<String,Tuple2>{
 public void flatMap(String value, Collector<Tuple2> out) 
throws Exception {
 String[] tokens = value.toLowerCase().split("\W+");
 for (String token: tokens) {
 if(token.length()>0){
 out.collect(new Tuple2(token,1));
 }
 }
 }
 }
}

最后我想请教各位,在学习Flink的过程中,大家觉得哪一部分最让人感到棘手?期待大家的点赞和转发。

package xuwei.tech.batch
import org.apache.Flink.api.Scala.ExecutionEnvironment
/**
 * 单词计数之离线计算
 * Created by xuwei.tech
 */
object BatchWordCountScala {
 def main(args: Array[String]): Unit = {
 val inputPath = "D:\data\file"
 val outPut = "D:\data\result"
 val env = ExecutionEnvironment.getExecutionEnvironment
 val text = env.readTextFile(inputPath)
 //引入隐式转换
 import org.apache.Flink.api.Scala._
 val counts = text.flatMap(_.toLowerCase.split("\W+"))
 .filter(_.nonEmpty)
 .map((_,1))
 .groupBy(0)
 .sum(1)
 counts.writeAsCsv(outPut,"n"," ").setParallelism(1)
 env.execute("batch word count")
 }
}

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

七爪网 行业资讯 Flink快速入门教程:从零开始掌握大数据流处理技术 https://www.7claw.com/2805384.html

七爪网源码交易平台

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务