(1)安装standlode集群模式启动
启动脚本是 bin/start-cluster.sh 不能用sh start-cluster
flink-1.8.1/bin/flink list
flink-1.8.1/bin/flink cancel 9b99be4eed871c4e62562f9035ebef65
(2)flink任务停不掉
执行取消命令后,查看一直在取消状态
是由于有台机器挂掉了,没有响应
提交任务的时候,报错
flink scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutabl
项目用到了import scala.collection.JavaConverters._
所以需要引入scala的jar包
我本地用的scala2.11,我相信自己线上下的也是2.11,找了半天,在flink安装目录opt/jar中看到是2.12的scala,版本不一致导致的
(3)web监控页面
Bytes received Records received Bytes sent Records sent
如果源和sink在同一个里面是不会有显示值的
需要额外说明的是,这里的输入/输出只是在 flink 的各个节点之间,不包含与外界组件的交互信息。所以,这里的统计里, flink source 的 read-bytes/read-records 都是 0;flink sink 的 write-bytes/write-records 都是 0。在 flink UI 上的显示也是如此。(yuchuanchen的博客有说明:https://blog.csdn.net/yuchuanchen/article/details/88406438)
通过在写出前加个keyby就可以了
val source = env
.addSource(kafkaConsumerA)
.map(a =>{
val simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
val day = simpleDateFormat.format(new Date())
parseToBean(a,day)
})
source.keyBy(a =>{
a.carid
}).map(a =>toClickHouseInsertFormat(a))
.addSink(new ClickhouseSink(props)).name("ck_sink")
也可以自己定义metric,然后提交上去就能看到指定位置的数据信息 |