Plugin loader phase 2
[platform/upstream/gstreamer.git] / gst / gstpluginloader.c
1 /* GStreamer
2  * Copyright (C) 2008 Jan Schmidt <jan.schmidt@sun.com>
3  *
4  * gstpluginloader.c: GstPluginLoader helper for loading plugin files
5  * out of process.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #ifndef G_OS_WIN32
28 #include <sys/types.h>
29 #include <sys/wait.h>
30 #include <unistd.h>
31 #endif
32 #include <errno.h>
33
34 #include <gst/gst_private.h>
35 #include <gst/gstconfig.h>
36
37 #include <gst/gstpoll.h>
38 #include <gst/gstutils.h>
39
40 #include <gst/gstpluginloader.h>
41 #include <gst/gstregistrychunks.h>
42
43 #define GST_CAT_DEFAULT GST_CAT_PLUGIN_LOADING
44
45 static GstPluginLoader *plugin_loader_new (GstRegistry * registry);
46 static gboolean plugin_loader_free (GstPluginLoader * loader);
47 static gboolean plugin_loader_load (GstPluginLoader * loader,
48     const gchar * filename, off_t file_size, time_t file_mtime);
49
50 const GstPluginLoaderFuncs _priv_gst_plugin_loader_funcs = {
51   plugin_loader_new, plugin_loader_free, plugin_loader_load
52 };
53
54 typedef struct _PendingPluginEntry
55 {
56   guint32 tag;
57   gchar *filename;
58   off_t file_size;
59   time_t file_mtime;
60 } PendingPluginEntry;
61
62 struct _GstPluginLoader
63 {
64   GstRegistry *registry;
65   GstPoll *fdset;
66
67   gboolean child_started;
68   GPid child_pid;
69   GstPollFD fd_w;
70   GstPollFD fd_r;
71
72   gboolean is_child;
73   gboolean got_plugin_details;
74
75   /* Transmit buffer */
76   guint8 *tx_buf;
77   guint tx_buf_size;
78   guint tx_buf_write;
79   guint tx_buf_read;
80
81   guint32 next_tag;
82
83   guint8 *rx_buf;
84   guint rx_buf_size;
85   gboolean rx_done;
86
87   /* Head and tail of the pending plugins list. List of
88      PendingPluginEntry structs */
89   GList *pending_plugins;
90   GList *pending_plugins_tail;
91 };
92
93 #define PACKET_EXIT 1
94 #define PACKET_LOAD_PLUGIN 2
95 #define PACKET_STARTING_LOAD 3
96 #define PACKET_PLUGIN_DETAILS 4
97
98 #define BUF_INIT_SIZE 512
99 #define BUF_GROW_EXTRA 512
100 #define HEADER_SIZE 8
101 #define ALIGNMENT   (sizeof (void *))
102
103
104 static gboolean gst_plugin_loader_spawn (GstPluginLoader * loader);
105 static void put_packet (GstPluginLoader * loader, guint type, guint32 tag,
106     const guint8 * payload, guint32 payload_len);
107 static gboolean exchange_packets (GstPluginLoader * l);
108
109 static GstPluginLoader *
110 plugin_loader_new (GstRegistry * registry)
111 {
112   GstPluginLoader *l = g_new0 (GstPluginLoader, 1);
113
114   if (registry)
115     l->registry = gst_object_ref (registry);
116   l->fdset = gst_poll_new (FALSE);
117   gst_poll_fd_init (&l->fd_w);
118   gst_poll_fd_init (&l->fd_r);
119
120   l->tx_buf_size = BUF_INIT_SIZE;
121   l->tx_buf = g_malloc (BUF_INIT_SIZE);
122
123   l->next_tag = 0;
124
125   l->rx_buf_size = BUF_INIT_SIZE;
126   l->rx_buf = g_malloc (BUF_INIT_SIZE);
127
128   return l;
129 }
130
131 static gboolean
132 plugin_loader_free (GstPluginLoader * loader)
133 {
134   GList *cur;
135   gboolean got_plugin_details;
136
137   fsync (loader->fd_w.fd);
138
139   if (loader->child_started) {
140     put_packet (loader, PACKET_EXIT, 0, NULL, 0);
141
142     /* Swap packets with the child until it exits */
143     while (!loader->rx_done && exchange_packets (loader)) {
144     };
145
146     close (loader->fd_w.fd);
147     close (loader->fd_r.fd);
148
149 #ifndef G_OS_WIN32
150     GST_LOG ("waiting for child process to exit");
151     waitpid (loader->child_pid, NULL, 0);
152 #else
153     g_warning ("FIXME: Implement child process shutdown for Win32");
154 #endif
155     g_spawn_close_pid (loader->child_pid);
156   } else {
157     close (loader->fd_w.fd);
158     close (loader->fd_r.fd);
159   }
160
161   gst_poll_free (loader->fdset);
162
163   g_free (loader->rx_buf);
164   g_free (loader->tx_buf);
165
166   if (loader->registry)
167     gst_object_unref (loader->registry);
168
169   got_plugin_details = loader->got_plugin_details;
170
171   /* Free any pending plugin entries */
172   cur = loader->pending_plugins;
173   while (cur) {
174     PendingPluginEntry *entry = (PendingPluginEntry *) (cur->data);
175     g_free (entry->filename);
176     g_free (entry);
177
178     cur = g_list_delete_link (cur, cur);
179   }
180
181   g_free (loader);
182
183   return got_plugin_details;
184 }
185
186 static gboolean
187 plugin_loader_load (GstPluginLoader * loader, const gchar * filename,
188     off_t file_size, time_t file_mtime)
189 {
190   gint len;
191   PendingPluginEntry *entry;
192
193   if (!loader->child_started) {
194     if (!gst_plugin_loader_spawn (loader))
195       return FALSE;
196   }
197
198   /* Send a packet to the child requesting that it load the given file */
199   GST_LOG_OBJECT (loader->registry,
200       "Sending file %s to child. tag %u", filename, loader->next_tag);
201
202   entry = g_new (PendingPluginEntry, 1);
203   entry->tag = loader->next_tag++;
204   entry->filename = g_strdup (filename);
205   entry->file_size = file_size;
206   entry->file_mtime = file_mtime;
207   loader->pending_plugins_tail =
208       g_list_append (loader->pending_plugins_tail, entry);
209
210   if (loader->pending_plugins == NULL)
211     loader->pending_plugins = loader->pending_plugins_tail;
212   else
213     loader->pending_plugins_tail = g_list_next (loader->pending_plugins_tail);
214
215   len = strlen (filename);
216   put_packet (loader, PACKET_LOAD_PLUGIN, entry->tag,
217       (guint8 *) filename, len + 1);
218
219   if (!exchange_packets (loader))
220     return FALSE;
221
222   return TRUE;
223 }
224
225 static gboolean
226 gst_plugin_loader_spawn (GstPluginLoader * loader)
227 {
228   char *helper_bin =
229       "/home/jan/devel/gstreamer/head/gstreamer/libs/gst/helpers/plugin-scanner";
230   char *argv[] = { helper_bin, "-l", NULL };
231
232   if (!g_spawn_async_with_pipes (NULL, argv, NULL,
233           G_SPAWN_DO_NOT_REAP_CHILD /* | G_SPAWN_STDERR_TO_DEV_NULL */ ,
234           NULL, NULL, &loader->child_pid, &loader->fd_w.fd, &loader->fd_r.fd,
235           NULL, NULL))
236     return FALSE;
237
238   gst_poll_add_fd (loader->fdset, &loader->fd_w);
239
240   gst_poll_add_fd (loader->fdset, &loader->fd_r);
241   gst_poll_fd_ctl_read (loader->fdset, &loader->fd_r, TRUE);
242
243   loader->child_started = TRUE;
244
245   return TRUE;
246 }
247
248 gboolean
249 _gst_plugin_loader_client_run ()
250 {
251   GstPluginLoader *l;
252
253   l = plugin_loader_new (NULL);
254   if (l == NULL)
255     return FALSE;
256
257   l->fd_w.fd = 1;               /* STDOUT */
258   gst_poll_add_fd (l->fdset, &l->fd_w);
259
260   l->fd_r.fd = 0;               /* STDIN */
261   gst_poll_add_fd (l->fdset, &l->fd_r);
262   gst_poll_fd_ctl_read (l->fdset, &l->fd_r, TRUE);
263
264   l->is_child = TRUE;
265
266   GST_DEBUG ("Plugin scanner child running. Waiting for instructions");
267
268   /* Loop, listening for incoming packets on the fd and writing responses */
269   while (!l->rx_done && exchange_packets (l));
270
271   plugin_loader_free (l);
272
273   return TRUE;
274 }
275
276 static void
277 put_packet (GstPluginLoader * l, guint type, guint32 tag,
278     const guint8 * payload, guint32 payload_len)
279 {
280   guint8 *out;
281   guint len = payload_len + HEADER_SIZE;
282
283   if (l->tx_buf_write + len >= l->tx_buf_size) {
284     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
285     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
286   }
287
288   out = l->tx_buf + l->tx_buf_write;
289
290   out[0] = type;
291   GST_WRITE_UINT24_BE (out + 1, tag);
292   GST_WRITE_UINT32_BE (out + 4, payload_len);
293   memcpy (out + HEADER_SIZE, payload, payload_len);
294
295   l->tx_buf_write += len;
296   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
297 }
298
299 static void
300 put_chunk (GstPluginLoader * l, GstRegistryChunk * chunk, guint * pos)
301 {
302   guint padsize = 0;
303   guint len;
304   guint8 *out;
305
306   /* Might need to align the chunk */
307   if (chunk->align && ((*pos) % ALIGNMENT) != 0)
308     padsize = ALIGNMENT - ((*pos) % ALIGNMENT);
309
310   len = padsize + chunk->size;
311
312   if (l->tx_buf_write + len >= l->tx_buf_size) {
313     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
314     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
315   }
316
317   out = l->tx_buf + l->tx_buf_write;
318   memcpy (out + padsize, chunk->data, chunk->size);
319
320   l->tx_buf_write += len;
321   *pos += len;
322
323   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
324 };
325
326 static gboolean
327 write_one (GstPluginLoader * l)
328 {
329   guint8 *out;
330   guint32 to_write;
331   int res;
332
333   if (l->tx_buf_read + HEADER_SIZE > l->tx_buf_write)
334     return FALSE;
335
336   out = l->tx_buf + l->tx_buf_read;
337   to_write = GST_READ_UINT32_BE (out + 4) + HEADER_SIZE;
338   l->tx_buf_read += to_write;
339
340   GST_LOG ("Writing packet of size %d bytes to fd %d", to_write, l->fd_w.fd);
341
342   do {
343     res = write (l->fd_w.fd, out, to_write);
344     if (res > 0) {
345       to_write -= res;
346       out += res;
347     }
348   } while (to_write > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
349
350   if (l->tx_buf_read == l->tx_buf_write) {
351     gst_poll_fd_ctl_write (l->fdset, &l->fd_w, FALSE);
352     l->tx_buf_read = l->tx_buf_write = 0;
353   }
354   return TRUE;
355 }
356
357 static gboolean
358 do_plugin_load (GstPluginLoader * l, const gchar * filename, guint tag)
359 {
360   GstPlugin *newplugin;
361   GList *chunks = NULL;
362
363   GST_DEBUG ("Plugin scanner loading file %s. tag %u\n", filename, tag);
364   put_packet (l, PACKET_STARTING_LOAD, tag, NULL, 0);
365
366   newplugin = gst_plugin_load_file ((gchar *) filename, NULL);
367   if (newplugin) {
368     guint hdr_pos;
369     guint offset;
370
371     /* Now serialise the plugin details and send */
372     if (!_priv_gst_registry_chunks_save_plugin (&chunks,
373             gst_registry_get_default (), newplugin))
374       goto fail;
375
376     /* Store where the header is, write an empty one, then write
377      * all the payload chunks, then fix up the header size */
378     hdr_pos = l->tx_buf_write;
379     offset = HEADER_SIZE;
380     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
381
382     if (chunks) {
383       GList *walk;
384       for (walk = chunks; walk; walk = g_list_next (walk)) {
385         GstRegistryChunk *cur = walk->data;
386         put_chunk (l, cur, &offset);
387
388         if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
389           g_free (cur->data);
390         g_free (cur);
391       }
392
393       g_list_free (chunks);
394
395       /* Store the size of the written payload */
396       GST_WRITE_UINT32_BE (l->tx_buf + hdr_pos + 4, offset - HEADER_SIZE);
397     }
398
399     gst_object_unref (newplugin);
400   } else {
401     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
402   }
403
404   return TRUE;
405 fail:
406   put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
407   if (chunks) {
408     GList *walk;
409     for (walk = chunks; walk; walk = g_list_next (walk)) {
410       GstRegistryChunk *cur = walk->data;
411
412       if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
413         g_free (cur->data);
414       g_free (cur);
415     }
416
417     g_list_free (chunks);
418   }
419
420   return FALSE;
421 }
422
423 static gboolean
424 handle_rx_packet (GstPluginLoader * l,
425     guint pack_type, guint32 tag, guint8 * payload, guint payload_len)
426 {
427   gboolean res = TRUE;
428
429   switch (pack_type) {
430     case PACKET_EXIT:
431       gst_poll_fd_ctl_read (l->fdset, &l->fd_r, FALSE);
432       if (l->is_child) {
433         /* Respond, then we keep looping until the parent closes the fd */
434         put_packet (l, PACKET_EXIT, 0, NULL, 0);
435       } else {
436         l->rx_done = TRUE;      /* All done reading from child */
437       }
438       return TRUE;
439     case PACKET_LOAD_PLUGIN:{
440
441       if (!l->is_child)
442         return TRUE;
443
444       /* Payload is the filename to load */
445       res = do_plugin_load (l, (gchar *) payload, tag);
446
447       break;
448     }
449     case PACKET_STARTING_LOAD:
450       GST_LOG_OBJECT (l->registry,
451           "child started loading plugin w/ tag %u", tag);
452       break;
453     case PACKET_PLUGIN_DETAILS:{
454       gchar *tmp = (gchar *) payload;
455       PendingPluginEntry *entry = NULL;
456       GList *cur;
457
458       GST_DEBUG_OBJECT (l->registry,
459           "Received plugin details from child w/ tag %u. %d bytes info",
460           tag, payload_len);
461
462       /* Assume that tagged details come back in the order
463        * we requested, and delete anything before this one */
464       cur = l->pending_plugins;
465       while (cur) {
466         PendingPluginEntry *e = (PendingPluginEntry *) (cur->data);
467
468         if (e->tag > tag)
469           break;
470
471         cur = g_list_delete_link (cur, cur);
472
473         if (e->tag == tag) {
474           entry = e;
475           break;
476         } else {
477           g_free (e->filename);
478           g_free (e);
479         }
480       }
481       l->pending_plugins = cur;
482       if (cur == NULL)
483         l->pending_plugins_tail = NULL;
484
485       if (payload_len > 0) {
486         GstPlugin *newplugin;
487         _priv_gst_registry_chunks_load_plugin (l->registry, &tmp,
488             tmp + payload_len, &newplugin);
489         newplugin->flags &= ~GST_PLUGIN_FLAG_CACHED;
490         GST_LOG_OBJECT (l->registry,
491             "marking plugin %p as registered as %s", newplugin,
492             newplugin->filename);
493         newplugin->registered = TRUE;
494
495         /* We got a set of plugin details - remember it for later */
496         l->got_plugin_details = TRUE;
497       } else if (entry != NULL) {
498         /* Create a dummy entry for this file to prevent scanning every time */
499         GstPlugin *plugin = g_object_new (GST_TYPE_PLUGIN, NULL);
500
501         plugin->filename = g_strdup (entry->filename);
502         plugin->file_mtime = entry->file_mtime;
503         plugin->file_size = entry->file_size;
504
505         plugin->basename = g_path_get_basename (plugin->filename);
506         plugin->desc.name = g_intern_string (plugin->basename);
507         plugin->desc.description = g_strdup_printf ("Dummy plugin for file %s",
508             plugin->filename);
509         plugin->desc.version = g_intern_string ("0.0.0");
510         plugin->desc.license = g_intern_string ("DUMMY");
511         plugin->desc.source = plugin->desc.license;
512         plugin->desc.package = plugin->desc.license;
513         plugin->desc.origin = plugin->desc.license;
514
515         GST_DEBUG ("Adding dummy plugin '%s'", plugin->desc.name);
516         gst_registry_add_plugin (l->registry, plugin);
517         l->got_plugin_details = TRUE;
518       }
519
520       if (entry != NULL) {
521         g_free (entry->filename);
522         g_free (entry);
523       }
524
525       break;
526     }
527     default:
528       return FALSE;             /* Invalid packet -> something is wrong */
529   }
530
531   return res;
532 }
533
534 static gboolean
535 read_one (GstPluginLoader * l)
536 {
537   guint32 to_read, packet_len, tag;
538   guint8 *in;
539   gint res;
540
541   to_read = HEADER_SIZE;
542   in = l->rx_buf;
543   do {
544     res = read (l->fd_r.fd, in, to_read);
545     if (res > 0) {
546       to_read -= res;
547       in += res;
548     }
549   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
550
551   if (res < 0) {
552     GST_LOG ("Failed reading packet header");
553     return FALSE;
554   }
555
556   packet_len = GST_READ_UINT32_BE (l->rx_buf + 4);
557
558   if (packet_len + HEADER_SIZE >= l->rx_buf_size) {
559     l->rx_buf_size = packet_len + HEADER_SIZE + BUF_GROW_EXTRA;
560     l->rx_buf = g_realloc (l->rx_buf, l->rx_buf_size);
561   }
562
563   in = l->rx_buf + HEADER_SIZE;
564   to_read = packet_len;
565   do {
566     res = read (l->fd_r.fd, in, to_read);
567     if (res > 0) {
568       to_read -= res;
569       in += res;
570     }
571   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
572
573   if (res < 0) {
574     GST_ERROR ("Packet payload read failed");
575     return FALSE;
576   }
577
578   tag = GST_READ_UINT24_BE (l->rx_buf + 1);
579
580   return handle_rx_packet (l, l->rx_buf[0], tag,
581       l->rx_buf + HEADER_SIZE, packet_len);
582 }
583
584 static gboolean
585 exchange_packets (GstPluginLoader * l)
586 {
587   gint res;
588
589   /* Wait for activity on our FDs */
590   do {
591     do {
592       res = gst_poll_wait (l->fdset, GST_CLOCK_TIME_NONE);
593     } while (res == -1 && (errno == EINTR || errno == EAGAIN));
594
595     if (res < 0)
596       return FALSE;
597
598     GST_DEBUG ("Poll res = %d. %d bytes pending for write", res,
599         l->tx_buf_write - l->tx_buf_read);
600
601     if (!l->rx_done) {
602       if (gst_poll_fd_has_error (l->fdset, &l->fd_r) ||
603           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
604         GST_LOG ("read fd %d closed/errored", l->fd_r.fd);
605         return FALSE;
606       }
607
608       if (gst_poll_fd_can_read (l->fdset, &l->fd_r)) {
609         if (!read_one (l))
610           return FALSE;
611       }
612     }
613
614     if (l->tx_buf_read < l->tx_buf_write) {
615       if (gst_poll_fd_has_error (l->fdset, &l->fd_w) ||
616           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
617         GST_ERROR ("write fd %d closed/errored", l->fd_w.fd);
618         return FALSE;
619       }
620       if (gst_poll_fd_can_write (l->fdset, &l->fd_w)) {
621         if (!write_one (l))
622           return FALSE;
623       }
624     }
625   } while (l->tx_buf_read < l->tx_buf_write);
626
627   return TRUE;
628 }