xsk all in one

  • xdp xsk 介绍, 及一些使用 libxdp 和 xsk 从网卡收发包的例程.

  • 资料来源:

    • <>
  • 更新

    1
    2025.01.12 初始

导语

这一篇是使用 xsk 的个人总结, 希望可以写完整. 原本计划 24 年年末发出的, 得了又回炉了几次….

xdp xsk 介绍, 及一些使用 libxdp 和 xsk 从网卡收发包的例程.

简述

  • xdp (eXpress Data Path) 快速数据路径: 内核协议栈之前,直接处理 L2 的包;
  • xsk (XDP Socket): af_xdp 类型的 socket, 与 xdp 程序交互直接收发 L2 pkt.

资料

官方文档: 最权威参考

  • ebpf 文档 AF_XDP
  • kernel 文档 AF_XDP

案例 & 教程, issues 信息也相当受用.

  • xdp-tutorial
    • AF_XDP: 完整的 xsk 示例
  • bpf-examples
    • AF_XDP-example: 基准的接收 发送 多个 xsk 共享一个 队列.
    • AF_XDP-forwarding: 多线程,在不同 xsk 之间 forward pkt, 共享全局一个 umem.

一些不错的参考

  • AF_XDP技术详解: 讲解细致, 注意: 其使用的是 libbpf 而不是 libxdp, libxdp 会节省非常多代码.
  • A story about AF_XDP, network namespaces and a cookie (cloudflare.com): cloudflare 就不多说了吧, 博文质量甚高.
  • 一个调试故事:AF_XDP 中的受损数据包;是内核错误还是用户错误?: 同样是 cloudflare 的博文
  • 使用AF_XDP Socket更高效的网络传输

关键词

ebpf: extened Berkeley Packet Filter, 不修改内核情况下,能够运行在内核的程序.

xdp: eXpress Data Path 快速数据路径, 内核旁路 (Kernel bypass) 技术, pkt 在进入内核前,直接处理 L2 的包.

xsk: af_xdp 类型的 socket.

umem: 进程申请的一块连续内存,分为大小相等的帧, 用户态程序和 kernel 共享.

快速开始

Xdp

xpd: eXpress Data Path 快速数据路径, 内核旁路 (Kernel bypass) 技术, pkt 在进入内核前,直接处理 L2 的包;

图描述了 xdp 在整个处理流程中的位置

图中描述的 XDP 动作:

  • XDP_PASS: 正常进入内核协议栈
  • XDP_DROP: 直接丢弃 [1]
  • XDP_ABORTED: indicate an error (the packet is dropped), trigger xdp:xdp_exception tracepoint
  • XDP_TX: emit the packet back out the interface on which it was received
  • XDP_REDIRECT: 另一个网卡 NIC 或 AF_XDP socket
    • af_xdp 的 socket 是本文重点 ^r51xgb

与 dpdk 的对比

  • 个人愚见, xpd 优势在于与 kernel 的紧密结合,而不是完全抛开 kernel 另起炉灶.

xdp 运行的 3 种模式

特性Native modeOffloaded modeGeneric Mode
kernel 标志位XDP_FLAGS_DRV_MODEXDP_FLAGS_HW_MODEXDP_FLAGS_SKB_MODE
libxdp 标志位XDP_MODE_NATIVEXDP_MODE_HWXDP_MODE_SKB
运行位置内核空间的网络驱动程序层网卡硬件网络栈的更高层次
处理路径驱动程序内处理,路径短网卡内处理,路径极短网络栈的早期阶段,但在硬件驱动之后.
硬件依赖依赖于特定的网络驱动程序依赖于支持 XDP Offload 的智能网卡不依赖特定硬件,几乎所有网卡都支持
性能性能极高性能最高性能较低, 与 native 性能差距 10 倍
延迟非常低最低较高
开发复杂度需要内核和驱动程序知识,较复杂需要硬件编程知识和工具,复杂开发和调试相对简单
适用范围需要驱动支持,适用范围有限需要特定硬件,适用于高流量和低延迟场景广泛兼容,适用于几乎所有网卡
成本受限于驱动支持,无额外硬件成本需要昂贵的智能网卡,硬件成本较高无额外硬件成本
  • libxdp 还有一种: XDP_MODE_UNSPEC, 执行 xdp_program__attach 时内核决定以那种方式运行 xdp: 依次是 hw naive skb.

Xsk

xsk: af_xdp 类型的 socket, 可以接受 XDP_REDIRECT 来的包.

  • 关键词: fq cq umem tx rx

这里偷 cloudflare 两张图

已经又不少质量很高的资料介绍 xsk 工作原理, 不做赘述.

  • ebpf 文档: How it works
  • AF_XDP技术详解
  • a-story-about-af-xdp-network-namespaces-and-a-cookie
  • 使用AF_XDP Socket更高效的网络传输

理解 ring 工作

  • 解读eBPF XDP性能之路:它如何向着DPDK看齐?它在实际中又能带来多大效益?

限制

