flink1.14 输出镶块列式文件
最编程
2024-05-06 19:54:04
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 开启checkpoint
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointStorage("file:///F:/P8/workspace/flink-1.14.4/flink/data/ck");
// 构造好一个数据流
DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction());
// 将上面的数据流输出到文件系统(假装成一个经过了各种复杂计算后的结果数据流)
// 1. 先定义GenericRecord的数据模式
Schema schema = SchemaBuilder.builder()
.record("DataRecord")
.namespace("cn.doitedu.flink.avro.schema")
.doc("用户行为事件数据模式")
.fields()
//.nullableString() 可以为空
//.optionalString() 可以没有该字段
.requiredInt("gid") // 表示必须有这个字段
.requiredLong("ts")
.requiredString("eventId")
.requiredString("sessionId")
// 定义eventInfo对应的map
.name("eventInfo")
.type() //map的key类型只能是string
.map()
.values()
.type("string") // map的value类型
.noDefault() // 表示该map没有默认值
.endRecord();
// 2. 通过定义好的schema模式,来得到一个parquetWriter
ParquetWriterFactory<GenericRecord> writerFactory = ParquetAvroWriters.forGenericRecord(schema);
// 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子
FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("file:///F:/P8/workspace/flink-1.14.4/flink/data/sink"), writerFactory)
.withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH"))
.withRollingPolicy(OnCheckpointRollingPolicy.build()) // 系统做快照时滚动
.withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("doit_edu").withPartSuffix(".parquet").build())
.build();
// 4. 将自定义javabean的流,转成 上述sink算子中parquetWriter所需要的 GenericRecord流
SingleOutputStreamOperator<GenericRecord> recordStream = streamSource
.map((MapFunction<EventLog, GenericRecord>) eventLog -> {
// 构造一个Record对象
GenericData.Record record = new GenericData.Record(schema);
// 将数据填入record
record.put("gid", (int) eventLog.getGuid());
record.put("eventId", eventLog.getEventId());
record.put("ts", eventLog.getTimeStamp());
record.put("sessionId", eventLog.getSessionId());
record.put("eventInfo", eventLog.getEventInfo());
return record;
}).returns(new GenericRecordAvroTypeInfo(schema)); // 由于avro的相关类、对象需要用avro的序列化器,所以需要显式指定AvroTypeInfo来提供AvroSerializer
// 5. 输出数据
recordStream.sinkTo(sink1);
env.execute();
上一篇: 多个国内免费 ERP 系统的库存
推荐阅读