欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

flink1.14 输出镶块列式文件

最编程 2024-05-06 19:54:04
...
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 开启checkpoint env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointStorage("file:///F:/P8/workspace/flink-1.14.4/flink/data/ck"); // 构造好一个数据流 DataStreamSource<EventLog> streamSource = env.addSource(new MySourceFunction()); // 将上面的数据流输出到文件系统(假装成一个经过了各种复杂计算后的结果数据流) // 1. 先定义GenericRecord的数据模式 Schema schema = SchemaBuilder.builder() .record("DataRecord") .namespace("cn.doitedu.flink.avro.schema") .doc("用户行为事件数据模式") .fields() //.nullableString() 可以为空 //.optionalString() 可以没有该字段 .requiredInt("gid") // 表示必须有这个字段 .requiredLong("ts") .requiredString("eventId") .requiredString("sessionId") // 定义eventInfo对应的map .name("eventInfo") .type() //mapkey类型只能是string .map() .values() .type("string") // mapvalue类型 .noDefault() // 表示该map没有默认值 .endRecord(); // 2. 通过定义好的schema模式,来得到一个parquetWriter ParquetWriterFactory<GenericRecord> writerFactory = ParquetAvroWriters.forGenericRecord(schema); // 3. 利用生成好的parquetWriter,来构造一个 支持列式输出parquet文件的 sink算子 FileSink<GenericRecord> sink1 = FileSink.forBulkFormat(new Path("file:///F:/P8/workspace/flink-1.14.4/flink/data/sink"), writerFactory) .withBucketAssigner(new DateTimeBucketAssigner<GenericRecord>("yyyy-MM-dd--HH")) .withRollingPolicy(OnCheckpointRollingPolicy.build()) // 系统做快照时滚动 .withOutputFileConfig(OutputFileConfig.builder().withPartPrefix("doit_edu").withPartSuffix(".parquet").build()) .build(); // 4. 将自定义javabean的流,转成 上述sink算子中parquetWriter所需要的 GenericRecordSingleOutputStreamOperator<GenericRecord> recordStream = streamSource .map((MapFunction<EventLog, GenericRecord>) eventLog -> { // 构造一个Record对象 GenericData.Record record = new GenericData.Record(schema); // 将数据填入record record.put("gid", (int) eventLog.getGuid()); record.put("eventId", eventLog.getEventId()); record.put("ts", eventLog.getTimeStamp()); record.put("sessionId", eventLog.getSessionId()); record.put("eventInfo", eventLog.getEventInfo()); return record; }).returns(new GenericRecordAvroTypeInfo(schema)); // 由于avro的相关类、对象需要用avro的序列化器,所以需要显式指定AvroTypeInfo来提供AvroSerializer // 5. 输出数据 recordStream.sinkTo(sink1); env.execute();