xsk 有非常多的组合方式: 多个 xsk & 网卡多队列 & 多个网卡 & 多个 UMEM??

  • 排列组合, 能够衍生出 M*M*M*M 种问题.
  • 落实到实际如何组合? 这样组合的限制又是什么?

踩 2 月坑, 几乎全部坑都打卡以后, 直接上结论: 只有 2 条硬性限制:

  • 一个 netdev,queue_id 对 只能绑定 一组 fq cq; 1:1 的关系仅存在于此!!!, 其皆无限制.
    • 此限制有何表现? 在一个 队列上绑定第二个 fq 时候直接报错.
  • XDP_REDIRECT 不能将 pkt 重定向到同一个 netdev 的其他 queue;
    • pkt 可以被重定向到其他 netdev 但是无法由 xdp 决定是那个 queue.

来几个例子看看就明白了.

例子

几个实际的栗子, 每个线程对应一个 xsk.

  • 流程图由 gpt 绘制, 细节不佳,但大意没错.

充分利用多核从网卡中收包, 没有转发需求,只.收包

  • 启用网卡多队列, 一个 xsk 绑定一个队列, 每个队列有独立的 umem.
  • 这样 每个 fq-cq 只对应一个 xsk, 不需要上锁.

还是多核收包, 但是有将 pkt 从再次通过其他 xsk 发送的需求.

  • 启用网卡多队列, 一个 xsk 绑定一个队列, 但是全局共享一个 umem, xsk 创建时候传入不同的 fq-cq.
  • 这样是 一个 umem 对应多个 fq-cq, 但是每个 fq-cq 只对应一个 xsk, 因此还是不需要上锁.
  • 全局共享一个 umem, 因此可以 pkt 可以不拷贝直接通过其他 xsk 发送

网卡的 rss 配置不能满足分流, 需要精确的控制那个 pkt 分流的那个线程.

  • 限制 网卡只有一个 queue, 多个 xsk 绑定到同一个队列. xdp 程序内计算 hash 分流,重定向到不通的 xsk 中.
  • 这样是 一个 umem 和 fq cq. 但是多个 xsk 绑定到同一个 fq cq, 因此 需要处理同步问题, 对 fq 加锁.

还是上一个场景, 如果网卡单 queue 成了瓶颈, 还有另外一种配置方式: 将分流移动到用户程序, xdp 只管收包.

  • 启用网卡多队列, 一个 xsk 绑定一个队列, 但是全局共享一个 umem, xsk 创建时候传入不同的 fq-cq.
  • xsk 收到包后计算 hash, 将索引传入对应线程的 ring 中, 实际处理时 线程从 ring 中读取 pkt 信息.
  • 这样有些类似 dpdk, 每个 fq-cq 只对应一个 xsk, 因此还是不需要上锁.

究极缝合怪: 充分利用多核,同时也希望完全控制分流, 还需要能转发 pkt. ^qczgu

  • 基本和上一个例子配置类似, 只不过这里是 umem 在两个 nic 间共享.

使用

代码大多来自 xdp-tutorialbpf-examples

  • libxdp 为我们省略了非常繁多的细节

创建 & 配置 Umem

首先需要为 umem 申请一段固定大小 chunk 的内存, 可以通过 malloc/mmap/hugepages 申请, 下面是 posix_memalign 和 mmap.

  • posix_memalign 申请到的就是对齐的内存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// https://github.com/xdp-project/xdp-tutorial/blob/7a98d00d99d79d411d77472f916479c229683461/advanced03-AF_XDP/af_xdp_user.c#L594
/* Allocate memory for NUM_FRAMES of the default XDP frame size */
packet_buffer_size = NUM_FRAMES * FRAME_SIZE;
if (posix_memalign(&packet_buffer,
getpagesize(), /* PAGE_SIZE aligned */
packet_buffer_size)) {
fprintf(stderr, "ERROR: Can't allocate buffer memory \"%s\"\n",
strerror(errno));
exit(EXIT_FAILURE);
}

// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L2061C28-L2061C32
/* Reserve memory for the umem. Use hugepages if unaligned chunk mode */
bufs = mmap(NULL, NUM_FRAMES * opt_xsk_frame_size,
PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS | opt_mmap_flags, -1, 0);
if (bufs == MAP_FAILED) {
printf("ERROR: mmap failed\n");
exit(EXIT_FAILURE);
}

我们需要为 xsk_umem_config 提供合适的配置

1
2
3
4
5
6
7
8
9
// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L991
struct xsk_umem_config cfg = {
/* We recommend that you set the fill ring size >= HW RX ring size + AF_XDP RX ring size.*/
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.frame_size = opt_xsk_frame_size,
.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
.flags = 0
};
  • fill_size: fq 环的容量, 这里推荐至少大于 硬件 rw + af_xdp rx 环的容量之和, XSK_RING_PROD__DEFAULT_NUM_DESCS 是 2048.
  • comp_size: cq 环的容量.
  • frame_size: 一个 chunk 的大小 (字节), XSK_UMEM__DEFAULT_FRAME_HEADROOM 是 4096, 默认对齐模式下, 其值需要是 2048 到 系统页面大小之间, 2 的指数幂 .[2]
  • frame_headroom: 每个 chunk 中, 预留 N byte 给应用程序, 从 xsk 收包, 包从 N byte 开始写入, 对于从 xdp 收包后需要再次封装发送的场景非常有用,[4]XSK_UMEM__DEFAULT_FRAME_HEADROOM 默认为 0. ^b2u617

