欢迎您访问 最编程 本站为您分享编程语言代码,编程技术文章!
您现在的位置是: 首页

长篇访谈回放--低地的访谈体验

最编程 2024-04-10 17:20:33
...

按理说开头应该介绍一下自己,但由于这两年互联网动荡不安,想来许多人也是如此,所以就略过吧。谨以此文,记录下从23-24的面试历程. 在此文中有涉及某些细节不对还请谅解,此文仅以记录 这两年的成长

ps: 其实我很想转业务后端的.... 今年大数据的岗位好少, 可以的话想尽量在厦门,由于背上了房贷.妈妈扎针灸需要用到我的医保, 即将步入27岁的自己心情有些沉重

面试开始前的开源复盘

agent的隔离部署:

github.com/kaori-seaso…

本项目的主旨在于解决apisix插件运行时打包 方便进行开发, 支持热更新打包,为ApiSix的每个插件(每个runner属于不同线程) 使用隔离的classloader,并实现代码更新

  • watcher 监听本地文件变更(.class & .java)并缓存变更文件路径

  • 将变更文件上传到 server 并保存在临时目录(本地模式忽略次步骤)

  • 通过 javaagent 技术 attach 到 jvm 进程,拿到Instrumentation对象

  • 使用自定义类加载器(与业务代码隔离)加载 apisix-plugin-core 编译 java 文件(如有)

  • 读取 class 文件字节码,通过instrumentation.redefineClasses()方法重新定义并加载 class

Seata Saga go框架设计任务拆分

github.com/seata/seata…

  • 实现: github.com/apache/incu…

  • 背景:

seata saga模式未来会支持多种使用方式, 1)状态机json:json 2)流式编排 :saga.task(xxx).build().start() 3)类TCC模式

seata java里面的saga时基于状态机实现的, 1)2)都是基于状态机引擎实现的,3)可以脱离状态机引擎存在, 完全是业务的独立编码,不需要理解状态机过程

目前仅针对第三种模式而言,

定义了一个saga action的接口(在java里面是注解),里面有执行、和补偿两个方法。

业务模块里面调用到saga接口,都会走到seata-server去注册分支事务, 最终由seata-server根据全局事务的状态进行提交(在saga里面是空动作)、或者顺序补偿。

调研计划: 1.在tm初始化seataContext的上下文 编写对应的driver,使得 seata context 在向tm发送信号的话 根据driver 拿到seata Server的rm地址 从server返回的context协程当中拿到branchID和xid以及acion

seata-java的架构分为三者资源控制器,协调器以及事务管理器 用户发起事务请求后,transcationManager向TranscationCoordniator发起全局事务请求 这里会存储全局事务的XID,并将XID传递给ResourceManager,开始进行分支事务的注册

相当于Saga 状态机模式来讲 ,无论用户如果编写事务操作,始终只包含 根据事务类型(ServiceTask,choice action )的普通事务操作 以及补偿事务操作(compensation) 所以类tcc模式相当于退化版的tcc,没有prepare过程 一阶段直接提交正向/负向事务操作 无需等待. 二阶段只有在回滚的时候 才会发生事务补偿操作 所以基于注解的方式实现,设置action与compensation的拦截器,反射缓存用户调用方法 在分支事务的提交,注册阶段实现事务的编排. 也是将来用户普遍使用的一种方式

calcite gremlin adapter适配器贡献描述

PR: github.com/apache/calc…

Motivation(动机) Hi,社区.目前越来越多的用户在使用一些图数据库,比如JanusGraph,HugeGraph等 来做一些人员社交网络的关系表示, 以用来统计社交网络中每个用户的活跃度.大多数的图数据库在建图,遍历图阶段 都会采用Gremlin语法实现。 然而,这对不熟悉gremlin语法的用户很不友好。calcite作为一种查询框架存在的同时, 也提供了adapter接口,为不同的数据库方言做适配,比如解析,关系代数的转换,查询计划的绑定. 我们公司在适配各种图数据库的问题时得以解决,这是我的仓库: github.com/kaori-seaso…

Background(背景)

calcite本身支持adapter的数据库语言拓展,能够使用户在明白语法意义的情况下, 完成对方言的简化。比如拓展SqlNode实现语法的解析,拓展RelNode实现逻辑计划的映射.

thinkerpop是一个对多种图数据库的适配框架,在此框架中首次提及gremlin语法, 现在已经成为图数据库的通用查询层,在通过calcite的adapter接口拓展查询语句的同时, 我们也会借助thinkerpop的通用图数据库api,为不同的图数据库做方言兼容.

举个简单的例子: 从 SELECT "key" FROM inttype 映射到 g.V().hasLabel("inttype").group().unfold().select(Column.values).order().by(.unfold().id()).project("inttype").by(.project("key").by(.unfold().choose(.has("key"),.values("key"),.constant("$%#NULL#%$"))))

在设计架构层面分为三层.

解析语法层, 关系代数转换层,逻辑计划绑定层.

