您如何理解 TCP 协议的无限制性和粘性数据包?
最编程
2024-03-22 20:07:03
...
更新记录
时间 | 版本修改 |
---|---|
2020年4月2日 | 初稿 |
- 我们从经典的计算机科学丛书上阅到的知识,都说:
TCP协议是没有消息边界的
。但是这个要怎么理解呢?在我没有接触底层的套接字相关逻辑时。我对此也没有特别的了解。直到阅读了套接字的相关逻辑源码,才对此有了一定的了解 - TCP的发包和我们业务层所发出的协议数据是不一定吻合的。也就是说,我们发的数据库可能会被分拆成不同的包。然后再和别的协议(这里当然是只发往同一个端口)的数据封装同一个TCP包体。
- 因此。对于我们业务网络层而言,我们需要在一个TCP包体里面区分出不同的实际业务包。
- 目前业界常用的做法有三个,可参考TCP消息边界处理
- 但是最常使用的是第二种做法。在我们发送的协议数据里面。协议头带上协议包体的长度,特定的协议号。以及特殊的用于标。协议头的数据。这些协议数据都会统一。作为TCP协议的包体数据。在网络上进行传输
- 第一种方案和第三种方案的缺点都比较大。
- 第二种方案,有一些好处。
- 我们的应用程序中,app和server的数据交互是非常多,往往需要不同的协议号(也叫做命令字)去区分不同的业务场景,比如某几条协议负责登录,某条协议负责用户的个人资料等等。通过在业务协议头上,填充协议号,那么在客户端收到TCP回包,解析了协议头时,就可以往不同的业务上层抛出通知,处理起来就非常方便,水到渠成。
- 其次,协议头里面,还可以塞入其他有需要塞入的数据。比如,客户端版本,登录的用户ID,客户端使用的语言类型等,总之,使得我们自定义的协议头的长度是固定的即可。
- 一个典型的协议头设计如下:
字段 | 意义 |
---|---|
包头标识_uint8[2] | 为固定字符“XX”(可用于识别是否是本app的包头) |
协议版本号_uint8 | 当前版本号 |
ClientType_uint8 | 客户端类型(PC,安卓等) |
ClientVersion_uint16 | Client版本 |
VersionType_uint8 | Client版本类型(区分简繁体等) |
UserID_uint32 | 登录用户ID(用户ID) |
包类型标志_uint8 | (应答or推送) |
SerialNo_uint32 | 命令序列号,每发送一个命令后加1 |
CMD_uint16 | 协议号(区分上层业务) |
BodyLength_uint32 | 协议包体长度(本文重点) |
Reserved | 任意Byte保留字节(保留当然不能太长罗) |
粘包
- 介绍上了上述方案的选择,就要面对这个方案面临的一个问题
- 我们之前说的,我们发送的数据有三个包: [1,2,3] [4,5,6] [7,8,9,10],但是底层的TCP协议发出去的时候不一定是 [1,2,3] [4,5,6] [7,8,9,10]。有可能是 [1,2,3,4] [,5,6,7] [8,9,10]等,随机的一种组合。
- 因此,我们的app应用层就需要去识别这些数据,正确地解成我们自定义的协议数据。
- 下面结合实际代码,来演示从套接字中接收数据的整个过程。
void CMyWinTCPSocket::OnReceive(int nErrorCode)
{
static unsigned int nHeaderLen = sizeof(PROTOCOL_HEADER); //自定义协议,固定长度的头部
m_nLastErrorCode = nErrorCode; //记录错误码
if (nErrorCode != 0)
{
//错误码不为0,此处需要打印日志记录
}
//记录本次套接字被激活的时间
m_uSocketActiveTime = ::GetTickCount();
DWORD nBytes = 0;
if (!IOCtl(FIONREAD, &nBytes) || nBytes == 0) //FIONREAD返回套接字上排队的第一个数据报大小
{
m_nLastErrorCode = WSAGetLastError();
//打印该错误码,由于读取套接字上的数据失败,直接返回。
return;
}
//此时套接字中可获取的数据有nBytes个字节
//开始读取数据
char *pReceiveBuffer = new char[nBytes]; //有多少读多少,一次性读完
int nRead = CAsyncSocketEx::Receive(pReceiveBuffer, nBytes); //nRead是实际读取到的数据
if (nRead <= 0) //出现异常,需要退出
{
//释放new出来的char数组
delete_array(pReceiveBuffer);
return;
}
int nCurrentOffset = 0; //记录读取本次的套接字数据的offset(偏移量)
int nLeftSize = nRead; //本次套接字返回的数据,剩下的未读取的字节数
char *pOffsetBuffer = pReceiveBuffer;
std::vector<tagRecvPack> vecRecvPacks; //tagRecvPack表示一个收到的应用层的包,从成员变量offset来判断当前获取的字节数
do
{
//注意:如果上次调用onReceive时,仍然存有数据(即不完整的包),此时就不会重新解析头部。(也就是,所谓的粘包操作)
if (m_RecvPack.pHeadBuffer == NULL) //从头开始读取头部(一个新的包)
{
m_RecvPack.pHeadBuffer = new char[nHeaderLen]; //记录自定义协议头的数据
ZeroMemory(m_RecvPack.pHeadBuffer, nHeaderLen);
m_RecvPack.uHeadOffset = 0;
m_RecvPack.uHeadTotal = nHeaderLen; //记录包头的长度
//保护逻辑
MF_Delete1D(m_RecvPack.pBodyBuffer); //记录包体的实际数据
m_RecvPack.pBodyBuffer = NULL;
m_RecvPack.uBodyOffset = 0;
}
//判断上次调用onReceive的数据是否已经读取完头部,
if (m_RecvPack.uHeadOffset < nHeaderLen)
{
//读取头部,有两种情况:
//a. 上次的onReceive没有读取完的(也就是要把上次onReceive收到的数据,和这次收到的数据粘起来,搞成一个新的包传给上层)
//b. 本次读取套接字buffer时,新的一个包,重新解析头部的情况。
//---------- 1. 读数据,把这个包的头部读取完---------------------//
char *p = m_RecvPack.pHeadBuffer + m_RecvPack.uHeadOffset;
int len = std::min<int>((nHeaderLen - m_RecvPack.uHeadOffset), nLeftSize);
memcpy(p, pOffsetBuffer, len);
nCurrentOffset += len;
pOffsetBuffer = pReceiveBuffer + nCurrentOffset;
nLeftSize -= len;
m_RecvPack.uHeadOffset += len;
//---------- 1. 读数据,把这个包的头部读取完---------------------//
if (m_RecvPack.uHeadOffset == nHeaderLen)
{
//-----------2. 头部读完,开始做准备或者容错之类的工作-----------//
assert(m_RecvPack.pBodyBuffer != NULL);
PROTOCOL_HEADER *pHeader = (PROTOCOL_HEADER *)m_RecvPack.pHeadBuffer;
int nBodyLength = ntohl(pHeader->dwBodyLength); //这就是传说中的,协议头上带上包体数据的长度
//版本号,协议号等其他字段(可根据业务自行扩展,但后续不允许改动,否则老版本不兼容)
BYTE chVersion = pHeader->chVersion;
WORD wClientVersion = ntohs(pHeader->wClientVersion);
WORD wCmdID = ntohs(pHeader->wCmdID);
DWORD dwSerialNO = ntohl(pHeader->dwSerialNO);
//简单的包校验
if (chVersion != TCP_PROTOCOL_VERSION ||
wClientVersion != m_wClientVersion ||
pHeader->chMagicCode[0] != TCP_PROTOCOL_MAGIC_CODE1 ||
pHeader->chMagicCode[1] != TCP_PROTOCOL_MAGIC_CODE2)
{
//处理包校验错误的情况
Close(false); //出错,直接关闭socket
break;
}
m_RecvPack.uBodyTotal = nBodyLength;
if (m_RecvPack.uBodyTotal == 0) //一个空包,以前的逻辑是直接抛弃的,现在空包也要
{
//处理空包的情况
}
//包体过大,应该是数据错乱了,剩下的包已经不知道怎么解析了,只能断开
else if (m_RecvPack.uBodyTotal > TCP_PROTOCOL_PACKET_MAX_LENGTH)
{
//处理出错的情况
m_RecvPack.reset();
Close(false); //直接关闭socket算了,要不后面包也是乱的了
break;
}
//-----------2. 头部读完,开始做准备或者容错之类的工作-----------//
//-----------3. 创建好干净的包体,用以存储包体数据-----------//
if (m_RecvPack.uBodyTotal > 0)
{
MF_Delete1D(m_RecvPack.pBodyBuffer);
m_RecvPack.pBodyBuffer = new char[m_RecvPack.uBodyTotal];
ZeroMemory(m_RecvPack.pBodyBuffer, m_RecvPack.uBodyTotal);
m_RecvPack.uBodyOffset = 0;
}
//-----------3. 创建好干净的包体,用以存储包体数据-----------//
}
}
//开始读取包体
else if (m_RecvPack.pHeadBuffer && m_RecvPack.uHeadOffset == nHeaderLen)
{
//---4.根据协议头带上的包体长度,直接解析包体,如果套接字buffer不够长,要先存起来,等待下次onReceive调用时再粘包---//
if (m_RecvPack.uBodyTotal > 0 && m_RecvPack.uBodyOffset < m_RecvPack.uBodyTotal)
{
char *p = m_RecvPack.pBodyBuffer + m_RecvPack.uBodyOffset;
int len = std::max<int>(0, (std::min<int>((m_RecvPack.uBodyTotal - m_RecvPack.uBodyOffset), nLeftSize)));
assert(len != 0);
memcpy(p, pOffsetBuffer, len);
nCurrentOffset += len;
pOffsetBuffer = pReceiveBuffer + nCurrentOffset;
nLeftSize -= len;
m_RecvPack.uBodyOffset += len;
}
if (m_RecvPack.uBodyOffset == m_RecvPack.uBodyTotal && m_RecvPack.pBodyBuffer != NULL) //数据读取完成
{
//本次套接字buffer解析出了一个包,用容器记录下来,后续一起抛给上层
vecRecvPacks.push_back(m_RecvPack);
//这个包解析晚了,清空这个成员变量,用以解析下一个包
m_RecvPack.reset();
}
//---4.根据协议头带上的包体长度,直接解析包体,如果套接字buffer不够长,要先存起来,等待下次onReceive调用时再粘包---//
}
else
{
assert;
}
} while (nLeftSize > 0);
//还回数据
MF_Delete1D(pReceiveBuffer);
//本次解析出来的包,每一个依次往上层抛出回调
auto uConnectOrderSession = m_uConnectOrderSession;
for (auto it : vecRecvPacks)
{
if (it.pHeadBuffer == NULL)
{
continue;
}
if (uConnectOrderSession != m_uConnectOrderSession)
{
//出错了
OnErrorPack(it);
assert;
break;
}
if (it.uBodyTotal == 0)
{
//收到了空包
OnRecvEmptyPack(it);
}
else
{
//收到了一个完整的包,通知对应的业务上层
OnRecvPack(it);
}
}
}
void CMyWinTCPSocket::OnReceive(int nErrorCode)
{
static unsigned int nHeaderLen = sizeof(PROTOCOL_HEADER); //自定义协议,固定长度的头部
m_nLastErrorCode = nErrorCode; //记录错误码
if (nErrorCode != 0)
{
//错误码不为0,此处需要打印日志记录
}
//记录本次套接字被激活的时间
m_uSocketActiveTime = ::GetTickCount();
DWORD nBytes = 0;
if (!IOCtl(FIONREAD, &nBytes) || nBytes == 0) //FIONREAD返回套接字上排队的第一个数据报大小
{
m_nLastErrorCode = WSAGetLastError();
//打印该错误码,由于读取套接字上的数据失败,直接返回。
return;
}
//此时套接字中可获取的数据有nBytes个字节
//开始读取数据
char *pReceiveBuffer = new char[nBytes]; //有多少读多少,一次性读完
int nRead = CAsyncSocketEx::Receive(pReceiveBuffer, nBytes); //nRead是实际读取到的数据
if (nRead <= 0) //出现异常,需要退出
{
//释放new出来的char数组
delete_array(pReceiveBuffer);
return;
}
int nCurrentOffset = 0; //记录读取本次的套接字数据的offset(偏移量)
int nLeftSize = nRead; //本次套接字返回的数据,剩下的未读取的字节数
char *pOffsetBuffer = pReceiveBuffer;
std::vector<tagRecvPack> vecRecvPacks; //tagRecvPack表示一个收到的应用层的包,从成员变量offset来判断当前获取的字节数
do
{
//注意:如果上次调用onReceive时,仍然存有数据(即不完整的包),此时就不会重新解析头部。(也就是,所谓的粘包操作)
if (m_RecvPack.pHeadBuffer == NULL) //从头开始读取头部(一个新的包)
{
m_RecvPack.pHeadBuffer = new char[nHeaderLen]; //记录自定义协议头的数据
ZeroMemory(m_RecvPack.pHeadBuffer, nHeaderLen);
m_RecvPack.uHeadOffset = 0;
m_RecvPack.uHeadTotal = nHeaderLen; //记录包头的长度
//保护逻辑
MF_Delete1D(m_RecvPack.pBodyBuffer); //记录包体的实际数据
m_RecvPack.pBodyBuffer = NULL;
m_RecvPack.uBodyOffset = 0;
}
//判断上次调用onReceive的数据是否已经读取完头部,
if (m_RecvPack.uHeadOffset < nHeaderLen)
{
//读取头部,有两种情况:
//a. 上次的onReceive没有读取完的(也就是要把上次onReceive收到的数据,和这次收到的数据粘起来,搞成一个新的包传给上层)
//b. 本次读取套接字buffer时,新的一个包,重新解析头部的情况。
//---------- 1. 读数据,把这个包的头部读取完---------------------//
char *p = m_RecvPack.pHeadBuffer + m_RecvPack.uHeadOffset;
int len = std::min<int>((nHeaderLen - m_RecvPack.uHeadOffset), nLeftSize);
memcpy(p, pOffsetBuffer, len);
nCurrentOffset += len;
pOffsetBuffer = pReceiveBuffer + nCurrentOffset;
nLeftSize -= len;
m_RecvPack.uHeadOffset += len;
//---------- 1. 读数据,把这个包的头部读取完---------------------//
if (m_RecvPack.uHeadOffset == nHeaderLen)
{
//-----------2. 头部读完,开始做准备或者容错之类的工作-----------//
assert(m_RecvPack.pBodyBuffer != NULL);
PROTOCOL_HEADER *pHeader = (PROTOCOL_HEADER *)m_RecvPack.pHeadBuffer;
int nBodyLength = ntohl(pHeader->dwBodyLength); //这就是传说中的,协议头上带上包体数据的长度
//版本号,协议号等其他字段(可根据业务自行扩展,但后续不允许改动,否则老版本不兼容)
BYTE chVersion = pHeader->chVersion;
WORD wClientVersion = ntohs(pHeader->wClientVersion);
WORD wCmdID = ntohs(pHeader->wCmdID);
DWORD dwSerialNO = ntohl(pHeader->dwSerialNO);
//简单的包校验
if (chVersion != TCP_PROTOCOL_VERSION ||
wClientVersion != m_wClientVersion ||
pHeader->chMagicCode[0] != TCP_PROTOCOL_MAGIC_CODE1 ||
pHeader->chMagicCode[1] != TCP_PROTOCOL_MAGIC_CODE2)
{
//处理包校验错误的情况
Close(false); //出错,直接关闭socket
break;
}
m_RecvPack.uBodyTotal = nBodyLength;
if (m_RecvPack.uBodyTotal == 0) //一个空包,以前的逻辑是直接抛弃的,现在空包也要
{
//处理空包的情况
}
//包体过大,应该是数据错乱了,剩下的包已经不知道怎么解析了,只能断开
else if (m_RecvPack.uBodyTotal > TCP_PROTOCOL_PACKET_MAX_LENGTH)
{
//处理出错的情况
m_RecvPack.reset();
Close(false); //直接关闭socket算了,要不后面包也是乱的了
break;
}
//-----------2. 头部读完,开始做准备或者容错之类的工作-----------//
//-----------3. 创建好干净的包体,用以存储包体数据-----------//
if (m_RecvPack.uBodyTotal > 0)
{
MF_Delete1D(m_RecvPack.pBodyBuffer);
m_RecvPack.pBodyBuffer = new char[m_RecvPack.uBodyTotal];
ZeroMemory(m_RecvPack.pBodyBuffer, m_RecvPack.uBodyTotal);
m_RecvPack.uBodyOffset = 0;
}
//-----------3. 创建好干净的包体,用以存储包体数据-----------//
}
}
//开始读取包体
else if (m_RecvPack.pHeadBuffer && m_RecvPack.uHeadOffset == nHeaderLen)
{
//---4.根据协议头带上的包体长度,直接解析包体,如果套接字buffer不够长,要先存起来,等待下次onReceive调用时再粘包---//
if (m_RecvPack.uBodyTotal > 0 && m_RecvPack.uBodyOffset < m_RecvPack.uBodyTotal)
{
char *p = m_RecvPack.pBodyBuffer + m_RecvPack.uBodyOffset;
int len = std::max<int>(0, (std::min<int>((m_RecvPack.uBodyTotal - m_RecvPack.uBodyOffset), nLeftSize)));
assert(len != 0);
memcpy(p, pOffsetBuffer, len);
nCurrentOffset += len;
pOffsetBuffer = pReceiveBuffer + nCurrentOffset;
nLeftSize -= len;
m_RecvPack.uBodyOffset += len;
}
if (m_RecvPack.uBodyOffset == m_RecvPack.uBodyTotal && m_RecvPack.pBodyBuffer != NULL) //数据读取完成
{
//本次套接字buffer解析出了一个包,用容器记录下来,后续一起抛给上层
vecRecvPacks.push_back(m_RecvPack);
//这个包解析晚了,清空这个成员变量,用以解析下一个包
m_RecvPack.reset();
}
//---4.根据协议头带上的包体长度,直接解析包体,如果套接字buffer不够长,要先存起来,等待下次onReceive调用时再粘包---//
}
else
{
assert;
}
} while (nLeftSize > 0);
//还回数据
MF_Delete1D(pReceiveBuffer);
//本次解析出来的包,每一个依次往上层抛出回调
auto uConnectOrderSession = m_uConnectOrderSession;
for (auto it : vecRecvPacks)
{
if (it.pHeadBuffer == NULL)
{
continue;
}
if (uConnectOrderSession != m_uConnectOrderSession)
{
//出错了
OnErrorPack(it);
assert;
break;
}
if (it.uBodyTotal == 0)
{
//收到了空包
OnRecvEmptyPack(it);
}
else
{
//收到了一个完整的包,通知对应的业务上层
OnRecvPack(it);
}
}
}
上一篇: 流式处理平台 Flink 简介
下一篇: 关于 HTTPS 访问缓慢的案例研究
推荐阅读
-
您如何理解 TCP 协议的无限制性和粘性数据包?
-
对话NGC蔡岩:从机制创新到价值沉淀,解析DeFi产品开发逻辑 |链捕手 - 真正的DeFi产品首先要有足够的安全性和稳定性,如果能在此基础上有一些功能创新,那就非常好了。像 Uniswap 这样逐渐成为 DeFi 基础架构的产品,可遇而不可求。 链式捕手:固定利率协议之前关注度比较高,但观察下来发现,大部分协议还是类似于传统金融CDO(抵押债务凭证)的玩法,风险系数很高,您如何理解这块业务的价值和风险? 蔡岩:确实有些定息协议类似CDO玩法,背后绑定一个债券,但并不是所有的定息协议都是这样的玩法,像这种CDO玩法的主要代表项目是88mph,背后绑定的是Aave、Compoud这样的借贷协议,在此基础上做定息和浮息债券;像APWine,背后同样是Aave,它会发行期货收益代币来锁定你的收益;Notional本身是做借贷市场的,在此基础上做定息协议。 非 CDO 的玩法,比如 Horizon,更像是一个利率撮合器,背后需要用户通过拍卖产生更合适的目标收益率;像 Saffron、BarnBridge 等是通过风险分级来定义不同的收益率。总的来说,创新还是挺多的。 价值层面是创新和想象力,因为在传统金融领域,比如银行做固定收益证券,或者评级机构给风险分级,这些业务都非常大,利润也很丰厚。而 DeFi 的对口业务给了类似业务很大的想象空间。尤其是固定利率协议的成熟产品不多,尝试各种微创新是很有意义的。 风险程度还是要具体到不同的玩法,比如,在 Aave、Compoud 等借贷协议的固定利率协议背后,如果这些借贷协议受到攻击,与之绑定的固定利率协议也会受损。 同样,如果自己做借贷市场,可能更需要更强的开发能力。再有,如果该程序的机制或参数设计不当,同样会导致协议运行不稳定,并可能造成大量用户清盘。 总的来说,风险在于固定利率协议的设计,这是一个非常复杂的过程,需要不断地尝试和出错。 链式捕捉器:刚刚提到背后是Aave/Compound的固定费率协议风险较大,您认为Aave最大的不确定性和创新点分在哪里? 蔡岩:其实爱钱进一直被认为是走在行业前列的项目,他们的迭代速度非常快,比如率先尝试闪贷、推出新的经济激励模式、推出目前业内首个安全模块、尝试L2解决方案等等。 而在主要的借贷业务上,他们又十分谨慎,比如在抵押率、清算系数等风险参数的设计上相对于其他借贷协议较为保守,并不会存在为了吸引更多借贷资金而降低风险的要求。 与许多 DeFi 项目一样,即使 Aave 进行了多次审计,也无法保证不存在漏洞。前段时间,Aave 刚进入 V2 阶段时,白帽黑客就指出了某个漏洞。 之前的创新点可能是闪电借贷,这是当时业内独一无二的新产品功能,也为 Aave 带来了不少收益。当然,也有人批评闪电贷只能方便黑客实现资金效益的最大化,但工具本身并没有错,未来闪电贷肯定会有更多的应用场景。 其次是安全模块的设计,这有点像项目本身的储备金库,保障项目的安全性,这也是爱维开创的先河。说实话,目前大多数项目都没有做到代币模式的良性或正向运营,也做不到像Aave一样的安全模块,这是一个不小的门槛。 Chaincatcher从某种程度上来说,挖矿模式是DeFi财富效应的根本支撑,但Aave的CEO却说挖矿机制带来的动力是不可持续的,您怎么看这个观点? 蔡岩:"挖矿机制 "不可能失效,因为它是一种激励机制,或者说是项目冷启动的一种方式。但流动性开采亚博体育手机客户端不会一直高涨。比如去年11月的流行性挖矿高APY持续了一两个月就崩盘了,导致DeFi市场大幅回调。 Aave、Uniswap、Synthetix等项目真正爆发进入市值前15名也是在今年2月,我更倾向于这是头部DeFi长期价值的体现。虽然大家都喜欢抢高APY的矿机,但我个人很少参与挖矿,所以我并不觉得流动性挖矿是DeFi的基本面支撑。