本文介绍在文件存储HDFS上搭建及使用Apache Flink的方法。
准备工作
在文件存储HDFS上使用Apache Flink,需要先完成以下准备工作。
说明 本文档的操作步骤中涉及的安装包版本号、文件夹路径,请根据实际情况进行替换。
- 开通文件存储HDFS服务并创建文件系统实例和挂载点,详情请参见HDFS快速入门。
- 在计算节点上安装JDK。
版本不能低于1.8。
- 在计算节点上安装Scala。
Scala下载地址:
官方链接,其版本要与使用的Apache Flink版本相兼容。
- 下载Apache Hadoop压缩包。
Apache Hadoop下载地址:
官方链接。建议您选用的Apache Hadoop版本不低于2.7.2,本文档中使用的Apache Hadoop版本为Apache Hadoop 2.7.2。
- 下载Apache Flink压缩包。
Apache Flink下载地址:
官方链接,本文中使用的Apache Flink版本为1.9.0。
说明 在文件存储HDFS上使用的Apache Flink的版本必须为1.9.0及以上版本。
配置Apache Hadoop
- 执行如下命令解压Apache Hadoop压缩包到指定文件夹。
tar -zxvf hadoop-2.7.2.tar.gz -C /usr/local/
- 修改hadoop-env.sh配置文件。
- 执行如下命令打开hadoop-env.sh配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/hadoop-env.sh
- 配置JAVA_HOME目录,如下所示。
export JAVA_HOME=/usr/java/default
- 修改core-site.xml配置文件。
- 执行如下命令打开core-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/core-site.xml
- 在core-site.xml配置文件中,配置如下信息,详情请参见挂载文件系统。
<configuration>
<property>
<name>fs.defaultFS</name>
<value>dfs://x-xxxxxxxx.cn-xxxxx.dfs.aliyuncs.com:10290</value>
<!-- 该地址填写您的挂载点地址 -->
</property>
<property>
<name>fs.dfs.impl</name>
<value>com.alibaba.dfs.DistributedFileSystem</value>
</property>
<property>
<name>fs.AbstractFileSystem.dfs.impl</name>
<value>com.alibaba.dfs.DFS</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>8388608</value>
</property>
<property>
<name>alidfs.use.buffer.size.setting</name>
<value>true</value>
</property>
<property>
<name>dfs.usergroupservice.impl</name>
<value>com.alibaba.dfs.security.LinuxUserGroupService.class</value>
</property>
<property>
<name>dfs.connection.count</name>
<value>16</value>
</property>
</configuration>
- 修改mapred-site.xml配置文件。
- 执行如下命令打开mapred-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/mapred-site.xml
- 在mapred-site.xml配置文件中,配置如下信息。
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
- 修改yarn-site.xml配置文件。
- 执行如下命令打开yarn-site.xml配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/yarn-site.xml
- 在yarn-site.xml配置文件中,配置如下信息。
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>xxxx</value>
<!-- 该地址填写集群中yarn的resourcemanager的hostname -->
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>16384</value>
<!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>4</value>
<!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
<name>yarn.scheduler.maximum-allocation-vcores</name>
<value>4</value>
<!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>3584</value>
<!-- 根据您当前的集群能力进行配置此项 -->
</property>
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>14336</value>
<!-- 根据您当前的集群能力进行配置此项 -->
</property>
</configuration>
- 修改slaves配置文件。
- 执行如下命令打开slaves配置文件。
vim /usr/local/hadoop-2.7.2/etc/hadoop/slaves
- 在slaves配置文件中,配置如下信息。
- 配置环境变量。
- 执行如下命令打开/etc/profile配置文件。
- 在/etc/profile配置文件中,配置如下信息。
export HADOOP_HOME=/usr/local/hadoop-2.7.2
export HADOOP_CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath)
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
- 执行如下命令使配置生效。
- 执行如下命令配置文件存储HDFS的SDK。
您可以单击此处,下载文件存储HDFS的SDK (此处以aliyun-sdk-dfs-1.0.3.jar为例),将其部署在Apache Hadoop生态系统组件的CLASSPATH上,详情请参见挂载文件系统。
cp aliyun-sdk-dfs-1.0.3.jar /usr/local/hadoop-2.7.2/share/hadoop/hdfs
- 执行如下命令将${HADOOP_HOME}文件夹同步到集群的其他节点。
scp -r hadoop-2.7.2/ root@node2:/usr/local/
验证Apache Hadoop配置
完成Apache Hadoop配置后,不需要格式化namenode,也不需要使用start-dfs.sh来启动HDFS相关服务。如需使用yarn服务,只需在resourcemanager节点启动yarn服务,具体验证Apache
Hadoop配置成功的方法请参见验证安装。
配置Apache Flink
此处以Apache Flink on yarn模式为例进行搭建测试。
说明
- 在使用Apache Flink之前必须在您的集群环境变量中配置HADOOP_CLASSPATH和HADOOP_CONF_DIR,详情请参见配置Apache Hadoop中的步骤 7。
- 配置Apache Flink时不可以使用Apache Hadoop的jar包flink-shaded-hadoop2-uber-x.x.x.jar。flink-shaded-hadoop2-uber-x.x.x.jar包含了Apache
Flink自身对Apache Hadoop提供的一些依赖包,会引起依赖包冲突。在使用文件存储HDFS时,Apache Flink会在HADOOP_CLASSPATH中寻找依赖包。
- 执行如下命令解压Apache Flink压缩包到指定文件夹。
tar -zxvf flink-1.9.0-bin-scala_2.11.tgz -C /usr/local/
- (可选)对Apache Flink进行其他配置,详情请参见配置操作指南。
验证Apache Flink配置
使用Flink自带的WordCount.jar对文件存储HDFS上的数据进行读取,并将计算结果写入到文件存储HDFS,在测试之前需要先启动yarn服务。
- 生成测试数据。
此处使用Apache Hadoop 2.7.2自带的jar包hadoop-mapreduce-examples-2.7.2.jar中的randomtextwriter方法在文件存储HDFS上生成测试数据。
/usr/local/hadoop-2.7.2/bin/hadoop jar /usr/local/hadoop-2.7.2/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.2.jar
randomtextwriter \
-D mapreduce.randomtextwriter.totalbytes=10240 \
-D mapreduce.randomtextwriter.bytespermap=1024 \
-D mapreduce.job.maps=4 \
-D mapreduce.job.reduces=2 \
dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input \
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
- 查看在文件存储HDFS上生成的测试数据。
/usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input/*
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
- 提交wordcount程序。
/usr/local/flink-1.9.0/bin/flink run
-m yarn-cluster -yn 1 -yjm 1024 -ytm 1024 \
/usr/local/flink-1.9.0/examples/batch/WordCount.jar \
--input dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/input \
--output dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output \
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
- 查看在文件存储HDFS上的结果文件。
/usr/local/hadoop-2.7.2/bin/hadoop fs -cat dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290/flink-test/output
其中,dfs://f-xxxxx.cn-xxx.dfs.aliyuncs.com:10290为文件存储HDFS的挂载点,请根据您的实际情况替换。
评论