解析语法层: 在解析查询语句阶段,会将字段与等值条件拆分后转换为点和边

关系代数层: 将拆分成点和边的集合,在calcite抽象出聚合/排序/查询的阶段,转换成TableScan, 方便在查询计划生成的时候,根据条件以及字段信息 连接扫描/单表过滤等方式从图中任意边/任意起点开始遍历

逻辑计划绑定层: 将 连接扫描/单表过滤/投影等行为 绑定到calcite的planner当中 实现查询计划的生成

select语句 生成Gremlin逻辑计划的过程:

1.首先,所有的图数据库都是从一个源点开始建图的,我们会以thinkerpop提供的GraphTraversalSource 作为原点,对传入的点与边信息进行语法的提取. 这一步骤将在SqlSchemaGrabber当中实现 2.其次, 对于select/where/having/order by/group by 我们在解析阶段的计划如下:

  • group by :对于一个点而言。在图中存在出度和入度的属性.从数据表的视角来看,相当于将表的数据以IN 或者 OUT 这两各维度做聚合,此行为也对应着表的遍历图操作,在遍历图的过程当中,会生成fold/unfold标记,代表着图遍历的方向
  • select : 对于select操作而言,扫描全表的操作可以看作是将所有列投影成点属性.将每列的值变化对应到gremlin添加点的操作当中
  • where: 过滤操作在图计算语义的场景,可以看作是过滤点的出度和入度所连接的边,所以不涉及关系代数的转换, 而是直接下推到逻辑计划当中
  • order by : 在图遍历的过程中,我们提到过会在路径上生成fold/unflod用来表征方向的前进/后退. 如果遇到某个字段,没有可以排序的值,我们会后退到GraphTraversalSource的原点,结束排序操作 如果有可以排序的值,则会统一在SqlTraversalEngine当中,分别统计入度和出度进行聚合,然后根据label (IN/OUT)配合group by使用
  • having: 意义同grouo by 只是label不同(除了IN/OUT列,还需要指定具体的点字段)

samples(样例) 以下我会以我项目中的单元测试举一个简单的例子 现在假设有这样一个图数据集,存在多个点集????和一个边集合 点集分别表示 国家,公司,人群,空间位置

{
  "tables": [
    {
      "name": "company",
      "columns": [
        {"name": "name", "type": "string"}
      ]
    },
    {
      "name": "country",
      "columns": [
        {"name": "name", "type": "string"}
      ]
    },
    {
      "name": "planet",
      "columns": [
        {"name": "name", "type": "string"}
      ]
    },
    {
      "name": "person",
      "columns": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "integer"}
      ]
    },
    {
      "name": "spaceship",
      "columns": [
        {"name": "name", "type": "string"},
        {"name": "model", "type": "string"}
      ]
    },
    {
      "name": "satellite",
      "columns": [
        {"name": "name", "type": "string"}
      ]
    },
    {
      "name": "sensor",
      "columns": [
        {"name": "name", "type": "string"},
        {"name": "type", "type": "string"}
      ]
    },
    {
      "name": "sensorReading",
      "columns": [
        {"name": "tstamp", "type": "long_timestamp", "propertyName": "timestamp"},
        {"name": "dt", "type": "long_date", "propertyName": "date"},
        {"name": "value", "type": "double"}
      ]
    },
    {
      "name": "fliesTo",
      "columns":[
        {"name": "trips", "type": "integer"}
      ]
    },
    {
      "name": "orbits",
      "columns":[
        {"name": "launched", "type": "integer"}
      ]
    }
    ],
  "relationships": [
    {"outTable": "company", "inTable": "country", "edgeLabel": "baseIn"},
    {"outTable": "person", "inTable": "company", "edgeLabel": "worksFor"},
    {"outTable": "person", "inTable": "planets", "edgeLabel": "travelledTo"},
    {"outTable": "company", "inTable": "spaceship", "edgeLabel": "owns"},
    {"outTable": "person", "inTable": "spaceship", "edgeLabel": "pilots"},
    {"outTable": "sensor", "inTable": "sensorReading", "edgeLabel": "hasReading", "fkTable": "sensorReading"},
    {"outTable": "person", "inTable": "planet", "edgeLabel": "fliesTo"},
    {"outTable": "satellite", "inTable": "planet", "edgeLabel": "orbits"},
    {"outTable": "person", "inTable": "person", "edgeLabel": "friendsWith"}
  ]
}

面试中的项目复盘

字节跳动-推荐实时计算方向/ 某公司实时计算方向

Q:你可以说下你们统一计算引擎是怎么做的么 做这件事的目的,痛点,为什么这样设计(200字以内) - todo 汇报工作说结果,检讨工作说流程,请示工作说方案。

A: 首先需要先介绍一下计算引擎的定位, 我们是一家面向*的网络安全公司 架构分为四层,从底往上依次是 数据治理层, 离线/实时引擎层, 数据建模和数据标签系统 最上面是一些数据服务应用(人员实时轨迹,机动车备案). 我们既需要对下层数据治理层(进行数据标准化,画布治理,数据组织)后的数据 进行SQL计算,也需要支持上层数据建模或者数据标签平台输出聚合后的数据指标 解决了在接入不同数据源的时,数据不规整,口径不一致的问题

