
1.Standalone 运行模式独立模式是独立运行的不依赖任何外部的资源管理平台当然独立也是有代价的:如果资源不足或者出现故障没有启动扩展或重分配资源的保证必须手动处理。所以独立模式一般只用在开发测试或作业非常少的场景下。部署模式:(1)会话模式部署提前启动集群并通过WEB页面客户端提交任务(可以多个任务但是集群资源固定)。多个任务运行在一个会话中。(2)单作业模式部署Flink 的 Standalone 集群并不支持单作业部署模式。因为单作业模式需要借助于一些资源管理平台。(3)应用部署模式应用模式下不会提前创建集群所以不能调用 start-cluster.sh 脚本。我们可以使用同样在bin目录下的standalone-job.sh 来创建一个 JobManager ;由于默认安装的 openjdk java内存不足.#关闭集群bin/stop-cluster.sh#启动 netcatnc -l -s 192.168.3.11 7777#进入到Flink的安装目录下建应用程序的 jar 包放在 lib/目录下。[flinkhadoop001 flink-1.17.0]$ mv FlinkTuTorial-1.17-1.0-SNAPSHOT.jar lib/#独立模式启动 JobManagerbin/standalone-job.sh start --job-classname com.ycl.WordCountStreamUnboundedDemo这里我们直接指定作业入口类脚本会到 lib 目录扫描所有的 jar 包。#同样是使用 bin 目录下脚本启动 TaskManager ;bin/taskmanager.sh start#在 7777 端口写入数据#观察 hadoop001:8081 地址中观察输出数据vim xacll#!/bin/bash# 获取输入参数个数如果没有参数直接退出pcount$#if [ $pcount -lt 1 ]; thenecho No Enough Arguement!exit;fi# 2. 遍历集群所有机器, 这里要替换成你实际的主机名或IPfor host in hadoop001 hadoop002 hadoop003doecho $host # 3. 遍历所有目录挨个发送for file in $dossh $host jpsdonedonebin/standalone-job.sh start --job-classname com.ycl.WordCountStreamUnboundedDemobin/taskmanager.sh start先运行上面的命令启动了一个进程但是当启动下面这个进程时上面的进程自动停止并退出了这是为啥。#Flink1.17.0Java8bin/standalone-job.sh start --job-classname com.ycl.WordCountStreamUnboundedDemo#启动taskmanager之后上面启动的应用就失败。 bin/taskmanager.sh start Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.KeyGroupStreamPartitioner at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2301) ~[?:1.8.0_262] Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.streaming.runtime.partitioner.K eyGroupStreamPartitioner.keySelector of type org.apache.flink.api.java.functions.KeySelector in instance of org.apache.flink.streaming.runtime.partitioner.Key GroupStreamPartitioner at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2087) ~[?:?] at java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2050) ~[?:?] at java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1304) ~[?:?] at java.io.ObjectInputStream.defaultCheckFieldValues(ObjectInputStream.java:2534) ~[?:?] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2441) ~[?:?] at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250) ~[?:?] at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709) ~[?:?] at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518) ~[?:?] at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412) ~[?:?] #按照如下方案并没有解决依然报上面的错误。 这是 Flink 1.17 Java 8 环境下 Lambda 序列化的一个已知兼容性问题常见于 keyBy() 等需要序列化传递函数的算子 升级 Java 版本到 11根本解决 Flink 1.17 官方推荐使用 Java 11且 Lambda 序列化在 Java 11 下更稳定。如果你能升级 JDK这是最佳方案。 #暂时放弃。