环的大小必须是 2 的指数幂 ,[5] 默认在对齐模式下内核将对齐屏蔽 addr.例如: fill_size 配置为 2048 个, 那么对于 fq 来说推送进入的 addr 是 2050 - 3000 都是被认为是一个 chunk.[6]

  • 非对齐模式需要 申请大页内存 [7]

真正创建 umem:

1
2
3
4
5
umem = calloc(1, sizeof(*umem));
if (!umem)
exit_with_error(errno);
ret = xsk_umem__create(&umem->umem, buffer, size, &umem->fq, &umem->cq,
&cfg);

加载 Xdp 程序

xdp_program__open_file 打开 clang 编译后的 bpf 字节码, xdp_program__attach 附加到具体的 nic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// https://github.com/xdp-project/xdp-tutorial/blob/7a98d00d99d79d411d77472f916479c229683461/advanced03-AF_XDP/af_xdp_user.c#L557
prog = xdp_program__open_file(cfg.filename,NULL, &opts);
}
err = libxdp_get_error(prog);
if (err) {
libxdp_strerror(err, errmsg, sizeof(errmsg));
fprintf(stderr, "ERR: loading program: %s\n", errmsg);
return err;
}

err = xdp_program__attach(prog, cfg.ifindex, cfg.attach_mode, 0);
if (err) {
libxdp_strerror(err, errmsg, sizeof(errmsg));
fprintf(stderr, "Couldn't attach XDP program on iface '%s' : %s (%d)\n",
cfg.ifname, errmsg, err);
return err;
}

xdp_program__attachattach_mode 对应 xdp 的不同的运行模式

  • XDP_MODE_NATIVE: Native mode
  • XDP_MODE_HW: Offloaded mode
  • XDP_MODE_SKB: Generic Mode
  • XDP_MODE_UNSPEC: 让内核决定以那种方式运行 xdp, 依次是 hw naive skb.

创建 Xsk

首先还是需要提供一个有效的 xsk_cfg

1
2
3
4
5
6
7
8
// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-forwarding/xsk_fwd.c#L725
struct xsk_socket_config xsk_cfg = {
.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
.libxdp_flags = XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD,
.xdp_flags = XDP_FLAGS_DRV_MODE,
.bind_flags = XDP_USE_NEED_WAKEUP,
};
  • rx_size tx_size 分别是 rx tx ring 的大小, 一般取 XSK_RING_CONS__DEFAULT_NUM_DESCS 2048 即可.
  • libxdp_flags 是绑定的 xdp 的 flag, 这里设置的 XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD 是 xsk 自动加载 xdp 程序, 需要我们自行加载 xdp 程序.[8]
  • xdp_flags: xdp 模式的 flag, 这里的 XDP_FLAGS_DRV_MODE 对应 xdp 的 naive 模式.
    • 还有 XDP_FLAGS_HW_MODEXDP_FLAGS_SKB_MODE
  • bind_flags: 绑定选项, 这里是 XDP_USE_NEED_WAKEUP ^tsoueu

XDP_USE_NEED_WAKEUP: 默认情况下 驱动程序会主动检查 tx 和 fill ring 看看是否需要进行收发. 这个标志位告诉驱动程序永远不需要主动检查 tx 和 fill ring , 而是应用程序通过 系统调用 触发驱动程序进行收发.

  • 相当于 用户态程序批量写入 or 接收, 发个系统调用, 驱动程序才进行收发. 当然这个系统调用也不是真正收发,只是催一下驱动程序.[9]

真正创建 xsk:

  • xsk_socket__createxsk_socket__create_shared 的包装, 本质还是 xsk_socket__create_shared 的调用.
  • 需要传入 nic name 和 队列号.
  • 纯接收和发送 可以只传入 rx+fq 或 tx+cq.[10]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L1065C2-L1066C22
ret = xsk_socket__create(&xsk->xsk, opt_if, opt_queue, umem->umem,
rxr, txr, &cfg);

// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-forwarding/xsk_fwd.c#L497C1-L506C27
/* xsk socket. */
status = xsk_socket__create_shared(&p->xsk,
params->iface,
params->iface_queue,
params->bp->umem,
&p->rxq,
&p->txq,
&p->umem_fq,
&p->umem_cq,
&params->xsk_cfg);

多个 xsk 绑定同一个队列, 创建 xsk 有一些额外的问题:

假如有两个队列, 共享一个 umem, 每个 队列都绑定 2 个 xsk, 当创建完 xsk1 xsk2 绑定到 q1, 创建 xsk3 绑定到 q2, 此时再次调用 xsk_socket__create_shared 创建 xsk4 绑定到 q2, 始终都会出错.

