rdma: proper getaddrinfo() handling
[sdk/emulator/qemu.git] / migration-rdma.c
1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  *
6  * Authors:
7  *  Michael R. Hines <mrhines@us.ibm.com>
8  *  Jiuxing Liu <jl@us.ibm.com>
9  *
10  * This work is licensed under the terms of the GNU GPL, version 2 or
11  * later.  See the COPYING file in the top-level directory.
12  *
13  */
14 #include "qemu-common.h"
15 #include "migration/migration.h"
16 #include "migration/qemu-file.h"
17 #include "exec/cpu-common.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/sockets.h"
20 #include "qemu/bitmap.h"
21 #include "block/coroutine.h"
22 #include <stdio.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netdb.h>
26 #include <arpa/inet.h>
27 #include <string.h>
28 #include <rdma/rdma_cma.h>
29
30 //#define DEBUG_RDMA
31 //#define DEBUG_RDMA_VERBOSE
32 //#define DEBUG_RDMA_REALLY_VERBOSE
33
34 #ifdef DEBUG_RDMA
35 #define DPRINTF(fmt, ...) \
36     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
37 #else
38 #define DPRINTF(fmt, ...) \
39     do { } while (0)
40 #endif
41
42 #ifdef DEBUG_RDMA_VERBOSE
43 #define DDPRINTF(fmt, ...) \
44     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DDPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49
50 #ifdef DEBUG_RDMA_REALLY_VERBOSE
51 #define DDDPRINTF(fmt, ...) \
52     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
53 #else
54 #define DDDPRINTF(fmt, ...) \
55     do { } while (0)
56 #endif
57
58 /*
59  * Print an error on both the Monitor and the Log file.
60  */
61 #define ERROR(errp, fmt, ...) \
62     do { \
63         fprintf(stderr, "RDMA ERROR: " fmt "\n", ## __VA_ARGS__); \
64         if (errp && (*(errp) == NULL)) { \
65             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
66         } \
67     } while (0)
68
69 #define RDMA_RESOLVE_TIMEOUT_MS 10000
70
71 /* Do not merge data if larger than this. */
72 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
73 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
74
75 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
76
77 /*
78  * This is only for non-live state being migrated.
79  * Instead of RDMA_WRITE messages, we use RDMA_SEND
80  * messages for that state, which requires a different
81  * delivery design than main memory.
82  */
83 #define RDMA_SEND_INCREMENT 32768
84
85 /*
86  * Maximum size of an infiniband SEND message
87  */
88 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
89 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
90
91 #define RDMA_CONTROL_VERSION_CURRENT 1
92 /*
93  * Capabilities for negotiation.
94  */
95 #define RDMA_CAPABILITY_PIN_ALL 0x01
96
97 /*
98  * Add the other flags above to this list of known capabilities
99  * as they are introduced.
100  */
101 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
102
103 #define CHECK_ERROR_STATE() \
104     do { \
105         if (rdma->error_state) { \
106             if (!rdma->error_reported) { \
107                 fprintf(stderr, "RDMA is in an error state waiting migration" \
108                                 " to abort!\n"); \
109                 rdma->error_reported = 1; \
110             } \
111             return rdma->error_state; \
112         } \
113     } while (0);
114
115 /*
116  * A work request ID is 64 bits and we split up these bits
117  * into 3 parts:
118  *
119  * bits 0-15 : type of control message, 2^16
120  * bits 16-29: ram block index, 2^14
121  * bits 30-63: ram block chunk number, 2^34
122  *
123  * The last two bit ranges are only used for RDMA writes,
124  * in order to track their completion and potentially
125  * also track unregistration status of the message.
126  */
127 #define RDMA_WRID_TYPE_SHIFT  0UL
128 #define RDMA_WRID_BLOCK_SHIFT 16UL
129 #define RDMA_WRID_CHUNK_SHIFT 30UL
130
131 #define RDMA_WRID_TYPE_MASK \
132     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
133
134 #define RDMA_WRID_BLOCK_MASK \
135     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
136
137 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
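
/*
 * Worked example (illustrative only, not part of the wire protocol): an RDMA
 * write of chunk 5 in ram block 2 would carry
 *     wr_id = (5 << RDMA_WRID_CHUNK_SHIFT) |
 *             (2 << RDMA_WRID_BLOCK_SHIFT) | RDMA_WRID_RDMA_WRITE
 *           = 0x140020001
 * and applying the three masks above recovers type 1, block 2 and chunk 5.
 */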
138
139 /*
140  * RDMA migration protocol:
141  * 1. RDMA Writes (data messages, i.e. RAM)
142  * 2. IB Send/Recv (control channel messages)
143  */
144 enum {
145     RDMA_WRID_NONE = 0,
146     RDMA_WRID_RDMA_WRITE = 1,
147     RDMA_WRID_SEND_CONTROL = 2000,
148     RDMA_WRID_RECV_CONTROL = 4000,
149 };
150
151 const char *wrid_desc[] = {
152     [RDMA_WRID_NONE] = "NONE",
153     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
154     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
155     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
156 };
157
158 /*
159  * Work request IDs for IB SEND messages only (not RDMA writes).
160  * This is used by the migration protocol to transmit
161  * control messages (such as device state and registration commands)
162  *
163  * We could use more WRs, but we have enough for now.
164  */
165 enum {
166     RDMA_WRID_READY = 0,
167     RDMA_WRID_DATA,
168     RDMA_WRID_CONTROL,
169     RDMA_WRID_MAX,
170 };
171
172 /*
173  * SEND/RECV IB Control Messages.
174  */
175 enum {
176     RDMA_CONTROL_NONE = 0,
177     RDMA_CONTROL_ERROR,
178     RDMA_CONTROL_READY,               /* ready to receive */
179     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
180     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
181     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
182     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
183     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
184     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
185     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
186     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
187     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
188 };
189
190 const char *control_desc[] = {
191     [RDMA_CONTROL_NONE] = "NONE",
192     [RDMA_CONTROL_ERROR] = "ERROR",
193     [RDMA_CONTROL_READY] = "READY",
194     [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
195     [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
196     [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
197     [RDMA_CONTROL_COMPRESS] = "COMPRESS",
198     [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
199     [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
200     [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
201     [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
202     [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
203 };
204
205 /*
206  * Memory and MR structures used to represent an IB Send/Recv work request.
207  * This is *not* used for RDMA writes, only IB Send/Recv.
208  */
209 typedef struct {
210     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
211     struct   ibv_mr *control_mr;               /* registration metadata */
212     size_t   control_len;                      /* length of the message */
213     uint8_t *control_curr;                     /* start of unconsumed bytes */
214 } RDMAWorkRequestData;
215
216 /*
217  * Negotiate RDMA capabilities during connection-setup time.
218  */
219 typedef struct {
220     uint32_t version;
221     uint32_t flags;
222 } RDMACapabilities;
223
224 static void caps_to_network(RDMACapabilities *cap)
225 {
226     cap->version = htonl(cap->version);
227     cap->flags = htonl(cap->flags);
228 }
229
230 static void network_to_caps(RDMACapabilities *cap)
231 {
232     cap->version = ntohl(cap->version);
233     cap->flags = ntohl(cap->flags);
234 }
235
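/*
 * Minimal usage sketch (an assumption for clarity; the actual connection-setup
 * code lives elsewhere in this file): the requested capabilities are carried
 * as the private_data blob of the rdma_cm connection request, converted to
 * network byte order before sending, e.g.:
 *
 *     RDMACapabilities cap = { .version = RDMA_CONTROL_VERSION_CURRENT,
 *                              .flags = RDMA_CAPABILITY_PIN_ALL };
 *     struct rdma_conn_param param = { .private_data = &cap,
 *                                      .private_data_len = sizeof(cap) };
 *     caps_to_network(&cap);
 *     rdma_connect(rdma->cm_id, &param);
 *
 * The receiving side then calls network_to_caps() before checking the flags.
 */
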
236 /*
237  * Representation of a RAMBlock from an RDMA perspective.
238  * This is not transmitted, only local.
239  * This and subsequent structures cannot be linked lists
240  * because we're using a single IB message to transmit
241  * the information. It's small anyway, so a list is overkill.
242  */
243 typedef struct RDMALocalBlock {
244     uint8_t  *local_host_addr; /* local virtual address */
245     uint64_t remote_host_addr; /* remote virtual address */
246     uint64_t offset;
247     uint64_t length;
248     struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
249     struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
250     uint32_t *remote_keys;     /* rkeys for chunk-level registration */
251     uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
252     int      index;            /* which block are we */
253     bool     is_ram_block;
254     int      nb_chunks;
255     unsigned long *transit_bitmap;
256     unsigned long *unregister_bitmap;
257 } RDMALocalBlock;
258
259 /*
260  * Also represents a RAMblock, but only on the dest.
261  * This gets transmitted by the dest during connection-time
262  * to the source VM and then is used to populate the
263  * corresponding RDMALocalBlock with
264  * the information needed to perform the actual RDMA.
265  */
266 typedef struct QEMU_PACKED RDMARemoteBlock {
267     uint64_t remote_host_addr;
268     uint64_t offset;
269     uint64_t length;
270     uint32_t remote_rkey;
271     uint32_t padding;
272 } RDMARemoteBlock;
273
274 static uint64_t htonll(uint64_t v)
275 {
276     union { uint32_t lv[2]; uint64_t llv; } u;
277     u.lv[0] = htonl(v >> 32);
278     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
279     return u.llv;
280 }
281
282 static uint64_t ntohll(uint64_t v) {
283     union { uint32_t lv[2]; uint64_t llv; } u;
284     u.llv = v;
285     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
286 }
287
288 static void remote_block_to_network(RDMARemoteBlock *rb)
289 {
290     rb->remote_host_addr = htonll(rb->remote_host_addr);
291     rb->offset = htonll(rb->offset);
292     rb->length = htonll(rb->length);
293     rb->remote_rkey = htonl(rb->remote_rkey);
294 }
295
296 static void network_to_remote_block(RDMARemoteBlock *rb)
297 {
298     rb->remote_host_addr = ntohll(rb->remote_host_addr);
299     rb->offset = ntohll(rb->offset);
300     rb->length = ntohll(rb->length);
301     rb->remote_rkey = ntohl(rb->remote_rkey);
302 }
303
304 /*
305  * Virtual address of the above structures used for transmitting
306  * the RAMBlock descriptions at connection-time.
307  * This structure is *not* transmitted.
308  */
309 typedef struct RDMALocalBlocks {
310     int nb_blocks;
311     bool     init;             /* main memory init complete */
312     RDMALocalBlock *block;
313 } RDMALocalBlocks;
314
315 /*
316  * Main data structure for RDMA state.
317  * While there is only one copy of this structure being allocated right now,
318  * this is the place where you would start if you wanted to consider
319  * having more than one RDMA connection open at the same time.
320  */
321 typedef struct RDMAContext {
322     char *host;
323     int port;
324
325     RDMAWorkRequestData wr_data[RDMA_WRID_MAX];
326
327     /*
328      * This is used by *_exchange_send() to figure out whether or not
329      * the initial "READY" message has already been received.
330      * This is because other functions may potentially poll() and detect
331      * the READY message before send() does, in which case we need to
332      * know if it completed.
333      */
334     int control_ready_expected;
335
336     /* number of outstanding writes */
337     int nb_sent;
338
339     /* store info about current buffer so that we can
340        merge it with future sends */
341     uint64_t current_addr;
342     uint64_t current_length;
343     /* index of ram block the current buffer belongs to */
344     int current_index;
345     /* index of the chunk in the current ram block */
346     int current_chunk;
347
348     bool pin_all;
349
350     /*
351      * infiniband-specific variables for opening the device
352      * and maintaining connection state and so forth.
353      *
354      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
355      * cm_id->verbs, cm_id->channel, and cm_id->qp.
356      */
357     struct rdma_cm_id *cm_id;               /* connection manager ID */
358     struct rdma_cm_id *listen_id;
359
360     struct ibv_context          *verbs;
361     struct rdma_event_channel   *channel;
362     struct ibv_qp *qp;                      /* queue pair */
363     struct ibv_comp_channel *comp_channel;  /* completion channel */
364     struct ibv_pd *pd;                      /* protection domain */
365     struct ibv_cq *cq;                      /* completion queue */
366
367     /*
368      * If a previous write failed (perhaps because of a failed
369      * memory registration), then do not attempt any future work
370      * and remember the error state.
371      */
372     int error_state;
373     int error_reported;
374
375     /*
376      * Description of ram blocks used throughout the code.
377      */
378     RDMALocalBlocks local_ram_blocks;
379     RDMARemoteBlock *block;
380
381     /*
382      * Migration on *destination* started.
383      * Then use coroutine yield function.
384      * Source runs in a thread, so we don't care.
385      */
386     int migration_started_on_destination;
387
388     int total_registrations;
389     int total_writes;
390
391     int unregister_current, unregister_next;
392     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
393
394     GHashTable *blockmap;
395 } RDMAContext;
396
397 /*
398  * Interface to the rest of the migration call stack.
399  */
400 typedef struct QEMUFileRDMA {
401     RDMAContext *rdma;
402     size_t len;
403     void *file;
404 } QEMUFileRDMA;
405
406 /*
407  * Main structure for IB Send/Recv control messages.
408  * This gets prepended at the beginning of every Send/Recv.
409  */
410 typedef struct QEMU_PACKED {
411     uint32_t len;     /* Total length of data portion */
412     uint32_t type;    /* which control command to perform */
413     uint32_t repeat;  /* number of commands in data portion of same type */
414     uint32_t padding;
415 } RDMAControlHeader;
416
417 static void control_to_network(RDMAControlHeader *control)
418 {
419     control->type = htonl(control->type);
420     control->len = htonl(control->len);
421     control->repeat = htonl(control->repeat);
422 }
423
424 static void network_to_control(RDMAControlHeader *control)
425 {
426     control->type = ntohl(control->type);
427     control->len = ntohl(control->len);
428     control->repeat = ntohl(control->repeat);
429 }
430
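/*
 * Illustrative wire layout (added for clarity; see qemu_rdma_post_send_control()
 * below): a QEMU_FILE control message carrying 100 bytes of data goes out in a
 * single IB SEND as
 *     { len = 100, type = RDMA_CONTROL_QEMU_FILE, repeat = 1, padding }
 * with each header field converted to network byte order, immediately followed
 * by the 100 data bytes in the same registered control buffer.
 */
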
431 /*
432  * Register a single Chunk.
433  * Information sent by the source VM to inform the dest
434  * to register a single chunk of memory before we can perform
435  * the actual RDMA operation.
436  */
437 typedef struct QEMU_PACKED {
438     union QEMU_PACKED {
439         uint64_t current_addr;  /* offset into the ramblock of the chunk */
440         uint64_t chunk;         /* chunk to lookup if unregistering */
441     } key;
442     uint32_t current_index; /* which ramblock the chunk belongs to */
443     uint32_t padding;
444     uint64_t chunks;            /* how many sequential chunks to register */
445 } RDMARegister;
446
447 static void register_to_network(RDMARegister *reg)
448 {
449     reg->key.current_addr = htonll(reg->key.current_addr);
450     reg->current_index = htonl(reg->current_index);
451     reg->chunks = htonll(reg->chunks);
452 }
453
454 static void network_to_register(RDMARegister *reg)
455 {
456     reg->key.current_addr = ntohll(reg->key.current_addr);
457     reg->current_index = ntohl(reg->current_index);
458     reg->chunks = ntohll(reg->chunks);
459 }
460
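/*
 * Example (mirroring the unregister path in qemu_rdma_unregister_waiting()
 * below): to ask the dest to unpin a single chunk, the source builds
 *     RDMAControlHeader head = { .len = sizeof(RDMARegister),
 *                                .type = RDMA_CONTROL_UNREGISTER_REQUEST,
 *                                .repeat = 1 };
 *     RDMARegister reg = { .current_index = index, .key.chunk = chunk };
 *     register_to_network(&reg);
 * and then delivers it with qemu_rdma_exchange_send(rdma, &head,
 * (uint8_t *) &reg, &resp, NULL, NULL).
 */
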
461 typedef struct QEMU_PACKED {
462     uint32_t value;     /* if zero, we will madvise() */
463     uint32_t block_idx; /* which ram block index */
464     uint64_t offset;    /* offset of this chunk within the remote ramblock */
465     uint64_t length;    /* length of the chunk */
466 } RDMACompress;
467
468 static void compress_to_network(RDMACompress *comp)
469 {
470     comp->value = htonl(comp->value);
471     comp->block_idx = htonl(comp->block_idx);
472     comp->offset = htonll(comp->offset);
473     comp->length = htonll(comp->length);
474 }
475
476 static void network_to_compress(RDMACompress *comp)
477 {
478     comp->value = ntohl(comp->value);
479     comp->block_idx = ntohl(comp->block_idx);
480     comp->offset = ntohll(comp->offset);
481     comp->length = ntohll(comp->length);
482 }
483
484 /*
485  * The result of the dest's memory registration produces an "rkey"
486  * which the source VM must reference in order to perform
487  * the RDMA operation.
488  */
489 typedef struct QEMU_PACKED {
490     uint32_t rkey;
491     uint32_t padding;
492     uint64_t host_addr;
493 } RDMARegisterResult;
494
495 static void result_to_network(RDMARegisterResult *result)
496 {
497     result->rkey = htonl(result->rkey);
498     result->host_addr = htonll(result->host_addr);
499 }
500
501 static void network_to_result(RDMARegisterResult *result)
502 {
503     result->rkey = ntohl(result->rkey);
504     result->host_addr = ntohll(result->host_addr);
505 }
506
507 const char *print_wrid(int wrid);
508 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
509                                    uint8_t *data, RDMAControlHeader *resp,
510                                    int *resp_idx,
511                                    int (*callback)(RDMAContext *rdma));
512
513 static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
514 {
515     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
516 }
517
518 static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
519                                        uint64_t i)
520 {
521     return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
522                                     + (i << RDMA_REG_CHUNK_SHIFT));
523 }
524
525 static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
526 {
527     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
528                                          (1UL << RDMA_REG_CHUNK_SHIFT);
529
530     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
531         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
532     }
533
534     return result;
535 }
536
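/*
 * Worked example (illustrative): with RDMA_REG_CHUNK_SHIFT = 20, chunks are
 * 1 MB. For a 2.5 MB RAMBlock starting at 'host':
 *     ram_chunk_index(host, host + length) == 2, so __qemu_rdma_add_block()
 *     below records nb_chunks = 3;
 *     ram_chunk_start(block, 2) == host + 2 MB;
 *     ram_chunk_end(block, 2) is clamped to host + 2.5 MB (the block end)
 *     rather than host + 3 MB.
 */
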
537 static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
538                          ram_addr_t block_offset, uint64_t length)
539 {
540     RDMALocalBlocks *local = &rdma->local_ram_blocks;
541     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
542         (void *) block_offset);
543     RDMALocalBlock *old = local->block;
544
545     assert(block == NULL);
546
547     local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
548
549     if (local->nb_blocks) {
550         int x;
551
552         for (x = 0; x < local->nb_blocks; x++) {
553             g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
554             g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
555                                                 &local->block[x]);
556         }
557         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
558         g_free(old);
559     }
560
561     block = &local->block[local->nb_blocks];
562
563     block->local_host_addr = host_addr;
564     block->offset = block_offset;
565     block->length = length;
566     block->index = local->nb_blocks;
567     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
568     block->transit_bitmap = bitmap_new(block->nb_chunks);
569     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
570     block->unregister_bitmap = bitmap_new(block->nb_chunks);
571     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
572     block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
573
574     block->is_ram_block = local->init ? false : true;
575
576     g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
577
578     DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
579            " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
580             local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
581             block->length, (uint64_t) (block->local_host_addr + block->length),
582                 BITS_TO_LONGS(block->nb_chunks) *
583                     sizeof(unsigned long) * 8, block->nb_chunks);
584
585     local->nb_blocks++;
586
587     return 0;
588 }
589
590 /*
591  * Memory regions need to be registered with the device and queue pairs set up
592  * in advance before the migration starts. This tells us where the RAM blocks
593  * are so that we can register them individually.
594  */
595 static void qemu_rdma_init_one_block(void *host_addr,
596     ram_addr_t block_offset, ram_addr_t length, void *opaque)
597 {
598     __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
599 }
600
601 /*
602  * Identify the RAMBlocks and their quantity. They will be used to
603  * identify chunk boundaries inside each RAMBlock and also be referenced
604  * during dynamic page registration.
605  */
606 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
607 {
608     RDMALocalBlocks *local = &rdma->local_ram_blocks;
609
610     assert(rdma->blockmap == NULL);
611     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
612     memset(local, 0, sizeof *local);
613     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
614     DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
615     rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
616                         rdma->local_ram_blocks.nb_blocks);
617     local->init = true;
618     return 0;
619 }
620
621 static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
622 {
623     RDMALocalBlocks *local = &rdma->local_ram_blocks;
624     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
625         (void *) block_offset);
626     RDMALocalBlock *old = local->block;
627     int x;
628
629     assert(block);
630
631     if (block->pmr) {
632         int j;
633
634         for (j = 0; j < block->nb_chunks; j++) {
635             if (!block->pmr[j]) {
636                 continue;
637             }
638             ibv_dereg_mr(block->pmr[j]);
639             rdma->total_registrations--;
640         }
641         g_free(block->pmr);
642         block->pmr = NULL;
643     }
644
645     if (block->mr) {
646         ibv_dereg_mr(block->mr);
647         rdma->total_registrations--;
648         block->mr = NULL;
649     }
650
651     g_free(block->transit_bitmap);
652     block->transit_bitmap = NULL;
653
654     g_free(block->unregister_bitmap);
655     block->unregister_bitmap = NULL;
656
657     g_free(block->remote_keys);
658     block->remote_keys = NULL;
659
660     for (x = 0; x < local->nb_blocks; x++) {
661         g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
662     }
663
664     if (local->nb_blocks > 1) {
665
666         local->block = g_malloc0(sizeof(RDMALocalBlock) *
667                                     (local->nb_blocks - 1));
668
669         if (block->index) {
670             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
671         }
672
673         if (block->index < (local->nb_blocks - 1)) {
674             memcpy(local->block + block->index, old + (block->index + 1),
675                 sizeof(RDMALocalBlock) *
676                     (local->nb_blocks - (block->index + 1)));
677         }
678     } else {
679         assert(block == local->block);
680         local->block = NULL;
681     }
682
683     DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
684            " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
685             local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
686             block->length, (uint64_t) (block->local_host_addr + block->length),
687                 BITS_TO_LONGS(block->nb_chunks) *
688                     sizeof(unsigned long) * 8, block->nb_chunks);
689
690     g_free(old);
691
692     local->nb_blocks--;
693
694     if (local->nb_blocks) {
695         for (x = 0; x < local->nb_blocks; x++) {
696             g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
697                                                 &local->block[x]);
698         }
699     }
700
701     return 0;
702 }
703
704 /*
705  * Put in the log file which RDMA device was opened and the details
706  * associated with that device.
707  */
708 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
709 {
710     printf("%s RDMA Device opened: kernel name %s "
711            "uverbs device name %s, "
712            "infiniband_verbs class device path %s,"
713            " infiniband class device path %s\n",
714                 who,
715                 verbs->device->name,
716                 verbs->device->dev_name,
717                 verbs->device->dev_path,
718                 verbs->device->ibdev_path);
719 }
720
721 /*
722  * Put in the log file the RDMA gid addressing information,
723  * useful for folks who have trouble understanding the
724  * RDMA device hierarchy in the kernel.
725  */
726 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
727 {
728     char sgid[33];
729     char dgid[33];
730     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
731     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
732     DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
733 }
734
735 /*
736  * Figure out which RDMA device corresponds to the requested IP hostname
737  * Also create the initial connection manager identifiers for opening
738  * the connection.
739  */
740 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
741 {
742     int ret;
743     struct addrinfo *res;
744     char port_str[16];
745     struct rdma_cm_event *cm_event;
746     char ip[40] = "unknown";
747     struct addrinfo *e;
748
749     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
750         ERROR(errp, "RDMA hostname has not been set");
751         return -1;
752     }
753
754     /* create CM channel */
755     rdma->channel = rdma_create_event_channel();
756     if (!rdma->channel) {
757         ERROR(errp, "could not create CM channel");
758         return -1;
759     }
760
761     /* create CM id */
762     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
763     if (ret) {
764         ERROR(errp, "could not create channel id");
765         goto err_resolve_create_id;
766     }
767
768     snprintf(port_str, 16, "%d", rdma->port);
769     port_str[15] = '\0';
770
771     ret = getaddrinfo(rdma->host, port_str, NULL, &res);
772     if (ret < 0) {
773         ERROR(errp, "could not getaddrinfo address %s", rdma->host);
774         goto err_resolve_get_addr;
775     }
776
777     for (e = res; e != NULL; e = e->ai_next) {
778         inet_ntop(e->ai_family,
779             &((struct sockaddr_in *) e->ai_addr)->sin_addr, ip, sizeof ip);
780         DPRINTF("Trying %s => %s\n", rdma->host, ip);
781
782         /* resolve the first address */
783         ret = rdma_resolve_addr(rdma->cm_id, NULL, e->ai_addr,
784                 RDMA_RESOLVE_TIMEOUT_MS);
785         if (!ret) {
786             goto route;
787         }
788     }
789
790     ERROR(errp, "could not resolve address %s", rdma->host);
791     goto err_resolve_get_addr;
792
793 route:
794     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
795
796     ret = rdma_get_cm_event(rdma->channel, &cm_event);
797     if (ret) {
798         ERROR(errp, "could not perform event_addr_resolved");
799         goto err_resolve_get_addr;
800     }
801
802     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
803         ERROR(errp, "result not equal to event_addr_resolved %s",
804                 rdma_event_str(cm_event->event));
805         perror("rdma_resolve_addr");
806         goto err_resolve_get_addr;
807     }
808     rdma_ack_cm_event(cm_event);
809
810     /* resolve route */
811     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
812     if (ret) {
813         ERROR(errp, "could not resolve rdma route");
814         goto err_resolve_get_addr;
815     }
816
817     ret = rdma_get_cm_event(rdma->channel, &cm_event);
818     if (ret) {
819         ERROR(errp, "could not perform event_route_resolved");
820         goto err_resolve_get_addr;
821     }
822     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
823         ERROR(errp, "result not equal to event_route_resolved: %s",
824                         rdma_event_str(cm_event->event));
825         rdma_ack_cm_event(cm_event);
826         goto err_resolve_get_addr;
827     }
828     rdma_ack_cm_event(cm_event);
829     rdma->verbs = rdma->cm_id->verbs;
830     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
831     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
832     return 0;
833
834 err_resolve_get_addr:
835     rdma_destroy_id(rdma->cm_id);
836     rdma->cm_id = NULL;
837 err_resolve_create_id:
838     rdma_destroy_event_channel(rdma->channel);
839     rdma->channel = NULL;
840
841     return -1;
842 }
843
844 /*
845  * Create protection domain and completion queues
846  */
847 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
848 {
849     /* allocate pd */
850     rdma->pd = ibv_alloc_pd(rdma->verbs);
851     if (!rdma->pd) {
852         fprintf(stderr, "failed to allocate protection domain\n");
853         return -1;
854     }
855
856     /* create completion channel */
857     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
858     if (!rdma->comp_channel) {
859         fprintf(stderr, "failed to allocate completion channel\n");
860         goto err_alloc_pd_cq;
861     }
862
863     /*
864      * Completion queue can be filled by both read and write work requests,
865      * so must reflect the sum of both possible queue sizes.
866      */
867     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
868             NULL, rdma->comp_channel, 0);
869     if (!rdma->cq) {
870         fprintf(stderr, "failed to allocate completion queue\n");
871         goto err_alloc_pd_cq;
872     }
873
874     return 0;
875
876 err_alloc_pd_cq:
877     if (rdma->pd) {
878         ibv_dealloc_pd(rdma->pd);
879     }
880     if (rdma->comp_channel) {
881         ibv_destroy_comp_channel(rdma->comp_channel);
882     }
883     rdma->pd = NULL;
884     rdma->comp_channel = NULL;
885     return -1;
886
887 }
888
889 /*
890  * Create queue pairs.
891  */
892 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
893 {
894     struct ibv_qp_init_attr attr = { 0 };
895     int ret;
896
897     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
898     attr.cap.max_recv_wr = 3;
899     attr.cap.max_send_sge = 1;
900     attr.cap.max_recv_sge = 1;
901     attr.send_cq = rdma->cq;
902     attr.recv_cq = rdma->cq;
903     attr.qp_type = IBV_QPT_RC;
904
905     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
906     if (ret) {
907         return -1;
908     }
909
910     rdma->qp = rdma->cm_id->qp;
911     return 0;
912 }
913
914 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
915 {
916     int i;
917     RDMALocalBlocks *local = &rdma->local_ram_blocks;
918
919     for (i = 0; i < local->nb_blocks; i++) {
920         local->block[i].mr =
921             ibv_reg_mr(rdma->pd,
922                     local->block[i].local_host_addr,
923                     local->block[i].length,
924                     IBV_ACCESS_LOCAL_WRITE |
925                     IBV_ACCESS_REMOTE_WRITE
926                     );
927         if (!local->block[i].mr) {
928             perror("Failed to register local dest ram block!\n");
929             break;
930         }
931         rdma->total_registrations++;
932     }
933
934     if (i >= local->nb_blocks) {
935         return 0;
936     }
937
938     for (i--; i >= 0; i--) {
939         ibv_dereg_mr(local->block[i].mr);
940         rdma->total_registrations--;
941     }
942
943     return -1;
944
945 }
946
947 /*
948  * Find the ram block that corresponds to the page requested to be
949  * transmitted by QEMU.
950  *
951  * Once the block is found, also identify which 'chunk' within that
952  * block that the page belongs to.
953  *
954  * This search cannot fail or the migration will fail.
955  */
956 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
957                                       uint64_t block_offset,
958                                       uint64_t offset,
959                                       uint64_t length,
960                                       uint64_t *block_index,
961                                       uint64_t *chunk_index)
962 {
963     uint64_t current_addr = block_offset + offset;
964     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
965                                                 (void *) block_offset);
966     assert(block);
967     assert(current_addr >= block->offset);
968     assert((current_addr + length) <= (block->offset + block->length));
969
970     *block_index = block->index;
971     *chunk_index = ram_chunk_index(block->local_host_addr,
972                 block->local_host_addr + (current_addr - block->offset));
973
974     return 0;
975 }
976
977 /*
978  * Register a chunk with IB. If the chunk was already registered
979  * previously, then skip.
980  *
981  * Also return the keys associated with the registration needed
982  * to perform the actual RDMA operation.
983  */
984 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
985         RDMALocalBlock *block, uint8_t *host_addr,
986         uint32_t *lkey, uint32_t *rkey, int chunk,
987         uint8_t *chunk_start, uint8_t *chunk_end)
988 {
989     if (block->mr) {
990         if (lkey) {
991             *lkey = block->mr->lkey;
992         }
993         if (rkey) {
994             *rkey = block->mr->rkey;
995         }
996         return 0;
997     }
998
999     /* allocate memory to store chunk MRs */
1000     if (!block->pmr) {
1001         block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
1002         if (!block->pmr) {
1003             return -1;
1004         }
1005     }
1006
1007     /*
1008      * If 'rkey', then we're the destination, so grant access to the source.
1009      *
1010      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1011      */
1012     if (!block->pmr[chunk]) {
1013         uint64_t len = chunk_end - chunk_start;
1014
1015         DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
1016                  len, chunk_start);
1017
1018         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1019                 chunk_start, len,
1020                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1021                         IBV_ACCESS_REMOTE_WRITE) : 0));
1022
1023         if (!block->pmr[chunk]) {
1024             perror("Failed to register chunk!");
1025             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1026                             " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
1027                             " local %" PRIu64 " registrations: %d\n",
1028                             block->index, chunk, (uint64_t) chunk_start,
1029                             (uint64_t) chunk_end, (uint64_t) host_addr,
1030                             (uint64_t) block->local_host_addr,
1031                             rdma->total_registrations);
1032             return -1;
1033         }
1034         rdma->total_registrations++;
1035     }
1036
1037     if (lkey) {
1038         *lkey = block->pmr[chunk]->lkey;
1039     }
1040     if (rkey) {
1041         *rkey = block->pmr[chunk]->rkey;
1042     }
1043     return 0;
1044 }
1045
1046 /*
1047  * Register (at connection time) the memory used for control
1048  * channel messages.
1049  */
1050 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1051 {
1052     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1053             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1054             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1055     if (rdma->wr_data[idx].control_mr) {
1056         rdma->total_registrations++;
1057         return 0;
1058     }
1059     fprintf(stderr, "qemu_rdma_reg_control failed!\n");
1060     return -1;
1061 }
1062
1063 const char *print_wrid(int wrid)
1064 {
1065     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1066         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1067     }
1068     return wrid_desc[wrid];
1069 }
1070
1071 /*
1072  * RDMA requires memory registration (mlock/pinning), but this is not good for
1073  * overcommitment.
1074  *
1075  * In preparation for the future where LRU information or workload-specific
1076  * writable working set memory access behavior is available to QEMU,
1077  * it would be nice to have in place the ability to UN-register/UN-pin
1078  * particular memory regions from the RDMA hardware when it is determined that
1079  * those regions of memory will likely not be accessed again in the near future.
1080  *
1081  * While we do not yet have such information right now, the following
1082  * compile-time option allows us to perform a non-optimized version of this
1083  * behavior.
1084  *
1085  * By uncommenting this option, you will cause *all* RDMA transfers to be
1086  * unregistered immediately after the transfer completes on both sides of the
1087  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1088  *
1089  * This will have a terrible impact on migration performance, so until future
1090  * workload information or LRU information is available, do not attempt to use
1091  * this feature except for basic testing.
1092  */
1093 //#define RDMA_UNREGISTRATION_EXAMPLE
1094
1095 /*
1096  * Perform a non-optimized memory unregistration after every transfer
1097  * for demonstration purposes, only if pin-all is not requested.
1098  *
1099  * Potential optimizations:
1100  * 1. Start a new thread to run this function continuously
1101         - for bit clearing
1102         - and for receipt of unregister messages
1103  * 2. Use an LRU.
1104  * 3. Use workload hints.
1105  */
1106 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1107 {
1108     while (rdma->unregistrations[rdma->unregister_current]) {
1109         int ret;
1110         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1111         uint64_t chunk =
1112             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1113         uint64_t index =
1114             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1115         RDMALocalBlock *block =
1116             &(rdma->local_ram_blocks.block[index]);
1117         RDMARegister reg = { .current_index = index };
1118         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1119                                  };
1120         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1121                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1122                                    .repeat = 1,
1123                                  };
1124
1125         DDPRINTF("Processing unregister for chunk: %" PRIu64
1126                  " at position %d\n", chunk, rdma->unregister_current);
1127
1128         rdma->unregistrations[rdma->unregister_current] = 0;
1129         rdma->unregister_current++;
1130
1131         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1132             rdma->unregister_current = 0;
1133         }
1134
1135
1136         /*
1137          * Unregistration is speculative (because migration is single-threaded
1138          * and we cannot break the protocol's infiniband message ordering).
1139          * Thus, if the memory is currently being used for transmission,
1140          * then abort the attempt to unregister and try again
1141          * later the next time a completion is received for this memory.
1142          */
1143         clear_bit(chunk, block->unregister_bitmap);
1144
1145         if (test_bit(chunk, block->transit_bitmap)) {
1146             DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
1147             continue;
1148         }
1149
1150         DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
1151
1152         ret = ibv_dereg_mr(block->pmr[chunk]);
1153         block->pmr[chunk] = NULL;
1154         block->remote_keys[chunk] = 0;
1155
1156         if (ret != 0) {
1157             perror("unregistration chunk failed");
1158             return -ret;
1159         }
1160         rdma->total_registrations--;
1161
1162         reg.key.chunk = chunk;
1163         register_to_network(&reg);
1164         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1165                                 &resp, NULL, NULL);
1166         if (ret < 0) {
1167             return ret;
1168         }
1169
1170         DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
1171     }
1172
1173     return 0;
1174 }
1175
1176 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1177                                          uint64_t chunk)
1178 {
1179     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1180
1181     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1182     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1183
1184     return result;
1185 }
1186
1187 /*
1188  * Set bit for unregistration in the next iteration.
1189  * We cannot transmit right here, but will unpin later.
1190  */
1191 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1192                                         uint64_t chunk, uint64_t wr_id)
1193 {
1194     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1195         fprintf(stderr, "rdma migration: queue is full!\n");
1196     } else {
1197         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1198
1199         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1200             DDPRINTF("Appending unregister chunk %" PRIu64
1201                     " at position %d\n", chunk, rdma->unregister_next);
1202
1203             rdma->unregistrations[rdma->unregister_next++] =
1204                     qemu_rdma_make_wrid(wr_id, index, chunk);
1205
1206             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1207                 rdma->unregister_next = 0;
1208             }
1209         } else {
1210             DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
1211                     chunk);
1212         }
1213     }
1214 }
1215
1216 /*
1217  * Poll the completion queue to see if a work request
1218  * (of any kind) has completed.
1219  * Return the work request ID that completed.
1220  */
1221 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out,
1222                                uint32_t *byte_len)
1223 {
1224     int ret;
1225     struct ibv_wc wc;
1226     uint64_t wr_id;
1227
1228     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1229
1230     if (!ret) {
1231         *wr_id_out = RDMA_WRID_NONE;
1232         return 0;
1233     }
1234
1235     if (ret < 0) {
1236         fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
1237         return ret;
1238     }
1239
1240     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1241
1242     if (wc.status != IBV_WC_SUCCESS) {
1243         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1244                         wc.status, ibv_wc_status_str(wc.status));
1245         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1246
1247         return -1;
1248     }
1249
1250     if (rdma->control_ready_expected &&
1251         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1252         DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
1253                   " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
1254                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1255         rdma->control_ready_expected = 0;
1256     }
1257
1258     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1259         uint64_t chunk =
1260             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1261         uint64_t index =
1262             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1263         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1264
1265         DDDPRINTF("completions %s (%" PRId64 ") left %d, "
1266                  "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
1267                  print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
1268                  block->local_host_addr, (void *)block->remote_host_addr);
1269
1270         clear_bit(chunk, block->transit_bitmap);
1271
1272         if (rdma->nb_sent > 0) {
1273             rdma->nb_sent--;
1274         }
1275
1276         if (!rdma->pin_all) {
1277             /*
1278              * FYI: If one wanted to signal a specific chunk to be unregistered
1279              * using LRU or workload-specific information, this is the function
1280              * you would call to do so. That chunk would then get asynchronously
1281              * unregistered later.
1282              */
1283 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1284             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1285 #endif
1286         }
1287     } else {
1288         DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
1289             print_wrid(wr_id), wr_id, rdma->nb_sent);
1290     }
1291
1292     *wr_id_out = wc.wr_id;
1293     if (byte_len) {
1294         *byte_len = wc.byte_len;
1295     }
1296
1297     return  0;
1298 }
1299
1300 /*
1301  * Block until the next work request has completed.
1302  *
1303  * First poll to see if a work request has already completed,
1304  * otherwise block.
1305  *
1306  * If we encounter completed work requests for IDs other than
1307  * the one we're interested in, then that's generally an error.
1308  *
1309  * The only exception is actual RDMA Write completions. These
1310  * completions only need to be recorded, but do not actually
1311  * need further processing.
1312  */
1313 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
1314                                     uint32_t *byte_len)
1315 {
1316     int num_cq_events = 0, ret = 0;
1317     struct ibv_cq *cq;
1318     void *cq_ctx;
1319     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1320
1321     if (ibv_req_notify_cq(rdma->cq, 0)) {
1322         return -1;
1323     }
1324     /* poll cq first */
1325     while (wr_id != wrid_requested) {
1326         ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1327         if (ret < 0) {
1328             return ret;
1329         }
1330
1331         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1332
1333         if (wr_id == RDMA_WRID_NONE) {
1334             break;
1335         }
1336         if (wr_id != wrid_requested) {
1337             DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1338                 print_wrid(wrid_requested),
1339                 wrid_requested, print_wrid(wr_id), wr_id);
1340         }
1341     }
1342
1343     if (wr_id == wrid_requested) {
1344         return 0;
1345     }
1346
1347     while (1) {
1348         /*
1349          * Coroutine doesn't start until process_incoming_migration()
1350          * so don't yield unless we know we're running inside of a coroutine.
1351          */
1352         if (rdma->migration_started_on_destination) {
1353             yield_until_fd_readable(rdma->comp_channel->fd);
1354         }
1355
1356         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1357             perror("ibv_get_cq_event");
1358             goto err_block_for_wrid;
1359         }
1360
1361         num_cq_events++;
1362
1363         if (ibv_req_notify_cq(cq, 0)) {
1364             goto err_block_for_wrid;
1365         }
1366
1367         while (wr_id != wrid_requested) {
1368             ret = qemu_rdma_poll(rdma, &wr_id_in, byte_len);
1369             if (ret < 0) {
1370                 goto err_block_for_wrid;
1371             }
1372
1373             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1374
1375             if (wr_id == RDMA_WRID_NONE) {
1376                 break;
1377             }
1378             if (wr_id != wrid_requested) {
1379                 DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1380                     print_wrid(wrid_requested), wrid_requested,
1381                     print_wrid(wr_id), wr_id);
1382             }
1383         }
1384
1385         if (wr_id == wrid_requested) {
1386             goto success_block_for_wrid;
1387         }
1388     }
1389
1390 success_block_for_wrid:
1391     if (num_cq_events) {
1392         ibv_ack_cq_events(cq, num_cq_events);
1393     }
1394     return 0;
1395
1396 err_block_for_wrid:
1397     if (num_cq_events) {
1398         ibv_ack_cq_events(cq, num_cq_events);
1399     }
1400     return ret;
1401 }
1402
1403 /*
1404  * Post a SEND message work request for the control channel
1405  * containing some data and block until the post completes.
1406  */
1407 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1408                                        RDMAControlHeader *head)
1409 {
1410     int ret = 0;
1411     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_CONTROL];
1412     struct ibv_send_wr *bad_wr;
1413     struct ibv_sge sge = {
1414                            .addr = (uint64_t)(wr->control),
1415                            .length = head->len + sizeof(RDMAControlHeader),
1416                            .lkey = wr->control_mr->lkey,
1417                          };
1418     struct ibv_send_wr send_wr = {
1419                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1420                                    .opcode = IBV_WR_SEND,
1421                                    .send_flags = IBV_SEND_SIGNALED,
1422                                    .sg_list = &sge,
1423                                    .num_sge = 1,
1424                                 };
1425
1426     DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
1427
1428     /*
1429      * We don't actually need to do a memcpy() in here if we used
1430      * the "sge" properly, but since we're only sending control messages
1431      * (not RAM in a performance-critical path), then it's OK for now.
1432      *
1433      * The copy makes the RDMAControlHeader simpler to manipulate
1434      * for the time being.
1435      */
1436     assert(head->len <= RDMA_CONTROL_MAX_BUFFER - sizeof(*head));
1437     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1438     control_to_network((void *) wr->control);
1439
1440     if (buf) {
1441         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1442     }
1443
1444
1445     if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
1446         return -1;
1447     }
1448
1449     if (ret < 0) {
1450         fprintf(stderr, "Failed to use post IB SEND for control!\n");
1451         return ret;
1452     }
1453
1454     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL, NULL);
1455     if (ret < 0) {
1456         fprintf(stderr, "rdma migration: send polling control error!\n");
1457     }
1458
1459     return ret;
1460 }
1461
1462 /*
1463  * Post a RECV work request in anticipation of some future receipt
1464  * of data on the control channel.
1465  */
1466 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1467 {
1468     struct ibv_recv_wr *bad_wr;
1469     struct ibv_sge sge = {
1470                             .addr = (uint64_t)(rdma->wr_data[idx].control),
1471                             .length = RDMA_CONTROL_MAX_BUFFER,
1472                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1473                          };
1474
1475     struct ibv_recv_wr recv_wr = {
1476                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1477                                     .sg_list = &sge,
1478                                     .num_sge = 1,
1479                                  };
1480
1481
1482     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1483         return -1;
1484     }
1485
1486     return 0;
1487 }
1488
1489 /*
1490  * Block and wait for a RECV control channel message to arrive.
1491  */
1492 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1493                 RDMAControlHeader *head, int expecting, int idx)
1494 {
1495     uint32_t byte_len;
1496     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx,
1497                                        &byte_len);
1498
1499     if (ret < 0) {
1500         fprintf(stderr, "rdma migration: recv polling control error!\n");
1501         return ret;
1502     }
1503
1504     network_to_control((void *) rdma->wr_data[idx].control);
1505     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1506
1507     DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
1508
1509     if (expecting == RDMA_CONTROL_NONE) {
1510         DDDPRINTF("Surprise: got %s (%d)\n",
1511                   control_desc[head->type], head->type);
1512     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1513         fprintf(stderr, "Was expecting a %s (%d) control message"
1514                 ", but got: %s (%d), length: %d\n",
1515                 control_desc[expecting], expecting,
1516                 control_desc[head->type], head->type, head->len);
1517         return -EIO;
1518     }
1519     if (head->len > RDMA_CONTROL_MAX_BUFFER - sizeof(*head)) {
1520         fprintf(stderr, "too long length: %d\n", head->len);
1521         return -EINVAL;
1522     }
1523     if (sizeof(*head) + head->len != byte_len) {
1524         fprintf(stderr, "Malformed length: %d byte_len %d\n",
1525                 head->len, byte_len);
1526         return -EINVAL;
1527     }
1528
1529     return 0;
1530 }
1531
1532 /*
1533  * When a RECV work request has completed, the work request's
1534  * buffer is pointed at the header.
1535  *
1536  * This advances the pointer past the header to the data portion
1537  * of the control message in the work request's buffer, which
1538  * was populated after the work request finished.
1539  */
1540 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1541                                   RDMAControlHeader *head)
1542 {
1543     rdma->wr_data[idx].control_len = head->len;
1544     rdma->wr_data[idx].control_curr =
1545         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1546 }
1547
1548 /*
1549  * This is an 'atomic' high-level operation to deliver a single, unified
1550  * control-channel message.
1551  *
1552  * Additionally, if the user is expecting some kind of reply to this message,
1553  * they can request a 'resp' response message be filled in by posting an
1554  * additional work request on behalf of the user and waiting for an additional
1555  * completion.
1556  *
1557  * The extra (optional) response is used during registration to save us from
1558  * having to perform an *additional* exchange of messages just to provide a
1559  * response, by instead piggy-backing on the acknowledgement.
1560  */
1561 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1562                                    uint8_t *data, RDMAControlHeader *resp,
1563                                    int *resp_idx,
1564                                    int (*callback)(RDMAContext *rdma))
1565 {
1566     int ret = 0;
1567
1568     /*
1569      * Wait until the dest is ready before attempting to deliver the message
1570      * by waiting for a READY message.
1571      */
1572     if (rdma->control_ready_expected) {
1573         RDMAControlHeader resp;
1574         ret = qemu_rdma_exchange_get_response(rdma,
1575                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1576         if (ret < 0) {
1577             return ret;
1578         }
1579     }
1580
1581     /*
1582      * If the user is expecting a response, post a WR in anticipation of it.
1583      */
1584     if (resp) {
1585         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1586         if (ret) {
1587             fprintf(stderr, "rdma migration: error posting"
1588                     " extra control recv for anticipated result!");
1589             return ret;
1590         }
1591     }
1592
1593     /*
1594      * Post a WR to replace the one we just consumed for the READY message.
1595      */
1596     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1597     if (ret) {
1598         fprintf(stderr, "rdma migration: error posting first control recv!");
1599         return ret;
1600     }
1601
1602     /*
1603      * Deliver the control message that was requested.
1604      */
1605     ret = qemu_rdma_post_send_control(rdma, data, head);
1606
1607     if (ret < 0) {
1608         fprintf(stderr, "Failed to send control buffer!\n");
1609         return ret;
1610     }
1611
1612     /*
1613      * If we're expecting a response, block and wait for it.
1614      */
1615     if (resp) {
1616         if (callback) {
1617             DDPRINTF("Issuing callback before receiving response...\n");
1618             ret = callback(rdma);
1619             if (ret < 0) {
1620                 return ret;
1621             }
1622         }
1623
1624         DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
1625         ret = qemu_rdma_exchange_get_response(rdma, resp,
1626                                               resp->type, RDMA_WRID_DATA);
1627
1628         if (ret < 0) {
1629             return ret;
1630         }
1631
1632         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1633         if (resp_idx) {
1634             *resp_idx = RDMA_WRID_DATA;
1635         }
1636         DDPRINTF("Response %s received.\n", control_desc[resp->type]);
1637     }
1638
1639     rdma->control_ready_expected = 1;
1640
1641     return 0;
1642 }
1643
1644 /*
1645  * This is an 'atomic' high-level operation to receive a single, unified
1646  * control-channel message.
1647  */
1648 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1649                                 int expecting)
1650 {
1651     RDMAControlHeader ready = {
1652                                 .len = 0,
1653                                 .type = RDMA_CONTROL_READY,
1654                                 .repeat = 1,
1655                               };
1656     int ret;
1657
1658     /*
1659      * Inform the source that we're ready to receive a message.
1660      */
1661     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1662
1663     if (ret < 0) {
1664         fprintf(stderr, "Failed to send control buffer!\n");
1665         return ret;
1666     }
1667
1668     /*
1669      * Block and wait for the message.
1670      */
1671     ret = qemu_rdma_exchange_get_response(rdma, head,
1672                                           expecting, RDMA_WRID_READY);
1673
1674     if (ret < 0) {
1675         return ret;
1676     }
1677
1678     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1679
1680     /*
1681      * Post a new RECV work request to replace the one we just consumed.
1682      */
1683     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1684     if (ret) {
1685         fprintf(stderr, "rdma migration: error posting second control recv!");
1686         return ret;
1687     }
1688
1689     return 0;
1690 }
1691
1692 /*
1693  * Write an actual chunk of memory using RDMA.
1694  *
1695  * If we're using dynamic registration on the dest-side, we have to
1696  * send a registration command first.
1697  */
1698 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1699                                int current_index, uint64_t current_addr,
1700                                uint64_t length)
1701 {
1702     struct ibv_sge sge;
1703     struct ibv_send_wr send_wr = { 0 };
1704     struct ibv_send_wr *bad_wr;
1705     int reg_result_idx, ret, count = 0;
1706     uint64_t chunk, chunks;
1707     uint8_t *chunk_start, *chunk_end;
1708     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1709     RDMARegister reg;
1710     RDMARegisterResult *reg_result;
1711     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1712     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1713                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1714                                .repeat = 1,
1715                              };
1716
1717 retry:
1718     sge.addr = (uint64_t)(block->local_host_addr +
1719                             (current_addr - block->offset));
1720     sge.length = length;
1721
1722     chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
1723     chunk_start = ram_chunk_start(block, chunk);
1724
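    /*
     * 'chunks' is the number of additional chunks beyond the first that
     * this registration will cover: the write length for a real RAM block,
     * or the whole block length otherwise.  When that length is an exact
     * multiple of the chunk size, the final boundary coincides with the
     * start of the next chunk, so back off by one.
     */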
1725     if (block->is_ram_block) {
1726         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1727
1728         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1729             chunks--;
1730         }
1731     } else {
1732         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1733
1734         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1735             chunks--;
1736         }
1737     }
1738
1739     DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
1740         chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1741
1742     chunk_end = ram_chunk_end(block, chunk + chunks);
1743
1744     if (!rdma->pin_all) {
1745 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1746         qemu_rdma_unregister_waiting(rdma);
1747 #endif
1748     }
1749
1750     while (test_bit(chunk, block->transit_bitmap)) {
1751         (void)count;
1752         DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
1753                 " current %" PRIu64 " len %" PRIu64 " %d %d\n",
1754                 count++, current_index, chunk,
1755                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1756
1757         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1758
1759         if (ret < 0) {
1760             fprintf(stderr, "Failed to wait for previous write to complete "
1761                     "block %d chunk %" PRIu64
1762                     " current %" PRIu64 " len %" PRIu64 " %d\n",
1763                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1764             return ret;
1765         }
1766     }
1767
1768     if (!rdma->pin_all || !block->is_ram_block) {
1769         if (!block->remote_keys[chunk]) {
1770             /*
1771              * This chunk has not yet been registered, so first check to see
1772              * if the entire chunk is zero. If so, tell the other side to
1773              * memset() + madvise() the entire chunk without RDMA.
1774              */
1775
1776             if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
1777                    && buffer_find_nonzero_offset((void *)sge.addr,
1778                                                     length) == length) {
1779                 RDMACompress comp = {
1780                                         .offset = current_addr,
1781                                         .value = 0,
1782                                         .block_idx = current_index,
1783                                         .length = length,
1784                                     };
1785
1786                 head.len = sizeof(comp);
1787                 head.type = RDMA_CONTROL_COMPRESS;
1788
1789                 DDPRINTF("Entire chunk is zero, sending compress: %"
1790                     PRIu64 " for %d "
1791                     "bytes, index: %d, offset: %" PRId64 "...\n",
1792                     chunk, sge.length, current_index, current_addr);
1793
1794                 compress_to_network(&comp);
1795                 ret = qemu_rdma_exchange_send(rdma, &head,
1796                                 (uint8_t *) &comp, NULL, NULL, NULL);
1797
1798                 if (ret < 0) {
1799                     return -EIO;
1800                 }
1801
1802                 acct_update_position(f, sge.length, true);
1803
1804                 return 1;
1805             }
1806
1807             /*
1808              * Otherwise, tell other side to register.
1809              */
1810             reg.current_index = current_index;
1811             if (block->is_ram_block) {
1812                 reg.key.current_addr = current_addr;
1813             } else {
1814                 reg.key.chunk = chunk;
1815             }
1816             reg.chunks = chunks;
1817
1818             DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
1819                     "bytes, index: %d, offset: %" PRId64 "...\n",
1820                     chunk, sge.length, current_index, current_addr);
1821
1822             register_to_network(&reg);
1823             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1824                                     &resp, &reg_result_idx, NULL);
1825             if (ret < 0) {
1826                 return ret;
1827             }
1828
1829             /* try to overlap this single registration with the one we sent. */
1830             if (qemu_rdma_register_and_get_keys(rdma, block,
1831                                                 (uint8_t *) sge.addr,
1832                                                 &sge.lkey, NULL, chunk,
1833                                                 chunk_start, chunk_end)) {
1834                 fprintf(stderr, "cannot get lkey!\n");
1835                 return -EINVAL;
1836             }
1837
1838             reg_result = (RDMARegisterResult *)
1839                     rdma->wr_data[reg_result_idx].control_curr;
1840
1841             network_to_result(reg_result);
1842
1843             DDPRINTF("Received registration result:"
1844                     " my key: %x their key %x, chunk %" PRIu64 "\n",
1845                     block->remote_keys[chunk], reg_result->rkey, chunk);
1846
1847             block->remote_keys[chunk] = reg_result->rkey;
1848             block->remote_host_addr = reg_result->host_addr;
1849         } else {
1850             /* already registered before */
1851             if (qemu_rdma_register_and_get_keys(rdma, block,
1852                                                 (uint8_t *)sge.addr,
1853                                                 &sge.lkey, NULL, chunk,
1854                                                 chunk_start, chunk_end)) {
1855                 fprintf(stderr, "cannot get lkey!\n");
1856                 return -EINVAL;
1857             }
1858         }
1859
1860         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
1861     } else {
1862         send_wr.wr.rdma.rkey = block->remote_rkey;
1863
1864         if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
1865                                                      &sge.lkey, NULL, chunk,
1866                                                      chunk_start, chunk_end)) {
1867             fprintf(stderr, "cannot get lkey!\n");
1868             return -EINVAL;
1869         }
1870     }
1871
1872     /*
1873      * Encode the ram block index and chunk within this wrid.
1874      * We will use this information at the time of completion
1875      * to figure out which bitmap to check against and then which
1876      * chunk in the bitmap to look for.
1877      */
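    /*
     * For example, block index 2, chunk 5 becomes
     * (5 << 30) | (2 << 16) | RDMA_WRID_RDMA_WRITE,
     * following the wrid bit layout described at the top of this file.
     */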
1878     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
1879                                         current_index, chunk);
1880
1881     send_wr.opcode = IBV_WR_RDMA_WRITE;
1882     send_wr.send_flags = IBV_SEND_SIGNALED;
1883     send_wr.sg_list = &sge;
1884     send_wr.num_sge = 1;
1885     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
1886                                 (current_addr - block->offset);
1887
1888     DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
1889               " remote: %lx, bytes %" PRIu32 "\n",
1890               chunk, sge.addr, send_wr.wr.rdma.remote_addr,
1891               sge.length);
1892
1893     /*
1894      * ibv_post_send() does not return negative error numbers,
1895      * per the specification they are positive - no idea why.
1896      */
1897     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1898
1899     if (ret == ENOMEM) {
1900         DDPRINTF("send queue is full. wait a little....\n");
1901         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
1902         if (ret < 0) {
1903             fprintf(stderr, "rdma migration: failed to make "
1904                             "room in full send queue! %d\n", ret);
1905             return ret;
1906         }
1907
1908         goto retry;
1909
1910     } else if (ret > 0) {
1911         perror("rdma migration: post rdma write failed");
1912         return -ret;
1913     }
1914
1915     set_bit(chunk, block->transit_bitmap);
1916     acct_update_position(f, sge.length, false);
1917     rdma->total_writes++;
1918
1919     return 0;
1920 }
1921
1922 /*
1923  * Push out any unwritten RDMA operations.
1924  *
1925  * We support sending out multiple chunks at the same time.
1926  * Not all of them need to get signaled in the completion queue.
1927  */
1928 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
1929 {
1930     int ret;
1931
1932     if (!rdma->current_length) {
1933         return 0;
1934     }
1935
1936     ret = qemu_rdma_write_one(f, rdma,
1937             rdma->current_index, rdma->current_addr, rdma->current_length);
1938
1939     if (ret < 0) {
1940         return ret;
1941     }
1942
1943     if (ret == 0) {
1944         rdma->nb_sent++;
1945         DDDPRINTF("sent total: %d\n", rdma->nb_sent);
1946     }
1947
1948     rdma->current_length = 0;
1949     rdma->current_addr = 0;
1950
1951     return 0;
1952 }
1953
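/*
 * Decide whether a buffer at 'offset' of length 'len' can simply be
 * appended to the write we are currently accumulating: same block,
 * same chunk, and contiguous with the previously queued data.
 */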
1954 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
1955                     uint64_t offset, uint64_t len)
1956 {
1957     RDMALocalBlock *block;
1958     uint8_t *host_addr;
1959     uint8_t *chunk_end;
1960
1961     if (rdma->current_index < 0) {
1962         return 0;
1963     }
1964
1965     if (rdma->current_chunk < 0) {
1966         return 0;
1967     }
1968
1969     block = &(rdma->local_ram_blocks.block[rdma->current_index]);
1970     host_addr = block->local_host_addr + (offset - block->offset);
1971     chunk_end = ram_chunk_end(block, rdma->current_chunk);
1972
1973     if (rdma->current_length == 0) {
1974         return 0;
1975     }
1976
1977     /*
1978      * Only merge into chunk sequentially.
1979      */
1980     if (offset != (rdma->current_addr + rdma->current_length)) {
1981         return 0;
1982     }
1983
1984     if (offset < block->offset) {
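    /*
     * The buffer must lie entirely within the current RAM block and must
     * not spill past the end of the chunk we are merging into.
     */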
1985         return 0;
1986     }
1987
1988     if ((offset + len) > (block->offset + block->length)) {
1989         return 0;
1990     }
1991
1992     if ((host_addr + len) > chunk_end) {
1993         return 0;
1994     }
1995
1996     return 1;
1997 }
1998
1999 /*
2000  * We're not actually writing here, but doing three things:
2001  *
2002  * 1. Identify the chunk the buffer belongs to.
2003  * 2. If the chunk is full or the buffer doesn't belong to the current
2004  *    chunk, then start a new chunk and flush() the old chunk.
2005  * 3. To keep the hardware busy, we also group chunks into batches
2006  *    and only require that a batch gets acknowledged in the completion
2007  *    queue instead of each individual chunk.
2008  */
2009 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
2010                            uint64_t block_offset, uint64_t offset,
2011                            uint64_t len)
2012 {
2013     uint64_t current_addr = block_offset + offset;
2014     uint64_t index = rdma->current_index;
2015     uint64_t chunk = rdma->current_chunk;
2016     int ret;
2017
2018     /* If we cannot merge it, we flush the current buffer first. */
2019     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
2020         ret = qemu_rdma_write_flush(f, rdma);
2021         if (ret) {
2022             return ret;
2023         }
2024         rdma->current_length = 0;
2025         rdma->current_addr = current_addr;
2026
2027         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2028                                          offset, len, &index, &chunk);
2029         if (ret) {
2030             fprintf(stderr, "ram block search failed\n");
2031             return ret;
2032         }
2033         rdma->current_index = index;
2034         rdma->current_chunk = chunk;
2035     }
2036
2037     /* merge it */
2038     rdma->current_length += len;
2039
2040     /* flush it if buffer is too large */
2041     if (rdma->current_length >= RDMA_MERGE_MAX) {
2042         return qemu_rdma_write_flush(f, rdma);
2043     }
2044
2045     return 0;
2046 }
2047
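/*
 * Tear down the connection and release all verbs and connection-manager
 * resources.  If we are aborting early, notify the peer first with an
 * RDMA_CONTROL_ERROR message.
 */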
2048 static void qemu_rdma_cleanup(RDMAContext *rdma)
2049 {
2050     struct rdma_cm_event *cm_event;
2051     int ret, idx;
2052
2053     if (rdma->cm_id) {
2054         if (rdma->error_state) {
2055             RDMAControlHeader head = { .len = 0,
2056                                        .type = RDMA_CONTROL_ERROR,
2057                                        .repeat = 1,
2058                                      };
2059             fprintf(stderr, "Early error. Sending error.\n");
2060             qemu_rdma_post_send_control(rdma, NULL, &head);
2061         }
2062
2063         ret = rdma_disconnect(rdma->cm_id);
2064         if (!ret) {
2065             DDPRINTF("waiting for disconnect\n");
2066             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2067             if (!ret) {
2068                 rdma_ack_cm_event(cm_event);
2069             }
2070         }
2071         DDPRINTF("Disconnected.\n");
2072         rdma->cm_id = NULL;
2073     }
2074
2075     g_free(rdma->block);
2076     rdma->block = NULL;
2077
2078     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2079         if (rdma->wr_data[idx].control_mr) {
2080             rdma->total_registrations--;
2081             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2082         }
2083         rdma->wr_data[idx].control_mr = NULL;
2084     }
2085
2086     if (rdma->local_ram_blocks.block) {
2087         while (rdma->local_ram_blocks.nb_blocks) {
2088             __qemu_rdma_delete_block(rdma,
2089                     rdma->local_ram_blocks.block->offset);
2090         }
2091     }
2092
2093     if (rdma->qp) {
2094         ibv_destroy_qp(rdma->qp);
2095         rdma->qp = NULL;
2096     }
2097     if (rdma->cq) {
2098         ibv_destroy_cq(rdma->cq);
2099         rdma->cq = NULL;
2100     }
2101     if (rdma->comp_channel) {
2102         ibv_destroy_comp_channel(rdma->comp_channel);
2103         rdma->comp_channel = NULL;
2104     }
2105     if (rdma->pd) {
2106         ibv_dealloc_pd(rdma->pd);
2107         rdma->pd = NULL;
2108     }
2109     if (rdma->listen_id) {
2110         rdma_destroy_id(rdma->listen_id);
2111         rdma->listen_id = NULL;
2112     }
2113     if (rdma->cm_id) {
2114         rdma_destroy_id(rdma->cm_id);
2115         rdma->cm_id = NULL;
2116     }
2117     if (rdma->channel) {
2118         rdma_destroy_event_channel(rdma->channel);
2119         rdma->channel = NULL;
2120     }
2121     g_free(rdma->host);
2122     rdma->host = NULL;
2123 }
2124
2125
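/*
 * Source-side setup: resolve the destination, allocate the protection
 * domain, completion queue and queue pair, build the local RAM block
 * list and register the control-channel buffers.
 */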
2126 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2127 {
2128     int ret, idx;
2129     Error *local_err = NULL, **temp = &local_err;
2130
2131     /*
2132      * Will be validated against destination's actual capabilities
2133      * after the connect() completes.
2134      */
2135     rdma->pin_all = pin_all;
2136
2137     ret = qemu_rdma_resolve_host(rdma, temp);
2138     if (ret) {
2139         goto err_rdma_source_init;
2140     }
2141
2142     ret = qemu_rdma_alloc_pd_cq(rdma);
2143     if (ret) {
2144         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2145                     " limits may be too low. Please check $ ulimit -a # and "
2146                     "search for 'ulimit -l' in the output");
2147         goto err_rdma_source_init;
2148     }
2149
2150     ret = qemu_rdma_alloc_qp(rdma);
2151     if (ret) {
2152         ERROR(temp, "rdma migration: error allocating qp!");
2153         goto err_rdma_source_init;
2154     }
2155
2156     ret = qemu_rdma_init_ram_blocks(rdma);
2157     if (ret) {
2158         ERROR(temp, "rdma migration: error initializing ram blocks!");
2159         goto err_rdma_source_init;
2160     }
2161
2162     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2163         ret = qemu_rdma_reg_control(rdma, idx);
2164         if (ret) {
2165             ERROR(temp, "rdma migration: error registering %d control!",
2166                                                             idx);
2167             goto err_rdma_source_init;
2168         }
2169     }
2170
2171     return 0;
2172
2173 err_rdma_source_init:
2174     error_propagate(errp, local_err);
2175     qemu_rdma_cleanup(rdma);
2176     return -1;
2177 }
2178
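/*
 * Source-side connect: send our capability flags in the private data of
 * rdma_connect(), wait for the connection to be established, and then
 * validate the capabilities echoed back by the destination.
 */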
2179 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2180 {
2181     RDMACapabilities cap = {
2182                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2183                                 .flags = 0,
2184                            };
2185     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2186                                           .retry_count = 5,
2187                                           .private_data = &cap,
2188                                           .private_data_len = sizeof(cap),
2189                                         };
2190     struct rdma_cm_event *cm_event;
2191     int ret;
2192
2193     /*
2194      * Only negotiate the capability with destination if the user
2195      * on the source first requested the capability.
2196      */
2197     if (rdma->pin_all) {
2198         DPRINTF("Server pin-all memory requested.\n");
2199         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2200     }
2201
2202     caps_to_network(&cap);
2203
2204     ret = rdma_connect(rdma->cm_id, &conn_param);
2205     if (ret) {
2206         perror("rdma_connect");
2207         ERROR(errp, "connecting to destination!");
2208         rdma_destroy_id(rdma->cm_id);
2209         rdma->cm_id = NULL;
2210         goto err_rdma_source_connect;
2211     }
2212
2213     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2214     if (ret) {
2215         perror("rdma_get_cm_event after rdma_connect");
2216         ERROR(errp, "connecting to destination!");
2217         rdma_ack_cm_event(cm_event);
2218         rdma_destroy_id(rdma->cm_id);
2219         rdma->cm_id = NULL;
2220         goto err_rdma_source_connect;
2221     }
2222
2223     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2224         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2225         ERROR(errp, "connecting to destination!");
2226         rdma_ack_cm_event(cm_event);
2227         rdma_destroy_id(rdma->cm_id);
2228         rdma->cm_id = NULL;
2229         goto err_rdma_source_connect;
2230     }
2231
2232     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2233     network_to_caps(&cap);
2234
2235     /*
2236      * Verify that the *requested* capabilities are supported by the destination
2237      * and disable them otherwise.
2238      */
2239     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2240         ERROR(errp, "Server cannot support pinning all memory. "
2241                         "Will register memory dynamically.");
2242         rdma->pin_all = false;
2243     }
2244
2245     DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
2246
2247     rdma_ack_cm_event(cm_event);
2248
2249     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2250     if (ret) {
2251         ERROR(errp, "posting second control recv!");
2252         goto err_rdma_source_connect;
2253     }
2254
2255     rdma->control_ready_expected = 1;
2256     rdma->nb_sent = 0;
2257     return 0;
2258
2259 err_rdma_source_connect:
2260     qemu_rdma_cleanup(rdma);
2261     return -1;
2262 }
2263
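/*
 * Destination-side setup: create the connection-manager event channel
 * and listen id, then walk the getaddrinfo() results until
 * rdma_bind_addr() succeeds.
 */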
2264 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2265 {
2266     int ret = -EINVAL, idx;
2267     struct rdma_cm_id *listen_id;
2268     char ip[40] = "unknown";
2269     struct addrinfo *res;
2270     char port_str[16];
2271
2272     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2273         rdma->wr_data[idx].control_len = 0;
2274         rdma->wr_data[idx].control_curr = NULL;
2275     }
2276
2277     if (rdma->host == NULL) {
2278         ERROR(errp, "RDMA host is not set!");
2279         rdma->error_state = -EINVAL;
2280         return -1;
2281     }
2282     /* create CM channel */
2283     rdma->channel = rdma_create_event_channel();
2284     if (!rdma->channel) {
2285         ERROR(errp, "could not create rdma event channel");
2286         rdma->error_state = -EINVAL;
2287         return -1;
2288     }
2289
2290     /* create CM id */
2291     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2292     if (ret) {
2293         ERROR(errp, "could not create cm_id!");
2294         goto err_dest_init_create_listen_id;
2295     }
2296
2297     snprintf(port_str, 16, "%d", rdma->port);
2298     port_str[15] = '\0';
2299
2300     if (rdma->host && strcmp("", rdma->host)) {
2301         struct addrinfo *e;
2302
2303         ret = getaddrinfo(rdma->host, port_str, NULL, &res);
2304         if (ret) {
2305             ERROR(errp, "could not getaddrinfo address %s", rdma->host);
2306             goto err_dest_init_bind_addr;
2307         }
2308
2309         for (e = res; e != NULL; e = e->ai_next) {
2310             inet_ntop(e->ai_family,
2311                 &((struct sockaddr_in *) e->ai_addr)->sin_addr, ip, sizeof ip);
2312             DPRINTF("Trying %s => %s\n", rdma->host, ip);
2313             ret = rdma_bind_addr(listen_id, e->ai_addr);
2314             if (!ret) {
2315                 goto listen;
2316             }
2317         }
2318
2319         ERROR(errp, "could not rdma_bind_addr!");
2320         goto err_dest_init_bind_addr;
2321     } else {
2322         ERROR(errp, "migration host and port not specified!");
2323         ret = -EINVAL;
2324         goto err_dest_init_bind_addr;
2325     }
2326 listen:
2327
2328     rdma->listen_id = listen_id;
2329     qemu_rdma_dump_gid("dest_init", listen_id);
2330     return 0;
2331
2332 err_dest_init_bind_addr:
2333     rdma_destroy_id(listen_id);
2334 err_dest_init_create_listen_id:
2335     rdma_destroy_event_channel(rdma->channel);
2336     rdma->channel = NULL;
2337     rdma->error_state = ret;
2338     return ret;
2339
2340 }
2341
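/*
 * Allocate an RDMAContext and parse the "host:port" migration address
 * into it.
 */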
2342 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2343 {
2344     RDMAContext *rdma = NULL;
2345     InetSocketAddress *addr;
2346
2347     if (host_port) {
2348         rdma = g_malloc0(sizeof(RDMAContext));
2349         memset(rdma, 0, sizeof(RDMAContext));
2350         rdma->current_index = -1;
2351         rdma->current_chunk = -1;
2352
2353         addr = inet_parse(host_port, NULL);
2354         if (addr != NULL) {
2355             rdma->port = atoi(addr->port);
2356             rdma->host = g_strdup(addr->host);
2357         } else {
2358             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2359             g_free(rdma);
2360             return NULL;
2361         }
2362     }
2363
2364     return rdma;
2365 }
2366
2367 /*
2368  * QEMUFile interface to the control channel.
2369  * SEND messages for control only.
2370  * pc.ram is handled with regular RDMA messages.
2371  */
2372 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
2373                                 int64_t pos, int size)
2374 {
2375     QEMUFileRDMA *r = opaque;
2376     QEMUFile *f = r->file;
2377     RDMAContext *rdma = r->rdma;
2378     size_t remaining = size;
2379     uint8_t * data = (void *) buf;
2380     int ret;
2381
2382     CHECK_ERROR_STATE();
2383
2384     /*
2385      * Push out any writes that
2386      * we've queued up for pc.ram.
2387      */
2388     ret = qemu_rdma_write_flush(f, rdma);
2389     if (ret < 0) {
2390         rdma->error_state = ret;
2391         return ret;
2392     }
2393
2394     while (remaining) {
2395         RDMAControlHeader head;
2396
2397         r->len = MIN(remaining, RDMA_SEND_INCREMENT);
2398         remaining -= r->len;
2399
2400         head.len = r->len;
2401         head.type = RDMA_CONTROL_QEMU_FILE;
2402
2403         ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2404
2405         if (ret < 0) {
2406             rdma->error_state = ret;
2407             return ret;
2408         }
2409
2410         data += r->len;
2411     }
2412
2413     return size;
2414 }
2415
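/*
 * Copy up to 'size' bytes of control-channel data already buffered for
 * work request 'idx' into 'buf', advancing the cursor for the next call.
 */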
2416 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2417                              int size, int idx)
2418 {
2419     size_t len = 0;
2420
2421     if (rdma->wr_data[idx].control_len) {
2422         DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
2423                     rdma->wr_data[idx].control_len, size);
2424
2425         len = MIN(size, rdma->wr_data[idx].control_len);
2426         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2427         rdma->wr_data[idx].control_curr += len;
2428         rdma->wr_data[idx].control_len -= len;
2429     }
2430
2431     return len;
2432 }
2433
2434 /*
2435  * QEMUFile interface to the control channel.
2436  * RDMA links don't use bytestreams, so we have to
2437  * return bytes to QEMUFile opportunistically.
2438  */
2439 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
2440                                 int64_t pos, int size)
2441 {
2442     QEMUFileRDMA *r = opaque;
2443     RDMAContext *rdma = r->rdma;
2444     RDMAControlHeader head;
2445     int ret = 0;
2446
2447     CHECK_ERROR_STATE();
2448
2449     /*
2450      * First, we hold on to the last SEND message we
2451      * were given and dish out the bytes until we run
2452      * out of bytes.
2453      */
2454     r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
2455     if (r->len) {
2456         return r->len;
2457     }
2458
2459     /*
2460      * Once we run out, we block and wait for another
2461      * SEND message to arrive.
2462      */
2463     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2464
2465     if (ret < 0) {
2466         rdma->error_state = ret;
2467         return ret;
2468     }
2469
2470     /*
2471      * SEND was received with new bytes, now try again.
2472      */
2473     return qemu_rdma_fill(r->rdma, buf, size, 0);
2474 }
2475
2476 /*
2477  * Block until all the outstanding chunks have been delivered by the hardware.
2478  */
2479 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2480 {
2481     int ret;
2482
2483     if (qemu_rdma_write_flush(f, rdma) < 0) {
2484         return -EIO;
2485     }
2486
2487     while (rdma->nb_sent) {
2488         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE, NULL);
2489         if (ret < 0) {
2490             fprintf(stderr, "rdma migration: complete polling error!\n");
2491             return -EIO;
2492         }
2493     }
2494
2495     qemu_rdma_unregister_waiting(rdma);
2496
2497     return 0;
2498 }
2499
2500 static int qemu_rdma_close(void *opaque)
2501 {
2502     DPRINTF("Shutting down connection.\n");
2503     QEMUFileRDMA *r = opaque;
2504     if (r->rdma) {
2505         qemu_rdma_cleanup(r->rdma);
2506         g_free(r->rdma);
2507     }
2508     g_free(r);
2509     return 0;
2510 }
2511
2512 /*
2513  * Parameters:
2514  *    @offset == 0 :
2515  *        This means that 'block_offset' is a full virtual address that does not
2516  *        belong to a RAMBlock of the virtual machine and instead
2517  *        represents a private malloc'd memory area that the caller wishes to
2518  *        transfer.
2519  *
2520  *    @offset != 0 :
2521  *        Offset is an offset to be added to block_offset and used
2522  *        to also lookup the corresponding RAMBlock.
2523  *
2524  *    @size > 0 :
2525  *        Initiate a transfer of this size.
2526  *
2527  *    @size == 0 :
2528  *        A 'hint' or 'advice' that means that we wish to speculatively
2529  *        and asynchronously unregister this memory. In this case, there is no
2530  *        guarantee that the unregister will actually happen, for example,
2531  *        if the memory is being actively transmitted. Additionally, the memory
2532  *        may be re-registered at any future time if a write within the same
2533  *        chunk was requested again, even if you attempted to unregister it
2534  *        here.
2535  *
2536  *    @size < 0 : TODO, not yet supported
2537  *        Unregister the memory NOW. This means that the caller does not
2538  *        expect there to be any future RDMA transfers and we just want to clean
2539  *        things up. This is used in case the upper layer owns the memory and
2540  *        cannot wait for qemu_fclose() to occur.
2541  *
2542  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2543  *                  sent. Usually, this will not be more than a few bytes of
2544  *                  the protocol because most transfers are sent asynchronously.
2545  */
2546 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2547                                   ram_addr_t block_offset, ram_addr_t offset,
2548                                   size_t size, int *bytes_sent)
2549 {
2550     QEMUFileRDMA *rfile = opaque;
2551     RDMAContext *rdma = rfile->rdma;
2552     int ret;
2553
2554     CHECK_ERROR_STATE();
2555
2556     qemu_fflush(f);
2557
2558     if (size > 0) {
2559         /*
2560          * Add this page to the current 'chunk'. If the chunk
2561          * is full, or the page doesn't belong to the current chunk,
2562          * an actual RDMA write will occur and a new chunk will be formed.
2563          */
2564         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2565         if (ret < 0) {
2566             fprintf(stderr, "rdma migration: write error! %d\n", ret);
2567             goto err;
2568         }
2569
2570         /*
2571          * We always return 1 byte because the RDMA
2572          * protocol is completely asynchronous. We do not yet know
2573          * whether an identified chunk is zero or not because we're
2574          * waiting for other pages to potentially be merged with
2575          * the current chunk. So, we have to call qemu_update_position()
2576          * later on when the actual write occurs.
2577          */
2578         if (bytes_sent) {
2579             *bytes_sent = 1;
2580         }
2581     } else {
2582         uint64_t index, chunk;
2583
2584         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2585         if (size < 0) {
2586             ret = qemu_rdma_drain_cq(f, rdma);
2587             if (ret < 0) {
2588                 fprintf(stderr, "rdma: failed to synchronously drain"
2589                                 " completion queue before unregistration.\n");
2590                 goto err;
2591             }
2592         }
2593         */
2594
2595         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2596                                          offset, size, &index, &chunk);
2597
2598         if (ret) {
2599             fprintf(stderr, "ram block search failed\n");
2600             goto err;
2601         }
2602
2603         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2604
2605         /*
2606          * TODO: Synchronous, guaranteed unregistration (should not occur during
2607          * fast-path). Otherwise, unregistrations will be processed on the
2608          * next call to qemu_rdma_drain_cq()
2609         if (size < 0) {
2610             qemu_rdma_unregister_waiting(rdma);
2611         }
2612         */
2613     }
2614
2615     /*
2616      * Drain the Completion Queue if possible, but do not block,
2617      * just poll.
2618      *
2619      * If nothing to poll, the end of the iteration will do this
2620      * again to make sure we don't overflow the request queue.
2621      */
2622     while (1) {
2623         uint64_t wr_id, wr_id_in;
2624         int ret = qemu_rdma_poll(rdma, &wr_id_in, NULL);
2625         if (ret < 0) {
2626             fprintf(stderr, "rdma migration: polling error! %d\n", ret);
2627             goto err;
2628         }
2629
2630         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2631
2632         if (wr_id == RDMA_WRID_NONE) {
2633             break;
2634         }
2635     }
2636
2637     return RAM_SAVE_CONTROL_DELAYED;
2638 err:
2639     rdma->error_state = ret;
2640     return ret;
2641 }
2642
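/*
 * Destination side of connection setup: wait for the CONNECT_REQUEST
 * event, negotiate the capability flags carried in the private data,
 * allocate the verbs resources and accept the connection.
 */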
2643 static int qemu_rdma_accept(RDMAContext *rdma)
2644 {
2645     RDMACapabilities cap;
2646     struct rdma_conn_param conn_param = {
2647                                             .responder_resources = 2,
2648                                             .private_data = &cap,
2649                                             .private_data_len = sizeof(cap),
2650                                          };
2651     struct rdma_cm_event *cm_event;
2652     struct ibv_context *verbs;
2653     int ret = -EINVAL;
2654     int idx;
2655
2656     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2657     if (ret) {
2658         goto err_rdma_dest_wait;
2659     }
2660
2661     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2662         rdma_ack_cm_event(cm_event);
2663         goto err_rdma_dest_wait;
2664     }
2665
2666     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2667
2668     network_to_caps(&cap);
2669
2670     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2671         fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
2672                 cap.version);
2673         rdma_ack_cm_event(cm_event);
2674         goto err_rdma_dest_wait;
2675     }
2676
2677     /*
2678      * Respond with only the capabilities this version of QEMU knows about.
2679      */
2680     cap.flags &= known_capabilities;
2681
2682     /*
2683      * Enable the ones that we do know about.
2684      * Add other checks here as new ones are introduced.
2685      */
2686     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2687         rdma->pin_all = true;
2688     }
2689
2690     rdma->cm_id = cm_event->id;
2691     verbs = cm_event->id->verbs;
2692
2693     rdma_ack_cm_event(cm_event);
2694
2695     DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
2696
2697     caps_to_network(&cap);
2698
2699     DPRINTF("verbs context after listen: %p\n", verbs);
2700
2701     if (!rdma->verbs) {
2702         rdma->verbs = verbs;
2703     } else if (rdma->verbs != verbs) {
2704         fprintf(stderr, "ibv context not matching %p, %p!\n",
2705                 rdma->verbs, verbs);
2706         goto err_rdma_dest_wait;
2707     }
2708
2709     qemu_rdma_dump_id("dest_init", verbs);
2710
2711     ret = qemu_rdma_alloc_pd_cq(rdma);
2712     if (ret) {
2713         fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
2714         goto err_rdma_dest_wait;
2715     }
2716
2717     ret = qemu_rdma_alloc_qp(rdma);
2718     if (ret) {
2719         fprintf(stderr, "rdma migration: error allocating qp!\n");
2720         goto err_rdma_dest_wait;
2721     }
2722
2723     ret = qemu_rdma_init_ram_blocks(rdma);
2724     if (ret) {
2725         fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
2726         goto err_rdma_dest_wait;
2727     }
2728
2729     for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
2730         ret = qemu_rdma_reg_control(rdma, idx);
2731         if (ret) {
2732             fprintf(stderr, "rdma: error registering %d control!\n", idx);
2733             goto err_rdma_dest_wait;
2734         }
2735     }
2736
2737     qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
2738
2739     ret = rdma_accept(rdma->cm_id, &conn_param);
2740     if (ret) {
2741         fprintf(stderr, "rdma_accept returns %d!\n", ret);
2742         goto err_rdma_dest_wait;
2743     }
2744
2745     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2746     if (ret) {
2747         fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
2748         goto err_rdma_dest_wait;
2749     }
2750
2751     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2752         fprintf(stderr, "rdma_accept not event established!\n");
2753         rdma_ack_cm_event(cm_event);
2754         goto err_rdma_dest_wait;
2755     }
2756
2757     rdma_ack_cm_event(cm_event);
2758
2759     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
2760     if (ret) {
2761         fprintf(stderr, "rdma migration: error posting second control recv!\n");
2762         goto err_rdma_dest_wait;
2763     }
2764
2765     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
2766
2767     return 0;
2768
2769 err_rdma_dest_wait:
2770     rdma->error_state = ret;
2771     qemu_rdma_cleanup(rdma);
2772     return ret;
2773 }
2774
2775 /*
2776  * During each iteration of the migration, we listen for instructions
2777  * by the source VM to perform dynamic page registrations before they
2778  * can perform RDMA operations.
2779  *
2780  * We respond with the 'rkey'.
2781  *
2782  * Keep doing this until the source tells us to stop.
2783  */
2784 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
2785                                          uint64_t flags)
2786 {
2787     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
2788                                .type = RDMA_CONTROL_REGISTER_RESULT,
2789                                .repeat = 0,
2790                              };
2791     RDMAControlHeader unreg_resp = { .len = 0,
2792                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
2793                                .repeat = 0,
2794                              };
2795     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
2796                                  .repeat = 1 };
2797     QEMUFileRDMA *rfile = opaque;
2798     RDMAContext *rdma = rfile->rdma;
2799     RDMALocalBlocks *local = &rdma->local_ram_blocks;
2800     RDMAControlHeader head;
2801     RDMARegister *reg, *registers;
2802     RDMACompress *comp;
2803     RDMARegisterResult *reg_result;
2804     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
2805     RDMALocalBlock *block;
2806     void *host_addr;
2807     int ret = 0;
2808     int idx = 0;
2809     int count = 0;
2810     int i = 0;
2811
2812     CHECK_ERROR_STATE();
2813
2814     do {
2815         DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);
2816
2817         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
2818
2819         if (ret < 0) {
2820             break;
2821         }
2822
2823         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
2824             fprintf(stderr, "rdma: Too many requests in this message (%d). "
2825                             "Bailing.\n", head.repeat);
2826             ret = -EIO;
2827             break;
2828         }
2829
2830         switch (head.type) {
2831         case RDMA_CONTROL_COMPRESS:
2832             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
2833             network_to_compress(comp);
2834
2835             DDPRINTF("Zapping zero chunk: %" PRId64
2836                     " bytes, index %d, offset %" PRId64 "\n",
2837                     comp->length, comp->block_idx, comp->offset);
2838             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
2839
2840             host_addr = block->local_host_addr +
2841                             (comp->offset - block->offset);
2842
2843             ram_handle_compressed(host_addr, comp->value, comp->length);
2844             break;
2845
2846         case RDMA_CONTROL_REGISTER_FINISHED:
2847             DDDPRINTF("Current registrations complete.\n");
2848             goto out;
2849
2850         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
2851             DPRINTF("Initial setup info requested.\n");
2852
2853             if (rdma->pin_all) {
2854                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
2855                 if (ret) {
2856                     fprintf(stderr, "rdma migration: error dest "
2857                                     "registering ram blocks!\n");
2858                     goto out;
2859                 }
2860             }
2861
2862             /*
2863              * Dest uses this to prepare to transmit the RAMBlock descriptions
2864              * to the source VM after connection setup.
2865              * Both sides use the "remote" structure to communicate and update
2866              * their "local" descriptions with what was sent.
2867              */
2868             for (i = 0; i < local->nb_blocks; i++) {
2869                 rdma->block[i].remote_host_addr =
2870                     (uint64_t)(local->block[i].local_host_addr);
2871
2872                 if (rdma->pin_all) {
2873                     rdma->block[i].remote_rkey = local->block[i].mr->rkey;
2874                 }
2875
2876                 rdma->block[i].offset = local->block[i].offset;
2877                 rdma->block[i].length = local->block[i].length;
2878
2879                 remote_block_to_network(&rdma->block[i]);
2880             }
2881
2882             blocks.len = rdma->local_ram_blocks.nb_blocks
2883                                                 * sizeof(RDMARemoteBlock);
2884
2885
2886             ret = qemu_rdma_post_send_control(rdma,
2887                                         (uint8_t *) rdma->block, &blocks);
2888
2889             if (ret < 0) {
2890                 fprintf(stderr, "rdma migration: error sending remote info!\n");
2891                 goto out;
2892             }
2893
2894             break;
2895         case RDMA_CONTROL_REGISTER_REQUEST:
2896             DDPRINTF("There are %d registration requests\n", head.repeat);
2897
2898             reg_resp.repeat = head.repeat;
2899             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
2900
2901             for (count = 0; count < head.repeat; count++) {
2902                 uint64_t chunk;
2903                 uint8_t *chunk_start, *chunk_end;
2904
2905                 reg = &registers[count];
2906                 network_to_register(reg);
2907
2908                 reg_result = &results[count];
2909
2910                 DDPRINTF("Registration request (%d): index %d, current_addr %"
2911                          PRIu64 " chunks: %" PRIu64 "\n", count,
2912                          reg->current_index, reg->key.current_addr, reg->chunks);
2913
2914                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
2915                 if (block->is_ram_block) {
2916                     host_addr = (block->local_host_addr +
2917                                 (reg->key.current_addr - block->offset));
2918                     chunk = ram_chunk_index(block->local_host_addr,
2919                                             (uint8_t *) host_addr);
2920                 } else {
2921                     chunk = reg->key.chunk;
2922                     host_addr = block->local_host_addr +
2923                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
2924                 }
2925                 chunk_start = ram_chunk_start(block, chunk);
2926                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
2927                 if (qemu_rdma_register_and_get_keys(rdma, block,
2928                             (uint8_t *)host_addr, NULL, &reg_result->rkey,
2929                             chunk, chunk_start, chunk_end)) {
2930                     fprintf(stderr, "cannot get rkey!\n");
2931                     ret = -EINVAL;
2932                     goto out;
2933                 }
2934
2935                 reg_result->host_addr = (uint64_t) block->local_host_addr;
2936
2937                 DDPRINTF("Registered rkey for this request: %x\n",
2938                                 reg_result->rkey);
2939
2940                 result_to_network(reg_result);
2941             }
2942
2943             ret = qemu_rdma_post_send_control(rdma,
2944                             (uint8_t *) results, &reg_resp);
2945
2946             if (ret < 0) {
2947                 fprintf(stderr, "Failed to send control buffer!\n");
2948                 goto out;
2949             }
2950             break;
2951         case RDMA_CONTROL_UNREGISTER_REQUEST:
2952             DDPRINTF("There are %d unregistration requests\n", head.repeat);
2953             unreg_resp.repeat = head.repeat;
2954             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
2955
2956             for (count = 0; count < head.repeat; count++) {
2957                 reg = &registers[count];
2958                 network_to_register(reg);
2959
2960                 DDPRINTF("Unregistration request (%d): "
2961                          " index %d, chunk %" PRIu64 "\n",
2962                          count, reg->current_index, reg->key.chunk);
2963
2964                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
2965
2966                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
2967                 block->pmr[reg->key.chunk] = NULL;
2968
2969                 if (ret != 0) {
2970                     perror("rdma unregistration chunk failed");
2971                     ret = -ret;
2972                     goto out;
2973                 }
2974
2975                 rdma->total_registrations--;
2976
2977                 DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
2978                             reg->key.chunk);
2979             }
2980
2981             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
2982
2983             if (ret < 0) {
2984                 fprintf(stderr, "Failed to send control buffer!\n");
2985                 goto out;
2986             }
2987             break;
2988         case RDMA_CONTROL_REGISTER_RESULT:
2989             fprintf(stderr, "Invalid RESULT message at dest.\n");
2990             ret = -EIO;
2991             goto out;
2992         default:
2993             fprintf(stderr, "Unknown control message %s\n",
2994                                 control_desc[head.type]);
2995             ret = -EIO;
2996             goto out;
2997         }
2998     } while (1);
2999 out:
3000     if (ret < 0) {
3001         rdma->error_state = ret;
3002     }
3003     return ret;
3004 }
3005
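/*
 * Called on the source before each RAM iteration: emit the
 * RAM_SAVE_FLAG_HOOK marker so that the destination drops into
 * qemu_rdma_registration_handle().
 */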
3006 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
3007                                         uint64_t flags)
3008 {
3009     QEMUFileRDMA *rfile = opaque;
3010     RDMAContext *rdma = rfile->rdma;
3011
3012     CHECK_ERROR_STATE();
3013
3014     DDDPRINTF("start section: %" PRIu64 "\n", flags);
3015     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
3016     qemu_fflush(f);
3017
3018     return 0;
3019 }
3020
3021 /*
3022  * Inform dest that dynamic registrations are done for now.
3023  * First, flush writes, if any.
3024  */
3025 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
3026                                        uint64_t flags)
3027 {
3028     Error *local_err = NULL, **errp = &local_err;
3029     QEMUFileRDMA *rfile = opaque;
3030     RDMAContext *rdma = rfile->rdma;
3031     RDMAControlHeader head = { .len = 0, .repeat = 1 };
3032     int ret = 0;
3033
3034     CHECK_ERROR_STATE();
3035
3036     qemu_fflush(f);
3037     ret = qemu_rdma_drain_cq(f, rdma);
3038
3039     if (ret < 0) {
3040         goto err;
3041     }
3042
3043     if (flags == RAM_CONTROL_SETUP) {
3044         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3045         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3046         int reg_result_idx, i, j, nb_remote_blocks;
3047
3048         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3049         DPRINTF("Sending registration setup for ram blocks...\n");
3050
3051         /*
3052          * Make sure that we parallelize the pinning on both sides.
3053          * For very large guests, doing this serially takes a really
3054          * long time, so we have to 'interleave' the pinning locally
3055          * with the control messages by performing the pinning on this
3056          * side before we receive the control response from the other
3057          * side that the pinning has completed.
3058          */
3059         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3060                     &reg_result_idx, rdma->pin_all ?
3061                     qemu_rdma_reg_whole_ram_blocks : NULL);
3062         if (ret < 0) {
3063             ERROR(errp, "receiving remote info!");
3064             return ret;
3065         }
3066
3067         nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
3068
3069         /*
3070          * The protocol uses two different sets of rkeys (mutually exclusive):
3071          * 1. One key to represent the virtual address of the entire ram block.
3072          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3073          * 2. One to represent individual chunks within a ram block.
3074          *    (dynamic chunk registration enabled - pin individual chunks.)
3075          *
3076          * Once the capability is successfully negotiated, the destination transmits
3077          * the keys to use (or sends them later) including the virtual addresses,
3078          * and the source then merges those remote block descriptions into its local copy.
3079          */
3080
3081         if (local->nb_blocks != nb_remote_blocks) {
3082             ERROR(errp, "ram blocks mismatch #1! "
3083                         "Your QEMU command line parameters are probably "
3084                         "not identical on both the source and destination.");
3085             return -EINVAL;
3086         }
3087
3088         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3089         memcpy(rdma->block,
3090             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3091         for (i = 0; i < nb_remote_blocks; i++) {
3092             network_to_remote_block(&rdma->block[i]);
3093
3094             /* search local ram blocks */
3095             for (j = 0; j < local->nb_blocks; j++) {
3096                 if (rdma->block[i].offset != local->block[j].offset) {
3097                     continue;
3098                 }
3099
3100                 if (rdma->block[i].length != local->block[j].length) {
3101                     ERROR(errp, "ram blocks mismatch #2! "
3102                         "Your QEMU command line parameters are probably "
3103                         "not identical on both the source and destination.");
3104                     return -EINVAL;
3105                 }
3106                 local->block[j].remote_host_addr =
3107                         rdma->block[i].remote_host_addr;
3108                 local->block[j].remote_rkey = rdma->block[i].remote_rkey;
3109                 break;
3110             }
3111
3112             if (j >= local->nb_blocks) {
3113                 ERROR(errp, "ram blocks mismatch #3! "
3114                         "Your QEMU command line parameters are probably "
3115                         "not identical on both the source and destination.");
3116                 return -EINVAL;
3117             }
3118         }
3119     }
3120
3121     DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
3122
3123     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3124     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3125
3126     if (ret < 0) {
3127         goto err;
3128     }
3129
3130     return 0;
3131 err:
3132     rdma->error_state = ret;
3133     return ret;
3134 }
3135
3136 static int qemu_rdma_get_fd(void *opaque)
3137 {
3138     QEMUFileRDMA *rfile = opaque;
3139     RDMAContext *rdma = rfile->rdma;
3140
3141     return rdma->comp_channel->fd;
3142 }
3143
3144 const QEMUFileOps rdma_read_ops = {
3145     .get_buffer    = qemu_rdma_get_buffer,
3146     .get_fd        = qemu_rdma_get_fd,
3147     .close         = qemu_rdma_close,
3148     .hook_ram_load = qemu_rdma_registration_handle,
3149 };
3150
3151 const QEMUFileOps rdma_write_ops = {
3152     .put_buffer         = qemu_rdma_put_buffer,
3153     .close              = qemu_rdma_close,
3154     .before_ram_iterate = qemu_rdma_registration_start,
3155     .after_ram_iterate  = qemu_rdma_registration_stop,
3156     .save_page          = qemu_rdma_save_page,
3157 };
3158
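/*
 * Wrap an RDMAContext in a QEMUFile, selecting the write or read op
 * table based on the mode string.
 */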
3159 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3160 {
3161     QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
3162
3163     if (qemu_file_mode_is_not_valid(mode)) {
3164         return NULL;
3165     }
3166
3167     r->rdma = rdma;
3168
3169     if (mode[0] == 'w') {
3170         r->file = qemu_fopen_ops(r, &rdma_write_ops);
3171     } else {
3172         r->file = qemu_fopen_ops(r, &rdma_read_ops);
3173     }
3174
3175     return r->file;
3176 }
3177
3178 static void rdma_accept_incoming_migration(void *opaque)
3179 {
3180     RDMAContext *rdma = opaque;
3181     int ret;
3182     QEMUFile *f;
3183     Error *local_err = NULL, **errp = &local_err;
3184
3185     DPRINTF("Accepting rdma connection...\n");
3186     ret = qemu_rdma_accept(rdma);
3187
3188     if (ret) {
3189         ERROR(errp, "RDMA Migration initialization failed!");
3190         return;
3191     }
3192
3193     DPRINTF("Accepted migration\n");
3194
3195     f = qemu_fopen_rdma(rdma, "rb");
3196     if (f == NULL) {
3197         ERROR(errp, "could not qemu_fopen_rdma!");
3198         qemu_rdma_cleanup(rdma);
3199         return;
3200     }
3201
3202     rdma->migration_started_on_destination = 1;
3203     process_incoming_migration(f);
3204 }
3205
3206 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3207 {
3208     int ret;
3209     RDMAContext *rdma;
3210     Error *local_err = NULL;
3211
3212     DPRINTF("Starting RDMA-based incoming migration\n");
3213     rdma = qemu_rdma_data_init(host_port, &local_err);
3214
3215     if (rdma == NULL) {
3216         goto err;
3217     }
3218
3219     ret = qemu_rdma_dest_init(rdma, &local_err);
3220
3221     if (ret) {
3222         goto err;
3223     }
3224
3225     DPRINTF("qemu_rdma_dest_init success\n");
3226
3227     ret = rdma_listen(rdma->listen_id, 5);
3228
3229     if (ret) {
3230         ERROR(errp, "listening on socket!");
3231         goto err;
3232     }
3233
3234     DPRINTF("rdma_listen success\n");
3235
3236     qemu_set_fd_handler2(rdma->channel->fd, NULL,
3237                          rdma_accept_incoming_migration, NULL,
3238                             (void *)(intptr_t) rdma);
3239     return;
3240 err:
3241     error_propagate(errp, local_err);
3242     g_free(rdma);
3243 }
3244
3245 void rdma_start_outgoing_migration(void *opaque,
3246                             const char *host_port, Error **errp)
3247 {
3248     MigrationState *s = opaque;
3249     Error *local_err = NULL, **temp = &local_err;
3250     RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
3251     int ret = 0;
3252
3253     if (rdma == NULL) {
3254         ERROR(temp, "Failed to initialize RDMA data structures! %d", ret);
3255         goto err;
3256     }
3257
3258     ret = qemu_rdma_source_init(rdma, &local_err,
3259         s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
3260
3261     if (ret) {
3262         goto err;
3263     }
3264
3265     DPRINTF("qemu_rdma_source_init success\n");
3266     ret = qemu_rdma_connect(rdma, &local_err);
3267
3268     if (ret) {
3269         goto err;
3270     }
3271
3272     DPRINTF("qemu_rdma_source_connect success\n");
3273
3274     s->file = qemu_fopen_rdma(rdma, "wb");
3275     migrate_fd_connect(s);
3276     return;
3277 err:
3278     error_propagate(errp, local_err);
3279     g_free(rdma);
3280     migrate_fd_error(s);
3281 }