matroska-demux: fix accumulated base offset in segment seeks
[platform/upstream/gstreamer.git] / subprojects / gst-rtsp-server / examples / test-onvif-server.c
1 /* GStreamer
2  * Copyright (C) 2019 Mathieu Duponchelle <mathieu@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20
21 #include <gst/gst.h>
22
23 #include <gst/rtsp-server/rtsp-server.h>
24
25 #include "test-onvif-server.h"
26
27 GST_DEBUG_CATEGORY_STATIC (onvif_server_debug);
28 #define GST_CAT_DEFAULT (onvif_server_debug)
29
30 #define MAKE_AND_ADD(var, pipe, name, label, elem_name) \
31 G_STMT_START { \
32   if (G_UNLIKELY (!(var = (gst_element_factory_make (name, elem_name))))) { \
33     GST_ERROR ("Could not create element %s", name); \
34     goto label; \
35   } \
36   if (G_UNLIKELY (!gst_bin_add (GST_BIN_CAST (pipe), var))) { \
37     GST_ERROR ("Could not add element %s", name); \
38     goto label; \
39   } \
40 } G_STMT_END
41
42 /* This simulates an archive of recordings running from 01-01-1900 to 01-01-2000.
43  *
44  * This is implemented by repeating the file provided at the command line, with
45  * an empty interval of 5 seconds in-between. We intercept relevant events to
46  * translate them, and update the timestamps on the output buffers.
47  */
48
49 #define INTERVAL (5 * GST_SECOND)
50
51 /* January the first, 2000 */
52 #define END_DATE 3155673600 * GST_SECOND
53
54 static gchar *filename;
55
56 struct _ReplayBin
57 {
58   GstBin parent;
59
60   GstEvent *incoming_seek;
61   GstEvent *outgoing_seek;
62   GstClockTime trickmode_interval;
63
64   GstSegment segment;
65   const GstSegment *incoming_segment;
66   gboolean sent_segment;
67   GstClockTime ts_offset;
68   gint64 remainder;
69   GstClockTime min_pts;
70 };
71
72 G_DEFINE_TYPE (ReplayBin, replay_bin, GST_TYPE_BIN);
73
74 static void
75 replay_bin_init (ReplayBin * self)
76 {
77   self->incoming_seek = NULL;
78   self->outgoing_seek = NULL;
79   self->trickmode_interval = 0;
80   self->ts_offset = 0;
81   self->sent_segment = FALSE;
82   self->min_pts = GST_CLOCK_TIME_NONE;
83 }
84
85 static void
86 replay_bin_class_init (ReplayBinClass * klass)
87 {
88 }
89
90 static GstElement *
91 replay_bin_new (void)
92 {
93   return GST_ELEMENT (g_object_new (replay_bin_get_type (), NULL));
94 }
95
96 static void
97 demux_pad_added_cb (GstElement * demux, GstPad * pad, GstGhostPad * ghost)
98 {
99   GstCaps *caps = gst_pad_get_current_caps (pad);
100   GstStructure *s = gst_caps_get_structure (caps, 0);
101
102   if (gst_structure_has_name (s, "video/x-h264")) {
103     gst_ghost_pad_set_target (ghost, pad);
104   }
105
106   gst_caps_unref (caps);
107 }
108
109 static void
110 query_seekable (GstPad * ghost, gint64 * start, gint64 * stop)
111 {
112   GstPad *target;
113   GstQuery *query;
114   GstFormat format;
115   gboolean seekable;
116
117   target = gst_ghost_pad_get_target (GST_GHOST_PAD (ghost));
118
119   query = gst_query_new_seeking (GST_FORMAT_TIME);
120
121   gst_pad_query (target, query);
122
123   gst_query_parse_seeking (query, &format, &seekable, start, stop);
124
125   g_assert (seekable);
126
127   gst_object_unref (target);
128 }
129
130 static GstEvent *
131 translate_seek (ReplayBin * self, GstPad * pad, GstEvent * ievent)
132 {
133   GstEvent *oevent = NULL;
134   gdouble rate;
135   GstFormat format;
136   GstSeekFlags flags;
137   GstSeekType start_type, stop_type;
138   gint64 start, stop;
139   gint64 istart, istop;         /* Incoming */
140   gint64 ustart, ustop;         /* Upstream */
141   gint64 ostart, ostop;         /* Outgoing */
142   guint32 seqnum = gst_event_get_seqnum (ievent);
143
144   gst_event_parse_seek (ievent, &rate, &format, &flags, &start_type, &start,
145       &stop_type, &stop);
146
147   if (!GST_CLOCK_TIME_IS_VALID (stop))
148     stop = END_DATE;
149
150   gst_event_parse_seek_trickmode_interval (ievent, &self->trickmode_interval);
151
152   istart = start;
153   istop = stop;
154
155   query_seekable (pad, &ustart, &ustop);
156
157   if (rate > 0) {
158     /* First, from where we should seek the file */
159     ostart = istart % (ustop + INTERVAL);
160
161     /* This may end up in our empty interval */
162     if (ostart > ustop) {
163       istart += ostart - ustop;
164       ostart = 0;
165     }
166
167     /* Then, up to where we should seek it */
168     ostop = MIN (ustop, ostart + (istop - istart));
169   } else {
170     /* First up to where we should seek the file */
171     ostop = istop % (ustop + INTERVAL);
172
173     /* This may end up in our empty interval */
174     if (ostop > ustop) {
175       istop -= ostop - ustop;
176       ostop = ustop;
177     }
178
179     ostart = MAX (0, ostop - (istop - istart));
180   }
181
182   /* We may be left with nothing to actually play, in this
183    * case we won't seek upstream, and emit the expected events
184    * ourselves */
185   if (istart > istop) {
186     GstSegment segment;
187     GstEvent *event;
188     gboolean update;
189
190     event = gst_event_new_flush_start ();
191     gst_event_set_seqnum (event, seqnum);
192     gst_pad_push_event (pad, event);
193
194     event = gst_event_new_flush_stop (TRUE);
195     gst_event_set_seqnum (event, seqnum);
196     gst_pad_push_event (pad, event);
197
198     gst_segment_init (&segment, format);
199     gst_segment_do_seek (&segment, rate, format, flags, start_type, start,
200         stop_type, stop, &update);
201
202     event = gst_event_new_segment (&segment);
203     gst_event_set_seqnum (event, seqnum);
204     gst_pad_push_event (pad, event);
205
206     event = gst_event_new_eos ();
207     gst_event_set_seqnum (event, seqnum);
208     gst_pad_push_event (pad, event);
209
210     goto done;
211   }
212
213   /* Lastly, how much will remain to play back (this remainder includes the interval) */
214   if (stop - start > ostop - ostart)
215     self->remainder = (stop - start) - (ostop - ostart);
216
217   flags |= GST_SEEK_FLAG_SEGMENT;
218
219   oevent =
220       gst_event_new_seek (rate, format, flags, start_type, ostart, stop_type,
221       ostop);
222   gst_event_set_seek_trickmode_interval (oevent, self->trickmode_interval);
223   gst_event_set_seqnum (oevent, seqnum);
224
225   GST_DEBUG ("Translated event to %" GST_PTR_FORMAT
226       " (remainder: %" G_GINT64_FORMAT ")", oevent, self->remainder);
227
228 done:
229   return oevent;
230 }
231
232 static gboolean
233 replay_bin_event_func (GstPad * pad, GstObject * parent, GstEvent * event)
234 {
235   ReplayBin *self = REPLAY_BIN (parent);
236   gboolean ret = TRUE;
237   gboolean forward = TRUE;
238
239   switch (GST_EVENT_TYPE (event)) {
240     case GST_EVENT_SEEK:
241     {
242       GST_DEBUG ("Processing seek event %" GST_PTR_FORMAT, event);
243
244       self->incoming_seek = event;
245
246       gst_event_replace (&self->outgoing_seek, NULL);
247       self->sent_segment = FALSE;
248
249       event = translate_seek (self, pad, event);
250
251       if (!event)
252         forward = FALSE;
253       else
254         self->outgoing_seek = gst_event_ref (event);
255       break;
256     }
257     default:
258       break;
259   }
260
261   if (forward)
262     return gst_pad_event_default (pad, parent, event);
263   else
264     return ret;
265 }
266
267 static gboolean
268 replay_bin_query_func (GstPad * pad, GstObject * parent, GstQuery * query)
269 {
270   ReplayBin *self = REPLAY_BIN (parent);
271   gboolean ret = TRUE;
272   gboolean forward = TRUE;
273
274   switch (GST_QUERY_TYPE (query)) {
275     case GST_QUERY_SEEKING:
276       /* We are seekable from the beginning till the end of time */
277       gst_query_set_seeking (query, GST_FORMAT_TIME, TRUE, 0,
278           GST_CLOCK_TIME_NONE);
279       forward = FALSE;
280       break;
281     case GST_QUERY_SEGMENT:
282       gst_query_set_segment (query, self->segment.rate, self->segment.format,
283           self->segment.start, self->segment.stop);
284       forward = FALSE;
285     default:
286       break;
287   }
288
289   GST_DEBUG ("Processed query %" GST_PTR_FORMAT, query);
290
291   if (forward)
292     return gst_pad_query_default (pad, parent, query);
293   else
294     return ret;
295 }
296
297 static GstEvent *
298 translate_segment (GstPad * pad, GstEvent * ievent)
299 {
300   ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
301   GstEvent *ret;
302   gdouble irate, orate;
303   GstFormat iformat, oformat;
304   GstSeekFlags iflags, oflags;
305   GstSeekType istart_type, ostart_type, istop_type, ostop_type;
306   gint64 istart, ostart, istop, ostop;
307   gboolean update;
308
309   gst_event_parse_segment (ievent, &self->incoming_segment);
310
311   if (!self->outgoing_seek) {
312     GstSegment segment;
313     gboolean update;
314
315     gst_segment_init (&segment, GST_FORMAT_TIME);
316
317     gst_segment_do_seek (&segment, 1.0, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET,
318         0, GST_SEEK_TYPE_SET, END_DATE, &update);
319
320     ret = gst_event_new_segment (&segment);
321     gst_event_unref (ievent);
322     goto done;
323   }
324
325   if (!self->sent_segment) {
326     gst_event_parse_seek (self->incoming_seek, &irate, &iformat, &iflags,
327         &istart_type, &istart, &istop_type, &istop);
328     gst_event_parse_seek (self->outgoing_seek, &orate, &oformat, &oflags,
329         &ostart_type, &ostart, &ostop_type, &ostop);
330
331     if (istop == -1)
332       istop = END_DATE;
333
334     if (self->incoming_segment->rate > 0)
335       self->ts_offset = istart - ostart;
336     else
337       self->ts_offset = istop - ostop;
338
339     istart += self->incoming_segment->start - ostart;
340     istop += self->incoming_segment->stop - ostop;
341
342     gst_segment_init (&self->segment, self->incoming_segment->format);
343
344     gst_segment_do_seek (&self->segment, self->incoming_segment->rate,
345         self->incoming_segment->format,
346         (GstSeekFlags) self->incoming_segment->flags, GST_SEEK_TYPE_SET,
347         (guint64) istart, GST_SEEK_TYPE_SET, (guint64) istop, &update);
348
349     self->min_pts = istart;
350
351     ret = gst_event_new_segment (&self->segment);
352
353     self->sent_segment = TRUE;
354
355     gst_event_unref (ievent);
356
357     GST_DEBUG ("Translated segment: %" GST_PTR_FORMAT ", "
358         "ts_offset: %" G_GUINT64_FORMAT, ret, self->ts_offset);
359   } else {
360     ret = NULL;
361   }
362
363 done:
364   return ret;
365 }
366
367 static void
368 handle_segment_done (ReplayBin * self, GstPad * pad)
369 {
370   GstEvent *event;
371
372   if (self->remainder < INTERVAL) {
373     self->remainder = 0;
374     event = gst_event_new_eos ();
375     gst_event_set_seqnum (event, gst_event_get_seqnum (self->incoming_seek));
376     gst_pad_push_event (pad, event);
377   } else {
378     gint64 ustart, ustop;
379     gint64 ostart, ostop;
380     GstPad *target;
381     GstStructure *s;
382
383     /* Signify the end of a contiguous section of recording */
384     s = gst_structure_new ("GstOnvifTimestamp",
385         "ntp-offset", G_TYPE_UINT64, 0, "discont", G_TYPE_BOOLEAN, TRUE, NULL);
386
387     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM, s);
388
389     gst_pad_push_event (pad, event);
390
391     query_seekable (pad, &ustart, &ustop);
392
393     self->remainder -= INTERVAL;
394
395     if (self->incoming_segment->rate > 0) {
396       ostart = 0;
397       ostop = MIN (ustop, self->remainder);
398     } else {
399       ostart = MAX (ustop - self->remainder, 0);
400       ostop = ustop;
401     }
402
403     self->remainder = MAX (self->remainder - ostop - ostart, 0);
404
405     event =
406         gst_event_new_seek (self->segment.rate, self->segment.format,
407         self->segment.flags & ~GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, ostart,
408         GST_SEEK_TYPE_SET, ostop);
409     gst_event_set_seek_trickmode_interval (event, self->trickmode_interval);
410
411     if (self->incoming_segment->rate > 0)
412       self->ts_offset += INTERVAL + ustop;
413     else
414       self->ts_offset -= INTERVAL + ustop;
415
416     GST_DEBUG ("New offset: %" GST_TIME_FORMAT,
417         GST_TIME_ARGS (self->ts_offset));
418
419     GST_DEBUG ("Seeking to %" GST_PTR_FORMAT, event);
420     target = gst_ghost_pad_get_target (GST_GHOST_PAD (pad));
421     gst_pad_send_event (target, event);
422     gst_object_unref (target);
423   }
424 }
425
426 static GstPadProbeReturn
427 replay_bin_event_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
428 {
429   ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
430   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
431
432   GST_DEBUG ("Probed %" GST_PTR_FORMAT, info->data);
433
434   switch (GST_EVENT_TYPE (info->data)) {
435     case GST_EVENT_SEGMENT:
436     {
437       GstEvent *translated;
438
439       GST_DEBUG ("Probed segment %" GST_PTR_FORMAT, info->data);
440
441       translated = translate_segment (pad, GST_EVENT (info->data));
442       if (translated)
443         info->data = translated;
444       else
445         ret = GST_PAD_PROBE_HANDLED;
446
447       break;
448     }
449     case GST_EVENT_SEGMENT_DONE:
450     {
451       handle_segment_done (self, pad);
452       ret = GST_PAD_PROBE_HANDLED;
453       break;
454     }
455     default:
456       break;
457   }
458
459   return ret;
460 }
461
462 static GstPadProbeReturn
463 replay_bin_buffer_probe (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
464 {
465   ReplayBin *self = REPLAY_BIN (GST_OBJECT_PARENT (pad));
466   GstPadProbeReturn ret = GST_PAD_PROBE_OK;
467
468   if (GST_BUFFER_PTS (info->data) > self->incoming_segment->stop) {
469     ret = GST_PAD_PROBE_DROP;
470     goto done;
471   }
472
473   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_PTS (info->data)))
474     GST_BUFFER_PTS (info->data) += self->ts_offset;
475   if (GST_CLOCK_TIME_IS_VALID (GST_BUFFER_DTS (info->data)))
476     GST_BUFFER_DTS (info->data) += self->ts_offset;
477
478   GST_LOG ("Pushing buffer %" GST_PTR_FORMAT, info->data);
479
480 done:
481   return ret;
482 }
483
484 static GstElement *
485 create_replay_bin (GstElement * parent)
486 {
487   GstElement *ret, *src, *demux;
488   GstPad *ghost;
489
490   ret = replay_bin_new ();
491   if (!gst_bin_add (GST_BIN (parent), ret)) {
492     gst_object_unref (ret);
493     goto fail;
494   }
495
496   MAKE_AND_ADD (src, ret, "filesrc", fail, NULL);
497   MAKE_AND_ADD (demux, ret, "qtdemux", fail, NULL);
498
499   ghost = gst_ghost_pad_new_no_target ("src", GST_PAD_SRC);
500   gst_element_add_pad (ret, ghost);
501
502   gst_pad_set_event_function (ghost, replay_bin_event_func);
503   gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
504       replay_bin_event_probe, NULL, NULL);
505   gst_pad_add_probe (ghost, GST_PAD_PROBE_TYPE_BUFFER, replay_bin_buffer_probe,
506       NULL, NULL);
507   gst_pad_set_query_function (ghost, replay_bin_query_func);
508
509   if (!gst_element_link (src, demux))
510     goto fail;
511
512   g_object_set (src, "location", filename, NULL);
513   g_signal_connect (demux, "pad-added", G_CALLBACK (demux_pad_added_cb), ghost);
514
515 done:
516   return ret;
517
518 fail:
519   ret = NULL;
520   goto done;
521 }
522
523 /* A simple factory to set up our replay bin */
524
525 struct _OnvifFactory
526 {
527   GstRTSPOnvifMediaFactory parent;
528 };
529
530 G_DEFINE_TYPE (OnvifFactory, onvif_factory, GST_TYPE_RTSP_MEDIA_FACTORY);
531
532 static void
533 onvif_factory_init (OnvifFactory * factory)
534 {
535 }
536
537 static GstElement *
538 onvif_factory_create_element (GstRTSPMediaFactory * factory,
539     const GstRTSPUrl * url)
540 {
541   GstElement *replay_bin, *q1, *parse, *pay, *onvifts, *q2;
542   GstElement *ret = gst_bin_new (NULL);
543   GstElement *pbin = gst_bin_new ("pay0");
544   GstPad *sinkpad, *srcpad;
545
546   if (!(replay_bin = create_replay_bin (ret)))
547     goto fail;
548
549   MAKE_AND_ADD (q1, pbin, "queue", fail, NULL);
550   MAKE_AND_ADD (parse, pbin, "h264parse", fail, NULL);
551   MAKE_AND_ADD (pay, pbin, "rtph264pay", fail, NULL);
552   MAKE_AND_ADD (onvifts, pbin, "rtponviftimestamp", fail, NULL);
553   MAKE_AND_ADD (q2, pbin, "queue", fail, NULL);
554
555   gst_bin_add (GST_BIN (ret), pbin);
556
557   if (!gst_element_link_many (q1, parse, pay, onvifts, q2, NULL))
558     goto fail;
559
560   sinkpad = gst_element_get_static_pad (q1, "sink");
561   gst_element_add_pad (pbin, gst_ghost_pad_new ("sink", sinkpad));
562   gst_object_unref (sinkpad);
563
564   if (!gst_element_link (replay_bin, pbin))
565     goto fail;
566
567   srcpad = gst_element_get_static_pad (q2, "src");
568   gst_element_add_pad (pbin, gst_ghost_pad_new ("src", srcpad));
569   gst_object_unref (srcpad);
570
571   g_object_set (onvifts, "set-t-bit", TRUE, "set-e-bit", TRUE, "ntp-offset",
572       G_GUINT64_CONSTANT (0), "drop-out-of-segment", FALSE, NULL);
573
574   gst_element_set_clock (onvifts, gst_system_clock_obtain ());
575
576 done:
577   return ret;
578
579 fail:
580   gst_object_unref (ret);
581   ret = NULL;
582   goto done;
583 }
584
585 static void
586 onvif_factory_class_init (OnvifFactoryClass * klass)
587 {
588   GstRTSPMediaFactoryClass *mf_class = GST_RTSP_MEDIA_FACTORY_CLASS (klass);
589
590   mf_class->create_element = onvif_factory_create_element;
591 }
592
593 static GstRTSPMediaFactory *
594 onvif_factory_new (void)
595 {
596   GstRTSPMediaFactory *result;
597
598   result =
599       GST_RTSP_MEDIA_FACTORY (g_object_new (onvif_factory_get_type (), NULL));
600
601   return result;
602 }
603
604 int
605 main (int argc, char *argv[])
606 {
607   GMainLoop *loop;
608   GstRTSPServer *server;
609   GstRTSPMountPoints *mounts;
610   GstRTSPMediaFactory *factory;
611   GOptionContext *optctx;
612   GError *error = NULL;
613   gchar *service;
614
615   optctx = g_option_context_new ("<filename.mp4> - ONVIF RTSP Server, MP4");
616   g_option_context_add_group (optctx, gst_init_get_option_group ());
617   if (!g_option_context_parse (optctx, &argc, &argv, &error)) {
618     g_printerr ("Error parsing options: %s\n", error->message);
619     g_option_context_free (optctx);
620     g_clear_error (&error);
621     return -1;
622   }
623   if (argc < 2) {
624     g_print ("%s\n", g_option_context_get_help (optctx, TRUE, NULL));
625     return 1;
626   }
627   filename = argv[1];
628   g_option_context_free (optctx);
629
630   GST_DEBUG_CATEGORY_INIT (onvif_server_debug, "onvif-server", 0,
631       "ONVIF server");
632
633   loop = g_main_loop_new (NULL, FALSE);
634
635   server = gst_rtsp_onvif_server_new ();
636
637   mounts = gst_rtsp_server_get_mount_points (server);
638
639   factory = onvif_factory_new ();
640   gst_rtsp_media_factory_set_media_gtype (factory, GST_TYPE_RTSP_ONVIF_MEDIA);
641
642   gst_rtsp_mount_points_add_factory (mounts, "/test", factory);
643
644   g_object_unref (mounts);
645
646   gst_rtsp_server_attach (server, NULL);
647
648   service = gst_rtsp_server_get_service (server);
649   g_print ("stream ready at rtsp://127.0.0.1:%s/test\n", service);
650   g_free (service);
651   g_main_loop_run (loop);
652
653   return 0;
654 }