XDP-3-xdp-tutorial advanced03-AF_XDP

  • XDP 第三篇 xdp-tutorial advanced03-AF_XDP

  • 资料来源:

    • <>
  • 更新

    1
    2024.12.17 初始

导语

xdp 系列第三篇 xdp-tutorial/advanced03-AF_XDP 笔记

  • 这一篇将带有大量代码解释, 比较绕.

AF_XDP 是一个新的 socket 类型, 用于与 xdp 的交互, 实现 bypass;

raw-frames -> XDP_REDIRECT -> BPF-map -> AF_XDP sockets

why? 其速度可以比肩 dpdk?

  • 单生产者单消费者的环形队列;
  • 内存预分配 umem

前排提醒: 如果例程无法收包 / 收包不全, 一定将网卡的队列设置为 1.

af_xdp

先上图

  • xks 收包对应 Fill queue 和 RX queue
  • xks 发包对应 Completion 和 TX queue
  • UMEM 是 kernel 和 user 共享的一块内存区域, 真正 pkt 信息会被写入到 umem, fill queue/ rx queue / tx queue / conpltion queue 中存储的只是索引.

af_xdp 工作原理非常号的文献如下, 不献丑了.

  • 首先先看下 ebpf 文档: How it works, 这一篇非常详细的描述了单个 pkt 收发.
  • 然后是 cloudflare 这篇 一个调试故事:AF_XDP 中的受损数据包;是内核错误还是用户错误?

结合代码的讲解, 下面两篇非常不错.

  • AF_XDP技术详解
  • 使用AF_XDP Socket更高效的网络传输

理解 ring 工作

  • 解读eBPF XDP性能之路:它如何向着DPDK看齐?它在实际中又能带来多大效益?

初始化

让 xdp 运行起来的初始化准备…略复杂.借助 libxdp 已经简化了非常多了, 直接上 libbpf 会死人的…

  • 申请 配置 UMEM
  • 创建 xks
  • 预填充 fq
  • 加载 xdp 程序

申请 配置 UMEM

首先是 申请 配置 UMEM

  • 创建一整块内存区域
  • xsk_umem__create 将 umem 关联到这块内存区域
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 设置应用程序可以申请巨量内存
/* Allow unlimited locking of memory, so all memory needed for packet
* buffers can be locked.
*/
if (setrlimit(RLIMIT_MEMLOCK, &rlim)) {
fprintf(stderr, "ERROR: setrlimit(RLIMIT_MEMLOCK) \"%s\"\n",
strerror(errno));
exit(EXIT_FAILURE);
}

/* Allocate memory for NUM_FRAMES of the default XDP frame size */
// posix_memalign 申请到的是内存对齐的一整块空间
packet_buffer_size = NUM_FRAMES * FRAME_SIZE; // 4096 * 2^12
if (posix_memalign(&packet_buffer,
getpagesize(), /* PAGE_SIZE aligned */
packet_buffer_size)) {
fprintf(stderr, "ERROR: Can't allocate buffer memory \"%s\"\n",
strerror(errno));
exit(EXIT_FAILURE);
}

