编写属于自己的DPDK应用程序:DPDK入门向指南

2024-11-24 0 471

在当前技术界,众多函数的运作原理宛如幕后未解之谜,既充满趣味又相当复杂,令人心生探秘的渴望。以我们即将深入研究的这个函数为例,它表面看似简易,实则具备类似多线程或多进程的神奇特性。

函数的类似线程进程功能

编写属于自己的DPDK应用程序:DPDK入门向指南

这个函数可以与多线程或多进程函数相提并论。它仿佛开辟了一条新的路径来执行任务,借助另一个核心进行处理。这种独特的运行模式在众多大型项目中扮演着关键角色,例如在那些需要大规模数据处理的场合,它就像一位得力的助手。引入多线程和多进程的概念,旨在提升处理效率,正如多人协作完成一项任务通常比单人更快。

在实际使用中,这项功能的存在显著提升了系统的运行效率。比如,在处理大型文件的读取与分析时,若沿用传统方法,耗时可能会很长。然而,借助这一类似多线程的功能,我们可以并行处理,从而大幅缩短所需时间。

等待核结束的函数

这里存在一个特定的函数,它的作用是等待某个核心的结束。这个函数的内部运作方式颇为神秘,据推测,它通过让每个核心执行特定的任务来实现这一功能。这种等待机制可以比作接力赛中的传递接力棒,只有前一位选手成功完成传递,后一位选手才能继续前进。

/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2015 Intel Corporation
 */
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32
static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
};
/* basicfwd.c: Basic DPDK skeleton forwarding example. */
/*
 * Initializes a given port using global settings and with the RX buffers
 * coming from the mbuf_pool passed as a parameter.
 */
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rx_rings = 1, tx_rings = 1;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;
	if (!rte_eth_dev_is_valid_port(port))
		return -1;
	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %sn",
				port, strerror(-retval));
		return retval;
	}
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;
	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;
	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}
	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}
	/* Start the Ethernet port. */
	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;
	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0)
		return retval;
	printf("Port %u MAC: " PRIx8 " " PRIx8 " " PRIx8
			   " " PRIx8 " " PRIx8 " " PRIx8 "n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);
	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;
	return 0;
}
/*
 * The lcore main. This is the main thread that does the work, reading from
 * an input port and writing to an output port.
 */
static __attribute__((noreturn)) void
lcore_main(void)
{
	uint16_t port;
	/*
	 * Check that the port is on the same NUMA node as the polling thread
	 * for best performance.
	 */
	RTE_ETH_FOREACH_DEV(port)
		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) !=
						(int)rte_socket_id())
			printf("WARNING, port %u is on remote NUMA node to "
					"polling thread.ntPerformance will "
					"not be optimal.n", port);
	printf("nCore %u forwarding packets. [Ctrl+C to quit]n",
			rte_lcore_id());
	/* Run until the application is quit or killed. */
	for (;;) {
		/*
		 * Receive packets on a port and forward them on the paired
		 * port. The mapping is 0 -> 1, 1 -> 0, 2 -> 3, 3 -> 2, etc.
		 */
		RTE_ETH_FOREACH_DEV(port) {
			/* Get burst of RX packets, from first port of pair. */
			struct rte_mbuf *bufs[BURST_SIZE];
			const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
					bufs, BURST_SIZE);
			if (unlikely(nb_rx == 0))
				continue;
			/* Send burst of TX packets, to second port of pair. */
			const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
					bufs, nb_rx);
			/* Free any unsent packets. */
			if (unlikely(nb_tx < nb_rx)) {
				uint16_t buf;
				for (buf = nb_tx; buf < nb_rx; buf++)
					rte_pktmbuf_free(bufs[buf]);
			}
		}
	}
}
/*
 * The main function, which does initialization and calls the per-lcore
 * functions.
 */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	unsigned nb_ports;
	uint16_t portid;
	/* Initialize the Environment Abstraction Layer (EAL). */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initializationn");
	argc -= ret;
	argv += ret;
	/* Check that there is an even number of ports to send/receive on. */
	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports < 2 || (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be evenn");
	/* Creates a new mempool in memory to hold the mbufs. */
	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pooln");
	/* Initialize all ports. */
	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "n",
					portid);
	if (rte_lcore_count() > 1)
		printf("nWARNING: Too many lcores enabled. Only 1 used.n");
	/* Call lcore_main on the master core only. */
	lcore_main();
	return 0;
}

在软件开发的实际操作中,若该环节处理不当,便可能引发数据丢失或程序错误的问题。以银行系统的每日结算程序为例,若某个环节的核处理未能顺利进行,且缺乏及时发现的等待机制,便可能引发账目计算的失误。

函数调用获取设备port数量

