linux 通过软中断机制调用网络协议栈代码,处理数据。 在 net_dev 模块初始化时,注册网络收发数据的软中断处理函数:

1
2
3
4
5
static int __init net_dev_init(void)
{
	open_softirq(NET_TX_SOFTIRQ, net_tx_action);
	open_softirq(NET_RX_SOFTIRQ, net_rx_action);
}

kernel 为每个 cpu 创建一个本地的数据结构: softnet_data,在代码中简写为 sd。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
DEFINE_PER_CPU_ALIGNED(struct softnet_data, softnet_data);
EXPORT_PER_CPU_SYMBOL(softnet_data);

struct softnet_data {
	// 当前 CPU 需要被处理的 napi 链表
	struct list_head	poll_list;
	struct sk_buff_head	process_queue;

	/* 软中断的接收队列
	   网卡收到的数据放入这个队列
	   软中断 NET_RX_SOFTIRQ 处理这个队列中的数据
	*/
	struct sk_buff_head	input_pkt_queue;
	struct napi_struct	backlog;
};

NAPI 机制

  1. 所有希望收发收据的网卡驱动在初始化时创建一个 napi_struct 结构:

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
    /*
      创建 napi_struct 结构 napi
      加入 dev->napi_list
      设置 napi->poll 处理函数
    */
    void netif_napi_add(struct net_device *dev, struct napi_struct *napi,
                int (*poll)(struct napi_struct *, int), int weight)
    {
        // 设置 napi 的 gro 哈希表
        init_gro_hash(napi);
        // 设置 poll 函数指针
        napi->poll = poll;
        // 将 napi 加入 dev->napi_list
        list_add(&napi->dev_list, &dev->napi_list);
        /*
          将 napi 加入全局哈希表 napi_hash
          可以通过 napi_by_id() 得到 napi
        */
        napi_hash_add(napi);
    }
    
  2. 在驱动的中断函数中,检查有无新数据收到,如果有,调用 __napi_schedule()。

    1
    2
    3
    4
    5
    6
    
    static irqreturn_t e1000_intr_msi(int __always_unused irq, void *data)
    {
        if (napi_schedule_prep(&adapter->napi)) {
            __napi_schedule(&adapter->napi);
        }
    }
    
  3. 通过 __napi_schedule() 将 napi_struct 加入到当前 CPU 的 sd->poll_list 中,然后设置 NET_RX_SOFTIRQ 标识位,这个软中断会在中断发生的 cpu 上进行。

    1
    2
    3
    4
    5
    6
    
    static inline void ____napi_schedule(struct softnet_data *sd,
                         struct napi_struct *napi)
    {
        list_add_tail(&napi->poll_list, &sd->poll_list);
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    }
    

    为了保护 sd->poll_list 不被多个中断处理函数同时访问,这个函数需要关闭中断。

  4. NET_RX_SOFTIRQ 对应的软中断处理函数为 net_rx_action

    1
    
    open_softirq(NET_RX_SOFTIRQ, net_rx_action);
    

    当中断处理函数退出,会检查是否有软中断需要处理并执行。

  5. 在 NET_RX_SOFTIRQ 的处理函数中,会遍历sd->poll_list,取得所有 napi_struct,调用它们的 poll() 函数,poll 函数是通过 netif_napi_add() 注册到 napi 中的。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    
    static __latent_entropy void net_rx_action(struct softirq_action *h)
    {
        LIST_HEAD(list);
        /* 取得当前 cpu 的 sd */
        struct softnet_data *sd = this_cpu_ptr(&softnet_data);
        // 将 sd->poll_list 并入本地链表 list
        list_splice_init(&sd->poll_list, &list);
    
        for (;;) {
            struct napi_struct *n;
            /*
              遍历 poll_list
              得到所有被调度的 napi_struct 并调用 napi_poll
            */
              n = list_first_entry(&list, struct napi_struct, poll_list);
              budget -= napi_poll(n, &repoll);
        }
    }
    
    static int napi_poll(struct napi_struct *n, struct list_head *repoll)
    {
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
            /* 调用 napi 的 poll 函数 */
            work = n->poll(n, weight);
            trace_napi_poll(n, work, weight);
        }
    }
    
  6. 各个驱动的 poll() 函数实现各不相同,但一般行为都是从 DMA 内存中取出数据,生成 skb,从硬件取出 rss 值设置给 skb->hash ,最后调用 napi_gro_receive(),进行协议栈前的 gro 处理。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    
    static bool e1000_clean_rx_irq(struct e1000_ring *rx_ring, int *work_done,
                       int work_to_do)
    {
        while (staterr & E1000_RXD_STAT_DD) {
            struct sk_buff *skb =  buffer_info->skb;
            e1000_rx_hash(netdev, rx_desc->wb.lower.hi_dword.rss, skb);
    
            e1000_receive_skb(adapter, netdev, skb, staterr,
                      rx_desc->wb.upper.vlan);
        }
    }
    
    static void e1000_receive_skb(struct e1000_adapter *adapter,
                      struct net_device *netdev, struct sk_buff *skb,
                      u32 staterr, __le16 vlan)
    {
        skb->protocol = eth_type_trans(skb, netdev);
        napi_gro_receive(&adapter->napi, skb);
    }
    
  7. napi_gro_receive() 对 skbuf 进行 合并处理,根据处理的结果,决定 skbuf 是否继续送给上次协议栈,或是直接丢弃(被 merge 到其他的 skbuf 中了)

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    
    gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
    {
        ret = napi_skb_finish(napi, skb, dev_gro_receive(napi, skb));
    }
    
    static gro_result_t napi_skb_finish(struct napi_struct *napi,
                        struct sk_buff *skb,
                        gro_result_t ret)
    {
        switch (ret) {
        case GRO_NORMAL:
            gro_normal_one(napi, skb);
            break;
    
        case GRO_DROP:
            kfree_skb(skb);
            break;
    
        case GRO_MERGED_FREE:
                __kfree_skb(skb);
            break;
        }
    }
    
  8. 仍然需要协议栈处理的数据,通过 gro_normal_one(),被放入 napi 的接收队列。为了效率,接收队列中的数据,希望尽量批量处理。

    1
    2
    3
    4
    5
    6
    7
    
    static void gro_normal_one(struct napi_struct *napi, struct sk_buff *skb)
    {
        list_add_tail(&skb->list, &napi->rx_list);
        if (++napi->rx_count >= gro_normal_batch)
            gro_normal_list(napi);
    }
    
    
  9. gro_normal_list 将数据从给协议栈,然后清空 napi 的接收队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    
    static void gro_normal_list(struct napi_struct *napi)
    {
        if (!napi->rx_count)
            return;
        netif_receive_skb_list_internal(&napi->rx_list);
        INIT_LIST_HEAD(&napi->rx_list);
        napi->rx_count = 0;
    }
    
    
  10. netif_receive_skb_list_internal() 会进行 rps 检查,如果使能了 rps,那么会为 skb 计算其他的 cpu,然后将 skb 放入那个 cpu 的 sd->backlog 中。sd->backlog 的类型也是 napi,因此在将数据放入 backlog 后,调用 schedule_napi,以 trigger 另一个 cpu 在自己的 软中断中处理从当前 cpu 转移过去的数据。

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    
    static void netif_receive_skb_list_internal(struct list_head *head)
    {
      list_for_each_entry_safe(skb, next, head, list) {
        struct rps_dev_flow voidflow, *rflow = &voidflow;
        int cpu = get_rps_cpu(skb->dev, skb, &rflow);
        enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
      }
    }
    
    static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
                      unsigned int *qtail)
    {
      sd = &per_cpu(softnet_data, cpu);
      __skb_queue_tail(&sd->input_pkt_queue, skb);
      ____napi_schedule(sd, &sd->backlog);
    }
    

    backlog 的 poll 函数 process_backlog 负责从 sd->input_pkt_queue 中取出一部分数据,放入 sd->process_queue,然后将 process_queue 中的数据送给协议栈。这里不直接处理 input_pkt_queue 的原因是,避免长时间锁住 sd。在处理 process_queue 中数据的时候,其他 cpu 仍可以将数据放入该 cpu 的 input_pkt_queue。

  11. 最后,调用 __netif_receive_skb() 将数据送给协议栈处理。

