libceph: don't use rb_init_node() in ceph_osdc_alloc_request()
[platform/kernel/linux-arm64.git] / net / ceph / osd_client.c
index ad427e6..7cd0a7f 100644 (file)
@@ -52,7 +52,7 @@ static int op_has_extent(int op)
                op == CEPH_OSD_OP_WRITE);
 }
 
-void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
+int ceph_calc_raw_layout(struct ceph_osd_client *osdc,
                        struct ceph_file_layout *layout,
                        u64 snapid,
                        u64 off, u64 *plen, u64 *bno,
@@ -62,12 +62,15 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
        struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
        u64 orig_len = *plen;
        u64 objoff, objlen;    /* extent in object */
+       int r;
 
        reqhead->snapid = cpu_to_le64(snapid);
 
        /* object extent? */
-       ceph_calc_file_object_mapping(layout, off, plen, bno,
-                                     &objoff, &objlen);
+       r = ceph_calc_file_object_mapping(layout, off, plen, bno,
+                                         &objoff, &objlen);
+       if (r < 0)
+               return r;
        if (*plen < orig_len)
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
@@ -83,7 +86,7 @@ void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
 
        dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
             *bno, objoff, objlen, req->r_num_pages);
-
+       return 0;
 }
 EXPORT_SYMBOL(ceph_calc_raw_layout);
 
@@ -112,20 +115,25 @@ EXPORT_SYMBOL(ceph_calc_raw_layout);
  *
  * fill osd op in request message.
  */
-static void calc_layout(struct ceph_osd_client *osdc,
-                       struct ceph_vino vino,
-                       struct ceph_file_layout *layout,
-                       u64 off, u64 *plen,
-                       struct ceph_osd_request *req,
-                       struct ceph_osd_req_op *op)
+static int calc_layout(struct ceph_osd_client *osdc,
+                      struct ceph_vino vino,
+                      struct ceph_file_layout *layout,
+                      u64 off, u64 *plen,
+                      struct ceph_osd_request *req,
+                      struct ceph_osd_req_op *op)
 {
        u64 bno;
+       int r;
 
-       ceph_calc_raw_layout(osdc, layout, vino.snap, off,
-                            plen, &bno, req, op);
+       r = ceph_calc_raw_layout(osdc, layout, vino.snap, off,
+                                plen, &bno, req, op);
+       if (r < 0)
+               return r;
 
        snprintf(req->r_oid, sizeof(req->r_oid), "%llx.%08llx", vino.ino, bno);
        req->r_oid_len = strlen(req->r_oid);
+
+       return r;
 }
 
 /*
@@ -213,7 +221,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        init_completion(&req->r_safe_completion);
-       rb_init_node(&req->r_node);
+       RB_CLEAR_NODE(&req->r_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);
        INIT_LIST_HEAD(&req->r_linger_item);
        INIT_LIST_HEAD(&req->r_linger_osd);
@@ -456,6 +464,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
 {
        struct ceph_osd_req_op ops[3];
        struct ceph_osd_request *req;
+       int r;
 
        ops[0].op = opcode;
        ops[0].extent.truncate_seq = truncate_seq;
@@ -474,10 +483,12 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                         use_mempool,
                                         GFP_NOFS, NULL, NULL);
        if (!req)
-               return NULL;
+               return ERR_PTR(-ENOMEM);
 
        /* calculate max write size */
-       calc_layout(osdc, vino, layout, off, plen, req, ops);
+       r = calc_layout(osdc, vino, layout, off, plen, req, ops);
+       if (r < 0)
+               return ERR_PTR(r);
        req->r_file_layout = *layout;  /* keep a copy */
 
        /* in case it differs from natural (file) alignment that
@@ -570,7 +581,7 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
 
        dout("__kick_osd_requests osd%d\n", osd->o_osd);
        err = __reset_osd(osdc, osd);
-       if (err == -EAGAIN)
+       if (err)
                return;
 
        list_for_each_entry(req, &osd->o_requests, r_osd_item) {
@@ -597,14 +608,6 @@ static void __kick_osd_requests(struct ceph_osd_client *osdc,
        }
 }
 
-static void kick_osd_requests(struct ceph_osd_client *osdc,
-                             struct ceph_osd *kickosd)
-{
-       mutex_lock(&osdc->request_mutex);
-       __kick_osd_requests(osdc, kickosd);
-       mutex_unlock(&osdc->request_mutex);
-}
-
 /*
  * If the osd connection drops, we need to resubmit all requests.
  */
@@ -618,7 +621,9 @@ static void osd_reset(struct ceph_connection *con)
        dout("osd_reset osd%d\n", osd->o_osd);
        osdc = osd->o_osdc;
        down_read(&osdc->map_sem);
-       kick_osd_requests(osdc, osd);
+       mutex_lock(&osdc->request_mutex);
+       __kick_osd_requests(osdc, osd);
+       mutex_unlock(&osdc->request_mutex);
        send_queued(osdc);
        up_read(&osdc->map_sem);
 }