此函数可查得设备网口数量,需注意这里的网口指的是设备的物理网口,而非TCP/IP协议中的端口。这一功能在网络设备管理领域极为重要。掌握网口数量,对于合理分配网络资源、规划网络布局具有至关重要的意义。

在大型企业的网络布局中,了解确切的网口数量至关重要。据此,便能合理分配各部门的网络设备,挑选合适的交换机和路由器,并合理布局。这样做,能确保企业网络的高效与稳定。

内存池在接收数据包中的应用

建立内存池非常重要,其作用是作为接收数据包的缓存。即便在某些应用场景中,发送数据包没有配置缓存,我们仍可自行添加。这好比在仓库的收货区已摆放好货架,而发货区却还未做好规划。

监控视频传输设备中,接收包缓存充足是保证视频数据不丢失的关键,它能让视频数据顺畅地传输至存储或显示设备。缓存不足时,视频画面可能会出现卡顿等问题。

网口初始化及相关操作

存在一个针对网口初始化的封装函数,它默认设置下即可用于我们的程序。另外,我们还拥有一个获取网口当前信息的函数,通过查阅API地址,可以详细了解其结构体内容。此外,还有专门用于设置网口接收和发送队列的函数,这些步骤都是为了确保网口能够正常传输数据。

以一个小型网络工作室为例,他们在建立自己的测试网络过程中,网口的设置和操作至关重要。这些设置直接关系到网络的基础性能,包括接收和发送的速率以及稳定性等方面。

业务逻辑处理及发包程序编写

port网口配置一旦基本完成,便进入业务逻辑处理阶段。通常,这一阶段的核心业务逻辑是围绕几个关键函数展开的。在自行编写发包程序时,我们可以参照相关源码,并依照开发流程来编写程序。文中提到的直接设定MAC地址、使用内存池进行缓存以及发送失败后重试等操作,都是实际开发过程中的典型例子。

在网络数据传输实验的多种场景中,这种编写发包程序的方法能够便捷迅速地执行小规模的网络数据传输测试。

最后,我想请大家思考一下:在网络开发或网络设备管理的过程中,哪一环节看似容易被忽略,实则至关重要?期待大家的热烈讨论,点赞和分享都是对我们最大的鼓励。

