registry: Add registry helper phase 1
[platform/upstream/gstreamer.git] / gst / gstpluginloader.c
1 /* GStreamer
2  * Copyright (C) 2008 Jan Schmidt <jan.schmidt@sun.com>
3  *
4  * gstpluginloader.c: GstPluginLoader helper for loading plugin files
5  * out of process.
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 #ifdef HAVE_CONFIG_H
24 #  include "config.h"
25 #endif
26
27 #ifndef G_OS_WIN32
28 #include <sys/types.h>
29 #include <sys/wait.h>
30 #include <unistd.h>
31 #endif
32 #include <errno.h>
33
34 #include <gst/gst_private.h>
35 #include <gst/gstconfig.h>
36
37 #include <gst/gstpoll.h>
38 #include <gst/gstutils.h>
39
40 #include <gst/gstpluginloader.h>
41 #include <gst/gstregistrychunks.h>
42
43 #define GST_CAT_DEFAULT GST_CAT_PLUGIN_LOADING
44
45 static GstPluginLoader *plugin_loader_new (GstRegistry * registry);
46 static gboolean plugin_loader_free (GstPluginLoader * loader);
47 static gboolean plugin_loader_load (GstPluginLoader * loader,
48     const gchar * filename);
49
50 const GstPluginLoaderFuncs _priv_gst_plugin_loader_funcs = {
51   plugin_loader_new, plugin_loader_free, plugin_loader_load
52 };
53
54 struct _GstPluginLoader
55 {
56   GstRegistry *registry;
57   GstPoll *fdset;
58
59   gboolean child_started;
60   GPid child_pid;
61   GstPollFD fd_w;
62   GstPollFD fd_r;
63
64   gboolean is_child;
65   gboolean got_plugin_details;
66
67   /* Transmit buffer */
68   guint8 *tx_buf;
69   guint tx_buf_size;
70   guint tx_buf_write;
71   guint tx_buf_read;
72
73   guint32 next_tag;
74
75   guint8 *rx_buf;
76   guint rx_buf_size;
77   gboolean rx_done;
78 };
79
80 #define PACKET_EXIT 1
81 #define PACKET_LOAD_PLUGIN 2
82 #define PACKET_STARTING_LOAD 3
83 #define PACKET_PLUGIN_DETAILS 4
84
85 #define BUF_INIT_SIZE 512
86 #define BUF_GROW_EXTRA 512
87 #define HEADER_SIZE 8
88 #define ALIGNMENT   (sizeof (void *))
89
90
91 static gboolean gst_plugin_loader_spawn (GstPluginLoader * loader);
92 static void put_packet (GstPluginLoader * loader, guint type, guint32 tag,
93     const guint8 * payload, guint32 payload_len);
94 static gboolean exchange_packets (GstPluginLoader * l);
95
96 static GstPluginLoader *
97 plugin_loader_new (GstRegistry * registry)
98 {
99   GstPluginLoader *l = g_new0 (GstPluginLoader, 1);
100
101   if (registry)
102     l->registry = gst_object_ref (registry);
103   l->fdset = gst_poll_new (FALSE);
104   gst_poll_fd_init (&l->fd_w);
105   gst_poll_fd_init (&l->fd_r);
106
107   l->tx_buf_size = BUF_INIT_SIZE;
108   l->tx_buf = g_malloc (BUF_INIT_SIZE);
109
110   l->next_tag = 0;
111
112   l->rx_buf_size = BUF_INIT_SIZE;
113   l->rx_buf = g_malloc (BUF_INIT_SIZE);
114
115   return l;
116 }
117
118 static gboolean
119 plugin_loader_free (GstPluginLoader * loader)
120 {
121   gboolean got_plugin_details;
122
123   fsync (loader->fd_w.fd);
124
125   if (loader->child_started) {
126     put_packet (loader, PACKET_EXIT, 0, NULL, 0);
127
128     /* Swap packets with the child until it exits */
129     while (!loader->rx_done && exchange_packets (loader)) {
130     };
131
132     close (loader->fd_w.fd);
133     close (loader->fd_r.fd);
134
135 #ifndef G_OS_WIN32
136     GST_LOG ("waiting for child process to exit");
137     waitpid (loader->child_pid, NULL, 0);
138 #else
139     g_warning ("FIXME: Implement child process shutdown for Win32");
140 #endif
141     g_spawn_close_pid (loader->child_pid);
142   } else {
143     close (loader->fd_w.fd);
144     close (loader->fd_r.fd);
145   }
146
147   gst_poll_free (loader->fdset);
148
149   g_free (loader->rx_buf);
150   g_free (loader->tx_buf);
151
152   if (loader->registry)
153     gst_object_unref (loader->registry);
154
155   got_plugin_details = loader->got_plugin_details;
156
157   g_free (loader);
158
159   return got_plugin_details;
160 }
161
162 static gboolean
163 plugin_loader_load (GstPluginLoader * loader, const gchar * filename)
164 {
165   if (!loader->child_started) {
166     if (!gst_plugin_loader_spawn (loader))
167       return FALSE;
168   }
169
170   /* Send a packet to the child requesting that it load the given file */
171   GST_LOG_OBJECT (loader->registry,
172       "Sending file %s to child. tag %u", filename, loader->next_tag);
173
174   put_packet (loader, PACKET_LOAD_PLUGIN, loader->next_tag,
175       (guint8 *) filename, strlen (filename) + 1);
176
177   loader->next_tag++;
178
179   if (!exchange_packets (loader))
180     return FALSE;
181
182   return TRUE;
183 }
184
185 static gboolean
186 gst_plugin_loader_spawn (GstPluginLoader * loader)
187 {
188   char *helper_bin =
189       "/home/jan/devel/gstreamer/head/gstreamer/libs/gst/helpers/plugin-scanner";
190   char *argv[] = { helper_bin, "-l", NULL };
191
192   if (!g_spawn_async_with_pipes (NULL, argv, NULL,
193           G_SPAWN_DO_NOT_REAP_CHILD /* | G_SPAWN_STDERR_TO_DEV_NULL */ ,
194           NULL, NULL, &loader->child_pid, &loader->fd_w.fd, &loader->fd_r.fd,
195           NULL, NULL))
196     return FALSE;
197
198   gst_poll_add_fd (loader->fdset, &loader->fd_w);
199
200   gst_poll_add_fd (loader->fdset, &loader->fd_r);
201   gst_poll_fd_ctl_read (loader->fdset, &loader->fd_r, TRUE);
202
203   loader->child_started = TRUE;
204
205   return TRUE;
206 }
207
208 gboolean
209 _gst_plugin_loader_client_run ()
210 {
211   GstPluginLoader *l;
212
213   l = plugin_loader_new (NULL);
214   if (l == NULL)
215     return FALSE;
216
217   l->fd_w.fd = 1;               /* STDOUT */
218   gst_poll_add_fd (l->fdset, &l->fd_w);
219
220   l->fd_r.fd = 0;               /* STDIN */
221   gst_poll_add_fd (l->fdset, &l->fd_r);
222   gst_poll_fd_ctl_read (l->fdset, &l->fd_r, TRUE);
223
224   l->is_child = TRUE;
225
226   GST_DEBUG ("Plugin scanner child running. Waiting for instructions");
227
228   /* Loop, listening for incoming packets on the fd and writing responses */
229   while (!l->rx_done && exchange_packets (l));
230
231   plugin_loader_free (l);
232
233   return TRUE;
234 }
235
236 static void
237 put_packet (GstPluginLoader * l, guint type, guint32 tag,
238     const guint8 * payload, guint32 payload_len)
239 {
240   guint8 *out;
241   guint len = payload_len + HEADER_SIZE;
242
243   if (l->tx_buf_write + len >= l->tx_buf_size) {
244     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
245     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
246   }
247
248   out = l->tx_buf + l->tx_buf_write;
249
250   out[0] = type;
251   GST_WRITE_UINT24_BE (out + 1, tag);
252   GST_WRITE_UINT32_BE (out + 4, payload_len);
253   memcpy (out + HEADER_SIZE, payload, payload_len);
254
255   l->tx_buf_write += len;
256   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
257 }
258
259 static void
260 put_chunk (GstPluginLoader * l, GstRegistryChunk * chunk, guint * pos)
261 {
262   guint padsize = 0;
263   guint len;
264   guint8 *out;
265
266   /* Might need to align the chunk */
267   if (chunk->align && ((*pos) % ALIGNMENT) != 0)
268     padsize = ALIGNMENT - ((*pos) % ALIGNMENT);
269
270   len = padsize + chunk->size;
271
272   if (l->tx_buf_write + len >= l->tx_buf_size) {
273     l->tx_buf_size = l->tx_buf_write + len + BUF_GROW_EXTRA;
274     l->tx_buf = g_realloc (l->tx_buf, l->tx_buf_size);
275   }
276
277   out = l->tx_buf + l->tx_buf_write;
278   memcpy (out + padsize, chunk->data, chunk->size);
279
280   l->tx_buf_write += len;
281   *pos += len;
282
283   gst_poll_fd_ctl_write (l->fdset, &l->fd_w, TRUE);
284 };
285
286 static gboolean
287 write_one (GstPluginLoader * l)
288 {
289   guint8 *out;
290   guint32 to_write;
291   int res;
292
293   if (l->tx_buf_read + HEADER_SIZE > l->tx_buf_write)
294     return FALSE;
295
296   out = l->tx_buf + l->tx_buf_read;
297   to_write = GST_READ_UINT32_BE (out + 4) + HEADER_SIZE;
298   l->tx_buf_read += to_write;
299
300   GST_LOG ("Writing packet of size %d bytes to fd %d", to_write, l->fd_w.fd);
301
302   do {
303     res = write (l->fd_w.fd, out, to_write);
304     if (res > 0) {
305       to_write -= res;
306       out += res;
307     }
308   } while (to_write > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
309
310   if (l->tx_buf_read == l->tx_buf_write) {
311     gst_poll_fd_ctl_write (l->fdset, &l->fd_w, FALSE);
312     l->tx_buf_read = l->tx_buf_write = 0;
313   }
314   return TRUE;
315 }
316
317 static gboolean
318 do_plugin_load (GstPluginLoader * l, const gchar * filename, guint tag)
319 {
320   GstPlugin *newplugin;
321   GList *chunks = NULL;
322
323   GST_DEBUG ("Plugin scanner loading file %s. tag %u\n", filename, tag);
324   put_packet (l, PACKET_STARTING_LOAD, tag, NULL, 0);
325
326   newplugin = gst_plugin_load_file ((gchar *) filename, NULL);
327   if (newplugin) {
328     guint hdr_pos;
329     guint offset;
330
331     /* Now serialise the plugin details and send */
332     if (!_priv_gst_registry_chunks_save_plugin (&chunks,
333             gst_registry_get_default (), newplugin))
334       goto fail;
335
336     /* Store where the header is, write an empty one, then write
337      * all the payload chunks, then fix up the header size */
338     hdr_pos = l->tx_buf_write;
339     offset = HEADER_SIZE;
340     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
341
342     if (chunks) {
343       GList *walk;
344       for (walk = chunks; walk; walk = g_list_next (walk)) {
345         GstRegistryChunk *cur = walk->data;
346         put_chunk (l, cur, &offset);
347
348         if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
349           g_free (cur->data);
350         g_free (cur);
351       }
352
353       g_list_free (chunks);
354
355       /* Store the size of the written payload */
356       GST_WRITE_UINT32_BE (l->tx_buf + hdr_pos + 4, offset - HEADER_SIZE);
357     }
358
359     gst_object_unref (newplugin);
360   } else {
361     put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
362   }
363
364   return TRUE;
365 fail:
366   put_packet (l, PACKET_PLUGIN_DETAILS, tag, NULL, 0);
367   if (chunks) {
368     GList *walk;
369     for (walk = chunks; walk; walk = g_list_next (walk)) {
370       GstRegistryChunk *cur = walk->data;
371
372       if (!(cur->flags & GST_REGISTRY_CHUNK_FLAG_CONST))
373         g_free (cur->data);
374       g_free (cur);
375     }
376
377     g_list_free (chunks);
378   }
379
380   return FALSE;
381 }
382
383 static gboolean
384 handle_rx_packet (GstPluginLoader * l,
385     guint pack_type, guint32 tag, guint8 * payload, guint payload_len)
386 {
387   gboolean res = TRUE;
388
389   switch (pack_type) {
390     case PACKET_EXIT:
391       gst_poll_fd_ctl_read (l->fdset, &l->fd_r, FALSE);
392       if (l->is_child) {
393         /* Respond, then we keep looping until the parent closes the fd */
394         put_packet (l, PACKET_EXIT, 0, NULL, 0);
395       } else {
396         l->rx_done = TRUE;      /* All done reading from child */
397       }
398       return TRUE;
399     case PACKET_LOAD_PLUGIN:{
400
401       if (!l->is_child)
402         return TRUE;
403
404       /* Payload is the filename to load */
405       res = do_plugin_load (l, (gchar *) payload, tag);
406
407       break;
408     }
409     case PACKET_STARTING_LOAD:
410       GST_LOG_OBJECT (l->registry,
411           "child started loading plugin w/ tag %u", tag);
412       break;
413     case PACKET_PLUGIN_DETAILS:{
414       gchar *tmp = (gchar *) payload;
415
416       GST_DEBUG_OBJECT (l->registry,
417           "child loaded plugin w/ tag %u. %d bytes info", tag, payload_len);
418
419       if (payload_len > 0) {
420         GstPlugin *newplugin;
421         _priv_gst_registry_chunks_load_plugin (l->registry, &tmp,
422             tmp + payload_len, &newplugin);
423         newplugin->flags &= ~GST_PLUGIN_FLAG_CACHED;
424         GST_LOG_OBJECT (l->registry,
425             "marking plugin %p as registered as %s", newplugin,
426             newplugin->filename);
427         newplugin->registered = TRUE;
428
429         /* We got a set of plugin details - remember it for later */
430         l->got_plugin_details = TRUE;
431       }
432
433       break;
434     }
435     default:
436       return FALSE;             /* Invalid packet -> something is wrong */
437   }
438
439   return res;
440 }
441
442 static gboolean
443 read_one (GstPluginLoader * l)
444 {
445   guint32 to_read, packet_len, tag;
446   guint8 *in;
447   gint res;
448
449   to_read = HEADER_SIZE;
450   in = l->rx_buf;
451   do {
452     res = read (l->fd_r.fd, in, to_read);
453     if (res > 0) {
454       to_read -= res;
455       in += res;
456     }
457   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
458
459   if (res < 0) {
460     GST_LOG ("Failed reading packet header");
461     return FALSE;
462   }
463
464   packet_len = GST_READ_UINT32_BE (l->rx_buf + 4);
465
466   if (packet_len + HEADER_SIZE >= l->rx_buf_size) {
467     l->rx_buf_size = packet_len + HEADER_SIZE + BUF_GROW_EXTRA;
468     l->rx_buf = g_realloc (l->rx_buf, l->rx_buf_size);
469   }
470
471   in = l->rx_buf + HEADER_SIZE;
472   to_read = packet_len;
473   do {
474     res = read (l->fd_r.fd, in, to_read);
475     if (res > 0) {
476       to_read -= res;
477       in += res;
478     }
479   } while (to_read > 0 && res < 0 && (errno == EAGAIN || errno == EINTR));
480
481   if (res < 0) {
482     GST_ERROR ("Packet payload read failed");
483     return FALSE;
484   }
485
486   tag = GST_READ_UINT24_BE (l->rx_buf + 1);
487
488   return handle_rx_packet (l, l->rx_buf[0], tag,
489       l->rx_buf + HEADER_SIZE, packet_len);
490 }
491
492 static gboolean
493 exchange_packets (GstPluginLoader * l)
494 {
495   gint res;
496
497   /* Wait for activity on our FDs */
498   do {
499     do {
500       res = gst_poll_wait (l->fdset, GST_CLOCK_TIME_NONE);
501     } while (res == -1 && (errno == EINTR || errno == EAGAIN));
502
503     if (res < 0)
504       return FALSE;
505
506     GST_DEBUG ("Poll res = %d. %d bytes pending for write", res,
507         l->tx_buf_write - l->tx_buf_read);
508
509     if (!l->rx_done) {
510       if (gst_poll_fd_has_error (l->fdset, &l->fd_r) ||
511           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
512         GST_LOG ("read fd %d closed/errored", l->fd_r.fd);
513         return FALSE;
514       }
515
516       if (gst_poll_fd_can_read (l->fdset, &l->fd_r)) {
517         if (!read_one (l))
518           return FALSE;
519       }
520     }
521
522     if (l->tx_buf_read < l->tx_buf_write) {
523       if (gst_poll_fd_has_error (l->fdset, &l->fd_w) ||
524           gst_poll_fd_has_closed (l->fdset, &l->fd_r)) {
525         GST_ERROR ("write fd %d closed/errored", l->fd_w.fd);
526         return FALSE;
527       }
528       if (gst_poll_fd_can_write (l->fdset, &l->fd_w)) {
529         if (!write_one (l))
530           return FALSE;
531       }
532     }
533   } while (l->tx_buf_read < l->tx_buf_write);
534
535   return TRUE;
536 }