/* Initialize shared packet_buffer for umem usage */
// 配置 umem 主要在这里了
umem = configure_xsk_umem(packet_buffer, packet_buffer_size);
if (umem == NULL) {
fprintf(stderr, "ERROR: Can't create umem \"%s\"\n",
strerror(errno));
exit(EXIT_FAILURE);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// fq cq 由用户程序定义, 通常会和 umem 组织在一起
struct xsk_umem_info {
struct xsk_ring_prod fq;
struct xsk_ring_cons cq;
struct xsk_umem *umem;
void *buffer;
};

static struct xsk_umem_info *configure_xsk_umem(void *buffer, uint64_t size)
{
struct xsk_umem_info *umem;
int ret;

umem = calloc(1, sizeof(*umem));
if (!umem)
return NULL;
// 创建 umem, libxdp 真正方便了太多了
// 传入 buffer size
ret = xsk_umem__create(&umem->umem, buffer, size, &umem->fq, &umem->cq, NULL);
if (ret) {
errno = -ret;
return NULL;
}

umem->buffer = buffer;
return umem;
}

创建 Xks

创建 xks

  • 首先需要一个有效配置
  • xsk_socket__create 创建 xks.
  • 需要传入 nic name 和 队列号.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/* Open and configure the AF_XDP (xsk) socket */
// main 函数中的入口
xsk_socket = xsk_configure_socket(&cfg, umem); // 配置 xks 和 fq

struct xsk_socket_info {
struct xsk_ring_cons rx;
struct xsk_ring_prod tx;
struct xsk_umem_info *umem;
struct xsk_socket *xsk;

uint64_t umem_frame_addr[NUM_FRAMES];
uint32_t umem_frame_free;
xxx
};

static struct xsk_socket_info *xsk_configure_socket(struct config *cfg,
struct xsk_umem_info *umem)
{
xxx

xsk_info->umem = umem;
xsk_cfg.rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS; // rx 尺寸
xsk_cfg.tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS; // tx 尺寸
xsk_cfg.xdp_flags = cfg->xdp_flags;
xsk_cfg.bind_flags = cfg->xsk_bind_flags;
// XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD 是需要加载用户 xdp 程序
xsk_cfg.libbpf_flags = (custom_xsk) ? XSK_LIBBPF_FLAGS__INHIBIT_PROG_LOAD: 0;

ret = xsk_socket__create(&xsk_info->xsk, cfg->ifname,
cfg->xsk_if_queue, umem->umem, &xsk_info->rx,
&xsk_info->tx, &xsk_cfg); // 这里 xks 是绑定了 umem 的 fq cq
if (ret)
goto error_exit;

if (custom_xsk) {
ret = xsk_socket__update_xskmap(xsk_info->xsk, xsk_map_fd); // xks 更新到 bpf_map
if (ret)
goto error_exit;
} else {
/* Getting the program ID must be after the xdp_socket__create() call */
if (bpf_xdp_query_id(cfg->ifindex, cfg->xdp_flags, &prog_id))
goto error_exit;
}
xxx
}

预填充 Fq

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 也是在 xsk_configure_socket 函数中

/* Stuff the receive path with buffers, we assume we have enough */
// 预留位置
ret = xsk_ring_prod__reserve(&xsk_info->umem->fq,
XSK_RING_PROD__DEFAULT_NUM_DESCS,
&idx);

if (ret != XSK_RING_PROD__DEFAULT_NUM_DESCS) // 申请失败
goto error_exit;

for (i = 0; i < XSK_RING_PROD__DEFAULT_NUM_DESCS; i ++)
*xsk_ring_prod__fill_addr(&xsk_info->umem->fq, idx++) =
xsk_alloc_umem_frame(xsk_info); // 从全池子中申请

xsk_ring_prod__submit(&xsk_info->umem->fq,
XSK_RING_PROD__DEFAULT_NUM_DESCS); // 真正推给 fq

收包

如上图所示 af_xdp 收包是遵循下面的路径, 我们就从 FR 开始;

  • pkt -> UMEM -> FR(index) -> xdp
  • XDP(XDP_REDIRECT -> BPF-map -> AF_XDP sockets) -> rr
  • RR(index) -> userspace —> pkt 处理 完毕, index 还到 FR 中.

advanced03-AF_XDP 的例子中 index 没有直接还到 fq 中,而是 还入了全局公共的池子, 相应的填充 fq 也是从全局池子申请.

pkt -> UMEM -> FR(index) 这个过程是网卡驱动完成, 无需要关心.

Xdp

xdp 内访问的 frame 是 L2 开始 # link

1
2
3
4
5
6
7
8
9
10
11
12
13
// 每个 xdp 程序的入口
SEC("xdp")
int xdp_sock_prog(struct xdp_md *ctx)
{
int index = ctx->rx_queue_index;
__u32 *pkt_count;
// L2 开始的位置,
// void *data = (void *)(long)ctx->data;
// 这里可以进行一系列处理
xxx

return XDP_PASS;
}

但是这里仅仅是为了重定向,因此不需要访问 frame 具体内容;

接下来需要将 frame 重定向到 bpf_map; 得需要一个 bpf_map 废话

bpf_map 需要在 xdp 程序声明,访问. 在用户程序写入 xks.

xdp 程序声明 bpf_map

1
2
3
4
5
6
struct {
__uint(type, BPF_MAP_TYPE_XSKMAP); // 存放 af_xdp socker 的 bpf map
__type(key, __u32); // 其实是 fd
__type(value, __u32);
__uint(max_entries, 64); // 最大容量 64 个
} xsks_map SEC(".maps");

用户空间写入 xks 到 bpf_map, 调用 xsk_socket__update_xskmap

  • 这个函数会将会以 queue_id 为索引.
1
2
3
4
5
6
7
8
9
10
11
// 写入 xks 到 bpf_map
ret = xsk_socket__update_xskmap(xsk_info->xsk, xsk_map_fd);

// libxdp 中 函数声明
int xsk_socket__update_xskmap(struct xsk_socket *xsk, int fd)
{
struct xsk_ctx *ctx = xsk->ctx;

ctx->xsks_map_fd = fd;
return bpf_map_update_elem(ctx->xsks_map_fd, &ctx->queue_id, &xsk->fd, 0); // 索引是 queue_id
}

xdp 程序中重定向 frame 到 af_xdp socket: bpf_redirect_map

1
2
3
4
5
6
7
8
9
10
11
12
13
int index = ctx->rx_queue_index; // 取 queue id
/* A set entry here means that the correspnding queue_id
* has an active AF_XDP socket bound to it. */
if (bpf_map_lookup_elem(&xsks_map, &index)) // 确保 map 对应索引存在 xks
return bpf_redirect_map(&xsks_map, index, 0); // 重定向

// 另一种方式, 也可以记录下错误记录
if (bpf_map_lookup_elem(&xsks_map, &index))
return XDP_PASS;
int redirect_result = bpf_redirect_map(&xsks_map, rr, XDP_PASS);
if (redirect_result < 0) // 重定向错误
// do some thing
return XDP_PASS;

Userspace

现在流程进展到了用户程序从 xks 中读取 pkt

主要收包的逻辑就在 handle_receive_packets 函数中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static void handle_receive_packets(struct xsk_socket_info *xsk)
{
unsigned int rcvd, stock_frames, i;
uint32_t idx_rx = 0, idx_fq = 0;
int ret;

rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx); // 从 rx ring 中读取看看有没有已经到 rx ring 的包
if (!rcvd) // 没有包就返回
return;

/* Stuff the ring with as much frames as possible */
stock_frames = xsk_prod_nb_free(&xsk->umem->fq,
xsk_umem_free_frames(xsk));

if (stock_frames > 0) { // fq 预留了空间

ret = xsk_ring_prod__reserve(&xsk->umem->fq, stock_frames,
&idx_fq);// 预留了多少个?

/* This should not happen, but just in case */
// 预留的和上一步 xsk_prod_nb_free 申请的不符
// 一直循环直到确定预留了足够空间
while (ret != stock_frames)
ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd,
&idx_fq);
// 从全局池子申请资源填充 fq
for (i = 0; i < stock_frames; i++)
*xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
xsk_alloc_umem_frame(xsk);

xsk_ring_prod__submit(&xsk->umem->fq, stock_frames);
}