这是一个奇怪的限制, 但是现在 libxdp 的行为就是如此

https://github.com/xdp-project/bpf-examples/issues/85
When looking at the code, it is slightly weird. If you create socket1 reg a umem and bind it to netdev1,qid1 first, then it is possible to bind further sockets to netdev1/qid1 (all sharing the same fill/comp ring pair). You can also bind a new socket sharing the same umem to netdev2/qid2, but it is not possible binding further sockets to netdev2/qid2.
In principle, it should be possible to bind more sockets to netdev2/qid2 as long as they share the same fill and completion ring as the first socket that was bound to netdev2/qid2. But the code does not support this today.
No particular reason. Just that I never thought of the case and no one, up until now, has asked for it. After I have released the multi-buffer feature for AF_XDP, I will take a look at this and see if what it would take.

绕过 限制也非常 trick: 创建一个新的 umem 但是和第一个完全完美的重合,使用第二个 umem 绑定新的 队列 (已测试有效).

https://github.com/xdp-project/xdp-tutorial/issues/388#issuecomment-1929040074
Mixing the two modes is not supported by bind, but there is a trick you can apply. Create a new umem here, but have it overlap the first umem area perfectly. Then repeat step 2 and 3 but with veth2 and the new umem.

Xsk 写入 xdp_map

如果是 xsk 和 队列一一对应, 只需要调用 xsk_socket__update_xskmap 即可. 其 key 就是队列 id

1
2
3
4
5
6
7
8
9
10
11
12
13
// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L2099
ret = recv_xsks_map_fd(&xsks_map_fd);
if (ret) {
fprintf(stderr, "Error %d receiving xsks_map_fd\n", ret);
exit_with_error(ret);
}
if (xsks[0]->xsk) {
ret = xsk_socket__update_xskmap(xsks[0]->xsk, xsks_map_fd);
if (ret) {
fprintf(stderr, "Update of BPF map failed(%d)\n", ret);
exit_with_error(ret);
}
}

多个 xsk 绑定到同一个队列稍微麻烦一些.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L1907
// lookup_bpf_map bpf_map_update_elem 函数都是自行实现, 具体请参考例程
xsks_map = lookup_bpf_map(xdp_program__fd(xdp_prog));
if (xsks_map < 0) {
fprintf(stderr, "ERROR: no xsks map found: %s\n",
strerror(xsks_map));
exit(EXIT_FAILURE);
}

for (i = 0; i < num_socks; i++) {
int fd = xsk_socket__fd(xsks[i]->xsk);
int ret;

key = i;
ret = bpf_map_update_elem(xsks_map, &key, &fd, 0);
if (ret) {
fprintf(stderr, "ERROR: bpf_map_update_elem %d\n", i);
exit(EXIT_FAILURE);
}
}

卸载 Umem & Xsk & Xdp 程序

xdp_program__detach xsk_socket__deletexsk_umem__delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L633C13-L633C28

static void remove_xdp_program(void)
{
int err;

err = xdp_program__detach(xdp_prog, opt_ifindex, opt_attach_mode, 0);
if (err)
fprintf(stderr, "Could not detach XDP program. Error: %s\n", strerror(-err));
}

static void xdpsock_cleanup(void)
{
struct xsk_umem *umem = xsks[0]->umem->umem;
int i, cmd = CLOSE_CONN;

dump_stats();
for (i = 0; i < num_socks; i++)
xsk_socket__delete(xsks[i]->xsk);
(void)xsk_umem__delete(umem);

if (opt_reduced_cap) {
if (write(sock, &cmd, sizeof(int)) < 0)
exit_with_error(errno);
}

if (load_xdp_prog)
remove_xdp_program();
}

接收

借用 AF_XDP技术详解 的图:

  • xdp 收包,写入 fq 中的 chunk,消费 fq ,将对应 chunk 索引写入 rx (生产).
  • 应用程序从 fq 中消费得到 chunk, 做完处理,再将 chunk 写入到 fq (生产).

因此思路也清楚了:

  • 首先需要给 fq 填充足够的 chunk, 让 kernel 能够有地方收包.
  • 应用层收包, xsk_ring_cons__peek 查看 rx 有没有包, 没有包就 recvfrom 催一下驱动程序. xsk all in one#^tsoueu
  • 确认可以收 rcvd 个包后, xsk_ring_prod__reserve 从 fq 中先预留 rcvd 个位置,chunk 处理完还会去.
  • xsk_ring_cons__rx_desc 可以读到 pkt 相对于 umem 的偏移 addr, 还需要 xsk_umem__add_offset_to_addrxsk_umem__get_data 获取到真正的内存地址.
  • 处理完毕, chunk 通过 xsk_ring_prod__fill_addr 写回 fq.
  • 最后自然是 xsk_ring_prod__submit fq 和 xsk_ring_cons__release rx 了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L1022C13-L1022C35
