Flink 实现电子商务实时仓库计数 0-1 - ODS 和 DWD(上) - DWD 层
日志 DWD 层
我们前面采集的日志数据已经保存到Kafka中,作为日志数据的ODS层,从kafka的ODS层读取的日志数据分为3类, 页面日志、启动日志和曝光日志。这三类数据虽然都是用户行为数据,但是有着完全不一样的数据结构,所以要拆分处理。将拆分后的不同的日志写回Kafka不同主题中,作为日志DWD层。
页面日志输出到主流,启动日志输出到启动侧输出流,曝光日志输出到曝光侧输出流。
1. 分析主要任务
-
接收Kafka数据,过滤空值数据
对Maxwell抓取的数据进行ETL,保留有用的部分,过滤掉没用的
-
实现 动态 分流功能
由于MaxWell是把全部数据统一写入一个Topic中, 这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是
维度表
,有些表是事实表
,有的表既是事实表在某种情况下也是维度表。在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?
我们可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。
这种可以有两个方案实现
-
一种是用Zookeeper存储,通过Watch感知数据变化。
-
另一种是用mysql数据库存储,周期性的同步,使用
FlinkCDC
读取。
这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便。
如图:
业务数据保存到Kafka
的主题中
维度数据保存到Hbase
的表中 -
2. 代码实现
1. 建 配置表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '来源表',
`operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete(用于过滤是否处理 某些操作的数据)',
`sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段(用于过滤一些不需要的字段)',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展(建hbase表时,的表配置)',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
2. maven
依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
3. 配置表实体类
@Data
public class TableProcess {
/**
* 动态分流Sink常量 改为小写和脚本一致
*/
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
/**
* 来源表
*/
private String sourceTable;
/**
* 操作类型 insert,update,delete
*/
private String operateType;
/**
* 输出类型 hbase kafka
*/
private String sinkType;
/**
* 输出表(主题)
*/
private String sinkTable;
/**
* 输出字段
*/
private String sinkColumns;
/**
* 主键字段
*/
private String sinkPk;
/**
* 建表扩展
*/
private String sinkExtend;
}
4. 在MySQL Binlog 添加对配置数据库的监听,并重启MySQL
修改配置文件
sudo vim /etc/my.cnf
把存放配置数据库(tmall_realtime
)添加至 binlog-do-db
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=tmall
binlog-do-db=tmall_realtime
下期预告:DWD层 用到的工具类
关注专栏持续更新 ????????????????????????????????????????????????????????????????
上一篇: 电源管理入门-8 休眠唤醒