我负责的是数据画布治理,以及模型计算方面的工作 举一个比较常见的例子,在网安业务当中,反诈涉黑的现象是比较常见的,比如现在有一个诈骗团伙, 可能从某些渠道了解到目前你有(购房/某行大额消费/贷款)明显的消费倾向时, 会进行多级转账,而根据资金流的趋向,该转账的链路一定会在某一个节点重复关联, 什么意思呢 请试想一个链表 1 -> 2 -> 4 -> 5 -> 4 -> 6 -> 7 , 这里的4号节点在转账的时候重复了两次,则可能存在信用卡诈骗的情况

现在我们带入业务理解 ,张三在10月19号 给银行贷款了100w,然而可能在银行工作的小A 有存在下游产业链的相关的熟人,比如小额贷款,第三方贷款公司,可能会告知相关人员 进行推荐,这是资金链的首节点.从张三办理业务的银行人员出发, 到下游产业链会形成一个大的社交网络, 那么这个时候我们关注事件 是 被诈骗人 转账给 诈骗团队的这一事件, 并且涉及到金钱类相关的标签属性,且存在产业链的交集( 产业链相关的人员如何认识,是否存在相关的亲戚关系, 公司同事或前同事, 是否有在证券/投资从事相关工作)

Q:流程是什么

简化一下整个流程,涉及到五张表, 网民个人档案备案表, 银行消费转账记录表,人员三代以内亲属表, 以及涉黑人员名单, 人员轨迹表

对于反诈场景 最终目的是需要定位到 被诈骗人 在第一层发生关系的人员当中 在何时 何地进行与谁 什么银行进行的转账

先说一下数据比例

网民个人档案备案表 10亿数据 作为一个维度表 记录着一个人的年龄,身高,机动车是否备案,以及相关的银行账号信息

银行消费转账记录表 5亿数据 作为一个事实表 这里的转账记录属于快速变化维,基于拉链表采取按照三个月以内 按照时间跨度分表, 每个月末 同步一次

有点啰嗦,下面这里的介绍应该简化 人员三代以内亲属表 6亿数据 事实表 来源于手机,QQ,微信 等多渠道进行汇总之后的数据 支存在下级亲属以及上级亲属相关的信息

涉黑人员名单 由TD提供 6亿数据

第一步 根据时间跨度计算 根据被诈骗人的档案信息,找到人员银行账号,查询近三个月内的 银行消费转账记录,找到所有大于5w以上的大额转账,找到相关的所有银行流水明细,并返回相关的转账方账户 然后输出账号的人员编号,接着在第一级发生关系的人员三代以内亲属表当中进行扩线,分别以 (子,父)节点为单位进行扩线 找到所有想关联的人员, 分成两个临时表, 临时表A存与亲属(父子级别)相关的人员表 临时表B存相关联的银行流水明细

第二步 上述得到了两个临时表 第一级发生关系人员的银行流水明细 和 父子级别相关人员表 ,存在mongodb当中, 先对第一级发生关系人员的银行流水明细的日期进行聚合(月),查看发生在哪几个月频繁转账 然后筛选出最近三个月的转账记录, 对相关的人员进行记录 输出临时表C, 临时表C在与相关人员表进行join ,排除掉可能性低的社交关系人员 输出临时表D

ps: 由于数据分为数据治理平台和资源目录两层,资源目录是已经经过ODS层的表 所以这里银行的流水记录已经从原始的oracle转移到mongodb当中 第三步

由于人员三代以内的亲属表的数据(其实数据早期也并不是多规范,存在重录,乱录的情况), 需要先进行清洗,有些亲属下级人员编号不存在的记录优先排除掉, 样子就会造成一定程度的信息价值缺失

所以需要引入涉黑人员名单,这个表的来源比较复杂,有出入境,持Q人员等等, 用这个表其实是想通过临时表C再做一次关联,是否能筛选出有价值的信息 记为临时表E 将临时表D与表E合并 就得到了表F,此时我们会发现,父子级别相关人员表的扩线信息还没有利用 但是如果在这一层进行关联,在下一步流批关联的时候延迟会非常大,延迟1-2min是不可接受的 (因为会hang在join算子上),所以扩线信息 放在图计算这一次来进行社交网络的计算

第四步

由于人员轨迹信息属于实时变化的信息,所以存在kafka当中 流表与批表在join时,会存在由于checkpoint 对齐时间而造成的延迟,所以放在计算的最后一步 根据时间跨度 分别对近一年内 每三个月的数据(mongodb表进行经历oDS层处理的数据) 与表F进行关联 ,在关联近一周的数据(kafka表 未经历ods层处理的数据 ) 分别进行四次 双join关联 然后进行合并. 得到表G

在这条链路当中存在mongodb -> mongodb | -> mongodb + kafka的关系 oracle -> mongodb |

