1 /*
2  * RDMA protocol and interfaces
3  *
4  * Copyright IBM, Corp. 2010-2013
5  *
6  * Authors:
7  *  Michael R. Hines <mrhines@us.ibm.com>
8  *  Jiuxing Liu <jl@us.ibm.com>
9  *
10  * This work is licensed under the terms of the GNU GPL, version 2 or
11  * later.  See the COPYING file in the top-level directory.
12  *
13  */
14 #include "qemu-common.h"
15 #include "migration/migration.h"
16 #include "migration/qemu-file.h"
17 #include "exec/cpu-common.h"
18 #include "qemu/main-loop.h"
19 #include "qemu/sockets.h"
20 #include "qemu/bitmap.h"
21 #include "block/coroutine.h"
22 #include <stdio.h>
23 #include <sys/types.h>
24 #include <sys/socket.h>
25 #include <netdb.h>
26 #include <arpa/inet.h>
27 #include <string.h>
28 #include <rdma/rdma_cma.h>
29
30 #define DEBUG_RDMA
31 //#define DEBUG_RDMA_VERBOSE
32 //#define DEBUG_RDMA_REALLY_VERBOSE
33
34 #ifdef DEBUG_RDMA
35 #define DPRINTF(fmt, ...) \
36     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
37 #else
38 #define DPRINTF(fmt, ...) \
39     do { } while (0)
40 #endif
41
42 #ifdef DEBUG_RDMA_VERBOSE
43 #define DDPRINTF(fmt, ...) \
44     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
45 #else
46 #define DDPRINTF(fmt, ...) \
47     do { } while (0)
48 #endif
49
50 #ifdef DEBUG_RDMA_REALLY_VERBOSE
51 #define DDDPRINTF(fmt, ...) \
52     do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
53 #else
54 #define DDDPRINTF(fmt, ...) \
55     do { } while (0)
56 #endif
57
58 /*
59  * Print an error on both the Monitor and the Log file.
60  */
61 #define ERROR(errp, fmt, ...) \
62     do { \
63         fprintf(stderr, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
64         if (errp && (*(errp) == NULL)) { \
65             error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
66         } \
67     } while (0)
68
69 #define RDMA_RESOLVE_TIMEOUT_MS 10000
70
71 /* Do not merge data if larger than this. */
72 #define RDMA_MERGE_MAX (2 * 1024 * 1024)
73 #define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
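/* With the values above, RDMA_SIGNALED_SEND_MAX works out to (2 * 1024 * 1024) / 4096 = 512. */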
74
75 #define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
76
77 /*
78  * This is only for non-live state being migrated.
79  * Instead of RDMA_WRITE messages, we use RDMA_SEND
80  * messages for that state, which requires a different
81  * delivery design than main memory.
82  */
83 #define RDMA_SEND_INCREMENT 32768
84
85 /*
86  * Maximum size infiniband SEND message
87  */
88 #define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
89 #define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
90
91 #define RDMA_CONTROL_VERSION_CURRENT 1
92 /*
93  * Capabilities for negotiation.
94  */
95 #define RDMA_CAPABILITY_PIN_ALL 0x01
96
97 /*
98  * Add the other flags above to this list of known capabilities
99  * as they are introduced.
100  */
101 static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
102
103 #define CHECK_ERROR_STATE() \
104     do { \
105         if (rdma->error_state) { \
106             if (!rdma->error_reported) { \
107                 fprintf(stderr, "RDMA is in an error state waiting" \
108                                 " for migration to abort!\n"); \
109                 rdma->error_reported = 1; \
110             } \
111             return rdma->error_state; \
112         } \
113     } while (0);
114
115 /*
116  * A work request ID is 64 bits wide and we split up these bits
117  * into 3 parts:
118  *
119  * bits 0-15 : type of control message, 2^16
120  * bits 16-29: ram block index, 2^14
121  * bits 30-63: ram block chunk number, 2^34
122  *
123  * The last two bit ranges are only used for RDMA writes,
124  * in order to track their completion and potentially
125  * also track unregistration status of the message.
126  */
127 #define RDMA_WRID_TYPE_SHIFT  0UL
128 #define RDMA_WRID_BLOCK_SHIFT 16UL
129 #define RDMA_WRID_CHUNK_SHIFT 30UL
130
131 #define RDMA_WRID_TYPE_MASK \
132     ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
133
134 #define RDMA_WRID_BLOCK_MASK \
135     (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
136
137 #define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
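
/*
 * For illustration (values chosen arbitrarily): an RDMA write for ram block
 * index 3, chunk 5 is tagged as
 *
 *     wr_id = RDMA_WRID_RDMA_WRITE | (3 << RDMA_WRID_BLOCK_SHIFT)
 *                                  | (5 << RDMA_WRID_CHUNK_SHIFT)
 *           = 0x140030001
 *
 * and decoded again on completion with
 *
 *     type  =  wr_id & RDMA_WRID_TYPE_MASK;                             -> 1
 *     block = (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;  -> 3
 *     chunk = (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;  -> 5
 *
 * (see qemu_rdma_make_wrid() and qemu_rdma_poll() below).
 */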
138
139 /*
140  * RDMA migration protocol:
141  * 1. RDMA Writes (data messages, i.e. RAM)
142  * 2. IB Send/Recv (control channel messages)
143  */
144 enum {
145     RDMA_WRID_NONE = 0,
146     RDMA_WRID_RDMA_WRITE = 1,
147     RDMA_WRID_SEND_CONTROL = 2000,
148     RDMA_WRID_RECV_CONTROL = 4000,
149 };
150
151 const char *wrid_desc[] = {
152     [RDMA_WRID_NONE] = "NONE",
153     [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
154     [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
155     [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
156 };
157
158 /*
159  * Work request IDs for IB SEND messages only (not RDMA writes).
160  * This is used by the migration protocol to transmit
161  * control messages (such as device state and registration commands).
162  *
163  * We could use more WRs, but we have enough for now.
164  */
165 enum {
166     RDMA_WRID_READY = 0,
167     RDMA_WRID_DATA,
168     RDMA_WRID_CONTROL,
169     RDMA_WRID_MAX,
170 };
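
/*
 * These also serve as indices into rdma->wr_data[]: a RECV posted for
 * control slot 'idx' completes with wr_id == RDMA_WRID_RECV_CONTROL + idx
 * (see qemu_rdma_post_recv_control() below), while wr_data[RDMA_WRID_MAX]
 * is the slot used for outgoing SENDs (see qemu_rdma_post_send_control()).
 */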
171
172 /*
173  * SEND/RECV IB Control Messages.
174  */
175 enum {
176     RDMA_CONTROL_NONE = 0,
177     RDMA_CONTROL_ERROR,
178     RDMA_CONTROL_READY,               /* ready to receive */
179     RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
180     RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
181     RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
182     RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
183     RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
184     RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
185     RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
186     RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
187     RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
188 };
189
190 const char *control_desc[] = {
191     [RDMA_CONTROL_NONE] = "NONE",
192     [RDMA_CONTROL_ERROR] = "ERROR",
193     [RDMA_CONTROL_READY] = "READY",
194     [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
195     [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
196     [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
197     [RDMA_CONTROL_COMPRESS] = "COMPRESS",
198     [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
199     [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
200     [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
201     [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
202     [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
203 };
204
205 /*
206  * Memory and MR structures used to represent an IB Send/Recv work request.
207  * This is *not* used for RDMA writes, only IB Send/Recv.
208  */
209 typedef struct {
210     uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
211     struct   ibv_mr *control_mr;               /* registration metadata */
212     size_t   control_len;                      /* length of the message */
213     uint8_t *control_curr;                     /* start of unconsumed bytes */
214 } RDMAWorkRequestData;
215
216 /*
217  * Negotiate RDMA capabilities during connection-setup time.
218  */
219 typedef struct {
220     uint32_t version;
221     uint32_t flags;
222 } RDMACapabilities;
223
224 static void caps_to_network(RDMACapabilities *cap)
225 {
226     cap->version = htonl(cap->version);
227     cap->flags = htonl(cap->flags);
228 }
229
230 static void network_to_caps(RDMACapabilities *cap)
231 {
232     cap->version = ntohl(cap->version);
233     cap->flags = ntohl(cap->flags);
234 }
235
236 /*
237  * Representation of a RAMBlock from an RDMA perspective.
238  * This is not transmitted, only local.
239  * This and subsequent structures cannot be linked lists
240  * because we're using a single IB message to transmit
241  * the information. It's small anyway, so a list is overkill.
242  */
243 typedef struct RDMALocalBlock {
244     uint8_t  *local_host_addr; /* local virtual address */
245     uint64_t remote_host_addr; /* remote virtual address */
246     uint64_t offset;
247     uint64_t length;
248     struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
249     struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
250     uint32_t *remote_keys;     /* rkeys for chunk-level registration */
251     uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
252     int      index;            /* which block are we */
253     bool     is_ram_block;
254     int      nb_chunks;
255     unsigned long *transit_bitmap;
256     unsigned long *unregister_bitmap;
257 } RDMALocalBlock;
258
259 /*
260  * Also represents a RAMblock, but only on the dest.
261  * This gets transmitted by the dest during connection-time
262  * to the source VM and then is used to populate the
263  * corresponding RDMALocalBlock with
264  * the information needed to perform the actual RDMA.
265  */
266 typedef struct QEMU_PACKED RDMARemoteBlock {
267     uint64_t remote_host_addr;
268     uint64_t offset;
269     uint64_t length;
270     uint32_t remote_rkey;
271     uint32_t padding;
272 } RDMARemoteBlock;
273
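/*
 * 64-bit network byte order helpers: <arpa/inet.h> only provides the 32-bit
 * htonl()/ntohl(), so the 64-bit fields below are converted as two halves.
 */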
274 static uint64_t htonll(uint64_t v)
275 {
276     union { uint32_t lv[2]; uint64_t llv; } u;
277     u.lv[0] = htonl(v >> 32);
278     u.lv[1] = htonl(v & 0xFFFFFFFFULL);
279     return u.llv;
280 }
281
282 static uint64_t ntohll(uint64_t v) {
283     union { uint32_t lv[2]; uint64_t llv; } u;
284     u.llv = v;
285     return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
286 }
287
288 static void remote_block_to_network(RDMARemoteBlock *rb)
289 {
290     rb->remote_host_addr = htonll(rb->remote_host_addr);
291     rb->offset = htonll(rb->offset);
292     rb->length = htonll(rb->length);
293     rb->remote_rkey = htonl(rb->remote_rkey);
294 }
295
296 static void network_to_remote_block(RDMARemoteBlock *rb)
297 {
298     rb->remote_host_addr = ntohll(rb->remote_host_addr);
299     rb->offset = ntohll(rb->offset);
300     rb->length = ntohll(rb->length);
301     rb->remote_rkey = ntohl(rb->remote_rkey);
302 }
303
304 /*
305  * Virtual address of the above structures used for transmitting
306  * the RAMBlock descriptions at connection-time.
307  * This structure is *not* transmitted.
308  */
309 typedef struct RDMALocalBlocks {
310     int nb_blocks;
311     bool     init;             /* main memory init complete */
312     RDMALocalBlock *block;
313 } RDMALocalBlocks;
314
315 /*
316  * Main data structure for RDMA state.
317  * While there is only one copy of this structure being allocated right now,
318  * this is the place to start if you want to support having
319  * more than one RDMA connection open at the same time.
320  */
321 typedef struct RDMAContext {
322     char *host;
323     int port;
324
325     RDMAWorkRequestData wr_data[RDMA_WRID_MAX + 1];
326
327     /*
328      * This is used by *_exchange_send() to figure out whether or not
329      * the initial "READY" message has already been received.
330      * This is because other functions may potentially poll() and detect
331      * the READY message before send() does, in which case we need to
332      * know if it completed.
333      */
334     int control_ready_expected;
335
336     /* number of outstanding writes */
337     int nb_sent;
338
339     /* store info about current buffer so that we can
340        merge it with future sends */
341     uint64_t current_addr;
342     uint64_t current_length;
343     /* index of ram block the current buffer belongs to */
344     int current_index;
345     /* index of the chunk in the current ram block */
346     int current_chunk;
347
348     bool pin_all;
349
350     /*
351      * infiniband-specific variables for opening the device
352      * and maintaining connection state and so forth.
353      *
354      * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
355      * cm_id->verbs, cm_id->channel, and cm_id->qp.
356      */
357     struct rdma_cm_id *cm_id;               /* connection manager ID */
358     struct rdma_cm_id *listen_id;
359
360     struct ibv_context          *verbs;
361     struct rdma_event_channel   *channel;
362     struct ibv_qp *qp;                      /* queue pair */
363     struct ibv_comp_channel *comp_channel;  /* completion channel */
364     struct ibv_pd *pd;                      /* protection domain */
365     struct ibv_cq *cq;                      /* completion queue */
366
367     /*
368      * If a previous write failed (perhaps because of a failed
369      * memory registration), then do not attempt any future work
370      * and remember the error state.
371      */
372     int error_state;
373     int error_reported;
374
375     /*
376      * Description of ram blocks used throughout the code.
377      */
378     RDMALocalBlocks local_ram_blocks;
379     RDMARemoteBlock *block;
380
381     /*
382      * If migration on the *destination* has started, then use the
383      * coroutine yield function when waiting for completions.
384      * The source runs in a thread, so we don't care.
385      */
386     int migration_started_on_destination;
387
388     int total_registrations;
389     int total_writes;
390
391     int unregister_current, unregister_next;
392     uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
393
394     GHashTable *blockmap;
395 } RDMAContext;
396
397 /*
398  * Interface to the rest of the migration call stack.
399  */
400 typedef struct QEMUFileRDMA {
401     RDMAContext *rdma;
402     size_t len;
403     void *file;
404 } QEMUFileRDMA;
405
406 /*
407  * Main structure for IB Send/Recv control messages.
408  * This gets prepended at the beginning of every Send/Recv.
409  */
410 typedef struct QEMU_PACKED {
411     uint32_t len;     /* Total length of data portion */
412     uint32_t type;    /* which control command to perform */
413     uint32_t repeat;  /* number of commands in data portion of same type */
414     uint32_t padding;
415 } RDMAControlHeader;
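
/*
 * On the wire, every control message is this header (converted to network
 * byte order) immediately followed by 'len' bytes of payload; header plus
 * payload must fit within RDMA_CONTROL_MAX_BUFFER.
 */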
416
417 static void control_to_network(RDMAControlHeader *control)
418 {
419     control->type = htonl(control->type);
420     control->len = htonl(control->len);
421     control->repeat = htonl(control->repeat);
422 }
423
424 static void network_to_control(RDMAControlHeader *control)
425 {
426     control->type = ntohl(control->type);
427     control->len = ntohl(control->len);
428     control->repeat = ntohl(control->repeat);
429 }
430
431 /*
432  * Register a single Chunk.
433  * Information sent by the source VM to inform the dest
434  * to register an single chunk of memory before we can perform
435  * the actual RDMA operation.
436  */
437 typedef struct QEMU_PACKED {
438     union QEMU_PACKED {
439         uint64_t current_addr;  /* offset into the ramblock of the chunk */
440         uint64_t chunk;         /* chunk to lookup if unregistering */
441     } key;
442     uint32_t current_index; /* which ramblock the chunk belongs to */
443     uint32_t padding;
444     uint64_t chunks;            /* how many sequential chunks to register */
445 } RDMARegister;
446
447 static void register_to_network(RDMARegister *reg)
448 {
449     reg->key.current_addr = htonll(reg->key.current_addr);
450     reg->current_index = htonl(reg->current_index);
451     reg->chunks = htonll(reg->chunks);
452 }
453
454 static void network_to_register(RDMARegister *reg)
455 {
456     reg->key.current_addr = ntohll(reg->key.current_addr);
457     reg->current_index = ntohl(reg->current_index);
458     reg->chunks = ntohll(reg->chunks);
459 }
460
461 typedef struct QEMU_PACKED {
462     uint32_t value;     /* if zero, we will madvise() */
463     uint32_t block_idx; /* which ram block index */
464     uint64_t offset;    /* where in the remote ramblock this chunk starts */
465     uint64_t length;    /* length of the chunk */
466 } RDMACompress;
467
468 static void compress_to_network(RDMACompress *comp)
469 {
470     comp->value = htonl(comp->value);
471     comp->block_idx = htonl(comp->block_idx);
472     comp->offset = htonll(comp->offset);
473     comp->length = htonll(comp->length);
474 }
475
476 static void network_to_compress(RDMACompress *comp)
477 {
478     comp->value = ntohl(comp->value);
479     comp->block_idx = ntohl(comp->block_idx);
480     comp->offset = ntohll(comp->offset);
481     comp->length = ntohll(comp->length);
482 }
483
484 /*
485  * The result of the dest's memory registration produces an "rkey"
486  * which the source VM must reference in order to perform
487  * the RDMA operation.
488  */
489 typedef struct QEMU_PACKED {
490     uint32_t rkey;
491     uint32_t padding;
492     uint64_t host_addr;
493 } RDMARegisterResult;
494
495 static void result_to_network(RDMARegisterResult *result)
496 {
497     result->rkey = htonl(result->rkey);
498     result->host_addr = htonll(result->host_addr);
499 }
500
501 static void network_to_result(RDMARegisterResult *result)
502 {
503     result->rkey = ntohl(result->rkey);
504     result->host_addr = ntohll(result->host_addr);
505 }
506
507 const char *print_wrid(int wrid);
508 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
509                                    uint8_t *data, RDMAControlHeader *resp,
510                                    int *resp_idx,
511                                    int (*callback)(RDMAContext *rdma));
512
513 static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
514 {
515     return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
516 }
517
518 static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
519                                        uint64_t i)
520 {
521     return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
522                                     + (i << RDMA_REG_CHUNK_SHIFT));
523 }
524
525 static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
526 {
527     uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
528                                          (1UL << RDMA_REG_CHUNK_SHIFT);
529
530     if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
531         result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
532     }
533
534     return result;
535 }
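
/*
 * Worked example: with RDMA_REG_CHUNK_SHIFT = 20 the chunk size is 1 MiB.
 * For a 2.5 MiB RAMBlock, ram_chunk_index(start, start + length) == 2, so
 * __qemu_rdma_add_block() below records nb_chunks = 3, and ram_chunk_end()
 * clamps the final chunk to the block boundary at start + 2.5 MiB.
 */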
536
537 static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
538                          ram_addr_t block_offset, uint64_t length)
539 {
540     RDMALocalBlocks *local = &rdma->local_ram_blocks;
541     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
542         (void *) block_offset);
543     RDMALocalBlock *old = local->block;
544
545     assert(block == NULL);
546
547     local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
548
549     if (local->nb_blocks) {
550         int x;
551
552         for (x = 0; x < local->nb_blocks; x++) {
553             g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
554             g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
555                                                 &local->block[x]);
556         }
557         memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
558         g_free(old);
559     }
560
561     block = &local->block[local->nb_blocks];
562
563     block->local_host_addr = host_addr;
564     block->offset = block_offset;
565     block->length = length;
566     block->index = local->nb_blocks;
567     block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
568     block->transit_bitmap = bitmap_new(block->nb_chunks);
569     bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
570     block->unregister_bitmap = bitmap_new(block->nb_chunks);
571     bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
572     block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
573
574     block->is_ram_block = local->init ? false : true;
575
576     g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
577
578     DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
579            " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
580             local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
581             block->length, (uint64_t) (block->local_host_addr + block->length),
582                 BITS_TO_LONGS(block->nb_chunks) *
583                     sizeof(unsigned long) * 8, block->nb_chunks);
584
585     local->nb_blocks++;
586
587     return 0;
588 }
589
590 /*
591  * Memory regions need to be registered with the device and queue pairs set up
592  * in advance before the migration starts. This tells us where the RAM blocks
593  * are so that we can register them individually.
594  */
595 static void qemu_rdma_init_one_block(void *host_addr,
596     ram_addr_t block_offset, ram_addr_t length, void *opaque)
597 {
598     __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
599 }
600
601 /*
602  * Identify the RAMBlocks and their quantity. They will be used to
603  * identify chunk boundaries inside each RAMBlock and also be referenced
604  * during dynamic page registration.
605  */
606 static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
607 {
608     RDMALocalBlocks *local = &rdma->local_ram_blocks;
609
610     assert(rdma->blockmap == NULL);
611     rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
612     memset(local, 0, sizeof *local);
613     qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
614     DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
615     rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
616                         rdma->local_ram_blocks.nb_blocks);
617     local->init = true;
618     return 0;
619 }
620
621 static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
622 {
623     RDMALocalBlocks *local = &rdma->local_ram_blocks;
624     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
625         (void *) block_offset);
626     RDMALocalBlock *old = local->block;
627     int x;
628
629     assert(block);
630
631     if (block->pmr) {
632         int j;
633
634         for (j = 0; j < block->nb_chunks; j++) {
635             if (!block->pmr[j]) {
636                 continue;
637             }
638             ibv_dereg_mr(block->pmr[j]);
639             rdma->total_registrations--;
640         }
641         g_free(block->pmr);
642         block->pmr = NULL;
643     }
644
645     if (block->mr) {
646         ibv_dereg_mr(block->mr);
647         rdma->total_registrations--;
648         block->mr = NULL;
649     }
650
651     g_free(block->transit_bitmap);
652     block->transit_bitmap = NULL;
653
654     g_free(block->unregister_bitmap);
655     block->unregister_bitmap = NULL;
656
657     g_free(block->remote_keys);
658     block->remote_keys = NULL;
659
660     for (x = 0; x < local->nb_blocks; x++) {
661         g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
662     }
663
664     if (local->nb_blocks > 1) {
665
666         local->block = g_malloc0(sizeof(RDMALocalBlock) *
667                                     (local->nb_blocks - 1));
668
669         if (block->index) {
670             memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
671         }
672
673         if (block->index < (local->nb_blocks - 1)) {
674             memcpy(local->block + block->index, old + (block->index + 1),
675                 sizeof(RDMALocalBlock) *
676                     (local->nb_blocks - (block->index + 1)));
677         }
678     } else {
679         assert(block == local->block);
680         local->block = NULL;
681     }
682
683     DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
684            " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
685             local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
686             block->length, (uint64_t) (block->local_host_addr + block->length),
687                 BITS_TO_LONGS(block->nb_chunks) *
688                     sizeof(unsigned long) * 8, block->nb_chunks);
689
690     g_free(old);
691
692     local->nb_blocks--;
693
694     if (local->nb_blocks) {
695         for (x = 0; x < local->nb_blocks; x++) {
696             g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
697                                                 &local->block[x]);
698         }
699     }
700
701     return 0;
702 }
703
704 /*
705  * Put in the log file which RDMA device was opened and the details
706  * associated with that device.
707  */
708 static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
709 {
710     printf("%s RDMA Device opened: kernel name %s "
711            "uverbs device name %s, "
712            "infiniband_verbs class device path %s,"
713            " infiniband class device path %s\n",
714                 who,
715                 verbs->device->name,
716                 verbs->device->dev_name,
717                 verbs->device->dev_path,
718                 verbs->device->ibdev_path);
719 }
720
721 /*
722  * Put in the log file the RDMA gid addressing information,
723  * useful for folks who have trouble understanding the
724  * RDMA device hierarchy in the kernel.
725  */
726 static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
727 {
728     char sgid[33];
729     char dgid[33];
730     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
731     inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
732     DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
733 }
734
735 /*
736  * Figure out which RDMA device corresponds to the requested IP hostname.
737  * Also create the initial connection manager identifiers for opening
738  * the connection.
739  */
740 static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
741 {
742     int ret;
743     struct addrinfo *res;
744     char port_str[16];
745     struct rdma_cm_event *cm_event;
746     char ip[40] = "unknown";
747
748     if (rdma->host == NULL || !strcmp(rdma->host, "")) {
749         ERROR(errp, "RDMA hostname has not been set\n");
750         return -1;
751     }
752
753     /* create CM channel */
754     rdma->channel = rdma_create_event_channel();
755     if (!rdma->channel) {
756         ERROR(errp, "could not create CM channel\n");
757         return -1;
758     }
759
760     /* create CM id */
761     ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
762     if (ret) {
763         ERROR(errp, "could not create channel id\n");
764         goto err_resolve_create_id;
765     }
766
767     snprintf(port_str, 16, "%d", rdma->port);
768     port_str[15] = '\0';
769
770     ret = getaddrinfo(rdma->host, port_str, NULL, &res);
771     if (ret < 0) {
772         ERROR(errp, "could not getaddrinfo address %s\n", rdma->host);
773         goto err_resolve_get_addr;
774     }
775
776     inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
777                                 ip, sizeof ip);
778     DPRINTF("%s => %s\n", rdma->host, ip);
779
780     /* resolve the first address */
781     ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
782             RDMA_RESOLVE_TIMEOUT_MS);
783     if (ret) {
784         ERROR(errp, "could not resolve address %s\n", rdma->host);
785         goto err_resolve_get_addr;
786     }
787
788     qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
789
790     ret = rdma_get_cm_event(rdma->channel, &cm_event);
791     if (ret) {
792         ERROR(errp, "could not perform event_addr_resolved\n");
793         goto err_resolve_get_addr;
794     }
795
796     if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
797         ERROR(errp, "result not equal to event_addr_resolved %s\n",
798                 rdma_event_str(cm_event->event));
799         perror("rdma_resolve_addr");
800         goto err_resolve_get_addr;
801     }
802     rdma_ack_cm_event(cm_event);
803
804     /* resolve route */
805     ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
806     if (ret) {
807         ERROR(errp, "could not resolve rdma route\n");
808         goto err_resolve_get_addr;
809     }
810
811     ret = rdma_get_cm_event(rdma->channel, &cm_event);
812     if (ret) {
813         ERROR(errp, "could not perform event_route_resolved\n");
814         goto err_resolve_get_addr;
815     }
816     if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
817         ERROR(errp, "result not equal to event_route_resolved: %s\n",
818                         rdma_event_str(cm_event->event));
819         rdma_ack_cm_event(cm_event);
820         goto err_resolve_get_addr;
821     }
822     rdma_ack_cm_event(cm_event);
823     rdma->verbs = rdma->cm_id->verbs;
824     qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
825     qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
826     return 0;
827
828 err_resolve_get_addr:
829     rdma_destroy_id(rdma->cm_id);
830     rdma->cm_id = NULL;
831 err_resolve_create_id:
832     rdma_destroy_event_channel(rdma->channel);
833     rdma->channel = NULL;
834
835     return -1;
836 }
837
838 /*
839  * Create protection domain and completion queues
840  */
841 static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
842 {
843     /* allocate pd */
844     rdma->pd = ibv_alloc_pd(rdma->verbs);
845     if (!rdma->pd) {
846         fprintf(stderr, "failed to allocate protection domain\n");
847         return -1;
848     }
849
850     /* create completion channel */
851     rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
852     if (!rdma->comp_channel) {
853         fprintf(stderr, "failed to allocate completion channel\n");
854         goto err_alloc_pd_cq;
855     }
856
857     /*
858      * The completion queue is shared by both send and receive work requests,
859      * so it must be sized to reflect the sum of both possible queue sizes.
860      */
861     rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
862             NULL, rdma->comp_channel, 0);
863     if (!rdma->cq) {
864         fprintf(stderr, "failed to allocate completion queue\n");
865         goto err_alloc_pd_cq;
866     }
867
868     return 0;
869
870 err_alloc_pd_cq:
871     if (rdma->pd) {
872         ibv_dealloc_pd(rdma->pd);
873     }
874     if (rdma->comp_channel) {
875         ibv_destroy_comp_channel(rdma->comp_channel);
876     }
877     rdma->pd = NULL;
878     rdma->comp_channel = NULL;
879     return -1;
880
881 }
882
883 /*
884  * Create queue pairs.
885  */
886 static int qemu_rdma_alloc_qp(RDMAContext *rdma)
887 {
888     struct ibv_qp_init_attr attr = { 0 };
889     int ret;
890
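    /*
     * Sizing note: max_send_wr matches RDMA_SIGNALED_SEND_MAX, the cap on
     * outstanding signaled sends tracked elsewhere in this file; the three
     * receive WRs presumably correspond to the READY/DATA/CONTROL
     * control-channel slots defined above.
     */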
891     attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
892     attr.cap.max_recv_wr = 3;
893     attr.cap.max_send_sge = 1;
894     attr.cap.max_recv_sge = 1;
895     attr.send_cq = rdma->cq;
896     attr.recv_cq = rdma->cq;
897     attr.qp_type = IBV_QPT_RC;
898
899     ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
900     if (ret) {
901         return -1;
902     }
903
904     rdma->qp = rdma->cm_id->qp;
905     return 0;
906 }
907
908 static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
909 {
910     int i;
911     RDMALocalBlocks *local = &rdma->local_ram_blocks;
912
913     for (i = 0; i < local->nb_blocks; i++) {
914         local->block[i].mr =
915             ibv_reg_mr(rdma->pd,
916                     local->block[i].local_host_addr,
917                     local->block[i].length,
918                     IBV_ACCESS_LOCAL_WRITE |
919                     IBV_ACCESS_REMOTE_WRITE
920                     );
921         if (!local->block[i].mr) {
922             perror("Failed to register local dest ram block!");
923             break;
924         }
925         rdma->total_registrations++;
926     }
927
928     if (i >= local->nb_blocks) {
929         return 0;
930     }
931
932     for (i--; i >= 0; i--) {
933         ibv_dereg_mr(local->block[i].mr);
934         rdma->total_registrations--;
935     }
936
937     return -1;
938
939 }
940
941 /*
942  * Find the ram block that corresponds to the page requested to be
943  * transmitted by QEMU.
944  *
945  * Once the block is found, also identify which 'chunk' within that
946  * block the page belongs to.
947  *
948  * This search cannot fail or the migration will fail.
949  */
950 static int qemu_rdma_search_ram_block(RDMAContext *rdma,
951                                       uint64_t block_offset,
952                                       uint64_t offset,
953                                       uint64_t length,
954                                       uint64_t *block_index,
955                                       uint64_t *chunk_index)
956 {
957     uint64_t current_addr = block_offset + offset;
958     RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
959                                                 (void *) block_offset);
960     assert(block);
961     assert(current_addr >= block->offset);
962     assert((current_addr + length) <= (block->offset + block->length));
963
964     *block_index = block->index;
965     *chunk_index = ram_chunk_index(block->local_host_addr,
966                 block->local_host_addr + (current_addr - block->offset));
967
968     return 0;
969 }
970
971 /*
972  * Register a chunk with IB. If the chunk was already registered
973  * previously, then skip.
974  *
975  * Also return the keys associated with the registration needed
976  * to perform the actual RDMA operation.
977  */
978 static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
979         RDMALocalBlock *block, uint8_t *host_addr,
980         uint32_t *lkey, uint32_t *rkey, int chunk,
981         uint8_t *chunk_start, uint8_t *chunk_end)
982 {
983     if (block->mr) {
984         if (lkey) {
985             *lkey = block->mr->lkey;
986         }
987         if (rkey) {
988             *rkey = block->mr->rkey;
989         }
990         return 0;
991     }
992
993     /* allocate memory to store chunk MRs */
994     if (!block->pmr) {
995         block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
996         if (!block->pmr) {
997             return -1;
998         }
999     }
1000
1001     /*
1002      * If 'rkey', then we're the destination, so grant access to the source.
1003      *
1004      * If 'lkey', then we're the source VM, so grant access only to ourselves.
1005      */
1006     if (!block->pmr[chunk]) {
1007         uint64_t len = chunk_end - chunk_start;
1008
1009         DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
1010                  len, chunk_start);
1011
1012         block->pmr[chunk] = ibv_reg_mr(rdma->pd,
1013                 chunk_start, len,
1014                 (rkey ? (IBV_ACCESS_LOCAL_WRITE |
1015                         IBV_ACCESS_REMOTE_WRITE) : 0));
1016
1017         if (!block->pmr[chunk]) {
1018             perror("Failed to register chunk!");
1019             fprintf(stderr, "Chunk details: block: %d chunk index %d"
1020                             " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
1021                             " local %" PRIu64 " registrations: %d\n",
1022                             block->index, chunk, (uint64_t) chunk_start,
1023                             (uint64_t) chunk_end, (uint64_t) host_addr,
1024                             (uint64_t) block->local_host_addr,
1025                             rdma->total_registrations);
1026             return -1;
1027         }
1028         rdma->total_registrations++;
1029     }
1030
1031     if (lkey) {
1032         *lkey = block->pmr[chunk]->lkey;
1033     }
1034     if (rkey) {
1035         *rkey = block->pmr[chunk]->rkey;
1036     }
1037     return 0;
1038 }
1039
1040 /*
1041  * Register (at connection time) the memory used for control
1042  * channel messages.
1043  */
1044 static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
1045 {
1046     rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
1047             rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
1048             IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
1049     if (rdma->wr_data[idx].control_mr) {
1050         rdma->total_registrations++;
1051         return 0;
1052     }
1053     fprintf(stderr, "qemu_rdma_reg_control failed!\n");
1054     return -1;
1055 }
1056
1057 const char *print_wrid(int wrid)
1058 {
1059     if (wrid >= RDMA_WRID_RECV_CONTROL) {
1060         return wrid_desc[RDMA_WRID_RECV_CONTROL];
1061     }
1062     return wrid_desc[wrid];
1063 }
1064
1065 /*
1066  * RDMA requires memory registration (mlock/pinning), but this is not good for
1067  * overcommitment.
1068  *
1069  * In preparation for the future where LRU information or workload-specific
1070  * writable working set memory access behavior is available to QEMU,
1071  * it would be nice to have in place the ability to UN-register/UN-pin
1072  * particular memory regions from the RDMA hardware when it is determined that
1073  * those regions of memory will likely not be accessed again in the near future.
1074  *
1075  * While we do not yet have such information right now, the following
1076  * compile-time option allows us to perform a non-optimized version of this
1077  * behavior.
1078  *
1079  * By uncommenting this option, you will cause *all* RDMA transfers to be
1080  * unregistered immediately after the transfer completes on both sides of the
1081  * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
1082  *
1083  * This will have a terrible impact on migration performance, so until future
1084  * workload information or LRU information is available, do not attempt to use
1085  * this feature except for basic testing.
1086  */
1087 //#define RDMA_UNREGISTRATION_EXAMPLE
1088
1089 /*
1090  * Perform a non-optimized memory unregistration after every transfer
1091  * for demonstration purposes, only if pin-all is not requested.
1092  *
1093  * Potential optimizations:
1094  * 1. Start a new thread to run this function continuously
1095  *      - for bit clearing
1096  *      - and for receipt of unregister messages
1097  * 2. Use an LRU.
1098  * 3. Use workload hints.
1099  */
1100 static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
1101 {
1102     while (rdma->unregistrations[rdma->unregister_current]) {
1103         int ret;
1104         uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
1105         uint64_t chunk =
1106             (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1107         uint64_t index =
1108             (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1109         RDMALocalBlock *block =
1110             &(rdma->local_ram_blocks.block[index]);
1111         RDMARegister reg = { .current_index = index };
1112         RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
1113                                  };
1114         RDMAControlHeader head = { .len = sizeof(RDMARegister),
1115                                    .type = RDMA_CONTROL_UNREGISTER_REQUEST,
1116                                    .repeat = 1,
1117                                  };
1118
1119         DDPRINTF("Processing unregister for chunk: %" PRIu64
1120                  " at position %d\n", chunk, rdma->unregister_current);
1121
1122         rdma->unregistrations[rdma->unregister_current] = 0;
1123         rdma->unregister_current++;
1124
1125         if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
1126             rdma->unregister_current = 0;
1127         }
1128
1129
1130         /*
1131          * Unregistration is speculative (because migration is single-threaded
1132          * and we cannot break the protocol's infiniband message ordering).
1133          * Thus, if the memory is currently being used for transmission,
1134          * then abort the attempt to unregister and try again
1135          * later the next time a completion is received for this memory.
1136          */
1137         clear_bit(chunk, block->unregister_bitmap);
1138
1139         if (test_bit(chunk, block->transit_bitmap)) {
1140             DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
1141             continue;
1142         }
1143
1144         DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
1145
1146         ret = ibv_dereg_mr(block->pmr[chunk]);
1147         block->pmr[chunk] = NULL;
1148         block->remote_keys[chunk] = 0;
1149
1150         if (ret != 0) {
1151             perror("unregistration chunk failed");
1152             return -ret;
1153         }
1154         rdma->total_registrations--;
1155
1156         reg.key.chunk = chunk;
1157         register_to_network(&reg);
1158         ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1159                                 &resp, NULL, NULL);
1160         if (ret < 0) {
1161             return ret;
1162         }
1163
1164         DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
1165     }
1166
1167     return 0;
1168 }
1169
1170 static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
1171                                          uint64_t chunk)
1172 {
1173     uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
1174
1175     result |= (index << RDMA_WRID_BLOCK_SHIFT);
1176     result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
1177
1178     return result;
1179 }
1180
1181 /*
1182  * Set bit for unregistration in the next iteration.
1183  * We cannot transmit right here, but will unpin later.
1184  */
1185 static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
1186                                         uint64_t chunk, uint64_t wr_id)
1187 {
1188     if (rdma->unregistrations[rdma->unregister_next] != 0) {
1189         fprintf(stderr, "rdma migration: queue is full!\n");
1190     } else {
1191         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1192
1193         if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
1194             DDPRINTF("Appending unregister chunk %" PRIu64
1195                     " at position %d\n", chunk, rdma->unregister_next);
1196
1197             rdma->unregistrations[rdma->unregister_next++] =
1198                     qemu_rdma_make_wrid(wr_id, index, chunk);
1199
1200             if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
1201                 rdma->unregister_next = 0;
1202             }
1203         } else {
1204             DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
1205                     chunk);
1206         }
1207     }
1208 }
1209
1210 /*
1211  * Poll the completion queue to see if a work request
1212  * (of any kind) has completed.
1213  * Return the work request ID that completed.
1214  */
1215 static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out)
1216 {
1217     int ret;
1218     struct ibv_wc wc;
1219     uint64_t wr_id;
1220
1221     ret = ibv_poll_cq(rdma->cq, 1, &wc);
1222
1223     if (!ret) {
1224         *wr_id_out = RDMA_WRID_NONE;
1225         return 0;
1226     }
1227
1228     if (ret < 0) {
1229         fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
1230         return ret;
1231     }
1232
1233     wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
1234
1235     if (wc.status != IBV_WC_SUCCESS) {
1236         fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
1237                         wc.status, ibv_wc_status_str(wc.status));
1238         fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
1239
1240         return -1;
1241     }
1242
1243     if (rdma->control_ready_expected &&
1244         (wr_id >= RDMA_WRID_RECV_CONTROL)) {
1245         DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
1246                   " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
1247                   wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
1248         rdma->control_ready_expected = 0;
1249     }
1250
1251     if (wr_id == RDMA_WRID_RDMA_WRITE) {
1252         uint64_t chunk =
1253             (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
1254         uint64_t index =
1255             (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
1256         RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
1257
1258         DDDPRINTF("completions %s (%" PRId64 ") left %d, "
1259                  "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
1260                  print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
1261                  block->local_host_addr, (void *)block->remote_host_addr);
1262
1263         clear_bit(chunk, block->transit_bitmap);
1264
1265         if (rdma->nb_sent > 0) {
1266             rdma->nb_sent--;
1267         }
1268
1269         if (!rdma->pin_all) {
1270             /*
1271              * FYI: If one wanted to signal a specific chunk to be unregistered
1272              * using LRU or workload-specific information, this is the function
1273              * you would call to do so. That chunk would then get asynchronously
1274              * unregistered later.
1275              */
1276 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1277             qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
1278 #endif
1279         }
1280     } else {
1281         DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
1282             print_wrid(wr_id), wr_id, rdma->nb_sent);
1283     }
1284
1285     *wr_id_out = wc.wr_id;
1286
1287     return 0;
1288 }
1289
1290 /*
1291  * Block until the next work request has completed.
1292  *
1293  * First poll to see if a work request has already completed,
1294  * otherwise block.
1295  *
1296  * If we encounter completed work requests for IDs other than
1297  * the one we're interested in, then that's generally an error.
1298  *
1299  * The only exception is actual RDMA Write completions. These
1300  * completions only need to be recorded, but do not actually
1301  * need further processing.
1302  */
1303 static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested)
1304 {
1305     int num_cq_events = 0, ret = 0;
1306     struct ibv_cq *cq;
1307     void *cq_ctx;
1308     uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
1309
1310     if (ibv_req_notify_cq(rdma->cq, 0)) {
1311         return -1;
1312     }
1313     /* poll cq first */
1314     while (wr_id != wrid_requested) {
1315         ret = qemu_rdma_poll(rdma, &wr_id_in);
1316         if (ret < 0) {
1317             return ret;
1318         }
1319
1320         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1321
1322         if (wr_id == RDMA_WRID_NONE) {
1323             break;
1324         }
1325         if (wr_id != wrid_requested) {
1326             DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1327                 print_wrid(wrid_requested),
1328                 wrid_requested, print_wrid(wr_id), wr_id);
1329         }
1330     }
1331
1332     if (wr_id == wrid_requested) {
1333         return 0;
1334     }
1335
1336     while (1) {
1337         /*
1338          * Coroutine doesn't start until process_incoming_migration()
1339          * so don't yield unless we know we're running inside of a coroutine.
1340          */
1341         if (rdma->migration_started_on_destination) {
1342             yield_until_fd_readable(rdma->comp_channel->fd);
1343         }
1344
1345         if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
1346             perror("ibv_get_cq_event");
1347             goto err_block_for_wrid;
1348         }
1349
1350         num_cq_events++;
1351
1352         if (ibv_req_notify_cq(cq, 0)) {
1353             goto err_block_for_wrid;
1354         }
1355
1356         while (wr_id != wrid_requested) {
1357             ret = qemu_rdma_poll(rdma, &wr_id_in);
1358             if (ret < 0) {
1359                 goto err_block_for_wrid;
1360             }
1361
1362             wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
1363
1364             if (wr_id == RDMA_WRID_NONE) {
1365                 break;
1366             }
1367             if (wr_id != wrid_requested) {
1368                 DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
1369                     print_wrid(wrid_requested), wrid_requested,
1370                     print_wrid(wr_id), wr_id);
1371             }
1372         }
1373
1374         if (wr_id == wrid_requested) {
1375             goto success_block_for_wrid;
1376         }
1377     }
1378
1379 success_block_for_wrid:
1380     if (num_cq_events) {
1381         ibv_ack_cq_events(cq, num_cq_events);
1382     }
1383     return 0;
1384
1385 err_block_for_wrid:
1386     if (num_cq_events) {
1387         ibv_ack_cq_events(cq, num_cq_events);
1388     }
1389     return ret;
1390 }
1391
1392 /*
1393  * Post a SEND message work request for the control channel
1394  * containing some data and block until the post completes.
1395  */
1396 static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
1397                                        RDMAControlHeader *head)
1398 {
1399     int ret = 0;
1400     RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_MAX];
1401     struct ibv_send_wr *bad_wr;
1402     struct ibv_sge sge = {
1403                            .addr = (uint64_t)(wr->control),
1404                            .length = head->len + sizeof(RDMAControlHeader),
1405                            .lkey = wr->control_mr->lkey,
1406                          };
1407     struct ibv_send_wr send_wr = {
1408                                    .wr_id = RDMA_WRID_SEND_CONTROL,
1409                                    .opcode = IBV_WR_SEND,
1410                                    .send_flags = IBV_SEND_SIGNALED,
1411                                    .sg_list = &sge,
1412                                    .num_sge = 1,
1413                                 };
1414
1415     DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
1416
1417     /*
1418      * We don't actually need to do a memcpy() in here if we used
1419      * the "sge" properly, but since we're only sending control messages
1420      * (not RAM in a performance-critical path), then its OK for now.
1421      *
1422      * The copy makes the RDMAControlHeader simpler to manipulate
1423      * for the time being.
1424      */
1425     memcpy(wr->control, head, sizeof(RDMAControlHeader));
1426     control_to_network((void *) wr->control);
1427
1428     if (buf) {
1429         memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
1430     }
1431
1432
1433     if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
1434         return -1;
1435     }
1436
1437     if (ret < 0) {
1438         fprintf(stderr, "Failed to post IB SEND for control!\n");
1439         return ret;
1440     }
1441
1442     ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
1443     if (ret < 0) {
1444         fprintf(stderr, "rdma migration: send polling control error!\n");
1445     }
1446
1447     return ret;
1448 }
1449
1450 /*
1451  * Post a RECV work request in anticipation of some future receipt
1452  * of data on the control channel.
1453  */
1454 static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
1455 {
1456     struct ibv_recv_wr *bad_wr;
1457     struct ibv_sge sge = {
1458                             .addr = (uint64_t)(rdma->wr_data[idx].control),
1459                             .length = RDMA_CONTROL_MAX_BUFFER,
1460                             .lkey = rdma->wr_data[idx].control_mr->lkey,
1461                          };
1462
1463     struct ibv_recv_wr recv_wr = {
1464                                     .wr_id = RDMA_WRID_RECV_CONTROL + idx,
1465                                     .sg_list = &sge,
1466                                     .num_sge = 1,
1467                                  };
1468
1469
1470     if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
1471         return -1;
1472     }
1473
1474     return 0;
1475 }
1476
1477 /*
1478  * Block and wait for a RECV control channel message to arrive.
1479  */
1480 static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
1481                 RDMAControlHeader *head, int expecting, int idx)
1482 {
1483     int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
1484
1485     if (ret < 0) {
1486         fprintf(stderr, "rdma migration: recv polling control error!\n");
1487         return ret;
1488     }
1489
1490     network_to_control((void *) rdma->wr_data[idx].control);
1491     memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
1492
1493     DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
1494
1495     if (expecting == RDMA_CONTROL_NONE) {
1496         DDDPRINTF("Surprise: got %s (%d)\n",
1497                   control_desc[head->type], head->type);
1498     } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
1499         fprintf(stderr, "Was expecting a %s (%d) control message"
1500                 ", but got: %s (%d), length: %d\n",
1501                 control_desc[expecting], expecting,
1502                 control_desc[head->type], head->type, head->len);
1503         return -EIO;
1504     }
1505
1506     return 0;
1507 }
1508
1509 /*
1510  * When a RECV work request has completed, the work request's
1511  * buffer is pointed at the header.
1512  *
1513  * This will advance the pointer to the data portion
1514  * of the control message of the work request's buffer that
1515  * was populated after the work request finished.
1516  */
1517 static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
1518                                   RDMAControlHeader *head)
1519 {
1520     rdma->wr_data[idx].control_len = head->len;
1521     rdma->wr_data[idx].control_curr =
1522         rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
1523 }
1524
1525 /*
1526  * This is an 'atomic' high-level operation to deliver a single, unified
1527  * control-channel message.
1528  *
1529  * Additionally, if the user is expecting some kind of reply to this message,
1530  * they can request a 'resp' response message be filled in by posting an
1531  * additional work request on behalf of the user and waiting for an additional
1532  * completion.
1533  *
1534  * The extra (optional) response is used during registration to us from having
1535  * to perform an *additional* exchange of message just to provide a response by
1536  * instead piggy-backing on the acknowledgement.
1537  */
1538 static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
1539                                    uint8_t *data, RDMAControlHeader *resp,
1540                                    int *resp_idx,
1541                                    int (*callback)(RDMAContext *rdma))
1542 {
1543     int ret = 0;
1544
1545     /*
1546      * Wait until the dest is ready before attempting to deliver the message
1547      * by waiting for a READY message.
1548      */
1549     if (rdma->control_ready_expected) {
1550         RDMAControlHeader resp;
1551         ret = qemu_rdma_exchange_get_response(rdma,
1552                                     &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
1553         if (ret < 0) {
1554             return ret;
1555         }
1556     }
1557
1558     /*
1559      * If the user is expecting a response, post a WR in anticipation of it.
1560      */
1561     if (resp) {
1562         ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
1563         if (ret) {
1564             fprintf(stderr, "rdma migration: error posting"
1565                     " extra control recv for anticipated result!");
1566             return ret;
1567         }
1568     }
1569
1570     /*
1571      * Post a WR to replace the one we just consumed for the READY message.
1572      */
1573     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1574     if (ret) {
1575         fprintf(stderr, "rdma migration: error posting first control recv!");
1576         return ret;
1577     }
1578
1579     /*
1580      * Deliver the control message that was requested.
1581      */
1582     ret = qemu_rdma_post_send_control(rdma, data, head);
1583
1584     if (ret < 0) {
1585         fprintf(stderr, "Failed to send control buffer!\n");
1586         return ret;
1587     }
1588
1589     /*
1590      * If we're expecting a response, block and wait for it.
1591      */
1592     if (resp) {
1593         if (callback) {
1594             DDPRINTF("Issuing callback before receiving response...\n");
1595             ret = callback(rdma);
1596             if (ret < 0) {
1597                 return ret;
1598             }
1599         }
1600
1601         DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
1602         ret = qemu_rdma_exchange_get_response(rdma, resp,
1603                                               resp->type, RDMA_WRID_DATA);
1604
1605         if (ret < 0) {
1606             return ret;
1607         }
1608
1609         qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
1610         if (resp_idx) {
1611             *resp_idx = RDMA_WRID_DATA;
1612         }
1613         DDPRINTF("Response %s received.\n", control_desc[resp->type]);
1614     }
1615
1616     rdma->control_ready_expected = 1;
1617
1618     return 0;
1619 }
1620
1621 /*
1622  * This is an 'atomic' high-level operation to receive a single, unified
1623  * control-channel message.
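 *
 * Roughly (illustration only): SEND a READY message to the source, block
 * until the expected control message arrives, advance past its header,
 * and re-post the RECV work request that the message consumed.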
1624  */
1625 static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
1626                                 int expecting)
1627 {
1628     RDMAControlHeader ready = {
1629                                 .len = 0,
1630                                 .type = RDMA_CONTROL_READY,
1631                                 .repeat = 1,
1632                               };
1633     int ret;
1634
1635     /*
1636      * Inform the source that we're ready to receive a message.
1637      */
1638     ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
1639
1640     if (ret < 0) {
1641         fprintf(stderr, "Failed to send control buffer!\n");
1642         return ret;
1643     }
1644
1645     /*
1646      * Block and wait for the message.
1647      */
1648     ret = qemu_rdma_exchange_get_response(rdma, head,
1649                                           expecting, RDMA_WRID_READY);
1650
1651     if (ret < 0) {
1652         return ret;
1653     }
1654
1655     qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
1656
1657     /*
1658      * Post a new RECV work request to replace the one we just consumed.
1659      */
1660     ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
1661     if (ret) {
1662         fprintf(stderr, "rdma migration: error posting second control recv!\n");
1663         return ret;
1664     }
1665
1666     return 0;
1667 }
1668
1669 /*
1670  * Write an actual chunk of memory using RDMA.
1671  *
1672  * If we're using dynamic registration on the dest-side, we have to
1673  * send a registration command first.
1674  */
1675 static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
1676                                int current_index, uint64_t current_addr,
1677                                uint64_t length)
1678 {
1679     struct ibv_sge sge;
1680     struct ibv_send_wr send_wr = { 0 };
1681     struct ibv_send_wr *bad_wr;
1682     int reg_result_idx, ret, count = 0;
1683     uint64_t chunk, chunks;
1684     uint8_t *chunk_start, *chunk_end;
1685     RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
1686     RDMARegister reg;
1687     RDMARegisterResult *reg_result;
1688     RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
1689     RDMAControlHeader head = { .len = sizeof(RDMARegister),
1690                                .type = RDMA_CONTROL_REGISTER_REQUEST,
1691                                .repeat = 1,
1692                              };
1693
1694 retry:
1695     sge.addr = (uint64_t)(block->local_host_addr +
1696                             (current_addr - block->offset));
1697     sge.length = length;
1698
1699     chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
1700     chunk_start = ram_chunk_start(block, chunk);
1701
1702     if (block->is_ram_block) {
1703         chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
1704
1705         if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1706             chunks--;
1707         }
1708     } else {
1709         chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
1710
1711         if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
1712             chunks--;
1713         }
1714     }
1715
1716     DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
1717         chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
1718
1719     chunk_end = ram_chunk_end(block, chunk + chunks);
1720
1721     if (!rdma->pin_all) {
1722 #ifdef RDMA_UNREGISTRATION_EXAMPLE
1723         qemu_rdma_unregister_waiting(rdma);
1724 #endif
1725     }
1726
1727     while (test_bit(chunk, block->transit_bitmap)) {
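        /*
         * 'count' is only consumed by the DDPRINTF below; the (void) cast
         * presumably just keeps the compiler from warning about an unused
         * variable when verbose debugging is compiled out.
         */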
1728         (void)count;
1729         DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
1730                 " current %" PRIu64 " len %" PRIu64 " %d %d\n",
1731                 count++, current_index, chunk,
1732                 sge.addr, length, rdma->nb_sent, block->nb_chunks);
1733
1734         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
1735
1736         if (ret < 0) {
1737             fprintf(stderr, "Failed to wait for previous write to complete "
1738                     "block %d chunk %" PRIu64
1739                     " current %" PRIu64 " len %" PRIu64 " %d\n",
1740                     current_index, chunk, sge.addr, length, rdma->nb_sent);
1741             return ret;
1742         }
1743     }
1744
1745     if (!rdma->pin_all || !block->is_ram_block) {
1746         if (!block->remote_keys[chunk]) {
1747             /*
1748              * This chunk has not yet been registered, so first check to see
1749              * if the entire chunk is zero. If so, tell the other side to
1750              * memset() + madvise() the entire chunk without RDMA.
1751              */
1752
1753             if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
1754                    && buffer_find_nonzero_offset((void *)sge.addr,
1755                                                     length) == length) {
1756                 RDMACompress comp = {
1757                                         .offset = current_addr,
1758                                         .value = 0,
1759                                         .block_idx = current_index,
1760                                         .length = length,
1761                                     };
1762
1763                 head.len = sizeof(comp);
1764                 head.type = RDMA_CONTROL_COMPRESS;
1765
1766                 DDPRINTF("Entire chunk is zero, sending compress: %"
1767                     PRIu64 " for %d "
1768                     "bytes, index: %d, offset: %" PRId64 "...\n",
1769                     chunk, sge.length, current_index, current_addr);
1770
1771                 compress_to_network(&comp);
1772                 ret = qemu_rdma_exchange_send(rdma, &head,
1773                                 (uint8_t *) &comp, NULL, NULL, NULL);
1774
1775                 if (ret < 0) {
1776                     return -EIO;
1777                 }
1778
1779                 acct_update_position(f, sge.length, true);
1780
1781                 return 1;
1782             }
1783
1784             /*
1785              * Otherwise, tell other side to register.
1786              */
1787             reg.current_index = current_index;
1788             if (block->is_ram_block) {
1789                 reg.key.current_addr = current_addr;
1790             } else {
1791                 reg.key.chunk = chunk;
1792             }
1793             reg.chunks = chunks;
1794
1795             DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
1796                     "bytes, index: %d, offset: %" PRId64 "...\n",
1797                     chunk, sge.length, current_index, current_addr);
1798
1799             register_to_network(&reg);
1800             ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
1801                                     &resp, &reg_result_idx, NULL);
1802             if (ret < 0) {
1803                 return ret;
1804             }
1805
1806             /* try to overlap this single registration with the one we sent. */
1807             if (qemu_rdma_register_and_get_keys(rdma, block,
1808                                                 (uint8_t *) sge.addr,
1809                                                 &sge.lkey, NULL, chunk,
1810                                                 chunk_start, chunk_end)) {
1811                 fprintf(stderr, "cannot get lkey!\n");
1812                 return -EINVAL;
1813             }
1814
1815             reg_result = (RDMARegisterResult *)
1816                     rdma->wr_data[reg_result_idx].control_curr;
1817
1818             network_to_result(reg_result);
1819
1820             DDPRINTF("Received registration result:"
1821                     " my key: %x their key %x, chunk %" PRIu64 "\n",
1822                     block->remote_keys[chunk], reg_result->rkey, chunk);
1823
1824             block->remote_keys[chunk] = reg_result->rkey;
1825             block->remote_host_addr = reg_result->host_addr;
1826         } else {
1827             /* already registered before */
1828             if (qemu_rdma_register_and_get_keys(rdma, block,
1829                                                 (uint8_t *)sge.addr,
1830                                                 &sge.lkey, NULL, chunk,
1831                                                 chunk_start, chunk_end)) {
1832                 fprintf(stderr, "cannot get lkey!\n");
1833                 return -EINVAL;
1834             }
1835         }
1836
1837         send_wr.wr.rdma.rkey = block->remote_keys[chunk];
1838     } else {
1839         send_wr.wr.rdma.rkey = block->remote_rkey;
1840
1841         if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
1842                                                      &sge.lkey, NULL, chunk,
1843                                                      chunk_start, chunk_end)) {
1844             fprintf(stderr, "cannot get lkey!\n");
1845             return -EINVAL;
1846         }
1847     }
1848
1849     /*
1850      * Encode the ram block index and chunk within this wrid.
1851      * We will use this information at the time of completion
1852      * to figure out which bitmap to check against and then which
1853      * chunk in the bitmap to look for.
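     *
     * For illustration only: when the completion for this write is polled,
     * the wrid is decoded back into (RDMA_WRID_RDMA_WRITE, index, chunk)
     * so the corresponding bit in that block's transit_bitmap can be
     * cleared.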
1854      */
1855     send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
1856                                         current_index, chunk);
1857
1858     send_wr.opcode = IBV_WR_RDMA_WRITE;
1859     send_wr.send_flags = IBV_SEND_SIGNALED;
1860     send_wr.sg_list = &sge;
1861     send_wr.num_sge = 1;
1862     send_wr.wr.rdma.remote_addr = block->remote_host_addr +
1863                                 (current_addr - block->offset);
1864
1865     DDDPRINTF("Posting chunk: %" PRIu64 ", addr: %lx"
1866               " remote: %lx, bytes %" PRIu32 "\n",
1867               chunk, sge.addr, send_wr.wr.rdma.remote_addr,
1868               sge.length);
1869
1870     /*
1871      * ibv_post_send() does not return negative error numbers;
1872      * per the specification, failures are reported as positive errno values.
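     *
     * If it reports ENOMEM (send queue full), we block for one completion
     * and jump back to the 'retry' label, which recomputes the SGE and
     * re-resolves the keys before posting again.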
1873      */
1874     ret = ibv_post_send(rdma->qp, &send_wr, &bad_wr);
1875
1876     if (ret == ENOMEM) {
1877         DDPRINTF("send queue is full. wait a little....\n");
1878         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
1879         if (ret < 0) {
1880             fprintf(stderr, "rdma migration: failed to make "
1881                             "room in full send queue! %d\n", ret);
1882             return ret;
1883         }
1884
1885         goto retry;
1886
1887     } else if (ret > 0) {
1888         perror("rdma migration: post rdma write failed");
1889         return -ret;
1890     }
1891
1892     set_bit(chunk, block->transit_bitmap);
1893     acct_update_position(f, sge.length, false);
1894     rdma->total_writes++;
1895
1896     return 0;
1897 }
1898
1899 /*
1900  * Push out any unwritten RDMA operations.
1901  *
1902  * We support sending out multiple chunks at the same time.
1903  * Not all of them need to get signaled in the completion queue.
1904  */
1905 static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
1906 {
1907     int ret;
1908
1909     if (!rdma->current_length) {
1910         return 0;
1911     }
1912
1913     ret = qemu_rdma_write_one(f, rdma,
1914             rdma->current_index, rdma->current_addr, rdma->current_length);
1915
1916     if (ret < 0) {
1917         return ret;
1918     }
1919
1920     if (ret == 0) {
1921         rdma->nb_sent++;
1922         DDDPRINTF("sent total: %d\n", rdma->nb_sent);
1923     }
1924
1925     rdma->current_length = 0;
1926     rdma->current_addr = 0;
1927
1928     return 0;
1929 }
1930
1931 static inline int qemu_rdma_buffer_mergable(RDMAContext *rdma,
1932                     uint64_t offset, uint64_t len)
1933 {
1934     RDMALocalBlock *block =
1935         &(rdma->local_ram_blocks.block[rdma->current_index]);
1936     uint8_t *host_addr = block->local_host_addr + (offset - block->offset);
1937     uint8_t *chunk_end = ram_chunk_end(block, rdma->current_chunk);
1938
1939     if (rdma->current_length == 0) {
1940         return 0;
1941     }
1942
1943     /*
1944      * Only merge into chunk sequentially.
1945      */
1946     if (offset != (rdma->current_addr + rdma->current_length)) {
1947         return 0;
1948     }
1949
1950     if (rdma->current_index < 0) {
1951         return 0;
1952     }
1953
1954     if (offset < block->offset) {
1955         return 0;
1956     }
1957
1958     if ((offset + len) > (block->offset + block->length)) {
1959         return 0;
1960     }
1961
1962     if (rdma->current_chunk < 0) {
1963         return 0;
1964     }
1965
1966     if ((host_addr + len) > chunk_end) {
1967         return 0;
1968     }
1969
1970     return 1;
1971 }
1972
1973 /*
1974  * We're not actually writing here, but doing three things:
1975  *
1976  * 1. Identify the chunk the buffer belongs to.
1977  * 2. If the chunk is full or the buffer doesn't belong to the current
1978  *    chunk, then start a new chunk and flush() the old chunk.
1979  * 3. To keep the hardware busy, we also group chunks into batches
1980  *    and only require that a batch gets acknowledged in the completion
1981  *    queue instead of each individual chunk.
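 *
 * For illustration only: contiguous pages accumulate in
 * rdma->current_length until the run would cross the current chunk
 * boundary or reach RDMA_MERGE_MAX, at which point
 * qemu_rdma_write_flush() posts a single RDMA write for the whole run.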
1982  */
1983 static int qemu_rdma_write(QEMUFile *f, RDMAContext *rdma,
1984                            uint64_t block_offset, uint64_t offset,
1985                            uint64_t len)
1986 {
1987     uint64_t current_addr = block_offset + offset;
1988     uint64_t index = rdma->current_index;
1989     uint64_t chunk = rdma->current_chunk;
1990     int ret;
1991
1992     /* If we cannot merge it, we flush the current buffer first. */
1993     if (!qemu_rdma_buffer_mergable(rdma, current_addr, len)) {
1994         ret = qemu_rdma_write_flush(f, rdma);
1995         if (ret) {
1996             return ret;
1997         }
1998         rdma->current_length = 0;
1999         rdma->current_addr = current_addr;
2000
2001         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2002                                          offset, len, &index, &chunk);
2003         if (ret) {
2004             fprintf(stderr, "ram block search failed\n");
2005             return ret;
2006         }
2007         rdma->current_index = index;
2008         rdma->current_chunk = chunk;
2009     }
2010
2011     /* merge it */
2012     rdma->current_length += len;
2013
2014     /* flush it if buffer is too large */
2015     if (rdma->current_length >= RDMA_MERGE_MAX) {
2016         return qemu_rdma_write_flush(f, rdma);
2017     }
2018
2019     return 0;
2020 }
2021
2022 static void qemu_rdma_cleanup(RDMAContext *rdma)
2023 {
2024     struct rdma_cm_event *cm_event;
2025     int ret, idx;
2026
2027     if (rdma->cm_id) {
2028         if (rdma->error_state) {
2029             RDMAControlHeader head = { .len = 0,
2030                                        .type = RDMA_CONTROL_ERROR,
2031                                        .repeat = 1,
2032                                      };
2033             fprintf(stderr, "Early error. Sending error.\n");
2034             qemu_rdma_post_send_control(rdma, NULL, &head);
2035         }
2036
2037         ret = rdma_disconnect(rdma->cm_id);
2038         if (!ret) {
2039             DDPRINTF("waiting for disconnect\n");
2040             ret = rdma_get_cm_event(rdma->channel, &cm_event);
2041             if (!ret) {
2042                 rdma_ack_cm_event(cm_event);
2043             }
2044         }
2045         DDPRINTF("Disconnected.\n");
2046         rdma->cm_id = NULL;
2047     }
2048
2049     g_free(rdma->block);
2050     rdma->block = NULL;
2051
2052     for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
2053         if (rdma->wr_data[idx].control_mr) {
2054             rdma->total_registrations--;
2055             ibv_dereg_mr(rdma->wr_data[idx].control_mr);
2056         }
2057         rdma->wr_data[idx].control_mr = NULL;
2058     }
2059
2060     if (rdma->local_ram_blocks.block) {
2061         while (rdma->local_ram_blocks.nb_blocks) {
2062             __qemu_rdma_delete_block(rdma,
2063                     rdma->local_ram_blocks.block->offset);
2064         }
2065     }
2066
2067     if (rdma->qp) {
2068         ibv_destroy_qp(rdma->qp);
2069         rdma->qp = NULL;
2070     }
2071     if (rdma->cq) {
2072         ibv_destroy_cq(rdma->cq);
2073         rdma->cq = NULL;
2074     }
2075     if (rdma->comp_channel) {
2076         ibv_destroy_comp_channel(rdma->comp_channel);
2077         rdma->comp_channel = NULL;
2078     }
2079     if (rdma->pd) {
2080         ibv_dealloc_pd(rdma->pd);
2081         rdma->pd = NULL;
2082     }
2083     if (rdma->listen_id) {
2084         rdma_destroy_id(rdma->listen_id);
2085         rdma->listen_id = NULL;
2086     }
2087     if (rdma->cm_id) {
2088         rdma_destroy_id(rdma->cm_id);
2089         rdma->cm_id = NULL;
2090     }
2091     if (rdma->channel) {
2092         rdma_destroy_event_channel(rdma->channel);
2093         rdma->channel = NULL;
2094     }
2095 }
2096
2097
2098 static int qemu_rdma_source_init(RDMAContext *rdma, Error **errp, bool pin_all)
2099 {
2100     int ret, idx;
2101     Error *local_err = NULL, **temp = &local_err;
2102
2103     /*
2104      * Will be validated against destination's actual capabilities
2105      * after the connect() completes.
2106      */
2107     rdma->pin_all = pin_all;
2108
2109     ret = qemu_rdma_resolve_host(rdma, temp);
2110     if (ret) {
2111         goto err_rdma_source_init;
2112     }
2113
2114     ret = qemu_rdma_alloc_pd_cq(rdma);
2115     if (ret) {
2116         ERROR(temp, "rdma migration: error allocating pd and cq! Your mlock()"
2117                     " limits may be too low. Please run 'ulimit -a' and check"
2118                     " the 'max locked memory' (ulimit -l) value.\n");
2119         goto err_rdma_source_init;
2120     }
2121
2122     ret = qemu_rdma_alloc_qp(rdma);
2123     if (ret) {
2124         ERROR(temp, "rdma migration: error allocating qp!\n");
2125         goto err_rdma_source_init;
2126     }
2127
2128     ret = qemu_rdma_init_ram_blocks(rdma);
2129     if (ret) {
2130         ERROR(temp, "rdma migration: error initializing ram blocks!\n");
2131         goto err_rdma_source_init;
2132     }
2133
2134     for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
2135         ret = qemu_rdma_reg_control(rdma, idx);
2136         if (ret) {
2137             ERROR(temp, "rdma migration: error registering %d control!\n",
2138                                                             idx);
2139             goto err_rdma_source_init;
2140         }
2141     }
2142
2143     return 0;
2144
2145 err_rdma_source_init:
2146     error_propagate(errp, local_err);
2147     qemu_rdma_cleanup(rdma);
2148     return -1;
2149 }
2150
2151 static int qemu_rdma_connect(RDMAContext *rdma, Error **errp)
2152 {
2153     RDMACapabilities cap = {
2154                                 .version = RDMA_CONTROL_VERSION_CURRENT,
2155                                 .flags = 0,
2156                            };
2157     struct rdma_conn_param conn_param = { .initiator_depth = 2,
2158                                           .retry_count = 5,
2159                                           .private_data = &cap,
2160                                           .private_data_len = sizeof(cap),
2161                                         };
2162     struct rdma_cm_event *cm_event;
2163     int ret;
2164
2165     /*
2166      * Only negotiate the capability with destination if the user
2167      * on the source first requested the capability.
2168      */
2169     if (rdma->pin_all) {
2170         DPRINTF("Server pin-all memory requested.\n");
2171         cap.flags |= RDMA_CAPABILITY_PIN_ALL;
2172     }
2173
2174     caps_to_network(&cap);
2175
2176     ret = rdma_connect(rdma->cm_id, &conn_param);
2177     if (ret) {
2178         perror("rdma_connect");
2179         ERROR(errp, "connecting to destination!\n");
2180         rdma_destroy_id(rdma->cm_id);
2181         rdma->cm_id = NULL;
2182         goto err_rdma_source_connect;
2183     }
2184
2185     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2186     if (ret) {
2187         perror("rdma_get_cm_event after rdma_connect");
2188         ERROR(errp, "connecting to destination!\n");
2189         rdma_ack_cm_event(cm_event);
2190         rdma_destroy_id(rdma->cm_id);
2191         rdma->cm_id = NULL;
2192         goto err_rdma_source_connect;
2193     }
2194
2195     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2196         perror("rdma_get_cm_event != EVENT_ESTABLISHED after rdma_connect");
2197         ERROR(errp, "connecting to destination!\n");
2198         rdma_ack_cm_event(cm_event);
2199         rdma_destroy_id(rdma->cm_id);
2200         rdma->cm_id = NULL;
2201         goto err_rdma_source_connect;
2202     }
2203
2204     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2205     network_to_caps(&cap);
2206
2207     /*
2208      * Verify that the *requested* capabilities are supported by the destination
2209      * and disable them otherwise.
2210      */
2211     if (rdma->pin_all && !(cap.flags & RDMA_CAPABILITY_PIN_ALL)) {
2212         ERROR(errp, "Server cannot support pinning all memory. "
2213                         "Will register memory dynamically.\n");
2214         rdma->pin_all = false;
2215     }
2216
2217     DPRINTF("Pin all memory: %s\n", rdma->pin_all ? "enabled" : "disabled");
2218
2219     rdma_ack_cm_event(cm_event);
2220
2221     ret = qemu_rdma_post_recv_control(rdma, 0);
2222     if (ret) {
2223         ERROR(errp, "posting second control recv!\n");
2224         goto err_rdma_source_connect;
2225     }
2226
2227     rdma->control_ready_expected = 1;
2228     rdma->nb_sent = 0;
2229     return 0;
2230
2231 err_rdma_source_connect:
2232     qemu_rdma_cleanup(rdma);
2233     return -1;
2234 }
2235
2236 static int qemu_rdma_dest_init(RDMAContext *rdma, Error **errp)
2237 {
2238     int ret = -EINVAL, idx;
2239     struct sockaddr_in sin;
2240     struct rdma_cm_id *listen_id;
2241     char ip[40] = "unknown";
2242
2243     for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
2244         rdma->wr_data[idx].control_len = 0;
2245         rdma->wr_data[idx].control_curr = NULL;
2246     }
2247
2248     if (rdma->host == NULL) {
2249         ERROR(errp, "RDMA host is not set!\n");
2250         rdma->error_state = -EINVAL;
2251         return -1;
2252     }
2253     /* create CM channel */
2254     rdma->channel = rdma_create_event_channel();
2255     if (!rdma->channel) {
2256         ERROR(errp, "could not create rdma event channel\n");
2257         rdma->error_state = -EINVAL;
2258         return -1;
2259     }
2260
2261     /* create CM id */
2262     ret = rdma_create_id(rdma->channel, &listen_id, NULL, RDMA_PS_TCP);
2263     if (ret) {
2264         ERROR(errp, "could not create cm_id!\n");
2265         goto err_dest_init_create_listen_id;
2266     }
2267
2268     memset(&sin, 0, sizeof(sin));
2269     sin.sin_family = AF_INET;
2270     sin.sin_port = htons(rdma->port);
2271
2272     if (rdma->host && strcmp("", rdma->host)) {
2273         struct hostent *dest_addr;
2274         dest_addr = gethostbyname(rdma->host);
2275         if (!dest_addr) {
2276             ERROR(errp, "migration could not gethostbyname!\n");
2277             ret = -EINVAL;
2278             goto err_dest_init_bind_addr;
2279         }
2280         memcpy(&sin.sin_addr.s_addr, dest_addr->h_addr,
2281                 dest_addr->h_length);
2282         inet_ntop(AF_INET, dest_addr->h_addr, ip, sizeof ip);
2283     } else {
2284         sin.sin_addr.s_addr = INADDR_ANY;
2285     }
2286
2287     DPRINTF("%s => %s\n", rdma->host, ip);
2288
2289     ret = rdma_bind_addr(listen_id, (struct sockaddr *)&sin);
2290     if (ret) {
2291         ERROR(errp, "could not rdma_bind_addr!\n");
2292         goto err_dest_init_bind_addr;
2293     }
2294
2295     rdma->listen_id = listen_id;
2296     qemu_rdma_dump_gid("dest_init", listen_id);
2297     return 0;
2298
2299 err_dest_init_bind_addr:
2300     rdma_destroy_id(listen_id);
2301 err_dest_init_create_listen_id:
2302     rdma_destroy_event_channel(rdma->channel);
2303     rdma->channel = NULL;
2304     rdma->error_state = ret;
2305     return ret;
2306
2307 }
2308
2309 static void *qemu_rdma_data_init(const char *host_port, Error **errp)
2310 {
2311     RDMAContext *rdma = NULL;
2312     InetSocketAddress *addr;
2313
2314     if (host_port) {
2315         rdma = g_malloc0(sizeof(RDMAContext));
2317         rdma->current_index = -1;
2318         rdma->current_chunk = -1;
2319
2320         addr = inet_parse(host_port, NULL);
2321         if (addr != NULL) {
2322             rdma->port = atoi(addr->port);
2323             rdma->host = g_strdup(addr->host);
2324         } else {
2325             ERROR(errp, "bad RDMA migration address '%s'", host_port);
2326             g_free(rdma);
2327             return NULL;
2328         }
2329     }
2330
2331     return rdma;
2332 }
2333
2334 /*
2335  * QEMUFile interface to the control channel.
2336  * SEND messages for control only.
2337  * pc.ram is handled with regular RDMA messages.
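 *
 * For illustration: each call first flushes any queued pc.ram writes and
 * then slices 'buf' into RDMA_SEND_INCREMENT-sized RDMA_CONTROL_QEMU_FILE
 * messages, each delivered with qemu_rdma_exchange_send().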
2338  */
2339 static int qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
2340                                 int64_t pos, int size)
2341 {
2342     QEMUFileRDMA *r = opaque;
2343     QEMUFile *f = r->file;
2344     RDMAContext *rdma = r->rdma;
2345     size_t remaining = size;
2346     uint8_t *data = (void *) buf;
2347     int ret;
2348
2349     CHECK_ERROR_STATE();
2350
2351     /*
2352      * Push out any writes that
2353      * we're queued up for pc.ram.
2354      */
2355     ret = qemu_rdma_write_flush(f, rdma);
2356     if (ret < 0) {
2357         rdma->error_state = ret;
2358         return ret;
2359     }
2360
2361     while (remaining) {
2362         RDMAControlHeader head;
2363
2364         r->len = MIN(remaining, RDMA_SEND_INCREMENT);
2365         remaining -= r->len;
2366
2367         head.len = r->len;
2368         head.type = RDMA_CONTROL_QEMU_FILE;
2369
2370         ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
2371
2372         if (ret < 0) {
2373             rdma->error_state = ret;
2374             return ret;
2375         }
2376
2377         data += r->len;
2378     }
2379
2380     return size;
2381 }
2382
2383 static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
2384                              int size, int idx)
2385 {
2386     size_t len = 0;
2387
2388     if (rdma->wr_data[idx].control_len) {
2389         DDDPRINTF("RDMA %" PRId64 " of %d bytes already in buffer\n",
2390                     rdma->wr_data[idx].control_len, size);
2391
2392         len = MIN(size, rdma->wr_data[idx].control_len);
2393         memcpy(buf, rdma->wr_data[idx].control_curr, len);
2394         rdma->wr_data[idx].control_curr += len;
2395         rdma->wr_data[idx].control_len -= len;
2396     }
2397
2398     return len;
2399 }
2400
2401 /*
2402  * QEMUFile interface to the control channel.
2403  * RDMA links don't use bytestreams, so we have to
2404  * return bytes to QEMUFile opportunistically.
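 *
 * For illustration: leftover bytes from the previous RDMA_CONTROL_QEMU_FILE
 * message are drained first via qemu_rdma_fill(); only when that buffer is
 * empty do we block in qemu_rdma_exchange_recv() for the next SEND.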
2405  */
2406 static int qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
2407                                 int64_t pos, int size)
2408 {
2409     QEMUFileRDMA *r = opaque;
2410     RDMAContext *rdma = r->rdma;
2411     RDMAControlHeader head;
2412     int ret = 0;
2413
2414     CHECK_ERROR_STATE();
2415
2416     /*
2417      * First, we hold on to the last SEND message we
2418      * were given and dish out the bytes until we run
2419      * out of bytes.
2420      */
2421     r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
2422     if (r->len) {
2423         return r->len;
2424     }
2425
2426     /*
2427      * Once we run out, we block and wait for another
2428      * SEND message to arrive.
2429      */
2430     ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
2431
2432     if (ret < 0) {
2433         rdma->error_state = ret;
2434         return ret;
2435     }
2436
2437     /*
2438      * SEND was received with new bytes, now try again.
2439      */
2440     return qemu_rdma_fill(r->rdma, buf, size, 0);
2441 }
2442
2443 /*
2444  * Block until all the outstanding chunks have been delivered by the hardware.
2445  */
2446 static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
2447 {
2448     int ret;
2449
2450     if (qemu_rdma_write_flush(f, rdma) < 0) {
2451         return -EIO;
2452     }
2453
2454     while (rdma->nb_sent) {
2455         ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
2456         if (ret < 0) {
2457             fprintf(stderr, "rdma migration: complete polling error!\n");
2458             return -EIO;
2459         }
2460     }
2461
2462     qemu_rdma_unregister_waiting(rdma);
2463
2464     return 0;
2465 }
2466
2467 static int qemu_rdma_close(void *opaque)
2468 {
2469     DPRINTF("Shutting down connection.\n");
2470     QEMUFileRDMA *r = opaque;
2471     if (r->rdma) {
2472         qemu_rdma_cleanup(r->rdma);
2473         g_free(r->rdma);
2474     }
2475     g_free(r);
2476     return 0;
2477 }
2478
2479 /*
2480  * Parameters:
2481  *    @offset == 0 :
2482  *        This means that 'block_offset' is a full virtual address that does not
2483  *        belong to a RAMBlock of the virtual machine and instead
2484  *        represents a private malloc'd memory area that the caller wishes to
2485  *        transfer.
2486  *
2487  *    @offset != 0 :
2488  *        Offset is an offset to be added to block_offset and used
2489  *        to also lookup the corresponding RAMBlock.
2490  *
2491  *    @size > 0 :
2492  *        Initiate a transfer of this size.
2493  *
2494  *    @size == 0 :
2495  *        A 'hint' or 'advice' that means that we wish to speculatively
2496  *        and asynchronously unregister this memory. In this case, there is no
2497  *        guarantee that the unregister will actually happen, for example,
2498  *        if the memory is being actively transmitted. Additionally, the memory
2499  *        may be re-registered at any future time if a write within the same
2500  *        chunk was requested again, even if you attempted to unregister it
2501  *        here.
2502  *
2503  *    @size < 0 : TODO, not yet supported
2504  *        Unregister the memory NOW. This means that the caller does not
2505  *        expect there to be any future RDMA transfers and we just want to clean
2506  *        things up. This is used in case the upper layer owns the memory and
2507  *        cannot wait for qemu_fclose() to occur.
2508  *
2509  *    @bytes_sent : User-specified pointer to indicate how many bytes were
2510  *                  sent. Usually, this will not be more than a few bytes of
2511  *                  the protocol because most transfers are sent asynchronously.
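 *
 *    Return value: RAM_SAVE_CONTROL_DELAYED on success (the transfer
 *    completes asynchronously); on error, the negative code is latched
 *    into rdma->error_state and returned.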
2512  */
2513 static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
2514                                   ram_addr_t block_offset, ram_addr_t offset,
2515                                   size_t size, int *bytes_sent)
2516 {
2517     QEMUFileRDMA *rfile = opaque;
2518     RDMAContext *rdma = rfile->rdma;
2519     int ret;
2520
2521     CHECK_ERROR_STATE();
2522
2523     qemu_fflush(f);
2524
2525     if (size > 0) {
2526         /*
2527          * Add this page to the current 'chunk'. If the chunk
2528          * is full, or the page doesn't belong to the current chunk,
2529          * an actual RDMA write will occur and a new chunk will be formed.
2530          */
2531         ret = qemu_rdma_write(f, rdma, block_offset, offset, size);
2532         if (ret < 0) {
2533             fprintf(stderr, "rdma migration: write error! %d\n", ret);
2534             goto err;
2535         }
2536
2537         /*
2538          * We always return 1 byte because the RDMA
2539          * protocol is completely asynchronous. We do not yet know
2540          * whether an identified chunk is zero or not because we're
2541          * waiting for other pages to potentially be merged with
2542          * the current chunk. So, we have to call qemu_update_position()
2543          * later on when the actual write occurs.
2544          */
2545         if (bytes_sent) {
2546             *bytes_sent = 1;
2547         }
2548     } else {
2549         uint64_t index, chunk;
2550
2551         /* TODO: Change QEMUFileOps prototype to be signed: size_t => long
2552         if (size < 0) {
2553             ret = qemu_rdma_drain_cq(f, rdma);
2554             if (ret < 0) {
2555                 fprintf(stderr, "rdma: failed to synchronously drain"
2556                                 " completion queue before unregistration.\n");
2557                 goto err;
2558             }
2559         }
2560         */
2561
2562         ret = qemu_rdma_search_ram_block(rdma, block_offset,
2563                                          offset, size, &index, &chunk);
2564
2565         if (ret) {
2566             fprintf(stderr, "ram block search failed\n");
2567             goto err;
2568         }
2569
2570         qemu_rdma_signal_unregister(rdma, index, chunk, 0);
2571
2572         /*
2573          * TODO: Synchronous, guaranteed unregistration (should not occur during
2574          * fast-path). Otherwise, unregisters will process on the next call to
2575          * qemu_rdma_drain_cq()
2576         if (size < 0) {
2577             qemu_rdma_unregister_waiting(rdma);
2578         }
2579         */
2580     }
2581
2582     /*
2583      * Drain the Completion Queue if possible, but do not block,
2584      * just poll.
2585      *
2586      * If nothing to poll, the end of the iteration will do this
2587      * again to make sure we don't overflow the request queue.
2588      */
2589     while (1) {
2590         uint64_t wr_id, wr_id_in;
2591         int ret = qemu_rdma_poll(rdma, &wr_id_in);
2592         if (ret < 0) {
2593             fprintf(stderr, "rdma migration: polling error! %d\n", ret);
2594             goto err;
2595         }
2596
2597         wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
2598
2599         if (wr_id == RDMA_WRID_NONE) {
2600             break;
2601         }
2602     }
2603
2604     return RAM_SAVE_CONTROL_DELAYED;
2605 err:
2606     rdma->error_state = ret;
2607     return ret;
2608 }
2609
2610 static int qemu_rdma_accept(RDMAContext *rdma)
2611 {
2612     RDMACapabilities cap;
2613     struct rdma_conn_param conn_param = {
2614                                             .responder_resources = 2,
2615                                             .private_data = &cap,
2616                                             .private_data_len = sizeof(cap),
2617                                          };
2618     struct rdma_cm_event *cm_event;
2619     struct ibv_context *verbs;
2620     int ret = -EINVAL;
2621     int idx;
2622
2623     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2624     if (ret) {
2625         goto err_rdma_dest_wait;
2626     }
2627
2628     if (cm_event->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
2629         rdma_ack_cm_event(cm_event);
2630         goto err_rdma_dest_wait;
2631     }
2632
2633     memcpy(&cap, cm_event->param.conn.private_data, sizeof(cap));
2634
2635     network_to_caps(&cap);
2636
2637     if (cap.version < 1 || cap.version > RDMA_CONTROL_VERSION_CURRENT) {
2638         fprintf(stderr, "Unknown source RDMA version: %d, bailing...\n",
2639                 cap.version);
2640         rdma_ack_cm_event(cm_event);
2641         goto err_rdma_dest_wait;
2642     }
2643
2644     /*
2645      * Respond with only the capabilities this version of QEMU knows about.
2646      */
2647     cap.flags &= known_capabilities;
2648
2649     /*
2650      * Enable the ones that we do know about.
2651      * Add other checks here as new ones are introduced.
2652      */
2653     if (cap.flags & RDMA_CAPABILITY_PIN_ALL) {
2654         rdma->pin_all = true;
2655     }
2656
2657     rdma->cm_id = cm_event->id;
2658     verbs = cm_event->id->verbs;
2659
2660     rdma_ack_cm_event(cm_event);
2661
2662     DPRINTF("Memory pin all: %s\n", rdma->pin_all ? "enabled" : "disabled");
2663
2664     caps_to_network(&cap);
2665
2666     DPRINTF("verbs context after listen: %p\n", verbs);
2667
2668     if (!rdma->verbs) {
2669         rdma->verbs = verbs;
2670     } else if (rdma->verbs != verbs) {
2671         fprintf(stderr, "ibv context not matching %p, %p!\n",
2672                 rdma->verbs, verbs);
2673         goto err_rdma_dest_wait;
2674     }
2675
2676     qemu_rdma_dump_id("dest_init", verbs);
2677
2678     ret = qemu_rdma_alloc_pd_cq(rdma);
2679     if (ret) {
2680         fprintf(stderr, "rdma migration: error allocating pd and cq!\n");
2681         goto err_rdma_dest_wait;
2682     }
2683
2684     ret = qemu_rdma_alloc_qp(rdma);
2685     if (ret) {
2686         fprintf(stderr, "rdma migration: error allocating qp!\n");
2687         goto err_rdma_dest_wait;
2688     }
2689
2690     ret = qemu_rdma_init_ram_blocks(rdma);
2691     if (ret) {
2692         fprintf(stderr, "rdma migration: error initializing ram blocks!\n");
2693         goto err_rdma_dest_wait;
2694     }
2695
2696     for (idx = 0; idx <= RDMA_WRID_MAX; idx++) {
2697         ret = qemu_rdma_reg_control(rdma, idx);
2698         if (ret) {
2699             fprintf(stderr, "rdma: error registering %d control!\n", idx);
2700             goto err_rdma_dest_wait;
2701         }
2702     }
2703
2704     qemu_set_fd_handler2(rdma->channel->fd, NULL, NULL, NULL, NULL);
2705
2706     ret = rdma_accept(rdma->cm_id, &conn_param);
2707     if (ret) {
2708         fprintf(stderr, "rdma_accept returns %d!\n", ret);
2709         goto err_rdma_dest_wait;
2710     }
2711
2712     ret = rdma_get_cm_event(rdma->channel, &cm_event);
2713     if (ret) {
2714         fprintf(stderr, "rdma_accept get_cm_event failed %d!\n", ret);
2715         goto err_rdma_dest_wait;
2716     }
2717
2718     if (cm_event->event != RDMA_CM_EVENT_ESTABLISHED) {
2719         fprintf(stderr, "rdma_accept not event established!\n");
2720         rdma_ack_cm_event(cm_event);
2721         goto err_rdma_dest_wait;
2722     }
2723
2724     rdma_ack_cm_event(cm_event);
2725
2726     ret = qemu_rdma_post_recv_control(rdma, 0);
2727     if (ret) {
2728         fprintf(stderr, "rdma migration: error posting second control recv!\n");
2729         goto err_rdma_dest_wait;
2730     }
2731
2732     qemu_rdma_dump_gid("dest_connect", rdma->cm_id);
2733
2734     return 0;
2735
2736 err_rdma_dest_wait:
2737     rdma->error_state = ret;
2738     qemu_rdma_cleanup(rdma);
2739     return ret;
2740 }
2741
2742 /*
2743  * During each iteration of the migration, we listen for instructions
2744  * from the source VM to perform dynamic page registrations before it
2745  * can perform RDMA operations.
2746  *
2747  * We respond with the 'rkey'.
2748  *
2749  * Keep doing this until the source tells us to stop.
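 *
 * Roughly, the messages handled here are: RAM_BLOCKS_REQUEST (reply with
 * our RAMBlock descriptions), COMPRESS (zero a chunk locally instead of
 * receiving it), REGISTER_REQUEST (pin the chunks and reply with rkeys),
 * UNREGISTER_REQUEST (drop the registrations), and REGISTER_FINISHED
 * (return to the caller).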
2750  */
2751 static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque,
2752                                          uint64_t flags)
2753 {
2754     RDMAControlHeader reg_resp = { .len = sizeof(RDMARegisterResult),
2755                                .type = RDMA_CONTROL_REGISTER_RESULT,
2756                                .repeat = 0,
2757                              };
2758     RDMAControlHeader unreg_resp = { .len = 0,
2759                                .type = RDMA_CONTROL_UNREGISTER_FINISHED,
2760                                .repeat = 0,
2761                              };
2762     RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
2763                                  .repeat = 1 };
2764     QEMUFileRDMA *rfile = opaque;
2765     RDMAContext *rdma = rfile->rdma;
2766     RDMALocalBlocks *local = &rdma->local_ram_blocks;
2767     RDMAControlHeader head;
2768     RDMARegister *reg, *registers;
2769     RDMACompress *comp;
2770     RDMARegisterResult *reg_result;
2771     static RDMARegisterResult results[RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE];
2772     RDMALocalBlock *block;
2773     void *host_addr;
2774     int ret = 0;
2775     int idx = 0;
2776     int count = 0;
2777     int i = 0;
2778
2779     CHECK_ERROR_STATE();
2780
2781     do {
2782         DDDPRINTF("Waiting for next request %" PRIu64 "...\n", flags);
2783
2784         ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_NONE);
2785
2786         if (ret < 0) {
2787             break;
2788         }
2789
2790         if (head.repeat > RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE) {
2791             fprintf(stderr, "rdma: Too many requests in this message (%d). "
2792                             "Bailing.\n", head.repeat);
2793             ret = -EIO;
2794             break;
2795         }
2796
2797         switch (head.type) {
2798         case RDMA_CONTROL_COMPRESS:
2799             comp = (RDMACompress *) rdma->wr_data[idx].control_curr;
2800             network_to_compress(comp);
2801
2802             DDPRINTF("Zapping zero chunk: %" PRId64
2803                     " bytes, index %d, offset %" PRId64 "\n",
2804                     comp->length, comp->block_idx, comp->offset);
2805             block = &(rdma->local_ram_blocks.block[comp->block_idx]);
2806
2807             host_addr = block->local_host_addr +
2808                             (comp->offset - block->offset);
2809
2810             ram_handle_compressed(host_addr, comp->value, comp->length);
2811             break;
2812
2813         case RDMA_CONTROL_REGISTER_FINISHED:
2814             DDDPRINTF("Current registrations complete.\n");
2815             goto out;
2816
2817         case RDMA_CONTROL_RAM_BLOCKS_REQUEST:
2818             DPRINTF("Initial setup info requested.\n");
2819
2820             if (rdma->pin_all) {
2821                 ret = qemu_rdma_reg_whole_ram_blocks(rdma);
2822                 if (ret) {
2823                     fprintf(stderr, "rdma migration: error dest "
2824                                     "registering ram blocks!\n");
2825                     goto out;
2826                 }
2827             }
2828
2829             /*
2830              * Dest uses this to prepare to transmit the RAMBlock descriptions
2831              * to the source VM after connection setup.
2832              * Both sides use the "remote" structure to communicate and update
2833              * their "local" descriptions with what was sent.
2834              */
2835             for (i = 0; i < local->nb_blocks; i++) {
2836                 rdma->block[i].remote_host_addr =
2837                     (uint64_t)(local->block[i].local_host_addr);
2838
2839                 if (rdma->pin_all) {
2840                     rdma->block[i].remote_rkey = local->block[i].mr->rkey;
2841                 }
2842
2843                 rdma->block[i].offset = local->block[i].offset;
2844                 rdma->block[i].length = local->block[i].length;
2845
2846                 remote_block_to_network(&rdma->block[i]);
2847             }
2848
2849             blocks.len = rdma->local_ram_blocks.nb_blocks
2850                                                 * sizeof(RDMARemoteBlock);
2851
2852
2853             ret = qemu_rdma_post_send_control(rdma,
2854                                         (uint8_t *) rdma->block, &blocks);
2855
2856             if (ret < 0) {
2857                 fprintf(stderr, "rdma migration: error sending remote info!\n");
2858                 goto out;
2859             }
2860
2861             break;
2862         case RDMA_CONTROL_REGISTER_REQUEST:
2863             DDPRINTF("There are %d registration requests\n", head.repeat);
2864
2865             reg_resp.repeat = head.repeat;
2866             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
2867
2868             for (count = 0; count < head.repeat; count++) {
2869                 uint64_t chunk;
2870                 uint8_t *chunk_start, *chunk_end;
2871
2872                 reg = &registers[count];
2873                 network_to_register(reg);
2874
2875                 reg_result = &results[count];
2876
2877                 DDPRINTF("Registration request (%d): index %d, current_addr %"
2878                          PRIu64 " chunks: %" PRIu64 "\n", count,
2879                          reg->current_index, reg->key.current_addr, reg->chunks);
2880
2881                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
2882                 if (block->is_ram_block) {
2883                     host_addr = (block->local_host_addr +
2884                                 (reg->key.current_addr - block->offset));
2885                     chunk = ram_chunk_index(block->local_host_addr,
2886                                             (uint8_t *) host_addr);
2887                 } else {
2888                     chunk = reg->key.chunk;
2889                     host_addr = block->local_host_addr +
2890                         (reg->key.chunk * (1UL << RDMA_REG_CHUNK_SHIFT));
2891                 }
2892                 chunk_start = ram_chunk_start(block, chunk);
2893                 chunk_end = ram_chunk_end(block, chunk + reg->chunks);
2894                 if (qemu_rdma_register_and_get_keys(rdma, block,
2895                             (uint8_t *)host_addr, NULL, &reg_result->rkey,
2896                             chunk, chunk_start, chunk_end)) {
2897                     fprintf(stderr, "cannot get rkey!\n");
2898                     ret = -EINVAL;
2899                     goto out;
2900                 }
2901
2902                 reg_result->host_addr = (uint64_t) block->local_host_addr;
2903
2904                 DDPRINTF("Registered rkey for this request: %x\n",
2905                                 reg_result->rkey);
2906
2907                 result_to_network(reg_result);
2908             }
2909
2910             ret = qemu_rdma_post_send_control(rdma,
2911                             (uint8_t *) results, &reg_resp);
2912
2913             if (ret < 0) {
2914                 fprintf(stderr, "Failed to send control buffer!\n");
2915                 goto out;
2916             }
2917             break;
2918         case RDMA_CONTROL_UNREGISTER_REQUEST:
2919             DDPRINTF("There are %d unregistration requests\n", head.repeat);
2920             unreg_resp.repeat = head.repeat;
2921             registers = (RDMARegister *) rdma->wr_data[idx].control_curr;
2922
2923             for (count = 0; count < head.repeat; count++) {
2924                 reg = &registers[count];
2925                 network_to_register(reg);
2926
2927                 DDPRINTF("Unregistration request (%d): "
2928                          " index %d, chunk %" PRIu64 "\n",
2929                          count, reg->current_index, reg->key.chunk);
2930
2931                 block = &(rdma->local_ram_blocks.block[reg->current_index]);
2932
2933                 ret = ibv_dereg_mr(block->pmr[reg->key.chunk]);
2934                 block->pmr[reg->key.chunk] = NULL;
2935
2936                 if (ret != 0) {
2937                     perror("rdma unregistration chunk failed");
2938                     ret = -ret;
2939                     goto out;
2940                 }
2941
2942                 rdma->total_registrations--;
2943
2944                 DDPRINTF("Unregistered chunk %" PRIu64 " successfully.\n",
2945                             reg->key.chunk);
2946             }
2947
2948             ret = qemu_rdma_post_send_control(rdma, NULL, &unreg_resp);
2949
2950             if (ret < 0) {
2951                 fprintf(stderr, "Failed to send control buffer!\n");
2952                 goto out;
2953             }
2954             break;
2955         case RDMA_CONTROL_REGISTER_RESULT:
2956             fprintf(stderr, "Invalid RESULT message at dest.\n");
2957             ret = -EIO;
2958             goto out;
2959         default:
2960             fprintf(stderr, "Unknown control message %s\n",
2961                                 control_desc[head.type]);
2962             ret = -EIO;
2963             goto out;
2964         }
2965     } while (1);
2966 out:
2967     if (ret < 0) {
2968         rdma->error_state = ret;
2969     }
2970     return ret;
2971 }
2972
2973 static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
2974                                         uint64_t flags)
2975 {
2976     QEMUFileRDMA *rfile = opaque;
2977     RDMAContext *rdma = rfile->rdma;
2978
2979     CHECK_ERROR_STATE();
2980
2981     DDDPRINTF("start section: %" PRIu64 "\n", flags);
2982     qemu_put_be64(f, RAM_SAVE_FLAG_HOOK);
2983     qemu_fflush(f);
2984
2985     return 0;
2986 }
2987
2988 /*
2989  * Inform dest that dynamic registrations are done for now.
2990  * First, flush writes, if any.
2991  */
2992 static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
2993                                        uint64_t flags)
2994 {
2995     Error *local_err = NULL, **errp = &local_err;
2996     QEMUFileRDMA *rfile = opaque;
2997     RDMAContext *rdma = rfile->rdma;
2998     RDMAControlHeader head = { .len = 0, .repeat = 1 };
2999     int ret = 0;
3000
3001     CHECK_ERROR_STATE();
3002
3003     qemu_fflush(f);
3004     ret = qemu_rdma_drain_cq(f, rdma);
3005
3006     if (ret < 0) {
3007         goto err;
3008     }
3009
3010     if (flags == RAM_CONTROL_SETUP) {
3011         RDMAControlHeader resp = {.type = RDMA_CONTROL_RAM_BLOCKS_RESULT };
3012         RDMALocalBlocks *local = &rdma->local_ram_blocks;
3013         int reg_result_idx, i, j, nb_remote_blocks;
3014
3015         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
3016         DPRINTF("Sending registration setup for ram blocks...\n");
3017
3018         /*
3019          * Make sure that we parallelize the pinning on both sides.
3020          * For very large guests, doing this serially takes a really
3021          * long time, so we have to 'interleave' the pinning locally
3022          * with the control messages by performing the pinning on this
3023          * side before we receive the control response from the other
3024          * side that the pinning has completed.
3025          */
3026         ret = qemu_rdma_exchange_send(rdma, &head, NULL, &resp,
3027                     &reg_result_idx, rdma->pin_all ?
3028                     qemu_rdma_reg_whole_ram_blocks : NULL);
3029         if (ret < 0) {
3030             ERROR(errp, "receiving remote info!\n");
3031             return ret;
3032         }
3033
3034         qemu_rdma_move_header(rdma, reg_result_idx, &resp);
3035         memcpy(rdma->block,
3036             rdma->wr_data[reg_result_idx].control_curr, resp.len);
3037
3038         nb_remote_blocks = resp.len / sizeof(RDMARemoteBlock);
3039
3040         /*
3041          * The protocol uses two different sets of rkeys (mutually exclusive):
3042          * 1. One key to represent the virtual address of the entire ram block.
3043          *    (dynamic chunk registration disabled - pin everything with one rkey.)
3044          * 2. One to represent individual chunks within a ram block.
3045          *    (dynamic chunk registration enabled - pin individual chunks.)
3046          *
3047          * Once the capability is successfully negotiated, the destination transmits
3048          * the keys to use (or sends them later) including the virtual addresses
3049          *    and then propagates the remote ram block descriptions to its local copy.
3050          */
3051
3052         if (local->nb_blocks != nb_remote_blocks) {
3053             ERROR(errp, "ram blocks mismatch #1! "
3054                         "Your QEMU command line parameters are probably "
3055                         "not identical on both the source and destination.\n");
3056             return -EINVAL;
3057         }
3058
3059         for (i = 0; i < nb_remote_blocks; i++) {
3060             network_to_remote_block(&rdma->block[i]);
3061
3062             /* search local ram blocks */
3063             for (j = 0; j < local->nb_blocks; j++) {
3064                 if (rdma->block[i].offset != local->block[j].offset) {
3065                     continue;
3066                 }
3067
3068                 if (rdma->block[i].length != local->block[j].length) {
3069                     ERROR(errp, "ram blocks mismatch #2! "
3070                         "Your QEMU command line parameters are probably "
3071                         "not identical on both the source and destination.\n");
3072                     return -EINVAL;
3073                 }
3074                 local->block[j].remote_host_addr =
3075                         rdma->block[i].remote_host_addr;
3076                 local->block[j].remote_rkey = rdma->block[i].remote_rkey;
3077                 break;
3078             }
3079
3080             if (j >= local->nb_blocks) {
3081                 ERROR(errp, "ram blocks mismatch #3! "
3082                         "Your QEMU command line parameters are probably "
3083                         "not identical on both the source and destination.\n");
3084                 return -EINVAL;
3085             }
3086         }
3087     }
3088
3089     DDDPRINTF("Sending registration finish %" PRIu64 "...\n", flags);
3090
3091     head.type = RDMA_CONTROL_REGISTER_FINISHED;
3092     ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
3093
3094     if (ret < 0) {
3095         goto err;
3096     }
3097
3098     return 0;
3099 err:
3100     rdma->error_state = ret;
3101     return ret;
3102 }
3103
3104 static int qemu_rdma_get_fd(void *opaque)
3105 {
3106     QEMUFileRDMA *rfile = opaque;
3107     RDMAContext *rdma = rfile->rdma;
3108
3109     return rdma->comp_channel->fd;
3110 }
3111
3112 const QEMUFileOps rdma_read_ops = {
3113     .get_buffer    = qemu_rdma_get_buffer,
3114     .get_fd        = qemu_rdma_get_fd,
3115     .close         = qemu_rdma_close,
3116     .hook_ram_load = qemu_rdma_registration_handle,
3117 };
3118
3119 const QEMUFileOps rdma_write_ops = {
3120     .put_buffer         = qemu_rdma_put_buffer,
3121     .close              = qemu_rdma_close,
3122     .before_ram_iterate = qemu_rdma_registration_start,
3123     .after_ram_iterate  = qemu_rdma_registration_stop,
3124     .save_page          = qemu_rdma_save_page,
3125 };
3126
3127 static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
3128 {
3129     QEMUFileRDMA *r = g_malloc0(sizeof(QEMUFileRDMA));
3130
3131     if (qemu_file_mode_is_not_valid(mode)) {
3132         return NULL;
3133     }
3134
3135     r->rdma = rdma;
3136
3137     if (mode[0] == 'w') {
3138         r->file = qemu_fopen_ops(r, &rdma_write_ops);
3139     } else {
3140         r->file = qemu_fopen_ops(r, &rdma_read_ops);
3141     }
3142
3143     return r->file;
3144 }
3145
3146 static void rdma_accept_incoming_migration(void *opaque)
3147 {
3148     RDMAContext *rdma = opaque;
3149     int ret;
3150     QEMUFile *f;
3151     Error *local_err = NULL, **errp = &local_err;
3152
3153     DPRINTF("Accepting rdma connection...\n");
3154     ret = qemu_rdma_accept(rdma);
3155
3156     if (ret) {
3157         ERROR(errp, "RDMA Migration initialization failed!\n");
3158         return;
3159     }
3160
3161     DPRINTF("Accepted migration\n");
3162
3163     f = qemu_fopen_rdma(rdma, "rb");
3164     if (f == NULL) {
3165         ERROR(errp, "could not qemu_fopen_rdma!\n");
3166         qemu_rdma_cleanup(rdma);
3167         return;
3168     }
3169
3170     rdma->migration_started_on_destination = 1;
3171     process_incoming_migration(f);
3172 }
3173
3174 void rdma_start_incoming_migration(const char *host_port, Error **errp)
3175 {
3176     int ret;
3177     RDMAContext *rdma;
3178     Error *local_err = NULL;
3179
3180     DPRINTF("Starting RDMA-based incoming migration\n");
3181     rdma = qemu_rdma_data_init(host_port, &local_err);
3182
3183     if (rdma == NULL) {
3184         goto err;
3185     }
3186
3187     ret = qemu_rdma_dest_init(rdma, &local_err);
3188
3189     if (ret) {
3190         goto err;
3191     }
3192
3193     DPRINTF("qemu_rdma_dest_init success\n");
3194
3195     ret = rdma_listen(rdma->listen_id, 5);
3196
3197     if (ret) {
3198         ERROR(errp, "listening on socket!\n");
3199         goto err;
3200     }
3201
3202     DPRINTF("rdma_listen success\n");
3203
3204     qemu_set_fd_handler2(rdma->channel->fd, NULL,
3205                          rdma_accept_incoming_migration, NULL,
3206                             (void *)(intptr_t) rdma);
3207     return;
3208 err:
3209     error_propagate(errp, local_err);
3210     g_free(rdma);
3211 }
3212
3213 void rdma_start_outgoing_migration(void *opaque,
3214                             const char *host_port, Error **errp)
3215 {
3216     MigrationState *s = opaque;
3217     Error *local_err = NULL, **temp = &local_err;
3218     RDMAContext *rdma = qemu_rdma_data_init(host_port, &local_err);
3219     int ret = 0;
3220
3221     if (rdma == NULL) {
3222         ERROR(temp, "Failed to initialize RDMA data structures!\n");
3223         goto err;
3224     }
3225
3226     ret = qemu_rdma_source_init(rdma, &local_err,
3227         s->enabled_capabilities[MIGRATION_CAPABILITY_X_RDMA_PIN_ALL]);
3228
3229     if (ret) {
3230         goto err;
3231     }
3232
3233     DPRINTF("qemu_rdma_source_init success\n");
3234     ret = qemu_rdma_connect(rdma, &local_err);
3235
3236     if (ret) {
3237         goto err;
3238     }
3239
3240     DPRINTF("qemu_rdma_source_connect success\n");
3241
3242     s->file = qemu_fopen_rdma(rdma, "wb");
3243     migrate_fd_connect(s);
3244     return;
3245 err:
3246     error_propagate(errp, local_err);
3247     g_free(rdma);
3248     migrate_fd_error(s);
3249 }