/*
 * ring_buffer_backend.c
 *
 * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; only
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 */

#include <linux/stddef.h>
#include <linux/module.h>
#include <linux/string.h>
#include <linux/bitops.h>
#include <linux/delay.h>
#include <linux/errno.h>
#include <linux/slab.h>
#include <linux/cpu.h>
#include <linux/mm.h>

#include "../../wrapper/vmalloc.h"	/* for wrapper_vmalloc_sync_all() */
#include "../../wrapper/ringbuffer/config.h"
#include "../../wrapper/ringbuffer/backend.h"
#include "../../wrapper/ringbuffer/frontend.h"

/**
 * lib_ring_buffer_backend_allocate - allocate a channel buffer
 * @config: ring buffer instance configuration
 * @bufb: buffer backend to populate
 * @size: total size of the buffer
 * @num_subbuf: number of subbuffers
 * @extra_reader_sb: need extra subbuffer for reader
 */
static
int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config,
				     struct lib_ring_buffer_backend *bufb,
				     size_t size, size_t num_subbuf,
				     int extra_reader_sb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0;
	unsigned long subbuf_size, mmap_offset = 0;
	unsigned long num_subbuf_alloc;
	struct page **pages;
	void **virt;
	unsigned long i;

	num_pages = size >> PAGE_SHIFT;
	num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf);
	subbuf_size = chanb->subbuf_size;
	num_subbuf_alloc = num_subbuf;

	if (extra_reader_sb) {
		num_pages += num_pages_per_subbuf; /* Add pages for reader */
		num_subbuf_alloc++;
	}

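	/*
	 * Allocation sizes below are rounded up to the internode cacheline
	 * size, and the memory is allocated on the NUMA node of the buffer's
	 * CPU (CPU 0's node is used for global buffers, where bufb->cpu is
	 * -1).
	 */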
	pages = kmalloc_node(ALIGN(sizeof(*pages) * num_pages,
				   1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!pages))
		goto pages_error;

	virt = kmalloc_node(ALIGN(sizeof(*virt) * num_pages,
				  1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!virt))
		goto virt_error;

	bufb->array = kmalloc_node(ALIGN(sizeof(*bufb->array)
					 * num_subbuf_alloc,
				  1 << INTERNODE_CACHE_SHIFT),
			GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!bufb->array))
		goto array_error;

	for (i = 0; i < num_pages; i++) {
		pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)),
					    GFP_KERNEL | __GFP_ZERO, 0);
		if (unlikely(!pages[i]))
			goto depopulate;
		virt[i] = page_address(pages[i]);
	}
	bufb->num_pages_per_subbuf = num_pages_per_subbuf;

	/* Allocate backend pages array elements */
	for (i = 0; i < num_subbuf_alloc; i++) {
		bufb->array[i] =
			kzalloc_node(ALIGN(
				sizeof(struct lib_ring_buffer_backend_pages) +
				sizeof(struct lib_ring_buffer_backend_page)
				* num_pages_per_subbuf,
				1 << INTERNODE_CACHE_SHIFT),
				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
		if (!bufb->array[i])
			goto free_array;
	}

	/* Allocate write-side subbuffer table */
	bufb->buf_wsb = kzalloc_node(ALIGN(
				sizeof(struct lib_ring_buffer_backend_subbuffer)
				* num_subbuf,
				1 << INTERNODE_CACHE_SHIFT),
				GFP_KERNEL, cpu_to_node(max(bufb->cpu, 0)));
	if (unlikely(!bufb->buf_wsb))
		goto free_array;

	for (i = 0; i < num_subbuf; i++)
		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);

	/* Assign read-side subbuffer table */
	if (extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	/* Assign pages to page index */
	for (i = 0; i < num_subbuf_alloc; i++) {
		for (j = 0; j < num_pages_per_subbuf; j++) {
			CHAN_WARN_ON(chanb, page_idx > num_pages);
			bufb->array[i]->p[j].virt = virt[page_idx];
			bufb->array[i]->p[j].page = pages[page_idx];
			page_idx++;
		}
		if (config->output == RING_BUFFER_MMAP) {
			bufb->array[i]->mmap_offset = mmap_offset;
			mmap_offset += subbuf_size;
		}
	}

	/*
	 * If kmalloc ever uses vmalloc underneath, make sure the buffer pages
	 * will not fault.
	 */
	wrapper_vmalloc_sync_all();
	kfree(virt);
	kfree(pages);
	return 0;

free_array:
	for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++)
		kfree(bufb->array[i]);
depopulate:
	/* Free all allocated pages */
	for (i = 0; (i < num_pages && pages[i]); i++)
		__free_page(pages[i]);
	kfree(bufb->array);
array_error:
	kfree(virt);
virt_error:
	kfree(pages);
pages_error:
	return -ENOMEM;
}

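/**
 * lib_ring_buffer_backend_create - create and allocate a channel buffer backend
 * @bufb: buffer backend to initialize
 * @chanb: owning channel backend
 * @cpu: cpu to which this buffer belongs, or -1 for a global buffer
 *
 * Returns 0 on success, -ENOMEM on allocation failure.
 */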
int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
				   struct channel_backend *chanb, int cpu)
{
	const struct lib_ring_buffer_config *config = &chanb->config;

	bufb->chan = container_of(chanb, struct channel, backend);
	bufb->cpu = cpu;

	return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size,
						chanb->num_subbuf,
						chanb->extra_reader_sb);
}

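/**
 * lib_ring_buffer_backend_free - free all pages and tables of a buffer backend
 * @bufb: buffer backend to free
 */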
void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	unsigned long i, j, num_subbuf_alloc;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	kfree(bufb->buf_wsb);
	for (i = 0; i < num_subbuf_alloc; i++) {
		for (j = 0; j < bufb->num_pages_per_subbuf; j++)
			__free_page(bufb->array[i]->p[j].page);
		kfree(bufb->array[i]);
	}
	kfree(bufb->array);
	bufb->allocated = 0;
}

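/**
 * lib_ring_buffer_backend_reset - bring a buffer backend back to its initial state
 * @bufb: buffer backend to reset
 *
 * Resets the subbuffer identifiers and per-subbuffer counters without
 * freeing or reallocating any memory.
 */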
void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long num_subbuf_alloc;
	unsigned int i;

	num_subbuf_alloc = chanb->num_subbuf;
	if (chanb->extra_reader_sb)
		num_subbuf_alloc++;

	for (i = 0; i < chanb->num_subbuf; i++)
		bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i);
	if (chanb->extra_reader_sb)
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1,
						num_subbuf_alloc - 1);
	else
		bufb->buf_rsb.id = subbuffer_id(config, 0, 1, 0);

	for (i = 0; i < num_subbuf_alloc; i++) {
		/* Don't reset mmap_offset */
		v_set(config, &bufb->array[i]->records_commit, 0);
		v_set(config, &bufb->array[i]->records_unread, 0);
		bufb->array[i]->data_size = 0;
		/* Don't reset backend page and virt addresses */
	}
	/* Don't reset num_pages_per_subbuf, cpu, allocated */
	v_set(config, &bufb->records_read, 0);
}

/*
 * The frontend is responsible for also calling ring_buffer_backend_reset for
 * each buffer when calling channel_backend_reset.
 */
void channel_backend_reset(struct channel_backend *chanb)
{
	struct channel *chan = container_of(chanb, struct channel, backend);
	const struct lib_ring_buffer_config *config = &chanb->config;

	/*
	 * Don't reset buf_size, subbuf_size, subbuf_size_order,
	 * num_subbuf_order, buf_size_order, extra_reader_sb, num_subbuf,
	 * priv, notifiers, config, cpumask and name.
	 */
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);
}

#ifdef CONFIG_HOTPLUG_CPU
/**
 *	lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
 *	@nb: notifier block
 *	@action: hotplug action to take
 *	@hcpu: CPU number
 *
 *	Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
 */
static
int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
					      unsigned long action,
					      void *hcpu)
{
	unsigned int cpu = (unsigned long)hcpu;
	struct channel_backend *chanb = container_of(nb, struct channel_backend,
						     cpu_hp_notifier);
	const struct lib_ring_buffer_config *config = &chanb->config;
	struct lib_ring_buffer *buf;
	int ret;

	CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

	switch (action) {
	case CPU_UP_PREPARE:
	case CPU_UP_PREPARE_FROZEN:
		buf = per_cpu_ptr(chanb->buf, cpu);
		ret = lib_ring_buffer_create(buf, chanb, cpu);
		if (ret) {
			printk(KERN_ERR
			  "ring_buffer_cpu_hp_callback: cpu %d "
			  "buffer creation failed\n", cpu);
			return NOTIFY_BAD;
		}
		break;
	case CPU_DEAD:
	case CPU_DEAD_FROZEN:
		/*
		 * No need to do a buffer switch here, because it will happen
		 * when tracing is stopped, or will be done by switch timer CPU
		 * DEAD callback.
		 */
		break;
	}
	return NOTIFY_OK;
}
#endif

/**
 * channel_backend_init - initialize a channel backend
 * @chanb: channel backend
 * @name: channel name
 * @config: client ring buffer configuration
 * @priv: client private data
 * @subbuf_size: size of sub-buffers (>= PAGE_SIZE, power of 2)
 * @num_subbuf: number of sub-buffers (power of 2)
 *
 * Returns 0 on success, a negative error code otherwise.
 *
 * Creates per-cpu channel buffers using the sizes and attributes
 * specified.  The created channel buffer files will be named
 * name_0...name_N-1.  File permissions will be %S_IRUSR.
 *
 * Called with CPU hotplug disabled.
 */
int channel_backend_init(struct channel_backend *chanb,
			 const char *name,
			 const struct lib_ring_buffer_config *config,
			 void *priv, size_t subbuf_size, size_t num_subbuf)
{
	struct channel *chan = container_of(chanb, struct channel, backend);
	unsigned int i;
	int ret;

	if (!name)
		return -EPERM;

	/* Check that the subbuffer size is at least a full page. */
	if (subbuf_size < PAGE_SIZE)
		return -EINVAL;

	/*
	 * Make sure the number of subbuffers and the subbuffer size are
	 * powers of 2 and nonzero.
	 */
	if (!subbuf_size || (subbuf_size & (subbuf_size - 1)))
		return -EINVAL;
	if (!num_subbuf || (num_subbuf & (num_subbuf - 1)))
		return -EINVAL;

	ret = subbuffer_id_check_index(config, num_subbuf);
	if (ret)
		return ret;

	chanb->priv = priv;
	chanb->buf_size = num_subbuf * subbuf_size;
	chanb->subbuf_size = subbuf_size;
	chanb->buf_size_order = get_count_order(chanb->buf_size);
	chanb->subbuf_size_order = get_count_order(subbuf_size);
	chanb->num_subbuf_order = get_count_order(num_subbuf);
	chanb->extra_reader_sb =
			(config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0;
	chanb->num_subbuf = num_subbuf;
	strlcpy(chanb->name, name, NAME_MAX);
	memcpy(&chanb->config, config, sizeof(chanb->config));

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL))
			return -ENOMEM;
	}

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		/* Allocating the buffer per-cpu structures */
		chanb->buf = alloc_percpu(struct lib_ring_buffer);
		if (!chanb->buf)
			goto free_cpumask;

		/*
		 * In case of non-hotplug cpu, if the ring-buffer is allocated
		 * in early initcall, it will not be notified of secondary cpus.
		 * In that case, we need to allocate for all possible cpus.
		 */
#ifdef CONFIG_HOTPLUG_CPU
		/*
		 * buf->backend.allocated test takes care of concurrent CPU
		 * hotplug.
		 * Priority higher than frontend, so we create the ring buffer
		 * before we start the timer.
		 */
		chanb->cpu_hp_notifier.notifier_call =
				lib_ring_buffer_cpu_hp_callback;
		chanb->cpu_hp_notifier.priority = 5;
		register_hotcpu_notifier(&chanb->cpu_hp_notifier);

		get_online_cpus();
		for_each_online_cpu(i) {
			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
						 chanb, i);
			if (ret)
				goto free_bufs; /* cpu hotplug locked */
		}
		put_online_cpus();
#else
		for_each_possible_cpu(i) {
			ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i),
						 chanb, i);
			if (ret)
				goto free_bufs;
		}
#endif
	} else {
		chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL);
		if (!chanb->buf)
			goto free_cpumask;
		ret = lib_ring_buffer_create(chanb->buf, chanb, -1);
		if (ret)
			goto free_bufs;
	}
	chanb->start_tsc = config->cb.ring_buffer_clock_read(chan);

	return 0;

free_bufs:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
#ifdef CONFIG_HOTPLUG_CPU
		put_online_cpus();
#endif
		free_percpu(chanb->buf);
	} else
		kfree(chanb->buf);
free_cpumask:
	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		free_cpumask_var(chanb->cpumask);
	return -ENOMEM;
}

/**
 * channel_backend_unregister_notifiers - unregister notifiers
 * @chanb: the channel backend
 *
 * Holds CPU hotplug.
 */
void channel_backend_unregister_notifiers(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = &chanb->config;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
		unregister_hotcpu_notifier(&chanb->cpu_hp_notifier);
}

/**
 * channel_backend_free - destroy the channel
 * @chanb: the channel backend
 *
 * Destroys all channel buffers and frees the channel backend's buffer
 * resources.
 */
void channel_backend_free(struct channel_backend *chanb)
{
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned int i;

	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
		for_each_possible_cpu(i) {
			struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i);

			if (!buf->backend.allocated)
				continue;
			lib_ring_buffer_free(buf);
		}
		free_cpumask_var(chanb->cpumask);
		free_percpu(chanb->buf);
	} else {
		struct lib_ring_buffer *buf = chanb->buf;

		CHAN_WARN_ON(chanb, !buf->backend.allocated);
		lib_ring_buffer_free(buf);
		kfree(buf);
	}
}

/**
 * _lib_ring_buffer_write - write data to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @src : source address
 * @len : length to write
 * @pagecpy : number of bytes already copied on the current page
 */
void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset,
			    const void *src, size_t len, ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

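	/*
	 * Copy page by page: pagecpy holds the number of bytes copied during
	 * the previous iteration (the caller's value on the first pass), and
	 * each iteration copies at most up to the end of the current backend
	 * page.
	 */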
	do {
		len -= pagecpy;
		src += pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		lib_ring_buffer_do_copy(config,
					rpages->p[index].virt
						+ (offset & ~PAGE_MASK),
					src, pagecpy);
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_write);


/**
 * _lib_ring_buffer_memset - write len bytes of c to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @c : the byte to write
 * @len : length to write
 * @pagecpy : number of bytes already copied on the current page
 */
void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
			     size_t offset,
			     int c, size_t len, ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	do {
		len -= pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		lib_ring_buffer_do_memset(rpages->p[index].virt
					  + (offset & ~PAGE_MASK),
					  c, pagecpy);
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_memset);


/**
 * _lib_ring_buffer_copy_from_user_inatomic - write user data to a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @src : source address
 * @len : length to write
 * @pagecpy : number of bytes already copied on the current page
 *
 * This function deals with userspace pointers, it should never be called
 * directly without having the src pointer checked with access_ok()
 * previously.
 */
void _lib_ring_buffer_copy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
				      size_t offset,
				      const void __user *src, size_t len,
				      ssize_t pagecpy)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;
	int ret;

	do {
		len -= pagecpy;
		src += pagecpy;
		offset += pagecpy;
		sbidx = offset >> chanb->subbuf_size_order;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;

		/*
		 * Underlying layer should never ask for writes across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);

		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_wsb[sbidx].id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
				&& subbuffer_id_is_noref(config, id));
		/*
		 * The copy helper returns the number of bytes that could not
		 * be copied from userspace. On partial failure, zero-fill the
		 * rest of the record so no uninitialized data is left in the
		 * buffer.
		 */
		ret = lib_ring_buffer_do_copy_from_user_inatomic(rpages->p[index].virt
							+ (offset & ~PAGE_MASK),
							src, pagecpy);
		if (ret > 0) {
			offset += (pagecpy - ret);
			len -= (pagecpy - ret);
			_lib_ring_buffer_memset(bufb, offset, 0, len, 0);
			break; /* stop copy */
		}
	} while (unlikely(len != pagecpy));
}
EXPORT_SYMBOL_GPL(_lib_ring_buffer_copy_from_user_inatomic);

/**
 * lib_ring_buffer_read - read data from a ring_buffer buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the length copied.
 */
size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset,
			    void *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy, orig_len;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	orig_len = len;
	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	if (unlikely(!len))
		return 0;
	for (;;) {
		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK),
		       pagecpy);
		len -= pagecpy;
		if (likely(!len))
			break;
		dest += pagecpy;
		offset += pagecpy;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	return orig_len;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read);

/**
 * __lib_ring_buffer_copy_to_user - read data from ring_buffer to userspace
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination userspace address
 * @len : length to copy to destination
 *
 * Should be protected by get_subbuf/put_subbuf.
 * access_ok() must have been performed on dest addresses prior to calling
 * this function.
 * Returns -EFAULT on error, 0 if ok.
 */
int __lib_ring_buffer_copy_to_user(struct lib_ring_buffer_backend *bufb,
				   size_t offset, void __user *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	if (unlikely(!len))
		return 0;
	for (;;) {
		pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK));
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		if (__copy_to_user(dest,
			       rpages->p[index].virt + (offset & ~PAGE_MASK),
			       pagecpy))
			return -EFAULT;
		len -= pagecpy;
		if (likely(!len))
			break;
		dest += pagecpy;
		offset += pagecpy;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	return 0;
}
EXPORT_SYMBOL_GPL(__lib_ring_buffer_copy_to_user);

/**
 * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer.
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @dest : destination address
 * @len : destination's length
 *
 * Returns the string's length, or -EINVAL on error.
 * Should be protected by get_subbuf/put_subbuf.
 * Destination length should be at least 1 to hold '\0'.
 */
int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset,
			      void *dest, size_t len)
{
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	size_t index;
	ssize_t pagecpy, pagelen, strpagelen, orig_offset;
	char *str;
	struct lib_ring_buffer_backend_pages *rpages;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	orig_offset = offset;
	if (unlikely(!len))
		return -EINVAL;
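	/*
	 * Scan the string page by page: strnlen() is bounded to the current
	 * backend page, and copying stops once the destination is full or a
	 * terminating NUL is found within a page.
	 */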
	for (;;) {
		id = bufb->buf_rsb.id;
		sb_bindex = subbuffer_id_get_index(config, id);
		rpages = bufb->array[sb_bindex];
		CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
			     && subbuffer_id_is_noref(config, id));
		str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK);
		pagelen = PAGE_SIZE - (offset & ~PAGE_MASK);
		strpagelen = strnlen(str, pagelen);
		if (len) {
			pagecpy = min_t(size_t, len, strpagelen);
			if (dest) {
				memcpy(dest, str, pagecpy);
				dest += pagecpy;
			}
			len -= pagecpy;
		}
		offset += strpagelen;
		index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
		if (strpagelen < pagelen)
			break;
		/*
		 * Underlying layer should never ask for reads across
		 * subbuffers.
		 */
		CHAN_WARN_ON(chanb, offset >= chanb->buf_size);
	}
	if (dest && len)
		((char *)dest)[0] = 0;
	return offset - orig_offset;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr);

/**
 * lib_ring_buffer_read_get_page - Get a whole page to read from
 * @bufb : buffer backend
 * @offset : offset within the buffer
 * @virt : pointer to page address (output)
 *
 * Should be protected by get_subbuf/put_subbuf.
 * Returns the pointer to the page struct pointer.
 */
struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb,
					    size_t offset, void ***virt)
{
	size_t index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	*virt = &rpages->p[index].virt;
	return &rpages->p[index].page;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page);

/**
 * lib_ring_buffer_read_offset_address - get address of a buffer location
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located (for read).
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's never on a page boundary, it's safe to write directly to this address,
 * as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb,
					  size_t offset)
{
	size_t index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_rsb.id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address);

/**
 * lib_ring_buffer_offset_address - get address of a location within the buffer
 * @bufb : buffer backend
 * @offset : offset within the buffer.
 *
 * Return the address where a given offset is located.
 * Should be used to get the current subbuffer header pointer. Given we know
 * it's always at the beginning of a page, it's safe to write directly to this
 * address, as long as the write is never bigger than a page size.
 */
void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb,
				     size_t offset)
{
	size_t sbidx, index;
	struct lib_ring_buffer_backend_pages *rpages;
	struct channel_backend *chanb = &bufb->chan->backend;
	const struct lib_ring_buffer_config *config = &chanb->config;
	unsigned long sb_bindex, id;

	offset &= chanb->buf_size - 1;
	sbidx = offset >> chanb->subbuf_size_order;
	index = (offset & (chanb->subbuf_size - 1)) >> PAGE_SHIFT;
	id = bufb->buf_wsb[sbidx].id;
	sb_bindex = subbuffer_id_get_index(config, id);
	rpages = bufb->array[sb_bindex];
	CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE
		     && subbuffer_id_is_noref(config, id));
	return rpages->p[index].virt + (offset & ~PAGE_MASK);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address);