static void xsk_populate_fill_ring(struct xsk_umem_info *umem)
{
int ret, i;
u32 idx;

ret = xsk_ring_prod__reserve(&umem->fq,
XSK_RING_PROD__DEFAULT_NUM_DESCS * 2, &idx);
if (ret != XSK_RING_PROD__DEFAULT_NUM_DESCS * 2)
exit_with_error(-ret);
for (i = 0; i < XSK_RING_PROD__DEFAULT_NUM_DESCS * 2; i++)
*xsk_ring_prod__fill_addr(&umem->fq, idx++) =
i * opt_xsk_frame_size;
xsk_ring_prod__submit(&umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS * 2);
}

// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L1464C1-L1505C35
rcvd = xsk_ring_cons__peek(&xsk->rx, opt_batch_size, &idx_rx);
if (!rcvd) {
if (xsk_ring_prod__needs_wakeup(&xsk->umem->fq)) {
xsk->app_stats.rx_empty_polls++;
recvfrom(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
}
return;
}

ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);
while (ret != rcvd) {
if (ret < 0)
exit_with_error(-ret);
if (xsk_ring_prod__needs_wakeup(&xsk->umem->fq)) {
recvfrom(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, NULL);
}
ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);
}

for (i = 0; i < rcvd; i++) {
const struct xdp_desc *desc = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++);
u64 addr = desc->addr;
u32 len = desc->len;
u64 orig = xsk_umem__extract_addr(addr);
eop_cnt += IS_EOP_DESC(desc->options);

addr = xsk_umem__add_offset_to_addr(addr);
char *pkt = xsk_umem__get_data(xsk->umem->buffer, addr);

*xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = orig;
}

xsk_ring_prod__submit(&xsk->umem->fq, rcvd);
xsk_ring_cons__release(&xsk->rx, rcvd);

发送

还是借用 AF_XDP技术详解 的图:

  • 用户态填充 pkt 到 umem ,写入 tx, sendto 催一下驱动. xsk all in one#^tsoueu
  • 再次从 cq 中消费, 确认已经成功发送.
1
2
3
// https://github.com/xdp-project/bpf-examples/blob/5343ed3377471c7b7ef2237526c8bdc0f00a0cef/AF_XDP-example/xdpsock.c#L1535
static int tx_only(struct xsk_socket_info *xsk, u32 *frame_nb,
int batch_size, unsigned long tx_ns)

发送比接收更加简单, 但是对管理 umem 不利 (没有循环).

一个 trick: 已证实,这样操作是个坑,有概率卡死.

  • 通过指针转换, 为 cq 填充 chunk.
  • 从 chunk 中消费得到 chunk , 写入 pkt 填充 tx, 形成一个闭环.
  • 这样的问题是: 没法确认发送后是否发送成功.

其他情况

  • 遇到 预留 head 的情况, 封装后,只要将发送时的正确写入 addr 即可正确发送.
  • 遇到发包出现丢包, 先加大网卡的发送缓存区 🤣.

UMEM

讲真的,踩坑 xsk 程序, 最麻烦的就是 umem, 因此单独拎出来.

  • 每个环节都会用到,出问题非常不容易调试.
  • 也有自己不看最佳实践自由发挥的锅.

个人建议: 单队列随便玩, 但是多队列 只建议按照 AF_XDP-forwarding 修改, 自行创作的很有可能是坑.

AF_XDP-forwarding: 多线程,在不同 xsk 之间 forward pkt, 共享全局一个 umem.

AF_XDP-forwarding

ASCII 版本 😂 防止没有图

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
Global UMEM Pool (bpool)
+----------------------------------------------------------+
| bp->slabs |
| ● ● ● ●…..□ |
| ● ● ● ●…..□ |
| ○ ○ ○ R…..□ ────────────┐ |
| ● ● ● B…..□ exchange |
| bp->n_slabs_available □ ←──────┐ | |
| | | |
| |─────────────────────────────────| |
+----------------------------------------------------------+
↑ |
(1) │ next exchange ─ (3)
bcache_cons_check │
│ │
↓ │
Local Cache (bcache) │
+----------------------------------------------------------+
| |
| ○ ○ ○ ○ --> ● ● ● B ● ● ● R |
| Cons Slab Prod Slab |
| |
+----------------------------------------------------------+
│ ↑
(2) │ │ (2)
bcache_cons bcache_prod
│ │
↓ │
+----------------------------------------------------------+
| Fill Queue Completion Queue |
| ● ● ● B ○ ○ ○ ○ ○ ○ R ● |
| ↓ ↑ |
| Kernel processes |
| ↓ ↑ |
| RX Queue TX Queue |
| ● ● B ○ ○ ○ ● ● R ○ ○ ○ |
| |
+----------------------------------------------------------+
│ ↑
(3) │ │ (1)
xsk_ring_cons__rx_desc xsk_ring_prod__tx_desc
│ │
↓ │
+----------------------------------------------------------+
| User Application |
| B R |
| |
+----------------------------------------------------------+

图例:
● = 已使用frame
○ = 空frame
B = 蓝色标记frame (接收流程)
R = 红色标记frame (发送流程)
□ = slab指针

