提交 bbc13a42 编写于 作者: weixin_43103506's avatar weixin_43103506

add dpdk daq src code

上级 c343b323
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <errno.h>
#include <getopt.h>
#include <net/if.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#ifdef LIBPCAP_AVAILABLE
#include <pcap.h>
#include <pthread.h>
#else
#include "daq_dlt.h"
#endif
#include <rte_config.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include "daq_module_api.h"
#include "dpdk_port_conf.h"
#include "dpdk_param.h"
#define INJECT_BUF_NUM (128)
#define POOL_NAME_LEN (64)
#define BURST_SIZE (32)
#define DESC_POOL_NUM (2048)
#define SET_ERROR(modinst, ...) daq_base_api.set_errbuf(modinst, __VA_ARGS__)
//#define HIGH_PERF_ENABLE (1)
#define DAQ_DPDK_VERSION 1915
#define MEMPOOL_CACHE_SIZE (64)
typedef struct _dpdk_packet_pkt_desc
{
DAQ_Msg_t msg;
DAQ_PktHdr_t pkthdr;
uint8_t *data;
unsigned int length;
struct _dpdk_packet_pkt_desc *next;
} DPDKPacketPktDesc;
typedef struct _dpdk_packet_msg_pool
{
DPDKPacketPktDesc *pool;
DPDKPacketPktDesc *freelist;
DAQ_MsgPoolInfo_t info;
} DPDKPacketMsgPool;
typedef struct _dpdk_packet_context
{
/* Configuration */
uint16_t port_id;
uint16_t queue_id;
char *filter;
int snaplen;
int timeout;
uint8_t debug;
/* State */
DAQ_ModuleInstance_h modinst;
struct timeval ts;
#ifdef LIBPCAP_AVAILABLE
struct bpf_program fcode;
#endif
DPDKPacketMsgPool pool;
struct rte_mempool *inject_mbuf_pool;
volatile uint8_t interrupted;
DAQ_Stats_t stats;
} DPDK_Packet_Context_t;
static DAQ_BaseAPI_t daq_base_api;
static pthread_mutex_t bpf_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t dpdk_start_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t dpdk_stop_mutex = PTHREAD_MUTEX_INITIALIZER;
static DAQ_VariableDesc_t dpdk_variable_descriptions[] = {
{ "debug", "Enable debugging output to stdout", DAQ_VAR_DESC_FORBIDS_ARGUMENT },
};
static void destroy_packet_pool(DPDK_Packet_Context_t *dpdk_pctx)
{
DPDKPacketMsgPool *pool = &dpdk_pctx->pool;
if (pool->pool)
{
while (pool->info.size > 0)
free(pool->pool[--pool->info.size].data);
free(pool->pool);
pool->pool = NULL;
}
pool->freelist = NULL;
pool->info.available = 0;
pool->info.mem_size = 0;
}
static int create_packet_pool(DPDK_Packet_Context_t *dpdk_pctx, unsigned size)
{
DPDKPacketMsgPool *pool = &dpdk_pctx->pool;
pool->pool = calloc(sizeof(DPDKPacketPktDesc), size);
if (!pool->pool)
{
SET_ERROR(dpdk_pctx->modinst, "%s: Could not allocate %zu bytes for a packet descriptor pool!",
__func__, sizeof(DPDKPacketPktDesc) * size);
return DAQ_ERROR_NOMEM;
}
pool->info.mem_size = sizeof(DPDKPacketPktDesc) * size;
while (pool->info.size < size)
{
/* Allocate packet data and set up descriptor */
DPDKPacketPktDesc *desc = &pool->pool[pool->info.size];
desc->data = malloc(dpdk_pctx->snaplen);
if (!desc->data)
{
SET_ERROR(dpdk_pctx->modinst, "%s: Could not allocate %d bytes for a packet descriptor message buffer!",
__func__, dpdk_pctx->snaplen);
return DAQ_ERROR_NOMEM;
}
pool->info.mem_size += dpdk_pctx->snaplen;
/* Initialize non-zero invariant packet header fields. */
DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
pkthdr->ingress_group = DAQ_PKTHDR_UNKNOWN;
pkthdr->egress_group = DAQ_PKTHDR_UNKNOWN;
/* Initialize non-zero invariant message header fields. */
DAQ_Msg_t *msg = &desc->msg;
msg->type = DAQ_MSG_TYPE_PACKET;
msg->hdr_len = sizeof(desc->pkthdr);
msg->hdr = &desc->pkthdr;
msg->data = desc->data;
msg->owner = dpdk_pctx->modinst;
msg->priv = desc;
/* Place it on the free list */
desc->next = pool->freelist;
pool->freelist = desc;
pool->info.size++;
}
pool->info.available = pool->info.size;
return DAQ_SUCCESS;
}
static int dpdk_daq_module_load(const DAQ_BaseAPI_t *base_api)
{
if (base_api->api_version != DAQ_BASE_API_VERSION || base_api->api_size != sizeof(DAQ_BaseAPI_t))
return DAQ_ERROR;
daq_base_api = *base_api;
return DAQ_SUCCESS;
}
static int dpdk_daq_module_unload(void)
{
memset(&daq_base_api, 0, sizeof(daq_base_api));
return DAQ_SUCCESS;
}
static int dpdk_daq_get_variable_descs(const DAQ_VariableDesc_t **var_desc_table)
{
*var_desc_table = dpdk_variable_descriptions;
return sizeof(dpdk_variable_descriptions) / sizeof(DAQ_VariableDesc_t);
}
static int dpdk_daq_instantiate(const DAQ_ModuleConfig_h modcfg, DAQ_ModuleInstance_h modinst, void **ctxt_ptr)
{
int rval=DAQ_SUCCESS,ret;
DPDK_Packet_Context_t *dpdk_pctx;
static int first_time_init = 1,pool_index = 0;
char pool_name[64];
dpdk_pctx = calloc(1, sizeof(DPDK_Packet_Context_t));
if (!dpdk_pctx)
{
SET_ERROR(modinst, "%s: Couldn't allocate memory for the new DPDK Packet context!", __func__);
rval = DAQ_ERROR_NOMEM;
goto err;
}
//dpdk init
if (first_time_init)
{
first_time_init = 0;
printf("in eal init!\n");
if(dpdk_conf_parse() != 0)
{
printf("conf parse error!\n");
goto err;
}
ret = rte_eal_init(dpdk_get_param_cnt(), dpdk_get_param());
if (ret < 0)
{
printf( "Cannot init EAL\n");
goto err;
}
dpdk_port_setup();
}
snprintf(pool_name,POOL_NAME_LEN,"inject_mbuf_pool_%d",pool_index);
pool_index++;
dpdk_pctx->inject_mbuf_pool = rte_pktmbuf_pool_create(pool_name, INJECT_BUF_NUM, MEMPOOL_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, 1);
if (dpdk_pctx->inject_mbuf_pool == NULL)
{
printf( "%s:%s Couldn't create mbuf pool!\n", __FUNCTION__,pool_name);
rval = DAQ_ERROR_NOMEM;
goto err;
}
if (daq_base_api.config_get_mode(modcfg) != DAQ_MODE_PASSIVE)
{
uint16_t ports = dpdk_get_port_num();
if (ports % 2)
{
printf("DAQ_MODE_INLINE ports should bi dual \n");
goto err;
}
}
//config info get
const char *varKey, *varValue;
daq_base_api.config_first_variable(modcfg, &varKey, &varValue);
while (varKey)
{
if (!strcmp(varKey, "debug"))
dpdk_pctx->debug = 1;
daq_base_api.config_next_variable(modcfg, &varKey, &varValue);
}
dpdk_pctx->snaplen = daq_base_api.config_get_snaplen(modcfg);
dpdk_pctx->timeout = (int) daq_base_api.config_get_timeout(modcfg);
if (dpdk_pctx->timeout == 0)
dpdk_pctx->timeout = -1;
gettimeofday(&dpdk_pctx->ts, NULL);
/* Finally, create the message buffer pool. */
uint32_t pool_size = daq_base_api.config_get_msg_pool_size(modcfg);
if (pool_size == 0)
pool_size = DESC_POOL_NUM;
if ((rval = create_packet_pool(dpdk_pctx, pool_size)) != DAQ_SUCCESS)
goto err;
dpdk_get_port_and_queue(&dpdk_pctx->port_id,&dpdk_pctx->queue_id);
dpdk_pctx->modinst = modinst;
*ctxt_ptr = dpdk_pctx;
return rval;
err:
if (dpdk_pctx)
free(dpdk_pctx);
return rval;
}
static void dpdk_daq_destroy(void *handle)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
#ifdef LIBPCAP_AVAILABLE
pcap_freecode(&dpdk_pctx->fcode);
#endif
if(dpdk_pctx)
free(dpdk_pctx);
destroy_packet_pool(dpdk_pctx);
}
static int dpdk_daq_set_filter(void *handle, const char *filter)
{
#ifdef LIBPCAP_AVAILABLE
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
struct bpf_program fcode;
if (dpdk_pctx->filter)
free(dpdk_pctx->filter);
dpdk_pctx->filter = strdup(filter);
if (!dpdk_pctx->filter)
{
SET_ERROR(dpdk_pctx->modinst, "%s: Couldn't allocate memory for the filter string!", __func__);
return DAQ_ERROR;
}
pthread_mutex_lock(&bpf_mutex);
if (pcap_compile_nopcap(dpdk_pctx->snaplen, DLT_EN10MB, &fcode, dpdk_pctx->filter, 1, PCAP_NETMASK_UNKNOWN) == -1)
{
pthread_mutex_unlock(&bpf_mutex);
SET_ERROR(dpdk_pctx->modinst, "%s: BPF state machine compilation failed!", __func__);
return DAQ_ERROR;
}
pthread_mutex_unlock(&bpf_mutex);
pcap_freecode(&dpdk_pctx->fcode);
dpdk_pctx->fcode.bf_len = fcode.bf_len;
dpdk_pctx->fcode.bf_insns = fcode.bf_insns;
return DAQ_SUCCESS;
#else
return DAQ_ERROR_NOTSUP;
#endif
}
static int dpdk_daq_start(void *handle)
{
static int first_time_start = 1;
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
pthread_mutex_lock(&dpdk_start_mutex);
if (first_time_start)
{
first_time_start = 0;
//first start
dpdk_port_start();
}
pthread_mutex_unlock(&dpdk_start_mutex);
memset(&dpdk_pctx->stats, 0, sizeof(DAQ_Stats_t));
printf("THREAD runnging on Port:%d Queue:%d!\n",dpdk_pctx->port_id,dpdk_pctx->queue_id);
return DAQ_SUCCESS;
}
static int dpdk_inject_packet(DPDK_Packet_Context_t *dpdk_pctx, uint16_t out_port_id, uint16_t out_queue_id,const uint8_t *data, uint32_t data_len)
{
struct rte_mbuf *m;
m = rte_pktmbuf_alloc(dpdk_pctx->inject_mbuf_pool);
if (!m)
{
printf("%s: Couldn't allocate memory for packet.",__FUNCTION__);
return DAQ_ERROR_NOMEM;
}
rte_memcpy(rte_pktmbuf_mtod(m, void *), data, data_len);
rte_pktmbuf_data_len(m) = data_len;
uint16_t nb_tx = rte_eth_tx_burst(out_port_id, out_queue_id, &m, 1);
if (unlikely(nb_tx == 0))
{
printf( "%s: Couldn't send packet. Try again.", __FUNCTION__);
rte_pktmbuf_free(m);
return DAQ_ERROR_AGAIN;
}
rte_pktmbuf_free(m);
dpdk_pctx->stats.packets_injected++;
return DAQ_SUCCESS;
}
static int dpdk_daq_inject(void *handle, DAQ_MsgType type, const void *hdr, const uint8_t *data, uint32_t data_len)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
if (type != DAQ_MSG_TYPE_PACKET)
return DAQ_ERROR_NOTSUP;
return dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,data,data_len);
}
static int dpdk_daq_inject_relative(void *handle, const DAQ_Msg_t *msg, const uint8_t *data, uint32_t data_len, int reverse)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
uint16_t reverse_port = dpdk_pctx->port_id;
if (reverse)
reverse_port = dpdk_pctx->port_id % 2 ? (dpdk_pctx->port_id - 1):(dpdk_pctx->port_id + 1);
return dpdk_inject_packet(dpdk_pctx,reverse_port,dpdk_pctx->queue_id,data,data_len);
}
static int dpdk_daq_interrupt(void *handle)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
dpdk_pctx->interrupted = 1;
return DAQ_SUCCESS;
}
static int dpdk_daq_stop(void *handle)
{
static int first_time_stop = 1;
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
pthread_mutex_lock(&dpdk_stop_mutex);
if (first_time_stop)
{
first_time_stop = 0;
rte_eth_dev_stop(dpdk_pctx->port_id);
rte_eth_dev_close(dpdk_pctx->port_id);
}
pthread_mutex_unlock(&dpdk_stop_mutex);
return DAQ_SUCCESS;
}
static int dpdk_daq_ioctl(void *handle, DAQ_IoctlCmd cmd, void *arg, size_t arglen)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
/* Only supports GET_DEVICE_INDEX for now */
if (cmd != DIOCTL_GET_DEVICE_INDEX || arglen != sizeof(DIOCTL_QueryDeviceIndex))
return DAQ_ERROR_NOTSUP;
DIOCTL_QueryDeviceIndex *qdi = (DIOCTL_QueryDeviceIndex *) arg;
if (!qdi->device)
{
SET_ERROR(dpdk_pctx->modinst, "No device name to find the index of!");
return DAQ_ERROR_INVAL;
}
//undo:
qdi->index = 0;
return DAQ_SUCCESS;
}
static int dpdk_daq_get_stats(void *handle, DAQ_Stats_t *stats)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
rte_memcpy(stats, &dpdk_pctx->stats, sizeof(DAQ_Stats_t));
return DAQ_SUCCESS;
}
static void dpdk_daq_reset_stats(void *handle)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
memset(&dpdk_pctx->stats, 0, sizeof(DAQ_Stats_t));
}
static int dpdk_daq_get_snaplen(void *handle)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
return dpdk_pctx->snaplen;
}
static uint32_t dpdk_daq_get_capabilities(void *handle)
{
uint32_t capabilities = DAQ_CAPA_BLOCK | DAQ_CAPA_REPLACE | DAQ_CAPA_INJECT |
DAQ_CAPA_UNPRIV_START | DAQ_CAPA_INTERRUPT | DAQ_CAPA_DEVICE_INDEX;
#ifdef LIBPCAP_AVAILABLE
capabilities |= DAQ_CAPA_BPF;
#endif
return capabilities;
}
static int dpdk_daq_get_datalink_type(void *handle)
{
return DLT_EN10MB;
}
static unsigned dpdk_daq_msg_receive(void *handle, const unsigned max_recv, const DAQ_Msg_t *msgs[], DAQ_RecvStatus *rstat)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
DAQ_RecvStatus status = DAQ_RSTAT_OK;
unsigned idx = 0,loop = 0;
uint8_t *data;
uint16_t len,max_recv_ok = max_recv;
struct rte_mbuf *bufs[BURST_SIZE];
if (dpdk_pctx->interrupted)
{
dpdk_pctx->interrupted = 0;
status = DAQ_RSTAT_INTERRUPTED;
goto err;
}
if (max_recv > BURST_SIZE)
max_recv_ok = BURST_SIZE;
uint16_t nb_rx = rte_eth_rx_burst(dpdk_pctx->port_id, dpdk_pctx->queue_id, bufs, max_recv_ok);
for (loop = 0; loop < nb_rx; loop++)
{
//printf("port:%d queue:%d\n",dpdk_pctx->port_id,dpdk_pctx->queue_id);
data = rte_pktmbuf_mtod(bufs[loop], void *);
len = rte_pktmbuf_data_len(bufs[loop]);
//rte_pktmbuf_dump(stdout,bufs[loop],bufs[loop]->pkt_len);
#ifdef LIBPCAP_AVAILABLE
if (dpdk_pctx->fcode.bf_insns && bpf_filter(dpdk_pctx->fcode.bf_insns, data, len, len) == 0)
{
dpdk_pctx->stats.packets_filtered++;
rte_pktmbuf_free(bufs[loop]);
continue;
}
#endif
dpdk_pctx->stats.packets_received++;
DPDKPacketPktDesc *desc = dpdk_pctx->pool.freelist;
if (!desc)
{
rte_pktmbuf_free(bufs[loop]);
status = DAQ_RSTAT_NOBUF;
break;
}
//undo: not copy data to desc,should store the rte_mbuf desc to data,and later free it.
desc->length = len;
/* Next, set up the DAQ message. Most fields are prepopulated and unchanging. */
DAQ_Msg_t *msg = &desc->msg;
msg->data_len = len;
#ifdef HIGH_PERF_ENABLE
msg->data = data;
msg->priv = bufs[loop];
#else
rte_memcpy(desc->data, data, len);
rte_pktmbuf_free(bufs[loop]);
#endif
/* Then, set up the DAQ packet header. */
DAQ_PktHdr_t *pkthdr = &desc->pkthdr;
pkthdr->ts.tv_sec = 0;
pkthdr->ts.tv_usec = 0;
pkthdr->pktlen = len;
pkthdr->ingress_index = dpdk_pctx->port_id;
pkthdr->egress_index =
dpdk_pctx->port_id % 2 ? (dpdk_pctx->port_id - 1):(dpdk_pctx->port_id + 1);
pkthdr->flags = 0;
dpdk_pctx->pool.freelist = desc->next;
desc->next = NULL;
dpdk_pctx->pool.info.available--;
msgs[idx] = &desc->msg;
idx++;
}
#if 0
if (!nb_rx && (dpdk_pctx->timeout != -1 ))
{
struct timeval now;
/* If time out, return control to the caller. */
gettimeofday(&now, NULL);
if (now.tv_sec > dpdk_pctx->ts.tv_sec ||
(now.tv_usec - dpdk_pctx->ts.tv_usec) > dpdk_pctx->timeout * 1000)
status = DAQ_RSTAT_TIMEOUT;
else
gettimeofday(&dpdk_pctx->ts, NULL);
}
#endif
err:
*rstat = status;
return idx;
}
static const DAQ_Verdict verdict_translation_table[MAX_DAQ_VERDICT] = {
DAQ_VERDICT_PASS, /* DAQ_VERDICT_PASS */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLOCK */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_REPLACE */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_WHITELIST */
DAQ_VERDICT_BLOCK, /* DAQ_VERDICT_BLACKLIST */
DAQ_VERDICT_PASS, /* DAQ_VERDICT_IGNORE */
DAQ_VERDICT_BLOCK /* DAQ_VERDICT_RETRY */
};
static int dpdk_daq_msg_finalize(void *handle, const DAQ_Msg_t *msg, DAQ_Verdict verdict)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
DPDKPacketPktDesc *desc = (DPDKPacketPktDesc *) msg->priv;
/* Sanitize and enact the verdict. */
if (verdict >= MAX_DAQ_VERDICT)
verdict = DAQ_VERDICT_PASS;
dpdk_pctx->stats.verdicts[verdict]++;
verdict = verdict_translation_table[verdict];
if (verdict == DAQ_VERDICT_PASS)
dpdk_inject_packet(dpdk_pctx,dpdk_pctx->port_id,dpdk_pctx->queue_id,msg->data,msg->data_len);
desc->next = dpdk_pctx->pool.freelist;
dpdk_pctx->pool.freelist = desc;
dpdk_pctx->pool.info.available++;
#ifdef HIGH_PERF_ENABLE
rte_pktmbuf_free(msg->priv);
#endif
return DAQ_SUCCESS;
}
static int dpdk_daq_get_msg_pool_info(void *handle, DAQ_MsgPoolInfo_t *info)
{
DPDK_Packet_Context_t *dpdk_pctx = (DPDK_Packet_Context_t *) handle;
*info = dpdk_pctx->pool.info;
return DAQ_SUCCESS;
}
#ifdef BUILDING_SO
DAQ_SO_PUBLIC const DAQ_ModuleAPI_t DAQ_MODULE_DATA =
#else
const DAQ_ModuleAPI_t dpdk_daq_module_data =
#endif
{
/* .api_version = */ DAQ_MODULE_API_VERSION,
/* .api_size = */ sizeof(DAQ_ModuleAPI_t),
/* .module_version = */ DAQ_DPDK_VERSION,
/* .name = */ "dpdk",
/* .type = */ DAQ_TYPE_INTF_CAPABLE | DAQ_TYPE_INLINE_CAPABLE | DAQ_TYPE_MULTI_INSTANCE,
/* .load = */ dpdk_daq_module_load,
/* .unload = */ dpdk_daq_module_unload,
/* .get_variable_descs = */ dpdk_daq_get_variable_descs,
/* .instantiate = */ dpdk_daq_instantiate,
/* .destroy = */ dpdk_daq_destroy,
/* .set_filter = */ dpdk_daq_set_filter,
/* .start = */ dpdk_daq_start,
/* .inject = */ dpdk_daq_inject,
/* .inject_relative = */ dpdk_daq_inject_relative,
/* .interrupt = */ dpdk_daq_interrupt,
/* .stop = */ dpdk_daq_stop,
/* .ioctl = */ dpdk_daq_ioctl,
/* .get_stats = */ dpdk_daq_get_stats,
/* .reset_stats = */ dpdk_daq_reset_stats,
/* .get_snaplen = */ dpdk_daq_get_snaplen,
/* .get_capabilities = */ dpdk_daq_get_capabilities,
/* .get_datalink_type = */ dpdk_daq_get_datalink_type,
/* .config_load = */ NULL,
/* .config_swap = */ NULL,
/* .config_free = */ NULL,
/* .msg_receive = */ dpdk_daq_msg_receive,
/* .msg_finalize = */ dpdk_daq_msg_finalize,
/* .get_msg_pool_info = */ dpdk_daq_get_msg_pool_info,
};
[EAL]
-l=10-15
-w=0000:06:00.0
[PORT-0]
queue-num=63
rss-tuple=3
jumbo=no
mtu=1500
[PORT-1]
queue-num=63
rss-tuple=3
jumbo=no
mtu=1500
/**
* \file
*
* \author kuangxiaohong <1002361031@qq.com>
*
* DPDK param moudle.
*/
#include "dpdk_param.h"
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <stdlib.h>
#include <rte_cfgfile.h>
//#define DEBUG_PARAM
#ifdef DEBUG_PARAM
#define DEBUG(fmt,args...) printf(fmt, ##args)
#else
#define DEBUG(fmt,args...) /* do nothing */
#endif
#define MAX_EAL_ENTRY (32)
static char argument[MAX_EAL_ENTRY][MAX_EAL_ENTRY * 2] = {{"./build/app/phytium_dpdk"}, {""}};
static uint16_t argument_count = 1;
static char *args[MAX_EAL_ENTRY];
static dpdk_port_conf_t dpdk_ports[RTE_MAX_ETHPORTS];
static int dpdk_port_num = 0;
/**
*dpdk配置文件解析
*
* @param
*
* @return 0成功,其它失败
*
*/
int dpdk_conf_parse(void)
{
int i,j;
struct rte_cfgfile *file = NULL;
struct rte_cfgfile_entry entries[MAX_EAL_ENTRY];
file = rte_cfgfile_load("dpdk.cfg", 0);
if (file == NULL){
printf("rte_cfgfile_load:%s\n","dpdk.cfg");
return -1;
}
/* get section name EAL */
if (rte_cfgfile_has_section(file, "EAL")) {
DEBUG(" section (EAL); count %d\n", rte_cfgfile_num_sections(file, "EAL", sizeof("EAL") - 1));
DEBUG(" section (EAL) has entries %d\n", rte_cfgfile_section_num_entries(file, "EAL"));
int n_entries = rte_cfgfile_section_num_entries(file, "EAL");
if (n_entries > MAX_EAL_ENTRY)
{
DEBUG("EAL entry (%d) overflow!\n",n_entries);
return -1;
}
if (rte_cfgfile_section_entries(file, "EAL", entries, n_entries) != -1) {
for (i = 0; i < n_entries; i++) {
DEBUG(" - name: (%s) value: (%s)\n", entries[i].name, entries[i].value);
snprintf(argument[i * 2 + 1], MAX_EAL_ENTRY * 2, "%s", entries[i].name);
snprintf(argument[i * 2 + 2], MAX_EAL_ENTRY * 2, "%s", entries[i].value);
DEBUG(" - argument: (%s) (%s)\n", argument[i * 2 + 1], argument[i * 2 + 2]);
argument_count += (((entries[i].name) ? 1 : 0) + ((entries[i].value) ? 1 : 0));
}
}
}
/* get section name PORT-X */
for (i = 0; i < RTE_MAX_ETHPORTS; i++) {
char port_section_name[15] = {""};
sprintf(port_section_name, "%s%d", "PORT-", i);
if (rte_cfgfile_has_section(file, port_section_name)) {
dpdk_port_num++;
int n_port_entries = rte_cfgfile_section_num_entries(file, port_section_name);
DEBUG(" %s\n", port_section_name);
DEBUG(" section (PORT) has %d entries\n", n_port_entries);
struct rte_cfgfile_entry entries[MAX_EAL_ENTRY];
if (rte_cfgfile_section_entries(file, port_section_name, entries, n_port_entries) != -1) {
for (j = 0; j < n_port_entries; j++) {
DEBUG(" section_name %s entry_name: (%s) entry_value: (%s)\n", port_section_name, entries[j].name, entries[j].value);
if (strcasecmp("mtu", entries[j].name) == 0)
dpdk_ports[i].mtu = atoi(entries[j].value);
else if (strcasecmp("rss-tuple", entries[j].name) == 0)
dpdk_ports[i].rss_tuple = atoi(entries[j].value);
else if (strcasecmp("jumbo", entries[j].name) == 0)
dpdk_ports[i].jumbo = (strcasecmp(entries[j].value, "yes") == 0) ? 1 : 0;
else if (strcasecmp("queue-num", entries[j].name) == 0)
dpdk_ports[i].queue_num= atoi(entries[j].value);
//todo:other port conf
}
}
}
}
rte_cfgfile_close(file);
return 0;
}
/**
*dpdk配置参数个数获取
*
* @param
*
* @return 返回配置参数个数
*
*/
int dpdk_get_param_cnt(void)
{
DEBUG("argument count (%d)\n",argument_count);
return argument_count;
}
/**
*dpdk配置参数获取
*
* @param
*
* @return 返回配置参数
*
*/
char **dpdk_get_param(void)
{
int i,j;
for (j = 0; j < argument_count; j++)
args[j] = argument[j];
for (i = 0; i < argument_count; i++)
DEBUG(" %s\n",argument[i]);
return (char **)args;
}
/**
*dpdk网口配置个数获取
*
* @param
*
* @return 返回网口配置个数
*
*/
int dpdk_get_port_cnt(void)
{
DEBUG("dpdk port count (%d)\n",dpdk_port_num);
return dpdk_port_num;
}
/**
*dpdk网口配置信息获取
*
* @param
*
* @return 返回某个网口配置信息
*
*/
dpdk_port_conf_t* dpdk_get_port_conf(int port_id)
{
if (port_id >= RTE_MAX_ETHPORTS)
{
printf("dpdk_get_port_conf invalid port_id\n");
return NULL;
}
DEBUG("dpdk port_id(%d) port_queue_num(%d)\n",port_id,dpdk_ports[port_id].queue_num);
DEBUG("dpdk port_id(%d) port_rss_tuple(%d)\n",port_id,dpdk_ports[port_id].rss_tuple);
DEBUG("dpdk port_id(%d) port_mtu(%d)\n",port_id,dpdk_ports[port_id].mtu);
DEBUG("dpdk port_id(%d) port_jumbo(%s)\n",port_id,(dpdk_ports[port_id].jumbo == 1) ? "yes" : "no" );
return &dpdk_ports[port_id];
}
#ifndef __DPDK_PARAM__
#define __DPDK_PARAM__
typedef struct dpdk_port_conf
{
int queue_num;
int mtu;
int rss_tuple;
int jumbo;
}dpdk_port_conf_t;
/**
*dpdk配置文件解析
*
* @param
*
* @return 0成功,其它失败
*
*/
int dpdk_conf_parse(void);
/**
*dpdk配置参数个数获取
*
* @param
*
* @return 返回配置参数个数
*
*/
int dpdk_get_param_cnt(void);
/**
*dpdk配置参数获取
*
* @param
*
* @return 返回配置参数
*
*/
char **dpdk_get_param(void);
/**
*dpdk网口配置个数获取
*
* @param
*
* @return 返回网口配置个数
*
*/
int dpdk_get_port_cnt(void);
/**
*dpdk网口配置信息获取
*
* @param
*
* @return 返回某个网口配置信息
*
*/
dpdk_port_conf_t* dpdk_get_port_conf(int port_id);
#endif
/**
* \file
*
* \author kuangxiaohong <1002361031@qq.com>
*
* DPDK support.
*/
#include "dpdk_port_conf.h"
#include "dpdk_param.h"
#include <stdio.h>
#include <stddef.h>
#include <sys/types.h>
#include <rte_cycles.h>
#include <rte_ethdev.h>
#ifndef MAX
#define MAX(v1, v2) ((v1) > (v2) ? (v1) : (v2))
#endif
#ifndef MIN
#define MIN(v1, v2) ((v1) < (v2) ? (v1) : (v2))
#endif
#define RX_RING_SIZE (4096)
#define TX_RING_SIZE (512)
#define RSS_HASH_KEY_LENGTH 40
#define PKT_PRIV_SIZE (256)
static uint32_t port_queue_num[RTE_MAX_ETHPORTS] = {0};
static uint32_t port_cnt = 0;
//对称RSS,dpdk默认RSS为非对称的
static uint8_t g_arrayHashKey[RSS_HASH_KEY_LENGTH] =
{
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,0x6D, 0x5A, 0x6D, 0x5A,
0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A
};
static struct rte_mempool *pktmbuf_pool[NB_SOCKETS] = {NULL};
/**
* numa架构下为每个核和端口到其所在内存节点上分配内存
*
* @param nb_mbuf
* 分配mbuf个数
* @return
* 0 表示成功,其它为失败
*/
static int init_mem(uint32_t nb_mbuf)
{
int socketid,i;
unsigned lcore_id;
char s[64];
for (lcore_id = 0; lcore_id < RTE_MAX_LCORE; lcore_id++) {
if (rte_lcore_is_enabled(lcore_id) == 0)
continue;
socketid = rte_lcore_to_socket_id(lcore_id);
if (socketid >= NB_SOCKETS)
rte_exit(EXIT_FAILURE,"Socket %d of lcore %u is out of range %d\n",socketid, lcore_id, NB_SOCKETS);
if (pktmbuf_pool[socketid] == NULL) {
snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
pktmbuf_pool[socketid] = rte_pktmbuf_pool_create(s, nb_mbuf, 64, PKT_PRIV_SIZE, RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
if (pktmbuf_pool[socketid] == NULL)
rte_exit(EXIT_FAILURE,"Cannot init mbuf pool on socket %d\n",socketid);
else
printf("Allocated mbuf pool on socket %d\n",socketid);
}
}
RTE_ETH_FOREACH_DEV(i){
socketid = rte_eth_dev_socket_id(i);
if (socketid >= NB_SOCKETS)
rte_exit(EXIT_FAILURE,"Socket %d of port_id %u is out of range %d\n",socketid, i, NB_SOCKETS);
if (pktmbuf_pool[socketid] == NULL) {
snprintf(s, sizeof(s), "mbuf_pool_%d", socketid);
pktmbuf_pool[socketid] = rte_pktmbuf_pool_create(s, nb_mbuf, 64, 0, RTE_MBUF_DEFAULT_BUF_SIZE, socketid);
if (pktmbuf_pool[socketid] == NULL)
rte_exit(EXIT_FAILURE,"Cannot init mbuf pool on socket %d\n",socketid);
else
printf("Allocated mbuf pool on socket %d\n",socketid);
}
}
return 0;
}
/**
* 根据网口配置进行dpdk网口初始化
*
* @param dpdk_port_conf
* 网口配置信息
* @param port
* 网口
* @param num_queues
* 网口配置队列数
* @return
*
*/
static inline void dpdk_port_init(dpdk_port_conf_t *dpdk_port_conf,uint16_t port,uint16_t num_queues)
{
struct rte_eth_conf port_conf =
{
.rxmode = {
.mq_mode = ETH_MQ_RX_RSS,
//.max_rx_pkt_len = 2000,
.split_hdr_size = 0,
.offloads = DEV_RX_OFFLOAD_KEEP_CRC,
//.offloads = DEV_RX_OFFLOAD_CRC_STRIP,//(DEV_RX_OFFLOAD_CHECKSUM |
// DEV_RX_OFFLOAD_CRC_STRIP),
},
.rx_adv_conf = {
.rss_conf = {
.rss_key = g_arrayHashKey,
.rss_key_len = RSS_HASH_KEY_LENGTH,
.rss_hf = ETH_RSS_UDP|ETH_RSS_TCP,
},
},
.txmode = {
.mq_mode = ETH_MQ_TX_NONE,
}
};
const uint16_t rx_rings = num_queues, tx_rings = num_queues;
struct rte_eth_dev_info info;
struct rte_eth_rxconf rxq_conf;
struct rte_eth_txconf txq_conf;
int retval;
uint16_t q,mtu;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
uint64_t rss_hf_tmp;
int socketid = rte_eth_dev_socket_id(port);
/* init port */
printf("Initializing port %u in socketid %d ... ", port,socketid);
fflush(stdout);
rte_eth_dev_info_get(port, &info);
info.default_rxconf.rx_drop_en = 0;
if (info.tx_offload_capa & DEV_TX_OFFLOAD_MBUF_FAST_FREE)
port_conf.txmode.offloads |= DEV_TX_OFFLOAD_MBUF_FAST_FREE;
rss_hf_tmp = port_conf.rx_adv_conf.rss_conf.rss_hf;
port_conf.rx_adv_conf.rss_conf.rss_hf &= info.flow_type_rss_offloads;
if (port_conf.rx_adv_conf.rss_conf.rss_hf != rss_hf_tmp)
{
printf("Port %u modified RSS hash function based on hardware support,"
"requested:%#"PRIx64" configured:%#"PRIx64"\n",
port,
rss_hf_tmp,
port_conf.rx_adv_conf.rss_conf.rss_hf);
}
if (dpdk_port_conf->jumbo)
port_conf.rxmode.offloads |= DEV_RX_OFFLOAD_JUMBO_FRAME;
retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
if (retval < 0)
rte_exit(EXIT_FAILURE,"rte_eth_dev_configure failed\n");
retval = rte_eth_dev_adjust_nb_rx_tx_desc(port, &nb_rxd, &nb_txd);
if (retval < 0)
rte_exit(EXIT_FAILURE,"rte_eth_dev_configure failed\n");
rxq_conf = info.default_rxconf;
rxq_conf.offloads = port_conf.rxmode.offloads;
for (q = 0; q < rx_rings; q ++)
{
retval = rte_eth_rx_queue_setup(port, q, nb_rxd,socketid,&rxq_conf, pktmbuf_pool[socketid]);
if (retval < 0)
rte_exit(EXIT_FAILURE,"rte_eth_rx_queue_setup q(%d) failed\n",q);
}
txq_conf = info.default_txconf;
txq_conf.offloads = port_conf.txmode.offloads;
for (q = 0; q < tx_rings; q ++)
{
retval = rte_eth_tx_queue_setup(port, q, nb_txd,socketid,&txq_conf);
if (retval < 0)
rte_exit(EXIT_FAILURE,"rte_eth_tx_queue_setup q(%d) failed\n",q);
}
if (rte_eth_dev_get_mtu (port, &mtu) != 0)
rte_exit(EXIT_FAILURE,"rte_eth_dev_get_mtu port(%d) failed\n",port);
if (mtu != dpdk_port_conf->mtu)
if (rte_eth_dev_set_mtu (port, mtu) != 0)
rte_exit(EXIT_FAILURE,"rte_eth_dev_set_mtu port(%d) failed\n",port);
rte_eth_promiscuous_enable(port);
return ;
}
/**
* 检测网口link状态
*
* @param port_num
* 网口个数
* @param port_mask
* 网口掩码
* @return
*
*/
void check_all_ports_link_status(uint8_t port_num, uint32_t port_mask)
{
#define CHECK_INTERVAL 100 /* 100ms */
#define MAX_CHECK_TIME 40 /* 9s (90 * 100ms) in total */
uint8_t portid, count, all_ports_up, print_flag = 0;
struct rte_eth_link link;
printf("\nChecking link status\n");
fflush(stdout);
for (count = 0; count <= MAX_CHECK_TIME; count++)
{
all_ports_up = 1;
for (portid = 0; portid < port_num; portid++)
{
if ((port_mask & (1 << portid)) == 0)
continue;
memset(&link, 0, sizeof(link));
rte_eth_link_get_nowait(portid, &link);
/* print link status if flag set */
if (print_flag == 1)
{
if (link.link_status)
printf("Port %d Link Up - speed %u Mbps - %s\n", (uint8_t)portid,(unsigned)link.link_speed,
(link.link_duplex == ETH_LINK_FULL_DUPLEX) ?("full-duplex") : ("half-duplex\n"));
else
printf("Port %d Link Down\n",(uint8_t)portid);
continue;
}
/* clear all_ports_up flag if any link down */
if (link.link_status == ETH_LINK_DOWN)
{
all_ports_up = 0;
break;
}
}
/* after finally printing all link status, get out */
if (print_flag == 1)
{
break;
}
if (all_ports_up == 0)
{
printf(".");
fflush(stdout);
rte_delay_ms(CHECK_INTERVAL);
}
/* set the print_flag if all ports up or timeout */
if (all_ports_up == 1 || count == (MAX_CHECK_TIME - 1))
{
print_flag = 1;
printf("done\n");
}
}
}
/**
* 根据网口配置信息来确定网口个数和队列数,并进行网口初始化
*
* @param
*
* @return
*
*/
void dpdk_port_setup_proc(void)
{
dpdk_port_conf_t *port_conf;
struct rte_eth_dev_info dev_info;
char portName[RTE_ETH_NAME_MAX_LEN] = {0};
int port_num,port_id,num_max_queue;
int port_conf_num = dpdk_get_port_cnt();
int port_total = rte_eth_dev_count_avail();
if (port_conf_num <= 0 || port_conf_num > port_total)
printf("WARING: port_conf_num %d larger port total %d\n",port_conf_num, port_total);
port_num = MIN(port_conf_num,port_total);
port_cnt = port_num;
printf("port_num:%d port_conf_num:%d port_avail:%d\n",port_num,port_conf_num,port_total);
int num_cores = rte_lcore_count();
if ( num_cores < 1 )
rte_exit(EXIT_FAILURE,"lcore num is %d, lower 0\n",num_cores);
//queue_num should compare with lcore_num,because one queue bind to one lcore
for (port_id = 0; port_id < port_num; port_id++)
{
if (!rte_eth_dev_is_valid_port(port_id))
rte_exit(EXIT_FAILURE,"invalid port id %d\n",port_id);
if (rte_eth_dev_get_name_by_port(port_id, portName) == 0)
printf(" - port (%u) Name (%s)\n", port_id, portName);
port_conf = dpdk_get_port_conf(port_id);
rte_eth_dev_info_get(port_id, &dev_info);
num_max_queue = port_conf->queue_num > dev_info.max_rx_queues ? dev_info.max_rx_queues : port_conf->queue_num;
num_max_queue = num_max_queue > dev_info.max_tx_queues ? dev_info.max_tx_queues : num_max_queue;
if (num_max_queue > num_cores)
num_max_queue = num_cores;
port_queue_num[port_id] = num_max_queue;
printf("port=%d max_rx_queue=%d max_tx_queue=%d queue=%d lcores=%d\n",port_id, dev_info.max_rx_queues, dev_info.max_tx_queues, num_max_queue,num_cores);
printf("dev_info.driver_name = %s,dev_info.if_index=%d\n", dev_info.driver_name, dev_info.if_index);
dpdk_port_init(port_conf,port_id,num_max_queue);
}
check_all_ports_link_status((uint8_t)port_num,(~0x0));
return;
}
/**
* 网口启动收包
*
* @param
*
* @return
*
*/
void dpdk_port_start(void)
{
int port_id,ret;
RTE_ETH_FOREACH_DEV(port_id) {
/* Start device */
//printf("dpdk_port_start: port_id:%d\n",port_id);
ret = rte_eth_dev_start(port_id);
if (ret < 0)
rte_exit(EXIT_FAILURE,"rte_eth_dev_start: err=%d, port=%d\n",ret, port_id);
}
}
/**
* 网口建立并初始化
*
* @param
*
* @return
*
*/
void dpdk_port_setup(void)
{
init_mem(NB_MBUF);
dpdk_port_setup_proc();
}
/**
* 获取某网口的队列数
*
* @param port_id
* 网口id
* @return 返回网口队列数
*
*/
int dpdk_get_port_queue_num(int port_id)
{
return port_queue_num[port_id];
}
/**
* 获取网口数
*
* @param
*
* @return 返回网口数
*
*/
int dpdk_get_port_num(void)
{
return port_cnt;
}
/**
*每调用一次返回一个网口和队列,用于多线程处理
*
* @param *out_port
* 网口返回值
* @param *out_queue
* 队列返回值
*
* @return
*
*/
int dpdk_get_port_and_queue(uint16_t *out_port,uint16_t *out_queue)
{
static uint8_t first_time = 1;
static uint16_t port = 0;
static uint16_t queue = 0;
int nr_ports = dpdk_get_port_num();
int nr_queues = dpdk_get_port_queue_num(port);
if (first_time == 1)
{
first_time = 0;
goto complete;
}
if ((queue + 1) < nr_queues)
queue += 1;
else {
port += 1;
queue = 0;
}
if(port >= nr_ports)
return -1;
complete:
*out_port = port;
*out_queue = queue;
return 0;
}
/**
*打印所有网口信息
*
* @param
*
* @return
*
*/
void dpdk_ports_print(void)
{
uint16_t nb_ports = 0, i = 0;
nb_ports = dpdk_get_port_num();
printf("--- DPDK Ports ---");
printf("Overall Ports: %d ", nb_ports);
for (; i < nb_ports; i++) {
struct rte_eth_dev_info info;
struct rte_eth_link link;
printf(" -- Port: %d", i);
rte_eth_dev_info_get(i, &info);
rte_eth_link_get(i, &link);
printf(" -- promiscuous: %s", rte_eth_promiscuous_get(i)?"yes":"no");
printf(" -- link info: speed %u, duplex %u, autoneg %u, status %u",
link.link_speed, link.link_duplex,
link.link_autoneg, link.link_status);
printf(" -- driver: %s", info.driver_name);
printf(" -- NUMA node: %d", rte_eth_dev_socket_id(i));
}
return;
}
/**
*获取所有队列个数
*
* @param
*
* @return 返回所有队列数
*
*/
int dpdk_get_port_queue_total(void)
{
int total_rx_queue = 0;
int port_num = dpdk_get_port_num();
int port_id;
for (port_id = 0; port_id < port_num; port_id++)
total_rx_queue += dpdk_get_port_queue_num(port_id);
return total_rx_queue;
}
#ifndef __DPDK_PORT_CONF__
#define __DPDK_PORT_CONF__
#include "stdint.h"
//can get from dpdk param
#define NB_MBUF (1024*1024)
#define NB_SOCKETS (8)
/**
* 网口建立并初始化
*
* @param
*
* @return
*
*/
void dpdk_port_setup(void);
/**
* 网口启动收包
*
* @param
*
* @return
*
*/
void dpdk_port_start(void);
/**
* 获取某网口的队列数
*
* @param port_id
* 网口id
* @return 返回网口队列数
*
*/
int dpdk_get_port_queue_num(int port_id);
/**
* 获取网口数
*
* @param
*
* @return 返回网口数
*
*/
int dpdk_get_port_num(void);
/**
*每调用一次返回一个网口和队列,用于多线程处理
*
* @param *out_port
* 网口返回值
* @param *out_queue
* 队列返回值
*
* @return
*
*/
int dpdk_get_port_and_queue(uint16_t *out_port,uint16_t *out_queue);
/**
*打印所有网口信息
*
* @param
*
* @return
*
*/
void dpdk_ports_print(void);
/**
* 检测网口link状态
*
* @param port_num
* 网口个数
* @param port_mask
* 网口掩码
* @return
*
*/
void check_all_ports_link_status(uint8_t port_num, uint32_t port_mask);
/**
*获取所有队列个数
*
* @param
*
* @return 返回所有队列数
*
*/
int dpdk_get_port_queue_total(void);
/**
* 根据网口配置信息来确定网口个数和队列数,并进行网口初始化
*
* @param
*
* @return
*
*/
void dpdk_port_setup_proc(void);
#endif
# libdaq_static_dpdk pkg-config file
prefix=@prefix@
exec_prefix=@exec_prefix@
libdir=@libdir@
includedir=@includedir@
Name: libdaq_static_dpdk
Description: DPDK static DAQ module
URL: https://snort.org/downloads
Version: @VERSION@
Requires:
Conflicts:
Libs: -L${libdir} -ldaq_static_dpdk @DAQ_DPDK_LIBS@
#Cflags:-I/home/tools/dpdk-stable-19.11.5/arm64-armv8a-linuxapp-gcc/include/
Cflags:
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册