网站首页 > 教程分享 正文
最近在做 Flink SQL 任务方面的开发,有这样一种情况,用户自己上传自定义的 UDF Jar 包,这里你可以理解为是用户自己定义的函数 Jar包,然后在写的 Flink SQL 任务的时候,需要能够用到 Jar 包中定义的 UDF。最开始想的是将 UDF Jar 包放到 HDFS 上面,每次用的时候,下载下来,直接配置一下 Flink 提交作业时的相关参数就可以了,但这中间也走了一些弯路,这里记录一下,也防止大家再次走坑。
Flink 命令行 Jar 参数配置
我们使用 Flink On Yarn Per Job 模式运行任务,提交任务使用 flink run 命令来进行提交作业,具体提交命令如下:
./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
-m 指定运行模式在 Yarn上运行。有时候我们程序中有需要用到自己定义的 Jar 包任务,查了官方文档,倒是查到了 -C 的使用说明,官方的英文注释如下:
Adds a URL to each user code classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple times for specifying more than one URL.
官网说实话对于 -C 这个参数解释其实非常少,这里我也在网上找了一下资料,看了一些其他网上其他仁兄的博客,整体的解释整理如下:
1. -C 后面指定的 URL 必须是一个能够在提交作业客户端,JobMaster 和 TaskExecutor 都被访问到的位置,记住要都能访问。
2. -C 后面指定的 URL 从 client 端的提交到 JobMaster 的分发到 TaskExecutor 的访问的过程中,不会发生文件移动的动作,在1.4.2和1.5.0和1.6.0的版本中都是这样。
3. URL 支持的协议包括 file,ftp,gopher,http,https,jar,mailto,netdoc,亦即java中URL类支持的协议类型。注意:不能放在hdfs上。
还有一个参数是 -yt , -yt 这个参数后面配置一个你本地的目录,这个目录存放的是你需要上传到每个 Task ClassPath 下的 jar包。当指定了-yt 值后,客户端会将目录中的jar上传到hdfs中本应用的lib目录中,在tm下载之后,会存在于tm的classpath中。使用方法如下:
./bin/flink run -m yarn-cluster -yt 你的本地目录
如果你的实时任务不适用 HDFS 来存储用户的自定义 Jar包时,你可以将其放到共享存储上面,或者放到一个 HTTP 客户端的都能访问下载的地方,注意这个 HTTP 协议是都要能够访问到的地方。我们这边不想弄别的存储,就想使用HDFS,我们自己使用自定义任务实践过,的确发现 -C直接指定HDFS行不通。其实HDFS 上面的文件也支持 HTTP 接口的,但是由于存在主备切换,导致访问 HDFS 的路径就变了,由于很难一直去监控HDFS集群准备切换,所以想了其他的办法。
解决方法
我们这边整体流程是,首先,用户上传的自定义 Jar 包每次上传我们放到 HDFS上面,每次使用的时候,从HDFS上面下载到本地的一个临时目录,这个目录可以结合具体的用户来取,具体看你怎么使用。然后在使用的时候,我会通过 -yt 指定需要使用 jar包所在的本地目录,由于 -yt指定的目录里面的jar会最终到 Task运行的 classpath中,所以任务运行就可以直接进行加载。
但是由于本地 flink run命令在提交作业的时候,会在本地预执行 jar 包里面的代码来形成 JobGraph,所以你还需要指定一下本地的使用到的 Jar包路径,否则会报错。这里使用 -C指定本地使用的 Jar包 ,你可以使用多次 -C 来指定本地使用的 Jar包。具体命令如下:
flink run -m yarn-cluster -yt /user/yourname/udfjars/ -C /user/yourname/udfjars/hello_world.jar -c xxxx yourtask.jar
其他
其他方面,发现在创建 RemoteStreamEnvironment的时候,可以指定使用到的 Jar文件:
所以在创建 StreamExecutionEnvironment,可以指定你所需要的Jar:
String [] udfJars = new String[]("hdfs://a.jar"); RemoteStreamEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment( host, port, udfJars)
不过这种方法我没有测试过,你如果可以的话,可以测试测试。
我是Lake,如果你觉得我的文章对你有帮助的话,欢迎你点赞转发或者关注我,你的一个小小的鼓励,就是我前进的最大动力。
往期文章导读:
猜你喜欢
- 2024-10-20 java 实现利用 RabbitMQ 发送和消费消息
- 2024-10-20 手把手讲解-OkHttp硬核知识点(2)(okhttp原理详解)
- 2024-10-20 XXL-JOB核心源码解读及时间轮原理剖析
- 2024-10-20 高并发场景下的 HttpClient 优化方案,QPS 大大提升!
- 2024-10-20 原来java结合docker这么简单!快来看看命令大全以及java结合使用
- 2024-10-20 分库分表实现方式Client和Proxy,性能和维护性该怎么选?
- 2024-10-20 K8S官方java客户端之三:外部应用(k8s官方java客户端之三:外部应用手册)
- 2024-10-20 在用httpclient发送post报文请求错误解决
- 2024-10-20 基于zabbix4.0监控tomcat服务及JVM内存
- 2024-10-20 Flink 在唯品会的实践(唯品会技术模式分析)
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- css导航条 (66)
- sqlinsert (63)
- js提交表单 (60)
- param (62)
- parentelement (65)
- jquery分享 (62)
- check约束 (64)
- curl_init (68)
- sql if语句 (69)
- import (66)
- chmod文件夹 (71)
- clearinterval (71)
- pythonrange (62)
- 数组长度 (61)
- javafx (59)
- 全局消息钩子 (64)
- sort排序 (62)
- jdbc (69)
- php网页源码 (59)
- assert h (69)
- httpclientjar (60)
- postgresql conf (59)
- winform开发 (59)
- mysql数字类型 (71)
- drawimage (61)
本文暂时没有评论,来添加一个吧(●'◡'●)