Skip to content
Snippets Groups Projects

host_simplified.c

  • Clone with SSH
  • Clone with HTTPS
  • Embed
  • Share
    The snippet can be accessed without any authentication.
    Authored by Henrik Schuh
    host_simplified.c 6.29 KiB
    #include <rte_cycles.h>
    #include <rte_spinlock.h>
    #include <rte_io.h>
    #include <rte_mbuf.h>
    #include <rte_hexdump.h>
    
    #include "host.h"
    #include "host_mem.h"
    #include "lat.h"
    #include "config.h"
    
    /* Mbuf pool defined in another translation unit; host_thread() allocates its TX bufs from it. */
    extern struct rte_mempool *pool;
    
    /*
     * Start signal for the host threads: cleared by host_init(), set to 1 by
     * host 0 in host_thread() while every host thread spins until it is non-zero.
     */
    volatile uint64_t host_start;
    /* Taken by host_thread() while it adds its per-thread totals to the stats_* variables below. */
    rte_spinlock_t stats_mutex;
    /* Sum of the rx_total values contributed by the host threads. */
    uint64_t stats_rx_total;
    /* Sum of the tx_total values contributed by the host threads. */
    uint64_t stats_tx_total;
    /*
     * Bumped by each host_thread() after it has added its totals; host 0
     * compares it against HOST_CORES before printing the results.
     */
    uint64_t stats_host_count;
    
    /*
     * Per-thread, cache-line aligned 64-byte header template: written by
     * host_pkt_init() and copied to the start of every TX packet by
     * host_process_tx().
     */
    __rte_cache_aligned __thread uint8_t pkt_template[64];
    
    /*
     * Reset the state shared by all host threads: the start signal, the stats
     * lock and the aggregated counters.
     *
     * Returns 0 unconditionally.
     */
    int host_init(void) {
        host_start = 0;
    
        rte_spinlock_init(&stats_mutex);
        stats_rx_total = 0;
        stats_tx_total = 0;
        // Must start at zero: host 0 waits until this counter reaches HOST_CORES,
        // and every host thread bumps it exactly once. Incrementing here would
        // let that wait complete before the last thread has added its totals.
        stats_host_count = 0;
        return 0;
    }
    
    /*
     * Fill the calling thread's pkt_template with Ethernet, IPv4 and UDP headers.
     *
     * p: ethdev port id; its MAC address is written as both source and
     *    destination address.
     * q: queue index; UDP source and destination port are both 6000 + q.
     *
     * The IPv4 source and destination address are both 192.168.200.1 and the
     * IPv4/UDP length fields are derived from PKT_TOTAL_SIZE.
     */
    void host_pkt_init(int p, int q) {
        // the three headers sit back to back at the start of the template
        struct rte_ether_hdr *const eth = (struct rte_ether_hdr *) pkt_template;
        struct rte_ipv4_hdr *const ip4 = (struct rte_ipv4_hdr *) (eth + 1);
        struct rte_udp_hdr *const udp = (struct rte_udp_hdr *) (ip4 + 1);
    
        // bytes following the Ethernet header, and following the IPv4 header
        const uint16_t ip_len = (uint16_t) (PKT_TOTAL_SIZE - sizeof(struct rte_ether_hdr));
        const uint16_t udp_len = (uint16_t) (ip_len - sizeof(struct rte_ipv4_hdr));
        const uint16_t port_be = rte_cpu_to_be_16(6000 + q);
    
        // Ethernet: source and destination are both the port's own MAC
        rte_eth_macaddr_get(p, &eth->src_addr);
        rte_eth_macaddr_get(p, &eth->dst_addr);
        eth->ether_type = rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4);
    
        // IPv4: start from an all-zero header, checksum goes in last
        memset(ip4, 0, sizeof(struct rte_ipv4_hdr));
        ip4->version_ihl = RTE_IPV4_VHL_DEF;
        ip4->total_length = rte_cpu_to_be_16(ip_len);
        ip4->time_to_live = IPDEFTTL;
        ip4->next_proto_id = IPPROTO_UDP;
        ip4->src_addr = rte_cpu_to_be_32(RTE_IPV4(192,168,200,1));
        ip4->dst_addr = ip4->src_addr;
        ip4->hdr_checksum = rte_ipv4_cksum(ip4);
    
        // UDP: same port on both ends, checksum field left at zero
        udp->src_port = port_be;
        udp->dst_port = port_be;
        udp->dgram_len = rte_cpu_to_be_16(udp_len);
        udp->dgram_cksum = 0;
    }
    
    /*
     * Handle one received packet: read the 64-bit value stored PKT_HEADER_SIZE
     * bytes into the packet data and hand (rx_tsc - that value) to
     * lat_store_sample(). When PKT_TOTAL_SIZE exceeds 64, the rest of the packet
     * is additionally loaded in 64-byte steps into a scratch buffer.
     *
     * rx_buf: received mbuf, may be NULL.
     * rx_tsc: timestamp taken for the burst this packet belongs to.
     *
     * Returns 0 on success, -1 if rx_buf is NULL.
     */
    static inline int host_process_rx(struct rte_mbuf *rx_buf, uint64_t rx_tsc) {
        if (unlikely(rx_buf == NULL)) {
            return -1;
        }
    
        uint8_t *const data = rte_pktmbuf_mtod(rx_buf, uint8_t *);
        const uint64_t *const sent_tsc = (const uint64_t *) (data + PKT_HEADER_SIZE);
    
        lat_store_sample(rx_tsc - *sent_tsc);
    
    #if PKT_TOTAL_SIZE > 64
        uint8_t scratch[64] __rte_cache_aligned;
        uint64_t pos = 64;
    
        // walk the remainder of the packet one 64-byte chunk at a time
        while (pos < PKT_TOTAL_SIZE) {
            _mm512_store_si512(scratch, _mm512_load_si512(data + pos));
            pos += 64;
        }
    #endif
    
        return 0;
    }
    
    /*
     * Prepare one mbuf for transmission: set its length metadata, copy the
     * 64-byte pkt_template to the start of the packet data and store tx_tsc
     * PKT_HEADER_SIZE bytes into it. When PKT_TOTAL_SIZE exceeds 64, every
     * further 64-byte chunk is written with a streaming store whose first
     * 64-bit lane is tx_tsc and whose other lanes are zero.
     *
     * tx_buf: mbuf to fill, may be NULL.
     * tx_tsc: timestamp to embed in the packet.
     *
     * Returns 0 on success, -1 if tx_buf is NULL.
     */
    static inline int host_process_tx(struct rte_mbuf *tx_buf, uint64_t tx_tsc) {
        if (unlikely(tx_buf == NULL)) {
            return -1;
        }
    
        // single-segment packet of PKT_TOTAL_SIZE bytes
        tx_buf->data_len = PKT_TOTAL_SIZE;
        tx_buf->pkt_len = PKT_TOTAL_SIZE;
        tx_buf->l3_len = sizeof(struct rte_ipv4_hdr);
        tx_buf->l2_len = sizeof(struct rte_ether_hdr);
    
        uint8_t *const frame = rte_pktmbuf_mtod(tx_buf, uint8_t *);
    
        // headers first, then the timestamp on top of the copied template
        _mm512_store_si512(frame, _mm512_load_si512(pkt_template));
        *(uint64_t *) (frame + PKT_HEADER_SIZE) = tx_tsc;
    
    #if PKT_TOTAL_SIZE > 64
        uint64_t pos = 64;
    
        while (pos < PKT_TOTAL_SIZE) {
            _mm512_stream_si512(frame + pos, _mm512_setr_epi64(tx_tsc, 0,0,0,0,0,0,0));
            pos += 64;
        }
    #endif
    
        return 0;
    }
    
    /*
     * Host worker entry point.
     *
     * arg carries the host index (cast to uintptr_t). The thread uses ethdev
     * port 0 and the queue with the same index as the host. After all threads
     * have been released by host 0, it loops until force_quit is set:
     *   1. receive a burst, record a latency sample per packet, free the bufs;
     *   2. skip TX while HOST_MAX_INFLIGHT or more packets are outstanding;
     *   3. replace the bufs accepted by the previous TX burst with fresh ones,
     *      stamp them with the current TSC and transmit.
     * On exit the per-thread totals are added to the shared statistics; host 0
     * then waits for all HOST_CORES threads and prints throughput and latency.
     *
     * Returns 0 on a normal stop, -1 if host_process_rx()/host_process_tx()
     * reported an error. Also in the error case the thread releases the mbufs it
     * still owns and registers in stats_host_count, so host 0 is never left
     * waiting for a thread that has already gone.
     */
    int host_thread(void *arg) {
        uintptr_t host_i;
        uint16_t host_p;
        uint16_t host_q;
        uint64_t start_tsc;
        uint64_t end_tsc;
        uint64_t rx_total;
        uint64_t tx_total;
        uint16_t rx_count;
        uint64_t tx_count;
        uint64_t burst_tsc;
        uint64_t hosts_done;
        uint64_t all_tx;
        uint64_t all_rx;
        double elapsed_s;
        struct rte_mbuf *rx_burst[HOST_MAX_RX_BURST];
        struct rte_mbuf *tx_burst[HOST_MAX_TX_BURST];
        uint64_t i;
        int ret;
        int status;
    
        host_i = (uintptr_t) arg;
    
        // assign host thread to dedicated queue index, ethdev port 0
        host_p = 0;
        host_q = host_i;
    
        // casts make every argument match its conversion specifier
        printf("hello from host %zu on socket %d lcore %d: ethdev p %u q %u\n",
            (size_t) host_i, (int) rte_socket_id(), (int) rte_lcore_id(),
            (unsigned int) host_p, (unsigned int) host_q);
    
        // build packet header template
        host_pkt_init(host_p, host_q);
    
        // wait for all threads to start then set signal
        if (host_i == 0) {
            rte_delay_us_block(1E6);
            host_start = 1;
        }
        while (host_start == 0) {
            rte_pause();
        }
        start_tsc = rte_rdtsc_precise();
    
        status = 0;
        rx_total = 0;
        tx_total = 0;
        rx_count = 0;
        // Invariant at the top of every iteration: tx_burst[tx_count ..
        // HOST_MAX_TX_BURST) holds bufs this thread still owns (prepared but not
        // yet accepted by the NIC). Starting at HOST_MAX_TX_BURST makes that
        // range empty and causes the first refill to populate the whole array.
        tx_count = HOST_MAX_TX_BURST;
    
        while (!force_quit) {
            // 1. handle up to HOST_MAX_RX_BURST packets
            rx_count = rte_eth_rx_burst(host_p, host_q, rx_burst, HOST_MAX_RX_BURST);
            if (rx_count) {
                burst_tsc = rte_rdtsc_precise();
                for (i = 0; i < rx_count; i++) {
                    // read packet payload and store timestamp
                    ret = host_process_rx(rx_burst[i], burst_tsc);
                    if (unlikely(ret != 0)) {
                        printf("stopping due to host_process_rx return %d\n", ret);
                        force_quit = 1;
                        status = -1;
                        break;
                    }
                }
    
                // give the whole burst back, also when processing failed
                // (rte_pktmbuf_free_bulk skips NULL entries)
                rte_pktmbuf_free_bulk(rx_burst, rx_count);
                if (unlikely(status != 0)) {
                    break;
                }
                rx_total += rx_count;
            }
    
            // bypass TX if limit of inflight packets (HOST_MAX_INFLIGHT) is reached
            if (rx_total < tx_total && tx_total - rx_total >= HOST_MAX_INFLIGHT) {
                rte_pause();
                continue;
            }
    
            burst_tsc = rte_rdtsc_precise();
    
            // 2. refill tx_burst list with new bufs from mempool: the first tx_count
            //    slots were consumed by the previous TX burst
            for (i = 0; i < tx_count; i++) {
                tx_burst[i] = rte_mbuf_raw_alloc(pool);
                rte_prefetch1_write(tx_burst[i]);
            }
    
            // 3. write tx bufs using pkt_template headers and burst timestamp burst_tsc
            for (i = 0; i < tx_count; i++) {
                ret = host_process_tx(tx_burst[i], burst_tsc);
                if (unlikely(ret != 0)) {
                    printf("stopping due to host_process_tx return %d\n", ret);
                    force_quit = 1;
                    status = -1;
                    break;
                }
            }
            if (unlikely(status != 0)) {
                // nothing of this refill reached the NIC: the whole array is ours
                tx_count = 0;
                break;
            }
    
            // 4. send up to HOST_MAX_TX_BURST packets
            tx_count = rte_eth_tx_burst(host_p, host_q, tx_burst, HOST_MAX_TX_BURST);
            tx_total += tx_count;
        }
    
        end_tsc = rte_rdtsc_precise();
    
        // release the bufs that were never accepted by the NIC
        if (tx_count < HOST_MAX_TX_BURST) {
            rte_pktmbuf_free_bulk(&tx_burst[tx_count], HOST_MAX_TX_BURST - tx_count);
        }
    
        rte_spinlock_lock(&stats_mutex);
        stats_tx_total += tx_total;
        stats_rx_total += rx_total;
        stats_host_count++;
        rte_spinlock_unlock(&stats_mutex);
    
        if (host_i == 0) {
            rte_delay_us_block(1E6);
    
            // Poll the shared counters under the lock: a plain read in an empty
            // loop may be hoisted by the compiler and races with the writers.
            for (;;) {
                rte_spinlock_lock(&stats_mutex);
                hosts_done = stats_host_count;
                all_tx = stats_tx_total;
                all_rx = stats_rx_total;
                rte_spinlock_unlock(&stats_mutex);
    
                if (hosts_done >= HOST_CORES) {
                    break;
                }
                rte_pause();
            }
    
            // measurement interval of host 0 in seconds
            elapsed_s = (end_tsc - start_tsc) * 1.0 / rte_get_tsc_hz();
            printf("total tx tput %lf Mpps\n", all_tx / 1E6 / elapsed_s);
            printf("total rx tput %lf Mpps\n", all_rx / 1E6 / elapsed_s);
            lat_print_pct();
            lat_print_dist();
        }
    
        return status;
    }
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or sign in to comment