static int infd;
static int outfd;
static int debug = 0;
+static int sync_write = 0;
static char *base_path;
struct buffer {
size_t size;
};
+struct list_head {
+ struct list_head *prev;
+ struct list_head *next;
+};
+
+struct request;
+typedef void (*request_func)(struct request *);
+
struct request {
unsigned int want_reply;
sem_t ready;
uint8_t reply_type;
+ int replied;
struct buffer reply;
struct timeval start;
+ void *data;
+ request_func end_func;
+ struct list_head list;
};
struct openfile {
struct sshfs_file {
struct buffer handle;
+ struct list_head write_req_list;
+ pthread_cond_t write_req_finished;
+ int write_error;
};
static GHashTable *reqtab;
enum {
SOPT_DIRECTPORT,
SOPT_SSHCMD,
+ SOPT_SYNC,
SOPT_LAST /* Last entry in this list! */
};
static struct opt sshfs_opts[] = {
[SOPT_DIRECTPORT] = { .optname = "directport" },
[SOPT_SSHCMD] = { .optname = "ssh_command" },
+ [SOPT_SYNC] = { .optname = "sshfs_sync" },
[SOPT_LAST] = { .optname = NULL }
};
}
}
+#define container_of(ptr, type, member) ({ \
+ const typeof( ((type *)0)->member ) *__mptr = (ptr); \
+ (type *)( (char *)__mptr - offsetof(type,member) );})
+
+#define list_entry(ptr, type, member) \
+ container_of(ptr, type, member)
+
+static void list_init(struct list_head *head)
+{
+ head->next = head;
+ head->prev = head;
+}
+
+static void list_add(struct list_head *new, struct list_head *head)
+{
+ struct list_head *prev = head;
+ struct list_head *next = head->next;
+ next->prev = new;
+ new->next = next;
+ new->prev = prev;
+ prev->next = new;
+}
+
+static void list_del(struct list_head *entry)
+{
+ struct list_head *prev = entry->prev;
+ struct list_head *next = entry->next;
+ next->prev = prev;
+ prev->next = next;
+
+}
+
+static int list_empty(const struct list_head *head)
+{
+ return head->next == head;
+}
+
static inline void buf_init(struct buffer *buf, size_t size)
{
if (size) {
DEBUG(" [%05i] %14s %8ibytes (%ims)\n", id, type_name(type),
buf.size+5, difftime);
req->reply = buf;
+ req->reply_type = type;
+ req->replied = 1;
if (req->want_reply) {
- req->reply_type = type;
sem_post(&req->ready);
} else {
+ if (req->end_func)
+ req->end_func(req);
buf_free(&req->reply);
sem_destroy(&req->ready);
free(req);
return 0;
}
-static int sftp_request(uint8_t type, const struct buffer *buf,
- uint8_t expect_type, struct buffer *outbuf)
+static int sftp_request_common(uint8_t type, const struct buffer *buf,
+ uint8_t expect_type, struct buffer *outbuf,
+ request_func begin_func, request_func end_func,
+ void *data)
{
int err;
struct buffer buf2;
buf_add_mem(&buf2, buf->p, buf->len);
req->want_reply = expect_type != 0 ? 1 : 0;
+ req->end_func = end_func;
+ req->data = data;
sem_init(&req->ready, 0, 0);
buf_init(&req->reply, 0);
+ if (begin_func)
+ begin_func(req);
pthread_mutex_lock(&lock);
err = start_processing_thread();
g_hash_table_insert(reqtab, (gpointer) id, req);
}
out:
+ if (end_func)
+ end_func(req);
buf_free(&buf2);
buf_free(&req->reply);
sem_destroy(&req->ready);
return err;
}
+static int sftp_request(uint8_t type, const struct buffer *buf,
+ uint8_t expect_type, struct buffer *outbuf)
+{
+ return sftp_request_common(type, buf, expect_type, outbuf, NULL, NULL,
+ NULL);
+}
+
+static int sftp_request_async(uint8_t type, const struct buffer *buf,
+ request_func begin_func, request_func end_func,
+ void *data)
+{
+ return sftp_request_common(type, buf, 0, NULL, begin_func, end_func, data);
+}
+
static int sshfs_getattr(const char *path, struct stat *stbuf)
{
int err;
return -EINVAL;
sf = g_new0(struct sshfs_file, 1);
+ list_init(&sf->write_req_list);
+ pthread_cond_init(&sf->write_req_finished, NULL);
buf_init(&buf, 0);
buf_add_path(&buf, path);
buf_add_uint32(&buf, pflags);
return err;
}
+static int sshfs_flush(const char *path, struct fuse_file_info *fi)
+{
+ int err;
+ struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
+ struct list_head write_req_list;
+ struct list_head *curr_list;
+
+ if (sync_write)
+ return 0;
+
+ (void) path;
+ pthread_mutex_lock(&lock);
+ if (!list_empty(&sf->write_req_list)) {
+ curr_list = sf->write_req_list.prev;
+ list_del(&sf->write_req_list);
+ list_init(&sf->write_req_list);
+ list_add(&write_req_list, curr_list);
+ while (!list_empty(&write_req_list))
+ pthread_cond_wait(&sf->write_req_finished, &lock);
+ }
+ err = sf->write_error;
+ sf->write_error = 0;
+ pthread_mutex_unlock(&lock);
+ return err;
+}
+
+static int sshfs_fsync(const char *path, int isdatasync,
+ struct fuse_file_info *fi)
+{
+ (void) isdatasync;
+ return sshfs_flush(path, fi);
+}
+
static int sshfs_release(const char *path, struct fuse_file_info *fi)
{
struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
struct buffer *handle = &sf->handle;
- (void) path;
+ sshfs_flush(path, fi);
sftp_request(SSH_FXP_CLOSE, handle, 0, NULL);
buf_free(handle);
g_free(sf);
return err;
}
+static void sshfs_write_begin(struct request *req)
+{
+ struct sshfs_file *sf = (struct sshfs_file *) req->data;
+ pthread_mutex_lock(&lock);
+ list_add(&req->list, &sf->write_req_list);
+ pthread_mutex_unlock(&lock);
+}
+
+static void sshfs_write_end(struct request *req)
+{
+ uint32_t serr;
+ struct sshfs_file *sf = (struct sshfs_file *) req->data;
+
+ pthread_mutex_lock(&lock);
+ if (req->replied) {
+ if (req->reply_type != SSH_FXP_STATUS)
+ fprintf(stderr, "protocol error\n");
+ else if (buf_get_uint32(&req->reply, &serr) != -1 && serr != SSH_FX_OK)
+ sf->write_error = -EIO;
+ }
+ list_del(&req->list);
+ pthread_cond_broadcast(&sf->write_req_finished);
+ pthread_mutex_unlock(&lock);
+}
+
static int sshfs_write(const char *path, const char *wbuf, size_t size,
off_t offset, struct fuse_file_info *fi)
{
struct buffer data;
struct sshfs_file *sf = (struct sshfs_file *) fi->fh;
struct buffer *handle = &sf->handle;
+
(void) path;
data.p = (uint8_t *) wbuf;
data.len = size;
buf_add_buf(&buf, handle);
buf_add_uint64(&buf, offset);
buf_add_data(&buf, &data);
- err = sftp_request(SSH_FXP_WRITE, &buf, SSH_FXP_STATUS, NULL);
+ if (!sync_write && !sf->write_error)
+ err = sftp_request_async(SSH_FXP_WRITE, &buf, sshfs_write_begin,
+ sshfs_write_end, sf);
+ else
+ err = sftp_request(SSH_FXP_WRITE, &buf, SSH_FXP_STATUS, NULL);
buf_free(&buf);
return err ? err : (int) size;
}
.truncate = sshfs_truncate,
.utime = sshfs_utime,
.open = sshfs_open,
+ .flush = sshfs_flush,
+ .fsync = sshfs_fsync,
.release = sshfs_release,
.read = sshfs_read,
.write = sshfs_write,
" -V show version information\n"
" -p PORT equivalent to '-o port=PORT'\n"
" -C equivalent to '-o compression=yes'\n"
-" -o cache=YESNO Enable caching {yes,no} (default: yes)\n"
+" -o sshfs_sync synchronous writes\n"
+" -o cache=YESNO enable caching {yes,no} (default: yes)\n"
" -o cache_timeout=N sets timeout for caches in seconds (default: 20)\n"
" -o cache_X_timeout=N sets timeout for {stat,dir,link} cache\n"
" -o ssh_command=CMD execute CMD instead of 'ssh'\n"
base_path = g_strdup(base_path);
process_options(&newargc, newargv, sshfs_opts, 1);
+ if (sshfs_opts[SOPT_SYNC].present)
+ sync_write = 1;
if (sshfs_opts[SOPT_DIRECTPORT].present)
res = connect_to(host, sshfs_opts[SOPT_DIRECTPORT].value);
else {