fix
authorMiklos Szeredi <miklos@szeredi.hu>
Mon, 14 Feb 2005 17:22:08 +0000 (17:22 +0000)
committerMiklos Szeredi <miklos@szeredi.hu>
Mon, 14 Feb 2005 17:22:08 +0000 (17:22 +0000)
ChangeLog
sshfs.c

index dafdd17..3fc39f5 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,8 @@
+2005-02-14  Miklos Szeredi <miklos@szeredi.hu>
+
+       * Added asynchronous writeback (big performance gain) and made
+       this the default.  Can be disabled with '-o sshfs_sync'
 2005-02-09  Miklos Szeredi <miklos@szeredi.hu>
 
        * Added option to start arbitary command instead of 'ssh'
diff --git a/sshfs.c b/sshfs.c
index b58d156..d51f661 100644 (file)
--- a/sshfs.c
+++ b/sshfs.c
@@ -87,6 +87,7 @@
 static int infd;
 static int outfd;
 static int debug = 0;
+static int sync_write = 0;
 static char *base_path;
 
 struct buffer {
@@ -95,12 +96,24 @@ struct buffer {
     size_t size;
 };
 
+struct list_head {
+    struct list_head *prev;
+    struct list_head *next;
+};
+
+struct request;
+typedef void (*request_func)(struct request *);
+
 struct request {
     unsigned int want_reply;
     sem_t ready;
     uint8_t reply_type;
+    int replied;
     struct buffer reply;
     struct timeval start;
+    void *data;
+    request_func end_func;
+    struct list_head list;
 };
 
 struct openfile {
@@ -113,6 +126,9 @@ struct openfile {
 
 struct sshfs_file {
     struct buffer handle;
+    struct list_head write_req_list;
+    pthread_cond_t write_req_finished;
+    int write_error;
 };
 
 static GHashTable *reqtab;
@@ -166,12 +182,14 @@ static struct opt ssh_opts[] = {
 enum {
     SOPT_DIRECTPORT,
     SOPT_SSHCMD,
+    SOPT_SYNC,
     SOPT_LAST /* Last entry in this list! */
 };
 
 static struct opt sshfs_opts[] = {
     [SOPT_DIRECTPORT] = { .optname = "directport" },
     [SOPT_SSHCMD]     = { .optname = "ssh_command" },
+    [SOPT_SYNC]       = { .optname = "sshfs_sync" },
     [SOPT_LAST]       = { .optname = NULL }
 };
 
@@ -212,6 +230,43 @@ static const char *type_name(uint8_t type)
     }
 }
 
+#define container_of(ptr, type, member) ({                     \
+        const typeof( ((type *)0)->member ) *__mptr = (ptr);   \
+        (type *)( (char *)__mptr - offsetof(type,member) );})
+
+#define list_entry(ptr, type, member) \
+       container_of(ptr, type, member)
+
+static void list_init(struct list_head *head)
+{
+    head->next = head;
+    head->prev = head;
+}
+
+static void list_add(struct list_head *new, struct list_head *head)
+{
+    struct list_head *prev = head;
+    struct list_head *next = head->next;
+    next->prev = new;
+    new->next = next;
+    new->prev = prev;
+    prev->next = new;
+}
+
+static void list_del(struct list_head *entry)
+{
+    struct list_head *prev = entry->prev;
+    struct list_head *next = entry->next;
+    next->prev = prev;
+    prev->next = next;
+
+}
+
+static int list_empty(const struct list_head *head)
+{
+       return head->next == head;
+}
+
 static inline void buf_init(struct buffer *buf, size_t size)
 {
     if (size) {
@@ -671,10 +726,13 @@ static void *process_requests(void *_data)
             DEBUG("  [%05i] %14s %8ibytes (%ims)\n", id, type_name(type),
                   buf.size+5, difftime);
             req->reply = buf;
+            req->reply_type = type;
+            req->replied = 1;
             if (req->want_reply) {
-                req->reply_type = type;
                 sem_post(&req->ready);
             } else {
+                if (req->end_func)
+                    req->end_func(req);
                 buf_free(&req->reply);
                 sem_destroy(&req->ready);
                 free(req);
@@ -703,8 +761,10 @@ static int start_processing_thread(void)
     return 0;
 }
 
-static int sftp_request(uint8_t type, const struct buffer *buf,
-                        uint8_t expect_type, struct buffer *outbuf)
+static int sftp_request_common(uint8_t type, const struct buffer *buf,
+                               uint8_t expect_type, struct buffer *outbuf,
+                               request_func begin_func, request_func end_func,
+                               void *data)
 {
     int err;
     struct buffer buf2;
@@ -716,8 +776,12 @@ static int sftp_request(uint8_t type, const struct buffer *buf,
     buf_add_mem(&buf2, buf->p, buf->len);
 
     req->want_reply = expect_type != 0 ? 1 : 0;
+    req->end_func = end_func;
+    req->data = data;
     sem_init(&req->ready, 0, 0);
     buf_init(&req->reply, 0);
+    if (begin_func)
+        begin_func(req);
     pthread_mutex_lock(&lock);
     err = start_processing_thread();
     g_hash_table_insert(reqtab, (gpointer) id, req);
@@ -778,6 +842,8 @@ static int sftp_request(uint8_t type, const struct buffer *buf,
     }
 
  out:
+    if (end_func)
+        end_func(req);
     buf_free(&buf2);
     buf_free(&req->reply);
     sem_destroy(&req->ready);
@@ -785,6 +851,20 @@ static int sftp_request(uint8_t type, const struct buffer *buf,
     return err;        
 }
 
+static int sftp_request(uint8_t type, const struct buffer *buf,
+                        uint8_t expect_type, struct buffer *outbuf)
+{
+    return sftp_request_common(type, buf, expect_type, outbuf, NULL, NULL,
+                               NULL);
+}
+
+static int sftp_request_async(uint8_t type, const struct buffer *buf,
+                              request_func begin_func, request_func end_func,
+                              void *data)
+{
+    return sftp_request_common(type, buf, 0, NULL, begin_func, end_func, data);
+}
+
 static int sshfs_getattr(const char *path, struct stat *stbuf)
 {
     int err;
@@ -1018,6 +1098,8 @@ static int sshfs_open(const char *path, struct fuse_file_info *fi)
         return -EINVAL;
 
     sf = g_new0(struct sshfs_file, 1);
+    list_init(&sf->write_req_list);
+    pthread_cond_init(&sf->write_req_finished, NULL);
     buf_init(&buf, 0);
     buf_add_path(&buf, path);
     buf_add_uint32(&buf, pflags);
@@ -1032,11 +1114,44 @@ static int sshfs_open(const char *path, struct fuse_file_info *fi)
     return err;
 }
 
+static int sshfs_flush(const char *path, struct fuse_file_info *fi)
+{
+    int err;
+    struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
+    struct list_head write_req_list;
+    struct list_head *curr_list;
+
+    if (sync_write)
+        return 0;
+
+    (void) path;
+    pthread_mutex_lock(&lock);
+    if (!list_empty(&sf->write_req_list)) {
+        curr_list = sf->write_req_list.prev;
+        list_del(&sf->write_req_list);
+        list_init(&sf->write_req_list);
+        list_add(&write_req_list, curr_list);
+        while (!list_empty(&write_req_list))
+            pthread_cond_wait(&sf->write_req_finished, &lock);
+    }
+    err = sf->write_error;
+    sf->write_error = 0;
+    pthread_mutex_unlock(&lock);
+    return err;
+}
+
+static int sshfs_fsync(const char *path, int isdatasync,
+                       struct fuse_file_info *fi)
+{
+    (void) isdatasync;
+    return sshfs_flush(path, fi);
+}
+
 static int sshfs_release(const char *path, struct fuse_file_info *fi)
 {
     struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
     struct buffer *handle = &sf->handle;
-    (void) path;
+    sshfs_flush(path, fi);
     sftp_request(SSH_FXP_CLOSE, handle, 0, NULL);
     buf_free(handle);
     g_free(sf);
@@ -1075,6 +1190,31 @@ static int sshfs_read(const char *path, char *rbuf, size_t size, off_t offset,
     return err;    
 }
 
+static void sshfs_write_begin(struct request *req)
+{
+    struct sshfs_file *sf = (struct sshfs_file *) req->data;
+    pthread_mutex_lock(&lock);
+    list_add(&req->list, &sf->write_req_list);
+    pthread_mutex_unlock(&lock);
+}
+
+static void sshfs_write_end(struct request *req)
+{
+    uint32_t serr;
+    struct sshfs_file *sf = (struct sshfs_file *) req->data;
+
+    pthread_mutex_lock(&lock);
+    if (req->replied) {
+        if (req->reply_type != SSH_FXP_STATUS)
+            fprintf(stderr, "protocol error\n");
+        else if (buf_get_uint32(&req->reply, &serr) != -1 && serr != SSH_FX_OK)
+            sf->write_error = -EIO;
+    }
+    list_del(&req->list);
+    pthread_cond_broadcast(&sf->write_req_finished);
+    pthread_mutex_unlock(&lock);
+}
+
 static int sshfs_write(const char *path, const char *wbuf, size_t size,
                        off_t offset, struct fuse_file_info *fi)
 {
@@ -1083,6 +1223,7 @@ static int sshfs_write(const char *path, const char *wbuf, size_t size,
     struct buffer data;
     struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
     struct buffer *handle = &sf->handle;
+
     (void) path;
     data.p = (uint8_t *) wbuf;
     data.len = size;
@@ -1090,7 +1231,11 @@ static int sshfs_write(const char *path, const char *wbuf, size_t size,
     buf_add_buf(&buf, handle);
     buf_add_uint64(&buf, offset);
     buf_add_data(&buf, &data);
-    err = sftp_request(SSH_FXP_WRITE, &buf, SSH_FXP_STATUS, NULL);
+    if (!sync_write && !sf->write_error)
+        err = sftp_request_async(SSH_FXP_WRITE, &buf, sshfs_write_begin, 
+                                 sshfs_write_end, sf);
+    else
+        err = sftp_request(SSH_FXP_WRITE, &buf, SSH_FXP_STATUS, NULL);
     buf_free(&buf);
     return err ? err : (int) size;
 }
@@ -1151,6 +1296,8 @@ static struct fuse_cache_operations sshfs_oper = {
         .truncate   = sshfs_truncate,
         .utime      = sshfs_utime,
         .open       = sshfs_open,
+        .flush      = sshfs_flush,
+        .fsync      = sshfs_fsync,
         .release    = sshfs_release,
         .read       = sshfs_read,
         .write      = sshfs_write,
@@ -1169,7 +1316,8 @@ static void usage(const char *progname)
 "    -V                     show version information\n"
 "    -p PORT                equivalent to '-o port=PORT'\n"
 "    -C                     equivalent to '-o compression=yes'\n"
-"    -o cache=YESNO         Enable caching {yes,no} (default: yes)\n"
+"    -o sshfs_sync          synchronous writes\n"
+"    -o cache=YESNO         enable caching {yes,no} (default: yes)\n"
 "    -o cache_timeout=N     sets timeout for caches in seconds (default: 20)\n"
 "    -o cache_X_timeout=N   sets timeout for {stat,dir,link} cache\n"
 "    -o ssh_command=CMD     execute CMD instead of 'ssh'\n"
@@ -1242,6 +1390,8 @@ int main(int argc, char *argv[])
         base_path = g_strdup(base_path);
 
     process_options(&newargc, newargv, sshfs_opts, 1);
+    if (sshfs_opts[SOPT_SYNC].present)
+        sync_write = 1;
     if (sshfs_opts[SOPT_DIRECTPORT].present)
         res = connect_to(host, sshfs_opts[SOPT_DIRECTPORT].value);
     else {