版本说明
Java 版本
1 2 3 4
| [root@hadoop102 ~]# java -version openjdk version "1.8.0_362" OpenJDK Runtime Environment (Tencent Kona 8.0.13) (build 1.8.0_362-b1) OpenJDK 64-Bit Server VM (Tencent Kona 8.0.13) (build 25.362-b1, mixed mode, sharing)
|
Maven 版本
1 2 3 4 5 6
| [root@hadoop102 ~]# mvn -version Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39) Maven home: /root/module/java/maven/apache-maven-3.8.8 Java version: 1.8.0_362, vendor: Tencent, runtime: /root/module/jdk/tencent_jdk8/jre Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "5.4.119-1-tlinux4-0008", arch: "amd64", family: "unix"
|
服务器版本
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| [root@hadoop102 ~]# cat /etc/os-release NAME="CentOS Linux" VERSION="8" ID="centos" ID_LIKE="rhel fedora" VERSION_ID="8" PLATFORM_ID="platform:el8" PRETTY_NAME="CentOS Linux 8" ANSI_COLOR="0;31" CPE_NAME="cpe:/o:centos:centos:8" HOME_URL="https://centos.org/" BUG_REPORT_URL="https://bugs.centos.org/" CENTOS_MANTISBT_PROJECT="CentOS-8" CENTOS_MANTISBT_PROJECT_VERSION="8"
|
编译
1. 下载源码
1 2
| [root@hadoop102 bin]# ./flink --version Version: 1.17.2, Commit ID: c844010
|
我下载的是稳定版分支 1.17.2
,下载链接为 Flink 1.17.2。下载完成之后,我同步到了远端机器,然后用 jetbrains gateway
打开。
2. 使用 jetbrains gateway
打开源码
这是我远端机器的存放 Flink 源码的目录,然后用 jetbrains gateway
打开:

这是一个 maven 项目,`jetbrains gateway` 会自动解析
3. 注意事项
3.1 flink-runtime-web/pom.xml
需要进行修改
具体是将
1 2 3 4 5 6 7 8 9 10 11 12
| <execution> <id>npm install</id> <goals> <goal>npm</goal> </goals> <configuration> <arguments>ci --cache-max=0 --no-save ${npm.proxy}</arguments> <environmentVariables> <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL> </environmentVariables> </configuration> </execution>
|
修改为:
1 2 3 4 5 6 7 8 9 10 11 12
| <execution> <id>npm install</id> <goals> <goal>npm</goal> </goals> <configuration> <arguments>install -g -registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments> <environmentVariables> <HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL> </environmentVariables> </configuration> </execution>
|
参考文献:编译flink源码卡在Running ‘npm ci –cache-max=0 –no-save’
3.2 先安装好 npm
yum install npm
不然也会报错。
4. 开始编译
因为需要较多内存,需要对 maven 做一些设置,,以增加 jdk 可以使用的内存
如果是命令行格式,那么我们可以直接在 /etc/profile
文件里配置:
1
| export MAVEN_OPTS="-Xmx8g"
|
这样就可以加大内存的配置,在编译过程中不会出现 OOM。
但是在 jetbrains gateway
中,这个文件不会直接被使用,那么我们就需要在 maven
中进行配置:

编译时,需要执行的命令是:
1
| mvn clean install -DskipTests -Dfast -T 4
|
如果是命令行格式,那么直接执行就行了,但如果要在 jetbrains gateway
中执行,那么默认只有 mvn install
和 mvn clean
因此我们需要自定义一个 maven 的执行配置:
在 Run Configurations
中新增一条配置,更改执行参数:
然后右键点击执行即可。
经过一段时间的编译,这个编译就完成啦,每个模块下都会有对应的 target
文件生成。
到这里,编译就已经告一段落了。接下来可以开始运行
运行
1. 运行 JobManager
在 flink-runtime
模块下的 StandaloneSessionClusterEntrypoint
类中
配置文件目录
不修改直接启动会报错。
简单来说,就是需要我们指定配置文件所在目录。在这里,我们的配置文件全都放在
1 2 3 4 5 6
| [root@hadoop102 conf]# pwd /root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf [root@hadoop102 conf]# ls flink-conf.yaml log4j.properties logback-session.xml workers log4j-cli.properties log4j-session.properties logback.xml zoo.cfg log4j-console.properties logback-console.xml masters
|
中。因此我们需要更新输入参数:--configDir configpath

