Merge tag 'for-linus-5.17-ofs-1' of git://git.kernel.org/pub/scm/linux/kernel/git...
[platform/kernel/linux-rpi.git] / samples / bpf / xsk_fwd.c
1 // SPDX-License-Identifier: GPL-2.0
2 /* Copyright(c) 2020 Intel Corporation. */
3
4 #define _GNU_SOURCE
5 #include <poll.h>
6 #include <pthread.h>
7 #include <signal.h>
8 #include <sched.h>
9 #include <stdio.h>
10 #include <stdlib.h>
11 #include <string.h>
12 #include <sys/mman.h>
13 #include <sys/resource.h>
14 #include <sys/socket.h>
15 #include <sys/types.h>
16 #include <time.h>
17 #include <unistd.h>
18 #include <getopt.h>
19 #include <netinet/ether.h>
20 #include <net/if.h>
21
22 #include <linux/bpf.h>
23 #include <linux/if_link.h>
24 #include <linux/if_xdp.h>
25
26 #include <bpf/libbpf.h>
27 #include <bpf/xsk.h>
28 #include <bpf/bpf.h>
29
30 /* libbpf APIs for AF_XDP are deprecated starting from v0.7 */
31 #pragma GCC diagnostic ignored "-Wdeprecated-declarations"
32
33 #define ARRAY_SIZE(x) (sizeof(x) / sizeof((x)[0]))
34
35 typedef __u64 u64;
36 typedef __u32 u32;
37 typedef __u16 u16;
38 typedef __u8  u8;
39
40 /* This program illustrates the packet forwarding between multiple AF_XDP
41  * sockets in multi-threaded environment. All threads are sharing a common
42  * buffer pool, with each socket having its own private buffer cache.
43  *
44  * Example 1: Single thread handling two sockets. The packets received by socket
45  * A (interface IFA, queue QA) are forwarded to socket B (interface IFB, queue
46  * QB), while the packets received by socket B are forwarded to socket A. The
47  * thread is running on CPU core X:
48  *
49  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -c X
50  *
51  * Example 2: Two threads, each handling two sockets. The thread running on CPU
52  * core X forwards all the packets received by socket A to socket B, and all the
53  * packets received by socket B to socket A. The thread running on CPU core Y is
54  * performing the same packet forwarding between sockets C and D:
55  *
56  *         ./xsk_fwd -i IFA -q QA -i IFB -q QB -i IFC -q QC -i IFD -q QD
57  *         -c CX -c CY
58  */
59
60 /*
61  * Buffer pool and buffer cache
62  *
63  * For packet forwarding, the packet buffers are typically allocated from the
64  * pool for packet reception and freed back to the pool for further reuse once
65  * the packet transmission is completed.
66  *
67  * The buffer pool is shared between multiple threads. In order to minimize the
68  * access latency to the shared buffer pool, each thread creates one (or
69  * several) buffer caches, which, unlike the buffer pool, are private to the
70  * thread that creates them and therefore cannot be shared with other threads.
71  * The access to the shared pool is only needed either (A) when the cache gets
72  * empty due to repeated buffer allocations and it needs to be replenished from
73  * the pool, or (B) when the cache gets full due to repeated buffer free and it
74  * needs to be flushed back to the pull.
75  *
76  * In a packet forwarding system, a packet received on any input port can
77  * potentially be transmitted on any output port, depending on the forwarding
78  * configuration. For AF_XDP sockets, for this to work with zero-copy of the
79  * packet buffers when, it is required that the buffer pool memory fits into the
80  * UMEM area shared by all the sockets.
81  */
82
83 struct bpool_params {
84         u32 n_buffers;
85         u32 buffer_size;
86         int mmap_flags;
87
88         u32 n_users_max;
89         u32 n_buffers_per_slab;
90 };
91
92 /* This buffer pool implementation organizes the buffers into equally sized
93  * slabs of *n_buffers_per_slab*. Initially, there are *n_slabs* slabs in the
94  * pool that are completely filled with buffer pointers (full slabs).
95  *
96  * Each buffer cache has a slab for buffer allocation and a slab for buffer
97  * free, with both of these slabs initially empty. When the cache's allocation
98  * slab goes empty, it is swapped with one of the available full slabs from the
99  * pool, if any is available. When the cache's free slab goes full, it is
100  * swapped for one of the empty slabs from the pool, which is guaranteed to
101  * succeed.
102  *
103  * Partially filled slabs never get traded between the cache and the pool
104  * (except when the cache itself is destroyed), which enables fast operation
105  * through pointer swapping.
106  */
107 struct bpool {
108         struct bpool_params params;
109         pthread_mutex_t lock;
110         void *addr;
111
112         u64 **slabs;
113         u64 **slabs_reserved;
114         u64 *buffers;
115         u64 *buffers_reserved;
116
117         u64 n_slabs;
118         u64 n_slabs_reserved;
119         u64 n_buffers;
120
121         u64 n_slabs_available;
122         u64 n_slabs_reserved_available;
123
124         struct xsk_umem_config umem_cfg;
125         struct xsk_ring_prod umem_fq;
126         struct xsk_ring_cons umem_cq;
127         struct xsk_umem *umem;
128 };
129
130 static struct bpool *
131 bpool_init(struct bpool_params *params,
132            struct xsk_umem_config *umem_cfg)
133 {
134         struct rlimit r = {RLIM_INFINITY, RLIM_INFINITY};
135         u64 n_slabs, n_slabs_reserved, n_buffers, n_buffers_reserved;
136         u64 slabs_size, slabs_reserved_size;
137         u64 buffers_size, buffers_reserved_size;
138         u64 total_size, i;
139         struct bpool *bp;
140         u8 *p;
141         int status;
142
143         /* mmap prep. */
144         if (setrlimit(RLIMIT_MEMLOCK, &r))
145                 return NULL;
146
147         /* bpool internals dimensioning. */
148         n_slabs = (params->n_buffers + params->n_buffers_per_slab - 1) /
149                 params->n_buffers_per_slab;
150         n_slabs_reserved = params->n_users_max * 2;
151         n_buffers = n_slabs * params->n_buffers_per_slab;
152         n_buffers_reserved = n_slabs_reserved * params->n_buffers_per_slab;
153
154         slabs_size = n_slabs * sizeof(u64 *);
155         slabs_reserved_size = n_slabs_reserved * sizeof(u64 *);
156         buffers_size = n_buffers * sizeof(u64);
157         buffers_reserved_size = n_buffers_reserved * sizeof(u64);
158
159         total_size = sizeof(struct bpool) +
160                 slabs_size + slabs_reserved_size +
161                 buffers_size + buffers_reserved_size;
162
163         /* bpool memory allocation. */
164         p = calloc(total_size, sizeof(u8));
165         if (!p)
166                 return NULL;
167
168         /* bpool memory initialization. */
169         bp = (struct bpool *)p;
170         memcpy(&bp->params, params, sizeof(*params));
171         bp->params.n_buffers = n_buffers;
172
173         bp->slabs = (u64 **)&p[sizeof(struct bpool)];
174         bp->slabs_reserved = (u64 **)&p[sizeof(struct bpool) +
175                 slabs_size];
176         bp->buffers = (u64 *)&p[sizeof(struct bpool) +
177                 slabs_size + slabs_reserved_size];
178         bp->buffers_reserved = (u64 *)&p[sizeof(struct bpool) +
179                 slabs_size + slabs_reserved_size + buffers_size];
180
181         bp->n_slabs = n_slabs;
182         bp->n_slabs_reserved = n_slabs_reserved;
183         bp->n_buffers = n_buffers;
184
185         for (i = 0; i < n_slabs; i++)
186                 bp->slabs[i] = &bp->buffers[i * params->n_buffers_per_slab];
187         bp->n_slabs_available = n_slabs;
188
189         for (i = 0; i < n_slabs_reserved; i++)
190                 bp->slabs_reserved[i] = &bp->buffers_reserved[i *
191                         params->n_buffers_per_slab];
192         bp->n_slabs_reserved_available = n_slabs_reserved;
193
194         for (i = 0; i < n_buffers; i++)
195                 bp->buffers[i] = i * params->buffer_size;
196
197         /* lock. */
198         status = pthread_mutex_init(&bp->lock, NULL);
199         if (status) {
200                 free(p);
201                 return NULL;
202         }
203
204         /* mmap. */
205         bp->addr = mmap(NULL,
206                         n_buffers * params->buffer_size,
207                         PROT_READ | PROT_WRITE,
208                         MAP_PRIVATE | MAP_ANONYMOUS | params->mmap_flags,
209                         -1,
210                         0);
211         if (bp->addr == MAP_FAILED) {
212                 pthread_mutex_destroy(&bp->lock);
213                 free(p);
214                 return NULL;
215         }
216
217         /* umem. */
218         status = xsk_umem__create(&bp->umem,
219                                   bp->addr,
220                                   bp->params.n_buffers * bp->params.buffer_size,
221                                   &bp->umem_fq,
222                                   &bp->umem_cq,
223                                   umem_cfg);
224         if (status) {
225                 munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
226                 pthread_mutex_destroy(&bp->lock);
227                 free(p);
228                 return NULL;
229         }
230         memcpy(&bp->umem_cfg, umem_cfg, sizeof(*umem_cfg));
231
232         return bp;
233 }
234
235 static void
236 bpool_free(struct bpool *bp)
237 {
238         if (!bp)
239                 return;
240
241         xsk_umem__delete(bp->umem);
242         munmap(bp->addr, bp->params.n_buffers * bp->params.buffer_size);
243         pthread_mutex_destroy(&bp->lock);
244         free(bp);
245 }
246
247 struct bcache {
248         struct bpool *bp;
249
250         u64 *slab_cons;
251         u64 *slab_prod;
252
253         u64 n_buffers_cons;
254         u64 n_buffers_prod;
255 };
256
257 static u32
258 bcache_slab_size(struct bcache *bc)
259 {
260         struct bpool *bp = bc->bp;
261
262         return bp->params.n_buffers_per_slab;
263 }
264
265 static struct bcache *
266 bcache_init(struct bpool *bp)
267 {
268         struct bcache *bc;
269
270         bc = calloc(1, sizeof(struct bcache));
271         if (!bc)
272                 return NULL;
273
274         bc->bp = bp;
275         bc->n_buffers_cons = 0;
276         bc->n_buffers_prod = 0;
277
278         pthread_mutex_lock(&bp->lock);
279         if (bp->n_slabs_reserved_available == 0) {
280                 pthread_mutex_unlock(&bp->lock);
281                 free(bc);
282                 return NULL;
283         }
284
285         bc->slab_cons = bp->slabs_reserved[bp->n_slabs_reserved_available - 1];
286         bc->slab_prod = bp->slabs_reserved[bp->n_slabs_reserved_available - 2];
287         bp->n_slabs_reserved_available -= 2;
288         pthread_mutex_unlock(&bp->lock);
289
290         return bc;
291 }
292
293 static void
294 bcache_free(struct bcache *bc)
295 {
296         struct bpool *bp;
297
298         if (!bc)
299                 return;
300
301         /* In order to keep this example simple, the case of freeing any
302          * existing buffers from the cache back to the pool is ignored.
303          */
304
305         bp = bc->bp;
306         pthread_mutex_lock(&bp->lock);
307         bp->slabs_reserved[bp->n_slabs_reserved_available] = bc->slab_prod;
308         bp->slabs_reserved[bp->n_slabs_reserved_available + 1] = bc->slab_cons;
309         bp->n_slabs_reserved_available += 2;
310         pthread_mutex_unlock(&bp->lock);
311
312         free(bc);
313 }
314
315 /* To work correctly, the implementation requires that the *n_buffers* input
316  * argument is never greater than the buffer pool's *n_buffers_per_slab*. This
317  * is typically the case, with one exception taking place when large number of
318  * buffers are allocated at init time (e.g. for the UMEM fill queue setup).
319  */
320 static inline u32
321 bcache_cons_check(struct bcache *bc, u32 n_buffers)
322 {
323         struct bpool *bp = bc->bp;
324         u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
325         u64 n_buffers_cons = bc->n_buffers_cons;
326         u64 n_slabs_available;
327         u64 *slab_full;
328
329         /*
330          * Consumer slab is not empty: Use what's available locally. Do not
331          * look for more buffers from the pool when the ask can only be
332          * partially satisfied.
333          */
334         if (n_buffers_cons)
335                 return (n_buffers_cons < n_buffers) ?
336                         n_buffers_cons :
337                         n_buffers;
338
339         /*
340          * Consumer slab is empty: look to trade the current consumer slab
341          * (full) for a full slab from the pool, if any is available.
342          */
343         pthread_mutex_lock(&bp->lock);
344         n_slabs_available = bp->n_slabs_available;
345         if (!n_slabs_available) {
346                 pthread_mutex_unlock(&bp->lock);
347                 return 0;
348         }
349
350         n_slabs_available--;
351         slab_full = bp->slabs[n_slabs_available];
352         bp->slabs[n_slabs_available] = bc->slab_cons;
353         bp->n_slabs_available = n_slabs_available;
354         pthread_mutex_unlock(&bp->lock);
355
356         bc->slab_cons = slab_full;
357         bc->n_buffers_cons = n_buffers_per_slab;
358         return n_buffers;
359 }
360
361 static inline u64
362 bcache_cons(struct bcache *bc)
363 {
364         u64 n_buffers_cons = bc->n_buffers_cons - 1;
365         u64 buffer;
366
367         buffer = bc->slab_cons[n_buffers_cons];
368         bc->n_buffers_cons = n_buffers_cons;
369         return buffer;
370 }
371
372 static inline void
373 bcache_prod(struct bcache *bc, u64 buffer)
374 {
375         struct bpool *bp = bc->bp;
376         u64 n_buffers_per_slab = bp->params.n_buffers_per_slab;
377         u64 n_buffers_prod = bc->n_buffers_prod;
378         u64 n_slabs_available;
379         u64 *slab_empty;
380
381         /*
382          * Producer slab is not yet full: store the current buffer to it.
383          */
384         if (n_buffers_prod < n_buffers_per_slab) {
385                 bc->slab_prod[n_buffers_prod] = buffer;
386                 bc->n_buffers_prod = n_buffers_prod + 1;
387                 return;
388         }
389
390         /*
391          * Producer slab is full: trade the cache's current producer slab
392          * (full) for an empty slab from the pool, then store the current
393          * buffer to the new producer slab. As one full slab exists in the
394          * cache, it is guaranteed that there is at least one empty slab
395          * available in the pool.
396          */
397         pthread_mutex_lock(&bp->lock);
398         n_slabs_available = bp->n_slabs_available;
399         slab_empty = bp->slabs[n_slabs_available];
400         bp->slabs[n_slabs_available] = bc->slab_prod;
401         bp->n_slabs_available = n_slabs_available + 1;
402         pthread_mutex_unlock(&bp->lock);
403
404         slab_empty[0] = buffer;
405         bc->slab_prod = slab_empty;
406         bc->n_buffers_prod = 1;
407 }
408
409 /*
410  * Port
411  *
412  * Each of the forwarding ports sits on top of an AF_XDP socket. In order for
413  * packet forwarding to happen with no packet buffer copy, all the sockets need
414  * to share the same UMEM area, which is used as the buffer pool memory.
415  */
416 #ifndef MAX_BURST_RX
417 #define MAX_BURST_RX 64
418 #endif
419
420 #ifndef MAX_BURST_TX
421 #define MAX_BURST_TX 64
422 #endif
423
424 struct burst_rx {
425         u64 addr[MAX_BURST_RX];
426         u32 len[MAX_BURST_RX];
427 };
428
429 struct burst_tx {
430         u64 addr[MAX_BURST_TX];
431         u32 len[MAX_BURST_TX];
432         u32 n_pkts;
433 };
434
435 struct port_params {
436         struct xsk_socket_config xsk_cfg;
437         struct bpool *bp;
438         const char *iface;
439         u32 iface_queue;
440 };
441
442 struct port {
443         struct port_params params;
444
445         struct bcache *bc;
446
447         struct xsk_ring_cons rxq;
448         struct xsk_ring_prod txq;
449         struct xsk_ring_prod umem_fq;
450         struct xsk_ring_cons umem_cq;
451         struct xsk_socket *xsk;
452         int umem_fq_initialized;
453
454         u64 n_pkts_rx;
455         u64 n_pkts_tx;
456 };
457
458 static void
459 port_free(struct port *p)
460 {
461         if (!p)
462                 return;
463
464         /* To keep this example simple, the code to free the buffers from the
465          * socket's receive and transmit queues, as well as from the UMEM fill
466          * and completion queues, is not included.
467          */
468
469         if (p->xsk)
470                 xsk_socket__delete(p->xsk);
471
472         bcache_free(p->bc);
473
474         free(p);
475 }
476
477 static struct port *
478 port_init(struct port_params *params)
479 {
480         struct port *p;
481         u32 umem_fq_size, pos = 0;
482         int status, i;
483
484         /* Memory allocation and initialization. */
485         p = calloc(sizeof(struct port), 1);
486         if (!p)
487                 return NULL;
488
489         memcpy(&p->params, params, sizeof(p->params));
490         umem_fq_size = params->bp->umem_cfg.fill_size;
491
492         /* bcache. */
493         p->bc = bcache_init(params->bp);
494         if (!p->bc ||
495             (bcache_slab_size(p->bc) < umem_fq_size) ||
496             (bcache_cons_check(p->bc, umem_fq_size) < umem_fq_size)) {
497                 port_free(p);
498                 return NULL;
499         }
500
501         /* xsk socket. */
502         status = xsk_socket__create_shared(&p->xsk,
503                                            params->iface,
504                                            params->iface_queue,
505                                            params->bp->umem,
506                                            &p->rxq,
507                                            &p->txq,
508                                            &p->umem_fq,
509                                            &p->umem_cq,
510                                            &params->xsk_cfg);
511         if (status) {
512                 port_free(p);
513                 return NULL;
514         }
515
516         /* umem fq. */
517         xsk_ring_prod__reserve(&p->umem_fq, umem_fq_size, &pos);
518
519         for (i = 0; i < umem_fq_size; i++)
520                 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
521                         bcache_cons(p->bc);
522
523         xsk_ring_prod__submit(&p->umem_fq, umem_fq_size);
524         p->umem_fq_initialized = 1;
525
526         return p;
527 }
528
529 static inline u32
530 port_rx_burst(struct port *p, struct burst_rx *b)
531 {
532         u32 n_pkts, pos, i;
533
534         /* Free buffers for FQ replenish. */
535         n_pkts = ARRAY_SIZE(b->addr);
536
537         n_pkts = bcache_cons_check(p->bc, n_pkts);
538         if (!n_pkts)
539                 return 0;
540
541         /* RXQ. */
542         n_pkts = xsk_ring_cons__peek(&p->rxq, n_pkts, &pos);
543         if (!n_pkts) {
544                 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
545                         struct pollfd pollfd = {
546                                 .fd = xsk_socket__fd(p->xsk),
547                                 .events = POLLIN,
548                         };
549
550                         poll(&pollfd, 1, 0);
551                 }
552                 return 0;
553         }
554
555         for (i = 0; i < n_pkts; i++) {
556                 b->addr[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->addr;
557                 b->len[i] = xsk_ring_cons__rx_desc(&p->rxq, pos + i)->len;
558         }
559
560         xsk_ring_cons__release(&p->rxq, n_pkts);
561         p->n_pkts_rx += n_pkts;
562
563         /* UMEM FQ. */
564         for ( ; ; ) {
565                 int status;
566
567                 status = xsk_ring_prod__reserve(&p->umem_fq, n_pkts, &pos);
568                 if (status == n_pkts)
569                         break;
570
571                 if (xsk_ring_prod__needs_wakeup(&p->umem_fq)) {
572                         struct pollfd pollfd = {
573                                 .fd = xsk_socket__fd(p->xsk),
574                                 .events = POLLIN,
575                         };
576
577                         poll(&pollfd, 1, 0);
578                 }
579         }
580
581         for (i = 0; i < n_pkts; i++)
582                 *xsk_ring_prod__fill_addr(&p->umem_fq, pos + i) =
583                         bcache_cons(p->bc);
584
585         xsk_ring_prod__submit(&p->umem_fq, n_pkts);
586
587         return n_pkts;
588 }
589
590 static inline void
591 port_tx_burst(struct port *p, struct burst_tx *b)
592 {
593         u32 n_pkts, pos, i;
594         int status;
595
596         /* UMEM CQ. */
597         n_pkts = p->params.bp->umem_cfg.comp_size;
598
599         n_pkts = xsk_ring_cons__peek(&p->umem_cq, n_pkts, &pos);
600
601         for (i = 0; i < n_pkts; i++) {
602                 u64 addr = *xsk_ring_cons__comp_addr(&p->umem_cq, pos + i);
603
604                 bcache_prod(p->bc, addr);
605         }
606
607         xsk_ring_cons__release(&p->umem_cq, n_pkts);
608
609         /* TXQ. */
610         n_pkts = b->n_pkts;
611
612         for ( ; ; ) {
613                 status = xsk_ring_prod__reserve(&p->txq, n_pkts, &pos);
614                 if (status == n_pkts)
615                         break;
616
617                 if (xsk_ring_prod__needs_wakeup(&p->txq))
618                         sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT,
619                                NULL, 0);
620         }
621
622         for (i = 0; i < n_pkts; i++) {
623                 xsk_ring_prod__tx_desc(&p->txq, pos + i)->addr = b->addr[i];
624                 xsk_ring_prod__tx_desc(&p->txq, pos + i)->len = b->len[i];
625         }
626
627         xsk_ring_prod__submit(&p->txq, n_pkts);
628         if (xsk_ring_prod__needs_wakeup(&p->txq))
629                 sendto(xsk_socket__fd(p->xsk), NULL, 0, MSG_DONTWAIT, NULL, 0);
630         p->n_pkts_tx += n_pkts;
631 }
632
633 /*
634  * Thread
635  *
636  * Packet forwarding threads.
637  */
638 #ifndef MAX_PORTS_PER_THREAD
639 #define MAX_PORTS_PER_THREAD 16
640 #endif
641
642 struct thread_data {
643         struct port *ports_rx[MAX_PORTS_PER_THREAD];
644         struct port *ports_tx[MAX_PORTS_PER_THREAD];
645         u32 n_ports_rx;
646         struct burst_rx burst_rx;
647         struct burst_tx burst_tx[MAX_PORTS_PER_THREAD];
648         u32 cpu_core_id;
649         int quit;
650 };
651
652 static void swap_mac_addresses(void *data)
653 {
654         struct ether_header *eth = (struct ether_header *)data;
655         struct ether_addr *src_addr = (struct ether_addr *)&eth->ether_shost;
656         struct ether_addr *dst_addr = (struct ether_addr *)&eth->ether_dhost;
657         struct ether_addr tmp;
658
659         tmp = *src_addr;
660         *src_addr = *dst_addr;
661         *dst_addr = tmp;
662 }
663
664 static void *
665 thread_func(void *arg)
666 {
667         struct thread_data *t = arg;
668         cpu_set_t cpu_cores;
669         u32 i;
670
671         CPU_ZERO(&cpu_cores);
672         CPU_SET(t->cpu_core_id, &cpu_cores);
673         pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpu_cores);
674
675         for (i = 0; !t->quit; i = (i + 1) & (t->n_ports_rx - 1)) {
676                 struct port *port_rx = t->ports_rx[i];
677                 struct port *port_tx = t->ports_tx[i];
678                 struct burst_rx *brx = &t->burst_rx;
679                 struct burst_tx *btx = &t->burst_tx[i];
680                 u32 n_pkts, j;
681
682                 /* RX. */
683                 n_pkts = port_rx_burst(port_rx, brx);
684                 if (!n_pkts)
685                         continue;
686
687                 /* Process & TX. */
688                 for (j = 0; j < n_pkts; j++) {
689                         u64 addr = xsk_umem__add_offset_to_addr(brx->addr[j]);
690                         u8 *pkt = xsk_umem__get_data(port_rx->params.bp->addr,
691                                                      addr);
692
693                         swap_mac_addresses(pkt);
694
695                         btx->addr[btx->n_pkts] = brx->addr[j];
696                         btx->len[btx->n_pkts] = brx->len[j];
697                         btx->n_pkts++;
698
699                         if (btx->n_pkts == MAX_BURST_TX) {
700                                 port_tx_burst(port_tx, btx);
701                                 btx->n_pkts = 0;
702                         }
703                 }
704         }
705
706         return NULL;
707 }
708
709 /*
710  * Process
711  */
712 static const struct bpool_params bpool_params_default = {
713         .n_buffers = 64 * 1024,
714         .buffer_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
715         .mmap_flags = 0,
716
717         .n_users_max = 16,
718         .n_buffers_per_slab = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
719 };
720
721 static const struct xsk_umem_config umem_cfg_default = {
722         .fill_size = XSK_RING_PROD__DEFAULT_NUM_DESCS * 2,
723         .comp_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
724         .frame_size = XSK_UMEM__DEFAULT_FRAME_SIZE,
725         .frame_headroom = XSK_UMEM__DEFAULT_FRAME_HEADROOM,
726         .flags = 0,
727 };
728
729 static const struct port_params port_params_default = {
730         .xsk_cfg = {
731                 .rx_size = XSK_RING_CONS__DEFAULT_NUM_DESCS,
732                 .tx_size = XSK_RING_PROD__DEFAULT_NUM_DESCS,
733                 .libbpf_flags = 0,
734                 .xdp_flags = XDP_FLAGS_DRV_MODE,
735                 .bind_flags = XDP_USE_NEED_WAKEUP | XDP_ZEROCOPY,
736         },
737
738         .bp = NULL,
739         .iface = NULL,
740         .iface_queue = 0,
741 };
742
743 #ifndef MAX_PORTS
744 #define MAX_PORTS 64
745 #endif
746
747 #ifndef MAX_THREADS
748 #define MAX_THREADS 64
749 #endif
750
751 static struct bpool_params bpool_params;
752 static struct xsk_umem_config umem_cfg;
753 static struct bpool *bp;
754
755 static struct port_params port_params[MAX_PORTS];
756 static struct port *ports[MAX_PORTS];
757 static u64 n_pkts_rx[MAX_PORTS];
758 static u64 n_pkts_tx[MAX_PORTS];
759 static int n_ports;
760
761 static pthread_t threads[MAX_THREADS];
762 static struct thread_data thread_data[MAX_THREADS];
763 static int n_threads;
764
765 static void
766 print_usage(char *prog_name)
767 {
768         const char *usage =
769                 "Usage:\n"
770                 "\t%s [ -b SIZE ] -c CORE -i INTERFACE [ -q QUEUE ]\n"
771                 "\n"
772                 "-c CORE        CPU core to run a packet forwarding thread\n"
773                 "               on. May be invoked multiple times.\n"
774                 "\n"
775                 "-b SIZE        Number of buffers in the buffer pool shared\n"
776                 "               by all the forwarding threads. Default: %u.\n"
777                 "\n"
778                 "-i INTERFACE   Network interface. Each (INTERFACE, QUEUE)\n"
779                 "               pair specifies one forwarding port. May be\n"
780                 "               invoked multiple times.\n"
781                 "\n"
782                 "-q QUEUE       Network interface queue for RX and TX. Each\n"
783                 "               (INTERFACE, QUEUE) pair specified one\n"
784                 "               forwarding port. Default: %u. May be invoked\n"
785                 "               multiple times.\n"
786                 "\n";
787         printf(usage,
788                prog_name,
789                bpool_params_default.n_buffers,
790                port_params_default.iface_queue);
791 }
792
793 static int
794 parse_args(int argc, char **argv)
795 {
796         struct option lgopts[] = {
797                 { NULL,  0, 0, 0 }
798         };
799         int opt, option_index;
800
801         /* Parse the input arguments. */
802         for ( ; ;) {
803                 opt = getopt_long(argc, argv, "c:i:q:", lgopts, &option_index);
804                 if (opt == EOF)
805                         break;
806
807                 switch (opt) {
808                 case 'b':
809                         bpool_params.n_buffers = atoi(optarg);
810                         break;
811
812                 case 'c':
813                         if (n_threads == MAX_THREADS) {
814                                 printf("Max number of threads (%d) reached.\n",
815                                        MAX_THREADS);
816                                 return -1;
817                         }
818
819                         thread_data[n_threads].cpu_core_id = atoi(optarg);
820                         n_threads++;
821                         break;
822
823                 case 'i':
824                         if (n_ports == MAX_PORTS) {
825                                 printf("Max number of ports (%d) reached.\n",
826                                        MAX_PORTS);
827                                 return -1;
828                         }
829
830                         port_params[n_ports].iface = optarg;
831                         port_params[n_ports].iface_queue = 0;
832                         n_ports++;
833                         break;
834
835                 case 'q':
836                         if (n_ports == 0) {
837                                 printf("No port specified for queue.\n");
838                                 return -1;
839                         }
840                         port_params[n_ports - 1].iface_queue = atoi(optarg);
841                         break;
842
843                 default:
844                         printf("Illegal argument.\n");
845                         return -1;
846                 }
847         }
848
849         optind = 1; /* reset getopt lib */
850
851         /* Check the input arguments. */
852         if (!n_ports) {
853                 printf("No ports specified.\n");
854                 return -1;
855         }
856
857         if (!n_threads) {
858                 printf("No threads specified.\n");
859                 return -1;
860         }
861
862         if (n_ports % n_threads) {
863                 printf("Ports cannot be evenly distributed to threads.\n");
864                 return -1;
865         }
866
867         return 0;
868 }
869
870 static void
871 print_port(u32 port_id)
872 {
873         struct port *port = ports[port_id];
874
875         printf("Port %u: interface = %s, queue = %u\n",
876                port_id, port->params.iface, port->params.iface_queue);
877 }
878
879 static void
880 print_thread(u32 thread_id)
881 {
882         struct thread_data *t = &thread_data[thread_id];
883         u32 i;
884
885         printf("Thread %u (CPU core %u): ",
886                thread_id, t->cpu_core_id);
887
888         for (i = 0; i < t->n_ports_rx; i++) {
889                 struct port *port_rx = t->ports_rx[i];
890                 struct port *port_tx = t->ports_tx[i];
891
892                 printf("(%s, %u) -> (%s, %u), ",
893                        port_rx->params.iface,
894                        port_rx->params.iface_queue,
895                        port_tx->params.iface,
896                        port_tx->params.iface_queue);
897         }
898
899         printf("\n");
900 }
901
902 static void
903 print_port_stats_separator(void)
904 {
905         printf("+-%4s-+-%12s-+-%13s-+-%12s-+-%13s-+\n",
906                "----",
907                "------------",
908                "-------------",
909                "------------",
910                "-------------");
911 }
912
913 static void
914 print_port_stats_header(void)
915 {
916         print_port_stats_separator();
917         printf("| %4s | %12s | %13s | %12s | %13s |\n",
918                "Port",
919                "RX packets",
920                "RX rate (pps)",
921                "TX packets",
922                "TX_rate (pps)");
923         print_port_stats_separator();
924 }
925
926 static void
927 print_port_stats_trailer(void)
928 {
929         print_port_stats_separator();
930         printf("\n");
931 }
932
933 static void
934 print_port_stats(int port_id, u64 ns_diff)
935 {
936         struct port *p = ports[port_id];
937         double rx_pps, tx_pps;
938
939         rx_pps = (p->n_pkts_rx - n_pkts_rx[port_id]) * 1000000000. / ns_diff;
940         tx_pps = (p->n_pkts_tx - n_pkts_tx[port_id]) * 1000000000. / ns_diff;
941
942         printf("| %4d | %12llu | %13.0f | %12llu | %13.0f |\n",
943                port_id,
944                p->n_pkts_rx,
945                rx_pps,
946                p->n_pkts_tx,
947                tx_pps);
948
949         n_pkts_rx[port_id] = p->n_pkts_rx;
950         n_pkts_tx[port_id] = p->n_pkts_tx;
951 }
952
953 static void
954 print_port_stats_all(u64 ns_diff)
955 {
956         int i;
957
958         print_port_stats_header();
959         for (i = 0; i < n_ports; i++)
960                 print_port_stats(i, ns_diff);
961         print_port_stats_trailer();
962 }
963
964 static int quit;
965
966 static void
967 signal_handler(int sig)
968 {
969         quit = 1;
970 }
971
972 static void remove_xdp_program(void)
973 {
974         int i;
975
976         for (i = 0 ; i < n_ports; i++)
977                 bpf_set_link_xdp_fd(if_nametoindex(port_params[i].iface), -1,
978                                     port_params[i].xsk_cfg.xdp_flags);
979 }
980
981 int main(int argc, char **argv)
982 {
983         struct timespec time;
984         u64 ns0;
985         int i;
986
987         /* Parse args. */
988         memcpy(&bpool_params, &bpool_params_default,
989                sizeof(struct bpool_params));
990         memcpy(&umem_cfg, &umem_cfg_default,
991                sizeof(struct xsk_umem_config));
992         for (i = 0; i < MAX_PORTS; i++)
993                 memcpy(&port_params[i], &port_params_default,
994                        sizeof(struct port_params));
995
996         if (parse_args(argc, argv)) {
997                 print_usage(argv[0]);
998                 return -1;
999         }
1000
1001         /* Buffer pool initialization. */
1002         bp = bpool_init(&bpool_params, &umem_cfg);
1003         if (!bp) {
1004                 printf("Buffer pool initialization failed.\n");
1005                 return -1;
1006         }
1007         printf("Buffer pool created successfully.\n");
1008
1009         /* Ports initialization. */
1010         for (i = 0; i < MAX_PORTS; i++)
1011                 port_params[i].bp = bp;
1012
1013         for (i = 0; i < n_ports; i++) {
1014                 ports[i] = port_init(&port_params[i]);
1015                 if (!ports[i]) {
1016                         printf("Port %d initialization failed.\n", i);
1017                         return -1;
1018                 }
1019                 print_port(i);
1020         }
1021         printf("All ports created successfully.\n");
1022
1023         /* Threads. */
1024         for (i = 0; i < n_threads; i++) {
1025                 struct thread_data *t = &thread_data[i];
1026                 u32 n_ports_per_thread = n_ports / n_threads, j;
1027
1028                 for (j = 0; j < n_ports_per_thread; j++) {
1029                         t->ports_rx[j] = ports[i * n_ports_per_thread + j];
1030                         t->ports_tx[j] = ports[i * n_ports_per_thread +
1031                                 (j + 1) % n_ports_per_thread];
1032                 }
1033
1034                 t->n_ports_rx = n_ports_per_thread;
1035
1036                 print_thread(i);
1037         }
1038
1039         for (i = 0; i < n_threads; i++) {
1040                 int status;
1041
1042                 status = pthread_create(&threads[i],
1043                                         NULL,
1044                                         thread_func,
1045                                         &thread_data[i]);
1046                 if (status) {
1047                         printf("Thread %d creation failed.\n", i);
1048                         return -1;
1049                 }
1050         }
1051         printf("All threads created successfully.\n");
1052
1053         /* Print statistics. */
1054         signal(SIGINT, signal_handler);
1055         signal(SIGTERM, signal_handler);
1056         signal(SIGABRT, signal_handler);
1057
1058         clock_gettime(CLOCK_MONOTONIC, &time);
1059         ns0 = time.tv_sec * 1000000000UL + time.tv_nsec;
1060         for ( ; !quit; ) {
1061                 u64 ns1, ns_diff;
1062
1063                 sleep(1);
1064                 clock_gettime(CLOCK_MONOTONIC, &time);
1065                 ns1 = time.tv_sec * 1000000000UL + time.tv_nsec;
1066                 ns_diff = ns1 - ns0;
1067                 ns0 = ns1;
1068
1069                 print_port_stats_all(ns_diff);
1070         }
1071
1072         /* Threads completion. */
1073         printf("Quit.\n");
1074         for (i = 0; i < n_threads; i++)
1075                 thread_data[i].quit = 1;
1076
1077         for (i = 0; i < n_threads; i++)
1078                 pthread_join(threads[i], NULL);
1079
1080         for (i = 0; i < n_ports; i++)
1081                 port_free(ports[i]);
1082
1083         bpool_free(bp);
1084
1085         remove_xdp_program();
1086
1087         return 0;
1088 }