1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 /* vim:set expandtab ts=4 shiftwidth=4: */
4 * Copyright (C) 2008 Sun Microsystems, Inc. All rights reserved.
5 * Use is subject to license terms.
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
17 * You should have received a copy of the GNU Lesser General
18 * Public License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
20 * Boston, MA 02111-1307, USA.
22 * Authors: Lin Ma <lin.ma@sun.com>
27 #include <sys/types.h>
33 #include "fen-kernel.h"
34 #include "fen-missing.h"
37 #define PROCESS_EVENTQ_TIME 10 /* in milliseconds */
38 #define PAIR_EVENTS_TIMEVAL 00000 /* in microseconds */
39 #define PAIR_EVENTS_INC_TIMEVAL 0000 /* in microseconds */
40 #define SCAN_CHANGINGS_TIME 50 /* in milliseconds */
41 #define SCAN_CHANGINGS_MAX_TIME (4*100) /* in milliseconds */
42 #define SCAN_CHANGINGS_MIN_TIME (4*100) /* in milliseconds */
43 #define INIT_CHANGES_NUM 2
46 #ifdef GIO_COMPILATION
47 #define FD_W if (fd_debug_enabled) g_warning
48 static gboolean fd_debug_enabled = FALSE;
50 #include "gam_error.h"
51 #define FD_W(...) GAM_DEBUG(DEBUG_INFO, __VA_ARGS__)
54 G_LOCK_EXTERN (fen_lock);
55 static GList *deleting_data = NULL;
56 static guint deleting_data_id = 0;
58 static void (*emit_once_cb) (fdata *f, int events, gpointer sub);
59 static void (*emit_cb) (fdata *f, int events);
60 static int (*_event_converter) (int event);
62 static gboolean fdata_delete (fdata* f);
63 static gint fdata_sub_find (gpointer a, gpointer b);
64 static void scan_children (node_t *f);
65 static void scan_known_children (node_t* f);
68 _add_missing_cb (node_t* parent, gpointer user_data)
71 FD_W ("%s p:0x%p %s\n", __func__, parent, (gchar*)user_data);
72 return _add_node (parent, (gchar*)user_data);
76 _pre_del_cb (node_t* node, gpointer user_data)
81 data = _node_get_data (node);
82 FD_W ("%s node:0x%p %s\n", __func__, node, NODE_NAME(node));
84 if (!FN_IS_PASSIVE(data)) {
93 _pow (guint x, guint y)
96 g_assert (x >= 0 && y >= 0);
104 get_scalable_scan_time (fdata* data)
107 /* Caculate from num = 0 */
108 sleep_time = _pow (BASE_NUM, data->changed_event_num) * SCAN_CHANGINGS_TIME;
109 if (sleep_time < SCAN_CHANGINGS_MIN_TIME) {
110 sleep_time = SCAN_CHANGINGS_MIN_TIME;
111 } else if (sleep_time > SCAN_CHANGINGS_MAX_TIME) {
112 sleep_time = SCAN_CHANGINGS_MAX_TIME;
113 data->change_update_id = INIT_CHANGES_NUM;
115 FD_W ("SCALABE SCAN num:time [ %4u : %4u ] %s\n", data->changed_event_num, sleep_time, FN_NAME(data));
120 g_timeval_lt (GTimeVal *val1, GTimeVal *val2)
122 if (val1->tv_sec < val2->tv_sec)
125 if (val1->tv_sec > val2->tv_sec)
128 /* val1->tv_sec == val2->tv_sec */
129 if (val1->tv_usec < val2->tv_usec)
136 * If all active children nodes are ported, then cancel monitor the parent node
141 scan_known_children (node_t* f)
147 FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
148 pdata = _node_get_data (f);
150 * Currect fdata must is directly monitored. Be sure it is 1 level monitor.
152 dir = g_dir_open (NODE_NAME(f), 0, &err);
154 const char *basename;
156 while ((basename = g_dir_read_name (dir)))
158 node_t* childf = NULL;
162 * If the node is existed, and isn't ported, then emit created
163 * event. Ignore others.
165 childf = _children_find (f, basename);
167 (data = _node_get_data (childf)) != NULL &&
168 !FN_IS_PASSIVE (data)) {
169 if (!is_monitoring (data) &&
170 _port_add (&data->fobj, &data->len, data)) {
171 _fdata_emit_events (data, FN_EVENT_CREATED);
183 scan_children (node_t *f)
189 FD_W ("%s %s [0x%p]\n", __func__, NODE_NAME(f), f);
190 pdata = _node_get_data (f);
192 * Currect fdata must is directly monitored. Be sure it is 1 level monitor.
194 dir = g_dir_open (NODE_NAME(f), 0, &err);
196 const char *basename;
198 while ((basename = g_dir_read_name (dir)))
200 node_t* childf = NULL;
204 childf = _children_find (f, basename);
205 if (childf == NULL) {
208 filename = g_build_filename (NODE_NAME(f), basename, NULL);
209 childf = _add_node (f, filename);
211 data = _fdata_new (childf, FALSE);
214 if ((data = _node_get_data (childf)) == NULL) {
215 data = _fdata_new (childf, FALSE);
217 /* Be sure data isn't ported and add to port successfully */
218 /* Don't need delete it, it will be deleted by the parent */
219 if (is_monitoring (data)) {
221 } else if (/* !_is_ported (data) && */
222 _port_add (&data->fobj, &data->len, data)) {
223 _fdata_emit_events (data, FN_EVENT_CREATED);
234 scan_deleting_data (gpointer data)
238 GList* deleted_list = NULL;
241 if (G_TRYLOCK (fen_lock)) {
242 for (i = deleting_data; i; i = i->next) {
244 if (fdata_delete (f)) {
245 deleted_list = g_list_prepend (deleted_list, i);
249 for (i = deleted_list; i; i = i->next) {
250 deleting_data = g_list_remove_link (deleting_data,
252 g_list_free_1 ((GList *)i->data);
254 g_list_free (deleted_list);
256 if (deleting_data == NULL) {
257 deleting_data_id = 0;
266 is_monitoring (fdata* data)
268 return _is_ported (data) || data->change_update_id > 0;
272 _get_parent_data (fdata* data)
274 if (FN_NODE(data) && !IS_TOPNODE(FN_NODE(data))) {
275 return _node_get_data (FN_NODE(data)->parent);
281 _get_parent_node (fdata* data)
284 return (FN_NODE(data)->parent);
290 _fdata_new (node_t* node, gboolean is_mondir)
295 if ((f = g_new0 (fdata, 1)) != NULL) {
297 FN_NAME(f) = g_strdup (NODE_NAME(node));
298 f->is_dir = is_mondir;
299 f->eventq = g_queue_new ();
300 FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));
301 _node_set_data (node, f);
307 fdata_delete (fdata *f)
311 FD_W ("[ TRY %s ] 0x%p id[%4d:%4d] %s\n", __func__, f, f->eventq_id, f->change_update_id, FN_NAME(f));
312 g_assert (FN_IS_PASSIVE(f));
315 /* _missing_remove (f); */
317 if (f->node != NULL) {
318 _node_set_data (f->node, NULL);
322 if (f->change_update_id > 0 || f->eventq_id > 0) {
323 if (FN_IS_LIVING(f)) {
324 f->is_cancelled = TRUE;
325 deleting_data = g_list_prepend (deleting_data, f);
326 if (deleting_data_id == 0) {
327 deleting_data_id = g_idle_add (scan_deleting_data, NULL);
328 g_assert (deleting_data_id > 0);
333 FD_W ("[ %s ] 0x%p %s\n", __func__, f, FN_NAME(f));
335 while ((ev = g_queue_pop_head (f->eventq)) != NULL) {
336 _fnode_event_delete (ev);
339 g_queue_free (f->eventq);
346 _fdata_reset (fdata* data)
352 while ((ev = g_queue_pop_head (data->eventq)) != NULL) {
353 _fnode_event_delete (ev);
358 fdata_sub_find (gpointer a, gpointer b)
368 _fdata_sub_add (fdata *f, gpointer sub)
370 FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
371 g_assert (g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find) == NULL);
372 f->subs = g_list_prepend (f->subs, sub);
376 _fdata_sub_remove (fdata *f, gpointer sub)
379 FD_W ("[%s] [data: 0x%p ] [s: 0x%p ] %s\n", __func__, f, sub, FN_NAME(f));
380 g_assert (g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find) != NULL);
381 l = g_list_find_custom (f->subs, sub, (GCompareFunc)fdata_sub_find);
383 g_assert (sub == l->data);
384 f->subs = g_list_delete_link (f->subs, l);
388 * Adjust self on failing to Port
391 _fdata_adjust_deleted (fdata* f)
395 node_op_t op = {NULL, NULL, _pre_del_cb, NULL};
398 * It's a top node. We move it to missing list.
400 parent = _get_parent_node (f);
401 pdata = _get_parent_data (f);
402 if (!FN_IS_PASSIVE(f) ||
403 _children_num (FN_NODE(f)) > 0 ||
404 (pdata && !FN_IS_PASSIVE(pdata))) {
407 pdata = _fdata_new (parent, FALSE);
410 if (!_port_add (&pdata->fobj, &pdata->len, pdata)) {
411 _fdata_adjust_deleted (pdata);
415 g_assert (IS_TOPNODE(FN_NODE(f)));
419 #ifdef GIO_COMPILATION
420 _pending_remove_node (FN_NODE(f), &op);
422 _remove_node (FN_NODE(f), &op);
428 fdata_adjust_changed (fdata *f)
436 parent = _get_parent_node (f);
437 pdata = _get_parent_data (f);
439 if (!FN_IS_LIVING(f) ||
440 (_children_num (FN_NODE(f)) == 0 &&
442 pdata && FN_IS_PASSIVE(pdata))) {
443 f->change_update_id = 0;
448 FD_W ("[ %s ] %s\n", __func__, FN_NAME(f));
449 if (FN_STAT (FN_NAME(f), &buf) != 0) {
450 FD_W ("LSTAT [%-20s] %s\n", FN_NAME(f), g_strerror (errno));
453 f->is_dir = S_ISDIR (buf.st_mode) ? TRUE : FALSE;
454 if (f->len != buf.st_size) {
455 /* FD_W ("LEN [%lld:%lld] %s\n", f->len, buf.st_size, FN_NAME(f)); */
456 f->len = buf.st_size;
457 ev = _fnode_event_new (FILE_MODIFIED, TRUE, f);
459 ev->is_pending = TRUE;
460 _fdata_add_event (f, ev);
462 /* Fdata is still changing, so scalable scan */
463 f->change_update_id = g_timeout_add (get_scalable_scan_time (f),
464 (GSourceFunc)fdata_adjust_changed,
469 f->changed_event_num = 0;
470 f->fobj.fo_atime = buf.st_atim;
471 f->fobj.fo_mtime = buf.st_mtim;
472 f->fobj.fo_ctime = buf.st_ctim;
474 if (FN_IS_MONDIR(f)) {
475 scan_children (FN_NODE(f));
477 scan_known_children (FN_NODE(f));
478 if ((_children_num (FN_NODE(f)) == 0 &&
480 pdata && FN_IS_PASSIVE(pdata))) {
486 if (!_port_add_simple (&f->fobj, f)) {
488 ev = _fnode_event_new (FILE_DELETE, FALSE, f);
490 _fdata_add_event (f, ev);
495 f->change_update_id = 0;
501 _fdata_emit_events_once (fdata *f, int event, gpointer sub)
503 emit_once_cb (f, _event_converter (event), sub);
507 _fdata_emit_events (fdata *f, int event)
509 emit_cb (f, _event_converter (event));
513 process_events (gpointer udata)
515 node_op_t op = {NULL, NULL, _pre_del_cb, NULL};
520 /* FD_W ("IN <======== %s\n", __func__); */
523 FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));
527 if (!FN_IS_LIVING(f)) {
533 if ((ev = (fnode_event_t*)g_queue_pop_head (f->eventq)) != NULL) {
534 /* Send events to clients. */
536 if (!ev->is_pending) {
537 #ifdef GIO_COMPILATION
539 _fdata_emit_events (f, FILE_ATTRIB);
542 _fdata_emit_events (f, ev->e);
545 _fnode_event_delete (ev);
548 /* Adjust node state. */
550 * Node the node has been created, so we can delete create event in
551 * optimizing. To reduce the statings, we add it to Port on discoving
552 * it then emit CREATED event. So we don't need to do anything here.
558 /* If the event is a changed event, then pending process it */
559 if (f->change_update_id == 0) {
560 f->change_update_id = g_timeout_add (get_scalable_scan_time(f),
561 (GSourceFunc)fdata_adjust_changed,
563 g_assert (f->change_update_id > 0);
567 g_assert (f->change_update_id == 0);
568 if (!_port_add (&f->fobj, &f->len, f)) {
569 ev = _fnode_event_new (FILE_DELETE, FALSE, f);
571 _fdata_add_event (f, ev);
575 case FILE_DELETE: /* Ignored */
578 g_assert_not_reached ();
581 /* Process one event a time */
587 /* FD_W ("OUT ========> %s\n", __func__); */
592 _fdata_add_event (fdata *f, fnode_event_t *ev)
594 node_op_t op = {NULL, NULL, _pre_del_cb, NULL};
597 if (!FN_IS_LIVING(f)) {
598 _fnode_event_delete (ev);
602 FD_W ("%s %d\n", __func__, ev->e);
603 g_get_current_time (&ev->t);
605 * If created/deleted events of child node happened, then we use parent
606 * event queue to handle.
607 * If child node emits deleted event, it seems no changes for the parent
608 * node, but the attr is changed. So we may try to cancel processing the
609 * coming changed events of the parent node.
611 tail = (fnode_event_t*)g_queue_peek_tail (f->eventq);
613 case FILE_RENAME_FROM:
616 _fnode_event_delete (ev);
617 g_assert_not_reached ();
620 /* clear changed event number */
621 f->changed_event_num = 0;
623 * We will cancel all previous events.
626 g_queue_pop_tail (f->eventq);
628 _fnode_event_delete (tail);
629 } while ((tail = (fnode_event_t*)g_queue_pop_tail (f->eventq)) != NULL);
632 * Given a node "f" is deleted, process it ASAP.
634 _fdata_emit_events (f, ev->e);
635 _fnode_event_delete (ev);
636 _fdata_adjust_deleted (f);
641 /* clear changed event number */
642 f->changed_event_num ++;
646 * If in the time range, we will try optimizing
647 * (changed+) to (changed)
648 * (attrchanged changed) to ([changed, attrchanged])
649 * (event attrchanged) to ([event, attrchanged])
653 if (tail->e == ev->e) {
654 if (g_timeval_lt (&ev->t, &tail->t)) {
655 g_queue_peek_tail (f->eventq);
656 /* Add the increment */
657 g_time_val_add (&ev->t, PAIR_EVENTS_INC_TIMEVAL);
658 /* skip the previous event */
659 FD_W ("SKIPPED -- %s\n", _event_string (tail->e));
660 _fnode_event_delete (tail);
664 } else if (ev->e == FILE_MODIFIED && tail->e == FILE_ATTRIB) {
666 _fnode_event_delete (tail);
667 } else if (ev->e == FILE_ATTRIB && f->change_update_id > 0) {
668 tail->has_twin = TRUE;
669 /* skip the current event */
670 _fnode_event_delete (ev);
675 } while ((tail = (fnode_event_t*)g_queue_peek_tail (f->eventq)) != NULL);
679 /* must add the threshold time */
680 g_time_val_add (&ev->t, PAIR_EVENTS_TIMEVAL);
682 g_queue_push_tail (f->eventq, ev);
684 /* starting process_events */
685 if (f->eventq_id == 0) {
686 f->eventq_id = g_timeout_add (PROCESS_EVENTQ_TIME,
689 g_assert (f->eventq_id > 0);
691 FD_W ("%s 0x%p id:%-4d %s\n", __func__, f, f->eventq_id, FN_NAME(f));
695 _fdata_class_init (void (*user_emit_cb) (fdata*, int),
696 void (*user_emit_once_cb) (fdata*, int, gpointer),
697 int (*user_event_converter) (int event))
699 FD_W ("%s\n", __func__);
700 if (user_emit_cb == NULL) {
703 if (user_emit_once_cb == NULL) {
706 if (user_event_converter == NULL) {
709 emit_cb = user_emit_cb;
710 emit_once_cb = user_emit_once_cb;
711 _event_converter = user_event_converter;
713 if (!_port_class_init (_fdata_add_event)) {
714 FD_W ("_port_class_init failed.");