深入理解Linux内核的流量控制机制(第11篇)
最编程
2024-02-29 19:53:28
...
本文档的Copyleft归yfydz所有,使用GPL发布,可以*拷贝,转载,转载时请保持文档的完整性,
严禁用于任何商业用途。
msn: yfydz_no1@hotmail.com
来源: http://yfydz.cublog.cn
msn: yfydz_no1@hotmail.com
来源: http://yfydz.cublog.cn
5.11 HTB(Hierarchical token bucket, 递阶令牌桶)
HTB, 从名称看就是TBF的扩展, 所不同的是TBF就一个节点处理所有数据, 而HTB是基于分类的流控方
法, 以前那些流控一般用一个"tc qdisc"命令就可以完成配置, 而要配置好HTB, 通常情况下tc
qdisc, class, filter三种命令都要用到, 用于将不同数据包分为不同的类别, 然后针对每种类别数
据再设置相应的流控方法, 因此基于分类的流控方法远比以前所述的流控方法复杂。
HTB将各种类别的流控处理节点组合成一个节点树, 每个叶节点是一个流控结构, 可在叶子节点使用
不同的流控方法,如将pfifo, tbf等。HTB一个重要的特点是能设置每种类型的基本带宽,当本类带
宽满而其他类型带宽空闲时可以向其他类型借带宽。注意这个树是静态的, 一旦TC命令配置好后就不
变了, 而具体的实现是HASH表实现的, 只是逻辑上是树, 而且不是二叉树, 每个节点可以有多个子节
点。
HTB运行过程中会将不同类别不同优先权的数据包进行有序排列,用到了有序表, 其数据结构实际是
一种特殊的二叉树, 称为红黑树(Red Black Tree), 这种树的结构是动态变化的,而且数量不只一个
,最大可有8×8个树。
红黑树的特征是:
1) 每个节点不是红的就是黑的;
2) 根节点必须是黑的;
3) 所有叶子节点必须也是黑的;
4) 每个红节点的子节点必须是黑的, 也就是红节点的父节点必须是黑节点;
5) 从每个节点到最底层节点的所有路径必须包含相同数量的黑节点;
关于红黑树的数据结构和操作在include/linux/rbtree.h和lib/rbtree.c中定义.
5.11.1 HTB操作结构定义
// HTB操作数据包模式
enum htb_cmode {
// 不能发送
HTB_CANT_SEND, /* class can't send and can't borrow */
// 借带宽
HTB_MAY_BORROW, /* class can't send but may borrow */
// 可发送
HTB_CAN_SEND /* class can send */
};
但HTB模式是HTB_CAN_SEND时, 表示是可以发送, 没有阻塞; 为HTB_CANT_SEND时表示阻塞, 根本不能
发送数据包了; 为HTB_MAY_BORROW时也属于阻塞状态, 但可以向其他类别借带宽来发送.
/* interior & leaf nodes; props specific to leaves are marked L: */
// HTB类别, 用于定义HTB的节点
struct htb_class {
/* general class parameters */
// 类别ID值, 高16位用于区分不同的HTB流控, 低16位为区分同一HTB流控中的不同类别
u32 classid;
// 字节数, 包数统计
struct gnet_stats_basic bstats;
// 队列信息统计
struct gnet_stats_queue qstats;
// 速率统计, 字节率, 包率
struct gnet_stats_rate_est rate_est;
// HTB统计信息, 借出, 借入, 令牌等参数
struct tc_htb_xstats xstats; /* our special stats */
// HTB类别引用计数
int refcnt; /* usage count of this class */
#ifdef HTB_RATECM
/* rate measurement counters */
// 流率控制参数
unsigned long rate_bytes, sum_bytes;
unsigned long rate_packets, sum_packets;
#endif
/* rate measurement counters */
// 流率控制参数
unsigned long rate_bytes, sum_bytes;
unsigned long rate_packets, sum_packets;
#endif
/* topology */
// 在树中的层次, 0表示叶子节点, 根节点层次是TC_HTB_MAXDEPTH-1(7)
int level; /* our level (see above) */
// 父类别结构节点
struct htb_class *parent; /* parent class */
// 挂接到类别ID链表
struct hlist_node hlist; /* classid hash list item */
// 兄弟节点链表
struct list_head sibling; /* sibling list item */
// 子节点链表
struct list_head children; /* children list */
// 在树中的层次, 0表示叶子节点, 根节点层次是TC_HTB_MAXDEPTH-1(7)
int level; /* our level (see above) */
// 父类别结构节点
struct htb_class *parent; /* parent class */
// 挂接到类别ID链表
struct hlist_node hlist; /* classid hash list item */
// 兄弟节点链表
struct list_head sibling; /* sibling list item */
// 子节点链表
struct list_head children; /* children list */
// 联合:
union {
// 如果该节点是叶子节点, 则使用leaf结构, 实现具体的流控处理;
struct htb_class_leaf {
// 叶子节点的内部流控结构
struct Qdisc *q;
// 优先权
int prio;
int aprio;
// 定额参数, 缺省是取物理网卡的队列长度值
int quantum;
// 不同层次深度的赤字
int deficit[TC_HTB_MAXDEPTH];
// 挂接到丢包链表
struct list_head drop_list;
} leaf;
// 如果非叶子节点, 使用HTB内部类别结构inner, 用于形成分类树
struct htb_class_inner {
// 提供数据包的红黑树结构, 是一个按类别ID进行排序的有序表, 以二叉树实现,
// 不同优先权对应不同的二叉树
struct rb_root feed[TC_HTB_NUMPRIO]; /* feed trees */
// 当前优先权树中正在处理的那个节点的指针
struct rb_node *ptr[TC_HTB_NUMPRIO]; /* current class ptr */
/* When class changes from state 1->2 and disconnects from
parent's feed then we lost ptr value and start from the
first child again. Here we store classid of the
last valid ptr (used when ptr is NULL). */
// 上一个有效的树节点的类别ID
u32 last_ptr_id[TC_HTB_NUMPRIO];
} inner;
} un;
// 类别结构自己的数据包供应树
struct rb_node node[TC_HTB_NUMPRIO]; /* node for self or feed tree */
// 事件树, 实际是等待树, 当带宽超过限制时会将该类别节点挂接到HTB流控节点的
// 等待队列wait_pq
struct rb_node pq_node; /* node for event queue */
unsigned long pq_key; /* the same type as jiffies global */
union {
// 如果该节点是叶子节点, 则使用leaf结构, 实现具体的流控处理;
struct htb_class_leaf {
// 叶子节点的内部流控结构
struct Qdisc *q;
// 优先权
int prio;
int aprio;
// 定额参数, 缺省是取物理网卡的队列长度值
int quantum;
// 不同层次深度的赤字
int deficit[TC_HTB_MAXDEPTH];
// 挂接到丢包链表
struct list_head drop_list;
} leaf;
// 如果非叶子节点, 使用HTB内部类别结构inner, 用于形成分类树
struct htb_class_inner {
// 提供数据包的红黑树结构, 是一个按类别ID进行排序的有序表, 以二叉树实现,
// 不同优先权对应不同的二叉树
struct rb_root feed[TC_HTB_NUMPRIO]; /* feed trees */
// 当前优先权树中正在处理的那个节点的指针
struct rb_node *ptr[TC_HTB_NUMPRIO]; /* current class ptr */
/* When class changes from state 1->2 and disconnects from
parent's feed then we lost ptr value and start from the
first child again. Here we store classid of the
last valid ptr (used when ptr is NULL). */
// 上一个有效的树节点的类别ID
u32 last_ptr_id[TC_HTB_NUMPRIO];
} inner;
} un;
// 类别结构自己的数据包供应树
struct rb_node node[TC_HTB_NUMPRIO]; /* node for self or feed tree */
// 事件树, 实际是等待树, 当带宽超过限制时会将该类别节点挂接到HTB流控节点的
// 等待队列wait_pq
struct rb_node pq_node; /* node for event queue */
unsigned long pq_key; /* the same type as jiffies global */
// 激活的优先权参数, 非0表示相应位数的数据队列有数据包可用
int prio_activity; /* for which prios are we active */
// 当前模式, 表示是否可发送数据包
enum htb_cmode cmode; /* current mode of the class */
int prio_activity; /* for which prios are we active */
// 当前模式, 表示是否可发送数据包
enum htb_cmode cmode; /* current mode of the class */
/* class attached filters */
// 过滤规则表
struct tcf_proto *filter_list;
// 过滤器使用计数
int filter_cnt;
// 过滤规则表
struct tcf_proto *filter_list;
// 过滤器使用计数
int filter_cnt;
// 警告标志
int warned; /* only one warning about non work conserving .. */
int warned; /* only one warning about non work conserving .. */
/* token bucket parameters */
// 令牌率
struct qdisc_rate_table *rate; /* rate table of the class itself */
// 峰值率
struct qdisc_rate_table *ceil; /* ceiling rate (limits borrows too) */
// 缓冲区/峰值缓冲区
long buffer, cbuffer; /* token bucket depth/rate */
// 最大等待时间
psched_tdiff_t mbuffer; /* max wait time */
// 当前令牌数/峰值令牌
long tokens, ctokens; /* current number of tokens */
// 检查点时间
psched_time_t t_c; /* checkpoint time */
};
// 令牌率
struct qdisc_rate_table *rate; /* rate table of the class itself */
// 峰值率
struct qdisc_rate_table *ceil; /* ceiling rate (limits borrows too) */
// 缓冲区/峰值缓冲区
long buffer, cbuffer; /* token bucket depth/rate */
// 最大等待时间
psched_tdiff_t mbuffer; /* max wait time */
// 当前令牌数/峰值令牌
long tokens, ctokens; /* current number of tokens */
// 检查点时间
psched_time_t t_c; /* checkpoint time */
};
// HTB私有数据结构
struct htb_sched {
// HTB根节点链表
struct list_head root; /* root classes list */
// HTB哈希表, 根据类别ID进行哈希
struct hlist_head hash[HTB_HSIZE]; /* hashed by classid */
// 8级丢包链表
struct list_head drops[TC_HTB_NUMPRIO];/* active leaves (for drops) */
/* self list - roots of self generating tree */
// RB树根节点, 对应每一层的每一个优先权值都有一个RB树
struct rb_root row[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
// 掩码, 表示该层的哪些优先权值的树有效
int row_mask[TC_HTB_MAXDEPTH];
// 父节点指针
struct rb_node *ptr[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
// 上次使用的非空父节点的类别ID
u32 last_ptr_id[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
// RB树根节点, 对应每一层的每一个优先权值都有一个RB树
struct rb_root row[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
// 掩码, 表示该层的哪些优先权值的树有效
int row_mask[TC_HTB_MAXDEPTH];
// 父节点指针
struct rb_node *ptr[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
// 上次使用的非空父节点的类别ID
u32 last_ptr_id[TC_HTB_MAXDEPTH][TC_HTB_NUMPRIO];
/* self wait list - roots of wait PQs per row */
// 等待队列, 用来挂接那些带宽超出限制的节点
struct rb_root wait_pq[TC_HTB_MAXDEPTH];
// 等待队列, 用来挂接那些带宽超出限制的节点
struct rb_root wait_pq[TC_HTB_MAXDEPTH];
/* time of nearest event per level (row) */
unsigned long near_ev_cache[TC_HTB_MAXDEPTH];
unsigned long near_ev_cache[TC_HTB_MAXDEPTH];
/* cached value of jiffies in dequeue */
// 当前时间, 在dequeue时更新
unsigned long jiffies;
// 当前时间, 在dequeue时更新
unsigned long jiffies;
/* whether we hit non-work conserving class during this dequeue; we use */
int nwc_hit; /* this to disable mindelay complaint in dequeue */
int nwc_hit; /* this to disable mindelay complaint in dequeue */
// 缺省类别
int defcls; /* class where unclassified flows go to */
int defcls; /* class where unclassified flows go to */
/* filters for qdisc itself */
// 过滤规则表
struct tcf_proto *filter_list;
int filter_cnt;
// 过滤规则表
struct tcf_proto *filter_list;
int filter_cnt;
// 速率到定额转换参数
int rate2quantum; /* quant = rate / rate2quantum */
// 当前时间
psched_time_t now; /* cached dequeue time */
// 定时器
struct timer_list timer; /* send delay timer */
#ifdef HTB_RATECM
// 速率定时器
struct timer_list rttim; /* rate computer timer */
int recmp_bucket; /* which hash bucket to recompute next */
#endif
int rate2quantum; /* quant = rate / rate2quantum */
// 当前时间
psched_time_t now; /* cached dequeue time */
// 定时器
struct timer_list timer; /* send delay timer */
#ifdef HTB_RATECM
// 速率定时器
struct timer_list rttim; /* rate computer timer */
int recmp_bucket; /* which hash bucket to recompute next */
#endif
/* non shaped skbs; let them go directly thru */
// 直接处理数据包队列
struct sk_buff_head direct_queue;
int direct_qlen; /* max qlen of above */
// 直接处理数据包队列
struct sk_buff_head direct_queue;
int direct_qlen; /* max qlen of above */
// 直接处理的数据包计数
long direct_pkts;
};
long direct_pkts;
};
// HTB类别操作结构
static struct Qdisc_class_ops htb_class_ops = {
.graft = htb_graft,
.leaf = htb_leaf,
.get = htb_get,
.put = htb_put,
.change = htb_change_class,
.delete = htb_delete,
.walk = htb_walk,
.tcf_chain = htb_find_tcf,
.bind_tcf = htb_bind_filter,
.unbind_tcf = htb_unbind_filter,
.dump = htb_dump_class,
.dump_stats = htb_dump_class_stats,
};
static struct Qdisc_class_ops htb_class_ops = {
.graft = htb_graft,
.leaf = htb_leaf,
.get = htb_get,
.put = htb_put,
.change = htb_change_class,
.delete = htb_delete,
.walk = htb_walk,
.tcf_chain = htb_find_tcf,
.bind_tcf = htb_bind_filter,
.unbind_tcf = htb_unbind_filter,
.dump = htb_dump_class,
.dump_stats = htb_dump_class_stats,
};
// HTB 流控操作结构
static struct Qdisc_ops htb_qdisc_ops = {
.next = NULL,
.cl_ops = &htb_class_ops,
.id = "htb",
.priv_size = sizeof(struct htb_sched),
.enqueue = htb_enqueue,
.dequeue = htb_dequeue,
.requeue = htb_requeue,
.drop = htb_drop,
.init = htb_init,
.reset = htb_reset,
.destroy = htb_destroy,
// 更改操作为空
.change = NULL /* htb_change */,
.dump = htb_dump,
.owner = THIS_MODULE,
};
5.11.1 HTB的TC操作命令
为了更好理解HTB各处理函数,先用HTB的配置实例过程来说明各种操作调用了哪些HTB处理函数,以
下的配置实例取自HTB Manual, 属于最简单分类配置:
1) 配置网卡的根流控节点为HTB
#根节点ID是0x10000, 缺省类别是0x10012,
# handle x:y, x定义的是类别ID的高16位, y定义低16位
#注意命令中的ID参数都被理解为16进制的数
tc qdisc add dev eth0 root handle 1: htb default 12
在内核中将调用htb_init()函数初始化HTB流控结构.
2) 建立分类树
#根节点总流量带宽100kbps, 内部类别ID是0x10001
tc class add dev eth0 parent 1: classid 1:1 htb rate 100kbps ceil 100kbps
#第一类别数据分30kbps, 最大可用100kbps, 内部类别ID是0x10010(注意这里确实是16进制的10)
tc class add dev eth0 parent 1:1 classid 1:10 htb rate 30kbps ceil 100kbps
#第二类别数据分30kbps, 最大可用100kbps, 内部类别ID是0x10011
tc class add dev eth0 parent 1:1 classid 1:11 htb rate 10kbps ceil 100kbps
#第三类别(缺省类别)数据分60kbps, 最大可用100kbps, 内部类别ID是0x10012
tc class add dev eth0 parent 1:1 classid 1:12 htb rate 60kbps ceil 100kbps
在内核中将调用htb_change_class()函数来修改HTB参数
3) 数据包分类
#对源地址为1.2.3.4, 目的端口是80的数据包为第一类, 0x10010
tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 match ip
dport 80 0xffff flowid 1:10
#对源地址是1.2.3.4的其他类型数据包是第2类, 0x10011
tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 flowid
#对源地址是1.2.3.4的其他类型数据包是第2类, 0x10011
tc filter add dev eth0 protocol ip parent 1:0 prio 1 u32 match ip src 1.2.3.4 flowid
1:11
#其他数据包将作为缺省类, 0x10012
#其他数据包将作为缺省类, 0x10012
在内核中将调用htb_find_tcf(), htb_bind_filter()函数来将为HTB绑定过滤表
4) 设置每个叶子节点的流控方法
# 1:10节点为pfifo
tc qdisc add dev eth0 parent 1:10 handle 20: pfifo limit 10
# 1:11节点也为pfifo
tc qdisc add dev eth0 parent 1:11 handle 30: pfifo limit 10
# 1:12节点使用sfq, 扰动时间10秒
tc qdisc add dev eth0 parent 1:12 handle 40: sfq perturb 10
在内核中会使用htb_leaf()查找HTB叶子节点, 使用htb_graft()函数来设置叶子节点的流控方法.
5.11.2 HTB类别操作
5.11.2.1 嫁接
// 设置HTB叶子节点的流控方法
static int htb_graft(struct Qdisc *sch, unsigned long arg, struct Qdisc *new,
struct Qdisc **old)
{
struct htb_class *cl = (struct htb_class *)arg;
// 类别结构非空而且层次为0(叶子节点)
if (cl && !cl->level) {
// 如果没定义专门的流控方法, 则缺省定义pfifo作为缺省的流控方法
if (new == NULL && (new = qdisc_create_dflt(sch->dev,
&pfifo_qdisc_ops))
== NULL)
return -ENOBUFS;
sch_tree_lock(sch);
// 将新的流控方法作为类别结构叶子节点的流控方法
if ((*old = xchg(&cl->un.leaf.q, new)) != NULL) {
// 如果该类别还处于活动状态, 停止, 因为其原来的流控方法已经要被释放掉,
// 不再处理数据包
if (cl->prio_activity)
htb_deactivate(qdisc_priv(sch), cl);
if (cl && !cl->level) {
// 如果没定义专门的流控方法, 则缺省定义pfifo作为缺省的流控方法
if (new == NULL && (new = qdisc_create_dflt(sch->dev,
&pfifo_qdisc_ops))
== NULL)
return -ENOBUFS;
sch_tree_lock(sch);
// 将新的流控方法作为类别结构叶子节点的流控方法
if ((*old = xchg(&cl->un.leaf.q, new)) != NULL) {
// 如果该类别还处于活动状态, 停止, 因为其原来的流控方法已经要被释放掉,
// 不再处理数据包
if (cl->prio_activity)
htb_deactivate(qdisc_priv(sch), cl);
/* TODO: is it correct ? Why CBQ doesn't do it ? */
// 将老流控节点释放掉
sch->q.qlen -= (*old)->q.qlen;
qdisc_reset(*old);
}
sch_tree_unlock(sch);
return 0;
}
// 否则出错
return -ENOENT;
}
// 将老流控节点释放掉
sch->q.qlen -= (*old)->q.qlen;
qdisc_reset(*old);
}
sch_tree_unlock(sch);
return 0;
}
// 否则出错
return -ENOENT;
}
5.11.2.2 获取叶子节点
static struct Qdisc *htb_leaf(struct Qdisc *sch, unsigned long arg)
{
struct htb_class *cl = (struct htb_class *)arg;
// 如果类别结构非空而且是叶子节点, 返回该类别叶子节点的流控
return (cl && !cl->level) ? cl->un.leaf.q : NULL;
}
5.11.2.3 增加类别的引用计数
static unsigned long htb_get(struct Qdisc *sch, u32 classid)
{
// 查找类别ID对应的HTB类别结构
struct htb_class *cl = htb_find(classid, sch);
// 找到的话增加其引用计数
if (cl)
cl->refcnt++;
return (unsigned long)cl;
}
5.11.2.4 减少类别引用计数
static void htb_put(struct Qdisc *sch, unsigned long arg)
{
struct htb_class *cl = (struct htb_class *)arg;
// 减少类别引用计数, 如果减到0, 释放该类别
if (--cl->refcnt == 0)
htb_destroy_class(sch, cl);
}
if (--cl->refcnt == 0)
htb_destroy_class(sch, cl);
}
// 释放类别
static void htb_destroy_class(struct Qdisc *sch, struct htb_class *cl)
{
// HTB私有数据结构
struct htb_sched *q = qdisc_priv(sch);
if (!cl->level) {
// 属于叶子节点的情况
BUG_TRAP(cl->un.leaf.q);
// HTB流控总队列长度减少该叶子流控节点的队列长度
sch->q.qlen -= cl->un.leaf.q->q.qlen;
// 释放叶子流控节点
qdisc_destroy(cl->un.leaf.q);
}
// 释放流量处理结构: 普通速率和峰值速率
qdisc_put_rtab(cl->rate);
qdisc_put_rtab(cl->ceil);
static void htb_destroy_class(struct Qdisc *sch, struct htb_class *cl)
{
// HTB私有数据结构
struct htb_sched *q = qdisc_priv(sch);
if (!cl->level) {
// 属于叶子节点的情况
BUG_TRAP(cl->un.leaf.q);
// HTB流控总队列长度减少该叶子流控节点的队列长度
sch->q.qlen -= cl->un.leaf.q->q.qlen;
// 释放叶子流控节点
qdisc_destroy(cl->un.leaf.q);
}
// 释放流量处理结构: 普通速率和峰值速率
qdisc_put_rtab(cl->rate);
qdisc_put_rtab(cl->ceil);
// 释放类别过滤规则表
htb_destroy_filters(&cl->filter_list);
htb_destroy_filters(&cl->filter_list);
// 递归调用htb_destroy_class释放该节点的子节点
while (!list_empty(&cl->children))
htb_destroy_class(sch, list_entry(cl->children.next,
struct htb_class, sibling));
while (!list_empty(&cl->children))
htb_destroy_class(sch, list_entry(cl->children.next,
struct htb_class, sibling));
/* note: this delete may happen twice (see htb_delete) */
// 删除类别ID链表
hlist_del_init(&cl->hlist);
// 释放子类别链表
list_del(&cl->sibling);
// 停止类别结构
if (cl->prio_activity)
htb_deactivate(q, cl);
// 删除类别ID链表
hlist_del_init(&cl->hlist);
// 释放子类别链表
list_del(&cl->sibling);
// 停止类别结构
if (cl->prio_activity)
htb_deactivate(q, cl);
// 如果类别结构属于需要等待模式, 将该节点从等待RB树中删除
if (cl->cmode != HTB_CAN_SEND)
htb_safe_rb_erase(&cl->pq_node, q->wait_pq + cl->level);
if (cl->cmode != HTB_CAN_SEND)
htb_safe_rb_erase(&cl->pq_node, q->wait_pq + cl->level);
// 释放类别本身
kfree(cl);
}
kfree(cl);
}
注意: 由于使用了递归处理, 因此HTB树不能太大, 否则就会使内核堆栈溢出而导致内核崩溃, HTB定
义的最大深度是8层.
5.11.2.5 更改类别结构内部参数
static int htb_change_class(struct Qdisc *sch, u32 classid,
u32 parentid, struct rtattr **tca,
unsigned long *arg)
{
int err = -EINVAL;
// HTB私有数据结构
struct htb_sched *q = qdisc_priv(sch);
// 类别结构指针, 从上层传入
struct htb_class *cl = (struct htb_class *)*arg, *parent;
// 通过netlink接口传来的配置参数
struct rtattr *opt = tca[TCA_OPTIONS - 1];
// 速率表, 峰值速率表结构
struct qdisc_rate_table *rtab = NULL, *ctab = NULL;
// 保存解析后的参数
struct rtattr *tb[TCA_HTB_RTAB];
// HTB选项
struct tc_htb_opt *hopt;
/* extract all subattrs from opt attr */
// 解析输入参数, 进行相关合法性检查
if (!opt || rtattr_parse_nested(tb, TCA_HTB_RTAB, opt) ||
tb[TCA_HTB_PARMS - 1] == NULL ||
RTA_PAYLOAD(tb[TCA_HTB_PARMS - 1]) < sizeof(*hopt))
goto failure;
// 解析输入参数, 进行相关合法性检查
if (!opt || rtattr_parse_nested(tb, TCA_HTB_RTAB, opt) ||
tb[TCA_HTB_PARMS - 1] == NULL ||
RTA_PAYLOAD(tb[TCA_HTB_PARMS - 1]) < sizeof(*hopt))
goto failure;
// 如果父节点ID不是根ID, 根据此ID查找父节点, 否则为父节点空
parent = parentid == TC_H_ROOT ? NULL : htb_find(parentid, sch);
parent = parentid == TC_H_ROOT ? NULL : htb_find(parentid, sch);
hopt = RTA_DATA(tb[TCA_HTB_PARMS - 1]);
// 从输入参数中获取速率表结构: 普通速率和峰值速率
rtab = qdisc_get_rtab(&hopt->rate, tb[TCA_HTB_RTAB - 1]);
ctab = qdisc_get_rtab(&hopt->ceil, tb[TCA_HTB_CTAB - 1]);
if (!rtab || !ctab)
goto failure;
rtab = qdisc_get_rtab(&hopt->rate, tb[TCA_HTB_RTAB - 1]);
ctab = qdisc_get_rtab(&hopt->ceil, tb[TCA_HTB_CTAB - 1]);
if (!rtab || !ctab)
goto failure;
if (!cl) { /* new class */
// 如果类别结构为空, 是需要构造的新类别
struct Qdisc *new_q;
int prio;
/* check for valid classid */
// 类别ID合法性检查
if (!classid || TC_H_MAJ(classid ^ sch->handle)
|| htb_find(classid, sch))
goto failure;
// 类别ID合法性检查
if (!classid || TC_H_MAJ(classid ^ sch->handle)
|| htb_find(classid, sch))
goto failure;
/* check maximal depth */
// 如果祖父节点层次都小于2, 也就是最大是1, 表示HTB节点树太深了, 叶子节点都没法表示了
if (parent && parent->parent && parent->parent->level < 2) {
printk(KERN_ERR "htb: tree is too deep\n");
goto failure;
}
err = -ENOBUFS;
// 分配类别空间
if ((cl = kzalloc(sizeof(*cl), GFP_KERNEL)) == NULL)
goto failure;
// 初始化引用计数
cl->refcnt = 1;
// 初始化兄弟链表
INIT_LIST_HEAD(&cl->sibling);
// 初始化哈希链表
INIT_HLIST_NODE(&cl->hlist);
// 初始化子节点链表
INIT_LIST_HEAD(&cl->children);
// 初始化丢包链表
INIT_LIST_HEAD(&cl->un.leaf.drop_list);
// 设置为空节点(父节点是本身)
RB_CLEAR_NODE(&cl->pq_node);
// 如果祖父节点层次都小于2, 也就是最大是1, 表示HTB节点树太深了, 叶子节点都没法表示了
if (parent && parent->parent && parent->parent->level < 2) {
printk(KERN_ERR "htb: tree is too deep\n");
goto failure;
}
err = -ENOBUFS;
// 分配类别空间
if ((cl = kzalloc(sizeof(*cl), GFP_KERNEL)) == NULL)
goto failure;
// 初始化引用计数
cl->refcnt = 1;
// 初始化兄弟链表
INIT_LIST_HEAD(&cl->sibling);
// 初始化哈希链表
INIT_HLIST_NODE(&cl->hlist);
// 初始化子节点链表
INIT_LIST_HEAD(&cl->children);
// 初始化丢包链表
INIT_LIST_HEAD(&cl->un.leaf.drop_list);
// 设置为空节点(父节点是本身)
RB_CLEAR_NODE(&cl->pq_node);
// 初始化self or feed tree节点
for (prio = 0; prio < TC_HTB_NUMPRIO; prio++)
RB_CLEAR_NODE(&cl->node[prio]);
for (prio = 0; prio < TC_HTB_NUMPRIO; prio++)
RB_CLEAR_NODE(&cl->node[prio]);
/* create leaf qdisc early because it uses kmalloc(GFP_KERNEL)
so that can't be used inside of sch_tree_lock
-- thanks to Karlis Peisenieks */
// 新的流控节点缺省是使用pfifo
new_q = qdisc_create_dflt(sch->dev, &pfifo_qdisc_ops);
sch_tree_lock(sch);
// 以下调整父节点的状态参数
if (parent && !parent->level) {
// 如果父节点原先是叶子节点, 将其转为中间节点, 因为现在已经有新的叶子节点作为其子节点
/* turn parent into inner node */
// 释放父节点的流控结构
sch->q.qlen -= parent->un.leaf.q->q.qlen;
qdisc_destroy(parent->un.leaf.q);
// 如果该父节点正处于活动情况, 停止
if (parent->prio_activity)
htb_deactivate(q, parent);
so that can't be used inside of sch_tree_lock
-- thanks to Karlis Peisenieks */
// 新的流控节点缺省是使用pfifo
new_q = qdisc_create_dflt(sch->dev, &pfifo_qdisc_ops);
sch_tree_lock(sch);
// 以下调整父节点的状态参数
if (parent && !parent->level) {
// 如果父节点原先是叶子节点, 将其转为中间节点, 因为现在已经有新的叶子节点作为其子节点
/* turn parent into inner node */
// 释放父节点的流控结构
sch->q.qlen -= parent->un.leaf.q->q.qlen;
qdisc_destroy(parent->un.leaf.q);
// 如果该父节点正处于活动情况, 停止
if (parent->prio_activity)
htb_deactivate(q, parent);
/* remove from evt list because of level change */
// 如果不是HTB_CAN_SEND模式, 说明该节点在等待节点树中, 从该树中删除
if (parent->cmode != HTB_CAN_SEND) {
htb_safe_rb_erase(&parent->pq_node, q->wait_pq);
parent->cmode = HTB_CAN_SEND;
}
// 更新父节点的层次, 如果不存在祖父节点, 则层次为根节点, 否则使用祖父节点的层次值
parent->level = (parent->parent ? parent->parent->level
: TC_HTB_MAXDEPTH) - 1;
// 不再使用内部叶子结构, 而是改为使用HTB内部结构, 参数清零
memset(&parent->un.inner, 0, sizeof(parent->un.inner));
}
/* leaf (we) needs elementary qdisc */
// 设置类别结构的叶子流控节点
cl->un.leaf.q = new_q ? new_q : &noop_qdisc;
// 类别结构的ID和父
cl->classid = classid;
cl->parent = parent;
// 如果不是HTB_CAN_SEND模式, 说明该节点在等待节点树中, 从该树中删除
if (parent->cmode != HTB_CAN_SEND) {
htb_safe_rb_erase(&parent->pq_node, q->wait_pq);
parent->cmode = HTB_CAN_SEND;
}
// 更新父节点的层次, 如果不存在祖父节点, 则层次为根节点, 否则使用祖父节点的层次值
parent->level = (parent->parent ? parent->parent->level
: TC_HTB_MAXDEPTH) - 1;
// 不再使用内部叶子结构, 而是改为使用HTB内部结构, 参数清零
memset(&parent->un.inner, 0, sizeof(parent->un.inner));
}
/* leaf (we) needs elementary qdisc */
// 设置类别结构的叶子流控节点
cl->un.leaf.q = new_q ? new_q : &noop_qdisc;
// 类别结构的ID和父
cl->classid = classid;
cl->parent = parent;
/* set class to be in HTB_CAN_SEND state */
// 令牌和峰值令牌
cl->tokens = hopt->buffer;
cl->ctokens = hopt->cbuffer;
// 缓冲区大小
cl->mbuffer = PSCHED_JIFFIE2US(HZ * 60); /* 1min */
// 初始化时间
PSCHED_GET_TIME(cl->t_c);
// 模式为可以发送模式
cl->cmode = HTB_CAN_SEND;
// 令牌和峰值令牌
cl->tokens = hopt->buffer;
cl->ctokens = hopt->cbuffer;
// 缓冲区大小
cl->mbuffer = PSCHED_JIFFIE2US(HZ * 60); /* 1min */
// 初始化时间
PSCHED_GET_TIME(cl->t_c);
// 模式为可以发送模式
cl->cmode = HTB_CAN_SEND;
/* attach to the hash list and parent's family */
// 挂接到哈希链表
hlist_add_head(&cl->hlist, q->hash + htb_hash(classid));
// 添加到父节点的子节点链表
list_add_tail(&cl->sibling,
parent ? &parent->children : &q->root);
} else
sch_tree_lock(sch);
// 挂接到哈希链表
hlist_add_head(&cl->hlist, q->hash + htb_hash(classid));
// 添加到父节点的子节点链表
list_add_tail(&cl->sibling,
parent ? &parent->children : &q->root);
} else
sch_tree_lock(sch);
/* it used to be a nasty bug here, we have to check that node
is really leaf before changing cl->un.leaf ! */
if (!cl->level) {
// 如果是叶子节点, 设置其定额, 当出现赤字时会按定额大小增加
cl->un.leaf.quantum = rtab->rate.rate / q->rate2quantum;
// 如果计算出的定额量太小或太大, 说明rate2quantum参数该调整了, 这就是tc命令中的r2q参数
// 对于不同的带宽, 要选择不同的r2q值
if (!hopt->quantum && cl->un.leaf.quantum < 1000) {
// 定额太小
printk(KERN_WARNING
"HTB: quantum of class %X is small. Consider r2q
is really leaf before changing cl->un.leaf ! */
if (!cl->level) {
// 如果是叶子节点, 设置其定额, 当出现赤字时会按定额大小增加
cl->un.leaf.quantum = rtab->rate.rate / q->rate2quantum;
// 如果计算出的定额量太小或太大, 说明rate2quantum参数该调整了, 这就是tc命令中的r2q参数
// 对于不同的带宽, 要选择不同的r2q值
if (!hopt->quantum && cl->un.leaf.quantum < 1000) {
// 定额太小
printk(KERN_WARNING
"HTB: quantum of class %X is small. Consider r2q
change.\n",
cl->classid);
cl->un.leaf.quantum = 1000;
}
if (!hopt->quantum && cl->un.leaf.quantum > 200000) {
// 定额太大
printk(KERN_WARNING
"HTB: quantum of class %X is big. Consider r2q
cl->classid);
cl->un.leaf.quantum = 1000;
}
if (!hopt->quantum && cl->un.leaf.quantum > 200000) {
// 定额太大
printk(KERN_WARNING
"HTB: quantum of class %X is big. Consider r2q
change.\n",
cl->classid);
cl->un.leaf.quantum = 200000;
}
if (hopt->quantum)
cl->un.leaf.quantum = hopt->quantum;
// 设置该节点的优先权值, 最大限制为TC_HTB_NUMPRIO - 1
if ((cl->un.leaf.prio = hopt->prio) >= TC_HTB_NUMPRIO)
cl->un.leaf.prio = TC_HTB_NUMPRIO - 1;
}
// 缓冲区
cl->buffer = hopt->buffer;
// 峰值流控缓冲区
cl->cbuffer = hopt->cbuffer;
// 如果该类别原先有速率控制结构, 先释放掉再更新为新的速率控制结构
// 普通速率控制结构更新
if (cl->rate)
qdisc_put_rtab(cl->rate);
cl->rate = rtab;
// 峰值速率控制结构更新
if (cl->ceil)
qdisc_put_rtab(cl->ceil);
cl->ceil = ctab;
sch_tree_unlock(sch);
cl->classid);
cl->un.leaf.quantum = 200000;
}
if (hopt->quantum)
cl->un.leaf.quantum = hopt->quantum;
// 设置该节点的优先权值, 最大限制为TC_HTB_NUMPRIO - 1
if ((cl->un.leaf.prio = hopt->prio) >= TC_HTB_NUMPRIO)
cl->un.leaf.prio = TC_HTB_NUMPRIO - 1;
}
// 缓冲区
cl->buffer = hopt->buffer;
// 峰值流控缓冲区
cl->cbuffer = hopt->cbuffer;
// 如果该类别原先有速率控制结构, 先释放掉再更新为新的速率控制结构
// 普通速率控制结构更新
if (cl->rate)
qdisc_put_rtab(cl->rate);
cl->rate = rtab;
// 峰值速率控制结构更新
if (cl->ceil)
qdisc_put_rtab(cl->ceil);
cl->ceil = ctab;
sch_tree_unlock(sch);
*arg = (unsigned long)cl;
return 0;
return 0;
failure:
if (rtab)
qdisc_put_rtab(rtab);
if (ctab)
qdisc_put_rtab(ctab);
return err;
}
if (rtab)
qdisc_put_rtab(rtab);
if (ctab)
qdisc_put_rtab(ctab);
return err;
}
5.11.2.6 查找过滤规则表
static struct tcf_proto **htb_find_tcf(struct Qdisc *sch, unsigned long arg)
{
struct htb_sched *q = qdisc_priv(sch);
struct htb_class *cl = (struct htb_class *)arg;
// 如果类别结构非空,使用类别结构的过滤表, 否则使用HTB私有结构的过滤表
struct tcf_proto **fl = cl ? &cl->filter_list : &q->filter_list;
return fl;
}
}
5.11.2.7 绑定过滤器
static unsigned long htb_bind_filter(struct Qdisc *sch, unsigned long parent,
u32 classid)
{
struct htb_sched *q = qdisc_priv(sch);
// 根据类别ID查找类别结构
struct htb_class *cl = htb_find(classid, sch);
/*if (cl && !cl->level) return 0;
The line above used to be there to prevent attaching filters to
leaves. But at least tc_index filter uses this just to get class
for other reasons so that we have to allow for it.
----
19.6.2002 As Werner explained it is ok - bind filter is just
another way to "lock" the class - unlike "get" this lock can
be broken by class during destroy IIUC.
*/
// 如果流控类别结构有效, 增加其使用计数
if (cl)
cl->filter_cnt++;
else
// 否则是增加整个HTB流控结构的使用计数
q->filter_cnt++;
return (unsigned long)cl;
}
The line above used to be there to prevent attaching filters to
leaves. But at least tc_index filter uses this just to get class
for other reasons so that we have to allow for it.
----
19.6.2002 As Werner explained it is ok - bind filter is just
another way to "lock" the class - unlike "get" this lock can
be broken by class during destroy IIUC.
*/
// 如果流控类别结构有效, 增加其使用计数
if (cl)
cl->filter_cnt++;
else
// 否则是增加整个HTB流控结构的使用计数
q->filter_cnt++;
return (unsigned long)cl;
}
5.11.2.8 解开过滤器
static void htb_unbind_filter(struct Qdisc *sch, unsigned long arg)
{
struct htb_sched *q = qdisc_priv(sch);
struct htb_class *cl = (struct htb_class *)arg;
// 实际就是对过滤器引用计数减一
if (cl)
cl->filter_cnt--;
else
q->filter_cnt--;
}
if (cl)
cl->filter_cnt--;
else
q->filter_cnt--;
}
5.11.2.9 遍历HTB
static void htb_walk(struct Qdisc *sch, struct qdisc_walker *arg)
{
// HTB流控私有数据
struct htb_sched *q = qdisc_priv(sch);
int i;
// 如果设置停止标志, 返回
if (arg->stop)
return;
if (arg->stop)
return;
// 遍历所有HTB哈希表
for (i = 0; i < HTB_HSIZE; i++) {
struct hlist_node *p;
struct htb_class *cl;
for (i = 0; i < HTB_HSIZE; i++) {
struct hlist_node *p;
struct htb_class *cl;
// 遍历哈希表中每个元素, 即HTB类别结构
hlist_for_each_entry(cl, p, q->hash + i, hlist) {
// 如果要跳过skip个开始的一些节点, 跳过这些节点
if (arg->count < arg->skip) {
arg->count++;
continue;
}
// 对类别结构进行相关操作
if (arg->fn(sch, (unsigned long)cl, arg) < 0) {
arg->stop = 1;
return;
}
arg->count++;
}
}
}
hlist_for_each_entry(cl, p, q->hash + i, hlist) {
// 如果要跳过skip个开始的一些节点, 跳过这些节点
if (arg->count < arg->skip) {
arg->count++;
continue;
}
// 对类别结构进行相关操作
if (arg->fn(sch, (unsigned long)cl, arg) < 0) {
arg->stop = 1;
return;
}
arg->count++;
}
}
}
5.11.2.10 类别参数输出
static int htb_dump_class(struct Qdisc *sch, unsigned long arg,
struct sk_buff *skb, struct tcmsg *tcm)
{
struct htb_class *cl = (struct htb_class *)arg;
unsigned char *b = skb->tail;
struct rtattr *rta;
struct tc_htb_opt opt;
spin_lock_bh(&sch->dev->queue_lock);
// 父节点的类别ID
tcm->tcm_parent = cl->parent ? cl->parent->classid : TC_H_ROOT;
// 本节点的类别ID
tcm->tcm_handle = cl->classid;
// 如果是叶子节点, 提供叶子节点的流控节点的ID
if (!cl->level && cl->un.leaf.q)
tcm->tcm_info = cl->un.leaf.q->handle;
// 父节点的类别ID
tcm->tcm_parent = cl->parent ? cl->parent->classid : TC_H_ROOT;
// 本节点的类别ID
tcm->tcm_handle = cl->classid;
// 如果是叶子节点, 提供叶子节点的流控节点的ID
if (!cl->level && cl->un.leaf.q)
tcm->tcm_info = cl->un.leaf.q->handle;
rta = (struct rtattr *)b;
RTA_PUT(skb, TCA_OPTIONS, 0, NULL);
RTA_PUT(skb, TCA_OPTIONS, 0, NULL);
// 以下提供该类别的各种参数
memset(&opt, 0, sizeof(opt));
// 速率
opt.rate = cl->rate->rate;
// 数据缓冲区
opt.buffer = cl->buffer;
// 峰值速率
opt.ceil = cl->ceil->rate;
// 峰值数据缓冲区
opt.cbuffer = cl->cbuffer;
// 定额
opt.quantum = cl->un.leaf.quantum;
// 优先权值
opt.prio = cl->un.leaf.prio;
// 层次值
opt.level = cl->level;
RTA_PUT(skb, TCA_HTB_PARMS, sizeof(opt), &opt);
// 实际数据长度
rta->rta_len = skb->tail - b;
spin_unlock_bh(&sch->dev->queue_lock);
return skb->len;
rtattr_failure:
spin_unlock_bh(&sch->dev->queue_lock);
skb_trim(skb, b - skb->data);
return -1;
}
memset(&opt, 0, sizeof(opt));
// 速率
opt.rate = cl->rate->rate;
// 数据缓冲区
opt.buffer = cl->buffer;
// 峰值速率
opt.ceil = cl->ceil->rate;
// 峰值数据缓冲区
opt.cbuffer = cl->cbuffer;
// 定额
opt.quantum = cl->un.leaf.quantum;
// 优先权值
opt.prio = cl->un.leaf.prio;
// 层次值
opt.level = cl->level;
RTA_PUT(skb, TCA_HTB_PARMS, sizeof(opt), &opt);
// 实际数据长度
rta->rta_len = skb->tail - b;
spin_unlock_bh(&sch->dev->queue_lock);
return skb->len;
rtattr_failure:
spin_unlock_bh(&sch->dev->queue_lock);
skb_trim(skb, b - skb->data);
return -1;
}
5.11.2.10 类别统计信息输出
static int
htb_dump_class_stats(struct Qdisc *sch, unsigned long arg, struct gnet_dump *d)
{
struct htb_class *cl = (struct htb_class *)arg;
#ifdef HTB_RATECM
// HTB_EWMAC=2, HTB_HSIZE=16
// 字节和包速率参数
cl->rate_est.bps = cl->rate_bytes / (HTB_EWMAC * HTB_HSIZE);
cl->rate_est.pps = cl->rate_packets / (HTB_EWMAC * HTB_HSIZE);
#endif
// HTB_EWMAC=2, HTB_HSIZE=16
// 字节和包速率参数
cl->rate_est.bps = cl->rate_bytes / (HTB_EWMAC * HTB_HSIZE);
cl->rate_est.pps = cl->rate_packets / (HTB_EWMAC * HTB_HSIZE);
#endif
// 叶子节点, 提供当前内部流控结构的队列长度
if (!cl->level && cl->un.leaf.q)
cl->qstats.qlen = cl->un.leaf.q->q.qlen;
// 令牌数
cl->xstats.tokens = cl->tokens;
// 峰值令牌数
cl->xstats.ctokens = cl->ctokens;
if (!cl->level && cl->un.leaf.q)
cl->qstats.qlen = cl->un.leaf.q->q.qlen;
// 令牌数
cl->xstats.tokens = cl->tokens;
// 峰值令牌数
cl->xstats.ctokens = cl->ctokens;
// 分别将基本参数, 速率参数, 队列参数拷贝到目的缓存, 这些都是标准参数
if (gnet_stats_copy_basic(d, &cl->bstats) < 0 ||
gnet_stats_copy_rate_est(d, &cl->rate_est) < 0 ||
gnet_stats_copy_queue(d, &cl->qstats) < 0)
return -1;
// 将应用数据(HTB自身统计数据)拷贝到目的缓存
return gnet_stats_copy_app(d, &cl->xstats, sizeof(cl->xstats));
}
if (gnet_stats_copy_basic(d, &cl->bstats) < 0 ||
gnet_stats_copy_rate_est(d, &cl->rate_est) < 0 ||
gnet_stats_copy_queue(d, &cl->qstats) < 0)
return -1;
// 将应用数据(HTB自身统计数据)拷贝到目的缓存
return gnet_stats_copy_app(d, &cl->xstats, sizeof(cl->xstats));
}
...... 待续 ......
推荐阅读
-
epoll简介及触发模式(accept、read、send)-epoll的简单介绍 epoll在LT和ET模式下的读写方式 一、epoll的接口非常简单,一共就三个函数:1. int epoll_create(int size);创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大。这个参数不同于select中的第一个参数,给出最大监听的fd+1的值。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close关闭,否则可能导致fd被耗尽。2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);epoll的事件注册函数,它不同与select是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。第一个参数是epoll_create的返回值,第二个参数表示动作,用三个宏来表示:EPOLL_CTL_ADD:注册新的fd到epfd中;EPOLL_CTL_MOD:修改已经注册的fd的监听事件;EPOLL_CTL_DEL:从epfd中删除一个fd;第三个参数是需要监听的fd,第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:struct epoll_event { __uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */};events可以是以下几个宏的集合:EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭); EPOLLIN事件:EPOLLIN事件则只有当对端有数据写入时才会触发,所以触发一次后需要不断读取所有数据直到读完EAGAIN为止。否则剩下的数据只有在下次对端有写入时才能一起取出来了。现在明白为什么说epoll必须要求异步socket了吧?如果同步socket,而且要求读完所有数据,那么最终就会在堵死在阻塞里。 EPOLLOUT:表示对应的文件描述符可以写; EPOLLOUT事件:EPOLLOUT事件只有在连接时触发一次,表示可写,其他时候想要触发,那要先准备好下面条件:1.某次write,写满了发送缓冲区,返回错误码为EAGAIN。2.对端读取了一些数据,又重新可写了,此时会触发EPOLLOUT。简单地说:EPOLLOUT事件只有在不可写到可写的转变时刻,才会触发一次,所以叫边缘触发,这叫法没错的!其实,如果真的想强制触发一次,也是有办法的,直接调用epoll_ctl重新设置一下event就可以了,event跟原来的设置一模一样都行(但必须包含EPOLLOUT),关键是重新设置,就会马上触发一次EPOLLOUT事件。1. 缓冲区由满变空.2.同时注册EPOLLIN | EPOLLOUT事件,也会触发一次EPOLLOUT事件这个两个也会触发EPOLLOUT事件 EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);EPOLLERR:表示对应的文件描述符发生错误;EPOLLHUP:表示对应的文件描述符被挂断;EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);等待事件的产生,类似于select调用。参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。-------------------------------------------------------------------------------------------- 从man手册中,得到ET和LT的具体描述如下EPOLL事件有两种模型:Edge Triggered (ET)Level Triggered (LT)假如有这样一个例子:1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符2. 这个时候从管道的另一端被写入了2KB的数据3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作4. 然后我们读取了1KB的数据5. 调用epoll_wait(2)......Edge Triggered 工作模式:如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用 epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。 i 基于非阻塞文件句柄 ii 只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。Level Triggered 工作模式相反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在 epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。然后详细解释ET, LT:LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.ET(edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认(这句话不理解)。在许多测试中我们会看到如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当我们遇到大量的idle- connection(例如WAN环境中存在大量的慢速连接),就会发现epoll的效率大大高于select/poll。(未测试)另外,当使用epoll的ET模型来工作时,当产生了一个EPOLLIN事件后,读数据的时候需要考虑的是当recv返回的大小如果等于请求的大小,那么很有可能是缓冲区还有数据未读完,也意味着该次事件还没有处理完,所以还需要再次读取: 这里只是说明思路(参考《UNIX网络编程》) while(rs) {buflen = recv(activeevents[i].data.fd, buf, sizeof(buf), 0);if(buflen < 0){// 由于是非阻塞的模式,所以当errno为EAGAIN时,表示当前缓冲区已无数据可读// 在这里就当作是该次事件已处理处.if(errno == EAGAIN)break; else return; }else if(buflen == 0) { // 这里表示对端的socket已正常关闭. } if(buflen == sizeof(buf) rs = 1; // 需要再次读取 else rs = 0; } 还有,假如发送端流量大于接收端的流量(意思是epoll所在的程序读比转发的socket要快),由于是非阻塞的socket,那么send函数虽然返回,但实际缓冲区的数据并未真正发给接收端,这样不断的读和发,当缓冲区满后会产生EAGAIN错误(参考man send),同时,不理会这次请求发送的数据.所以,需要封装socket_send的函数用来处理这种情况,该函数会尽量将数据写完再返回,返回-1表示出错。在socket_send内部,当写缓冲已满(send返回-1,且errno为EAGAIN),那么会等待后再重试.这种方式并不很完美,在理论上可能会长时间的阻塞在socket_send内部,但暂没有更好的办法. ssize_t socket_send(int sockfd, const char* buffer, size_t buflen) { ssize_t tmp; size_t total = buflen; const char *p = buffer; while(1) { tmp = send(sockfd, p, total, 0); if(tmp < 0) { // 当send收到信号时,可以继续写,但这里返回-1. if(errno == EINTR) return -1; // 当socket是非阻塞时,如返回此错误,表示写缓冲队列已满, // 在这里做延时后再重试. if(errno == EAGAIN) { usleep(1000); continue; } return -1; } if((size_t)tmp == total) return buflen; total -= tmp; p += tmp; } return tmp; } 二、epoll在LT和ET模式下的读写方式 在一个非阻塞的socket上调用read/write函数, 返回EAGAIN或者EWOULDBLOCK(注: EAGAIN就是EWOULDBLOCK) 从字面上看, 意思是: * EAGAIN: 再试一次 * EWOULDBLOCK: 如果这是一个阻塞socket, 操作将被block * perror输出: Resource temporarily unavailable 总结: 这个错误表示资源暂时不够, 可能read时, 读缓冲区没有数据, 或者, write时,写缓冲区满了 。 遇到这种情况, 如果是阻塞socket, read/write就要阻塞掉。 而如果是非阻塞socket, read/write立即返回-1, 同 时errno设置为EAGAIN. 所以, 对于阻塞socket, read/write返回-1代表网络出错了. 但对于非阻塞socket, read/write返回-1不一定网络真的出错了. 可能是Resource temporarily unavailable. 这时你应该再试, 直到Resource available. 综上, 对于non-blocking的socket, 正确的读写操作为: 读: 忽略掉errno = EAGAIN的错误, 下次继续读 写: 忽略掉errno = EAGAIN的错误, 下次继续写 对于select和epoll的LT模式, 这种读写方式是没有问题的. 但对于epoll的ET模式, 这种方式还有漏洞. epoll的两种模式 LT 和 ET
-
深入理解Linux内核的流量控制机制(第11篇)