legacy 机制

kernel 在 sd 中实现了一个缺省的 napi_struct : backlog,以兼容不支持 NAPI 机制的网卡驱动。

这些网卡驱动在中断处理函数中将收到的数据转换为 skb,然后通过 enqueue_to_backlog() 将 skb 加入 CPU 的 sd->input_pkt_queue:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
			      unsigned int *qtail)
{
	struct softnet_data *sd;
	unsigned long flags;
	unsigned int qlen;

	/* 获得 cpu 的 sd */
	sd = &per_cpu(softnet_data, cpu);

	/* 计算 sd->input_pkt_queue 长度 */
	qlen = skb_queue_len(&sd->input_pkt_queue);
	/* 检查队列容量以及发送速度限制 */
	if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
		/* 将 skb 插入 sd->input_pkt_queue */
		if (qlen) {
enqueue:
			/* 将 sbk 放入 backlog 队列 */
			__skb_queue_tail(&sd->input_pkt_queue, skb);
			input_queue_tail_incr_save(sd, qtail);
			rps_unlock(sd);
			local_irq_restore(flags);
			return NET_RX_SUCCESS;
		}

		/* 调度 sd->backlog,在软中断中处理数据 */
		if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
			if (!rps_ipi_queued(sd))
				____napi_schedule(sd, &sd->backlog);
		}
		goto enqueue;
	}
}

backlog 的 poll 函数为 process_backlog()。process_backlog 在软中断上下文中调用,将数据从 softnet_data 的 input_pkt_queue 合并入 process_queue, 然后将数据从 process_queue 中取出,调用 __netif_receive_skb() 开始处理数据。