所以我们需要做几件事,因为mongodb同步到下游的过程中flink-cdc是必不可少的 并且如果一个mongodb的多个表,比如像银行流水表,已经分表了,如果分成多条insert into进行执行的话 不仅source会分成多个,并行度不好控制,并且数据库连接也会分成多个,造成连接数暴涨断流 所以我们考虑开发整库同步功能,将多个source在flink的执行计划上合并成一个 并通过设置分流的flink算子链并行度控制资源消耗,避免数据同步的时候出现频繁延迟和暴涨的情况(checkpoint间隔时间已经调整为2s)

首先提供一种类似于阿里云的CDAS语法, 保存源表和对应的schema结构, 在组装目标表schema的时候,遍历源表的 schema,使其订阅相关的schemaname和tablename 附加到OutTag标签流当中 然后利用自定义的Env 将翻译后的transfrom添加到执行计划图当中, 完成端对端的数据同步 , 同时因为多源合并的原因,源端并行度的分配是 由合并后的source算子控制, 所以便于提高并行度做控制

主要是利用了多源合并的机制 将多个数据源的连接数收敛到一个数据源 然后从一个数据源开始分发 分为两个三个阶段 统一元数据 统一建表 统一分发

我们会实现统一的元数据存储 基于内存catalog做持久化 通过解析flink的执行计划来完成 通过解析火山模型中project算子的字段信息 还原出 需要挂载的udf以及表schema信息,从而作为任务重启时候的表结构 保证上下游字段口径的一致性

Q: 你们模型任务的多节点输出的SQL切分怎么做的

A: github.com/kaori-seaso…

Q: 那你们HDFS小文件监控统计是怎么做的呢?

A: 采用的是hdfs自带的分析fsimage文件的命令hdfs oiv -i + fsimage文件 -o +输出文件 -p Delimited,该命令将fsimage文件解析成可阅读的csv文件