/* Process received packets */
// 真正读取
for (i = 0; i < rcvd; i++) {
uint64_t addr = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx)->addr; // 读取地址
uint32_t len = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++)->len; // 读取偏移

if (!process_packet(xsk, addr, len))
xsk_free_umem_frame(xsk, addr); // 这里是真正处理包内容地方

xsk->stats.rx_bytes += len; // 统计
}

xsk_ring_cons__release(&xsk->rx, rcvd); // rx 中释放 rcvd 个
xsk->stats.rx_packets += rcvd;

/* Do we need to wake up the kernel for transmission */
complete_tx(xsk);
}

真正读取包的内容 xsk_umem__get_data

1
2
3
4
5
6
7
static bool process_packet(struct xsk_socket_info *xsk,
uint64_t addr, uint32_t len)
{
uint8_t *pkt = xsk_umem__get_data(xsk->umem->buffer, addr); // 自 l2 开始的 pkt
xxxx
return false;
}

发包

发包的流程比接收简单, 还是借用 AF_XDP技术详解 的图:

  • 用户态填充 pkt 到 umem , index 写入 tx
  • 再次从 cq 中消费, 确认已经成功发送.

代码还是在 process_packet 中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
static bool process_packet(struct xsk_socket_info *xsk,
uint64_t addr, uint32_t len)
{
uint8_t *pkt = xsk_umem__get_data(xsk->umem->buffer, addr);

if (false) {
xxx
// 从 tx ring 中预留一个位置
// 示例代码是处理一个 写一个
ret = xsk_ring_prod__reserve(&xsk->tx, 1, &tx_idx);
if (ret != 1) {
/* No more transmit slots, drop the packet */
// 没位置了 发送不了了
return false;
}
// 发送需要 l2开始位置 和 长度
xsk_ring_prod__tx_desc(&xsk->tx, tx_idx)->addr = addr;
xsk_ring_prod__tx_desc(&xsk->tx, tx_idx)->len = len;
xsk_ring_prod__submit(&xsk->tx, 1); // tx 中写入
xsk->outstanding_tx++; // 统计
xsk->stats.tx_bytes += len;
xsk->stats.tx_packets++;
return true;
}
return false;
}

static void handle_receive_packets(struct xsk_socket_info *xsk)
{
xxx
/* Do we need to wake up the kernel for transmission */
complete_tx(xsk); // 释放 cq 的地方
}

static void complete_tx(struct xsk_socket_info *xsk)
{
unsigned int completed;
uint32_t idx_cq;

if (!xsk->outstanding_tx)
return;

sendto(xsk_socket__fd(xsk->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0); // 这里无意义, 涉及到 xdp 另外一种收发模式

/* Collect/free completed TX buffers */
completed = xsk_ring_cons__peek(&xsk->umem->cq,
XSK_RING_CONS__DEFAULT_NUM_DESCS,
&idx_cq); // 从 cq 中看看有多少待消费的

if (completed > 0) {
for (int i = 0; i < completed; i++)
xsk_free_umem_frame(xsk,
*xsk_ring_cons__comp_addr(&xsk->umem->cq,
idx_cq++)); // 填充回全局

xsk_ring_cons__release(&xsk->umem->cq, completed);// 全局池子的计数
xsk->outstanding_tx -= completed < xsk->outstanding_tx ?
completed : xsk->outstanding_tx;
}
}

cq 相关工作由内核负责, 可以不太关注, 相信内核…

收尾

清理 xks umem

1
2
xsk_socket__delete(xsk_socket->xsk);
xsk_umem__delete(umem->umem);

尾巴

一篇欠稿, 最后 xks 会专门有一篇 all in one, 尝试总结目前涉及到 xks 内容, 但是可能需要等到月底了. 这个月的 flag 就是把 xdp 写完.剩余 3 篇;