/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2010-2014 Intel Corporation
 */
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32
#define MAX_PKT_BURST 512
#define RTE_MAX_SEGS_PER_PKT 255
#define TXONLY_DEF_PACKET_LEN 1024
#define BURST_TX_WAIT_US 1
#define BURST_TX_RETRIES 64
uint32_t burst_tx_delay_time = BURST_TX_WAIT_US;
uint32_t burst_tx_retry_num = BURST_TX_RETRIES;
enum tx_pkt_split {
	TX_PKT_SPLIT_OFF,
	TX_PKT_SPLIT_ON,
	TX_PKT_SPLIT_RND,
};
enum tx_pkt_split tx_pkt_split = TX_PKT_SPLIT_OFF;
uint8_t  tx_pkt_nb_segs = 1;
uint16_t nb_pkt_per_burst = 128;
static const struct rte_eth_conf port_conf_default = {
	.rxmode = {
		.max_rx_pkt_len = RTE_ETHER_MAX_LEN,
	},
};
uint16_t tx_pkt_seg_lengths[RTE_MAX_SEGS_PER_PKT] = {
	TXONLY_DEF_PACKET_LEN,
};
static inline int
port_init(uint16_t port, struct rte_mempool *mbuf_pool)
{
	struct rte_eth_conf port_conf = port_conf_default;
	const uint16_t rx_rings = 1, tx_rings = 1;
	uint16_t nb_rxd = RX_RING_SIZE;
	uint16_t nb_txd = TX_RING_SIZE;
	int retval;
	uint16_t q;
	struct rte_eth_dev_info dev_info;
	struct rte_eth_txconf txconf;
	if (!rte_eth_dev_is_valid_port(port))
		return -1;
	retval = rte_eth_dev_info_get(port, &dev_info);
	if (retval != 0) {
		printf("Error during getting device (port %u) info: %sn",
				port, strerror(-retval));
		return retval;
	}
	if (dev_info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
		port_conf.txmode.offloads |=
			DEV_TX_OFFLOAD_MBUF_FAST_FREE;
	/* Configure the Ethernet device. */
	retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
	if (retval != 0)
		return retval;
	retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
	if (retval != 0)
		return retval;
	/* Allocate and set up 1 RX queue per Ethernet port. */
	for (q = 0; q < rx_rings; q++) {
		retval = rte_eth_rx_queue_setup(port, q, nb_rxd,
				rte_eth_dev_socket_id(port), NULL, mbuf_pool);
		if (retval < 0)
			return retval;
	}
	txconf = dev_info.default_txconf;
	txconf.offloads = port_conf.txmode.offloads;
	/* Allocate and set up 1 TX queue per Ethernet port. */
	for (q = 0; q < tx_rings; q++) {
		retval = rte_eth_tx_queue_setup(port, q, nb_txd,
				rte_eth_dev_socket_id(port), &txconf);
		if (retval < 0)
			return retval;
	}
	/* Start the Ethernet port. */
	retval = rte_eth_dev_start(port);
	if (retval < 0)
		return retval;
	/* Display the port MAC address. */
	struct rte_ether_addr addr;
	retval = rte_eth_macaddr_get(port, &addr);
	if (retval != 0)
		return retval;
	printf("Port %u MAC: " PRIx8 " " PRIx8 " " PRIx8
			   " " PRIx8 " " PRIx8 " " PRIx8 "n",
			port,
			addr.addr_bytes[0], addr.addr_bytes[1],
			addr.addr_bytes[2], addr.addr_bytes[3],
			addr.addr_bytes[4], addr.addr_bytes[5]);
	/* Enable RX in promiscuous mode for the Ethernet device. */
	retval = rte_eth_promiscuous_enable(port);
	if (retval != 0)
		return retval;
	return 0;
}
static inline void
copy_buf_to_pkt(void* buf, unsigned len, struct rte_mbuf *pkt, unsigned offset)
{
	if (offset + len <= pkt->data_len) {
		rte_memcpy(rte_pktmbuf_mtod_offset(pkt, char *, offset),
			buf, (size_t) len);
		return;
	}
}
static inline int
pkt_burst_prepare(struct rte_mbuf *pkt, struct rte_mempool *mbp, struct rte_ether_hdr *eth_hdr){
	struct rte_mbuf *pkt_segs[RTE_MAX_SEGS_PER_PKT];
	struct rte_mbuf *pkt_seg;
	uint32_t nb_segs, pkt_len;
	uint8_t i;
	
	nb_segs = tx_pkt_nb_segs;
	rte_pktmbuf_reset_headroom(pkt);
	pkt->data_len = tx_pkt_seg_lengths[0];
	pkt->l2_len = sizeof(struct rte_ether_hdr);
	pkt->l3_len = sizeof(struct rte_ipv4_hdr);
	pkt_len = pkt->data_len;
	pkt_seg = pkt;
	pkt_seg->next = NULL;
	copy_buf_to_pkt(eth_hdr, sizeof(*eth_hdr), pkt, 0);
//	copy_buf_to_pkt(&pkt_ip_hdr, sizeof(pkt_ip_hdr), pkt, sizeof(struct rte_ether_hdr));
//	copy_buf_to_pkt(&pkt_udp_hdr, sizeof(pkt_udp_hdr), pkt, sizeof(struct rte_ether_hdr));
	pkt->nb_segs = nb_segs;
	pkt->pkt_len = pkt_len;
	return 1;
}
/*
 * The lcore main. This is the main thread that does the work, reading from
 * an input port and writing to an output port.
 */
static __attribute__((noreturn)) void
lcore_main(void)
{
	uint16_t port;
	uint16_t nb_tx;
	uint32_t retry;
	struct rte_ether_addr d_addr;
	struct rte_ether_addr s_addr;
	struct rte_ether_hdr eth_hdr;
	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
	uint16_t nb_pkt;
	struct rte_mbuf *pkt;
	struct rte_mempool *tx_mbuf_pool = rte_pktmbuf_pool_create("TX_MBUF_POOL", NUM_MBUFS * 2,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	RTE_ETH_FOREACH_DEV(port)
		if (rte_eth_dev_socket_id(port) > 0 &&
				rte_eth_dev_socket_id(port) !=
						(int)rte_socket_id())
			printf("WARNING, port %u is on remote NUMA node to "
					"polling thread.ntPerformance will "
					"not be optimal.n", port);
	printf("nCore %u forwarding packets. [Ctrl+C to quit]n",
			rte_lcore_id());
	d_addr.addr_bytes[0] = 88;
	d_addr.addr_bytes[1] = 83;
	d_addr.addr_bytes[2] = 192;
	d_addr.addr_bytes[3] = 57;
	d_addr.addr_bytes[4] = 0;
	d_addr.addr_bytes[5] = 58;
	s_addr.addr_bytes[0] = 88;
	s_addr.addr_bytes[1] = 83;
	s_addr.addr_bytes[2] = 192;
	s_addr.addr_bytes[3] = 57;
	s_addr.addr_bytes[4] = 0;
	s_addr.addr_bytes[5] = 220;
	rte_ether_addr_copy(&d_addr, &eth_hdr.d_addr);
	rte_ether_addr_copy(&s_addr, &eth_hdr.s_addr);
	eth_hdr.ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
	
	for (;;) {
		// RTE_ETH_FOREACH_DEV(port) {
		// 	struct rte_mbuf *bufs[BURST_SIZE];
		// 	const uint16_t nb_rx = rte_eth_rx_burst(port, 0,
		// 	 		bufs, BURST_SIZE);
		// 	 if (unlikely(nb_rx == 0))
		//  	continue;
		// 	const uint16_t nb_tx = rte_eth_tx_burst(port ^ 1, 0,
		// 			bufs, nb_rx);
		// 	if (unlikely(nb_tx < nb_rx)) {
		// 		uint16_t buf;
		// 		for (buf = nb_tx; buf < nb_rx; buf++)
		// 			rte_pktmbuf_free(bufs[buf]);
		// 	}
		// }
	/*
	**pkt_burst_prepare
	*/
		if (rte_mempool_get_bulk(tx_mbuf_pool, (void **)pkts_burst,
					nb_pkt_per_burst) == 0) {
			for (nb_pkt = 0; nb_pkt < nb_pkt_per_burst; nb_pkt++) {
				if (unlikely(!pkt_burst_prepare(pkts_burst[nb_pkt], tx_mbuf_pool, &eth_hdr))) {
					rte_mempool_put_bulk(tx_mbuf_pool, (void **)&pkts_burst[nb_pkt], nb_pkt_per_burst - nb_pkt);
					break;
				}
			}
		} else {
			for (nb_pkt = 0; nb_pkt < nb_pkt_per_burst; nb_pkt++) {
				pkt = rte_mbuf_raw_alloc(tx_mbuf_pool);
				if (pkt == NULL)
					break;
				if (unlikely(!pkt_burst_prepare(pkt, tx_mbuf_pool, &eth_hdr))) {
					rte_pktmbuf_free(pkt);
					break;
				}
				pkts_burst[nb_pkt] = pkt;
			}
		}
		if (nb_pkt == 0)
			return;
		nb_tx = rte_eth_tx_burst(0, 0, pkts_burst, nb_pkt);
		/*
		** retry
		*/
		if (unlikely(nb_tx < nb_pkt)) {
			retry = 0;
			while (nb_tx < nb_pkt && retry++ < burst_tx_retry_num) {
				rte_delay_us(burst_tx_delay_time);
				nb_tx += rte_eth_tx_burst(0, 0,
						&pkts_burst[nb_tx], nb_pkt - nb_tx);
			}
		}
		if (unlikely(nb_tx < nb_pkt)) {
			do {
				rte_pktmbuf_free(pkts_burst[nb_tx]);
			} while (++nb_tx < nb_pkt);
		}
	}
}
/*
 * The main function, which does initialization and calls the per-lcore
 * functions.
 */
int
main(int argc, char *argv[])
{
	struct rte_mempool *mbuf_pool;
	unsigned nb_ports;
	uint16_t portid;
	/* Initialize the Environment Abstraction Layer (EAL). */
	int ret = rte_eal_init(argc, argv);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "Error with EAL initializationn");
	argc -= ret;
	argv += ret;
	/* Check that there is an even number of ports to send/receive on. */
	nb_ports = rte_eth_dev_count_avail();
	if (nb_ports < 2 || (nb_ports & 1))
		rte_exit(EXIT_FAILURE, "Error: number of ports must be evenn");
	/* Creates a new mempool in memory to hold the mbufs. */
	mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * nb_ports,
		MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
	if (mbuf_pool == NULL)
		rte_exit(EXIT_FAILURE, "Cannot create mbuf pooln");
	/* Initialize all ports. */
	RTE_ETH_FOREACH_DEV(portid)
		if (port_init(portid, mbuf_pool) != 0)
			rte_exit(EXIT_FAILURE, "Cannot init port %"PRIu16 "n",
					portid);
	if (rte_lcore_count() > 1)
		printf("nWARNING: Too many lcores enabled. Only 1 used.n");
	/* Call lcore_main on the master core only. */
	lcore_main();
	return 0;
}

申明:本文由第三方发布,内容仅代表作者观点,与本网站无关。对本文以及其中全部或者部分内容的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。本网发布或转载文章出于传递更多信息之目的,并不意味着赞同其观点或证实其描述,也不代表本网对其真实性负责。

七爪网 行业资讯 编写属于自己的DPDK应用程序:DPDK入门向指南 https://www.7claw.com/2797283.html

七爪网源码交易平台

相关文章

发表评论
暂无评论
官方客服团队

为您解决烦忧 - 24小时在线 专业服务