registry: Support installed/uninstalled plugin-scanner helper
[platform/upstream/gstreamer.git] / gst / gstpluginloader.c
1 /* GStreamer
2  * Copyright (C) 2008 Jan Schmidt <jan.schmidt@sun.com>
3  *
4  * gstpluginloader.c: GstPluginLoader helper for loading plugin files
5  * out of process.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #ifndef G_OS_WIN32
28 #include <sys/types.h>
29 #include <sys/wait.h>
30 #include <unistd.h>
31 #endif
32 #include <errno.h>
33
34 #include <gst/gst_private.h>
35 #include <gst/gstconfig.h>
36
37 #include <gst/gstpoll.h>
38 #include <gst/gstutils.h>
39
40 #include <gst/gstpluginloader.h>
41 #include <gst/gstregistrychunks.h>
42
43 /* IMPORTANT: Bump the version number if the plugin loader protocol changes */
44 static const guint32 loader_protocol_version = 1;
45
46 #define GST_CAT_DEFAULT GST_CAT_PLUGIN_LOADING
47
48 static GstPluginLoader *plugin_loader_new (GstRegistry * registry);
49 static gboolean plugin_loader_free (GstPluginLoader * loader);
50 static gboolean plugin_loader_load (GstPluginLoader * loader,
51     const gchar * filename, off_t file_size, time_t file_mtime);
52
53 const GstPluginLoaderFuncs _priv_gst_plugin_loader_funcs = {
54   plugin_loader_new, plugin_loader_free, plugin_loader_load
55 };
56
57 typedef struct _PendingPluginEntry
58 {
59   guint32 tag;
60   gchar *filename;
61   off_t file_size;
62   time_t file_mtime;
63 } PendingPluginEntry;
64
65 struct _GstPluginLoader
66 {
67   GstRegistry *registry;
68   GstPoll *fdset;
69
70   gboolean child_running;
71   GPid child_pid;
72   GstPollFD fd_w;
73   GstPollFD fd_r;
74
75   gboolean is_child;
76   gboolean got_plugin_details;
77
78   /* Transmit buffer */
79   guint8 *tx_buf;
80   guint tx_buf_size;
81   guint tx_buf_write;
82   guint tx_buf_read;
83
84   guint32 next_tag;
85
86   guint8 *rx_buf;
87   guint rx_buf_size;
88   gboolean rx_done;
89   gboolean rx_got_sync;
90
91   guint32 got_version;
92
93   /* Head and tail of the pending plugins list. List of
94      PendingPluginEntry structs */
95   GList *pending_plugins;
96   GList *pending_plugins_tail;
97 };
98
99 #define PACKET_EXIT 1
100 #define PACKET_LOAD_PLUGIN 2
101 #define PACKET_SYNC 3
102 #define PACKET_PLUGIN_DETAILS 4
103 #define PACKET_VERSION 5
104
105 #define BUF_INIT_SIZE 512
106 #define BUF_GROW_EXTRA 512
107 #define HEADER_SIZE 8
108 #define ALIGNMENT   (sizeof (void *))
109
110 static gboolean gst_plugin_loader_spawn (GstPluginLoader * loader);
111 static void put_packet (GstPluginLoader * loader, guint type, guint32 tag,
112     const guint8 * payload, guint32 payload_len);
113 static gboolean exchange_packets (GstPluginLoader * l);
114 static gboolean plugin_loader_replay_pending (GstPluginLoader * l);
115 static gboolean plugin_loader_load_and_sync (GstPluginLoader * l,
116     PendingPluginEntry * entry);
117 static void plugin_loader_create_blacklist_plugin (GstPluginLoader * l,
118     PendingPluginEntry * entry);
119 static void plugin_loader_cleanup_child (GstPluginLoader * loader);
120 static gboolean plugin_loader_sync_with_child (GstPluginLoader * l);
121
122 static GstPluginLoader *
123 plugin_loader_new (GstRegistry * registry)
124 {
125   GstPluginLoader *l = g_new0 (GstPluginLoader, 1);
126
127   if (registry)
128     l->registry = gst_object_ref (registry);
129   l->fdset = gst_poll_new (FALSE);
130   gst_poll_fd_init (&l->fd_w);
131   gst_poll_fd_init (&l->fd_r);
132
133   l->tx_buf_size = BUF_INIT_SIZE;
134   l->tx_buf = g_malloc (BUF_INIT_SIZE);
135
136   l->next_tag = 0;
137
138   l->rx_buf_size = BUF_INIT_SIZE;
139   l->rx_buf = g_malloc (BUF_INIT_SIZE);
140
141   return l;
142 }
143
144 static gboolean
145 plugin_loader_free (GstPluginLoader * loader)
146 {
147   GList *cur;
148   gboolean got_plugin_details;
149
150   fsync (loader->fd_w.fd);
151
152   if (loader->child_running) {
153     put_packet (loader, PACKET_EXIT, 0, NULL, 0);
154
155     /* Swap packets with the child until it exits cleanly */
156     while (!loader->rx_done) {
157       if (exchange_packets (loader) || loader->rx_done)
158         continue;
159
160       if (!plugin_loader_replay_pending (loader))
161         break;
162       put_packet (loader, PACKET_EXIT, 0, NULL, 0);
163     }
164
165     plugin_loader_cleanup_child (loader);
166   } else {
167     close (loader->fd_w.fd);
168     close (loader->fd_r.fd);
169   }
170
171   gst_poll_free (loader->fdset);
172
173   g_free (loader->rx_buf);
174   g_free (loader->tx_buf);
175
176   if (loader->registry)
177     gst_object_unref (loader->registry);
178
179   got_plugin_details = loader->got_plugin_details;
180
181   /* Free any pending plugin entries */
182   cur = loader->pending_plugins;
183   while (cur) {
184     PendingPluginEntry *entry = (PendingPluginEntry *) (cur->data);
185     g_free (entry->filename);
186     g_free (entry);
187
188     cur = g_list_delete_link (cur, cur);
189   }
190
191   g_free (loader);
192
193   return got_plugin_details;
194 }
195
196 static gboolean
197 plugin_loader_load (GstPluginLoader * loader, const gchar * filename,
198     off_t file_size, time_t file_mtime)
199 {
200   gint len;
201   PendingPluginEntry *entry;
202
203   if (!gst_plugin_loader_spawn (loader))
204     return FALSE;
205
206   /* Send a packet to the child requesting that it load the given file */
207   GST_LOG_OBJECT (loader->registry,
208       "Sending file %s to child. tag %u", filename, loader->next_tag);
209
210   entry = g_new (PendingPluginEntry, 1);
211   entry->tag = loader->next_tag++;
212   entry->filename = g_strdup (filename);
213   entry->file_size = file_size;
214   entry->file_mtime = file_mtime;
215   loader->pending_plugins_tail =
216       g_list_append (loader->pending_plugins_tail, entry);
217
218   if (loader->pending_plugins == NULL)
219     loader->pending_plugins = loader->pending_plugins_tail;
220   else
221     loader->pending_plugins_tail = g_list_next (loader->pending_plugins_tail);
222
223   len = strlen (filename);
224   put_packet (loader, PACKET_LOAD_PLUGIN, entry->tag,
225       (guint8 *) filename, len + 1);
226
227   if (!exchange_packets (loader)) {
228     if (!plugin_loader_replay_pending (loader))
229       return FALSE;
230   }
231
232   return TRUE;
233 }
234
235 static gboolean
236 plugin_loader_replay_pending (GstPluginLoader * l)
237 {
238   GList *cur, *next;
239
240 restart:
241   if (!gst_plugin_loader_spawn (l))
242     return FALSE;
243
244   /* Load each plugin one by one synchronously until we find the
245    * crashing one */
246   while ((cur = l->pending_plugins)) {
247     PendingPluginEntry *entry = (PendingPluginEntry *) (cur->data);
248
249     if (!plugin_loader_load_and_sync (l, entry)) {
250       GST_INFO ("AHA! %s crashes on loading", entry->filename);
251       /* Create dummy plugin entry to block re-scanning this file */
252       plugin_loader_create_blacklist_plugin (l, entry);
253       l->got_plugin_details = TRUE;
254       /* Now remove this crashy plugin from the head of the list */
255       l->pending_plugins = g_list_delete_link (cur, cur);
256       if (l->pending_plugins == NULL)
257         l->pending_plugins_tail = NULL;
258       if (!gst_plugin_loader_spawn (l))
259         return FALSE;
260       break;
261     }
262   }
263
264   /* We exited after finding the crashing one. If there's any more pending,
265    * dispatch them post-haste, but don't wait */
266   for (cur = l->pending_plugins; cur != NULL; cur = next) {
267     PendingPluginEntry *entry = (PendingPluginEntry *) (cur->data);
268
269     next = g_list_next (cur);
270
271     put_packet (l, PACKET_LOAD_PLUGIN, entry->tag,
272         (guint8 *) entry->filename, strlen (entry->filename) + 1);
273
274     /* This might invalidate cur, which is why we grabbed 'next' above */
275     if (!exchange_packets (l))
276       goto restart;
277   }
278
279   return TRUE;
280 }
281
282 static gboolean
283 plugin_loader_sync_with_child (GstPluginLoader * l)
284 {
285   put_packet (l, PACKET_SYNC, 0, NULL, 0);
286
287   l->rx_got_sync = FALSE;
288   while (!l->rx_got_sync) {
289     if (!exchange_packets (l))
290       return FALSE;
291   }
292   return TRUE;
293 }
294
295 static gboolean
296 plugin_loader_load_and_sync (GstPluginLoader * l, PendingPluginEntry * entry)
297 {
298   gint len;
299
300   GST_DEBUG_OBJECT (l->registry, "Synchronously loading plugin file %s",
301       entry->filename);
302
303   len = strlen (entry->filename);
304   put_packet (l, PACKET_LOAD_PLUGIN, entry->tag,
305       (guint8 *) entry->filename, len + 1);
306
307   return plugin_loader_sync_with_child (l);
308 }
309
310 static void
311 plugin_loader_create_blacklist_plugin (GstPluginLoader * l,
312     PendingPluginEntry * entry)
313 {
314   GstPlugin *plugin = g_object_new (GST_TYPE_PLUGIN, NULL);
315
316   plugin->filename = g_strdup (entry->filename);
317   plugin->file_mtime = entry->file_mtime;
318   plugin->file_size = entry->file_size;
319   plugin->flags |= GST_PLUGIN_FLAG_BLACKLISTED;
320
321   plugin->basename = g_path_get_basename (plugin->filename);
322   plugin->desc.name = g_intern_string (plugin->basename);
323   plugin->desc.description = g_strdup_printf ("Plugin for blacklisted file");
324   plugin->desc.version = g_intern_string ("0.0.0");
325   plugin->desc.license = g_intern_string ("BLACKLIST");
326   plugin->desc.source = plugin->desc.license;
327   plugin->desc.package = plugin->desc.license;
328   plugin->desc.origin = plugin->desc.license;
329
330   GST_DEBUG ("Adding blacklist plugin '%s'", plugin->desc.name);
331   gst_registry_add_plugin (l->registry, plugin);
332 }
333
334 static gboolean
335 gst_plugin_loader_try_helper (GstPluginLoader * loader, gchar * location)
336 {
337   char *argv[] = { location, "-l", NULL };
338
339   GST_LOG ("Trying to spawn plugin-scanner helper at %s", location);
340   if (!g_spawn_async_with_pipes (NULL, argv, NULL,
341           G_SPAWN_DO_NOT_REAP_CHILD /* | G_SPAWN_STDERR_TO_DEV_NULL */ ,
342           NULL, NULL, &loader->child_pid, &loader->fd_w.fd, &loader->fd_r.fd,
343           NULL, NULL))
344     return FALSE;
345
346   gst_poll_add_fd (loader->fdset, &loader->fd_w);
347   gst_poll_add_fd (loader->fdset, &loader->fd_r);
348
349   gst_poll_fd_ctl_read (loader->fdset, &loader->fd_r, TRUE);
350
351   loader->tx_buf_write = loader->tx_buf_read = 0;
352
353   loader->got_version = (guint32) (-1);
354   put_packet (loader, PACKET_VERSION, 0, NULL, 0);
355   if (!plugin_loader_sync_with_child (loader))
356     return FALSE;
357
358   GST_LOG ("Got VERSION %u from child. Ours is %u", loader->got_version,
359       loader_protocol_version);
360   if (loader->got_version != loader_protocol_version)
361     return FALSE;
362
363   loader->child_running = TRUE;
364
365   return TRUE;
366 }
367
368 static gboolean
369 gst_plugin_loader_spawn (GstPluginLoader * loader)
370 {
371   char *helper_bin;
372   gboolean res;
373
374   if (loader->child_running)
375     return TRUE;
376
377   /* Find the plugin-scanner, first try installed then by env-var */
378   helper_bin = g_strdup (GST_PLUGIN_SCANNER_INSTALLED);
379   res = gst_plugin_loader_try_helper (loader, helper_bin);
380   g_free (helper_bin);
381
382   if (!res) {
383     /* Try the GST_PLUGIN_SCANNER env var */
384     const gchar *env = g_getenv ("GST_PLUGIN_SCANNER");
385     if (env != NULL) {
386       GST_LOG ("Installed plugin scanner failed. "
387           "Trying GST_PLUGIN_SCANNER env var: %s", env);
388       helper_bin = g_strdup (env);
389       res = gst_plugin_loader_try_helper (loader, helper_bin);
390       g_free (helper_bin);
391     } else {
392       GST_LOG ("Installed plugin scanner failed and "
393           "GST_PLUGIN_SCANNER env var not set. No plugin-scanner available");
394     }
395   }
396
397   return loader->child_running;
398 }
399
400 static void
401 plugin_loader_cleanup_child (GstPluginLoader * l)
402 {
403   if (!l->child_running || l->is_child)
404     return;
405
406   gst_poll_remove_fd (l->fdset, &l->fd_w);
407   gst_poll_remove_fd (l->fdset, &l->fd_r);
408
409   close (l->fd_w.fd);
410   close (l->fd_r.fd);
411
412 #ifndef G_OS_WIN32
413   GST_LOG ("waiting for child process to exit");
414   waitpid (l->child_pid, NULL, 0);
415 #else
416   g_warning ("FIXME: Implement child process shutdown for Win32");
417 #endif
418   g_spawn_close_pid (l->child_pid);
419
420   l->child_running = FALSE;
421 }
422
423 gboolean
424 _gst_plugin_loader_client_run ()
425 {
426   GstPluginLoader *l;
427
428   l = plugin_loader_new (NULL);
429   if (l == NULL)
430     return FALSE;
431
432   l->fd_w.fd = 1;               /* STDOUT */
433   gst_poll_add_fd (l->fdset, &l->fd_w);
434
435   l->fd_r.fd = 0;               /* STDIN */
436   gst_poll_add_fd (l->fdset, &l->fd_r);
437   gst_poll_fd_ctl_read (l->fdset, &l->fd_r, TRUE);
438
439   l->is_child = TRUE;
440
441   GST_DEBUG ("Plugin scanner child running. Waiting for instructions");
442
443   /* Loop, listening for incoming packets on the fd and writing responses */
444   while (!l->rx_done && exchange_packets (l));
445
446   plugin_loader_free (l);
447
448   return TRUE;
449 }
450
451 static void
452 put_packet (GstPluginLoader * l, guint type, guint32 tag,
453     const guint8 * payload, guint32 payload_len)
454 {
455   guint8 *out;
456   guint len = payload_len + HEADER_SIZE;
457
458   if (l->tx_buf_write + len >= l->tx_buf_size) {
459     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
460     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
461   }
462
463   out = l->tx_buf + l->tx_buf_write;
464
465   out[0] = type;
466   GST_WRITE_UINT24_BE (out + 1, tag);
467   GST_WRITE_UINT32_BE (out + 4, payload_len);
468   memcpy (out + HEADER_SIZE, payload, payload_len);
469
470   l->tx_buf_write += len;
471   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
472 }
473
474 static void
475 put_chunk (GstPluginLoader * l, GstRegistryChunk * chunk, guint * pos)
476 {
477   guint padsize = 0;
478   guint len;
479   guint8 *out;
480
481   /* Might need to align the chunk */
482   if (chunk->align && ((*pos) % ALIGNMENT) != 0)
483     padsize = ALIGNMENT - ((*pos) % ALIGNMENT);
484
485   len = padsize + chunk->size;
486
487   if (l->tx_buf_write + len >= l->tx_buf_size) {
488     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
489     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
490   }
491
492   out = l->tx_buf + l->tx_buf_write;
493   memcpy (out + padsize, chunk->data, chunk->size);
494
495   l->tx_buf_write += len;
496   *pos += len;
497
498   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
499 };
500
501 static gboolean
502 write_one (GstPluginLoader * l)
503 {
504   guint8 *out;
505   guint32 to_write;
506   int res;
507
508   if (l->tx_buf_read + HEADER_SIZE > l->tx_buf_write)
509     return FALSE;
510
511   out = l->tx_buf + l->tx_buf_read;
512   to_write = GST_READ_UINT32_BE (out + 4) + HEADER_SIZE;
513   l->tx_buf_read += to_write;
514
515   GST_LOG ("Writing packet of size %d bytes to fd %d", to_write, l->fd_w.fd);
516
517   do {
518     res = write (l->fd_w.fd, out, to_write);
519     if (res > 0) {
520       to_write -= res;
521       out += res;
522     }
523   } while (to_write > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
524   if (res < 0) {
525     /* Failed to write -> child died */
526     plugin_loader_cleanup_child (l);
527     return FALSE;
528   }
529
530   if (l->tx_buf_read == l->tx_buf_write) {
531     gst_poll_fd_ctl_write (l->fdset, &l->fd_w, FALSE);
532     l->tx_buf_read = l->tx_buf_write = 0;
533   }
534   return TRUE;
535 }
536
537 static gboolean
538 do_plugin_load (GstPluginLoader * l, const gchar * filename, guint tag)
539 {
540   GstPlugin *newplugin;
541   GList *chunks = NULL;
542
543   GST_DEBUG ("Plugin scanner loading file %s. tag %u\n", filename, tag);
544
545 #if 0                           /* Test code - crash based on an env var */
546   if (strstr (filename, "coreelements") == NULL) {
547     g_printerr ("Crashing on file %s\n", filename);
548     g_printerr ("%d", *(gint *) (NULL));
549   }
550 #endif
551
552
553   newplugin = gst_plugin_load_file ((gchar *) filename, NULL);
554   if (newplugin) {
555     guint hdr_pos;
556     guint offset;
557
558     /* Now serialise the plugin details and send */
559     if (!_priv_gst_registry_chunks_save_plugin (&chunks,
560             gst_registry_get_default (), newplugin))
561       goto fail;
562
563     /* Store where the header is, write an empty one, then write
564      * all the payload chunks, then fix up the header size */
565     hdr_pos = l->tx_buf_write;
566     offset = HEADER_SIZE;
567     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
568
569     if (chunks) {
570       GList *walk;
571       for (walk = chunks; walk; walk = g_list_next (walk)) {
572         GstRegistryChunk *cur = walk->data;
573         put_chunk (l, cur, &offset);
574
575         if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
576           g_free (cur->data);
577         g_free (cur);
578       }
579
580       g_list_free (chunks);
581
582       /* Store the size of the written payload */
583       GST_WRITE_UINT32_BE (l->tx_buf + hdr_pos + 4, offset - HEADER_SIZE);
584     }
585
586     gst_object_unref (newplugin);
587   } else {
588     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
589   }
590
591   return TRUE;
592 fail:
593   put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
594   if (chunks) {
595     GList *walk;
596     for (walk = chunks; walk; walk = g_list_next (walk)) {
597       GstRegistryChunk *cur = walk->data;
598
599       if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
600         g_free (cur->data);
601       g_free (cur);
602     }
603
604     g_list_free (chunks);
605   }
606
607   return FALSE;
608 }
609
610 static gboolean
611 handle_rx_packet (GstPluginLoader * l,
612     guint pack_type, guint32 tag, guint8 * payload, guint payload_len)
613 {
614   gboolean res = TRUE;
615
616   switch (pack_type) {
617     case PACKET_EXIT:
618       gst_poll_fd_ctl_read (l->fdset, &l->fd_r, FALSE);
619       if (l->is_child) {
620         /* Respond, then we keep looping until the parent closes the fd */
621         put_packet (l, PACKET_EXIT, 0, NULL, 0);
622       } else {
623         l->rx_done = TRUE;      /* All done reading from child */
624       }
625       return TRUE;
626     case PACKET_LOAD_PLUGIN:{
627       if (!l->is_child)
628         return TRUE;
629
630       /* Payload is the filename to load */
631       res = do_plugin_load (l, (gchar *) payload, tag);
632
633       break;
634     }
635     case PACKET_PLUGIN_DETAILS:{
636       gchar *tmp = (gchar *) payload;
637       PendingPluginEntry *entry = NULL;
638       GList *cur;
639
640       GST_DEBUG_OBJECT (l->registry,
641           "Received plugin details from child w/ tag %u. %d bytes info",
642           tag, payload_len);
643
644       /* Assume that tagged details come back in the order
645        * we requested, and delete anything before this one */
646       cur = l->pending_plugins;
647       while (cur) {
648         PendingPluginEntry *e = (PendingPluginEntry *) (cur->data);
649
650         if (e->tag > tag)
651           break;
652
653         cur = g_list_delete_link (cur, cur);
654
655         if (e->tag == tag) {
656           entry = e;
657           break;
658         } else {
659           g_free (e->filename);
660           g_free (e);
661         }
662       }
663       l->pending_plugins = cur;
664       if (cur == NULL)
665         l->pending_plugins_tail = NULL;
666
667       if (payload_len > 0) {
668         GstPlugin *newplugin;
669         _priv_gst_registry_chunks_load_plugin (l->registry, &tmp,
670             tmp + payload_len, &newplugin);
671         newplugin->flags &= ~GST_PLUGIN_FLAG_CACHED;
672         GST_LOG_OBJECT (l->registry,
673             "marking plugin %p as registered as %s", newplugin,
674             newplugin->filename);
675         newplugin->registered = TRUE;
676
677         /* We got a set of plugin details - remember it for later */
678         l->got_plugin_details = TRUE;
679       } else if (entry != NULL) {
680         /* Create a blacklist entry for this file to prevent scanning every time */
681         plugin_loader_create_blacklist_plugin (l, entry);
682         l->got_plugin_details = TRUE;
683       }
684
685       if (entry != NULL) {
686         g_free (entry->filename);
687         g_free (entry);
688       }
689
690       break;
691     }
692     case PACKET_SYNC:
693       if (l->is_child) {
694         /* Respond with our reply - also a sync */
695         put_packet (l, PACKET_SYNC, tag, NULL, 0);
696         GST_LOG ("Got SYNC in child - replying");
697       } else
698         l->rx_got_sync = TRUE;
699       break;
700     case PACKET_VERSION:
701       if (l->is_child) {
702         /* Respond with our reply - a version packet, with the version */
703         guint32 val;
704         GST_WRITE_UINT32_BE (&val, loader_protocol_version);
705         put_packet (l, PACKET_VERSION, tag, (guint8 *) & val, sizeof (guint32));
706         GST_LOG ("Got VERSION in child - replying %u", loader_protocol_version);
707       } else {
708         if (payload_len == sizeof (loader_protocol_version)) {
709           l->got_version = GST_READ_UINT32_BE (payload);
710         } else {
711           res = FALSE;
712         }
713       }
714       break;
715     default:
716       return FALSE;             /* Invalid packet -> something is wrong */
717   }
718
719   return res;
720 }
721
722 static gboolean
723 read_one (GstPluginLoader * l)
724 {
725   guint32 to_read, packet_len, tag;
726   guint8 *in;
727   gint res;
728
729   to_read = HEADER_SIZE;
730   in = l->rx_buf;
731   do {
732     res = read (l->fd_r.fd, in, to_read);
733     if (res > 0) {
734       to_read -= res;
735       in += res;
736     }
737   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
738
739   if (res < 0) {
740     GST_LOG ("Failed reading packet header");
741     return FALSE;
742   }
743
744   packet_len = GST_READ_UINT32_BE (l->rx_buf + 4);
745
746   if (packet_len + HEADER_SIZE >= l->rx_buf_size) {
747     l->rx_buf_size = packet_len + HEADER_SIZE + BUF_GROW_EXTRA;
748     l->rx_buf = g_realloc (l->rx_buf, l->rx_buf_size);
749   }
750
751   in = l->rx_buf + HEADER_SIZE;
752   to_read = packet_len;
753   do {
754     res = read (l->fd_r.fd, in, to_read);
755     if (res > 0) {
756       to_read -= res;
757       in += res;
758     }
759   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
760
761   if (res < 0) {
762     GST_ERROR ("Packet payload read failed");
763     return FALSE;
764   }
765
766   tag = GST_READ_UINT24_BE (l->rx_buf + 1);
767
768   return handle_rx_packet (l, l->rx_buf[0], tag,
769       l->rx_buf + HEADER_SIZE, packet_len);
770 }
771
772 static gboolean
773 exchange_packets (GstPluginLoader * l)
774 {
775   gint res;
776
777   /* Wait for activity on our FDs */
778   do {
779     do {
780       res = gst_poll_wait (l->fdset, GST_CLOCK_TIME_NONE);
781     } while (res == -1 && (errno == EINTR || errno == EAGAIN));
782
783     if (res < 0)
784       return FALSE;
785
786     GST_LOG ("Poll res = %d. %d bytes pending for write", res,
787         l->tx_buf_write - l->tx_buf_read);
788
789     if (!l->rx_done) {
790       if (gst_poll_fd_has_error (l->fdset, &l->fd_r) ||
791           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
792         GST_LOG ("read fd %d closed/errored", l->fd_r.fd);
793         plugin_loader_cleanup_child (l);
794         return FALSE;
795       }
796
797       if (gst_poll_fd_can_read (l->fdset, &l->fd_r)) {
798         if (!read_one (l))
799           return FALSE;
800       }
801     }
802
803     if (l->tx_buf_read < l->tx_buf_write) {
804       if (gst_poll_fd_has_error (l->fdset, &l->fd_w) ||
805           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
806         GST_ERROR ("write fd %d closed/errored", l->fd_w.fd);
807         plugin_loader_cleanup_child (l);
808         return FALSE;
809       }
810       if (gst_poll_fd_can_write (l->fdset, &l->fd_w)) {
811         if (!write_one (l))
812           return FALSE;
813       }
814     }
815   } while (l->tx_buf_read < l->tx_buf_write);
816
817   return TRUE;
818 }