Version up
[sdk/emulator/qemu.git] / block / archipelago.c
1 /*
2  * QEMU Block driver for Archipelago
3  *
4  * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
5  *
6  * This work is licensed under the terms of the GNU GPL, version 2 or later.
7  * See the COPYING file in the top-level directory.
8  *
9  */
10
11 /*
12  * VM Image on Archipelago volume is specified like this:
13  *
14  * file.driver=archipelago,file.volume=<volumename>
15  * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
16  * [,file.segment=<segment_name>]]
17  *
18  * or
19  *
20  * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
21  * segment=<segment_name>]]
22  *
23  * 'archipelago' is the protocol.
24  *
25  * 'mport' is the port number on which mapperd is listening. This is optional
26  * and if not specified, QEMU will make Archipelago to use the default port.
27  *
28  * 'vport' is the port number on which vlmcd is listening. This is optional
29  * and if not specified, QEMU will make Archipelago to use the default port.
30  *
31  * 'segment' is the name of the shared memory segment Archipelago stack
32  * is using. This is optional and if not specified, QEMU will make Archipelago
33  * to use the default value, 'archipelago'.
34  *
35  * Examples:
36  *
37  * file.driver=archipelago,file.volume=my_vm_volume
38  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
39  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
40  *  file.vport=1234
41  * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
42  *  file.vport=1234,file.segment=my_segment
43  *
44  * or
45  *
46  * file=archipelago:my_vm_volume
47  * file=archipelago:my_vm_volume/mport=123
48  * file=archipelago:my_vm_volume/mport=123:vport=1234
49  * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
50  *
51  */
52
53 #include "qemu/osdep.h"
54 #include "qemu/cutils.h"
55 #include "block/block_int.h"
56 #include "qemu/error-report.h"
57 #include "qemu/thread.h"
58 #include "qapi/qmp/qint.h"
59 #include "qapi/qmp/qstring.h"
60 #include "qapi/qmp/qjson.h"
61 #include "qemu/atomic.h"
62
63 #include <xseg/xseg.h>
64 #include <xseg/protocol.h>
65
66 #define MAX_REQUEST_SIZE    524288
67
68 #define ARCHIPELAGO_OPT_VOLUME      "volume"
69 #define ARCHIPELAGO_OPT_SEGMENT     "segment"
70 #define ARCHIPELAGO_OPT_MPORT       "mport"
71 #define ARCHIPELAGO_OPT_VPORT       "vport"
72 #define ARCHIPELAGO_DFL_MPORT       1001
73 #define ARCHIPELAGO_DFL_VPORT       501
74
75 #define archipelagolog(fmt, ...) \
76     do {                         \
77         fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
78     } while (0)
79
80 typedef enum {
81     ARCHIP_OP_READ,
82     ARCHIP_OP_WRITE,
83     ARCHIP_OP_FLUSH,
84     ARCHIP_OP_VOLINFO,
85     ARCHIP_OP_TRUNCATE,
86 } ARCHIPCmd;
87
88 typedef struct ArchipelagoAIOCB {
89     BlockAIOCB common;
90     struct BDRVArchipelagoState *s;
91     QEMUIOVector *qiov;
92     ARCHIPCmd cmd;
93     int status;
94     int64_t size;
95     int64_t ret;
96 } ArchipelagoAIOCB;
97
98 typedef struct BDRVArchipelagoState {
99     ArchipelagoAIOCB *event_acb;
100     char *volname;
101     char *segment_name;
102     uint64_t size;
103     /* Archipelago specific */
104     struct xseg *xseg;
105     struct xseg_port *port;
106     xport srcport;
107     xport sport;
108     xport mportno;
109     xport vportno;
110     QemuMutex archip_mutex;
111     QemuCond archip_cond;
112     bool is_signaled;
113     /* Request handler specific */
114     QemuThread request_th;
115     QemuCond request_cond;
116     QemuMutex request_mutex;
117     bool th_is_signaled;
118     bool stopping;
119 } BDRVArchipelagoState;
120
121 typedef struct ArchipelagoSegmentedRequest {
122     size_t count;
123     size_t total;
124     int ref;
125     int failed;
126 } ArchipelagoSegmentedRequest;
127
128 typedef struct AIORequestData {
129     const char *volname;
130     off_t offset;
131     size_t size;
132     uint64_t bufidx;
133     int ret;
134     int op;
135     ArchipelagoAIOCB *aio_cb;
136     ArchipelagoSegmentedRequest *segreq;
137 } AIORequestData;
138
139 static void qemu_archipelago_complete_aio(void *opaque);
140
141 static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
142 {
143     if (xseg && (sport != srcport)) {
144         xseg_init_local_signal(xseg, srcport);
145         sport = srcport;
146     }
147 }
148
149 static void archipelago_finish_aiocb(AIORequestData *reqdata)
150 {
151     if (reqdata->aio_cb->ret != reqdata->segreq->total) {
152         reqdata->aio_cb->ret = -EIO;
153     } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
154         reqdata->aio_cb->ret = 0;
155     }
156     aio_bh_schedule_oneshot(
157                         bdrv_get_aio_context(reqdata->aio_cb->common.bs),
158                         qemu_archipelago_complete_aio, reqdata
159                         );
160 }
161
162 static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
163                       struct xseg_request *expected_req)
164 {
165     struct xseg_request *req;
166     xseg_prepare_wait(xseg, srcport);
167     void *psd = xseg_get_signal_desc(xseg, port);
168     while (1) {
169         req = xseg_receive(xseg, srcport, X_NONBLOCK);
170         if (req) {
171             if (req != expected_req) {
172                 archipelagolog("Unknown received request\n");
173                 xseg_put_request(xseg, req, srcport);
174             } else if (!(req->state & XS_SERVED)) {
175                 return -1;
176             } else {
177                 break;
178             }
179         }
180         xseg_wait_signal(xseg, psd, 100000UL);
181     }
182     xseg_cancel_wait(xseg, srcport);
183     return 0;
184 }
185
186 static void xseg_request_handler(void *state)
187 {
188     BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
189     void *psd = xseg_get_signal_desc(s->xseg, s->port);
190     qemu_mutex_lock(&s->request_mutex);
191
192     while (!s->stopping) {
193         struct xseg_request *req;
194         void *data;
195         xseg_prepare_wait(s->xseg, s->srcport);
196         req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
197         if (req) {
198             AIORequestData *reqdata;
199             ArchipelagoSegmentedRequest *segreq;
200             xseg_get_req_data(s->xseg, req, (void **)&reqdata);
201
202             switch (reqdata->op) {
203             case ARCHIP_OP_READ:
204                 data = xseg_get_data(s->xseg, req);
205                 segreq = reqdata->segreq;
206                 segreq->count += req->serviced;
207
208                 qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
209                                     data,
210                                     req->serviced);
211
212                 xseg_put_request(s->xseg, req, s->srcport);
213
214                 if (atomic_fetch_dec(&segreq->ref) == 1) {
215                     if (!segreq->failed) {
216                         reqdata->aio_cb->ret = segreq->count;
217                         archipelago_finish_aiocb(reqdata);
218                         g_free(segreq);
219                     } else {
220                         g_free(segreq);
221                         g_free(reqdata);
222                     }
223                 } else {
224                     g_free(reqdata);
225                 }
226                 break;
227             case ARCHIP_OP_WRITE:
228             case ARCHIP_OP_FLUSH:
229                 segreq = reqdata->segreq;
230                 segreq->count += req->serviced;
231                 xseg_put_request(s->xseg, req, s->srcport);
232
233                 if (atomic_fetch_dec(&segreq->ref) == 1) {
234                     if (!segreq->failed) {
235                         reqdata->aio_cb->ret = segreq->count;
236                         archipelago_finish_aiocb(reqdata);
237                         g_free(segreq);
238                     } else {
239                         g_free(segreq);
240                         g_free(reqdata);
241                     }
242                 } else {
243                     g_free(reqdata);
244                 }
245                 break;
246             case ARCHIP_OP_VOLINFO:
247             case ARCHIP_OP_TRUNCATE:
248                 s->is_signaled = true;
249                 qemu_cond_signal(&s->archip_cond);
250                 break;
251             }
252         } else {
253             xseg_wait_signal(s->xseg, psd, 100000UL);
254         }
255         xseg_cancel_wait(s->xseg, s->srcport);
256     }
257
258     s->th_is_signaled = true;
259     qemu_cond_signal(&s->request_cond);
260     qemu_mutex_unlock(&s->request_mutex);
261     qemu_thread_exit(NULL);
262 }
263
264 static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
265 {
266     if (xseg_initialize()) {
267         archipelagolog("Cannot initialize XSEG\n");
268         goto err_exit;
269     }
270
271     s->xseg = xseg_join("posix", s->segment_name,
272                         "posixfd", NULL);
273     if (!s->xseg) {
274         archipelagolog("Cannot join XSEG shared memory segment\n");
275         goto err_exit;
276     }
277     s->port = xseg_bind_dynport(s->xseg);
278     s->srcport = s->port->portno;
279     init_local_signal(s->xseg, s->sport, s->srcport);
280     return 0;
281
282 err_exit:
283     return -1;
284 }
285
286 static int qemu_archipelago_init(BDRVArchipelagoState *s)
287 {
288     int ret;
289
290     ret = qemu_archipelago_xseg_init(s);
291     if (ret < 0) {
292         error_report("Cannot initialize XSEG. Aborting...");
293         goto err_exit;
294     }
295
296     qemu_cond_init(&s->archip_cond);
297     qemu_mutex_init(&s->archip_mutex);
298     qemu_cond_init(&s->request_cond);
299     qemu_mutex_init(&s->request_mutex);
300     s->th_is_signaled = false;
301     qemu_thread_create(&s->request_th, "xseg_io_th",
302                        (void *) xseg_request_handler,
303                        (void *) s, QEMU_THREAD_JOINABLE);
304
305 err_exit:
306     return ret;
307 }
308
309 static void qemu_archipelago_complete_aio(void *opaque)
310 {
311     AIORequestData *reqdata = (AIORequestData *) opaque;
312     ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
313
314     aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
315     aio_cb->status = 0;
316
317     qemu_aio_unref(aio_cb);
318     g_free(reqdata);
319 }
320
321 static void xseg_find_port(char *pstr, const char *needle, xport *aport)
322 {
323     const char *a;
324     char *endptr = NULL;
325     unsigned long port;
326     if (strstart(pstr, needle, &a)) {
327         if (strlen(a) > 0) {
328             port = strtoul(a, &endptr, 10);
329             if (strlen(endptr)) {
330                 *aport = -2;
331                 return;
332             }
333             *aport = (xport) port;
334         }
335     }
336 }
337
338 static void xseg_find_segment(char *pstr, const char *needle,
339                               char **segment_name)
340 {
341     const char *a;
342     if (strstart(pstr, needle, &a)) {
343         if (strlen(a) > 0) {
344             *segment_name = g_strdup(a);
345         }
346     }
347 }
348
349 static void parse_filename_opts(const char *filename, Error **errp,
350                                 char **volume, char **segment_name,
351                                 xport *mport, xport *vport)
352 {
353     const char *start;
354     char *tokens[4], *ds;
355     int idx;
356     xport lmport = NoPort, lvport = NoPort;
357
358     strstart(filename, "archipelago:", &start);
359
360     ds = g_strdup(start);
361     tokens[0] = strtok(ds, "/");
362     tokens[1] = strtok(NULL, ":");
363     tokens[2] = strtok(NULL, ":");
364     tokens[3] = strtok(NULL, "\0");
365
366     if (!strlen(tokens[0])) {
367         error_setg(errp, "volume name must be specified first");
368         g_free(ds);
369         return;
370     }
371
372     for (idx = 1; idx < 4; idx++) {
373         if (tokens[idx] != NULL) {
374             if (strstart(tokens[idx], "mport=", NULL)) {
375                 xseg_find_port(tokens[idx], "mport=", &lmport);
376             }
377             if (strstart(tokens[idx], "vport=", NULL)) {
378                 xseg_find_port(tokens[idx], "vport=", &lvport);
379             }
380             if (strstart(tokens[idx], "segment=", NULL)) {
381                 xseg_find_segment(tokens[idx], "segment=", segment_name);
382             }
383         }
384     }
385
386     if ((lmport == -2) || (lvport == -2)) {
387         error_setg(errp, "mport and/or vport must be set");
388         g_free(ds);
389         return;
390     }
391     *volume = g_strdup(tokens[0]);
392     *mport = lmport;
393     *vport = lvport;
394     g_free(ds);
395 }
396
397 static void archipelago_parse_filename(const char *filename, QDict *options,
398                                        Error **errp)
399 {
400     const char *start;
401     char *volume = NULL, *segment_name = NULL;
402     xport mport = NoPort, vport = NoPort;
403
404     if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
405             || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
406             || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
407             || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
408         error_setg(errp, "volume/mport/vport/segment and a file name may not"
409                          " be specified at the same time");
410         return;
411     }
412
413     if (!strstart(filename, "archipelago:", &start)) {
414         error_setg(errp, "File name must start with 'archipelago:'");
415         return;
416     }
417
418     if (!strlen(start) || strstart(start, "/", NULL)) {
419         error_setg(errp, "volume name must be specified");
420         return;
421     }
422
423     parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
424
425     if (volume) {
426         qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
427         g_free(volume);
428     }
429     if (segment_name) {
430         qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
431                   qstring_from_str(segment_name));
432         g_free(segment_name);
433     }
434     if (mport != NoPort) {
435         qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
436     }
437     if (vport != NoPort) {
438         qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
439     }
440 }
441
442 static QemuOptsList archipelago_runtime_opts = {
443     .name = "archipelago",
444     .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
445     .desc = {
446         {
447             .name = ARCHIPELAGO_OPT_VOLUME,
448             .type = QEMU_OPT_STRING,
449             .help = "Name of the volume image",
450         },
451         {
452             .name = ARCHIPELAGO_OPT_SEGMENT,
453             .type = QEMU_OPT_STRING,
454             .help = "Name of the Archipelago shared memory segment",
455         },
456         {
457             .name = ARCHIPELAGO_OPT_MPORT,
458             .type = QEMU_OPT_NUMBER,
459             .help = "Archipelago mapperd port number"
460         },
461         {
462             .name = ARCHIPELAGO_OPT_VPORT,
463             .type = QEMU_OPT_NUMBER,
464             .help = "Archipelago vlmcd port number"
465
466         },
467         { /* end of list */ }
468     },
469 };
470
471 static int qemu_archipelago_open(BlockDriverState *bs,
472                                  QDict *options,
473                                  int bdrv_flags,
474                                  Error **errp)
475 {
476     int ret = 0;
477     const char *volume, *segment_name;
478     QemuOpts *opts;
479     Error *local_err = NULL;
480     BDRVArchipelagoState *s = bs->opaque;
481
482     opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
483     qemu_opts_absorb_qdict(opts, options, &local_err);
484     if (local_err) {
485         error_propagate(errp, local_err);
486         ret = -EINVAL;
487         goto err_exit;
488     }
489
490     s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
491                                      ARCHIPELAGO_DFL_MPORT);
492     s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
493                                      ARCHIPELAGO_DFL_VPORT);
494
495     segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
496     if (segment_name == NULL) {
497         s->segment_name = g_strdup("archipelago");
498     } else {
499         s->segment_name = g_strdup(segment_name);
500     }
501
502     volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
503     if (volume == NULL) {
504         error_setg(errp, "archipelago block driver requires the 'volume'"
505                    " option");
506         ret = -EINVAL;
507         goto err_exit;
508     }
509     s->volname = g_strdup(volume);
510
511     /* Initialize XSEG, join shared memory segment */
512     ret = qemu_archipelago_init(s);
513     if (ret < 0) {
514         error_setg(errp, "cannot initialize XSEG and join shared "
515                    "memory segment");
516         goto err_exit;
517     }
518
519     qemu_opts_del(opts);
520     return 0;
521
522 err_exit:
523     g_free(s->volname);
524     g_free(s->segment_name);
525     qemu_opts_del(opts);
526     return ret;
527 }
528
529 static void qemu_archipelago_close(BlockDriverState *bs)
530 {
531     int r, targetlen;
532     char *target;
533     struct xseg_request *req;
534     BDRVArchipelagoState *s = bs->opaque;
535
536     s->stopping = true;
537
538     qemu_mutex_lock(&s->request_mutex);
539     while (!s->th_is_signaled) {
540         qemu_cond_wait(&s->request_cond,
541                        &s->request_mutex);
542     }
543     qemu_mutex_unlock(&s->request_mutex);
544     qemu_thread_join(&s->request_th);
545     qemu_cond_destroy(&s->request_cond);
546     qemu_mutex_destroy(&s->request_mutex);
547
548     qemu_cond_destroy(&s->archip_cond);
549     qemu_mutex_destroy(&s->archip_mutex);
550
551     targetlen = strlen(s->volname);
552     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
553     if (!req) {
554         archipelagolog("Cannot get XSEG request\n");
555         goto err_exit;
556     }
557     r = xseg_prep_request(s->xseg, req, targetlen, 0);
558     if (r < 0) {
559         xseg_put_request(s->xseg, req, s->srcport);
560         archipelagolog("Cannot prepare XSEG close request\n");
561         goto err_exit;
562     }
563
564     target = xseg_get_target(s->xseg, req);
565     memcpy(target, s->volname, targetlen);
566     req->size = req->datalen;
567     req->offset = 0;
568     req->op = X_CLOSE;
569
570     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
571     if (p == NoPort) {
572         xseg_put_request(s->xseg, req, s->srcport);
573         archipelagolog("Cannot submit XSEG close request\n");
574         goto err_exit;
575     }
576
577     xseg_signal(s->xseg, p);
578     wait_reply(s->xseg, s->srcport, s->port, req);
579
580     xseg_put_request(s->xseg, req, s->srcport);
581
582 err_exit:
583     g_free(s->volname);
584     g_free(s->segment_name);
585     xseg_quit_local_signal(s->xseg, s->srcport);
586     xseg_leave_dynport(s->xseg, s->port);
587     xseg_leave(s->xseg);
588 }
589
590 static int qemu_archipelago_create_volume(Error **errp, const char *volname,
591                                           char *segment_name,
592                                           uint64_t size, xport mportno,
593                                           xport vportno)
594 {
595     int ret, targetlen;
596     struct xseg *xseg = NULL;
597     struct xseg_request *req;
598     struct xseg_request_clone *xclone;
599     struct xseg_port *port;
600     xport srcport = NoPort, sport = NoPort;
601     char *target;
602
603     /* Try default values if none has been set */
604     if (mportno == (xport) -1) {
605         mportno = ARCHIPELAGO_DFL_MPORT;
606     }
607
608     if (vportno == (xport) -1) {
609         vportno = ARCHIPELAGO_DFL_VPORT;
610     }
611
612     if (xseg_initialize()) {
613         error_setg(errp, "Cannot initialize XSEG");
614         return -1;
615     }
616
617     xseg = xseg_join("posix", segment_name,
618                      "posixfd", NULL);
619
620     if (!xseg) {
621         error_setg(errp, "Cannot join XSEG shared memory segment");
622         return -1;
623     }
624
625     port = xseg_bind_dynport(xseg);
626     srcport = port->portno;
627     init_local_signal(xseg, sport, srcport);
628
629     req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
630     if (!req) {
631         error_setg(errp, "Cannot get XSEG request");
632         return -1;
633     }
634
635     targetlen = strlen(volname);
636     ret = xseg_prep_request(xseg, req, targetlen,
637                             sizeof(struct xseg_request_clone));
638     if (ret < 0) {
639         error_setg(errp, "Cannot prepare XSEG request");
640         goto err_exit;
641     }
642
643     target = xseg_get_target(xseg, req);
644     if (!target) {
645         error_setg(errp, "Cannot get XSEG target.");
646         goto err_exit;
647     }
648     memcpy(target, volname, targetlen);
649     xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
650     memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
651     xclone->targetlen = 0;
652     xclone->size = size;
653     req->offset = 0;
654     req->size = req->datalen;
655     req->op = X_CLONE;
656
657     xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
658     if (p == NoPort) {
659         error_setg(errp, "Could not submit XSEG request");
660         goto err_exit;
661     }
662     xseg_signal(xseg, p);
663
664     ret = wait_reply(xseg, srcport, port, req);
665     if (ret < 0) {
666         error_setg(errp, "wait_reply() error.");
667     }
668
669     xseg_put_request(xseg, req, srcport);
670     xseg_quit_local_signal(xseg, srcport);
671     xseg_leave_dynport(xseg, port);
672     xseg_leave(xseg);
673     return ret;
674
675 err_exit:
676     xseg_put_request(xseg, req, srcport);
677     xseg_quit_local_signal(xseg, srcport);
678     xseg_leave_dynport(xseg, port);
679     xseg_leave(xseg);
680     return -1;
681 }
682
683 static int qemu_archipelago_create(const char *filename,
684                                    QemuOpts *options,
685                                    Error **errp)
686 {
687     int ret = 0;
688     uint64_t total_size = 0;
689     char *volname = NULL, *segment_name = NULL;
690     const char *start;
691     xport mport = NoPort, vport = NoPort;
692
693     if (!strstart(filename, "archipelago:", &start)) {
694         error_setg(errp, "File name must start with 'archipelago:'");
695         return -1;
696     }
697
698     if (!strlen(start) || strstart(start, "/", NULL)) {
699         error_setg(errp, "volume name must be specified");
700         return -1;
701     }
702
703     parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
704                         &vport);
705     total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
706                           BDRV_SECTOR_SIZE);
707
708     if (segment_name == NULL) {
709         segment_name = g_strdup("archipelago");
710     }
711
712     /* Create an Archipelago volume */
713     ret = qemu_archipelago_create_volume(errp, volname, segment_name,
714                                          total_size, mport,
715                                          vport);
716
717     g_free(volname);
718     g_free(segment_name);
719     return ret;
720 }
721
722 static const AIOCBInfo archipelago_aiocb_info = {
723     .aiocb_size = sizeof(ArchipelagoAIOCB),
724 };
725
726 static int archipelago_submit_request(BDRVArchipelagoState *s,
727                                         uint64_t bufidx,
728                                         size_t count,
729                                         off_t offset,
730                                         ArchipelagoAIOCB *aio_cb,
731                                         ArchipelagoSegmentedRequest *segreq,
732                                         int op)
733 {
734     int ret, targetlen;
735     char *target;
736     void *data = NULL;
737     struct xseg_request *req;
738     AIORequestData *reqdata = g_new(AIORequestData, 1);
739
740     targetlen = strlen(s->volname);
741     req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
742     if (!req) {
743         archipelagolog("Cannot get XSEG request\n");
744         goto err_exit2;
745     }
746     ret = xseg_prep_request(s->xseg, req, targetlen, count);
747     if (ret < 0) {
748         archipelagolog("Cannot prepare XSEG request\n");
749         goto err_exit;
750     }
751     target = xseg_get_target(s->xseg, req);
752     if (!target) {
753         archipelagolog("Cannot get XSEG target\n");
754         goto err_exit;
755     }
756     memcpy(target, s->volname, targetlen);
757     req->size = count;
758     req->offset = offset;
759
760     switch (op) {
761     case ARCHIP_OP_READ:
762         req->op = X_READ;
763         break;
764     case ARCHIP_OP_WRITE:
765         req->op = X_WRITE;
766         break;
767     case ARCHIP_OP_FLUSH:
768         req->op = X_FLUSH;
769         break;
770     }
771     reqdata->volname = s->volname;
772     reqdata->offset = offset;
773     reqdata->size = count;
774     reqdata->bufidx = bufidx;
775     reqdata->aio_cb = aio_cb;
776     reqdata->segreq = segreq;
777     reqdata->op = op;
778
779     xseg_set_req_data(s->xseg, req, reqdata);
780     if (op == ARCHIP_OP_WRITE) {
781         data = xseg_get_data(s->xseg, req);
782         if (!data) {
783             archipelagolog("Cannot get XSEG data\n");
784             goto err_exit;
785         }
786         qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
787     }
788
789     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
790     if (p == NoPort) {
791         archipelagolog("Could not submit XSEG request\n");
792         goto err_exit;
793     }
794     xseg_signal(s->xseg, p);
795     return 0;
796
797 err_exit:
798     g_free(reqdata);
799     xseg_put_request(s->xseg, req, s->srcport);
800     return -EIO;
801 err_exit2:
802     g_free(reqdata);
803     return -EIO;
804 }
805
806 static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
807                                         size_t count,
808                                         off_t offset,
809                                         ArchipelagoAIOCB *aio_cb,
810                                         int op)
811 {
812     int ret, segments_nr;
813     size_t pos = 0;
814     ArchipelagoSegmentedRequest *segreq;
815
816     segreq = g_new0(ArchipelagoSegmentedRequest, 1);
817
818     if (op == ARCHIP_OP_FLUSH) {
819         segments_nr = 1;
820     } else {
821         segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
822                       ((count % MAX_REQUEST_SIZE) ? 1 : 0);
823     }
824     segreq->total = count;
825     atomic_mb_set(&segreq->ref, segments_nr);
826
827     while (segments_nr > 1) {
828         ret = archipelago_submit_request(s, pos,
829                                             MAX_REQUEST_SIZE,
830                                             offset + pos,
831                                             aio_cb, segreq, op);
832
833         if (ret < 0) {
834             goto err_exit;
835         }
836         count -= MAX_REQUEST_SIZE;
837         pos += MAX_REQUEST_SIZE;
838         segments_nr--;
839     }
840     ret = archipelago_submit_request(s, pos, count, offset + pos,
841                                      aio_cb, segreq, op);
842
843     if (ret < 0) {
844         goto err_exit;
845     }
846     return 0;
847
848 err_exit:
849     segreq->failed = 1;
850     if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
851         g_free(segreq);
852     }
853     return ret;
854 }
855
856 static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
857                                            int64_t sector_num,
858                                            QEMUIOVector *qiov,
859                                            int nb_sectors,
860                                            BlockCompletionFunc *cb,
861                                            void *opaque,
862                                            int op)
863 {
864     ArchipelagoAIOCB *aio_cb;
865     BDRVArchipelagoState *s = bs->opaque;
866     int64_t size, off;
867     int ret;
868
869     aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
870     aio_cb->cmd = op;
871     aio_cb->qiov = qiov;
872
873     aio_cb->ret = 0;
874     aio_cb->s = s;
875     aio_cb->status = -EINPROGRESS;
876
877     off = sector_num * BDRV_SECTOR_SIZE;
878     size = nb_sectors * BDRV_SECTOR_SIZE;
879     aio_cb->size = size;
880
881     ret = archipelago_aio_segmented_rw(s, size, off,
882                                        aio_cb, op);
883     if (ret < 0) {
884         goto err_exit;
885     }
886     return &aio_cb->common;
887
888 err_exit:
889     error_report("qemu_archipelago_aio_rw(): I/O Error");
890     qemu_aio_unref(aio_cb);
891     return NULL;
892 }
893
894 static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
895         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
896         BlockCompletionFunc *cb, void *opaque)
897 {
898     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
899                                    opaque, ARCHIP_OP_READ);
900 }
901
902 static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
903         int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
904         BlockCompletionFunc *cb, void *opaque)
905 {
906     return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
907                                    opaque, ARCHIP_OP_WRITE);
908 }
909
910 static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
911 {
912     uint64_t size;
913     int ret, targetlen;
914     struct xseg_request *req;
915     struct xseg_reply_info *xinfo;
916     AIORequestData *reqdata = g_new(AIORequestData, 1);
917
918     const char *volname = s->volname;
919     targetlen = strlen(volname);
920     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
921     if (!req) {
922         archipelagolog("Cannot get XSEG request\n");
923         goto err_exit2;
924     }
925     ret = xseg_prep_request(s->xseg, req, targetlen,
926                             sizeof(struct xseg_reply_info));
927     if (ret < 0) {
928         archipelagolog("Cannot prepare XSEG request\n");
929         goto err_exit;
930     }
931     char *target = xseg_get_target(s->xseg, req);
932     if (!target) {
933         archipelagolog("Cannot get XSEG target\n");
934         goto err_exit;
935     }
936     memcpy(target, volname, targetlen);
937     req->size = req->datalen;
938     req->offset = 0;
939     req->op = X_INFO;
940
941     reqdata->op = ARCHIP_OP_VOLINFO;
942     reqdata->volname = volname;
943     xseg_set_req_data(s->xseg, req, reqdata);
944
945     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
946     if (p == NoPort) {
947         archipelagolog("Cannot submit XSEG request\n");
948         goto err_exit;
949     }
950     xseg_signal(s->xseg, p);
951     qemu_mutex_lock(&s->archip_mutex);
952     while (!s->is_signaled) {
953         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
954     }
955     s->is_signaled = false;
956     qemu_mutex_unlock(&s->archip_mutex);
957
958     xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
959     size = xinfo->size;
960     xseg_put_request(s->xseg, req, s->srcport);
961     g_free(reqdata);
962     s->size = size;
963     return size;
964
965 err_exit:
966     xseg_put_request(s->xseg, req, s->srcport);
967 err_exit2:
968     g_free(reqdata);
969     return -EIO;
970 }
971
972 static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
973 {
974     BDRVArchipelagoState *s = bs->opaque;
975
976     return archipelago_volume_info(s);
977 }
978
979 static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
980 {
981     int ret, targetlen;
982     struct xseg_request *req;
983     BDRVArchipelagoState *s = bs->opaque;
984     AIORequestData *reqdata = g_new(AIORequestData, 1);
985
986     const char *volname = s->volname;
987     targetlen = strlen(volname);
988     req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
989     if (!req) {
990         archipelagolog("Cannot get XSEG request\n");
991         goto err_exit2;
992     }
993
994     ret = xseg_prep_request(s->xseg, req, targetlen, 0);
995     if (ret < 0) {
996         archipelagolog("Cannot prepare XSEG request\n");
997         goto err_exit;
998     }
999     char *target = xseg_get_target(s->xseg, req);
1000     if (!target) {
1001         archipelagolog("Cannot get XSEG target\n");
1002         goto err_exit;
1003     }
1004     memcpy(target, volname, targetlen);
1005     req->offset = offset;
1006     req->op = X_TRUNCATE;
1007
1008     reqdata->op = ARCHIP_OP_TRUNCATE;
1009     reqdata->volname = volname;
1010
1011     xseg_set_req_data(s->xseg, req, reqdata);
1012
1013     xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1014     if (p == NoPort) {
1015         archipelagolog("Cannot submit XSEG request\n");
1016         goto err_exit;
1017     }
1018
1019     xseg_signal(s->xseg, p);
1020     qemu_mutex_lock(&s->archip_mutex);
1021     while (!s->is_signaled) {
1022         qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1023     }
1024     s->is_signaled = false;
1025     qemu_mutex_unlock(&s->archip_mutex);
1026     xseg_put_request(s->xseg, req, s->srcport);
1027     g_free(reqdata);
1028     return 0;
1029
1030 err_exit:
1031     xseg_put_request(s->xseg, req, s->srcport);
1032 err_exit2:
1033     g_free(reqdata);
1034     return -EIO;
1035 }
1036
1037 static QemuOptsList qemu_archipelago_create_opts = {
1038     .name = "archipelago-create-opts",
1039     .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1040     .desc = {
1041         {
1042             .name = BLOCK_OPT_SIZE,
1043             .type = QEMU_OPT_SIZE,
1044             .help = "Virtual disk size"
1045         },
1046         { /* end of list */ }
1047     }
1048 };
1049
1050 static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1051         BlockCompletionFunc *cb, void *opaque)
1052 {
1053     return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1054                                    ARCHIP_OP_FLUSH);
1055 }
1056
1057 static BlockDriver bdrv_archipelago = {
1058     .format_name         = "archipelago",
1059     .protocol_name       = "archipelago",
1060     .instance_size       = sizeof(BDRVArchipelagoState),
1061     .bdrv_parse_filename = archipelago_parse_filename,
1062     .bdrv_file_open      = qemu_archipelago_open,
1063     .bdrv_close          = qemu_archipelago_close,
1064     .bdrv_create         = qemu_archipelago_create,
1065     .bdrv_getlength      = qemu_archipelago_getlength,
1066     .bdrv_truncate       = qemu_archipelago_truncate,
1067     .bdrv_aio_readv      = qemu_archipelago_aio_readv,
1068     .bdrv_aio_writev     = qemu_archipelago_aio_writev,
1069     .bdrv_aio_flush      = qemu_archipelago_aio_flush,
1070     .bdrv_has_zero_init  = bdrv_has_zero_init_1,
1071     .create_opts         = &qemu_archipelago_create_opts,
1072 };
1073
1074 static void bdrv_archipelago_init(void)
1075 {
1076     bdrv_register(&bdrv_archipelago);
1077 }
1078
1079 block_init(bdrv_archipelago_init);