UMEM 全局缓冲池

UMEM 实现中,通过全局缓冲池 (bpool) 来管理这块内存, 大体原理:

  • 类似 slab, 为每个 xsk 分配 2 个 slab 分别用于 rx 和 tx, 这样每次交换可以只在局部的 slab 发生,无需访问全局缓冲区.
  • 当 slab 为全空/全满 时候再与全局缓存交换, 一次交换一整个 slab; 这样大大减少了交换次数;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 初始化配置
// https://github.com/xdp-project/bpf-examples/blob/8d53e6fc46ae625bd16b38eb1007ece99460eada/AF_XDP-forwarding/xsk_fwd.c#L705C1-L723C3
static const struct bpool_params bpool_params_default = {
.n_buffers = 64 * 1024,
.buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
.mmap_flags = 0,
.n_users_max = 16,
.n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2, // 每个 slab 有 2048 * 2 个 frame
};

static const struct xsk_umem_config umem_cfg_default = {
.fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
.comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
.frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
.frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
.flags = 0,
};

// https://github.com/xdp-project/bpf-examples/blob/8d53e6fc46ae625bd16b38eb1007ece99460eada/AF_XDP-forwarding/xsk_fwd.c#L103
struct bpool {
struct bpool_params params;
pthread_mutex_t lock;
void *addr; // UMEM 基地址

u64 **slabs; // 全局 slab 数组
u64 **slabs_reserved; // 预留 slab 数组
u64 *buffers; // buffer 数组
u64 *buffers_reserved; // 预留 buffer 数组

u64 n_slabs_available; // 可用 slab 数量(哨兵位置)

struct xsk_umem *umem; // XDP umem 对象
};

每个 buffer 存储的是一个 u64 的索引,代表相对于 umem 起始地址有多少 frame;

  • 经过测试,最多可以创建的 frame 数量是 512 * 1024 bpool_params_default->n_buffers 的最大值是 512 * 1024

xxx_reserved 仅用于 init 和 free 并不影响实际运行, 因此下面分析中略过相关内容.

按照通常 slab 的设计, 分别用于 rx 和 tx 那应该存在两个池子, 一个是 全满 slabs 一个是 全空 slabs…. 但是这里鸡贼的只用了一个 bp->slabs

  • 运行时候只有一个全局池子 bp->slabs 其是 u64 数组, 每个元素是一个地址, 指向一组 buffer;
  • 引入了哨兵 bp->n_slabs_available
    • bp->slabs[bp->n_slabs_available] 指向的始终是第一个全空的 slab, 索引比 n_slabs_available 小的始终都是全满 slab;
    • rx 请求全满 slab, 就取 bp->slabs[bp->n_slabs_available -1] , 然后 bp->n_slabs_available --
    • tx 请求全空 slab,放入全满 slab, 就取 bp->slabs[bp->n_slabs_available], 同时将 bp->slabs[bp->n_slabs_available] 指向全满 slab

RX

从图中可以看到,RX 流程涉及三个主要步骤 (图中蓝色数字标记):

  • bcache_cons_check: 检查本地缓存是否需要从全局池获取新的满 slab
1
2
3
4
5
6
7
8
9
10
  static inline u32 bcache_cons_check(struct bcache *bc, u32 n_buffers) {
if (bc->n_buffers_cons == 0) { // Cons Slab为空
pthread_mutex_lock(&bp->lock);
slab_full = bp->slabs[--bp->n_slabs_available]; // 获取满 slab
bp->slabs[bp->n_slabs_available] = bc->slab_cons; // 归还空 slab
pthread_mutex_unlock(&bp->lock);
bc->slab_cons = slab_full;
bc->n_buffers_cons = n_buffers_per_slab;
}
}
  • bcache_cons: 从本地缓存获取 buffer
1
2
3
4
5
6
static inline u64 bcache_cons(struct bcache *bc) {
    u64 n_buffers_cons = bc->n_buffers_cons - 1;
    u64 buffer = bc->slab_cons[n_buffers_cons];
    bc->n_buffers_cons = n_buffers_cons;
    return buffer;
}
  • xsk_ring_consrx_desc: 从 RX 队列获取数据包
1
2
3
4
5
6
7
8
static inline u32 port_rx_burst(struct port *p, struct burst_rx *b) {
u32 idx, i;
n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &idx);
for (i = 0; i < n_pkts; i++) {
b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, idx + i)->addr;
b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, idx + i)->len;
}
}

TX

TX 流程同样涉及三个主要步骤 (图中红色数字标记):

  • xsk_ring_prodtx_desc: 将数据包放入 TX 队列
1
2
3
4
5
6
7
8
9
static inline void port_tx_burst(struct port *p, struct burst_tx *b) {
u32 idx, i;
xsk_ring_prod__reserve(&p->txq, n_pkts, &idx);
for (i = 0; i < n_pkts; i++) {
xsk_ring_prod__tx_desc(&p->txq, idx + i)->addr = b->addr[i];
xsk_ring_prod__tx_desc(&p->txq, idx + i)->len = b->len[i];
}
xsk_ring_prod__submit(&p->txq, n_pkts);
}
  • bcache_prod: 回收已发送的 buffer 到本地缓存
