I have recently been studying the Rx page reuse part of Intel's igb kernel driver, and this post is a summary of what I learned. Some of it may be inaccurate; corrections are welcome.
The idea of page reuse is to allocate (num_rx_desc - 1) pages up front and keep reusing them, so the driver no longer has to alloc an skb for every packet the way it used to; that is the optimization. When reuse fails, of course, a fresh allocation is still needed. The relevant code path is igb_poll => igb_clean_rx_irq => igb_fetch_rx_buffer.
The driver keeps one ring each for Tx and Rx, and each ring carries next_to_use and next_to_clean indices; for background, see the article 网卡驱动收发包过程 (on how NIC drivers send and receive packets), which my walkthroughs below also draw on. For page reuse, igb introduces a third index, next_to_alloc. These three variables are the core of the whole mechanism. If you just want the gist of page reuse, jump straight to the page-flip discussion and the walkthroughs near the end.
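Before diving into the code, here is a trimmed-down sketch of the bookkeeping I will keep referring to (my own simplification for illustration; the real struct igb_ring in the driver carries many more fields):

/* A trimmed-down sketch, NOT the real struct igb_ring */
struct rx_ring_sketch {
	u16 count;		/* number of descriptors, e.g. 256           */
	u16 next_to_use;	/* next desc to hand back to the hardware    */
	u16 next_to_clean;	/* next desc holding a completed packet to   */
				/* process and send up the stack             */
	u16 next_to_alloc;	/* rx_buffer slot that will receive the next */
				/* recycled (flipped) page                   */
};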
First, look at igb_rx_buffer. The structure is simple: dma, page, and page_offset. The key field is page_offset, which locates the buffer currently in use inside the page; it comes up again later. Also note CONFIG_IGB_DISABLE_PACKET_SPLIT: it is not defined in this configuration, and it appears several more times below.
struct igb_rx_buffer {
	dma_addr_t dma;
#ifdef CONFIG_IGB_DISABLE_PACKET_SPLIT
	struct sk_buff *skb;
#else
	struct page *page;
	u32 page_offset;
#endif
};
This function is the Rx handler, i.e. the receive path. I will only touch on it briefly, since it is not the focus of this post.
/**
 * igb_clean_rx_irq - packet split
 **/
static bool igb_clean_rx_irq(struct igb_q_vector *q_vector, int budget)
{
	struct igb_ring *rx_ring = q_vector->rx.ring;
	struct sk_buff *skb = rx_ring->skb;
	unsigned int total_bytes = 0, total_packets = 0;
	/* igb_desc_unused returns ((ntc > ntu) ? 0 : ring->count) + ntc - ntu - 1.
	 * ring->count is the descriptor count, and cleaned_count is the number
	 * of descriptors the hardware is done with and that need cleaning.
	 * The "use" in next_to_use means the next descriptor, not yet filled
	 * with a packet, about to be handed to the hardware; the "clean" in
	 * next_to_clean means a descriptor that has been filled with a packet,
	 * whose data has been sent up the stack, and that now needs processing.
	 */
	u16 cleaned_count = igb_desc_unused(rx_ring);

	do {
		union e1000_adv_rx_desc *rx_desc;

		/* return some buffers to hardware, one at a time is too slow;
		 * once the number of descriptors to clean crosses the
		 * threshold, refill them in a batch and hand the buffers
		 * back to the hardware */
		if (cleaned_count >= IGB_RX_BUFFER_WRITE) {
			igb_alloc_rx_buffers(rx_ring, cleaned_count);
			cleaned_count = 0;
		}

		/* this is the descriptor at next_to_clean */
		rx_desc = IGB_RX_DESC(rx_ring, rx_ring->next_to_clean);

		if (!igb_test_staterr(rx_desc, E1000_RXD_STAT_DD))
			break;

		/* This memory barrier is needed to keep us from reading
		 * any other fields out of the rx_desc until we know the
		 * RXD_STAT_DD bit is set
		 */
		rmb();

		/* retrieve a buffer from the ring: the descriptor at
		 * next_to_clean goes to igb_fetch_rx_buffer, which is also
		 * where its page gets reused */
		skb = igb_fetch_rx_buffer(rx_ring, rx_desc, skb);

		/* exit if we failed to retrieve a buffer */
		if (!skb)
			break;

		/* every consumed descriptor bumps cleaned_count */
		cleaned_count++;

		/* fetch next buffer in frame if non-eop; this updates
		 * next_to_clean and returns false on end of packet,
		 * true otherwise */
		if (igb_is_non_eop(rx_ring, rx_desc))
			continue;

		/* verify the packet layout is correct */
		if (igb_cleanup_headers(rx_ring, rx_desc, skb)) {
			skb = NULL;
			continue;
		}

		/* probably a little skewed due to removing CRC */
		total_bytes += skb->len;

		/* populate checksum, timestamp, VLAN, and protocol */
		igb_process_skb_fields(rx_ring, rx_desc, skb);

#ifndef IGB_NO_LRO
		if (igb_can_lro(rx_ring, rx_desc, skb))
			igb_lro_receive(q_vector, skb);
		else
#endif
#ifdef HAVE_VLAN_RX_REGISTER
			igb_receive_skb(q_vector, skb);
#else
			napi_gro_receive(&q_vector->napi, skb);
#endif
#ifndef NETIF_F_GRO
		netdev_ring(rx_ring)->last_rx = jiffies;
#endif

		/* reset skb pointer */
		skb = NULL;

		/* update budget accounting */
		total_packets++;
	} while (likely(total_packets < budget));

	/* place incomplete frames back on ring for completion */
	rx_ring->skb = skb;

	rx_ring->rx_stats.packets += total_packets;
	rx_ring->rx_stats.bytes += total_bytes;
	q_vector->rx.total_packets += total_packets;
	q_vector->rx.total_bytes += total_bytes;

	/* after the loop, allocate buffers for the consumed descriptors */
	if (cleaned_count)
		igb_alloc_rx_buffers(rx_ring, cleaned_count);

#ifndef IGB_NO_LRO
	igb_lro_flush_all(q_vector);
#endif /* IGB_NO_LRO */

	return (total_packets < budget);
}
#endif /* CONFIG_IGB_DISABLE_PACKET_SPLIT */
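For reference, igb_desc_unused, whose formula is quoted in the comment above, boils down to the following (a sketch written to match that formula exactly):

/* A sketch matching the formula quoted above; the "- 1" is what keeps
 * one descriptor permanently unused. */
static inline u16 igb_desc_unused(struct igb_ring *ring)
{
	u16 ntc = ring->next_to_clean;
	u16 ntu = ring->next_to_use;

	return ((ntc > ntu) ? 0 : ring->count) + ntc - ntu - 1;
}

Right after initialization, next_to_clean == next_to_use == 0, so it returns count - 1; this is why only 255 of the 256 descriptors ever hold buffers at once.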
/**
 * igb_is_non_eop - process handling of non-EOP buffers
 * @rx_ring: Rx ring being processed
 * @rx_desc: Rx descriptor for current buffer
 *
 * This function updates next to clean. If the buffer is an EOP (end of
 * packet) buffer this function exits returning false, otherwise it will
 * place the sk_buff in the next buffer to be chained and return true
 * indicating that this is in fact a non-EOP buffer.
 **/
static bool igb_is_non_eop(struct igb_ring *rx_ring,
			   union e1000_adv_rx_desc *rx_desc)
{
	u32 ntc = rx_ring->next_to_clean + 1;

	/* fetch, update, and store next to clean */
	ntc = (ntc < rx_ring->count) ? ntc : 0;
	rx_ring->next_to_clean = ntc;

	prefetch(IGB_RX_DESC(rx_ring, ntc));

	if (likely(igb_test_staterr(rx_desc, E1000_RXD_STAT_EOP)))
		return false;

	return true;
}
/**
 * igb_alloc_rx_buffers - Replace used receive buffers; packet split
 * @rx_ring: rx descriptor ring
 * @cleaned_count: number of buffers to clean
 **/
void igb_alloc_rx_buffers(struct igb_ring *rx_ring, u16 cleaned_count)
{
	union e1000_adv_rx_desc *rx_desc;
	struct igb_rx_buffer *bi;
	u16 i = rx_ring->next_to_use;	/* refilling starts at next_to_use */

	/* nothing to do */
	if (!cleaned_count)
		return;

	rx_desc = IGB_RX_DESC(rx_ring, i);
	bi = &rx_ring->rx_buffer_info[i];
	i -= rx_ring->count;	/* subtract num_rx_desc first (u16 wraps) */

	do {
#ifdef CONFIG_IGB_DISABLE_PACKET_SPLIT
		if (!igb_alloc_mapped_skb(rx_ring, bi))
#else
		/* allocate (or reuse) a page */
		if (!igb_alloc_mapped_page(rx_ring, bi))
#endif /* CONFIG_IGB_DISABLE_PACKET_SPLIT */
			break;

		/* Refresh the desc even if buffer_addrs didn't change
		 * because each write-back erases this info.
		 */
#ifdef CONFIG_IGB_DISABLE_PACKET_SPLIT
		rx_desc->read.pkt_addr = cpu_to_le64(bi->dma);
#else
		rx_desc->read.pkt_addr = cpu_to_le64(bi->dma + bi->page_offset);
#endif

		rx_desc++;
		bi++;
		i++;
		/* i reaching 0 means we just walked past the last
		 * descriptor, so wrap back to the start of the ring */
		if (unlikely(!i)) {
			rx_desc = IGB_RX_DESC(rx_ring, 0);
			bi = rx_ring->rx_buffer_info;
			i -= rx_ring->count;
		}

		/* clear the hdr_addr for the next_to_use descriptor */
		rx_desc->read.hdr_addr = 0;

		cleaned_count--;
	} while (cleaned_count);

	i += rx_ring->count;	/* add num_rx_desc back to get a real index */

	if (rx_ring->next_to_use != i) {
		/* record the next descriptor to use */
		rx_ring->next_to_use = i;	/* update next_to_use */

		/* note this is ifndef -- no wonder it didn't add up when I
		 * first traced through it */
#ifndef CONFIG_IGB_DISABLE_PACKET_SPLIT
		/* update next to alloc since we have filled the ring;
		 * whenever rx buffers are allocated, next_to_alloc ends up
		 * equal to next_to_use */
		rx_ring->next_to_alloc = i;
#endif

		/* Force memory writes to complete before letting h/w
		 * know there are new descriptors to fetch. (Only
		 * applicable for weak-ordered memory model archs,
		 * such as IA-64).
		 */
		wmb();
		writel(i, rx_ring->tail);
	}
}
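The i -= rx_ring->count / if (unlikely(!i)) dance above is easy to misread. Here is a tiny userspace toy (mine, not driver code) that replays the unsigned-short arithmetic for a 256-entry ring with next_to_use == 253; biasing i downward by count presumably just makes the wrap test a cheap !i instead of a compare against ring->count on every iteration:

#include <stdio.h>

#define RING_COUNT 256

int main(void)
{
	unsigned short i = 253;	/* pretend next_to_use == 253 */
	unsigned short n = 6;	/* pretend cleaned_count == 6 */

	i -= RING_COUNT;	/* u16 wraps: the loop can now test "!i" */
	do {
		/* i + RING_COUNT (mod 2^16) is the real descriptor index */
		printf("filling desc %u\n",
		       (unsigned)(unsigned short)(i + RING_COUNT));
		i++;
		if (!i)		/* walked past the last descriptor: wrap */
			i -= RING_COUNT;
		n--;
	} while (n);

	i += RING_COUNT;	/* back to a real ring index */
	printf("new next_to_use = %u\n", (unsigned)i);
	return 0;
}
/* prints descs 253 254 255 0 1 2, then "new next_to_use = 3" */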
static bool igb_alloc_mapped_page(struct igb_ring *rx_ring,
				  struct igb_rx_buffer *bi)
{
	struct page *page = bi->page;
	dma_addr_t dma;

	/* since we are recycling buffers we should seldom need to alloc;
	 * if reuse already put a page here, we are done */
	if (likely(page))
		return true;

	/* alloc new page for storage */
	page = alloc_page(GFP_ATOMIC | __GFP_COLD);
	if (unlikely(!page)) {
		rx_ring->rx_stats.alloc_failed++;
		return false;
	}

	/* map page for use */
	dma = dma_map_page(rx_ring->dev, page, 0, PAGE_SIZE, DMA_FROM_DEVICE);

	/* if mapping failed free memory back to system since
	 * there isn't much point in holding memory we can't use
	 */
	if (dma_mapping_error(rx_ring->dev, dma)) {
		__free_page(page);

		rx_ring->rx_stats.alloc_failed++;
		return false;
	}

	bi->dma = dma;
	bi->page = page;
	bi->page_offset = 0;	/* the initial offset is 0 */

	return true;
}
Next up is igb_fetch_rx_buffer; Intel's own comments in this code are actually quite thorough.
static struct sk_buff *igb_fetch_rx_buffer(struct igb_ring *rx_ring,
					   union e1000_adv_rx_desc *rx_desc,
					   struct sk_buff *skb)
{
	struct igb_rx_buffer *rx_buffer;
	struct page *page;

	/* the buffer being recycled is the one at next_to_clean */
	rx_buffer = &rx_ring->rx_buffer_info[rx_ring->next_to_clean];

	page = rx_buffer->page;
	/* prefetchw builds on gcc's __builtin_prefetch(): prefetching data
	 * by hand reduces read latency and improves performance, provided
	 * the CPU supports it */
	prefetchw(page);

	/* if the ring is not carrying a partial skb, allocate a fresh one */
	if (likely(!skb)) {
		void *page_addr = page_address(page) +
				  rx_buffer->page_offset;

		/* prefetch first cache line of first page */
		prefetch(page_addr);
#if L1_CACHE_BYTES < 128
		prefetch(page_addr + L1_CACHE_BYTES);
#endif

		/* allocate a skb to store the frags */
		skb = netdev_alloc_skb_ip_align(rx_ring->netdev,
						IGB_RX_HDR_LEN);
		if (unlikely(!skb)) {
			rx_ring->rx_stats.alloc_failed++;
			return NULL;
		}

		/* we will be copying header into skb->data in
		 * pskb_may_pull so it is in our interest to prefetch
		 * it now to avoid a possible cache miss
		 */
		prefetchw(skb->data);
	}

	/* we are reusing so sync this buffer for CPU use */
	dma_sync_single_range_for_cpu(rx_ring->dev,
				      rx_buffer->dma,
				      rx_buffer->page_offset,
				      IGB_RX_BUFSZ,
				      DMA_FROM_DEVICE);

	/* pull page into skb */
	if (igb_add_rx_frag(rx_ring, rx_buffer, rx_desc, skb)) {
		/* hand second half of page back to the ring */
		igb_reuse_rx_page(rx_ring, rx_buffer);
	} else {
		/* we are not reusing the buffer so unmap it */
		dma_unmap_page(rx_ring->dev, rx_buffer->dma,
			       PAGE_SIZE, DMA_FROM_DEVICE);
	}

	/* clear contents of rx_buffer: the data now lives in the skb
	 * (as a copy or as a frag), so the page pointer is cleared */
	rx_buffer->page = NULL;

	return skb;
}
Next, igb_add_rx_frag.
/**
 * igb_add_rx_frag - Add contents of Rx buffer to sk_buff
 * @rx_ring: rx descriptor ring to transact packets on
 * @rx_buffer: buffer containing page to add
 * @rx_desc: descriptor containing length of buffer written by hardware
 * @skb: sk_buff to place the data into
 *
 * This function will add the data contained in rx_buffer->page to the skb.
 * This is done either through a direct copy if the data in the buffer is
 * less than the skb header size, otherwise it will just attach the page as
 * a frag to the skb.
 *
 * The function will then update the page offset if necessary and return
 * true if the buffer can be reused by the adapter.
 **/
static bool igb_add_rx_frag(struct igb_ring *rx_ring,
			    struct igb_rx_buffer *rx_buffer,
			    union e1000_adv_rx_desc *rx_desc,
			    struct sk_buff *skb)
{
	struct page *page = rx_buffer->page;
	unsigned char *va = page_address(page) + rx_buffer->page_offset;	/* virtual address */
	unsigned int size = le16_to_cpu(rx_desc->wb.upper.length);
#if (PAGE_SIZE < 8192)	/* on the kernel I am using, PAGE_SIZE is 4096 */
	unsigned int truesize = IGB_RX_BUFSZ;
#else
	unsigned int truesize = SKB_DATA_ALIGN(size);
#endif
	unsigned int pull_len;

	if (unlikely(skb_is_nonlinear(skb)))
		goto add_tail_frag;

#ifdef HAVE_PTP_1588_CLOCK
	if (unlikely(igb_test_staterr(rx_desc, E1000_RXDADV_STAT_TSIP))) {
		igb_ptp_rx_pktstamp(rx_ring->q_vector, va, skb);
		va += IGB_TS_HDR_LEN;
		size -= IGB_TS_HDR_LEN;
	}
#endif /* HAVE_PTP_1588_CLOCK */

	if (likely(size <= IGB_RX_HDR_LEN)) {
		memcpy(__skb_put(skb, size), va, ALIGN(size, sizeof(long)));

		/* we can reuse buffer as-is, just make sure it is local */
		if (likely(page_to_nid(page) == numa_node_id()))
			return true;

		/* this page cannot be reused so discard it */
		put_page(page);
		return false;
	}

	/* we need the header to contain the greater of either ETH_HLEN or
	 * 60 bytes if the skb->len is less than 60 for skb_pad.
	 */
	pull_len = eth_get_headlen(skb->dev, va, IGB_RX_HDR_LEN);

	/* align pull length to size of long to optimize memcpy performance */
	memcpy(__skb_put(skb, pull_len), va, ALIGN(pull_len, sizeof(long)));

	/* update all of the pointers */
	va += pull_len;
	size -= pull_len;

add_tail_frag:
	skb_add_rx_frag(skb, skb_shinfo(skb)->nr_frags, page,
			(unsigned long)va & ~PAGE_MASK, size, truesize);

	return igb_can_reuse_rx_page(rx_buffer, page, truesize);
}
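One small detail in the copy path above: memcpy is given ALIGN(size, sizeof(long)) rather than size, rounding the copy up to a whole number of longs for speed. The skb tail still only advances by size, and as I understand it the few extra bytes written are harmless because the skb was allocated with room to spare. A quick userspace check, with the kernel's power-of-two ALIGN macro reimplemented here:

#include <stdio.h>

/* kernel-style ALIGN for power-of-two alignment */
#define ALIGN(x, a) (((x) + (a) - 1) & ~((unsigned long)(a) - 1))

int main(void)
{
	unsigned int size;

	/* on a 64-bit build, copy lengths round up to multiples of 8 */
	for (size = 60; size <= 66; size++)
		printf("size %2u -> memcpy %lu bytes\n",
		       size, (unsigned long)ALIGN(size, sizeof(long)));
	return 0;
}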
igb_can_reuse_rx_page decides whether the page can be reused, and performs the flip.
static bool igb_can_reuse_rx_page(struct igb_rx_buffer *rx_buffer,
				  struct page *page,
				  unsigned int truesize)
{
	/* avoid re-using remote pages */
	if (unlikely(page_to_nid(page) != numa_node_id()))
		return false;

#if (PAGE_SIZE < 8192)
	/* if we are only owner of page we can reuse it; a freshly
	 * allocated page has a count of 1, and while one half is held by
	 * the stack and the other by the driver the count is 2. Only when
	 * the driver is the sole owner again can the page be reused. */
	if (unlikely(page_count(page) != 1))
		return false;

	/* flip page offset to other buffer; this flip was the part I found
	 * hardest to grasp at first, and it is explained with the diagrams
	 * below. IGB_RX_BUFSZ here is 2048. */
	rx_buffer->page_offset ^= IGB_RX_BUFSZ;
#else
	/* move offset up to the next cache line */
	rx_buffer->page_offset += truesize;

	if (rx_buffer->page_offset > (PAGE_SIZE - IGB_RX_BUFSZ))
		return false;
#endif

	/* bump ref count on page before it is given to the stack;
	 * the stack drops the reference once it is done with the data */
	get_page(page);

	return true;
}
igb_reuse_rx_page does the actual recycling: it hands the flipped side of the old page to the slot at next_to_alloc. By this point the offset has already been changed, i.e. the page has been flipped, so the usable buffer has successfully migrated from next_to_clean to next_to_alloc.
/**
 * igb_reuse_rx_page - page flip buffer and store it back on the ring
 * @rx_ring: rx descriptor ring to store buffers on
 * @old_buff: donor buffer to have page reused
 *
 * Synchronizes page for reuse by the adapter
 **/
static void igb_reuse_rx_page(struct igb_ring *rx_ring,
			      struct igb_rx_buffer *old_buff)
{
	struct igb_rx_buffer *new_buff;
	u16 nta = rx_ring->next_to_alloc;

	new_buff = &rx_ring->rx_buffer_info[nta];

	/* update, and store next to alloc; note that nta is only
	 * advanced after new_buff has been taken */
	nta++;
	rx_ring->next_to_alloc = (nta < rx_ring->count) ? nta : 0;

	/* transfer page from old buffer to new buffer; old_buff is the
	 * next_to_clean buffer, with its page already flipped */
	*new_buff = *old_buff;

	/* sync the buffer for use by the device */
	dma_sync_single_range_for_device(rx_ring->dev, old_buff->dma,
					 old_buff->page_offset,
					 IGB_RX_BUFSZ,
					 DMA_FROM_DEVICE);
}
The very core of page reuse is the page flip (the "flip page offset to other buffer" seen above). What does that mean? PAGE_SIZE is normally 4096 and page_offset starts at 0. The line below XORs it with IGB_RX_BUFSZ, whose value is 2048, so page_offset toggles back and forth between 0 and 2048: a single page effectively holds two buffers, bytes 0 ~ 2047 and bytes 2048 ~ 4095.
/* flip page offset to other buffer */
rx_buffer->page_offset ^= IGB_RX_BUFSZ;
Picture the page as two halves: a shaded half covering 0 ~ 2047 and a blank half covering 2048 ~ 4095. The alternation is a ping-pong scheme, so you could call this a ping-pong page. In this post I refer to the two halves as the front and back of the page; a page has no real front or back, of course, it is just a name that helps the explanation.
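A few lines of userspace C (mine, not driver code) make the toggling concrete:

#include <stdio.h>

#define IGB_RX_BUFSZ 2048	/* half of a 4096-byte page */

int main(void)
{
	unsigned int page_offset = 0;	/* initial value set in igb_alloc_mapped_page */
	int i;

	for (i = 0; i < 4; i++) {
		printf("buffer in use: %u ~ %u\n",
		       page_offset, page_offset + IGB_RX_BUFSZ - 1);
		page_offset ^= IGB_RX_BUFSZ;	/* flip to the other half */
	}
	return 0;
}
/* prints 0 ~ 2047, 2048 ~ 4095, 0 ~ 2047, 2048 ~ 4095 */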
On top of this, igb_can_reuse_rx_page decides whether the page can be reused; when it cannot, a fresh allocation happens instead. One such case: while one half of the page is still in use by the stack, the page cannot be reused, and a new page has to be allocated for the slot at next_to_alloc. See the sketch below.
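To see how the reference count gates reuse through the page_count(page) != 1 test, here is a toy timeline, with a plain int standing in for the kernel's page refcount (my simplification, not kernel code):

#include <stdio.h>

int main(void)
{
	int page_count = 1;	/* alloc_page(): driver holds the only reference */

	/* half A completes: get_page() before the frag goes to the stack */
	page_count++;		/* driver + stack */

	/* half B completes while the stack still holds half A:
	 * page_count != 1, so igb_can_reuse_rx_page() refuses to reuse */
	printf("stack busy: page_count == %d -> no reuse\n", page_count);

	/* the stack frees the skb and its frag: put_page() */
	page_count--;

	/* the driver is the sole owner again, so reuse is allowed */
	printf("stack done: page_count == %d -> reuse ok\n", page_count);
	return 0;
}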
The initial allocation happens in igb_configure => igb_alloc_rx_buffers.
/* call igb_desc_unused which always leaves
 * at least 1 descriptor unused to make sure
 * next_to_use != next_to_clean
 */
/* one descriptor always stays unused: desc 255 in the walkthroughs below */
for (i = 0; i < adapter->num_rx_queues; i++) {
	struct igb_ring *ring = adapter->rx_ring[i];

	igb_alloc_rx_buffers(ring, igb_desc_unused(ring));
}
This allocates a page for each of the (num_rx_desc - 1) usable descriptors: right after setup next_to_clean = next_to_use = 0, so igb_desc_unused returns num_rx_desc - 1 = 255 and descs 0 ~ 254 each receive a page. Once this code has run, next_to_alloc = next_to_use = num_rx_desc - 1 = 255 and next_to_clean = 0.
To help build intuition, let's start from the very beginning: the first packet arrives, occupies a single descriptor, and processing begins (assume its page passes the reuse check).
(1) cleaned_count = 0, so no rx buffers are allocated at the top of the loop.
(2) igb_fetch_rx_buffer: the page at next_to_clean is flipped and handed to next_to_alloc, which becomes 0. Concretely, the back of desc 0's page goes to desc 255. Note that desc 0's page pointer has now been cleared.
(3) cleaned_count = 1.
(4) igb_is_non_eop: next_to_clean = 1.
(5) After the loop, cleaned_count = 1, so igb_alloc_rx_buffers runs. Desc 255 already received desc 0's page, so no page needs to be allocated; next_to_alloc = next_to_use = 0.
The multi-descriptor case is much the same; still a single packet, now spanning four descriptors (0 ~ 3), all reusable.
(1) cleaned_count = 0, so no rx buffers are allocated.
(2) igb_fetch_rx_buffer: desc 0's page is flipped and handed to desc 255; next_to_alloc = 0, and desc 0's page pointer is cleared.
(3) cleaned_count = 1.
(4) igb_is_non_eop: next_to_clean = 1. Not the end of packet, so the loop continues.
(5) Steps (2) ~ (4) repeat three more times: first, the back of desc 1's page goes to desc 0 (next_to_alloc = 1, cleaned_count = 2, next_to_clean = 2); then the back of desc 2's page goes to desc 1 (next_to_alloc = 2, cleaned_count = 3, next_to_clean = 3); then the back of desc 3's page goes to desc 2 (next_to_alloc = 3, cleaned_count = 4, next_to_clean = 4).
(6) cleaned_count = 4, so igb_alloc_rx_buffers runs; descs 255, 0, 1 and 2 all hold pages, so nothing needs allocating, and next_to_alloc = next_to_use = 3.
Now the failure case, again from the very beginning: the first packet arrives and occupies a single descriptor, but this time its page cannot be reused.
(1) cleaned_count = 0, so no rx buffers are allocated.
(2) igb_fetch_rx_buffer: the page fails the reuse check, so it is dma_unmap_page'd and nothing is handed to desc 255; desc 0's page pointer is cleared.
(3) cleaned_count = 1.
(4) igb_is_non_eop: next_to_clean = 1.
(5) cleaned_count = 1, so igb_alloc_rx_buffers runs. Desc 255 has no page, so igb_alloc_mapped_page allocates a fresh one; next_to_alloc = next_to_use = 0.
Finally, multiple descriptors and a single packet, with a failure in the middle: four descriptors, where desc 2's page cannot be reused.
(1) cleaned_count = 0, so no rx buffers are allocated.
(2) igb_fetch_rx_buffer: desc 0's page is flipped and handed to desc 255; next_to_alloc = 0, and desc 0's page pointer is cleared.
(3) cleaned_count = 1.
(4) igb_is_non_eop: next_to_clean = 1. Not the end of packet, so the loop continues.
(5) igb_fetch_rx_buffer: desc 1's page is flipped and handed to desc 0; next_to_alloc = 1, and desc 1's page pointer is cleared.
(6) cleaned_count = 2.
(7) igb_is_non_eop: next_to_clean = 2. Not the end of packet, so the loop continues.
(8) igb_fetch_rx_buffer: desc 2's page cannot be reused, so it is dma_unmap_page'd and nothing is recycled; desc 2's page pointer is cleared.
(9) cleaned_count = 3.
(10) igb_is_non_eop: next_to_clean = 3. Not the end of packet, so the loop continues.
(11) igb_fetch_rx_buffer: desc 3's page is flipped and handed to desc 1; next_to_alloc = 2, and desc 3's page pointer is cleared.
(12) cleaned_count = 4.
(13) igb_is_non_eop: next_to_clean = 4. This is the end of packet.
(14) cleaned_count = 4, so igb_alloc_rx_buffers runs over descs 255, 0, 1 and 2. All of them hold pages except desc 2, so only one page is allocated, for desc 2. Afterwards next_to_alloc = next_to_use = 3. A toy model of this scenario follows below.
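Here is that toy model: a userspace sketch (my own simplification, not driver code) that replays the four-descriptor scenario and prints the final pointer state. Pages are modelled as integer ids, 0 meaning "no page"; DMA mapping and page offsets are ignored. Swapping the fetch() arguments reproduces the other three walkthroughs.

#include <stdio.h>

#define COUNT 256		/* ring size */

static int page[COUNT];		/* page id held by each rx_buffer */
static int ntc, nta, ntu;	/* next_to_clean / next_to_alloc / next_to_use */
static int next_id = 1000;	/* ids for freshly allocated pages */

static void fetch(int reusable)	/* igb_fetch_rx_buffer */
{
	if (reusable) {		/* igb_reuse_rx_page: flipped page -> nta */
		page[nta] = page[ntc];
		nta = (nta + 1) % COUNT;
	}			/* else: just unmapped, stack frees it later */
	page[ntc] = 0;		/* rx_buffer->page = NULL */
}

static void non_eop(void)	/* igb_is_non_eop */
{
	ntc = (ntc + 1) % COUNT;
}

static void alloc_bufs(int n)	/* igb_alloc_rx_buffers */
{
	int i = ntu;

	while (n--) {
		if (!page[i]) {	/* igb_alloc_mapped_page */
			page[i] = next_id++;
			printf("allocated new page for desc %d\n", i);
		}
		i = (i + 1) % COUNT;
	}
	ntu = nta = i;		/* next_to_use = next_to_alloc = i */
}

int main(void)
{
	int i;

	/* after igb_configure: descs 0..254 hold pages, desc 255 is empty */
	for (i = 0; i < COUNT - 1; i++)
		page[i] = i + 1;
	ntc = 0;
	nta = ntu = COUNT - 1;

	fetch(1); non_eop();	/* desc 0's flipped page -> desc 255 */
	fetch(1); non_eop();	/* desc 1's flipped page -> desc 0   */
	fetch(0); non_eop();	/* desc 2: unmapped, nothing recycled */
	fetch(1); non_eop();	/* desc 3's flipped page -> desc 1   */

	alloc_bufs(4);		/* refills descs 255, 0, 1, 2 */

	printf("ntc=%d nta=%d ntu=%d\n", ntc, nta, ntu);
	/* prints: allocated new page for desc 2, then ntc=4 nta=3 ntu=3 */
	return 0;
}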
And that's page reuse. Honestly, I originally planned to cover only igb_fetch_rx_buffer, but it felt incomplete on its own, so I added the rest of the Rx analysis. I hope you find it useful.
If this article helped, a like, comment, or bookmark would be much appreciated. Goodbye~