Flink SQL CDC 已上线！我们总结了 13 项生产实践 —— 1. Flink 作业最初以独立会话（Standalone Session）模式运行，提交多个 Flink 作业时会出现作业失败的错误。
最编程
2024-03-31 18:54:08
...
02 解决方案
03 项目运行环境与现状
04 具体实现
-- Flink source table for received amounts on bills ("账单实收"),
-- read from MySQL through the mysql-cdc connector.
-- NOTE(review): no PRIMARY KEY is declared. mysql-cdc 2.x with its default
-- incremental snapshot rejects source tables without one. Confirm the connector
-- version, and either add PRIMARY KEY (...) NOT ENFORCED matching the MySQL
-- table's key or disable incremental snapshot.
CREATE TABLE bill_info (
    billCode      STRING,
    serviceCode   STRING,
    accountPeriod STRING,
    subjectName   STRING,
    subjectCode   STRING,
    occurDate     TIMESTAMP,
    amt           DECIMAL(11, 2),
    status        STRING,
    -- Processing-time attribute. It must be declared so this table can drive a
    -- FOR SYSTEM_TIME AS OF lookup join against a dimension table.
    -- (The original comment marker here was "-" + en dash, which is not a SQL
    -- comment and broke parsing.)
    proc_time AS PROCTIME()
) WITH (
    'connector'     = 'mysql-cdc',  -- MySQL CDC connector
    'hostname'      = '******',     -- MySQL host
    'port'          = '3307',       -- MySQL port
    'username'      = '******',     -- MySQL user name
    'password'      = '******',     -- MySQL password
    'database-name' = 'cdc',        -- source database
    'table-name'    = '***'         -- source table
);
-- Flink source table for received amounts on orders ("订单实收"),
-- read from MySQL through the mysql-cdc connector. Same column layout as
-- bill_info, except that the business key column is orderCode.
-- NOTE(review): no PRIMARY KEY is declared. mysql-cdc 2.x with its default
-- incremental snapshot rejects source tables without one. Confirm the connector
-- version, and either add PRIMARY KEY (...) NOT ENFORCED matching the MySQL
-- table's key or disable incremental snapshot.
CREATE TABLE order_info (
    orderCode     STRING,
    serviceCode   STRING,
    accountPeriod STRING,
    subjectName   STRING,
    subjectCode   STRING,
    occurDate     TIMESTAMP,
    amt           DECIMAL(11, 2),
    status        STRING,
    -- Processing-time attribute. It must be declared so this table can drive a
    -- FOR SYSTEM_TIME AS OF lookup join against a dimension table.
    proc_time AS PROCTIME()
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = '******',
    'port'          = '3307',
    'username'      = '******',
    'password'      = '******',
    'database-name' = 'cdc',
    -- Last option: no trailing comma (the original had one, a syntax error).
    'table-name'    = '***'
);
-- Dimension table of accounting subjects ("科目"): code -> name.
-- Read from MySQL over JDBC; rows are loaded on lookup and cached.
CREATE TABLE subject_info (
    code VARCHAR(32) NOT NULL,
    name VARCHAR(64) NOT NULL,
    -- Declared key of the dimension rows (not enforced by Flink).
    PRIMARY KEY (code) NOT ENFORCED
) WITH (
    -- Connection settings.
    'connector'  = 'jdbc',
    'driver'     = 'com.mysql.cj.jdbc.Driver',
    -- NOTE(review): the JDBC URL disables SSL; confirm this is acceptable.
    'url'        = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true',
    'username'   = '******',
    'password'   = '******',
    'table-name' = '***',
    -- Lookup tuning: cache at most 3000 rows, expire cached rows after 10s,
    -- and retry a failed lookup up to 3 times.
    -- NOTE(review): these are the pre-Flink-1.16 option names. Newer releases
    -- use 'lookup.cache' = 'PARTIAL' with 'lookup.partial-cache.*'. Confirm
    -- against the deployed Flink version before renaming.
    'lookup.cache.max-rows' = '3000',
    'lookup.cache.ttl'      = '10s',
    'lookup.max-retries'    = '3'
);
-- Result table for the received-amount distribution, written to
-- Elasticsearch 7. Because a PRIMARY KEY is declared, the connector writes in
-- upsert mode: one document per (serviceCode, accountPeriod, subjectCode).
CREATE TABLE income_distribution (
    serviceCode   STRING,
    accountPeriod STRING,
    subjectCode   STRING,
    subjectName   STRING,
    -- NOTE(review): Flink's SUM over DECIMAL(p, 2) returns DECIMAL(38, 2).
    -- Totals wider than 13 digits will not fit this column. Confirm the range.
    amt           DECIMAL(13, 2),
    PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-7',
    'hosts'     = 'http://xxxx:9200',
    'index'     = 'income_distribution',
    -- Retry failed bulk requests with exponential backoff.
    -- The retry count and initial delay are set explicitly because some
    -- connector versions read them whenever a backoff strategy is configured
    -- and have no default. 8 retries / 50ms match the legacy Elasticsearch sink's
    -- built-in backoff defaults, so older connector versions behave as before.
    -- NOTE(review): confirm against the deployed connector version.
    'sink.bulk-flush.backoff.strategy'    = 'EXPONENTIAL',
    'sink.bulk-flush.backoff.max-retries' = '8',
    'sink.bulk-flush.backoff.delay'       = '50ms'
);
-- Aggregate received amounts from bills and orders per
-- (serviceCode, accountPeriod, subjectCode), enrich them with the subject name
-- from the subject_info lookup table, and upsert the totals into
-- income_distribution.
--
-- The enriched rows of both sources are unioned first and aggregated once.
-- The grouping is exactly the sink's PRIMARY KEY. The earlier version also
-- grouped by subjectName, which is not part of that key. That allowed one sink
-- key to come from several groups that overwrite each other. It also allowed a
-- CDC retraction whose re-looked-up name differs from the original to land in
-- a different group than the row it retracts. Summing once over the union
-- gives the same totals as summing per source and then summing the sums.
--
-- Rows whose subjectCode has no match in subject_info are dropped (inner
-- lookup join). NOTE(review): confirm this is intended. A LEFT JOIN would keep
-- them with a NULL subjectName.
INSERT INTO income_distribution
SELECT
    u.serviceCode,
    u.accountPeriod,
    u.subjectCode,
    -- Picks one name per sink key. NOTE(review): if a subject is renamed in
    -- subject_info mid-stream, MAX keeps the lexicographically greatest name,
    -- not necessarily the newest one.
    MAX(u.subjectName) AS subjectName,
    SUM(u.amt)         AS amt
FROM (
    SELECT
        b.serviceCode,
        b.accountPeriod,
        b.subjectCode,
        s.name AS subjectName,
        b.amt
    FROM bill_info AS b
    INNER JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time AS s
        ON b.subjectCode = s.code
    UNION ALL
    SELECT
        o.serviceCode,
        o.accountPeriod,
        o.subjectCode,
        s.name AS subjectName,
        o.amt
    FROM order_info AS o
    INNER JOIN subject_info FOR SYSTEM_TIME AS OF o.proc_time AS s
        ON o.subjectCode = s.code
) AS u
GROUP BY
    u.serviceCode,
    u.accountPeriod,
    u.subjectCode;
05 踩过的坑和学到的经验