} send;
};
+struct submit_worker {
+ struct workqueue_struct *wq;
+ struct work_struct worker;
+
+ spinlock_t lock;
+ struct list_head writes;
+};
+
struct drbd_conf {
struct drbd_tconn *tconn;
int vnr; /* volume number within the connection */
atomic_t ap_in_flight; /* App sectors in flight (waiting for ack) */
unsigned int peer_max_bio_size;
unsigned int local_max_bio_size;
+
+ /* any requests that would block in drbd_make_request()
+ * are deferred to this single-threaded work queue */
+ struct submit_worker submit;
};
static inline struct drbd_conf *minor_to_mdev(unsigned int minor)
extern int proc_details;
/* drbd_req */
+extern void do_submit(struct work_struct *ws);
extern void __drbd_make_request(struct drbd_conf *, struct bio *, unsigned long);
extern void drbd_make_request(struct request_queue *q, struct bio *bio);
extern int drbd_read_remote(struct drbd_conf *mdev, struct drbd_request *req);
#include <linux/reboot.h>
#include <linux/notifier.h>
#include <linux/kthread.h>
-
+#include <linux/workqueue.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
idr_for_each_entry(&minors, mdev, i) {
idr_remove(&minors, mdev_to_minor(mdev));
idr_remove(&mdev->tconn->volumes, mdev->vnr);
+ destroy_workqueue(mdev->submit.wq);
del_gendisk(mdev->vdisk);
/* synchronize_rcu(); No other threads running at this point */
kref_put(&mdev->kref, &drbd_minor_destroy);
kfree(tconn);
}
+int init_submitter(struct drbd_conf *mdev)
+{
+ /* opencoded create_singlethread_workqueue(),
+ * to be able to say "drbd%d", ..., minor */
+ mdev->submit.wq = alloc_workqueue("drbd%u_submit",
+ WQ_UNBOUND | WQ_MEM_RECLAIM, 1, mdev->minor);
+ if (!mdev->submit.wq)
+ return -ENOMEM;
+
+ INIT_WORK(&mdev->submit.worker, do_submit);
+ spin_lock_init(&mdev->submit.lock);
+ INIT_LIST_HEAD(&mdev->submit.writes);
+ return 0;
+}
+
enum drbd_ret_code conn_new_minor(struct drbd_tconn *tconn, unsigned int minor, int vnr)
{
struct drbd_conf *mdev;
goto out_idr_remove_minor;
}
+ if (init_submitter(mdev)) {
+ err = ERR_NOMEM;
+ drbd_msg_put_info("unable to create submit workqueue");
+ goto out_idr_remove_vol;
+ }
+
add_disk(disk);
kref_init(&mdev->kref); /* one ref for both idrs and the the add_disk */
return NO_ERROR;
+out_idr_remove_vol:
+ idr_remove(&tconn->volumes, vnr_got);
out_idr_remove_minor:
idr_remove(&minors, minor_got);
synchronize_rcu();
CS_VERBOSE + CS_WAIT_COMPLETE);
idr_remove(&mdev->tconn->volumes, mdev->vnr);
idr_remove(&minors, mdev_to_minor(mdev));
+ destroy_workqueue(mdev->submit.wq);
del_gendisk(mdev->vdisk);
synchronize_rcu();
kref_put(&mdev->kref, &drbd_minor_destroy);
drbd_send_and_submit(mdev, req);
}
+void __drbd_make_request_from_worker(struct drbd_conf *mdev, struct drbd_request *req)
+{
+ const int rw = bio_rw(req->master_bio);
+
+ if (rw == WRITE && req->private_bio && req->i.size
+ && !test_bit(AL_SUSPENDED, &mdev->flags)) {
+ drbd_al_begin_io(mdev, &req->i, false);
+ req->rq_state |= RQ_IN_ACT_LOG;
+ }
+ drbd_send_and_submit(mdev, req);
+}
+
+
+void do_submit(struct work_struct *ws)
+{
+ struct drbd_conf *mdev = container_of(ws, struct drbd_conf, submit.worker);
+ LIST_HEAD(writes);
+ struct drbd_request *req, *tmp;
+
+ spin_lock(&mdev->submit.lock);
+ list_splice_init(&mdev->submit.writes, &writes);
+ spin_unlock(&mdev->submit.lock);
+
+ list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+ list_del_init(&req->tl_requests);
+ __drbd_make_request_from_worker(mdev, req);
+ }
+}
+
void drbd_make_request(struct request_queue *q, struct bio *bio)
{
struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;