#建外表
CREATE EXTERNAL TABLE `default.hdfs_info`(
  `path` string, 
  `owner` string, 
  `is_dir` string, 
  `filesize` string, 
  `blocksize` string, 
  `permisson` string, 
  `acctime` string, 
  `modificatetime` string, 
  `replication` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' 
WITH SERDEPROPERTIES ( 
  'field.delim'=',', 
  'serialization.format'=',') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  'hdfs://nameservice1/tmp/dfs/content'

#sql分析一级目录大小
select joinedpath, sumsize
from 
(
select joinedpath,round(sum(filesize)/1024/1024/1024,2) as sumsize
from
(select concat('/',split(path,'/')[1]) as joinedpath,accTime,filesize,owner 
from default.hdfs_info
)t
group by joinedpath
)h
order by sumsize desc

#sql分析二级目录大小
select joinedpath, sumsize
from 
(
select joinedpath,round(sum(filesize)/1024/1024/1024,2) as sumsize
from
(select concat('/',split(path,'/')[1],'/',split(path,'/')[2]) as joinedpath,accTime,filesize,owner 
from default.hdfs_info
)t
group by joinedpath
)h
order by sumsize desc 
###后面的各级目录方式类似,就不再详述了,下面说下各级目录小文件统计的sql

#三级目录下小于100k文件数量的统计
 SELECT concat('/',split(path,'/')[1],'/',split(path,'/')[2],'/',split(path,'/')[3]) as path ,count(*) as small_file_num
  FROM 
  (SELECT relative_size,path 
  FROM 
  (SELECT (case filesize < 100*1024 WHEN true THEN 'small' ELSE 'large' end) 
  AS 
  relative_size, path 
  FROM default.hdfs_info WHERE is_dir='1') tmp 
  WHERE 
  relative_size='small') tmp2 
  group by concat('/',split(path,'/')[1],'/',split(path,'/')[2],'/',split(path,'/')[3]) 
  order by small_file_num desc;
  
###其他各级目录小文件数量的统计,方法类似,下面说下hive某个库下面表大小以及修改时间的统计
SELECT joinedpath,
       from_unixtime(ceil(acctime/1000),'yyyy-MM-dd HH:mm:ss') AS acctime,
       from_unixtime(ceil(modificatetime/1000),'yyyy-MM-dd HH:mm:ss') AS modificatetime,
       sumsize
FROM
  (SELECT joinedpath,
          min(accTime) AS acctime,
          max(modificatetime) AS modificatetime,
          round(sum(filesize)/1024/1024/1024,2) AS sumsize
   FROM
     (SELECT concat('/',split(path,'/')[1],'/',split(path,'/')[2],'/',split(path,'/')[3],'/',split(path,'/')[4],'/',split(path,'/')[5]) AS joinedpath,
             accTime,
             modificatetime,
             filesize,
             OWNER
      FROM default.hdfs_info
      WHERE concat('/',split(path,'/')[1],'/',split(path,'/')[2],'/',split(path,'/')[3],'/',split(path,'/')[4])='/user/hive/warehouse/default.db')t
   WHERE joinedpath != 'null'
   GROUP BY joinedpath)h
ORDER BY sumsize DESC

统计三级目录下 小于100k的小文件数量以及其相对路径 根据PayPal/NNanalysis定时输出监控报表 用户可以根据不同目录做手动聚合 默认自动聚合按照128M 聚合成一个文件 并生成_SUCCESS标记

用到HDFS的地方,在离线引擎 数据采集阶段未进入ODS时 控制fink sink到hdfs的时候文件滚动合并的策略128M 或者调整合并时间 * 2 等于checkpoint时间 让其不那么快输出

Q: 任务调度怎么做的?

分为三个定时任务和提交任务,当提交任务时,触发回调函数,来到调度终态

1.在提交任务时,发生异常时回调,如果没有,则进入下一步 2.更新回调job状态

  • 默认每分钟更新一次application集群的集群运行状态
  • 默认每20s更新一次historyserver的延时作业查询

背景展开 因为flink的实时任务通过之前反诈场景的描述,会存在顺序执行的关系,而之前介绍过 架构分为四层,从底往上依次是 数据治理层, 离线/实时引擎层, 数据建模和数据标签系统 在每一句SQL计算处理完成后,要即使的通知到数据建模过程中的对应节点, 比如反诈场景,计算完最近三个月的银行流水模型后,要等待任务状态更新到终态才能提交下一条SQL 否则建模任务就hang 在那里不能跑。建模任务可以看作是多个模型节点组成的流水线, 每一次执行需要等待前置任务状态完成

步骤如下:

  • 1.先查询running状态,默认所有任务都在跑(单个租户)
  • 2.根据applicationId 查询集群 当前applicatinId存在多个时候
    • 更新JobDetail任务详情信息,查询histoyserver当前的任务信息,回填任务状态, 分析task(集群级别)正在跑的任务(job)条数,回填任务状态
    • 如果当前集群状态为finished时,查询historyserver
    • 如果histoyserver不可用,则设置历史服务器的延迟时间, 如果集群状态是finished则为30s 否则大于当前时间则开启延迟同步

Q: 你们的flink hackthon季军项目 是怎么通过数据一致性算法实现实时物化视图的

A: 根据flink的论文DBLOG算法, 会将历史数据的watermark作为version标记,与实时数据的同步时间戳进行关联 实现无锁的并发同步, 在批数据源进行同步的时候 所有的watermark会传递到checkpointCoordinator, 在流数据源与批数据源开始join之前 会将进入join算子的本地时间作为全局时间(第一批成功对齐checkpoint的本地时间) ,然后通过将这一批流数据的版本号 binlog的序列化信息传递到水印处理算子当中,与本批次的流数据的时间戳呈现一对一的关系 如果当前的时间戳大于本批次的同步时间戳 则将本批次数据进行缓存 等待checkpoint对齐的时机到来时,通过collect统一法网下游,实现实时物化视图计算OLAP方面 但是缺陷在于 这种可缓存的数据量是很小的,不像doris有自己的LSM数据组织,一旦流数据延迟到来,无法将之前的数据落盘 对齐的barrier线程就只能一直等待

Q: 看你在fastjson2实现了json diff算法 能说下怎么实现的嘛

A: 数据模型节点的schema变更,需要通过稽核校验任务来保证数据一致性. 在上述流水线编排的情况下 无法通过湖仓的schema evoluation做动态变更 加上fastjson2的序列化性能在string上得到了较好的提高 所以打算采用fastjson2实现diff算法

分为三部分 数据分段 ,多线程文件句柄读写 数据合并

github.com/alibaba/fas…

shopee 数据流-消息队列方向

Q: 你工作中用到哪个消息队列 说下你看过的kafka源码细节

A: 以前的博客有写

Q: 说下你在rocketmq-eventbridge以及rocketmq社区参与的经历 1.EventTarget并发消费

eventbridge本身是一个EDA系统 包含了监听,发布,订阅.作用旨在于作为一个事件中心 作为多个订阅方以及多个被订阅方的介质,需要保证每个租户在订阅来自同一个集群的consumerGroup的时候 经历listener-> transfer -> trigger 这三个线程上下文中阻塞队列的数据做到相互隔离,避免并发消费 1.最先是调研了生产者-消费者模式 为每一个租户分配一个阻塞队列,以runnnername为维度 2.但是这种方式会随着消息的突增,造成流量的爆散效应.就是说阻塞队列中的数据会不断增长 而不是一个平滑增长的趋势,造成反压 所以我们考虑利用flink的水位线做流量控制,但是水位线的衡量成为了一个问题

比较经典的有spark当中基于PID的工业控制算法以及
Netflix的concurrency-limits库 
其中concurrency-limits依赖处理流量的回调决定执行什么限流策略
不符合上下游线程按照顺序消费过程中,流量的爆散问题. 
所以需要基于PID这种流量反馈的控制算法来做,就有了第五步 根据PID这种工业反馈回路做控制

2.rocketmq批量消费

github.com/apache/rock…

3.rocketmq幂等性

提案: https://docs.google.com/document/d/1nResLevPbeGmKwSQiId_jw0tfBhJtoPZTduOQL3qxNg/edit#heading=h.nwczedg8v2na

实现: https://github.com/kaori-seasons/rocketmq-enhance-client

工作可以拆分为两部分 - 1.单分区幂等 Q: 对于roketmq的单集群维度而言 一个instance对应一个消费组实例 由于instance在计算消息位移时 需要知道每个分区的maxOffset与nextoffset之间的messageId数据, offset是存在客户端还是服务端 为什么要这样做? A: 针对集群消费offset保存在broker,针对广播消费offset保存在本地

-   2.基于事务协调器 对齐消费者的consumerOffsetepoch 实现多分区一致性
  Q: 并发消费如果造成 消费者线程A,消费者线程B的消费顺序乱序 怎么避免

  A: 滑动窗口+broker远端保存+sendback+本地重试兜底

4.eventbridge反压限流 做这件事的目的,痛点,为什么这样设计: github.com/apache/rock…

  eventbridge的限流:
  因为事件的派发是由消费组动态消费的,下游无法准确的知道何时消息数暴涨,何时消息数下降,所以需要一套动态根据QPS限流的算法
  比较经典的有spark当中基于PID的工业控制算法以及Netflix的concurrency-limits库 其中concurrency-limits依赖处理流量的回调决定执行什么限流策略
  不符合上下游线程按照顺序消费过程中,流量的爆散问题. 所以需要基于PID这种流量反馈的控制算法来做
  大致的算法逻辑为:
   Math.round(Kp * error + Ki * interval + Kd * derivative)
   用于控制速度。例如计算得到速度为20条/s,令牌桶设置为20。需要1秒生成20个,每次拿一个,拿20次拿完即止,等待下一次采样
   这里的采样是指从上游线程的阻塞队列中采样正在存留队列中的元素
   (1)增大比例系数Kp一般将加快系统的响应,在有静差的情况下有利于减小静差。但过大的比例系数会使系统有较大的超调,并产生振荡,使系统的稳定性变坏;
(2)增大积分时间TI一般有利于减小超调,减小振荡,使系统更加稳定,但系统静差的消除将随之减慢;
(3)增大微分时间TD亦有利于加快系统的响应,减小振荡,使系统稳定性增加,但系统对干扰的抑制能力减弱,对扰动有较敏感的响应;另外,过大的微分系数也将使系统的稳定性变坏。

1.通过定时采集上游消费组的数据容量,将其传入pid估算器当中计算阻塞队列中消息的数量与预期值的差距,并将其转化为生产速度 2.通过映射的速度初始化一个阻塞队列用于控制生产速度推送速度,也可以用令牌桶,但令牌桶不能动态设置每秒令牌生产的数量,要求重新初始化 3.可以通过kp参数控制反应速度 ,通过控制振幅 减少震荡 4.在Listener,transfer的阻塞队列范围内 水位线可以根据需要调整

简单来说就像一个微积分的无穷级数 横轴的时间 步长越大 流量上升的曲线越平滑

  1. rocketmq通常情况下怎么做到重复消费

例如:假设我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存 假设这个消费的所有代码加起来需要1秒,有重复的消息在这1秒内(假设100毫秒) 内到达(例如生产者快速重发,Broker重启等),那么很可能, 上面去重代码里面会发现,数据依然是空的(因为上一条消息还没消费完, 还没成功更新订单状态) ,那么就会穿透掉检查的挡板,最后导致重复的消息消费逻辑进入到 非幂等安全的业务代码中, 从而引发重复消费的问题(如主键冲突抛出异常、库存被重复扣减而没释放等)

方案: 基于关系数据库事务, 开启事务把select 改成 select for update语句,把记录进行锁定。

select * from t_order where order_no = 'THIS_ORDER_NO' for update  //开启事务
if(order.status != null) {
    return ;//消息重复,直接返回
}

假设我们业务消息消费的逻辑是:更新MySQL数据库中 某张订单表的状态 要实现Exactly Once 样做:在这个数据库中增加一个消息消费记录表, 把消息插入到这个表, 并且把原来的订单更新和这个插入的动作放到同一个事务中 一起提交,就能保证消息只会被消费一遍了。

逻辑如下:

  • 开启事务
  • 插入消息表(处理好主键冲突的问题)
  • 更新订单表(原消费逻辑)
  • 提交事务

Rocketmq基于消息幂等表的非事务方案

消息已经消费成功了,第二条消息将被直接幂等处理掉(消费成功)。 并发场景下的消息,依旧能满足不会出现消息重复,即穿透幂等挡板的问题。 支持上游业务生产者重发的业务重复的消息幂等问题。

关于第一个问题已经很明显已经解决了,在此就不讨论了。

关于第二个问题是如何解决的?主要是依靠插入消息表的这个动作做控制的, 假设我们用MySQL作为消息表的存储媒介(设置消息的唯一ID为主键), 那么插入的动作只有一条消息会成功,后面的消息插入会由于主键冲突而失败, 走向延迟消费的分支,然后后面延迟消费的时候就会变成上面第一个场景的问题。

瑞幸 - java-供应链方向

八股文问的好像比较多 cas,aqs,线程池姑且复习一下

  • 1.分库分表 blog.****.net/u013615903/…

    • 分页查询怎么办
    • join查询怎么从各个分表取数据
    1. mysql千万级表怎么优化
    • 方式一: Order by + select字段 加联合索引
    • 方式二: Order by 加索引以及手动回表
      • 在范围查询的情况下, 先把id捞出来 对id进行扫描(手动回表), 不用让mysql自己去查主键索引数
      select .... from xxx join ( select id from xxx order by score desc limit 90000,20) on a.id = b.id )
      

    手动回表造成的痛点 - 解决由于回表性能消耗过大而不走索引的问题

