/*
 * ring_buffer_iterator.c
 *
 * Ring buffer and channel iterators. Get each event of a channel in order. Uses
 * a prio heap for per-cpu buffers, giving an O(log(NR_CPUS)) algorithmic
 * complexity for the "get next event" operation.
 *
 * Copyright (C) 2010-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; only
 * version 2.1 of the License.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
 *
 * Author:
 *      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 */

#include "../../wrapper/ringbuffer/iterator.h"
#include <linux/jiffies.h>
#include <linux/delay.h>
#include <linux/module.h>

/*
 * Safety factor taking into account internal kernel interrupt latency.
 * Assuming 250ms worst-case latency.
 */
#define MAX_SYSTEM_LATENCY      250

/*
 * Maximum delta expected between trace clocks. At most 1 jiffy delta,
 * expressed in nanoseconds.
 */
#define MAX_CLOCK_DELTA         (jiffies_to_usecs(1) * 1000)

/**
 * lib_ring_buffer_get_next_record - Get the next record in a buffer.
 * @chan: channel
 * @buf: buffer
 *
 * Returns the size of the event read, -EAGAIN if buffer is empty, -ENODATA if
 * buffer is empty and finalized. The buffer must already be opened for reading.
 */
ssize_t lib_ring_buffer_get_next_record(struct channel *chan,
                                        struct lib_ring_buffer *buf)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer_iter *iter = &buf->iter;
        int ret;

restart:
        switch (iter->state) {
        case ITER_GET_SUBBUF:
                ret = lib_ring_buffer_get_next_subbuf(buf);
                if (ret && !ACCESS_ONCE(buf->finalized)
                    && config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                        /*
                         * Use "pull" scheme for global buffers. The reader
                         * itself flushes the buffer to "pull" data not visible
                         * to readers yet. Flush current subbuffer and re-try.
                         *
                         * Per-CPU buffers rather use a "push" scheme because
                         * the IPI needed to flush all CPU's buffers is too
                         * costly. In the "push" scheme, the reader waits for
                         * the writer periodic deferrable timer to flush the
                         * buffers (keeping track of a quiescent state
                         * timestamp). Therefore, the writer "pushes" data out
                         * of the buffers rather than letting the reader "pull"
                         * data from the buffer.
                         */
                        lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
                        ret = lib_ring_buffer_get_next_subbuf(buf);
                }
                if (ret)
                        return ret;
                iter->consumed = buf->cons_snapshot;
                iter->data_size = lib_ring_buffer_get_read_data_size(config, buf);
                iter->read_offset = iter->consumed;
                /* skip header */
                iter->read_offset += config->cb.subbuffer_header_size();
                iter->state = ITER_TEST_RECORD;
                goto restart;
        case ITER_TEST_RECORD:
                if (iter->read_offset - iter->consumed >= iter->data_size) {
                        iter->state = ITER_PUT_SUBBUF;
                } else {
                        CHAN_WARN_ON(chan, !config->cb.record_get);
                        config->cb.record_get(config, chan, buf,
                                              iter->read_offset,
                                              &iter->header_len,
                                              &iter->payload_len,
                                              &iter->timestamp);
                        iter->read_offset += iter->header_len;
                        subbuffer_consume_record(config, &buf->backend);
                        iter->state = ITER_NEXT_RECORD;
                        return iter->payload_len;
                }
                goto restart;
        case ITER_NEXT_RECORD:
                iter->read_offset += iter->payload_len;
                iter->state = ITER_TEST_RECORD;
                goto restart;
        case ITER_PUT_SUBBUF:
                lib_ring_buffer_put_next_subbuf(buf);
                iter->state = ITER_GET_SUBBUF;
                goto restart;
        default:
                CHAN_WARN_ON(chan, 1);  /* Should not happen */
                return -EPERM;
        }
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_get_next_record);

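/*
 * Comparison callback passed to lttng_heap_init(): the buffer whose next
 * record has the lowest timestamp must end up at the top of the heap.
 */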
static int buf_is_higher(void *a, void *b)
{
        struct lib_ring_buffer *bufa = a;
        struct lib_ring_buffer *bufb = b;

        /* Consider lowest timestamps to be at the top of the heap */
        return (bufa->iter.timestamp < bufb->iter.timestamp);
}

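/*
 * Walk the list of previously empty per-cpu buffers, try to get a record from
 * each of them, and move those that now have data into the priority heap.
 * Finalized buffers (-ENODATA) are dropped from the empty list for good.
 */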
static
void lib_ring_buffer_get_empty_buf_records(const struct lib_ring_buffer_config *config,
                                           struct channel *chan)
{
        struct lttng_ptr_heap *heap = &chan->iter.heap;
        struct lib_ring_buffer *buf, *tmp;
        ssize_t len;

        list_for_each_entry_safe(buf, tmp, &chan->iter.empty_head,
                                 iter.empty_node) {
                len = lib_ring_buffer_get_next_record(chan, buf);

                /*
                 * Deal with -EAGAIN and -ENODATA.
                 * len >= 0 means record contains data.
                 * -EBUSY should never happen, because we support only one
                 * reader.
                 */
                switch (len) {
                case -EAGAIN:
                        /* Keep node in empty list */
                        break;
                case -ENODATA:
                        /*
                         * Buffer is finalized. Don't add it to the list of
                         * empty buffers, because it has no more data to
                         * provide, ever.
                         */
                        list_del(&buf->iter.empty_node);
                        break;
                case -EBUSY:
                        CHAN_WARN_ON(chan, 1);
                        break;
                default:
                        /*
                         * Insert buffer into the heap, remove from empty buffer
                         * list.
                         */
                        CHAN_WARN_ON(chan, len < 0);
                        list_del(&buf->iter.empty_node);
                        CHAN_WARN_ON(chan, lttng_heap_insert(heap, buf));
                }
        }
}

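/*
 * Wait for a quiescent state: make sure every buffer currently on the empty
 * list either provides a record or has been switched past the quiescent state
 * timestamp, so that still-empty buffers can safely be ignored by the fusion
 * merge up to last_qs.
 */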
static
void lib_ring_buffer_wait_for_qs(const struct lib_ring_buffer_config *config,
                                 struct channel *chan)
{
        u64 timestamp_qs;
        unsigned long wait_msecs;

        /*
         * No need to wait if no empty buffers are present.
         */
        if (list_empty(&chan->iter.empty_head))
                return;

        timestamp_qs = config->cb.ring_buffer_clock_read(chan);
        /*
         * We need to consider previously empty buffers.
         * Do a get next buf record on each of them. Add them to
         * the heap if they have data. If at least one of them
         * doesn't have data, we need to wait for
         * switch_timer_interval + MAX_SYSTEM_LATENCY (so we are sure the
         * buffers have been switched either by the timer or idle entry) and
         * check them again, adding them if they have data.
         */
        lib_ring_buffer_get_empty_buf_records(config, chan);

        /*
         * No need to wait if no empty buffers are present.
         */
        if (list_empty(&chan->iter.empty_head))
                return;

        /*
         * We need to wait for the buffer switch timer to run. If the
         * CPU is idle, idle entry performed the switch.
         * TODO: we could optimize further by skipping the sleep if all
         * empty buffers belong to idle or offline cpus.
         */
        wait_msecs = jiffies_to_msecs(chan->switch_timer_interval);
        wait_msecs += MAX_SYSTEM_LATENCY;
        msleep(wait_msecs);
        lib_ring_buffer_get_empty_buf_records(config, chan);
        /*
         * Any buffer still in the empty list here cannot possibly
         * contain an event with a timestamp prior to "timestamp_qs".
         * The new quiescent state timestamp is the one we grabbed
         * before waiting for buffer data.  It is therefore safe to
         * ignore empty buffers up to last_qs timestamp for fusion
         * merge.
         */
        chan->iter.last_qs = timestamp_qs;
}


/**
 * channel_get_next_record - Get the next record in a channel.
 * @chan: channel
 * @ret_buf: the buffer in which the event is located (output)
 *
 * Returns the size of the new current event, -EAGAIN if all buffers are empty,
 * -ENODATA if all buffers are empty and finalized. The channel must already be
 * opened for reading.
 */
ssize_t channel_get_next_record(struct channel *chan,
                                struct lib_ring_buffer **ret_buf)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;
        struct lttng_ptr_heap *heap;
        ssize_t len;

        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                *ret_buf = channel_get_ring_buffer(config, chan, 0);
                return lib_ring_buffer_get_next_record(chan, *ret_buf);
        }

        heap = &chan->iter.heap;

        /*
         * Get next record for the topmost buffer.
         */
        buf = lttng_heap_maximum(heap);
        if (buf) {
                len = lib_ring_buffer_get_next_record(chan, buf);
                /*
                 * Deal with -EAGAIN and -ENODATA.
                 * len >= 0 means record contains data.
                 */
                switch (len) {
                case -EAGAIN:
                        buf->iter.timestamp = 0;
                        list_add(&buf->iter.empty_node, &chan->iter.empty_head);
                        /* Remove topmost buffer from the heap */
                        CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
                        break;
                case -ENODATA:
                        /*
                         * Buffer is finalized. Remove buffer from heap and
                         * don't add it to the list of empty buffers, because
                         * it has no more data to provide, ever.
                         */
                        CHAN_WARN_ON(chan, lttng_heap_remove(heap) != buf);
                        break;
                case -EBUSY:
                        CHAN_WARN_ON(chan, 1);
                        break;
                default:
                        /*
                         * Reinsert buffer into the heap. Note that heap can be
                         * partially empty, so we need to use
                         * lttng_heap_replace_max().
                         */
                        CHAN_WARN_ON(chan, len < 0);
                        CHAN_WARN_ON(chan, lttng_heap_replace_max(heap, buf) != buf);
                        break;
                }
        }

        buf = lttng_heap_maximum(heap);
        if (!buf || buf->iter.timestamp > chan->iter.last_qs) {
                /*
                 * Deal with buffers previously showing no data.
                 * Add buffers containing data to the heap, update
                 * last_qs.
                 */
                lib_ring_buffer_wait_for_qs(config, chan);
        }

        *ret_buf = buf = lttng_heap_maximum(heap);
        if (buf) {
                /*
                 * If this warning triggers, you probably need to check your
                 * system interrupt latency. Typical causes: too much printk()
                 * output going to a serial console with interrupts off.
                 * Allow for MAX_CLOCK_DELTA ns timestamp delta going backward.
                 * Observed on SMP KVM setups with trace_clock().
                 */
                if (chan->iter.last_timestamp
                    > (buf->iter.timestamp + MAX_CLOCK_DELTA)) {
                        printk(KERN_WARNING "ring_buffer: timestamps going "
                               "backward. Last time %llu ns, cpu %d, "
                               "current time %llu ns, cpu %d, "
                               "delta %llu ns.\n",
                               chan->iter.last_timestamp, chan->iter.last_cpu,
                               buf->iter.timestamp, buf->backend.cpu,
                               chan->iter.last_timestamp - buf->iter.timestamp);
                        CHAN_WARN_ON(chan, 1);
                }
                chan->iter.last_timestamp = buf->iter.timestamp;
                chan->iter.last_cpu = buf->backend.cpu;
                return buf->iter.payload_len;
        } else {
                /* Heap is empty */
                if (list_empty(&chan->iter.empty_head))
                        return -ENODATA;        /* All buffers finalized */
                else
                        return -EAGAIN;         /* Temporarily empty */
        }
}
EXPORT_SYMBOL_GPL(channel_get_next_record);
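
/*
 * Illustrative sketch (not compiled in) of how a kernel-space reader could
 * consume the channel-level, timestamp-merged stream exposed above. The
 * channel must already have been opened with channel_iterator_open(), and
 * "use_record" is a placeholder for whatever the caller does with the
 * payload:
 *
 *      struct lib_ring_buffer *buf;
 *      ssize_t len;
 *
 *      for (;;) {
 *              len = channel_get_next_record(chan, &buf);
 *              if (len == -ENODATA)
 *                      break;          (all buffers empty and finalized)
 *              if (len == -EAGAIN)
 *                      continue;       (temporarily empty: retry or wait)
 *              use_record(buf, buf->iter.read_offset, len);
 *      }
 */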
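/*
 * Lazily initialize the per-buffer iterator state: open the buffer for
 * reading if the channel iterator is already open, and register the buffer
 * on the channel's empty list (per-cpu allocation only).
 */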
static
void lib_ring_buffer_iterator_init(struct channel *chan, struct lib_ring_buffer *buf)
{
        if (buf->iter.allocated)
                return;

        buf->iter.allocated = 1;
        if (chan->iter.read_open && !buf->iter.read_open) {
                CHAN_WARN_ON(chan, lib_ring_buffer_open_read(buf) != 0);
                buf->iter.read_open = 1;
        }

        /* Add to list of buffers without any current record */
        if (chan->backend.config.alloc == RING_BUFFER_ALLOC_PER_CPU)
                list_add(&buf->iter.empty_node, &chan->iter.empty_head);
}

#ifdef CONFIG_HOTPLUG_CPU
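/*
 * CPU hotplug callback: initialize the iterator of the buffer belonging to a
 * CPU coming online, so that it participates in the channel-level merge.
 */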
static
int channel_iterator_cpu_hotplug(struct notifier_block *nb,
                                 unsigned long action,
                                 void *hcpu)
{
        unsigned int cpu = (unsigned long)hcpu;
        struct channel *chan = container_of(nb, struct channel,
                                            hp_iter_notifier);
        struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
        const struct lib_ring_buffer_config *config = &chan->backend.config;

        if (!chan->hp_iter_enable)
                return NOTIFY_DONE;

        CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);

        switch (action) {
        case CPU_DOWN_FAILED:
        case CPU_DOWN_FAILED_FROZEN:
        case CPU_ONLINE:
        case CPU_ONLINE_FROZEN:
                lib_ring_buffer_iterator_init(chan, buf);
                return NOTIFY_OK;
        default:
                return NOTIFY_DONE;
        }
}
#endif

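/*
 * Initialize the channel iterator state: empty-buffer list, priority heap
 * sized for all possible CPUs, and (when available) the CPU hotplug notifier
 * used to initialize per-cpu buffer iterators as CPUs come online.
 */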
int channel_iterator_init(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                int cpu, ret;

                INIT_LIST_HEAD(&chan->iter.empty_head);
                ret = lttng_heap_init(&chan->iter.heap,
                                num_possible_cpus(),
                                GFP_KERNEL, buf_is_higher);
                if (ret)
                        return ret;
                /*
                 * In case of non-hotplug cpu, if the ring-buffer is allocated
                 * in early initcall, it will not be notified of secondary cpus.
                 * In that case, we need to allocate for all possible cpus.
                 */
#ifdef CONFIG_HOTPLUG_CPU
                chan->hp_iter_notifier.notifier_call =
                        channel_iterator_cpu_hotplug;
                chan->hp_iter_notifier.priority = 10;
                register_cpu_notifier(&chan->hp_iter_notifier);
                get_online_cpus();
                for_each_online_cpu(cpu) {
                        buf = per_cpu_ptr(chan->backend.buf, cpu);
                        lib_ring_buffer_iterator_init(chan, buf);
                }
                chan->hp_iter_enable = 1;
                put_online_cpus();
#else
                for_each_possible_cpu(cpu) {
                        buf = per_cpu_ptr(chan->backend.buf, cpu);
                        lib_ring_buffer_iterator_init(chan, buf);
                }
#endif
        } else {
                buf = channel_get_ring_buffer(config, chan, 0);
                lib_ring_buffer_iterator_init(chan, buf);
        }
        return 0;
}

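/*
 * Disable and unregister the CPU hotplug notifier (per-cpu channels only).
 */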
void channel_iterator_unregister_notifiers(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                chan->hp_iter_enable = 0;
                unregister_cpu_notifier(&chan->hp_iter_notifier);
        }
}

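/*
 * Free the priority heap used by the channel iterator (per-cpu channels only).
 */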
void channel_iterator_free(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                lttng_heap_free(&chan->iter.heap);
}

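/*
 * Open a single buffer for iterator reads. Only valid for channels created
 * with the RING_BUFFER_ITERATOR output type.
 */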
int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf)
{
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;

        CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);
        return lib_ring_buffer_open_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_open);

/*
 * Note: Iterators must not be mixed with other types of outputs, because an
 * iterator can leave the buffer in "GET" state, which is not consistent with
 * other types of output (mmap, splice, raw data read).
 */
void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf)
{
        lib_ring_buffer_release_read(buf);
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_iterator_release);

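/*
 * Open all buffers of a channel for iterator reads. For per-cpu channels, the
 * CPU hotplug callback is informed that a reader is attached, so buffers of
 * CPUs coming online later are opened as well.
 */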
int channel_iterator_open(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;
        int ret = 0, cpu;

        CHAN_WARN_ON(chan, config->output != RING_BUFFER_ITERATOR);

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                get_online_cpus();
                /* Allow CPU hotplug to keep track of opened reader */
                chan->iter.read_open = 1;
                for_each_channel_cpu(cpu, chan) {
                        buf = channel_get_ring_buffer(config, chan, cpu);
                        ret = lib_ring_buffer_iterator_open(buf);
                        if (ret)
                                goto error;
                        buf->iter.read_open = 1;
                }
                put_online_cpus();
        } else {
                buf = channel_get_ring_buffer(config, chan, 0);
                ret = lib_ring_buffer_iterator_open(buf);
        }
        return ret;
error:
        /* Error should always happen on CPU 0, hence no close is required. */
        CHAN_WARN_ON(chan, cpu != 0);
        put_online_cpus();
        return ret;
}
EXPORT_SYMBOL_GPL(channel_iterator_open);

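/*
 * Release the read side of every buffer previously opened by
 * channel_iterator_open().
 */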
void channel_iterator_release(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;
        int cpu;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                get_online_cpus();
                for_each_channel_cpu(cpu, chan) {
                        buf = channel_get_ring_buffer(config, chan, cpu);
                        if (buf->iter.read_open) {
                                lib_ring_buffer_iterator_release(buf);
                                buf->iter.read_open = 0;
                        }
                }
                chan->iter.read_open = 0;
                put_online_cpus();
        } else {
                buf = channel_get_ring_buffer(config, chan, 0);
                lib_ring_buffer_iterator_release(buf);
        }
}
EXPORT_SYMBOL_GPL(channel_iterator_release);

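/*
 * Reset a buffer iterator to its initial state, releasing any sub-buffer
 * currently held and moving the buffer back to the channel's empty list if it
 * was sitting in the heap.
 */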
void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf)
{
        struct channel *chan = buf->backend.chan;

        if (buf->iter.state != ITER_GET_SUBBUF)
                lib_ring_buffer_put_next_subbuf(buf);
        buf->iter.state = ITER_GET_SUBBUF;
        /* Remove from heap (if present). */
        if (lttng_heap_cherrypick(&chan->iter.heap, buf))
                list_add(&buf->iter.empty_node, &chan->iter.empty_head);
        buf->iter.timestamp = 0;
        buf->iter.header_len = 0;
        buf->iter.payload_len = 0;
        buf->iter.consumed = 0;
        buf->iter.read_offset = 0;
        buf->iter.data_size = 0;
        /* Don't reset allocated and read_open */
}

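/*
 * Reset the channel-level iterator: drain the heap back onto the empty list,
 * reset every per-cpu buffer iterator, and clear the merge bookkeeping.
 */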
void channel_iterator_reset(struct channel *chan)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        struct lib_ring_buffer *buf;
        int cpu;

        /* Empty heap, put into empty_head */
        while ((buf = lttng_heap_remove(&chan->iter.heap)) != NULL)
                list_add(&buf->iter.empty_node, &chan->iter.empty_head);

        for_each_channel_cpu(cpu, chan) {
                buf = channel_get_ring_buffer(config, chan, cpu);
                lib_ring_buffer_iterator_reset(buf);
        }
        /* Don't reset read_open */
        chan->iter.last_qs = 0;
        chan->iter.last_timestamp = 0;
        chan->iter.last_cpu = 0;
        chan->iter.len_left = 0;
}

/*
 * Ring buffer payload extraction read() implementation.
 */
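/*
 * When @fusionmerge is non-zero, records are taken from the channel-wide
 * fusion-merged stream (channel_get_next_record()), which picks the buffer
 * holding the record with the lowest timestamp; otherwise records are read
 * from the single buffer passed in by the caller.
 */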
static
ssize_t channel_ring_buffer_file_read(struct file *filp,
                                      char __user *user_buf,
                                      size_t count,
                                      loff_t *ppos,
                                      struct channel *chan,
                                      struct lib_ring_buffer *buf,
                                      int fusionmerge)
{
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        size_t read_count = 0, read_offset;
        ssize_t len;

        might_sleep();
        if (!access_ok(VERIFY_WRITE, user_buf, count))
                return -EFAULT;

        /* Finish copy of previous record */
        if (*ppos != 0) {
                if (read_count < count) {
                        len = chan->iter.len_left;
                        read_offset = *ppos;
                        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU
                            && fusionmerge)
                                buf = lttng_heap_maximum(&chan->iter.heap);
                        CHAN_WARN_ON(chan, !buf);
                        goto skip_get_next;
                }
        }

        while (read_count < count) {
                size_t copy_len, space_left;

                if (fusionmerge)
                        len = channel_get_next_record(chan, &buf);
                else
                        len = lib_ring_buffer_get_next_record(chan, buf);
len_test:
                if (len < 0) {
                        /*
                         * Check if buffer is finalized (end of file).
                         */
                        if (len == -ENODATA) {
                                /* A 0 read_count will tell about end of file */
                                goto nodata;
                        }
                        if (filp->f_flags & O_NONBLOCK) {
                                if (!read_count)
                                        read_count = -EAGAIN;
                                goto nodata;
                        } else {
                                int error;

                                /*
                                 * No data available at the moment, return what
                                 * we got.
                                 */
                                if (read_count)
                                        goto nodata;

                                /*
                                 * Wait for returned len to be >= 0 or -ENODATA.
                                 */
                                if (fusionmerge)
                                        error = wait_event_interruptible(
                                          chan->read_wait,
                                          ((len = channel_get_next_record(chan,
                                                &buf)), len != -EAGAIN));
                                else
                                        error = wait_event_interruptible(
                                          buf->read_wait,
                                          ((len = lib_ring_buffer_get_next_record(
                                                  chan, buf)), len != -EAGAIN));
                                CHAN_WARN_ON(chan, len == -EBUSY);
                                if (error) {
                                        read_count = error;
                                        goto nodata;
                                }
                                CHAN_WARN_ON(chan, len < 0 && len != -ENODATA);
                                goto len_test;
                        }
                }
                read_offset = buf->iter.read_offset;
skip_get_next:
                space_left = count - read_count;
                if (len <= space_left) {
                        copy_len = len;
                        chan->iter.len_left = 0;
                        *ppos = 0;
                } else {
                        copy_len = space_left;
                        chan->iter.len_left = len - copy_len;
                        *ppos = read_offset + copy_len;
                }
                if (__lib_ring_buffer_copy_to_user(&buf->backend, read_offset,
                                               &user_buf[read_count],
                                               copy_len)) {
                        /*
                         * Leave the len_left and ppos values at their current
                         * state, as we currently have a valid event to read.
                         */
                        return -EFAULT;
                }
                read_count += copy_len;
        }
        return read_count;

nodata:
        *ppos = 0;
        chan->iter.len_left = 0;
        return read_count;
}

/**
 * lib_ring_buffer_file_read - Read buffer record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t lib_ring_buffer_file_read(struct file *filp,
                                  char __user *user_buf,
                                  size_t count,
                                  loff_t *ppos)
{
        struct inode *inode = filp->f_dentry->d_inode;
        struct lib_ring_buffer *buf = inode->i_private;
        struct channel *chan = buf->backend.chan;

        return channel_ring_buffer_file_read(filp, user_buf, count, ppos,
                                             chan, buf, 0);
}

/**
 * channel_file_read - Read channel record payload.
 * @filp: file structure pointer.
 * @user_buf: user buffer to read data into.
 * @count: number of bytes to read.
 * @ppos: file read position.
 *
 * Returns a negative value on error, or the number of bytes read on success.
 * ppos is used to save the position _within the current record_ between calls
 * to read().
 */
static
ssize_t channel_file_read(struct file *filp,
                          char __user *user_buf,
                          size_t count,
                          loff_t *ppos)
{
        struct inode *inode = filp->f_dentry->d_inode;
        struct channel *chan = inode->i_private;
        const struct lib_ring_buffer_config *config = &chan->backend.config;

        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                return channel_ring_buffer_file_read(filp, user_buf, count,
                                                     ppos, chan, NULL, 1);
        else {
                struct lib_ring_buffer *buf =
                        channel_get_ring_buffer(config, chan, 0);
                return channel_ring_buffer_file_read(filp, user_buf, count,
                                                     ppos, chan, buf, 0);
        }
}

static
int lib_ring_buffer_file_open(struct inode *inode, struct file *file)
{
        struct lib_ring_buffer *buf = inode->i_private;
        int ret;

        ret = lib_ring_buffer_iterator_open(buf);
        if (ret)
                return ret;

        file->private_data = buf;
        ret = nonseekable_open(inode, file);
        if (ret)
                goto release_iter;
        return 0;

release_iter:
        lib_ring_buffer_iterator_release(buf);
        return ret;
}

static
int lib_ring_buffer_file_release(struct inode *inode, struct file *file)
{
        struct lib_ring_buffer *buf = inode->i_private;

        lib_ring_buffer_iterator_release(buf);
        return 0;
}

static
int channel_file_open(struct inode *inode, struct file *file)
{
        struct channel *chan = inode->i_private;
        int ret;

        ret = channel_iterator_open(chan);
        if (ret)
                return ret;

        file->private_data = chan;
        ret = nonseekable_open(inode, file);
        if (ret)
                goto release_iter;
        return 0;

release_iter:
        channel_iterator_release(chan);
        return ret;
}

static
int channel_file_release(struct inode *inode, struct file *file)
{
        struct channel *chan = inode->i_private;

        channel_iterator_release(chan);
        return 0;
}

const struct file_operations channel_payload_file_operations = {
        .owner = THIS_MODULE,
        .open = channel_file_open,
        .release = channel_file_release,
        .read = channel_file_read,
        .llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(channel_payload_file_operations);

const struct file_operations lib_ring_buffer_payload_file_operations = {
        .owner = THIS_MODULE,
        .open = lib_ring_buffer_file_open,
        .release = lib_ring_buffer_file_release,
        .read = lib_ring_buffer_file_read,
        .llseek = vfs_lib_ring_buffer_no_llseek,
};
EXPORT_SYMBOL_GPL(lib_ring_buffer_payload_file_operations);