linux 网络收包流程(NAPI)
文章目录
linux 通过软中断机制调用网络协议栈代码,处理数据。 在 net_dev 模块初始化时,注册网络收发数据的软中断处理函数:
|
|
kernel 为每个 cpu 创建一个本地的数据结构: softnet_data,在代码中简写为 sd。
|
|
NAPI 机制
-
所有希望收发收据的网卡驱动在初始化时创建一个 napi_struct 结构:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
/* 创建 napi_struct 结构 napi 加入 dev->napi_list 设置 napi->poll 处理函数 */ void netif_napi_add(struct net_device *dev, struct napi_struct *napi, int (*poll)(struct napi_struct *, int), int weight) { // 设置 napi 的 gro 哈希表 init_gro_hash(napi); // 设置 poll 函数指针 napi->poll = poll; // 将 napi 加入 dev->napi_list list_add(&napi->dev_list, &dev->napi_list); /* 将 napi 加入全局哈希表 napi_hash 可以通过 napi_by_id() 得到 napi */ napi_hash_add(napi); }
-
在驱动的中断函数中,检查有无新数据收到,如果有,调用 __napi_schedule()。
1 2 3 4 5 6
static irqreturn_t e1000_intr_msi(int __always_unused irq, void *data) { if (napi_schedule_prep(&adapter->napi)) { __napi_schedule(&adapter->napi); } }
-
通过 __napi_schedule() 将 napi_struct 加入到当前 CPU 的 sd->poll_list 中,然后设置 NET_RX_SOFTIRQ 标识位,这个软中断会在中断发生的 cpu 上进行。
1 2 3 4 5 6
static inline void ____napi_schedule(struct softnet_data *sd, struct napi_struct *napi) { list_add_tail(&napi->poll_list, &sd->poll_list); __raise_softirq_irqoff(NET_RX_SOFTIRQ); }
为了保护 sd->poll_list 不被多个中断处理函数同时访问,这个函数需要关闭中断。
-
NET_RX_SOFTIRQ 对应的软中断处理函数为 net_rx_action
1
open_softirq(NET_RX_SOFTIRQ, net_rx_action);
当中断处理函数退出,会检查是否有软中断需要处理并执行。
-
在 NET_RX_SOFTIRQ 的处理函数中,会遍历sd->poll_list,取得所有 napi_struct,调用它们的 poll() 函数,poll 函数是通过 netif_napi_add() 注册到 napi 中的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
static __latent_entropy void net_rx_action(struct softirq_action *h) { LIST_HEAD(list); /* 取得当前 cpu 的 sd */ struct softnet_data *sd = this_cpu_ptr(&softnet_data); // 将 sd->poll_list 并入本地链表 list list_splice_init(&sd->poll_list, &list); for (;;) { struct napi_struct *n; /* 遍历 poll_list 得到所有被调度的 napi_struct 并调用 napi_poll */ n = list_first_entry(&list, struct napi_struct, poll_list); budget -= napi_poll(n, &repoll); } } static int napi_poll(struct napi_struct *n, struct list_head *repoll) { if (test_bit(NAPI_STATE_SCHED, &n->state)) { /* 调用 napi 的 poll 函数 */ work = n->poll(n, weight); trace_napi_poll(n, work, weight); } }
-
各个驱动的 poll() 函数实现各不相同,但一般行为都是从 DMA 内存中取出数据,生成 skb,从硬件取出 rss 值设置给 skb->hash ,最后调用 napi_gro_receive(),进行协议栈前的 gro 处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
static bool e1000_clean_rx_irq(struct e1000_ring *rx_ring, int *work_done, int work_to_do) { while (staterr & E1000_RXD_STAT_DD) { struct sk_buff *skb = buffer_info->skb; e1000_rx_hash(netdev, rx_desc->wb.lower.hi_dword.rss, skb); e1000_receive_skb(adapter, netdev, skb, staterr, rx_desc->wb.upper.vlan); } } static void e1000_receive_skb(struct e1000_adapter *adapter, struct net_device *netdev, struct sk_buff *skb, u32 staterr, __le16 vlan) { skb->protocol = eth_type_trans(skb, netdev); napi_gro_receive(&adapter->napi, skb); }
-
napi_gro_receive() 对 skbuf 进行 合并处理,根据处理的结果,决定 skbuf 是否继续送给上次协议栈,或是直接丢弃(被 merge 到其他的 skbuf 中了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb) { ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb)); } static gro_result_t napi_skb_finish(struct napi_struct *napi, struct sk_buff *skb, gro_result_t ret) { switch (ret) { case GRO_NORMAL: gro_normal_one(napi, skb); break; case GRO_DROP: kfree_skb(skb); break; case GRO_MERGED_FREE: __kfree_skb(skb); break; } }
-
仍然需要协议栈处理的数据,通过 gro_normal_one(),被放入 napi 的接收队列。为了效率,接收队列中的数据,希望尽量批量处理。
1 2 3 4 5 6 7
static void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb) { list_add_tail(&skb->list, &napi->rx_list); if (++napi->rx_count >= gro_normal_batch) gro_normal_list(napi); }
-
gro_normal_list 将数据从给协议栈,然后清空 napi 的接收队列
1 2 3 4 5 6 7 8 9
static void gro_normal_list(struct napi_struct *napi) { if (!napi->rx_count) return; netif_receive_skb_list_internal(&napi->rx_list); INIT_LIST_HEAD(&napi->rx_list); napi->rx_count = 0; }
-
netif_receive_skb_list_internal() 会进行 rps 检查,如果使能了 rps,那么会为 skb 计算其他的 cpu,然后将 skb 放入那个 cpu 的 sd->backlog 中。sd->backlog 的类型也是 napi,因此在将数据放入 backlog 后,调用 schedule_napi,以 trigger 另一个 cpu 在自己的 软中断中处理从当前 cpu 转移过去的数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
static void netif_receive_skb_list_internal(struct list_head *head) { list_for_each_entry_safe(skb, next, head, list) { struct rps_dev_flow voidflow, *rflow = &voidflow; int cpu = get_rps_cpu(skb->dev, skb, &rflow); enqueue_to_backlog(skb, cpu, &rflow->last_qtail); } } static int enqueue_to_backlog(struct sk_buff *skb, int cpu, unsigned int *qtail) { sd = &per_cpu(softnet_data, cpu); __skb_queue_tail(&sd->input_pkt_queue, skb); ____napi_schedule(sd, &sd->backlog); }
backlog 的 poll 函数 process_backlog 负责从 sd->input_pkt_queue 中取出一部分数据,放入 sd->process_queue,然后将 process_queue 中的数据送给协议栈。这里不直接处理 input_pkt_queue 的原因是,避免长时间锁住 sd。在处理 process_queue 中数据的时候,其他 cpu 仍可以将数据放入该 cpu 的 input_pkt_queue。
-
最后,调用 __netif_receive_skb() 将数据送给协议栈处理。
legacy 机制
kernel 在 sd 中实现了一个缺省的 napi_struct : backlog,以兼容不支持 NAPI 机制的网卡驱动。
这些网卡驱动在中断处理函数中将收到的数据转换为 skb,然后通过 enqueue_to_backlog() 将 skb 加入 CPU 的 sd->input_pkt_queue:
|
|
backlog 的 poll 函数为 process_backlog()。process_backlog 在软中断上下文中调用,将数据从 softnet_data 的 input_pkt_queue 合并入 process_queue, 然后将数据从 process_queue 中取出,调用 __netif_receive_skb() 开始处理数据。
文章作者 Griffin
上次更新 2021-03-30