运行 flinksql 时遇到的问题
一、checkpoint失败
原因:我把insert into这个语句给注释掉了
其他的原因大概率是flinksql不够严谨
二、运行中的报错
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:32)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = topic_test_01, partition = 2, leaderEpoch = 0, offset = 808016, CreateTime = 1682242261803, serialized key size = -1, serialized value size = 278, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@25f6134a).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
... 15 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$41.processElement_split3(Unknown Source)
at StreamExecCalc$41.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 23 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic test_tpoic_02 not present in metadata after 60000 ms.
java.io.IOException: Failed to deserialize consumer record due to
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:54)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:32)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize consumer record ConsumerRecord(topic = topic_test_01, partition = 2, leaderEpoch = 0, offset = 808016, CreateTime = 1682242261803, serialized key size = -1, serialized value size = 278, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = [B@25f6134a).
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:57)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:51)
... 14 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:196)
at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter$SourceOutputWrapper.collect(KafkaRecordEmitter.java:65)
at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:84)
at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:54)
... 15 more
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:99)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
at StreamExecCalc$41.processElement_split3(Unknown Source)
at StreamExecCalc$41.processElement(Unknown Source)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
... 23 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic test_tpoic_02 not present in metadata after 60000 ms.
这个错误消息表明Flink在反序列化来自Kafka主题“dwd_crm_mianxin_card_use_detail_rt”的消费者记录时遇到了麻烦。
这个错误可能有几个原因,但一个常见的原因是Kafka主题中的数据不是预期的格式。Flink使用反序列化器将Kafka主题中的二进制数据转换为Java对象,如果数据不是预期的格式,反序列化器将失败。
要解决此问题,您可以尝试以下步骤:
检查Kafka主题中的数据以确保它是预期的格式。你可以使用Kafka消费者工具从主题中读取数据并检查它。
检查Flink作业中Kafka消费者的配置,以确保它对主题中的数据使用了正确的反序列化器。你可以在Flink Kafka消费者配置中指定反序列化类。
检查Flink和Kafka库的版本兼容性。确保你正在使用的Flink和Kafka库的版本相互兼容。
请检查Flink作业的日志,了解更详细的错误信息。您提供的错误消息不是非常具体,因此日志中可能有更多信息可以帮助您诊断问题。
三、向doris导入数据遇到的问题
1、table Already Exists and load job finished,change you label prefix or restore from lates savepoint
原因:这个是在任务重新启动的时候,没有修改参数:sink.label-prefix 的值,每次重启任务向doris导数都得改其value。 例如:'sink.label-prefix'='label_ads_cdp_mianxin_card_use_detail_rt_pt_d_2023042716'。如果不改就会报上面的错误。
2、任务正常运行,没有报错,但是doris的目标表没有数据
主要是把配置项修改得符合数据改:'sink.properties.strip_outer_array'='false',
加:'sink.properties.read_json_by_line'='true',
上图错误主要是sink.properties.strip_outer_array=false 没有设置或者说没有生效
3、标准的导入doris的建表语句
create table if not exists ads_rt.ads_cdp_mianxin_card_use_detail_rt(
id string,
dt string,
date_happened string,
mbr_id string,
card_id string,
business_type string,
card_status string,
event_type string,
ts string,
balance decimal(22,2),
change_amount decimal(22,2)
) WITH (
'connector'='doris',
'fenodes'='10.00.00.01:8045',
'table.identifier'='doris_db.doris_sink_table',
'username'='username',
'password'='000000',
'sink.properties.format'='json',
'sink.properties.strip_outer_array'='false',
'sink.enable-delete'='true',
'sink.properties.read_json_by_line'='true',
'sink.label-prefix'='label_doris_sink_table_2023042716'
);
Flink Doris Connector实时数据与doris连接并导入数据相关设置:
doris官方文档--doris连接flink
推荐阅读
-
基于深度学习的毕尔巴鄂工程斑马鱼卵识别系统的注意事项和遇到的问题
-
PCB 学习笔记 - 在原理图上修改封装时出现焊盘与新基底不匹配的问题
-
如何解决 Mac 启动时一直卡在 Apple 徽标或进度条屏幕上的问题?
-
记录黑苹果 OpenCore 安装-v 运行代码结束时显示徽标、进度条、光标,但不进入系统的问题
-
35 岁实现财务*,腾讯程序员手握2300万提前退休?-1000万房产、1000万腾讯股票、加上300万的现金,一共2300万的财产。有网友算了一笔账,假设1000万的房产用于自住,剩下1300万资产按照平均税后20-50万不等进行计算,大约花上26-60年左右的时间才能赚到这笔钱。也就是说,普通人可能奋斗一辈子,才能赚到这笔钱。在很多人还在为中年危机而惶惶不可终日的时候,有的人的35岁,就已经安全着陆,试问哪个打工人不羡慕?但问题是有这样财富积累必然有像样的实力做靠山。没有人可以不劳而获。 看到这里,肯定有人说,那么对于普通人来说,卷可能真就成了唯一的出路。但是卷也有轻松的卷,“偷懒”的卷法,对于程序员而言,刨除掉一时无法改掉的开会传统占用的大部分时间,如何把有限的时间和精力放在真正重要的架构设计、需求设计上,而不是重复的造*,编码、改bug、手动测试。因此在科技改变生活的今天,学会使用AI工具成为程序员们的必备技能。 以全栈式全自动的软件开发工具飞算SoFlu软件机器人为例,作为全球首款面向微服务架构设计和最佳实践的软件机器人,SoFlu软件机器人改变了原来手工编码的作业模式,通过可视化拖拽方式以及参数配置就能实现等同于编写复杂代码的业务逻辑,在设计业务逻辑时就完成了微服务应用开发,做到“业务即图,图即代码”,实现“软件开发,十倍提效”。 而作为飞算SoFlu软件机器人的一个重要组成部分,FuncGPT(慧函数)支持所有类型函数创建。通过自然语言描述Java函数需求,实时生成高质量、高可读性的Java函数代码。生成代码可直接复制到IDEA,或一键导入Java全自动开发工具函数库。 FuncGPT(慧函数)具有五大核心能力: ● 自然语言:通过自然语言即可生成函数,降低软件开发门槛。 ● 秒级函数开发:全自动秒级开发函数,秒级完成,效率千倍提升。 ● 一站式函数开发应用:函数生成即可直接应用,效率、安全有保障。 ● 质量可靠:生成的函数代码符合业界规范、具备良好的可读性,符合最优解。 ● 灵活生成:可以根据开发人员的具体需求生成并支持调整和修改。 以“将文件流转换为File对象”这个需求为例,以下是FuncGPT(慧函数)秒级生成的代码: / 类名:FileConverter // 函数名:convertStreamToFile // 函数功能:将文件流转换为File对象 // POM依赖包:无 import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; public class FileConverter { /** * 将文件流转换为File对象 * @param inputStream 文件流 * @param filePath 文件路径 * @param fileName 文件名 * @return 转换后的File对象 * @throws IOException 如果发生I/O异常 */ public static File convertStreamToFile(InputStream inputStream, String filePath, String fileName) throws IOException { File file = new File(filePath + File.separator + fileName); // 创建File对象 try (FileOutputStream outputStream = new FileOutputStream(file)) { // 创建文件输出流 byte buffer = new byte[1024]; int bytesRead; while ((bytesRead = inputStream.read(buffer)) != -1) { // 从文件流读取数据并写入文件 outputStream.write(buffer, 0, bytesRead); } } return file; // 返回转换后的File对象 } } // 函数示例 // 将文件流转换为File对象示例 // 入参:inputStream,文件流 // 入参:filePath,文件路径 // 入参:fileName,文件名 // 出参:file,转换后的File对象 // 调用示例: // InputStream inputStream = new FileInputStream("example.txt"); // String filePath = "C:\\Users\\User\\Documents"; // String fileName = "example.txt"; // File file = FileConverter.convertStreamToFile(inputStream, filePath, fileName); // System.out.println(file.getAbsolutePath); // 输出结果:例如,将文件流转换为File对象后,文件的绝对路径为:C:\Users\User\Documents\example.txt // 则输出结果为:C:\Users\User\Documents\example.txt 通过分析,不难发现以上代码:
-
整理更新 MuPDF 和修改相关 C 代码过程中遇到的问题。
-
鹅厂的另一个蠢问题:当 TCP 收到一个有四个摆动的乱码 FIN 数据包时,它会怎么做?
-
运行 flinksql 时遇到的问题
-
greendao3.0 更新 新增字段时遇到的问题
-
安装 R 语言包时遇到 arch - i386 错误的解决方案