1
2
3
4
5
6
7
8
9
10
11
static inline void bcache_prod(struct bcache *bc, u64 buffer) {
if (bc->n_buffers_prod >= n_buffers_per_slab) { // Prod Slab满
pthread_mutex_lock(&bp->lock);
slab_empty = bp->slabs[bp->n_slabs_available]; // 获取空 slab
bp->slabs[bp->n_slabs_available++] = bc->slab_prod; // 归还满 slab
pthread_mutex_unlock(&bp->lock);
bc->slab_prod = slab_empty;
bc->n_buffers_prod = 0;
}
bc->slab_prod[bc->n_buffers_prod++] = buffer;
}
  • 全局池交换: 当本地缓存满时,与全局池交换 slab

QA

Q: Xsk 收到不到任何流量

A: 现代网卡默认启用有多队列, xsk 创建时就绑定到一个固定队列 xsk_socket__create, 如果恰好流量没有到绑定队列,就无法收到任何包.xsk all in one

  • ethtool -L <interface> combined 1 将网卡队列设置为 1,这样所有流量都会经过绑定队列.
  • 也通过 ethtool 设置更加精细多队列分流规则.

Q: Xdp 程序中 XSKMAP 包含有绑定不同队列的 Xsk, 可以将包重定向到 其他队列的 Xsk 吗?

A: 不能, xsk1 xsk2 绑定到队列 X, xsk3 绑定到了队列 Y, xdp 程序只能将队列 X 的 pkt 重定向到 xsk1 或 xsk2

  • xdp 程序有两种模式: copy 模式 和 0copy 模式, 即使是复制模式下也不支持.

Q: 一些 Pkt 出现了错误为什么?

A: 原因通常是 NIC 对同一个内存区域写入, 原因多是相同缓存区被错误重用了.

  • 将相同的缓冲区同时放入 fill ring 和 tx ring, 造成了 NIC 同时读写一块内存.
  • UMEM 在不同的 xsk 之间共享, 相同的缓冲区同时写入了多个 fill ring.

Q: 在多个 Xsk 共享同一个 UMEM 场景下, 创建 Xsk 时报错

A: 准确来说是多个 xsk 共享 UMEM, 在第二个 nic queue 已经创建了一个 xsk, 创建第二 xsk 时候报错, 调用 xsk_socket__create or xsk_socket__create_shared 都会报错.

按照 xdp-project/issues/85 的描述, 是个 feature 不是 bug.

When looking at the code, it is slightly weird. If you create socket1 reg a umem and bind it to netdev1,qid1 first, then it is possible to bind further sockets to netdev1/qid1 (all sharing the same fill/comp ring pair). You can also bind a new socket sharing the same umem to netdev2/qid2, but it is not possible binding further sockets to netdev2/qid2.
In principle, it should be possible to bind more sockets to netdev2/qid2 as long as they share the same fill and completion ring as the first socket that was bound to netdev2/qid2. But the code does not support this today.

绕过这个限制也非常 trick…

xdp-tutorial/issues/388
Mixing the two modes is not supported by bind, but there is a trick you can apply. Create a new umem here, but have it overlap the first umem area perfectly. Then repeat step 2 and 3 but with veth2 and the new umem.

调用 xsk_umem__create 创建一个新的 UMEM, 和原有 UMEM 完全重合, 然后就随便用吧.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#define DEFINE_XSK_RING(name) \
struct name { \
__u32 cached_prod; \
__u32 cached_cons; \
__u32 mask; \
__u32 size; \
__u32 *producer; \
__u32 *consumer; \
void *ring; \
__u32 *flags; \
}

DEFINE_XSK_RING(xsk_ring_prod);
DEFINE_XSK_RING(xsk_ring_cons);

附录

Libxdp

LIBXDP 文档, 这里取 af_xdp socket 相关部分

Control Path

Umem Area

1
2
3
4
// for rx
recvfrom(fd, NULL, 0, MSG_DONTWAIT, NULL, NULL);
// for tx
sendto(fd, NULL, 0, MSG_DONTWAIT, NULL, 0);

Sockets

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 创建新的 umem
0 on success, or a negative error in case of failure:
- EINVAL if arguments are invalid
- EFAULT if the memory address is invalid
- ENOMEM if no space memory left
int xsk_umem__create(struct xsk_umem **umem, void *umem_area, __u64 size,
struct xsk_ring_prod *fill,
struct xsk_ring_cons *comp,
const struct xsk_umem_config *config);

// 同 xsk_umem__create, 不过使用预分配的 fd 关联 umem
// 这里的 fd 实际上应该指向一个未绑定 umem 的 xsk
int xsk_umem__create_with_fd(struct xsk_umem **umem,
int fd, void *umem_area, __u64 size,
struct xsk_ring_prod *fill,
struct xsk_ring_cons *comp,
const struct xsk_umem_config *config);

