8e41600a8a485fe6f6c0696ec456a2df55bfc885
[platform/upstream/gstreamer.git] / tests / check / elements / multisocketsink.c
1 /* GStreamer
2  *
3  * Copyright (C) 2006 Thomas Vander Stichele <thomas at apestaart dot org>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
18  * Boston, MA 02111-1307, USA.
19  */
20
21 #include <unistd.h>
22 #include <sys/ioctl.h>
23 #include <sys/socket.h>
24 #ifdef HAVE_FIONREAD_IN_SYS_FILIO
25 #include <sys/filio.h>
26 #endif
27
28 #include <gio/gio.h>
29 #include <gst/check/gstcheck.h>
30
31 #include "gst/tcp/gstmultisocketsink.h"
32
33 static GstPad *mysrcpad;
34
35 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
36     GST_PAD_SRC,
37     GST_PAD_ALWAYS,
38     GST_STATIC_CAPS ("application/x-gst-check")
39     );
40
41 static GstElement *
42 setup_multisocketsink (void)
43 {
44   GstElement *multisocketsink;
45
46   GST_DEBUG ("setup_multisocketsink");
47   multisocketsink = gst_check_setup_element ("multisocketsink");
48   mysrcpad = gst_check_setup_src_pad (multisocketsink, &srctemplate);
49   GST_PAD_UNSET_FLUSHING (mysrcpad);
50
51   return multisocketsink;
52 }
53
54 static void
55 cleanup_multisocketsink (GstElement * multisocketsink)
56 {
57   GST_DEBUG ("cleanup_multisocketsink");
58
59   gst_check_teardown_src_pad (multisocketsink);
60   gst_check_teardown_element (multisocketsink);
61 }
62
63 static void
64 wait_bytes_served (GstElement * sink, guint64 bytes)
65 {
66   guint64 bytes_served = 0;
67
68   while (bytes_served != bytes) {
69     g_object_get (sink, "bytes-served", &bytes_served, NULL);
70   }
71 }
72
73 /* FIXME: possibly racy, since if it would write, we may not get it
74  * immediately ? */
75 #define fail_if_can_read(msg,fd) \
76 G_STMT_START { \
77   long avail; \
78 \
79   fail_if (ioctl (fd, FIONREAD, &avail) < 0, "%s: could not ioctl", msg); \
80   fail_if (avail > 0, "%s: has bytes available to read"); \
81 } G_STMT_END;
82
83
84 GST_START_TEST (test_no_clients)
85 {
86   GstElement *sink;
87   GstBuffer *buffer;
88   GstCaps *caps;
89
90   sink = setup_multisocketsink ();
91
92   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
93
94   caps = gst_caps_from_string ("application/x-gst-check");
95   buffer = gst_buffer_new_and_alloc (4);
96   gst_pad_set_caps (mysrcpad, caps);
97   gst_caps_unref (caps);
98   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
99
100   GST_DEBUG ("cleaning up multisocketsink");
101   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
102   cleanup_multisocketsink (sink);
103 }
104
105 GST_END_TEST;
106
107 static gboolean
108 setup_handles (GSocket ** sinkhandle, GSocket ** srchandle)
109 {
110   GError *error = NULL;
111   gint sv[3];
112
113
114 //  g_assert (*sinkhandle);
115 //  g_assert (*srchandle);
116
117   fail_if (socketpair (PF_UNIX, SOCK_STREAM, 0, sv));
118
119   *sinkhandle = g_socket_new_from_fd (sv[1], &error);
120   fail_if (error);
121   fail_if (*sinkhandle == NULL);
122   *srchandle = g_socket_new_from_fd (sv[0], &error);
123   fail_if (error);
124   fail_if (*srchandle == NULL);
125
126   return TRUE;
127 }
128
129 static ssize_t
130 read_handle (GSocket * srchandle, void *buf, size_t count)
131 {
132   gssize ret;
133
134   ret = g_socket_receive (srchandle, buf, count, NULL, NULL);
135
136   return ret;
137 }
138
139 #define fail_unless_read(msg,handle,size,ref) \
140 G_STMT_START { \
141   char data[size + 1]; \
142   int nbytes; \
143 \
144   GST_DEBUG ("%s: reading %d bytes", msg, size); \
145   nbytes = read_handle (handle, data, size); \
146   data[size] = 0; \
147   GST_DEBUG ("%s: read %d bytes", msg, nbytes); \
148   fail_if (nbytes < size); \
149   fail_unless (memcmp (data, ref, size) == 0, \
150       "data read '%s' differs from '%s'", data, ref); \
151 } G_STMT_END;
152
153 GST_START_TEST (test_add_client)
154 {
155   GstElement *sink;
156   GstBuffer *buffer;
157   GstCaps *caps;
158   gchar data[4];
159   GSocket *sinksocket, *srcsocket;
160
161   sink = setup_multisocketsink ();
162   fail_unless (setup_handles (&sinksocket, &srcsocket));
163
164
165   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
166
167   /* add the client */
168   g_signal_emit_by_name (sink, "add", sinksocket);
169
170   caps = gst_caps_from_string ("application/x-gst-check");
171   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
172   GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
173   buffer = gst_buffer_new_and_alloc (4);
174   gst_pad_set_caps (mysrcpad, caps);
175   ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
176   gst_buffer_fill (buffer, 0, "dead", 4);
177   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
178
179   GST_DEBUG ("reading");
180   fail_if (read_handle (srcsocket, data, 4) < 4);
181   fail_unless (strncmp (data, "dead", 4) == 0);
182   wait_bytes_served (sink, 4);
183
184   GST_DEBUG ("cleaning up multisocketsink");
185   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
186   cleanup_multisocketsink (sink);
187
188   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
189   gst_caps_unref (caps);
190 }
191
192 GST_END_TEST;
193
194 /* from the given two data buffers, create two streamheader buffers and
195  * some caps that match it, and store them in the given pointers
196  * returns  one ref to each of the buffers and the caps */
197 static void
198 gst_multisocketsink_create_streamheader (const gchar * data1,
199     const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2,
200     GstCaps ** caps)
201 {
202   GstBuffer *buf;
203   GValue array = { 0 };
204   GValue value = { 0 };
205   GstStructure *structure;
206   guint size1 = strlen (data1);
207   guint size2 = strlen (data2);
208
209   fail_if (hbuf1 == NULL);
210   fail_if (hbuf2 == NULL);
211   fail_if (caps == NULL);
212
213   /* create caps with streamheader, set the caps, and push the IN_CAPS
214    * buffers */
215   *hbuf1 = gst_buffer_new_and_alloc (size1);
216   GST_BUFFER_FLAG_SET (*hbuf1, GST_BUFFER_FLAG_IN_CAPS);
217   gst_buffer_fill (*hbuf1, 0, data1, size1);
218   *hbuf2 = gst_buffer_new_and_alloc (size2);
219   GST_BUFFER_FLAG_SET (*hbuf2, GST_BUFFER_FLAG_IN_CAPS);
220   gst_buffer_fill (*hbuf2, 0, data2, size2);
221
222   g_value_init (&array, GST_TYPE_ARRAY);
223
224   g_value_init (&value, GST_TYPE_BUFFER);
225   /* we take a copy, set it on the array (which refs it), then unref our copy */
226   buf = gst_buffer_copy (*hbuf1);
227   gst_value_set_buffer (&value, buf);
228   ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
229   gst_buffer_unref (buf);
230   gst_value_array_append_value (&array, &value);
231   g_value_unset (&value);
232
233   g_value_init (&value, GST_TYPE_BUFFER);
234   buf = gst_buffer_copy (*hbuf2);
235   gst_value_set_buffer (&value, buf);
236   ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
237   gst_buffer_unref (buf);
238   gst_value_array_append_value (&array, &value);
239   g_value_unset (&value);
240
241   *caps = gst_caps_from_string ("application/x-gst-check");
242   structure = gst_caps_get_structure (*caps, 0);
243
244   gst_structure_set_value (structure, "streamheader", &array);
245   g_value_unset (&array);
246   ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 1);
247
248   /* we want to keep them around for the tests */
249   gst_buffer_ref (*hbuf1);
250   gst_buffer_ref (*hbuf2);
251
252   GST_DEBUG ("created streamheader caps %p %" GST_PTR_FORMAT, *caps, *caps);
253 }
254
255
256 /* this test:
257  * - adds a first client
258  * - sets streamheader caps on the pad
259  * - pushes the IN_CAPS buffers
260  * - pushes a buffer
261  * - verifies that the client received all the data correctly, and did not
262  *   get multiple copies of the streamheader
263  * - adds a second client
264  * - verifies that this second client receives the streamheader caps too, plus
265  * - the new buffer
266  */
267 GST_START_TEST (test_streamheader)
268 {
269   GstElement *sink;
270   GstBuffer *hbuf1, *hbuf2, *buf;
271   GstCaps *caps;
272   GSocket *socket[4];
273
274   sink = setup_multisocketsink ();
275
276   fail_unless (setup_handles (&socket[0], &socket[1]));
277   fail_unless (setup_handles (&socket[2], &socket[3]));
278
279   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
280
281   /* add the first client */
282   g_signal_emit_by_name (sink, "add", socket[0]);
283
284   /* create caps with streamheader, set the caps, and push the IN_CAPS
285    * buffers */
286   gst_multisocketsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
287       &caps);
288   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
289   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
290   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
291   fail_unless (gst_pad_set_caps (mysrcpad, caps));
292   /* one is ours, two from set_caps */
293   ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
294
295   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
296   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
297   // FIXME: we can't assert on the refcount because giving away the ref
298   //        doesn't mean the refcount decreases
299   // ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
300   // ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
301
302   //FIXME:
303   //fail_if_can_read ("first client", socket[1]);
304
305   /* push a non-IN_CAPS buffer, this should trigger the client receiving the
306    * first three buffers */
307   buf = gst_buffer_new_and_alloc (4);
308   gst_buffer_fill (buf, 0, "f00d", 4);
309   gst_pad_push (mysrcpad, buf);
310
311   fail_unless_read ("first client", socket[1], 4, "babe");
312   fail_unless_read ("first client", socket[1], 8, "deadbeef");
313   fail_unless_read ("first client", socket[1], 4, "f00d");
314   wait_bytes_served (sink, 16);
315
316   /* now add the second client */
317   g_signal_emit_by_name (sink, "add", socket[2]);
318   //FIXME:
319   //fail_if_can_read ("second client", socket[3]);
320
321   /* now push another buffer, which will trigger streamheader for second
322    * client */
323   buf = gst_buffer_new_and_alloc (4);
324   gst_buffer_fill (buf, 0, "deaf", 4);
325   gst_pad_push (mysrcpad, buf);
326
327   fail_unless_read ("first client", socket[1], 4, "deaf");
328
329   fail_unless_read ("second client", socket[3], 4, "babe");
330   fail_unless_read ("second client", socket[3], 8, "deadbeef");
331   /* we missed the f00d buffer */
332   fail_unless_read ("second client", socket[3], 4, "deaf");
333   wait_bytes_served (sink, 36);
334
335   GST_DEBUG ("cleaning up multisocketsink");
336
337   g_signal_emit_by_name (sink, "remove", socket[0]);
338   g_signal_emit_by_name (sink, "remove", socket[2]);
339
340   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
341   cleanup_multisocketsink (sink);
342
343   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
344   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
345   gst_buffer_unref (hbuf1);
346   gst_buffer_unref (hbuf2);
347
348   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
349   gst_caps_unref (caps);
350 }
351
352 GST_END_TEST;
353
354 /* this tests changing of streamheaders
355  * - set streamheader caps on the pad
356  * - pushes the IN_CAPS buffers
357  * - pushes a buffer
358  * - add a first client
359  * - verifies that this first client receives the first streamheader caps,
360  *   plus a new buffer
361  * - change streamheader caps
362  * - verify that the first client receives the new streamheader buffers as well
363  */
364 GST_START_TEST (test_change_streamheader)
365 {
366   GstElement *sink;
367   GstBuffer *hbuf1, *hbuf2, *buf;
368   GstCaps *caps;
369   GSocket *socket[4];
370
371   sink = setup_multisocketsink ();
372
373   fail_unless (setup_handles (&socket[0], &socket[1]));
374   fail_unless (setup_handles (&socket[2], &socket[3]));
375
376   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
377
378   /* create caps with streamheader, set the caps, and push the IN_CAPS
379    * buffers */
380   gst_multisocketsink_create_streamheader ("first", "header", &hbuf1, &hbuf2,
381       &caps);
382   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
383   fail_unless (gst_pad_set_caps (mysrcpad, caps));
384   /* one is ours, two from set_caps */
385   ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
386
387   /* one to hold for the test and one to give away */
388   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
389   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
390
391   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
392   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
393
394   /* add the first client */
395   g_signal_emit_by_name (sink, "add", socket[0]);
396
397   /* verify this hasn't triggered a write yet */
398   /* FIXME: possibly racy, since if it would write, we may not get it
399    * immediately ? */
400   //fail_if_can_read ("first client, no buffer", socket[1]);
401
402   /* now push a buffer and read */
403   buf = gst_buffer_new_and_alloc (4);
404   gst_buffer_fill (buf, 0, "f00d", 4);
405   gst_pad_push (mysrcpad, buf);
406
407   fail_unless_read ("change: first client", socket[1], 5, "first");
408   fail_unless_read ("change: first client", socket[1], 6, "header");
409   fail_unless_read ("change: first client", socket[1], 4, "f00d");
410   //wait_bytes_served (sink, 16);
411
412   /* now add the second client */
413   g_signal_emit_by_name (sink, "add", socket[2]);
414   //fail_if_can_read ("second client, no buffer", socket[3]);
415
416   /* change the streamheader */
417
418   /* before we change, multisocketsink still has a list of the old streamheaders */
419   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
420   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
421   gst_buffer_unref (hbuf1);
422   gst_buffer_unref (hbuf2);
423
424   /* drop our ref to the previous caps */
425   gst_caps_unref (caps);
426
427   gst_multisocketsink_create_streamheader ("second", "header", &hbuf1, &hbuf2,
428       &caps);
429   fail_unless (gst_pad_set_caps (mysrcpad, caps));
430   /* one to hold for the test and one to give away */
431   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
432   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
433
434   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
435   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
436
437   /* verify neither client has new data available to read */
438   //fail_if_can_read ("first client, changed streamheader", socket[1]);
439   //fail_if_can_read ("second client, changed streamheader", socket[3]);
440
441   /* now push another buffer, which will trigger streamheader for second
442    * client, but should also send new streamheaders to first client */
443   buf = gst_buffer_new_and_alloc (8);
444   gst_buffer_fill (buf, 0, "deadbabe", 8);
445   gst_pad_push (mysrcpad, buf);
446
447   fail_unless_read ("first client", socket[1], 6, "second");
448   fail_unless_read ("first client", socket[1], 6, "header");
449   fail_unless_read ("first client", socket[1], 8, "deadbabe");
450
451   /* new streamheader data */
452   fail_unless_read ("second client", socket[3], 6, "second");
453   fail_unless_read ("second client", socket[3], 6, "header");
454   /* we missed the f00d buffer */
455   fail_unless_read ("second client", socket[3], 8, "deadbabe");
456   //wait_bytes_served (sink, 36);
457
458   GST_DEBUG ("cleaning up multisocketsink");
459   g_signal_emit_by_name (sink, "remove", socket[0]);
460   g_signal_emit_by_name (sink, "remove", socket[2]);
461   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
462
463   /* setting to NULL should have cleared the streamheader */
464   ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
465   ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
466   gst_buffer_unref (hbuf1);
467   gst_buffer_unref (hbuf2);
468   cleanup_multisocketsink (sink);
469
470   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
471   gst_caps_unref (caps);
472 }
473
474 GST_END_TEST;
475
476 static GstBuffer *
477 gst_new_buffer (int i)
478 {
479   GstMapInfo info;
480   gchar *data;
481
482   GstBuffer *buffer = gst_buffer_new_and_alloc (16);
483
484   /* copy some id */
485   g_assert (gst_buffer_map (buffer, &info, GST_MAP_WRITE));
486   data = (gchar *) info.data;
487   g_snprintf (data, 16, "deadbee%08x", i);
488   gst_buffer_unmap (buffer, &info);
489
490   return buffer;
491 }
492
493
494 /* keep 100 bytes and burst 80 bytes to clients */
495 GST_START_TEST (test_burst_client_bytes)
496 {
497   GstElement *sink;
498   GstCaps *caps;
499   GSocket *socket[6];
500   gint i;
501   guint buffers_queued;
502
503   sink = setup_multisocketsink ();
504   /* make sure we keep at least 100 bytes at all times */
505   g_object_set (sink, "bytes-min", 100, NULL);
506   g_object_set (sink, "sync-method", 3, NULL);  /* 3 = burst */
507   g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
508   g_object_set (sink, "burst-value", (guint64) 80, NULL);
509
510   fail_unless (setup_handles (&socket[0], &socket[1]));
511   fail_unless (setup_handles (&socket[2], &socket[3]));
512   fail_unless (setup_handles (&socket[4], &socket[5]));
513
514   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
515
516   caps = gst_caps_from_string ("application/x-gst-check");
517   gst_pad_set_caps (mysrcpad, caps);
518   GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
519
520   /* push buffers in, 9 * 16 bytes = 144 bytes */
521   for (i = 0; i < 9; i++) {
522     GstBuffer *buffer = gst_new_buffer (i);
523
524     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
525   }
526
527   /* check that at least 7 buffers (112 bytes) are in the queue */
528   g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
529   fail_if (buffers_queued != 7);
530
531   /* now add the clients */
532   g_signal_emit_by_name (sink, "add", socket[0]);
533   g_signal_emit_by_name (sink, "add_full", socket[2], GST_SYNC_METHOD_BURST,
534       GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 200);
535   g_signal_emit_by_name (sink, "add_full", socket[4], GST_SYNC_METHOD_BURST,
536       GST_FORMAT_BYTES, (guint64) 50, GST_FORMAT_BYTES, (guint64) 50);
537
538   /* push last buffer to make client fds ready for reading */
539   for (i = 9; i < 10; i++) {
540     GstBuffer *buffer = gst_new_buffer (i);
541
542     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
543   }
544
545   /* now we should only read the last 5 buffers (5 * 16 = 80 bytes) */
546   GST_DEBUG ("Reading from client 1");
547   fail_unless_read ("client 1", socket[1], 16, "deadbee00000005");
548   fail_unless_read ("client 1", socket[1], 16, "deadbee00000006");
549   fail_unless_read ("client 1", socket[1], 16, "deadbee00000007");
550   fail_unless_read ("client 1", socket[1], 16, "deadbee00000008");
551   fail_unless_read ("client 1", socket[1], 16, "deadbee00000009");
552
553   /* second client only bursts 50 bytes = 4 buffers (we get 4 buffers since
554    * the max allows it) */
555   GST_DEBUG ("Reading from client 2");
556   fail_unless_read ("client 2", socket[3], 16, "deadbee00000006");
557   fail_unless_read ("client 2", socket[3], 16, "deadbee00000007");
558   fail_unless_read ("client 2", socket[3], 16, "deadbee00000008");
559   fail_unless_read ("client 2", socket[3], 16, "deadbee00000009");
560
561   /* third client only bursts 50 bytes = 4 buffers, we can't send
562    * more than 50 bytes so we only get 3 buffers (48 bytes). */
563   GST_DEBUG ("Reading from client 3");
564   fail_unless_read ("client 3", socket[5], 16, "deadbee00000007");
565   fail_unless_read ("client 3", socket[5], 16, "deadbee00000008");
566   fail_unless_read ("client 3", socket[5], 16, "deadbee00000009");
567
568   GST_DEBUG ("cleaning up multisocketsink");
569   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
570   cleanup_multisocketsink (sink);
571
572   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
573   gst_caps_unref (caps);
574 }
575
576 GST_END_TEST;
577
578 /* keep 100 bytes and burst 80 bytes to clients */
579 GST_START_TEST (test_burst_client_bytes_keyframe)
580 {
581   GstElement *sink;
582   GstCaps *caps;
583   GSocket *socket[6];
584   gint i;
585   guint buffers_queued;
586
587   sink = setup_multisocketsink ();
588   /* make sure we keep at least 100 bytes at all times */
589   g_object_set (sink, "bytes-min", 100, NULL);
590   g_object_set (sink, "sync-method", 4, NULL);  /* 4 = burst_keyframe */
591   g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
592   g_object_set (sink, "burst-value", (guint64) 80, NULL);
593
594   fail_unless (setup_handles (&socket[0], &socket[1]));
595   fail_unless (setup_handles (&socket[2], &socket[3]));
596   fail_unless (setup_handles (&socket[4], &socket[5]));
597
598   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
599
600   caps = gst_caps_from_string ("application/x-gst-check");
601   GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
602   gst_pad_set_caps (mysrcpad, caps);
603
604   /* push buffers in, 9 * 16 bytes = 144 bytes */
605   for (i = 0; i < 9; i++) {
606     GstBuffer *buffer = gst_new_buffer (i);
607
608     /* mark most buffers as delta */
609     if (i != 0 && i != 4 && i != 8)
610       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
611
612     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
613   }
614
615   /* check that at least 7 buffers (112 bytes) are in the queue */
616   g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
617   fail_if (buffers_queued != 7);
618
619   /* now add the clients */
620   g_signal_emit_by_name (sink, "add", socket[0]);
621   g_signal_emit_by_name (sink, "add_full", socket[2],
622       GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
623       GST_FORMAT_BYTES, (guint64) 90);
624   g_signal_emit_by_name (sink, "add_full", socket[4],
625       GST_SYNC_METHOD_BURST_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
626       GST_FORMAT_BYTES, (guint64) 50);
627
628   /* push last buffer to make client fds ready for reading */
629   for (i = 9; i < 10; i++) {
630     GstBuffer *buffer = gst_new_buffer (i);
631
632     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
633
634     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
635   }
636
637   /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
638    * keyframe at buffer 4 */
639   GST_DEBUG ("Reading from client 1");
640   fail_unless_read ("client 1", socket[1], 16, "deadbee00000004");
641   fail_unless_read ("client 1", socket[1], 16, "deadbee00000005");
642   fail_unless_read ("client 1", socket[1], 16, "deadbee00000006");
643   fail_unless_read ("client 1", socket[1], 16, "deadbee00000007");
644   fail_unless_read ("client 1", socket[1], 16, "deadbee00000008");
645   fail_unless_read ("client 1", socket[1], 16, "deadbee00000009");
646
647   /* second client only bursts 50 bytes = 4 buffers, there is
648    * no keyframe above min and below max, so get one below min */
649   GST_DEBUG ("Reading from client 2");
650   fail_unless_read ("client 2", socket[3], 16, "deadbee00000008");
651   fail_unless_read ("client 2", socket[3], 16, "deadbee00000009");
652
653   /* third client only bursts 50 bytes = 4 buffers, we can't send
654    * more than 50 bytes so we only get 2 buffers (32 bytes). */
655   GST_DEBUG ("Reading from client 3");
656   fail_unless_read ("client 3", socket[5], 16, "deadbee00000008");
657   fail_unless_read ("client 3", socket[5], 16, "deadbee00000009");
658
659   GST_DEBUG ("cleaning up multisocketsink");
660   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
661   cleanup_multisocketsink (sink);
662
663   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
664   gst_caps_unref (caps);
665 }
666
667 GST_END_TEST;
668
669
670
671 /* keep 100 bytes and burst 80 bytes to clients */
672 GST_START_TEST (test_burst_client_bytes_with_keyframe)
673 {
674   GstElement *sink;
675   GstCaps *caps;
676   GSocket *socket[6];
677   gint i;
678   guint buffers_queued;
679
680   sink = setup_multisocketsink ();
681
682   /* make sure we keep at least 100 bytes at all times */
683   g_object_set (sink, "bytes-min", 100, NULL);
684   g_object_set (sink, "sync-method", 5, NULL);  /* 5 = burst_with_keyframe */
685   g_object_set (sink, "burst-format", GST_FORMAT_BYTES, NULL);
686   g_object_set (sink, "burst-value", (guint64) 80, NULL);
687
688   fail_unless (setup_handles (&socket[0], &socket[1]));
689   fail_unless (setup_handles (&socket[2], &socket[3]));
690   fail_unless (setup_handles (&socket[4], &socket[5]));
691
692   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
693
694   caps = gst_caps_from_string ("application/x-gst-check");
695   gst_pad_set_caps (mysrcpad, caps);
696   GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
697
698   /* push buffers in, 9 * 16 bytes = 144 bytes */
699   for (i = 0; i < 9; i++) {
700     GstBuffer *buffer = gst_new_buffer (i);
701
702     /* mark most buffers as delta */
703     if (i != 0 && i != 4 && i != 8)
704       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
705
706     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
707   }
708
709   /* check that at least 7 buffers (112 bytes) are in the queue */
710   g_object_get (sink, "buffers-queued", &buffers_queued, NULL);
711   fail_if (buffers_queued != 7);
712
713   /* now add the clients */
714   g_signal_emit_by_name (sink, "add", socket[0]);
715   g_signal_emit_by_name (sink, "add_full", socket[2],
716       GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
717       GST_FORMAT_BYTES, (guint64) 90);
718   g_signal_emit_by_name (sink, "add_full", socket[4],
719       GST_SYNC_METHOD_BURST_WITH_KEYFRAME, GST_FORMAT_BYTES, (guint64) 50,
720       GST_FORMAT_BYTES, (guint64) 50);
721
722   /* push last buffer to make client fds ready for reading */
723   for (i = 9; i < 10; i++) {
724     GstBuffer *buffer = gst_new_buffer (i);
725
726     GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
727
728     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
729   }
730
731   /* now we should only read the last 6 buffers (min 5 * 16 = 80 bytes),
732    * keyframe at buffer 4 */
733   GST_DEBUG ("Reading from client 1");
734   fail_unless_read ("client 1", socket[1], 16, "deadbee00000004");
735   fail_unless_read ("client 1", socket[1], 16, "deadbee00000005");
736   fail_unless_read ("client 1", socket[1], 16, "deadbee00000006");
737   fail_unless_read ("client 1", socket[1], 16, "deadbee00000007");
738   fail_unless_read ("client 1", socket[1], 16, "deadbee00000008");
739   fail_unless_read ("client 1", socket[1], 16, "deadbee00000009");
740
741   /* second client only bursts 50 bytes = 4 buffers, there is
742    * no keyframe above min and below max, so send min */
743   GST_DEBUG ("Reading from client 2");
744   fail_unless_read ("client 2", socket[3], 16, "deadbee00000006");
745   fail_unless_read ("client 2", socket[3], 16, "deadbee00000007");
746   fail_unless_read ("client 2", socket[3], 16, "deadbee00000008");
747   fail_unless_read ("client 2", socket[3], 16, "deadbee00000009");
748
749   /* third client only bursts 50 bytes = 4 buffers, we can't send
750    * more than 50 bytes so we only get 3 buffers (48 bytes). */
751   GST_DEBUG ("Reading from client 3");
752   fail_unless_read ("client 3", socket[5], 16, "deadbee00000007");
753   fail_unless_read ("client 3", socket[5], 16, "deadbee00000008");
754   fail_unless_read ("client 3", socket[5], 16, "deadbee00000009");
755
756   GST_DEBUG ("cleaning up multisocketsink");
757   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
758   cleanup_multisocketsink (sink);
759
760   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
761   gst_caps_unref (caps);
762 }
763
764 GST_END_TEST;
765
766 /* Check that we can get data when multisocketsink is configured in next-keyframe
767  * mode */
768 GST_START_TEST (test_client_next_keyframe)
769 {
770   GstElement *sink;
771   GstCaps *caps;
772   GSocket *socket[2];
773   gint i;
774
775   sink = setup_multisocketsink ();
776   g_object_set (sink, "sync-method", 1, NULL);  /* 1 = next-keyframe */
777
778   fail_unless (setup_handles (&socket[0], &socket[1]));
779
780   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
781
782   caps = gst_caps_from_string ("application/x-gst-check");
783   gst_pad_set_caps (mysrcpad, caps);
784   GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
785
786   /* now add our client */
787   g_signal_emit_by_name (sink, "add", socket[0]);
788
789   /* push buffers in: keyframe, then non-keyframe */
790   for (i = 0; i < 2; i++) {
791     GstBuffer *buffer = gst_new_buffer (i);
792     if (i > 0)
793       GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT);
794
795     fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
796   }
797
798   /* now we should be able to read some data */
799   GST_DEBUG ("Reading from client 1");
800   fail_unless_read ("client 1", socket[1], 16, "deadbee00000000");
801   fail_unless_read ("client 1", socket[1], 16, "deadbee00000001");
802
803   GST_DEBUG ("cleaning up multisocketsink");
804   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
805   cleanup_multisocketsink (sink);
806
807   ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
808   gst_caps_unref (caps);
809 }
810
811 GST_END_TEST;
812
813 /* FIXME: add test simulating chained oggs where:
814  * sync-method is burst-on-connect
815  * (when multisocketsink actually does burst-on-connect based on byte size, not
816    "last keyframe" which any frame for audio :))
817  * an old client still needs to read from before the new streamheaders
818  * a new client gets the new streamheaders
819  */
820 static Suite *
821 multisocketsink_suite (void)
822 {
823   Suite *s = suite_create ("multisocketsink");
824   TCase *tc_chain = tcase_create ("general");
825
826   suite_add_tcase (s, tc_chain);
827   tcase_add_test (tc_chain, test_no_clients);
828   tcase_add_test (tc_chain, test_add_client);
829   tcase_add_test (tc_chain, test_streamheader);
830   tcase_add_test (tc_chain, test_change_streamheader);
831   tcase_add_test (tc_chain, test_burst_client_bytes);
832   tcase_add_test (tc_chain, test_burst_client_bytes_keyframe);
833   tcase_add_test (tc_chain, test_burst_client_bytes_with_keyframe);
834   tcase_add_test (tc_chain, test_client_next_keyframe);
835
836   return s;
837 }
838
839 GST_CHECK_MAIN (multisocketsink);