初涉Flink领域,你是否觉得安装、设置、编写代码的过程让人头疼?许多人遇到这些入门难题,但实际上,只要掌握了正确的方法,就能轻松入门。
Flink基础回顾
aliMaven aliyun Maven http://Maven.aliyun.com/nexus/content/groups/public/ central
Flink有其独特的基本原理、架构和众多组成部分。这些构成了掌握Flink的关键。比如,掌握其架构有助于理解数据的流动和处理过程。有位朋友在学习Flink时,跳过基础原理直接尝试案例,结果遇到了不少难题。只有对基础知识有深入理解,后续操作才会更加得心应手。在真实的开发环境中,许多团队都要求开发者先掌握Flink的基础知识。
在大数据领域,Flink的相关知识同样至关重要。众多大数据企业在招聘过程中,往往会涉及这一话题。熟练掌握Flink,有助于求职者在竞争激烈的市场中脱颖而出。
org.apache.flink flink-java 1.6.1 provided org.apache.flink flink-streaming-java_2.11 1.6.1 provided
选择开发语言
org.apache.flink flink-scala_2.11 1.6.1 provided org.apache.flink flink-streaming-scala_2.11 1.6.1 provided
在编写Flink程序时,可以选择Java或Scala。Scala在实现函数式编程时显得更为精炼。我的一位同事起初用Java编写Flink程序,代码冗长,维护起来颇为不易。后来他转而使用Scala,程序状况得到了显著提升。尽管如此,Java也有其独特优势,比如众多开发者对Java更为熟悉。开发语言的选择受到多种因素影响,包括开发环境和个人偏好。在有些项目里,团队成员普遍擅长Java,那么选用Java来开发Flink程序也是合情合理的。
配置国内镜像与依赖管理
使用阿里云的Maven仓库镜像,需要调整.conf/.xml文件。在Maven中管理依赖项,必须先完成配置。我遇到一个项目,由于Maven仓库镜像配置不当,依赖项下载变得极其缓慢。同样,在Maven项目的pom.xml文件中也需要进行配置。不同版本需要去Maven仓库寻找相应的配置。例如,之前的一个小项目使用的是旧版本,配置过程颇为复杂。
不同语言的配置差异
package xuwei.tech.streaming; import org.apache.Flink.api.common.functions.FlatMapFunction; import org.apache.Flink.api.Java.utils.ParameterTool; import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.Flink.runtime.state.filesystem.FsStateBackend; import org.apache.Flink.runtime.state.memory.MemoryStateBackend; import org.apache.Flink.streaming.api.DataStream.DataStream; import org.apache.Flink.streaming.api.DataStream.DataStreamSource; import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.Flink.streaming.api.windowing.time.Time; import org.apache.Flink.util.Collector; /** * 单词计数之滑动窗口计算 * * Created by xuwei.tech */ public class SocketWindowWordCountJava { public static void main(String[] args) throws Exception{ //获取需要的端口号 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("No port set. use default port 9000--Java"); port = 9000; } //获取Flink的运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String hostname = "hadoop100"; String delimiter = "n"; //连接Socket获取输入的数据 DataStreamSource text = env.socketTextStream(hostname, port, delimiter); // a a c // a 1 // a 1 // c 1 DataStream windowCounts = text.flatMap(new FlatMapFunction () { public void flatMap(String value, Collector out) throws Exception { String[] splits = value.split("\s"); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); } } }).keyBy("word") .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2s,指定时间间隔为1s .sum("count");//在这里使用sum或者reduce都可以 /*.reduce(new ReduceFunction() { public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word,a.count+b.count); } })*/ //把数据打印到控制台并且设置并行度 windowCounts.print().setParallelism(1); //这一行代码一定要实现,否则程序不执行 env.execute("Socket window count"); } public static class WordWithCount{ public String word; public long count; public WordWithCount(){} public WordWithCount(String word,long count){ this.word = word; this.count = count; } @Override public String toString() { return "WordWithCount{" + "word='" + word + ''' + ", count=" + count + '}'; } } }
在Java编程中需要加入特定的Java配置,Scala编程同样需要相应的配置。我之前曾将这两种语言的配置搞混。一旦配置错误,程序便可能无法正常运作。此外,不同版本的配置要求各异,有时为了与旧版本兼容,还需付出额外努力。
在开发工具中运行和打包的要点
package xuwei.tech.streaming import org.apache.Flink.api.Java.utils.ParameterTool import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment import org.apache.Flink.streaming.api.windowing.time.Time /** * 单词计数之滑动窗口计算 * * Created by xuwei.tech */ object SocketWindowWordCountScala { def main(args: Array[String]): Unit = { //获取Socket端口号 val port: Int = try { ParameterTool.fromArgs(args).getInt("port") }catch { case e: Exception => { System.err.println("No port set. use default port 9000--Scala") } 9000 } //获取运行环境 val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //连接Socket获取输入数据 val text = env.socketTextStream("hadoop100",port,'n') //解析数据(把数据打平),分组,窗口计算,并且聚合求sum //注意:必须要添加这一行隐式转行,否则下面的FlatMap方法执行会报错 import org.apache.Flink.api.Scala._ val windowCounts = text.flatMap(line => line.split("\s"))//打平,把每一行单词都切开 .map(w => WordWithCount(w,1))//把单词转成word , 1这种形式 .keyBy("word")//分组 .timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间 .sum("count");// sum或者reduce都可以 //.reduce((a,b)=>WordWithCount(a.word,a.count+b.count)) //打印到控制台 windowCounts.print().setParallelism(1); //执行任务 env.execute("Socket window count"); } case class WordWithCount(word: String,count: Long) }
[root@hadoop100 soft]# nc -l 9000 a b a
使用IDEA等开发环境执行代码时,记得将依赖配置中的scope属性注释掉。而在制作JAR包时,则需确保这一属性被启用。举例来说,若之前某个项目在打包时遗漏了这一步骤,结果生成的JAR包体积庞大,还附带了许多不必要的依赖。Flink具备延迟计算的特性,只有调用特定方法才会启动执行。在调试过程中,我为此特性耗费了不少精力,不过它也确实简化了复杂程序的编写。
案例需求分析与数据处理
WordWithCount{word='a', count=1} WordWithCount{word='b', count=1} WordWithCount{word='a', count=2} WordWithCount{word='b', count=1} WordWithCount{word='a', count=1}
手工生成单词,Flink实时获取数据,并在特定窗口内进行汇总计算。需要为多种语言添加Maven依赖。之前曾遇到过类似需求,仅配置依赖就耗费了大量时间。按照要求完成依赖添加后,在IDEA中执行代码即可获得结果。此外,之后还可以尝试使用Flink的Batch离线批处理功能。
package xuwei.tech.batch; import org.apache.Flink.api.common.functions.FlatMapFunction; import org.apache.Flink.api.Java.DataSet; import org.apache.Flink.api.Java.ExecutionEnvironment; import org.apache.Flink.api.Java.operators.DataSource; import org.apache.Flink.api.Java.tuple.Tuple2; import org.apache.Flink.util.Collector; /** *单词计数之离线计算 * * Created by xuwei.tech */ public class BatchWordCountJava { public static void main(String[] args) throws Exception{ String inputPath = "D:\data\file"; String outPath = "D:\data\result"; //获取运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //获取文件中的内容 DataSource text = env.readTextFile(inputPath); DataSet<Tuple2> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1); counts.writeAsCsv(outPath,"n"," ").setParallelism(1); env.execute("batch word count"); } public static class Tokenizer implements FlatMapFunction<String,Tuple2>{ public void flatMap(String value, Collector<Tuple2> out) throws Exception { String[] tokens = value.toLowerCase().split("\W+"); for (String token: tokens) { if(token.length()>0){ out.collect(new Tuple2(token,1)); } } } } }
最后我想请教各位,在学习Flink的过程中,大家觉得哪一部分最让人感到棘手?期待大家的点赞和转发。
package xuwei.tech.batch import org.apache.Flink.api.Scala.ExecutionEnvironment /** * 单词计数之离线计算 * Created by xuwei.tech */ object BatchWordCountScala { def main(args: Array[String]): Unit = { val inputPath = "D:\data\file" val outPut = "D:\data\result" val env = ExecutionEnvironment.getExecutionEnvironment val text = env.readTextFile(inputPath) //引入隐式转换 import org.apache.Flink.api.Scala._ val counts = text.flatMap(_.toLowerCase.split("\W+")) .filter(_.nonEmpty) .map((_,1)) .groupBy(0) .sum(1) counts.writeAsCsv(outPut,"n"," ").setParallelism(1) env.execute("batch word count") } }