3.AQS场景带入:(重入读写锁原理) 略过

4.synchonr对象空间布局 : 偏向的线程ID,GC年龄,偏向锁标记,锁状态 偏向锁: 在锁对象头部记录一下当前获取到该锁的线程ID,该线程如果下次又来获取就能直接获取,可以支持锁冲入 轻量级锁: 偏向锁如果此时有第二个线程来竞争,就会升级为轻量级锁 重量级锁: 如果自旋次数过多,则会升级为重量级锁 5. 说下公平锁和非公平锁 公平锁:每个线程获取锁的顺序是按照线程访问锁的先后顺序获取的, 最前面的线程总是最先获取到锁。 非公平锁:每个线程获取锁的顺序是随机的,并不会遵循先来先得的规则, 所有线程会竞争获取锁。

  • 业务方向(分析业务流程,找出业务瓶颈,推断场景题) 从以往的业务视角: 供应链库存拆分,消费者同时下单,延迟怎么补偿的场景题应该是会问的

从产品分析(简历匹配部分&& 趋向引导): APP首页 -> 支付结果页 -> 返回取餐码

APP首页作为着陆页,存在曝光,转化,增长等因素;这里可能会问我数据分析

如何根据漏斗模型计算出一天内的pv , uv ,gmv 。以及营业额和返客数

