欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

实战Datahub:使用Sqllineage解析SQL实现全链路数据追溯

最编程 2024-01-24 14:01:10
...
from sqllineage.runner import LineageRunner def test_create_as(): sql=""" -- mes数据中获取每个批次第一次上线扫码时间 drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00; create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_00 as select min(produce_date) min_produce_DATE, mo_lot_no, organization_id from bda${db_para}.BDA_MES_PRODUCT_SUMMARY where factory_no ='CY-SR' and step_name in ('OC上线组装','整机组装1') group by mo_lot_no, organization_id ; -- 订单承诺 drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1; create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01_1 as select t1.version_id , t1.promise_id , t1.organization_id , t1.order_id , t1.order_no , t1.order_stage , t1.order_type , t1.so_type , t1.order_status , t1.order_priority , t1.promise_status , t1.product_id , t1.product_no , t1.product_model , t1.order_qty , t1.bu_name , t1.rcv_client_name , t1.prepared_client_name , t1.order_source , t1.om_user_name , t1.term_cust , t1.to_pur_time , t1.factory_no , t1.mo_lot_no , t1.completed_qty , t1.mo_audit_status , t1.req_arrival_time , t1.mtr_ready_time , t1.plan_promise_time , t1.promise_date_change_reason , t1.schedule_start_time , t1.schedule_end_time , t1.pps_type , t1.pps_exception_info , t1.promise_diff_day , t1.promise_delivery_cycle , t1.change_reason , t1.client_abbr , t1.item_type_product , t1.match_forecast , t1.software_flag , t1.risk_level , t1.risk_reason , t1.ckd_type , t1.crt_user , t1.crt_time , t1.upd_user , t1.upd_time , t1.crt_user_name , t1.upd_user_name from bda${db_para}.bda_whole_pto_order t1 left join bda${db_para}.bda_promise_history_record t2 on t1.promise_id = t2.promise_id and coalesce(t2.afterchangereason,'') = 'AGAIN_PLAN' where t1.version_id like '%最新版本%' and t2.promise_id is null union all select t1.version_id , t1.promise_id , t1.organization_id , t1.order_id , t1.order_no , t1.order_stage , t1.order_type , t1.so_type , t1.order_status , t1.order_priority , t1.promise_status , t1.product_id , t1.product_no , t1.product_model , t1.order_qty , t1.bu_name , t1.rcv_client_name , t1.prepared_client_name , t1.order_source , t1.om_user_name , t1.term_cust , t1.to_pur_time , t1.factory_no , t1.mo_lot_no , t1.completed_qty , t1.mo_audit_status , t1.req_arrival_time , t1.mtr_ready_time , t1.plan_promise_time , t1.promise_date_change_reason , t1.schedule_start_time , t1.schedule_end_time , t1.pps_type , t1.pps_exception_info , t1.promise_diff_day , t1.promise_delivery_cycle , t1.change_reason , t1.client_abbr , t1.item_type_product , t1.match_forecast , t1.software_flag , t1.risk_level , t1.risk_reason , t1.ckd_type , t1.crt_user , t1.crt_time , t1.upd_user , t1.upd_time , t1.crt_user_name , t1.upd_user_name from ( select t1.version_id , t1.promise_id , t1.organization_id , t1.order_id , t1.order_no , t1.order_stage , t1.order_type , t1.so_type , t1.order_status , t1.order_priority , t1.promise_status , t1.product_id , t1.product_no , t1.product_model , t1.order_qty , t1.bu_name , t1.rcv_client_name , t1.prepared_client_name , t1.order_source , t1.om_user_name , t1.term_cust , t1.to_pur_time , t1.factory_no , t1.mo_lot_no , t1.completed_qty , t1.mo_audit_status , t1.req_arrival_time , t1.mtr_ready_time , t1.plan_promise_time , t1.promise_date_change_reason , t1.schedule_start_time , t1.schedule_end_time , t1.pps_type , t1.pps_exception_info , t1.promise_diff_day , t1.promise_delivery_cycle , t1.change_reason , t1.client_abbr , t1.item_type_product , t1.match_forecast , t1.software_flag , t1.risk_level , t1.risk_reason , t1.ckd_type , t1.crt_user , t1.crt_time , t1.upd_user , t1.upd_time , t1.crt_user_name , t1.upd_user_name , row_number() over (partition by t1.promise_id order by t1.version_id desc) rn from bda${db_para}.bda_whole_pto_order t1 where version_id not like '%最新版本%' and not exists (select 1 from bda${db_para}.bda_whole_pto_order t2 where version_id like '%最新版本%' and t1.promise_id = t2.promise_id ) ) t1 left join bda${db_para}.bda_promise_history_record t2 on t1.promise_id = t2.promise_id and coalesce(t2.afterchangereason,'') = 'AGAIN_PLAN' where t2.promise_id is null and t1.rn = 1 ; -- CRM订单与工单关联 drop table if exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01; create table if not exists sda${db_para}.tmp_sda_delivety_complete_sr_sum_01 as select bu.dept_name bu_name ,t2.organization_id -- 20220701 wyr -- ,'514' Organization_Id ,t1.item_code item_code ,cus.cus_name -- 收货客户 ,t1.so_header_id ,t1.so_line_id so_line_id ,t1.so_code so_header_code ,t1.line_no so_line_code ,t2.wip_entity_name -- 工单号 ,t2.lot_number -- 批次 ,t2.Project_Name ,t1.om_user_name Om_User_Name -- 销管 ,t1.sale_name sales_user -- 销售 ,case when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20) when bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 35) when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' and mig.min_class like'%PC模块%' then date_add(t1.pur_start_time, 25) when bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 45) when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' and mig.min_class like '%PC模块%' then date_add(t1.pur_start_time, 20) when bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' then date_add(t1.pur_start_time, 30) else t1.pur_start_time end stat_date -- 统计日期 提交下采购日期 + 对应日期 ,substr(t1.expected_delivery_date, 1, 10) delivety_time -- 计划发运日期 ,substr(t1.crt_time, 1, 10) crm_create_time -- 销售订单创建时间 ,substr(t1.pur_start_time, 1, 10) purchase_date -- 提交下采购时间 ,substr(t1.produce_start_time, 1, 10) produce_date -- 下生产时间 ,substr(t2.Xwh_Creation_Date, 1, 10) wip_create_date -- 委外工单创建日期 ,substr(t2.Scheduled_Start_Date, 1, 10) Scheduled_Start_Date -- 工单齐套日期 ,substr(t2.Mc_Creation_Date, 1, 10) Mc_Creation_Date -- 生管确认时间 ,substr(t2.first_trx_date, 1, 10) first_finish_date -- 首次完工入库日期 ,substr(t2.last_trx_date, 1, 10) last_finish_date -- 完全完工入库日期 ,t1.so_type_name order_type -- 订单类型 ,t2.wip_job_status -- 工单状态 ,t2.Job_Type -- 工单类型 ,t2.Class_Code -- 工单分类 ,t2.Quantity_Completed -- 工单已完工数量 ,t1.qty -- 订单数量 ,case when t6.order_no is not null then t6.match_forecast else bsse.is_source_forecast end as is_source_forecast -- 订单有无预测 ,mio.planning_make_buy_code -- 整机加工模式 制造/采购 ,case when mig.min_class like '%PC模块%' then 'PC模块' else '其他' end prod_type ,datediff(t2.last_trx_date, t1.pur_start_time) supply_cycle -- 供应链周期 (取多个工单中最早的完工入库时间,计算供应链周期) ,case when t1.so_type_name <> '备品订单' and t2.first_trx_date is not null then 'Y' else 'N' end supply_cycle_flag -- 供应链周期标识 ,case when t1.so_type_name = '客户订单' and t2.Job_Type = '标准' and ( (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20) or (bsse.is_source_forecast = '1' and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35) or (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 25) or (bsse.is_source_forecast = '0' and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 45) or (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' and mig.min_class like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 20) or (bsse.is_source_forecast is null and mio.planning_make_buy_code = '制造' and mig.min_class not like '%PC模块%' and datediff(t2.first_trx_date, t1.pur_start_time) <= 35) ) and t2.first_trx_date is not null then 'Y' else 'N' end delivety_complete_flag -- 交付达成标识 ,case when t1.so_type_name in ('客户订单','销售订单') and t2.Job_Type = '标准' then 'Y' else 'N' end is_delivety_complete_flag -- 交付达成标识 ,t1.expected_delivery_date overseas_stat_date -- 海外订单交付达成归集时间 ,case when t1.so_type_name in ('客户订单','销售订单') -- and bsse.is_source_forecast is not null and datediff(t2.last_trx_date, t1.expected_delivery_date) <= 0 and t2.last_trx_date is not null then 'Y' else 'N' end overseas_is_delivety_complete_flag -- 海外订单交付达成标识 ,case when t1.so_type_name in ('客户订单','销售订单') -- and bsse.is_source_forecast is not null and (datediff('${bizDate}', t1.expected_delivery_date) >= 0 or (datediff('${bizDate}', t1.expected_delivery_date) < 0 and datediff(t2.last_trx_date, t1.expected_delivery_date) <= 0) ) then 'Y' else 'N' end overseas_delivety_complete_flag -- 海外订单交付达成数据范围 ,row_number() over(partition by t2.Lot_Number order by t1.pur_start_time) rn ,t2.Start_Quantity wip_qty ,t2.fisrt_picking_date -- 首次领料时间 ,t3.first_ship_date ,t3.last_ship_date ,-1*trx33.shipped_qty shipped_qty -- 已出货数量 ,t2.Quantity_Completed + trx33.shipped_qty as difference_qty -- 差异 ,dmpm.screen_size -- 尺寸 ,t2.Created_By as pm_user -- 生管负责人 ,substr(t3.min_scheduled_date, 1, 10) as min_scheduled_date -- 实际齐套日期 ,substr(t5.min_produce_DATE, 1, 10) min_produce_date ,t1.bt_name -- add by tjl 2022.07.21 ,bsse.so_line_group_id -- ,substr(t3.online_date, 1, 10) as online_date ,datediff(substr(t1.expected_delivery_date, 1, 10),substr(t1.pur_start_time, 1, 10)) as cus_expect_cycle -- 客户期望周期 ,case when t6.order_no is not null and t6.plan_promise_time is not null then datediff(substr(t6.plan_promise_time,1,10),substr(t1.pur_start_time, 1, 10)) -- 如有承诺日期 预计供应链=承诺日期-下采购日期 when t6.order_no is not null and t6.plan_promise_time is null and t2.wip_entity_name is null then datediff(date_add(substr(t6.mtr_ready_time, 1, 10),6),substr(t1.pur_start_time, 1, 10)) -- 无承诺日期 未开工单,= 齐套日期+6 when t2.wip_entity_name is not null and t3.online_date is not null then datediff(date_add(substr(t3.online_date, 1, 10),4),substr(t1.pur_start_time, 1, 10)) -- 已开工单,已有上线日期,=上线日期+4 when t2.wip_entity_name is not null and t3.online_date is null then datediff(date_add(substr(t2.Scheduled_Start_Date, 1, 10),6),substr(t1.pur_start_time, 1, 10)) -- 已开工单,暂无上线日期,=齐套日期+6 end as estimate_supply_cycle -- 预计供应链周期 ,t8.cus_level -- from bda${db_para}.bda_oms_so_lines t1 FROM bda${db_para}.bda_sd_so t1 left join bda${db_para}.bda_sd_so_ext bsse on t1.so_line_id = bsse.so_line_id and bsse.part_dt IN ('crm_so', 'oms_so') join bda${db_para}.bda_job_inv_trx_zj_dtl t2 on bsse.so_line_group_id = t2.source_line_id -- and t1.so_header_id = t2.source_header_id left join dim${db_para}.dim_hcm_orgunit bu on t1.bill_bu_id = bu.dept_oid left join bda${db_para}.comm_market_cus cus on t1.rec_cus_code = cus.id -- join (select item_value, fullname -- from o_crm${db_para}.comm_dictionary_detail -- where parentcode = '$CRM_DELIVERY_SO_TYPE') cdd -- on cdd.item_value = t1.so_type left join dim${db_para}.md_item_group mig