命令行安装#
如果您希望使用命令行的方式部署 Flink,请按照本章节步骤安装。
Flink Standalone Session 模式安装#
前提#
安装#
首先登录到 flink1,然后切换到 root 用户
ssh flink1
su - root
创建一个 flinkhosts 文件,包含 Flink 集群中所有的机器
cat > ${HOME}/flinkhosts << EOF
flink1
flink2
flink3
EOF
在 flink1 节点配置 yum 源,安装 lava 命令行管理工具
# 从yum源所在机器(假设为192.168.1.10)获取repo文件
scp root@192.168.1.10:/etc/yum.repos.d/oushu.repo /etc/yum.repos.d/oushu.repo
# 追加yum源所在机器信息到/etc/hosts文件,例如:
echo "192.168.1.10 <yum源机器主机名>" >> /etc/hosts
# 安装lava命令行管理工具
yum clean all
yum makecache
yum install -y lava
flink1 节点和集群内其他节点交换公钥,以便 ssh 免密码登录和分发配置文件。
lava ssh-exkeys -f ${HOME}/flinkhosts -p ********
分发 repo 文件到其他机器
lava scp -f ${HOME}/flinkhosts /etc/yum.repos.d/oushu.repo =:/etc/yum.repos.d
安装#
使用 yum install 在所有节点安装 Flink
lava ssh -f ${HOME}/flinkhosts -e "sudo yum install -y flink"
配置#
修改 Flink 配置文件/usr/local/oushu/conf/flink/flink-conf.yaml
,修改如下配置项
jobmanager.rpc.address: flink1
jobmanager.rpc.port: 1689
metrics.reporter.wasp.url: http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter
metrics.reporter.wasp.interval: 3 SECONDS
task.cancellation.timeout: 0
metrics.reporters: wasp
flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true
rest.port: 1688
metrics.reporter.wasp.factory.class: org.apache.flink.metrics.wasp.WaspReporterFactory
修改 Flink JobManager 配置文件/usr/local/oushu/conf/flink/masters
cat > /usr/local/oushu/conf/flink/masters << EOF
flink1:1688
EOF
修改 Flink TaskManager 配置文件/usr/local/oushu/conf/flink/workers
cat > /usr/local/oushu/conf/flink/workers << EOF
flink1
flink2
flink3
EOF
添加依赖的 HDFS 配置文件/usr/local/oushu/conf/flink/core-site.xml
,以下是基本配置,具体根据依赖的 HDFS 集群配置决定
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://oushu</value>
</property>
</configuration>
修改配置文件的 owner 和权限
chown flink:flink /usr/local/oushu/conf/flink/core-site.xml
chmod 755 /usr/local/oushu/conf/flink/core-site.xml
添加依赖的 HDFS 配置文件/usr/local/oushu/conf/flink/hdfs-site.xml
,假定 HDFS 两个 NameNode 节点分别在 flink1, flink2上,
以下是基本配置,具体根据依赖的 HDFS 集群配置决定
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>dfs.client.failover.proxy.provider.oushu</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.namenodes.oushu</name>
<value>nn1,nn2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.oushu.nn1</name>
<value>flink1:9000</value>
</property>
<property>
<name>dfs.namenode.rpc-address.oushu.nn2</name>
<value>flink2:9000</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>oushu</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.oushu</name>
<value>true</value>
</property>
</configuration>
修改配置文件的 owner 和权限
chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml
chmod 755 /usr/local/oushu/conf/flink/hdfs-site.xml
添加 HDFS config 文件的路径到 /usr/local/oushu/flink/bin/config.sh
cat >> /usr/local/oushu/flink/bin/config.sh << EOF
export HADOOP_USER_NAME="hdfs"
export HADOOP_CONF_DIR="/usr/local/oushu/flink/conf"
EOF
Kerberos 相关配置(可选)#
如果 Flink 依赖的 HDFS 集群集成了 Kerberos 认证,那么 Flink 也需要配置相应的 keytab 和 principal,以便写入 checkpoint 数据
修改 Flink 配置文件/usr/local/oushu/conf/flink/flink-conf.yaml
,修改如下配置项, 注意替换其中的变量
${keytabPath}:keytab 文件的路径,要求每个 flink 节点都上传该文件
${principal}:选择正确的 principal
${krb5Path}:krb5.conf 文件的路径,要求每个 flink 节点都上传该文件,通常是 /etc/krb5.conf
cat >> /usr/local/oushu/conf/flink/flink-conf.yaml << EOF
flink.hadoop.ipc.client.fallback-to-simple-auth-allowed: true
security.kerberos.login.use-ticket-cache: false
security.kerberos.login.keytab: ${keytabPath}
security.kerberos.login.principal: ${principal}
env.java.opts: -Djava.security.krb5.conf=${krb5Path}
EOF
分发配置文件到其他机器#
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/flink/bin/config.sh =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/flink-conf.yaml =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/core-site.xml =:/tmp
lava scp -f ${HOME}/flinkhosts /usr/local/oushu/conf/flink/hdfs-site.xml =:/tmp
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/config.sh /usr/local/oushu/flink/bin"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/flink/bin/config.sh"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/flink-conf.yaml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/flink-conf.yaml"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/core-site.xml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/core-site.xml"
lava ssh -f ${HOME}/flinkhosts -e "mv -f /tmp/hdfs-site.xml /usr/local/oushu/conf/flink"
lava ssh -f ${HOME}/flinkhosts -e "chown flink:flink /usr/local/oushu/conf/flink/hdfs-site.xml"
启动#
启动 Flink JobManager#
在 flink1 节点,执行以下操作以启动 Flink JobManager
sudo -u flink /usr/local/oushu/flink/bin/jobmanager.sh start
启动 Flink Worker#
在 flink1 节点,执行以下操作以启动 Flink TaskManager
lava ssh -f ${HOME}/flinkhosts -e "sudo -u flink /usr/local/oushu/flink/bin/taskmanager.sh start"
检查状态#
在各节点切换到 flink 用户,通过 jps 查看进程:
node |
process |
---|---|
flink1 |
StandaloneSessionClusterEntrypoint, TaskManagerRunner |
flink2 |
TaskManagerRunner |
flink3 |
TaskManagerRunner |
Flink UI 查看
浏览器 URL 输入:http://${flink1管理IP}:1688/#/overview
切换到 Overview 查看 TaskManager 运行状况, slot 使用情况等
常用命令#
停止 Flink 服务
#停止 JobManager
/usr/local/oushu/flink/bin/jobmanager.sh stop
#停止 TaskManager
/usr/local/oushu/flink/bin/taskmanager.sh stop
#停止 Flink session cluster (停止构成集群的所有 taskmanager 和 jobmanager,要求节点间做了flink用户的免密)
/usr/local/oushu/flink/bin/stop-cluster.sh
#启动 Flink session cluster (启动构成集群的所有 taskmanager 和 jobmanager,要求节点间做了flink用户的免密)
/usr/local/oushu/flink/bin/start-cluster.sh
注册到Skylab(可选)#
在 flink1 节点修改 lava 命令行工具配置中 skylab 的节点 ip
vi /usr/local/oushu/lava/conf/server.json
编写注册 request 到一个文件,例如 ~/flink-register.json
{
"data": {
"name": "FlinkCluster",
"group_roles": [
{
"role": "flink.jobmanager",
"cluster_name": "flink_cluster",
"group_name": "jm1",
"machines": [
{
"id": 1,
"name": "flink1",
"subnet": "lava",
"data_ip": "${flink1ip}",
"manage_ip": "${flink1ip}",
"assist_port": 1622,
"ssh_port": 22
}
]
},
{
"role": "flink.taskmanager",
"cluster_name": "flink_cluster",
"group_name": "tm1",
"machines": [
{
"id": 1,
"name": "flink1",
"subnet": "lava",
"data_ip": "${flink1ip}",
"manage_ip": "${flink1ip}",
"assist_port": 1622,
"ssh_port": 22
},
{
"id": 2,
"name": "flink2",
"subnet": "lava",
"data_ip": "${flink2ip}",
"manage_ip": "${flink2ip}",
"assist_port": 1622,
"ssh_port": 22
},
{
"id": 3,
"name": "flink3",
"subnet": "lava",
"data_ip": "${flink3ip}",
"manage_ip": "${flink3ip}",
"assist_port": 1622,
"ssh_port": 22
}
]
}
],
"config": {
"flink-conf.yaml": [
{
"key": "jobmanager.rpc.address",
"value": "flink1"
},
{
"key": "jobmanager.rpc.port",
"value": "1689"
},
{
"key": "task.cancellation.timeout",
"value": "0"
},
{
"key": "flink.hadoop.ipc.client.fallback-to-simple-auth-allowed",
"value": "true"
},
{
"key": "rest.port",
"value": "1688"
},
// 以下4项如果是 Flink 作为 Wasp 的计算引擎,需要注册 reporter metrics 配置,否则不需要传(注意:JSON 不支持注释,实际提交时请删除本行)
{
"key": "metrics.reporters",
"value": "wasp"
},
{
"key": "metrics.reporter.wasp.url",
"value": "http://${WaspServerIP}:1682/api/lava/wasp/monitor/reporter"
},
{
"key": "metrics.reporter.wasp.interval",
"value": "3 SECONDS"
},
{
"key": "metrics.reporter.wasp.factory.class",
"value": "org.apache.flink.metrics.wasp.WaspReporterFactory"
}
],
"config.sh": [
{
"key": "HADOOP_USER_NAME",
"value": "hdfs"
},
{
"key": "HADOOP_CONF_DIR",
"value": "/usr/local/oushu/flink/conf"
}
],
"core-site.xml": [
{
"key": "fs.defaultFS",
"value": "hdfs://oushu"
}
],
"hdfs-site.xml": [
{
"key": "dfs.client.failover.proxy.provider.oushu",
"value": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
},
{
"key": "dfs.ha.namenodes.oushu",
"value": "nn1,nn2"
},
{
"key": "dfs.namenode.rpc-address.oushu.nn1",
"value": "flink1:9000"
},
{
"key": "dfs.namenode.rpc-address.oushu.nn2",
"value": "flink2:9000"
},
{
"key": "dfs.nameservices",
"value": "oushu"
},
{
"key": "dfs.ha.automatic-failover.enabled.oushu",
"value": "true"
}
]
}
}
}
上述配置文件中,需要根据实际情况修改 machines 数组中的机器信息,通过如下方式查看,在平台基础组件 lava 所安装的机器执行:
psql lavaadmin -p 4432 -U oushu -c "select m.id,m.name,s.name as subnet,m.private_ip as data_ip,m.public_ip as manage_ip,m.assist_port,m.ssh_port from machine as m,subnet as s where m.subnet_id=s.id;"
获取到所需的机器信息,根据服务角色对应的节点,将机器信息添加到 machines 数组中。
例如 flink1 对应 Flink JobManager 节点,那么 flink1 的机器信息需要被添加到 flink.jobmanager 角色对应的 machines 数组中。
调用 lava 命令注册集群:
lava login -u oushu -p ********
lava onprem-register service -s Flink -f ~/flink-register.json
如果返回值为:
Add service by self success
则表示注册成功,如果有错误信息,请根据错误信息处理。
同时,从页面登录后,在自动部署模块对应服务中可以查看到新添加的集群,同时列表中会实时监控 Flink 进程在机器上的状态。