staging: ramster: disable build in anticipation of renaming
[platform/adaptation/renesas_rcar/renesas_kernel.git] / drivers / staging / ramster / ramster / r2net.c
1 /*
2  * r2net.c
3  *
4  * Copyright (c) 2011-2012, Dan Magenheimer, Oracle Corp.
5  *
6  * Ramster_r2net provides an interface between zcache and r2net.
7  *
8  * FIXME: support more than two nodes
9  */
10
11 #include <linux/list.h>
12 #include "tcp.h"
13 #include "nodemanager.h"
14 #include "../tmem.h"
15 #include "../zcache.h"
16 #include "ramster.h"
17
/* Enables the extra diagnostics wrapped in #ifdef RAMSTER_TESTING below. */
#define RAMSTER_TESTING

/* Key passed with every ramster message type when registering and sending. */
#define RMSTR_KEY       0x77347734

/*
 * r2net message types used by ramster.  The values are spelled out because
 * they are exchanged between nodes and must not shift if an entry is added.
 */
enum {
        RMSTR_TMEM_PUT_EPH                      = 100,
        RMSTR_TMEM_PUT_PERS                     = 101,
        RMSTR_TMEM_ASYNC_GET_REQUEST            = 102,
        RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST   = 103,
        RMSTR_TMEM_ASYNC_GET_REPLY              = 104,
        RMSTR_TMEM_FLUSH                        = 105,
        RMSTR_TMEM_FLOBJ                        = 106,
        RMSTR_TMEM_DESTROY_POOL                 = 107,
};

/*
 * Number of data bytes left in an r2net payload once the leading
 * struct tmem_xhandle has been accounted for.
 */
#define RMSTR_R2NET_MAX_LEN \
        (R2NET_MAX_PAYLOAD_BYTES - sizeof(struct tmem_xhandle))