app与后端, 一般采取长链接保持用户的channel通道active,以保证退单,补单,下单业务 如果网络抖动 造成下单或者退单失败会怎么样 分类讨论

  • 如果是下单失败 可以在每次下单时设置乐观锁, 将乐观锁的抢锁标记计入缓存. 再用户进入结算之前 利用本地事务 检查缓存是否存在乐观锁标记,如果异常被catch 则取消缓存中的乐观锁标记.

Q: 那用本地事务的话,如果用户此时又下了一笔单,但是由于前一笔单的事务没有完成 此时用户下单会存在什么问题么(库存更新)

对于一次请求同时抢购多个商品,可能会出现死锁。 更新库存的操作可以开一个新的本地事务 按人数做限制 多次更新 100份库存 每人限制2,按2的倍数,每份分20份库存 分散锁竞争

  • 如果是退单失败 由于是负向交易,涉及到的业务属性(sku,发票,审核冻结金额,更新库存) 比较多.此操作应该允许有一定的延迟 先锁库存,计算需要退换的优惠券金额 设置两个本地事务 呈内外依赖 先退金额 再退货 由于事务传播的特性 内部事务执行完才会执行外部事务, 延迟超时可以通过补单处理

Mysql查询优化:

  分析原因:
  1.查询语句写的太过冗长
  2.索引失效
  3.关联查询太多join
  4.服务器调优

  定位慢查询性能瓶颈:
  1.IO(数据访问消耗了太多时间,是否正确使用索引)
  2.CPU(数据运算花费了太多时间,数据聚合,分组排序是不是存在数据量过大导致运算效率下降)
  
  明确优化目标
  1.需要根据数据库当前的状态
  2.数据局与该条SQL的关系

  怎么做
  1.尽可能在索引中完成排序
  排序操作用的比较多,索引本来就是排好序的 所以速度很快,
  没有索引的话,就需要从表中拿数据,在内存中进行排序,
  如果内存空间不够还会溢写到磁盘

  2.只获取自己想要的列

  3.join尽可能少
  如果join占用资源比较多,会导致其他进程等待时间变长
  不要超过三张表
  4.如何判定是否需要创建索引
    - 较为频繁的作为查询条件字段应该创建索引
    2.唯一性(状态字段,类型字段)太差的字段不合适作为索引,即便是查询条件
    3.更新非常频繁的字段不适合创建索引,会造成查询时等待锁的时间增加
    如何选择合适索引
    - 对于单键索引,尽量选择当前查询过滤性更好的索引(唯一索引 or 主键索引)
    - 选择联合索引,当前查询过滤性最好的字段在索引字段顺序中排序靠前
奇富科技 - 模拟面试

Q:请简要说下你工作中负责的内容