@@ -637,6 +642,7 @@ static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
        atomic_set(&osd->o_ref, 1);
        osd->o_osdc = osdc;
        osd->o_osd = onum;
+       RB_CLEAR_NODE(&osd->o_node);
        INIT_LIST_HEAD(&osd->o_requests);
        INIT_LIST_HEAD(&osd->o_linger_requests);
        INIT_LIST_HEAD(&osd->o_osd_lru);
@@ -740,6 +746,7 @@ static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
        if (list_empty(&osd->o_requests) &&
            list_empty(&osd->o_linger_requests)) {
                __remove_osd(osdc, osd);
+               ret = -ENODEV;
        } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
                          &osd->o_con.peer_addr,
                          sizeof(osd->o_con.peer_addr)) == 0 &&
@@ -866,9 +873,9 @@ static void __unregister_request(struct ceph_osd_client *osdc,
                        req->r_osd = NULL;
        }
 
+       list_del_init(&req->r_req_lru_item);
        ceph_osdc_put_request(req);
 
-       list_del_init(&req->r_req_lru_item);
        if (osdc->num_requests == 0) {
                dout(" no requests, canceling timeout\n");
                __cancel_osd_timeout(osdc);
@@ -900,8 +907,8 @@ static void __unregister_linger_request(struct ceph_osd_client *osdc,
                                        struct ceph_osd_request *req)
 {
        dout("__unregister_linger_request %p\n", req);
+       list_del_init(&req->r_linger_item);
        if (req->r_osd) {
-               list_del_init(&req->r_linger_item);
                list_del_init(&req->r_linger_osd);
 
                if (list_empty(&req->r_osd->o_requests) &&
@@ -1080,12 +1087,10 @@ static void handle_timeout(struct work_struct *work)
 {
        struct ceph_osd_client *osdc =
                container_of(work, struct ceph_osd_client, timeout_work.work);
-       struct ceph_osd_request *req, *last_req = NULL;
+       struct ceph_osd_request *req;
        struct ceph_osd *osd;
-       unsigned long timeout = osdc->client->options->osd_timeout * HZ;
        unsigned long keepalive =
                osdc->client->options->osd_keepalive_timeout * HZ;
-       unsigned long last_stamp = 0;
        struct list_head slow_osds;
        dout("timeout\n");
        down_read(&osdc->map_sem);
@@ -1095,37 +1100,6 @@ static void handle_timeout(struct work_struct *work)
        mutex_lock(&osdc->request_mutex);
 
        /*
-        * reset osds that appear to be _really_ unresponsive.  this
-        * is a failsafe measure.. we really shouldn't be getting to
-        * this point if the system is working properly.  the monitors
-        * should mark the osd as failed and we should find out about
-        * it from an updated osd map.
-        */
-       while (timeout && !list_empty(&osdc->req_lru)) {
-               req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
-                                r_req_lru_item);
-
-               /* hasn't been long enough since we sent it? */
-               if (time_before(jiffies, req->r_stamp + timeout))
-                       break;
-
-               /* hasn't been long enough since it was acked? */
-               if (req->r_request->ack_stamp == 0 ||
-                   time_before(jiffies, req->r_request->ack_stamp + timeout))
-                       break;
-
-               BUG_ON(req == last_req && req->r_stamp == last_stamp);
-               last_req = req;
-               last_stamp = req->r_stamp;
-
-               osd = req->r_osd;
-               BUG_ON(!osd);
-               pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
-                          req->r_tid, osd->o_osd);
-               __kick_osd_requests(osdc, osd);
-       }
-
-       /*
         * ping osds that are a bit slow.  this ensures that if there
         * is a break in the TCP connection we will notice, and reopen
         * a connection with that osd (from the fault callback).
@@ -1589,6 +1563,7 @@ int ceph_osdc_create_event(struct ceph_osd_client *osdc,
        event->data = data;
        event->osdc = osdc;
        INIT_LIST_HEAD(&event->osd_node);
+       RB_CLEAR_NODE(&event->node);
        kref_init(&event->kref);   /* one ref for us */
        kref_get(&event->kref);    /* one ref for the caller */
        init_completion(&event->completion);
@@ -1920,8 +1895,8 @@ int ceph_osdc_readpages(struct ceph_osd_client *osdc,
                                    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
                                    NULL, 0, truncate_seq, truncate_size, NULL,
                                    false, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short read due to an object boundary */
        req->r_pages = pages;
@@ -1963,8 +1938,8 @@ int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
                                    snapc, do_sync,
                                    truncate_seq, truncate_size, mtime,
                                    nofail, 1, page_align);
-       if (!req)
-               return -ENOMEM;
+       if (IS_ERR(req))
+               return PTR_ERR(req);
 
        /* it may be a short write due to an object boundary */
        req->r_pages = pages;
@@ -2037,8 +2012,8 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
        if (!req) {
                *skip = 1;
                m = NULL;
-               pr_info("get_reply unknown tid %llu from osd%d\n", tid,
-                       osd->o_osd);
+               dout("get_reply unknown tid %llu from osd%d\n", tid,
+                    osd->o_osd);
                goto out;
        }