Flink源码编译

版本说明

Java 版本

1
2
3
4
[root@hadoop102 ~]# java -version
openjdk version "1.8.0_362"
OpenJDK Runtime Environment (Tencent Kona 8.0.13) (build 1.8.0_362-b1)
OpenJDK 64-Bit Server VM (Tencent Kona 8.0.13) (build 25.362-b1, mixed mode, sharing)

Maven 版本

1
2
3
4
5
6
[root@hadoop102 ~]# mvn -version
Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39)
Maven home: /root/module/java/maven/apache-maven-3.8.8
Java version: 1.8.0_362, vendor: Tencent, runtime: /root/module/jdk/tencent_jdk8/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.4.119-1-tlinux4-0008", arch: "amd64", family: "unix"

服务器版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@hadoop102 ~]# cat /etc/os-release 
NAME="CentOS Linux"
VERSION="8"
ID="centos"
ID_LIKE="rhel fedora"
VERSION_ID="8"
PLATFORM_ID="platform:el8"
PRETTY_NAME="CentOS Linux 8"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:centos:centos:8"
HOME_URL="https://centos.org/"
BUG_REPORT_URL="https://bugs.centos.org/"
CENTOS_MANTISBT_PROJECT="CentOS-8"
CENTOS_MANTISBT_PROJECT_VERSION="8"

编译

1. 下载源码

1
2
[root@hadoop102 bin]# ./flink --version
Version: 1.17.2, Commit ID: c844010

我下载的是稳定版分支 1.17.2,下载链接为 Flink 1.17.2。下载完成之后,我同步到了远端机器,然后用 jetbrains gateway 打开。

2. 使用 jetbrains gateway 打开源码

这是我远端机器的存放 Flink 源码的目录,然后用 jetbrains gateway 打开:

这是一个 maven 项目,`jetbrains gateway` 会自动解析

3. 注意事项

具体是将

1
2
3
4
5
6
7
8
9
10
11
12
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>ci --cache-max=0 --no-save ${npm.proxy}</arguments>
<environmentVariables>
<HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
</environmentVariables>
</configuration>
</execution>

修改为:

1
2
3
4
5
6
7
8
9
10
11
12
<execution>
<id>npm install</id>
<goals>
<goal>npm</goal>
</goals>
<configuration>
<arguments>install -g -registry=https://registry.npm.taobao.org --cache-max=0 --no-save</arguments>
<environmentVariables>
<HUSKY_SKIP_INSTALL>true</HUSKY_SKIP_INSTALL>
</environmentVariables>
</configuration>
</execution>

参考文献:编译flink源码卡在Running ‘npm ci –cache-max=0 –no-save’

3.2 先安装好 npm

yum install npm

不然也会报错。

4. 开始编译

因为需要较多内存,需要对 maven 做一些设置,,以增加 jdk 可以使用的内存

如果是命令行格式,那么我们可以直接在 /etc/profile 文件里配置:

1
export MAVEN_OPTS="-Xmx8g"

这样就可以加大内存的配置,在编译过程中不会出现 OOM。

但是在 jetbrains gateway 中,这个文件不会直接被使用,那么我们就需要在 maven 中进行配置:

编译时,需要执行的命令是:
1
mvn clean install -DskipTests -Dfast -T 4

如果是命令行格式,那么直接执行就行了,但如果要在 jetbrains gateway 中执行,那么默认只有 mvn installmvn clean 因此我们需要自定义一个 maven 的执行配置:

Run Configurations 中新增一条配置,更改执行参数:

然后右键点击执行即可。

经过一段时间的编译,这个编译就完成啦,每个模块下都会有对应的 target 文件生成。

到这里,编译就已经告一段落了。接下来可以开始运行

运行

1. 运行 JobManager

flink-runtime 模块下的 StandaloneSessionClusterEntrypoint 类中

配置文件目录

不修改直接启动会报错。

简单来说,就是需要我们指定配置文件所在目录。在这里,我们的配置文件全都放在

1
2
3
4
5
6
[root@hadoop102 conf]# pwd
/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf
[root@hadoop102 conf]# ls
flink-conf.yaml log4j.properties logback-session.xml workers
log4j-cli.properties log4j-session.properties logback.xml zoo.cfg
log4j-console.properties logback-console.xml masters

中。因此我们需要更新输入参数:--configDir configpath

完整的程序输入参数为:
1
--configDir /root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf --executionMode cluster -D jobmanager.memory.off-heap.size=134217728b -D jobmanager.memory.jvm-overhead.min=201326592b -D jobmanager.memory.jvm-metaspace.size=268435456b -D jobmanager.memory.heap.size=1073741824b -D jobmanager.memory.jvm-overhead.max=201326592b