35
36 #include "tcp_internal.h"
37
38 static struct r2nm_node *r2net_target_node;
39 static int r2net_target_nodenum;
40
41 int r2net_remote_target_node_set(int node_num)
42 {
43         int ret = -1;
44
45         r2net_target_node = r2nm_get_node_by_num(node_num);
46         if (r2net_target_node != NULL) {
47                 r2net_target_nodenum = node_num;
48                 r2nm_node_put(r2net_target_node);
49                 ret = 0;
50         }
51         return ret;
52 }
53
54 /* FIXME following buffer should be per-cpu, protected by preempt_disable */
55 static char ramster_async_get_buf[R2NET_MAX_PAYLOAD_BYTES];
56
57 static int ramster_remote_async_get_request_handler(struct r2net_msg *msg,
58                                 u32 len, void *data, void **ret_data)
59 {
60         char *pdata;
61         struct tmem_xhandle xh;
62         int found;
63         size_t size = RMSTR_R2NET_MAX_LEN;
64         u16 msgtype = be16_to_cpu(msg->msg_type);
65         bool get_and_free = (msgtype == RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST);
66         unsigned long flags;
67
68         xh = *(struct tmem_xhandle *)msg->buf;
69         if (xh.xh_data_size > RMSTR_R2NET_MAX_LEN)
70                 BUG();
71         pdata = ramster_async_get_buf;
72         *(struct tmem_xhandle *)pdata = xh;
73         pdata += sizeof(struct tmem_xhandle);
74         local_irq_save(flags);
75         found = zcache_get_page(xh.client_id, xh.pool_id, &xh.oid, xh.index,
76                                 pdata, &size, true, get_and_free ? 1 : -1);
77         local_irq_restore(flags);
78         if (found < 0) {
79                 /* a zero size indicates the get failed */
80                 size = 0;
81         }
82         if (size > RMSTR_R2NET_MAX_LEN)
83                 BUG();
84         *ret_data = pdata - sizeof(struct tmem_xhandle);
85         /* now make caller (r2net_process_message) handle specially */
86         r2net_force_data_magic(msg, RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY);
87         return size + sizeof(struct tmem_xhandle);
88 }
89
90 static int ramster_remote_async_get_reply_handler(struct r2net_msg *msg,
91                                 u32 len, void *data, void **ret_data)
92 {
93         char *in = (char *)msg->buf;
94         int datalen = len - sizeof(struct r2net_msg);
95         int ret = -1;
96         struct tmem_xhandle *xh = (struct tmem_xhandle *)in;
97
98         in += sizeof(struct tmem_xhandle);
99         datalen -= sizeof(struct tmem_xhandle);
100         BUG_ON(datalen < 0 || datalen > PAGE_SIZE);
101         ret = ramster_localify(xh->pool_id, &xh->oid, xh->index,
102                                 in, datalen, xh->extra);
103 #ifdef RAMSTER_TESTING
104         if (ret == -EEXIST)
105                 pr_err("TESTING ArrgREP, aborted overwrite on racy put\n");
106 #endif
107         return ret;
108 }
109
110 int ramster_remote_put_handler(struct r2net_msg *msg,
111                                 u32 len, void *data, void **ret_data)
112 {
113         struct tmem_xhandle *xh;
114         char *p = (char *)msg->buf;
115         int datalen = len - sizeof(struct r2net_msg) -
116                                 sizeof(struct tmem_xhandle);
117         u16 msgtype = be16_to_cpu(msg->msg_type);
118         bool ephemeral = (msgtype == RMSTR_TMEM_PUT_EPH);
119         unsigned long flags;
120         int ret;
121
122         xh = (struct tmem_xhandle *)p;
123         p += sizeof(struct tmem_xhandle);
124         zcache_autocreate_pool(xh->client_id, xh->pool_id, ephemeral);
125         local_irq_save(flags);
126         ret = zcache_put_page(xh->client_id, xh->pool_id, &xh->oid, xh->index,
127                                 p, datalen, true, ephemeral);
128         local_irq_restore(flags);
129         return ret;
130 }
131
132 int ramster_remote_flush_handler(struct r2net_msg *msg,
133                                 u32 len, void *data, void **ret_data)
134 {
135         struct tmem_xhandle *xh;
136         char *p = (char *)msg->buf;
137
138         xh = (struct tmem_xhandle *)p;
139         p += sizeof(struct tmem_xhandle);
140         (void)zcache_flush_page(xh->client_id, xh->pool_id,
141                                         &xh->oid, xh->index);
142         return 0;
143 }
144
145 int ramster_remote_flobj_handler(struct r2net_msg *msg,
146                                 u32 len, void *data, void **ret_data)
147 {
148         struct tmem_xhandle *xh;
149         char *p = (char *)msg->buf;
150
151         xh = (struct tmem_xhandle *)p;
152         p += sizeof(struct tmem_xhandle);
153         (void)zcache_flush_object(xh->client_id, xh->pool_id, &xh->oid);
154         return 0;
155 }
156
157 int r2net_remote_async_get(struct tmem_xhandle *xh, bool free, int remotenode,
158                                 size_t expect_size, uint8_t expect_cksum,
159                                 void *extra)
160 {
161         int nodenum, ret = -1, status;
162         struct r2nm_node *node = NULL;
163         struct kvec vec[1];
164         size_t veclen = 1;
165         u32 msg_type;
166         struct r2net_node *nn;
167
168         node = r2nm_get_node_by_num(remotenode);
169         if (node == NULL)
170                 goto out;
171         xh->client_id = r2nm_this_node(); /* which node is getting */
172         xh->xh_data_cksum = expect_cksum;
173         xh->xh_data_size = expect_size;
174         xh->extra = extra;
175         vec[0].iov_len = sizeof(*xh);
176         vec[0].iov_base = xh;
177
178         node = r2net_target_node;
179         if (!node)
180                 goto out;
181
182         nodenum = r2net_target_nodenum;
183
184         r2nm_node_get(node);
185         nn = r2net_nn_from_num(nodenum);
186         if (nn->nn_persistent_error || !nn->nn_sc_valid) {
187                 ret = -ENOTCONN;
188                 r2nm_node_put(node);
189                 goto out;
190         }
191
192         if (free)
193                 msg_type = RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST;
194         else
195                 msg_type = RMSTR_TMEM_ASYNC_GET_REQUEST;
196         ret = r2net_send_message_vec(msg_type, RMSTR_KEY,
197                                         vec, veclen, remotenode, &status);
198         r2nm_node_put(node);
199         if (ret < 0) {
200                 if (ret == -ENOTCONN || ret == -EHOSTDOWN)
201                         goto out;
202                 if (ret == -EAGAIN)
203                         goto out;
204                 /* FIXME handle bad message possibilities here? */
205                 pr_err("UNTESTED ret<0 in ramster_remote_async_get: ret=%d\n",
206                                 ret);
207         }
208         ret = status;
209 out:
210         return ret;
211 }
212
213 #ifdef RAMSTER_TESTING
214 /* leave me here to see if it catches a weird crash */
215 static void ramster_check_irq_counts(void)
216 {
217         static int last_hardirq_cnt, last_softirq_cnt, last_preempt_cnt;
218         int cur_hardirq_cnt, cur_softirq_cnt, cur_preempt_cnt;
219
220         cur_hardirq_cnt = hardirq_count() >> HARDIRQ_SHIFT;
221         if (cur_hardirq_cnt > last_hardirq_cnt) {
222                 last_hardirq_cnt = cur_hardirq_cnt;
223                 if (!(last_hardirq_cnt&(last_hardirq_cnt-1)))
224                         pr_err("RAMSTER TESTING RRP hardirq_count=%d\n",
225                                 last_hardirq_cnt);
226         }
227         cur_softirq_cnt = softirq_count() >> SOFTIRQ_SHIFT;
228         if (cur_softirq_cnt > last_softirq_cnt) {
229                 last_softirq_cnt = cur_softirq_cnt;
230                 if (!(last_softirq_cnt&(last_softirq_cnt-1)))
231                         pr_err("RAMSTER TESTING RRP softirq_count=%d\n",
232                                 last_softirq_cnt);
233         }
234         cur_preempt_cnt = preempt_count() & PREEMPT_MASK;
235         if (cur_preempt_cnt > last_preempt_cnt) {
236                 last_preempt_cnt = cur_preempt_cnt;
237                 if (!(last_preempt_cnt&(last_preempt_cnt-1)))
238                         pr_err("RAMSTER TESTING RRP preempt_count=%d\n",
239                                 last_preempt_cnt);
240         }
241 }
242 #endif
243
244 int r2net_remote_put(struct tmem_xhandle *xh, char *data, size_t size,
245                                 bool ephemeral, int *remotenode)
246 {
247         int nodenum, ret = -1, status;
248         struct r2nm_node *node = NULL;
249         struct kvec vec[2];
250         size_t veclen = 2;
251         u32 msg_type;
252         struct r2net_node *nn;
253
254         BUG_ON(size > RMSTR_R2NET_MAX_LEN);
255         xh->client_id = r2nm_this_node(); /* which node is putting */
256         vec[0].iov_len = sizeof(*xh);
257         vec[0].iov_base = xh;
258         vec[1].iov_len = size;
259         vec[1].iov_base = data;
260
261         node = r2net_target_node;
262         if (!node)
263                 goto out;
264
265         nodenum = r2net_target_nodenum;
266
267         r2nm_node_get(node);
268
269         nn = r2net_nn_from_num(nodenum);
270         if (nn->nn_persistent_error || !nn->nn_sc_valid) {
271                 ret = -ENOTCONN;
272                 r2nm_node_put(node);
273                 goto out;
274         }
275
276         if (ephemeral)
277                 msg_type = RMSTR_TMEM_PUT_EPH;
278         else
279                 msg_type = RMSTR_TMEM_PUT_PERS;
280 #ifdef RAMSTER_TESTING
281         /* leave me here to see if it catches a weird crash */
282         ramster_check_irq_counts();
283 #endif
284
285         ret = r2net_send_message_vec(msg_type, RMSTR_KEY, vec, veclen,
286                                                 nodenum, &status);
287         if (ret < 0)
288                 ret = -1;
289         else {
290                 ret = status;
291                 *remotenode = nodenum;
292         }
293
294         r2nm_node_put(node);
295 out:
296         return ret;
297 }
298
299 int r2net_remote_flush(struct tmem_xhandle *xh, int remotenode)
300 {
301         int ret = -1, status;
302         struct r2nm_node *node = NULL;
303         struct kvec vec[1];
304         size_t veclen = 1;
305
306         node = r2nm_get_node_by_num(remotenode);
307         BUG_ON(node == NULL);
308         xh->client_id = r2nm_this_node(); /* which node is flushing */
309         vec[0].iov_len = sizeof(*xh);
310         vec[0].iov_base = xh;
311         BUG_ON(irqs_disabled());
312         BUG_ON(in_softirq());
313         ret = r2net_send_message_vec(RMSTR_TMEM_FLUSH, RMSTR_KEY,
314                                         vec, veclen, remotenode, &status);
315         r2nm_node_put(node);
316         return ret;
317 }
318
319 int r2net_remote_flush_object(struct tmem_xhandle *xh, int remotenode)
320 {
321         int ret = -1, status;
322         struct r2nm_node *node = NULL;
323         struct kvec vec[1];
324         size_t veclen = 1;
325
326         node = r2nm_get_node_by_num(remotenode);
327         BUG_ON(node == NULL);
328         xh->client_id = r2nm_this_node(); /* which node is flobjing */
329         vec[0].iov_len = sizeof(*xh);
330         vec[0].iov_base = xh;
331         ret = r2net_send_message_vec(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
332                                         vec, veclen, remotenode, &status);
333         r2nm_node_put(node);
334         return ret;
335 }
336
337 /*
338  * Handler registration
339  */
340
341 static LIST_HEAD(r2net_unreg_list);
342
343 static void r2net_unregister_handlers(void)
344 {
345         r2net_unregister_handler_list(&r2net_unreg_list);
346 }
347
348 int r2net_register_handlers(void)
349 {
350         int status;
351
352         status = r2net_register_handler(RMSTR_TMEM_PUT_EPH, RMSTR_KEY,
353                                 RMSTR_R2NET_MAX_LEN,
354                                 ramster_remote_put_handler,
355                                 NULL, NULL, &r2net_unreg_list);
356         if (status)
357                 goto bail;
358
359         status = r2net_register_handler(RMSTR_TMEM_PUT_PERS, RMSTR_KEY,
360                                 RMSTR_R2NET_MAX_LEN,
361                                 ramster_remote_put_handler,
362                                 NULL, NULL, &r2net_unreg_list);
363         if (status)
364                 goto bail;
365
366         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REQUEST, RMSTR_KEY,
367                                 RMSTR_R2NET_MAX_LEN,
368                                 ramster_remote_async_get_request_handler,
369                                 NULL, NULL,
370                                 &r2net_unreg_list);
371         if (status)
372                 goto bail;
373
374         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_AND_FREE_REQUEST,
375                                 RMSTR_KEY, RMSTR_R2NET_MAX_LEN,
376                                 ramster_remote_async_get_request_handler,
377                                 NULL, NULL,
378                                 &r2net_unreg_list);
379         if (status)
380                 goto bail;
381
382         status = r2net_register_handler(RMSTR_TMEM_ASYNC_GET_REPLY, RMSTR_KEY,
383                                 RMSTR_R2NET_MAX_LEN,
384                                 ramster_remote_async_get_reply_handler,
385                                 NULL, NULL,
386                                 &r2net_unreg_list);
387         if (status)
388                 goto bail;
389
390         status = r2net_register_handler(RMSTR_TMEM_FLUSH, RMSTR_KEY,
391                                 RMSTR_R2NET_MAX_LEN,
392                                 ramster_remote_flush_handler,
393                                 NULL, NULL,
394                                 &r2net_unreg_list);
395         if (status)
396                 goto bail;
397
398         status = r2net_register_handler(RMSTR_TMEM_FLOBJ, RMSTR_KEY,
399                                 RMSTR_R2NET_MAX_LEN,
400                                 ramster_remote_flobj_handler,
401                                 NULL, NULL,
402                                 &r2net_unreg_list);
403         if (status)
404                 goto bail;
405
406         pr_info("ramster: r2net handlers registered\n");
407
408 bail:
409         if (status) {
410                 r2net_unregister_handlers();
411                 pr_err("ramster: couldn't register r2net handlers\n");
412         }
413         return status;
414 }