A: 身份落地:最终的目的是要落地一个手机号对应最准的一个身份证号 我们第一版的身份落地会以手机的一些实名信息(外卖,快递)的扩线信息作为主流 提取<手机号,姓名>的集合,与支线的机动车,ICP,网名档案,航空铁路的信息进行碰撞分析

分成三个主流 手机号,QQ,微信号 对支线的数据源进行扩线,其中我们会根据各个渠道的数据进行打分, 对于每个主流的匹配。大致分类有出入境,铁路,航空,户籍等标签.手机每个渠道的数据信息, 然后根据每个渠道的数据信息,会根据目标任务(反诈,涉黑)的行为信息进行碰撞分析要素 最后基于随机森林进行每次分支的决策树(二分类器,比如涉黑与非涉黑)权重打分, 从而得到经过权重打分的手机号->身份证信息

Q: 你说的基于每个主流的匹配是怎么做的,基于随机森林的二分类切割的数据集有什么依据标准

A: 每个主流的数据来源于 从一张非常大的维度表(网民档案)汇总而来的数据,这张表本身代表着 各渠道的数据汇总,我们需要以这张表为起点,根据他的各个维度扩线出有价值的信息, 一般以反诈场景为例,我们关心 一个人在近三个月在那些银行转账了多少金额 那么时间,地点,金额就作为扩线的三个维度进行碰撞,我们会把转账流程中涉及到的 整个人员社交网络的关系信息,放到缓存当中,在后面的主流数据中,出现了相关人员的信息 会以手机号为唯一要素 从数据库中补充其他维度信息,完成数据集的收集。 比如金额 每个行的转行单日是存在限额的 如果转账的金额很大,根据犯罪分子的心里 会尽可能的想快点转完 行为筛选: 那么我们会以单日转账 5000,一个月内5000的转帐次数达到 10次时,分割一次数据集 第一版 我们会统计来自转账的三级社交网络中,符合上述策略的转账次数(在所有行) 以此作为统计的评分权重. 在此情况之下,可能数据集还是分割的不明显,我们可能会对大于或者小于该策略 的数据集分别以手机号为关联进行 亲属关系的扩线,获取更多的数据集 那么下一个二分类的条件,可能就是转账金额超过3000了,这样又分割一次数据集 为了使得 分割后的数据集都能被尽可能的利用 所以而分类器的权重不会是<0,1> 这种简单的权重打分, 我们会根据扩线出来归一化后数据集在各个标签内的数据集比例设置动态权重 比如机动车备案这个支线数据5000条数据 铁路类出现200次,出入境内出现500次 他们的权重分别是200/5000 , 500/5000这样分类,那么您就会问了 这样一来,如果有多标签的话 那不是变成一个多分类问题了么

权重评分表是怎么设计的?

根据社交关系分类讨论

1.QQ,微信类: 包含与好友的个人资料的交集,互访行为,单向互动,共同参与 个人资料: 教育/工作经历,出生年代星座/血型, 如果占据资料完整度的70%- 80% 则10分这个评分 互访行为: 在一个月时间内,我访问好友(或者好友访问我) 空间或者发表的内容的次数, 以天为单位进行计算 天数 点赞数0.2+ 天数转发数0.6 + 天数收藏*0.1 计入评分 单向互动是指: 在一个月时间内,我对浩宇发表内容的评论/回复,赞,转发行为 共同参与是指:在一个月时间内,我和好友对共同好友发表内容的评论/回复,对同一动态的点赞

然后将权重的进行归一化,划分成4个区间进行分类

是的 这是一个多分类问题,但由于标签之间关联程度比较小, 对于我们而言信息熵有些过高了 所以此时我们会分裂多个决策树取解这个问题,在每一次做决策的时候, 我们会把人员社交关系 涉嫌人员A -> 关联人员B的关系保存到缓存当中,默认大小保存5000条数据, 去进行每个标签维度的扩线

Q:那你们如何保证验证的数据是可信的呢,考虑到数据质量的问题, 准确率和召回率应该怎么算呢

A:我们一般会以网民档案关联 居委会查访,疫情备案的数据作为正例 ,与网民档案(10亿)记录中出现在两个标签以上的维度做统计 作为我们的验证样本集

正确率按照一般的计算方式 正确预测的正反例/预测总数 验证样本集可以作为预测总数,而我们的每一次决策树的扩线信息 可以作为正反例 由于我们只关心在信息保真的情况下 ,数据的准确预测数量,决策树分叉的分支在10层以内 比较合理,所以在准确率到达0.7时 数据质量认为是比较好的

召回率这个指标的意义在于,统计当前的数据集能够关联出哪些相关的数据集 但是在决策树场景下,分类器筛选出来的信息,并不一定能说明这个人没有嫌疑 上面我也讲过了,在每次分类的情况下 分类器的权重不会是<0,1>. 只是哪些数据相对可信,所以我们没有去专门统计召回率

怎么解决离散值?

L2拟合化

Q:你们用什么缓存保存的人员关系,大概多大数据量,怎么解决缓存与数据库的一致性问题

A