并且这样启动没有日志,还需要配置日志的参数(在 VM 参数中)。

完整的 VM 输入参数为:

1
-Xmx1073741824 -Xms1073741824 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml

主类缺失

此时启动,会报错:

1
Exception in thread "main" java.lang.NullPointerException

这是因为没有添加依赖的 jar 包,这也就是我们之前使用 maven 编译的目的。

具体方法是:在 IDEA 的 File -> project structure -> Modules 中给 flink-runtime 添加依赖,依赖的 jar 包来源是 flink-dist 模块下的 target 文件夹。将

1
2
3
4
5
6
7
8
[root@hadoop102 lib]# pwd
/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/target/flink-1.17.2-bin/flink-1.17.2/lib
[root@hadoop102 lib]# ls
flink-cep-1.17.2.jar flink-scala_2.12-1.17.2.jar log4j-api-2.17.1.jar
flink-connector-files-1.17.2.jar flink-table-api-java-uber-1.17.2.jar log4j-core-2.17.1.jar
flink-csv-1.17.2.jar flink-table-planner-loader-1.17.2.jar log4j-slf4j-impl-2.17.1.jar
flink-dist-1.17.2.jar flink-table-runtime-1.17.2.jar
flink-json-1.17.2.jar log4j-1.2-api-2.17.1.jar

这个目录下的所有 jar 都添加到依赖中:

修改配置文件

注意,在默认的配置文件中,允许访问的 ip 是 localhost,那么我们不能从外部访问,因此要修改 ip 为 0.0.0.0,这样所有 ip 都可以访问到 flink 中。

1
rest.bind-address: 0.0.0.0

此时再启动,我们就可以从外部访问到 flink 了。

在这里,flink 是在容器中的,我将 web 端口 8081 映射到了 10002 端口(使用 nginx),nginx 配置为:

1
2
3
4
5
6
7
8
9
10
11
12
13
server {
listen 10002;
server_name hadoop102;
location / {
root html;
proxy_pass http://hadoop102:8081;
index index.html index.htm;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
}

对于外部,10002 端口映射的外部端口是 62102,docker 配置为:

1
9d117ae6ee09   centos-coachhe    "/bin/bash"              9 months ago   Up 9 months   0.0.0.0:62022->22/tcp, :::62022->22/tcp, 0.0.0.0:62080->80/tcp, :::62080->80/tcp, 0.0.0.0:62088->8080/tcp, :::62088->8080/tcp, 0.0.0.0:62100->10000/tcp, :::62100->10000/tcp, 0.0.0.0:62101->10001/tcp, :::62101->10001/tcp, 0.0.0.0:62102->10002/tcp, :::62102->10002/tcp, 0.0.0.0:62103->10003/tcp, :::62103->10003/tcp, 0.0.0.0:62104->10004/tcp, :::62104->10004/tcp, 0.0.0.0:62105->10005/tcp, :::62105->10005/tcp, 0.0.0.0:62106->10006/tcp, :::62106->10006/tcp, 0.0.0.0:62107->10007/tcp, :::62107->10007/tcp, 0.0.0.0:62108->10008/tcp, :::62108->10008/tcp, 0.0.0.0:62109->10009/tcp, :::62109->10009/tcp, 0.0.0.0:62110->10010/tcp, :::62110->10010/tcp   hadoop102

因此我们首先启动 Flink 的 JobManager:

在外部访问:

http://9.135.11.161:62102

可以看到:

成功访问!

2. 运行 TaskManager

成功开启 JobManager 之后,我们就要运行 TaskManager 了。

其实解决了 JobManager 之后,TaskManager 的启动也非常类似。启动参数为:

程序参数:

1
-XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml

VM 参数:

1
-XX:+UseG1GC -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458 -XX:MaxMetaspaceSize=268435456 -Dlog.file=/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/log/flink-standalonesession.log -Dlog4j.configuration=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlog4j.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/log4j-console.properties -Dlogback.configurationFile=file:/root/coachhe-learning/coachhe.github.io/source/_code/BigData/RealTime/Flink/SourceCode/flink-release-1.17.2/flink-dist/src/main/flink-bin/conf/logback-console.xml

启动完成:

此时 TaskManager 会自动注册到 JobManager 中:

完美! 源码编译完成,之后就可以开始手撸 Flink 源码啦

源码编译并启动 Flink 之后,我只需要在这一台机器编译 Flink 即可,并不需要将所有分布式环境都编译好 Flink ,这是因为别的机器只需要有环境就可以和我们现有的机器做分布式了。因此,别的机器我们就直接使用官网下载的编译好的可执行文件并执行即可。


Flink源码编译
http://example.com/2024/01/a82dd53e3e46.html
作者
CoachHe
发布于
2024年1月18日
许可协议