// 设置应用程序可以申请巨量内存 /* Allow unlimited locking of memory, so all memory needed for packet * buffers can be locked. */ if (setrlimit(RLIMIT_MEMLOCK, &rlim)) { fprintf(stderr, "ERROR: setrlimit(RLIMIT_MEMLOCK) \"%s\"\n", strerror(errno)); exit(EXIT_FAILURE); }
if (custom_xsk) { ret = xsk_socket__update_xskmap(xsk_info->xsk, xsk_map_fd); // xsk 更新到 bpf_map if (ret) goto error_exit; } else { /* Getting the program ID must be after the xdp_socket__create() call */ if (bpf_xdp_query_id(cfg->ifindex, cfg->xdp_flags, &prog_id)) goto error_exit; } xxx }
预填充 Fq
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// 也是在 xsk_configure_socket 函数中
/* Stuff the receive path with buffers, we assume we have enough */ // 预留位置 ret = xsk_ring_prod__reserve(&xsk_info->umem->fq, XSK_RING_PROD__DEFAULT_NUM_DESCS, &idx);
if (ret != XSK_RING_PROD__DEFAULT_NUM_DESCS) // 申请失败 goto error_exit;
for (i = 0; i < XSK_RING_PROD__DEFAULT_NUM_DESCS; i ++) *xsk_ring_prod__fill_addr(&xsk_info->umem->fq, idx++) = xsk_alloc_umem_frame(xsk_info); // 从全池子中申请
int index = ctx->rx_queue_index; // 取 queue id /* A set entry here means that the corresponding queue_id * has an active AF_XDP socket bound to it. */ if (bpf_map_lookup_elem(&xsks_map, &index)) // 确保 map 对应索引存在 xsk return bpf_redirect_map(&xsks_map, index, 0); // 重定向
// 另一种方式, 也可以记录下错误记录 if (bpf_map_lookup_elem(&xsks_map, &index)) return XDP_PASS; int redirect_result = bpf_redirect_map(&xsks_map, rr, XDP_PASS); if (redirect_result < 0) // 重定向错误 // do some thing return XDP_PASS;
rcvd = xsk_ring_cons__peek(&xsk->rx, RX_BATCH_SIZE, &idx_rx); // 从 rx ring 中读取看看有没有已经到 rx ring 的包 if (!rcvd) // 没有包就返回 return;
/* Stuff the ring with as much frames as possible */ stock_frames = xsk_prod_nb_free(&xsk->umem->fq, xsk_umem_free_frames(xsk));
if (stock_frames > 0) { // fq 预留了空间
ret = xsk_ring_prod__reserve(&xsk->umem->fq, stock_frames, &idx_fq);// 预留了多少个?
/* This should not happen, but just in case */ // 预留的和上一步 xsk_prod_nb_free 申请的不符 // 一直循环直到确定预留了足够空间 while (ret != stock_frames) ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq); // 从全局池子申请资源填充 fq for (i = 0; i < stock_frames; i++) *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = xsk_alloc_umem_frame(xsk);
/* Process received packets */ // 真正读取 for (i = 0; i < rcvd; i++) { uint64_t addr = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx)->addr; // 读取地址 uint32_t len = xsk_ring_cons__rx_desc(&xsk->rx, idx_rx++)->len; // 读取长度
if (!process_packet(xsk, addr, len)) xsk_free_umem_frame(xsk, addr); // 这里是真正处理包内容地方
if (false) { xxx // 从 tx ring 中预留一个位置 // 示例代码是处理一个 写一个 ret = xsk_ring_prod__reserve(&xsk->tx, 1, &tx_idx); if (ret != 1) { /* No more transmit slots, drop the packet */ // 没位置了 发送不了了 return false; } // 发送需要 l2开始位置 和 长度 xsk_ring_prod__tx_desc(&xsk->tx, tx_idx)->addr = addr; xsk_ring_prod__tx_desc(&xsk->tx, tx_idx)->len = len; xsk_ring_prod__submit(&xsk->tx, 1); // tx 中写入 xsk->outstanding_tx++; // 统计 xsk->stats.tx_bytes += len; xsk->stats.tx_packets++; return true; } return false; }
static void handle_receive_packets(struct xsk_socket_info *xsk) { xxx /* Do we need to wake up the kernel for transmission */ complete_tx(xsk); // 释放 cq 的地方 }