谈论大数据--后端开发中经常涉及的大数据开发技术栈
基本简介
1-大数据特点
第一个就是数据量大的特点
第二个种类多样
1)结构化:多指关系型数据库中,信用卡号码、日期、财务金额、电话号码、地址等
2)半结构化:xml / json
3)非结构化数据:全文文本、图像、声音、影视、超媒体等信息
第三个就是应用价值高
2-大数据的作用
1-追溯:日志数据、定位问题;
我们服务会打一些日志,当服务出现一些问题的时候,我们就可以根据这些日志数据去定位问题
2-监控:监控服务(可用性,QPS,上下游的调用情况,接口访问量);
监控具体服务,比如看双十一的销售额;数据质量等等;
3-分析与洞察:toB业务,看板报表搭建;
看后台的数据,对当前业务的影响程度,用户画像,用户兴趣分布
3-数据分类
按照数据时效性(延迟情况)
1)离线:
在今天(T)处理N天前的数据,常用的是今天处理昨天产出的(T-1)的数据,
也就是T+1的产出数据, 延迟粒度为-->天
2)准实时:
在当前小时,处理N小时前的数据,延时粒度为-->小时
3)实处理时:
在当前处理时刻,处理当前的数据,延迟粒度为-->秒
数据处理链路
大致思路
数据采集-->数据处理-->数据存储-->数据服务
1-数据采集
从数据的源头,也就是各种数据源中获取数据。
分离线和实时采集,比如通过采集日志数据,实时采集到数据中间件(mq)中
1)消息队列:Kafka & Rocket
解耦:接口出现问题,不会影响当前功能;
异步处理:不用走完所有流程再返回结果;
流量削峰:流量高峰期,可以将数据暂放在消息队列中;
2)消息代理:eventbus
事件总线,构建在消息队列之上的,围绕eventbus构建的一种虚拟总线,用户无需关注底层消息队列的细节(比如选型,多少分区,扩容,报警之类的),覆盖消息声明、发布、订阅、运维
举例:
调用
服务A------------->服务B 不会直接通过接口哦调用
发送 订阅
服务A-------------->消息队列-------------->服务B
(1)使用场景展示:
若服务A1,A2,A3……An逐渐多了起来的时候,想往服务B的数据库里面写一些数据,如果数据直接写的话,就是当QPS比较高的时候,会对服务B造成一定的压力。
(2)解决方案:
让若干个的服务统一的把数据按照固定的格式发到消息队列中,然后服务B可以重写消息队列中某某消费的一个接口,然后服务B再去处理这些数据 然后可以实现上述的功能(消息队列)。
3)使用场景展示
若服务A1,A2,A3……An逐渐多了起来的时候,想往服务B的数据库里面写一些数据,如果数据直接写的话,就是当QPS比较高的时候,会对服务B造成一定的压力。
解决方案:
让若干个的服务统一的把数据按照固定的格式发到消息队列中,然后服务B可以重写消息队列中某某消费的一个接口,然后服务B再去处理这些数据 然后可以实现上述的功能(消息队列)。
2-数据处理
离线用(hive、spark),(近)实时用flink。
数据同步:全量、增量同步 (在数据采集/数据处理阶段,方案的选择)
数据存储:将数据库存到数据库中
存储规范:数据仓库相关的知识;数仓表的设计
数据服务:根据数据做出一些具体的应用
3-总结
首先要确定业务的使用场景,然后在选择适用于业务场景的一个数据库,再决定要选择一个怎么样的数据加工处理方式。
数据计算处理
【学习路线】
->首先接触的hiveSQL,感觉跟MySQL很像(DDL,DML,DQL语句),学了一种数据库,其他SQL语法差不多
->学习spark参数相关的,优化任务执行
->了解Hadoop、spark原理
1-离线处理(批处理)Hive
离线计算(首推hive、spark)
流式计算(flink)也可以做批处理
基于Hadoop的数仓管理工具,可以将 HDFS 上的文件映射为表结构
架构:
1)元数据存储
2)SQL到MapReduce的转换(SQL的解析)
注意要点:
存储:hdfs
计算:MapReduce/Spark
2-hive SQL的基础语法
参考: hive中文文档
常用的是创建 天级/小时级别表,分区为date/hour
-- 简单查询语句
select field1, field2
from db.table
where date = '${date}' and hour = '${hour}'
-- 聚合查询语句
select field1, sum(field2) as field2_alias
from db.table
where date = '${date}' and hour = '${hour}'
group by field1
-- 临时查询
WITH subquery AS (
SELECT col1, col2
FROM db.table1
WHERE condition1
)
SELECT a.col1, a.col2, b.col3
FROM subquery a
JOIN db.table2 b ON a.col1 = b.col3
WHERE condition2;
udf用户自定义函数
根据用户需求,自定义写函数处理数据
使用:用Java代码实现udf
step1:添加hive相关依赖
step2:编写udf类:继承UDF,并实现evalute方法
step3:项目编译,打jar包
step4:在hive中注册udf函数,并使用
package com.example;
import org.apache.hadoop.hive.ql.exec.UDF;
public class MyUDF extends UDF {
public String evaluate(String input) {
if (input == null) return null;
// 在这里编写你的函数逻辑,例如把输入的字符串转成大写
return input.toUpperCase();
}
}
3-Hadoop
分布式系统基础架构(主要是分布式存储和计算框架)
1)架构
- 存储:hdfs(hadoop distributed file system,分布式文件系统)
- 计算:mapReduce。
将计算任务拆成map和reduce两个阶段,实际分成Map、Sort、Combine、Shuffle 以及Reduce 5个步骤
简单说一下图解:
dear没有分过来,可以暂时不用管
input阶段,讲这些单词分过来,转换成map的(key,value)格式,然后做一个shuffle,之后在reduce阶段,把相同的key做一个合并,最后输出结果。 MapReduce的代码实现以及详细的原理,需要去学一学
资源调度管理:yarn(cpu和memory)
2-Hadoop常用命令
Hadoop提供了shell命令来与hdfs交互:
hadoop fs -ls <path>:列出指定 HDFS 路径下的文件和目录
hadoop fs -mkdir <path>:在 HDFS 中创建新目录,类似于 Unix 的 mkdir 命令。
hadoop fs -put <localsrc> <dst>:将本地文件(或目录)复制到 HDFS,这与 FTP 命令 put 类似。
hadoop fs -get <src> <localdst>:将 HDFS 上的文件(或目录)复制到本地,这与 FTP 命令 get 类似。
hadoop fs -mv <src> <dst>:移动 HDFS 中的文件或目录,相当于 Unix 的 mv 命令。
hadoop fs -cp <src> <dst>:复制 HDFS 中的文件或目录,相当于 Unix 的 cp 命令。
hadoop fs -rm <path>:删除 HDFS 中的文件,类似于 Unix 的 rm 命令。
hadoop fs -cat <path>:在控制台显示 HDFS 文件的内容,类似于 Unix 的 cat 命令。
hadoop fs -du <path>:显示 HDFS 文件或目录的大小,类似于 Unix 的 du 命令。
hadoop fs -df <path>:显示 HDFS 的可用空间,类似于 Unix 的 df 命令
4-spark
1)基本概念
架构:
使用成百上千台机器 并行处理 存储在离线表中的海量数据。
海量数据的任务,拆成小任务,并发到每一台机器上处理。
计算引擎:
支持离线(批)处理、流式
部署:
可以继承到hadoop yarn上运行,也可以单独部署(目前只用过前者)
支持spark sql:
变体的hive sql
性能比较:
计算比mapReduce快
需要去了解一下spark的原理,
以及spark来做计算的时候,为什么比MapReduce快
2)spark SQL的基本语法
Python的pyspark的使用
1. 启动一个spark client
2. 查出你想要的数据
3. 编写处理数据的逻辑并执行任务
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf() #配置参数的接口 -核心1
conf.setMaster('yarn')
conf.setAppName('hive_sql_query')
#具体用的话,需要再了解一下相关的参数
spark = SparkSession \ #传入 生成dataframe 去进行SQL查询 核心2
.builder \
.config(conf=conf) \
.enableHiveSupport() \
.config("", "") \
.config("", "") \
.getOrCreate()
# 查询语句
df = spark.sql("select field1 AS f1, field2 as f2 from table1")
3)spark参数
1-master和worker
定义:物理节点
master节点常驻master守护进程,负责管理worker节点,我们从master节点提交应用。
worker节点常驻worker守护进程,与master节点通信,并且管理executor进程。
【Spark中master、worker、executor和driver的关系】
首先说一句,master和worker是物理节点,driver和executor是进程。
master和worker节点
搭建spark集群的时候我们就已经设置好了master节点和worker节点,一个集群有多个master节点和多个worker节点。
master节点常驻master守护进程,负责管理worker节点,我们从master节点提交应用。
worker节点常驻worker守护进程,与master节点通信,并且管理executor进程。
PS:一台机器可以同时作为master和worker节点(举个例子:你有四台机器,你可以选择一台设置为master节点,然后剩下三台设为worker节点,也可以把四台都设为worker节点,这种情况下,有一个机器既是master节点又是worker节点)
driver和executor进程
driver进程就是应用的main()函数并且构建sparkContext对象,当我们提交了应用之后,便会启动一个对应的driver进程,driver本身会根据我们设置的参数占有一定的资源(主要指cpu core和memory)。下面说一说driver和executor会做哪些事。
driver可以运行在master上,也可以运行worker上(根据部署模式的不同)。driver首先会向集群管理者(standalone、yarn,mesos)申请spark应用所需的资源,也就是executor,然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量的executor,每个executor都占用一定数量的cpu和memory。在申请到应用所需的资源以后,driver就开始调度和执行我们编写的应用代码了。driver进程会将我们编写的spark应用代码拆分成多个stage,每个stage执行一部分代码片段,并为每个stage创建一批tasks,然后将这些tasks分配到各个executor中执行。
executor进程宿主在worker节点上,一个worker可以有多个executor。每个executor持有一个线程池,每个线程可以执行一个task,executor执行完task以后将结果返回给driver,每个executor执行的task都属于同一个应用。此外executor还有一个功能就是为应用程序中要求缓存的 RDD 提供内存式存储,RDD 是直接缓存在executor进程内的,因此任务可以在运行时充分利用缓存数据加速运算。
例子:
只做概念层面的理解就行
2-driver和executor
后续去给hiveSQL,sparkSQL去调用参数的时候,一些很重要的内存参数都需要用这个
driver:可运行在master/worker上;
作用:
1. main函数,构建SparkContext对象;
2. 向集群申请spark所需要资源,也就是executor,然后集群管理者会根据spark应用所设置的参数在各个worker上分配一定数量的executor,每个executor都占用一定数量的cpu和memory;
3. 然后开始调度、执行应用代码
executor:可运行在worker上(1:n)
作用:每个executor持有一个线程池,每个线程可以执行一个task,executor执行完task以后将结果返回给driver。
3-常用参数
分配资源
内存参数
--每个execotr分配几个核心
set spark.executor.cores=4;
--executor内存
set spark.executor.memory= 20g;
--executor堆外内存
set spark.executor.memoryOverhead = 10g;
--driver内存
set spark.driver.memory=25g;
--driver堆外内存
set spark.driver.memoryOverhead = 10g;
--任务推测机制相关
--开启推测机制(处理慢任务,尝试在其他节点去重启这些任务;加速整个应用的执行)
set spark.speculation=true;
-- 设置阈值
spark.speculation.multiplier=1.8
-- 设置分位数
spark.speculation..quantile=0.7
--坏节点黑明单(处理可能导致任务频繁失败的故障节点,当一个节点被列入黑名单后,Spark 调度器将不再在这个节点上调度任务,从而提高了作业的成功率和整体性能)
set spark.blacklist.enabled = true
调整shuffle,数据倾斜,内存OOM,超时相关的,根据具体任务使用具体参数
学hive和spark的时候,从简单的SQL学起,简单的SQL的话,可能不需要加一些参数,但当数据量很大的时候,就需要加一些这样的参数,去优化任务,否则任务执行起来非常的慢/或者失败之类的(因为内存不够)
5-在线处理(流式处理)
暂时只列出一种计算框架
flink
计算模式(流批一体)
批计算:统一收集数据,存储到数据库中,然后对数据进行批量处理(离线计算)
流计算:对数据流进行实时处理(实时计算)
架构
三层架构:
-
API & Libraries层:提供了支持流计算DataStream API和和批计算DataSet API的接口
还有提供基于流处理的CEP(复杂事件处理库)、SQL&Table库和基于批处理的FlinkML(机器学习库)、Gelly(图处理库)等
-
Runtime核心层:负责对上层不同接口提供基础服务,也是Flink分布式计算框架的核心实现层,支持分布式Stream作业的执行、JobGraph到ExecutionGraph的映射转换、任务调度等。
-
物理部署层:该层主要涉及Flink的部署模式,目前Flink支持多种部署模式:本地、集群(Standalone/YARN)
图片引用:https://blog.****.net/qq_35423154/article/details/113775546
运行时组件(任务调度)
4个组件(application url)
-
JobManager(任务管理器)
-
TaskManager(作业管理器)
-
ResourceManger(资源管理器)
-
Dispatcher(分发器)
Flink sql基本语法
参考:
参考:flink中文文档
定义一个source,数据从哪里输入;定义一个sink,数据输出到哪里
create table data_source (
`user_id` string,
`data` string) with (
'connector' = 'rocketmq',//也就是其他的数据来源
'scan.startup-mode' = 'latest', //数据的一个消费格式 最新/最早
'cluster' = '',
'topic' = '',
'group' = '',
'format' ='json',
'parallelism' = ‘9’
);
create table sink(
user_id varchar,
vv_cnt varchar,
`start_window` TIMESTAMP,
`end_window` TIMESTAMP
) with (
'connector' = 'kafka-0.10',
'scan.startup.mode' = 'latest-offset',
'properties.cluster' = '',
'topic' = '',
'parallelism' = '5',
'format' ='json'
);
INSERT INTO sink
SELECT
xx
FROM feature_data_source
where xx
GROUP BY xx;
窗口计算
-
滚动窗口
-
滑动窗口
udf用户自定义函数
一些复杂的逻辑用户可以自定义处理
scalar function:输入,输出为标量
aggregation function:多行数据聚合成一行
table function:
我没用过,不了解。
6-数仓知识
数仓分层
将数据有序组织和存储起来,常见的数仓层级划分:
-
ODS层(数据引用层,operational data store):存放未经处理的原始数据,是数仓的数据准备区。
-
CDM层(数据公共层,common data model):完成数据加工与整合,是数仓最核心最关键的一层。
-
DIM(维度层):以维度作为建模驱动
-
DWD层(数据明细层):以业务过程作为建模驱动
-
常见的业务域:产商品域(产品product,商品goods,配置报价quote,合同contract),交易计费域(订单order,计量measure,计费billing,履约perform,票税invoice)
-
-
DWS层(数据汇总层):以业务主题作为建模驱动,根据DWD层数据,以各维度ID进行粗粒度汇总
-
-
ADS层(数据应用层,application data service):保存结果数据,为外部系统提供查询接口
其他分层结构
引用:知乎https://zhuanlan.zhihu.com/p/421081877
表命名规范
层名 |
命名规范 |
举例 |
dwd |
{层缩写}_{业务板块}_{数据域缩写}_{自定义表命名标签缩写}_{加载方式,天级/小时级,增量/全量} |
dwd_sale_trd_item_di 交易信息事实增量表 |
dws |
{层缩写}_{业务板块}_{数据域缩写}_{自定义表命名标签缩写}_{统计时间周期范围缩写} |
dwd_sale_trd_item_1h 交易信息事实1h增量表 |
总结
这部分学习,是边用边学,遇到问题就针对性地解决这个问题。这个过程学的很零散,需要定期将这些点连成线。
总结的不全,希望懂的朋友指点我一下。谢谢了
推荐阅读
-
谈论大数据--后端开发中经常涉及的大数据开发技术栈
-
活动预告|NineData 创始人兼 CEO 叶春生将参加 QCon 全球软件开发大会,讲述人工智能大模型技术在数据库 DevOps 中的实践。
-
反传销网8月30日发布:视频区块链里的骗子,币里的韭菜,杜子建骂人了!金融大V周召说区块链!——“一小帮骗子玩一大帮小白,被割韭菜,小白还轮流被割,割的就是你!” 什么区块链,统统是骗子 作者:周召(知乎金融领域大V,毕业于上海财经大学,目前任职上海某股权投资基金合伙人) 有人问我,区块链现在这么火,到底是不是骗局? 我的回答是: 是骗局。而且我并不是说数字货币是骗局,而是说所有搞区块链的都是骗局。 -01- 区块链是一种鸡肋技术 人类社会任何技术的发明应用,本质都是为了提高社会的生产效率。而所谓区块链技术本质不过是几种早已成熟的技术的大杂烩,冗余且十分低效,除了提高了洗钱和诈骗的效率以外,对人类社会的进步毫无贡献。 真正意义上的区块链得包含三个要素:分布式系统(包括记账和存储),无法篡改的数据结构,以及共识算法,三者互为基础和因果,就像三体世界一样。看上去挺让人不明觉厉的,而经过几年的瞎折腾,稍微懂点区块链的碰了几次壁后都已经渐渐明白区块链其实并没有什么卵用,区块链技术已经名存实亡,沦为了营销工具和传销组织的画皮。 因为符合上述定义的、以比特币为代表的原教旨区块链技术,是反效率的,从经济学角度来说,不但不是一种帕累托改进,甚至还可以说是一种帕累托倒退。 原教旨区块链技术的效率十分低下,因为要遍历所有节点,只能做非常轻量级的数据应用,一旦涉及到大量的数据传输与更新,区块链就瞎了。 一方面整条链交易速度会极慢,另一方面数据库容量极速膨胀,考虑到人手一份的存储机制,区块链其实是对存储资源和能源的一种极大的浪费。 这里还没有加上为了取得所谓的共识和挖矿消耗的巨大的能源,如果说区块链技术是屎,那么这波区块链投机浪潮可谓人类历史上最大规模的搅屎运动。 区块链也验证不了任何东西。 所谓的智能合约,即不智能,也非合约。我看有人还说,如果有了智能合约,就可以跟老板签一份放区块链上,如果明年销售业绩提升30%,就加薪10%,由于区块链不能篡改,不能抵赖,所以老板必须得执行,说得有板有眼,不懂行的愣一看,好像还真是那么回事。 但仔细一想,问题就来了。首先,在区块链上如何证明你真的达到了30%业绩提升?即便真的达到老板耍赖如何执行? 也就是说,如果区块链真这么厉害,要法院和仲裁干什么。 人类社会真正的符合成本效益原则的是代理制度。之前有人说要用区块链改造注册会计师行业,我不知道他准备怎么设计,我猜想他思路大概是这样的,首先肯定搞去中心化,让所有会计师到链上来,然后一个新人要成为注册会计师就要所有会计师同意并记录在链上。 那我就请问了,我每天上班累死累活,为什么还要花时间去验证一个跟我无关的的人的专业能力?最优做法当然是组织一个委员会,让专门的人来负责,这不就是现在注册会师协会干的事儿吗?区块链的逻辑相当于什么事情都要拿出来公投,这个绝对是扯淡的。 当然这么说都有点抬举区块链了,区块链技术本身根本没有判断是非能力,如果这么高级的人工智能,靠一个无脑分布式记账就能实现的话,我们早就进入共产主义社会了。 虽然EOS等数字货币采用了超级节点,通过再中心化的方式提高效率,有点行业协会的意思,是对区块链原教旨主义的一种修正,但是依然无法突破区块链技术最本质的局限性。有人说,私有链和联盟链是区块链技术的未来,也是扯淡,因为区块链技术没有未来。如果有,说明他是包装成区块链的伪区块链技术。 区块链所涉及的所有底层技术,不管是分布式数据库技术,加密技术,还是点对点传输技术等,基本都是早已存在没什么秘密可言的技术。 比特币系统最重要的特性是封闭性和自洽性,他验证不了任何系统自身以外产生的信息的真实性。 所谓系统自身产生的信息,就是数据库数据的变动信息,有价值的基本上有且只有交易信息。所以说比特币最初不过是中本聪一种炫技的产物,来证明自己对几种技术的掌握,你看我多牛逼,设计出了一个像三体一样的系统。因此,数字货币很有可能是区块链从始至终唯一的杀手应用。 比特币和区块链概念从诞生到今天已经快10年了,很多人说区块链技术在爆发的前夜,但这个前夜好像是不是有点过长了啊朋友,跟三体里的长夜有一拼啊。都说区块链技术像是90年代初的互联网,可是90年代初的互联网在十年发展后,已经出现了一大批伟大的公司,阿里巴巴在99年都成立了,区块链怎么除了币还是币呢? 正规的数字货币未来发展的形式无外乎几种,要么就是论坛币形式,或者类似股票的权益凭证等。问题是论坛币和股票之前,本来也都电子化了,区块链来了到底改变了什么呢? 所有想把TOKEN和应用场景结合起来的人最后都很痛苦,最后他们会发现区块链技术就是脱裤子放屁,自己辛苦搞半天,干嘛不自己作为中心关心门来收钱?最后这些人都产生了价值的虚无感,最终精神崩溃,只能发币疯狂收割韭菜,一边嘴里还说着我是个好人之类的奇怪的话。 因此,之前币圈链圈还泾渭分明,互相瞧不起,但这两年链圈逐渐坐不住了,想着是不是趁着泡沫没彻底破灭之前赶快收割一波,不然可能什么都捞不着了。 前段时间和一个名校毕业的链圈朋友瞎聊天,他说他们“致力于用区块链技术解决数字版权保护问题”,我就问他一个问题,你们如何保证你链的版权所有权声明是真实的,万一盗版者抢先一步把数据放在链上怎么办。他说他们的解决方案是连入国家数字版权保护中心的数据库进行验证…… 所以说区块链技术就是个鸡肋,研究到最后都会落入效率与真实性的黑洞,很多人一头扎进链圈后才发现,真正意义上的区块链技术,其实什么都干不了。 -02- 不是蠢就是坏的区块链媒体 空气币和区块链的造富神话,让区块链自媒体也开始迎风乱扭。一群群根本不知道区块链为何物的妖魔鬼怪纷纷进驻区块链自媒体战场,开始大放厥词胡编乱造。 任何东西,但凡只要和区块,链,分,分布式,记账,加密,验证,可追溯等等这些个关键词沾到哪怕一点点,这些所谓的区块链媒体人就会像狗闻到了屎了一样疯狂地把区块链概念往上套。 这让我想起曾经一度也是热闹非凡的物联网,我曾经去看过江苏一家号称要改变世界的“物联网”企业,过去一看是生产路由器的,我黑人问号脸,对方解释说没有路由器万物怎么互联,我觉得他说得好有道理,竟无言以对。 好,下面让我们进入奇葩共赏析时间,来看看区城链媒体经常有哪些危言耸听的奇谈怪论 区块链(分布式记账)的典型应用是*?? 正如前面所说,真正意义上的区块链分布式记账,不光包括“记”这个动作,还包括分布式存储和共识机制等。而*诞生远远早于区块链这个词的出现,勉强算是“分布式编辑”吧,就被很多区块链媒体拿来强行充当区块链技术应用的典范。 其实事实恰恰相反,*恰恰是去中心化失败的典范,现在如果没有精英和专业人士的编辑和维护,*早就没法看了。 区块链会促进社会分工?? 罗振宇好像就说过类似的话,虽然罗振宇说过很多没有逻辑的话,但这句话绝对是最没逻辑思维的。很多区块链自媒体也常常用这句话来忽悠老百姓,说分工代表效率提高社会进步,而区块链“无疑”会促进分工,他们的理由仅仅是分工和分布式记账都共用一个“分”字,就强行把他们扯到一起。 实际情况恰恰相反,区块链是逆分工的,区块链精神是号召所有人积极地参与到他不擅长也不想掺合的事情里面去。 区块链不能像上帝一样许诺他的子民死后上天国,只能给他们许诺你们是六度人脉中的第一级,我可以赚后面五级人的钱,你处于金字塔的顶端。