完整的程序输入参数为:
1
| --configDir /root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b
|
并且这样启动没有日志,还需要配置日志的参数(在 VM 参数中)。
完整的 VM 输入参数为:
1
| -Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml
|
主类缺失
此时启动,会报错:
1
| Exception in thread "main" java.lang.NullPointerException
|
这是因为没有添加依赖的 jar
包,这也就是我们之前使用 maven 编译的目的。
具体方法是:在 IDEA 的 File -> project structure -> Modules
中给 flink-runtime
添加依赖,依赖的 jar 包来源是 flink-dist
模块下的 target
文件夹。将
1 2 3 4 5 6 7 8
| [root@hadoop102 lib] /root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/target/flink-1.17.2-bin/flink-1.17.2/lib [root@hadoop102 lib] flink-cep-1.17.2.jar flink-scala_2.12-1.17.2.jar log4j-api-2.17.1.jar flink-connector-files-1.17.2.jar flink-table-api-java-uber-1.17.2.jar log4j-core-2.17.1.jar flink-csv-1.17.2.jar flink-table-planner-loader-1.17.2.jar log4j-slf4j-impl-2.17.1.jar flink-dist-1.17.2.jar flink-table-runtime-1.17.2.jar flink-json-1.17.2.jar log4j-1.2-api-2.17.1.jar
|
这个目录下的所有 jar 都添加到依赖中:
修改配置文件
注意,在默认的配置文件中,允许访问的 ip 是 localhost
,那么我们不能从外部访问,因此要修改 ip 为 0.0.0.0
,这样所有 ip 都可以访问到 flink 中。
1
| rest.bind-address: 0.0.0.0
|
此时再启动,我们就可以从外部访问到 flink 了。
在这里,flink 是在容器中的,我将 web 端口 8081 映射到了 10002 端口(使用 nginx
),nginx
配置为:
1 2 3 4 5 6 7 8 9 10 11 12 13
| server { listen 10002; server_name hadoop102; location / { root html; proxy_pass http://hadoop102:8081; index index.html index.htm; } error_page 500 502 503 504 /50x.html; location = /50x.html { root html; } }
|
对于外部,10002 端口映射的外部端口是 62102,docker 配置为:
1
| 9d117ae6ee09 centos-coachhe "/bin/bash" 9 months ago Up 9 months 0.0.0.0:62022->22/tcp, :::62022->22/tcp, 0.0.0.0:62080->80/tcp, :::62080->80/tcp, 0.0.0.0:62088->8080/tcp, :::62088->8080/tcp, 0.0.0.0:62100->10000/tcp, :::62100->10000/tcp, 0.0.0.0:62101->10001/tcp, :::62101->10001/tcp, 0.0.0.0:62102->10002/tcp, :::62102->10002/tcp, 0.0.0.0:62103->10003/tcp, :::62103->10003/tcp, 0.0.0.0:62104->10004/tcp, :::62104->10004/tcp, 0.0.0.0:62105->10005/tcp, :::62105->10005/tcp, 0.0.0.0:62106->10006/tcp, :::62106->10006/tcp, 0.0.0.0:62107->10007/tcp, :::62107->10007/tcp, 0.0.0.0:62108->10008/tcp, :::62108->10008/tcp, 0.0.0.0:62109->10009/tcp, :::62109->10009/tcp, 0.0.0.0:62110->10010/tcp, :::62110->10010/tcp hadoop102
|
因此我们首先启动 Flink 的 JobManager:
在外部访问:
http://9.135.11.161:62102
可以看到:
成功访问!
2. 运行 TaskManager
成功开启 JobManager 之后,我们就要运行 TaskManager 了。
其实解决了 JobManager 之后,TaskManager 的启动也非常类似。启动参数为:
程序参数:
1
| -XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml
|
VM 参数:
1
| -XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml
|
启动完成:
此时 TaskManager 会自动注册到 JobManager 中:
完美! 源码编译完成,之后就可以开始手撸 Flink 源码啦
源码编译并启动 Flink 之后,我只需要在这一台机器编译 Flink 即可,并不需要将所有分布式环境都编译好 Flink ,这是因为别的机器只需要有环境就可以和我们现有的机器做分布式了。因此,别的机器我们就直接使用官网下载的编译好的可执行文件并执行即可。