// 删除 umem
int xsk_umem__delete(struct xsk_umem *umem);
// 获取 umem 关联的 fd
int xsk_umem__fd(const struct xsk_umem *umem);

// 对齐模式下, rx ring 获取的 xdp_desc->addr 转为一个真正指向 pkt 的指针
// 这里 pkt 一般是 l2 开始
void *xsk_umem__get_data(void *umem_area, __u64 addr);

// 非对齐模式下就需要先处理下 `xdp_desc->addr` 了
// 从 xdp_desc->addr 获取到真正 addr 和 偏移
__u64 xsk_umem__extract_addr(__u64 addr);
__u64 xsk_umem__extract_offset(__u64 addr);
// 或者一步到位, addr = xsk_umem__add_offset_to_addr(xdp_desc->addr) 解出偏移并加上偏移
// 这个时候就能直接调用 xsk_umem__get_data(addr) 获取到真正的 pkt 指针了
__u64 xsk_umem__add_offset_to_addr(__u64 addr);

Data Path

Producer Rings

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 创建 xsk 实例, 本质上还是 xsk_socket__create_shared 的包装
// 0 on success, or a negative error in case of failure:
- EINVAL if arguments are invalid
- EFAULT if memory address is invalid
- ENOMEM if no data space available
- ENOPROTOOPT if option is not supported by the protocol
int xsk_socket__create(struct xsk_socket **xsk,
const char *ifname, __u32 queue_id,
struct xsk_umem *umem,
struct xsk_ring_cons *rx,
struct xsk_ring_prod *tx,
const struct xsk_socket_config *config);

// 创建共享 umem 的 xsk
int xsk_socket__create_shared(struct xsk_socket **xsk_ptr,
const char *ifname,
__u32 queue_id, struct xsk_umem *umem,
struct xsk_ring_cons *rx,
struct xsk_ring_prod *tx,
struct xsk_ring_prod *fill,
struct xsk_ring_cons *comp,
const struct xsk_socket_config *config);
// 删除
void xsk_socket__delete(struct xsk_socket *xsk);
// xsk 转 fd
int xsk_socket__fd(const struct xsk_socket *xsk);

// 在指定网络接口上设置 xdp
int xsk_setup_xdp_prog(int ifindex, int *xsks_map_fd);
// 将 xsk 更新到 xsks_map 中, 其 key 是 queueid
// 多个 xsk 绑定同一个队列时,不能使用这个函数写入 xsk 到 xsks_map.
int xsk_socket__update_xskmap(struct xsk_socket *xsk, int xsks_map_fd);

Consumer Rings

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 预留出 nb 个位置
// 返回: 成功预留了多少位置, 0 错误, nb 成功, (0,nb) 则是部分成功
__u32 xsk_ring_prod__reserve(struct xsk_ring_prod *prod, __u32 nb, __u32 *idx)

// 返回指向 fill ring 第 idx 个 slot 的指针
__u64 *xsk_ring_prod__fill_addr(struct xsk_ring_prod *fill, __u32 idx);

// 提交填充完毕的 slot, 能够消费了
void xsk_ring_prod__submit(struct xsk_ring_prod *prod, __u32 nb);

// 获取 tx ring 中 idx 位置 xdp_desc 的指针
// 对 (struct xdp_desc *) 赋值,以写入待发送的 pkt 的 addr 和 len
struct xdp_desc *xsk_ring_prod__tx_desc(struct xsk_ring_prod *tx, __u32 idx);

// 检查是否需要唤醒内核处理生产者 ring 产生的事件
// 需要的就执行: ( rx 是 recvfrom, tx 是 sendto)
int xsk_ring_prod__needs_wakeup(const struct xsk_ring_prod *r);

  1. xdp 内丢弃重定向的包不会显示在 tcpdump, 要使用 xdpdump. ↩︎

  2. https://ebpf-docs.dylanreimerink.nl/linux/concepts/af_xdp/#setting-up-a-xsk ↩︎

  3. https://ebpf-docs.dylanreimerink.nl/linux/concepts/af_xdp/#xdp_umem_unaligned_chunk_flag ↩︎

  4. https://ebpf-docs.dylanreimerink.nl/linux/concepts/af_xdp/#headroom ↩︎

  5. https://www.kernel.org/doc/html/v6.1/networking/af_xdp.html#rings ↩︎

  6. https://www.kernel.org/doc/html/v6.1/networking/af_xdp.html#umem-fill-ring ↩︎

  7. https://github.com/xdp-project/bpf-examples/tree/master/AF_XDP-example#unaligned-mode ↩︎

  8. https://www.kernel.org/doc/html/v6.1/networking/af_xdp.html#xdp-shared-umem-bind-flag ↩︎

  9. https://ebpf-docs.dylanreimerink.nl/linux/concepts/af_xdp/#xdp_use_need_wakeup ↩︎

  10. https://ebpf-docs.dylanreimerink.nl/linux/concepts/af_xdp/#tx-or-rx-only-sockets ↩︎