Imported Upstream version 5.3.21
[platform/upstream/libdb.git] / examples / cxx / excxx_repquote / RepQuoteExample.cpp
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2001, 2012 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 /*
10  * In this application, we specify all communication via the command line.  In
11  * a real application, we would expect that information about the other sites
12  * in the system would be maintained in some sort of configuration file.  The
13  * critical part of this interface is that we assume at startup that we can
14  * find out
15  *      1) what our Berkeley DB home environment is,
16  *      2) what host/port we wish to listen on for connections; and
17  *      3) an optional list of other sites we should attempt to connect to.
18  *
19  * These pieces of information are expressed by the following flags.
20  * -h home (required; h stands for home directory)
21  * -l host:port (required unless -L is specified; l stands for local)
22  * -L host:port (optional, L means group creator)
23  * -C or -M (optional; start up as client or master)
24  * -r host:port (optional; r stands for remote; any number of these may be
25  *      specified)
26  * -R host:port (optional; R stands for remote peer; only one of these may
27  *      be specified)
28  * -a all|quorum (optional; a stands for ack policy)
29  * -b (optional; b stands for bulk)
30  * -p priority (optional; defaults to 100)
31  * -v (optional; v stands for verbose)
32  */
33
34 #include <cstdlib>
35 #include <cstring>
36
37 #include <iostream>
38 #include <string>
39 #include <sstream>
40
41 #include <db_cxx.h>
42 #include "RepConfigInfo.h"
43 #include "dbc_auto.h"
44
45 using std::cout;
46 using std::cin;
47 using std::cerr;
48 using std::endl;
49 using std::flush;
50 using std::istream;
51 using std::istringstream;
52 using std::string;
53 using std::getline;
54
55 #define CACHESIZE       (10 * 1024 * 1024)
56 #define DATABASE        "quote.db"
57
58 const char *progname = "excxx_repquote";
59
60 #include <errno.h>
61 #ifdef _WIN32
62 #define WIN32_LEAN_AND_MEAN
63 #include <windows.h>
64 #define snprintf                _snprintf
65 #define sleep(s)                Sleep(1000 * (s))
66
67 extern "C" {
68     extern int getopt(int, char * const *, const char *);
69     extern char *optarg;
70 }
71
72 typedef HANDLE thread_t;
73 typedef DWORD thread_exit_status_t;
74 #define thread_create(thrp, attr, func, arg)                               \
75     (((*(thrp) = CreateThread(NULL, 0,                                     \
76         (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
77 #define thread_join(thr, statusp)                                          \
78     ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) &&            \
79     GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
80 #else /* !_WIN32 */
81 #include <pthread.h>
82
83 typedef pthread_t thread_t;
84 typedef void* thread_exit_status_t;
85 #define thread_create(thrp, attr, func, arg)                               \
86     pthread_create((thrp), (attr), (func), (arg))
87 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
88 #endif
89
90 // Struct used to store information in Db app_private field.
91 typedef struct {
92         bool app_finished;
93         bool in_client_sync;
94         bool is_master;
95         bool verbose;
96 } APP_DATA;
97
98 static void log(const char *);
99 void *checkpoint_thread (void *);
100 void *log_archive_thread (void *);
101
102 class RepQuoteExample {
103 public:
104         RepQuoteExample();
105         void init(RepConfigInfo* config);
106         void doloop();
107         int terminate();
108
109         static void event_callback(DbEnv* dbenv, u_int32_t which, void *info);
110
111 private:
112         // disable copy constructor.
113         RepQuoteExample(const RepQuoteExample &);
114         void operator = (const RepQuoteExample &);
115
116         // internal data members.
117         APP_DATA                app_data;
118         RepConfigInfo   *app_config;
119         DbEnv              cur_env;
120         thread_t ckp_thr;
121         thread_t lga_thr;
122
123         // private methods.
124         void print_stocks(Db *dbp);
125         void prompt();
126 };
127
128 class DbHolder {
129 public:
130         DbHolder(DbEnv *env) : env(env) {
131         dbp = 0;
132         }
133
134         ~DbHolder() {
135         try {
136                 close();
137         } catch (...) {
138                 // Ignore: this may mean another exception is pending
139         }
140         }
141
142         bool ensure_open(bool creating) {
143         if (dbp)
144                 return (true);
145         dbp = new Db(env, 0);
146
147         u_int32_t flags = DB_AUTO_COMMIT;
148         if (creating)
149                 flags |= DB_CREATE;
150         try {
151                 dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
152                 return (true);
153         } catch (DbDeadlockException e) {
154         } catch (DbRepHandleDeadException e) {
155         } catch (DbException e) {
156                 if (e.get_errno() == DB_REP_LOCKOUT) {
157                         // Just fall through.
158                 } else if (e.get_errno() == ENOENT && !creating) {
159                         // Provide a bit of extra explanation.
160                         log("Stock DB does not yet exist");
161                 } else
162                         throw;
163         }
164
165         // (All retryable errors fall through to here.)
166         log("please retry the operation");
167         close();
168         return (false);
169         }
170
171         void close() {
172         if (dbp) {
173                 try {
174                 dbp->close(0);
175                 delete dbp;
176                 dbp = 0;
177                 } catch (...) {
178                 delete dbp;
179                 dbp = 0;
180                 throw;
181                 }
182         }
183         }
184
185         operator Db *() {
186         return dbp;
187         }
188
189         Db *operator->() {
190         return dbp;
191         }
192
193 private:
194         Db *dbp;
195         DbEnv *env;
196 };
197
198 class StringDbt : public Dbt {
199 public:
200 #define GET_STRING_OK 0
201 #define GET_STRING_INVALID_PARAM 1
202 #define GET_STRING_SMALL_BUFFER 2
203 #define GET_STRING_EMPTY_DATA 3
204         int get_string(char **buf, size_t buf_len)
205         {
206                 size_t copy_len;
207                 int ret = GET_STRING_OK;
208                 if (buf == NULL) {
209                         cerr << "Invalid input buffer to get_string" << endl;
210                         return GET_STRING_INVALID_PARAM;
211                 }
212
213                 // make sure the string is null terminated.
214                 memset(*buf, 0, buf_len);
215
216                 // if there is no string, just return.
217                 if (get_data() == NULL || get_size() == 0)
218                         return GET_STRING_OK;
219
220                 if (get_size() >= buf_len) {
221                         ret = GET_STRING_SMALL_BUFFER;
222                         copy_len = buf_len - 1; // save room for a terminator.
223                 } else
224                         copy_len = get_size();
225                 memcpy(*buf, get_data(), copy_len);
226
227                 return ret;
228         }
229         size_t get_string_length()
230         {
231                 if (get_size() == 0)
232                         return 0;
233                 return strlen((char *)get_data());
234         }
235         void set_string(char *string)
236         {
237                 set_data(string);
238                 set_size((u_int32_t)strlen(string));
239         }
240
241         StringDbt(char *string) : 
242             Dbt(string, (u_int32_t)strlen(string)) {};
243         StringDbt() : Dbt() {};
244         ~StringDbt() {};
245
246         // Don't add extra data to this sub-class since we want it to remain
247         // compatible with Dbt objects created internally by Berkeley DB.
248 };
249
250 RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(0) {
251         app_data.app_finished = 0;
252         app_data.in_client_sync = 0;
253         app_data.is_master = 0; // assume I start out as client
254         app_data.verbose = 0;
255 }
256
257 void RepQuoteExample::init(RepConfigInfo *config) {
258         DbSite *dbsite;
259         int i;
260
261         app_config = config;
262
263         cur_env.set_app_private(&app_data);
264         cur_env.set_errfile(stderr);
265         cur_env.set_errpfx(progname);
266         cur_env.set_event_notify(event_callback);
267
268         // Configure bulk transfer to send groups of records to clients 
269         // in a single network transfer.  This is useful for master sites 
270         // and clients participating in client-to-client synchronization.
271         //
272         if (app_config->bulk)
273                 cur_env.rep_set_config(DB_REP_CONF_BULK, 1);
274
275         // Turn on debugging and informational output if requested.
276         if (app_config->verbose) {
277                 cur_env.set_verbose(DB_VERB_REPLICATION, 1);
278                 app_data.verbose = 1;
279         }
280
281         // Set replication group election priority for this environment.
282         // An election first selects the site with the most recent log
283         // records as the new master.  If multiple sites have the most
284         // recent log records, the site with the highest priority value 
285         // is selected as master.
286         //
287         cur_env.rep_set_priority(app_config->priority);
288
289         // Set the policy that determines how master and client sites
290         // handle acknowledgement of replication messages needed for
291         // permanent records.  The default policy of "quorum" requires only
292         // a quorum of electable peers sufficient to ensure a permanent
293         // record remains durable if an election is held.  The "all" option
294         // requires all clients to acknowledge a permanent replication
295         // message instead.
296         //
297         cur_env.repmgr_set_ack_policy(app_config->ack_policy);
298
299         // Set the threshold for the minimum and maximum time the client
300         // waits before requesting retransmission of a missing message.
301         // Base these values on the performance and load characteristics
302         // of the master and client host platforms as well as the round
303         // trip message time.
304         //
305         cur_env.rep_set_request(20000, 500000);
306
307         // Configure deadlock detection to ensure that any deadlocks
308         // are broken by having one of the conflicting lock requests
309         // rejected. DB_LOCK_DEFAULT uses the lock policy specified
310         // at environment creation time or DB_LOCK_RANDOM if none was
311         // specified.
312         //
313         cur_env.set_lk_detect(DB_LOCK_DEFAULT);
314
315         // The following base replication features may also be useful to your
316         // application. See Berkeley DB documentation for more details.
317         //   - Master leases: Provide stricter consistency for data reads
318         //     on a master site.
319         //   - Timeouts: Customize the amount of time Berkeley DB waits
320         //     for such things as an election to be concluded or a master
321         //     lease to be granted.
322         //   - Delayed client synchronization: Manage the master site's
323         //     resources by spreading out resource-intensive client 
324         //     synchronizations.
325         //   - Blocked client operations: Return immediately with an error
326         //     instead of waiting indefinitely if a client operation is
327         //     blocked by an ongoing client synchronization.
328         cur_env.repmgr_site(app_config->this_host.host,
329             app_config->this_host.port, &dbsite, 0);
330         dbsite->set_config(DB_LOCAL_SITE, 1);
331         if (app_config->this_host.creator)
332                 dbsite->set_config(DB_GROUP_CREATOR, 1);
333         dbsite->close();
334
335         i = 1;
336         for ( REP_HOST_INFO *cur = app_config->other_hosts;
337             cur != NULL && i <= app_config->nrsites;
338             cur = cur->next, i++) {
339                 cur_env.repmgr_site(cur->host, cur->port, &dbsite, 0);
340                 dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
341                 if (cur->peer)
342                         dbsite->set_config(DB_REPMGR_PEER, 1);
343                 dbsite->close();
344         }
345
346         // Configure heartbeat timeouts so that repmgr monitors the
347         // health of the TCP connection.  Master sites broadcast a heartbeat
348         // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
349         // Client sites wait for message activity the length of the 
350         // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the 
351         // connection to the master is lost.  The DB_REP_HEARTBEAT_MONITOR 
352         // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
353         //
354         cur_env.rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
355         cur_env.rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
356
357         // The following repmgr features may also be useful to your
358         // application.  See Berkeley DB documentation for more details.
359         //  - Two-site strict majority rule - In a two-site replication
360         //    group, require both sites to be available to elect a new
361         //    master.
362         //  - Timeouts - Customize the amount of time repmgr waits
363         //    for such things as waiting for acknowledgements or attempting 
364         //    to reconnect to other sites.
365         //  - Site list - return a list of sites currently known to repmgr.
366
367         // We can now open our environment, although we're not ready to
368         // begin replicating.  However, we want to have a dbenv around
369         // so that we can send it into any of our message handlers.
370         //
371         cur_env.set_cachesize(0, CACHESIZE, 0);
372         cur_env.set_flags(DB_TXN_NOSYNC, 1);
373
374         cur_env.open(app_config->home, DB_CREATE | DB_RECOVER |
375             DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
376             DB_INIT_MPOOL | DB_INIT_TXN, 0);
377
378         // Start checkpoint and log archive support threads.
379         (void)thread_create(&ckp_thr, NULL, checkpoint_thread, &cur_env);
380         (void)thread_create(&lga_thr, NULL, log_archive_thread, &cur_env);
381
382         cur_env.repmgr_start(3, app_config->start_policy);
383 }
384
385 int RepQuoteExample::terminate() {
386         try {
387                 // Wait for checkpoint and log archive threads to finish.
388                 // Windows does not allow NULL pointer for exit code variable.
389                 thread_exit_status_t exstat; 
390
391                 (void)thread_join(lga_thr, &exstat);
392                 (void)thread_join(ckp_thr, &exstat);
393
394                 // We have used the DB_TXN_NOSYNC environment flag for
395                 // improved performance without the usual sacrifice of
396                 // transactional durability, as discussed in the
397                 // "Transactional guarantees" page of the Reference
398                 // Guide: if one replication site crashes, we can
399                 // expect the data to exist at another site.  However,
400                 // in case we shut down all sites gracefully, we push
401                 // out the end of the log here so that the most
402                 // recent transactions don't mysteriously disappear.
403                 //
404                 cur_env.log_flush(NULL);
405
406                 cur_env.close(0);
407         } catch (DbException dbe) {
408                 cout << "error closing environment: " << dbe.what() << endl;
409         }
410         return 0;
411 }
412
413 void RepQuoteExample::prompt() {
414         cout << "QUOTESERVER";
415         if (!app_data.is_master)
416                 cout << "(read-only)";
417         cout << "> " << flush;
418 }
419
420 void log(const char *msg) {
421         cerr << msg << endl;
422 }
423
424 // Simple command-line user interface:
425 //  - enter "<stock symbol> <price>" to insert or update a record in the
426 //      database;
427 //  - just press Return (i.e., blank input line) to print out the contents of
428 //      the database;
429 //  - enter "quit" or "exit" to quit.
430 //
431 void RepQuoteExample::doloop() {
432         DbHolder dbh(&cur_env);
433
434         string input;
435         while (prompt(), getline(cin, input)) {
436                 istringstream is(input);
437                 string token1, token2;
438
439                 // Read 0, 1 or 2 tokens from the input.
440                 //
441                 int count = 0;
442                 if (is >> token1) {
443                         count++;
444                         if (is >> token2)
445                         count++;
446                 }
447
448                 if (count == 1) {
449                         if (token1 == "exit" || token1 == "quit") {
450                                 app_data.app_finished = 1;
451                                 break;
452                         } else {
453                                 log("Format: <stock> <price>");
454                                 continue;
455                         }
456                 }
457
458                 // Here we know count is either 0 or 2, so we're about to try a
459                 // DB operation.
460                 //
461                 // Open database with DB_CREATE only if this is a master
462                 // database.  A client database uses polling to attempt
463                 // to open the database without DB_CREATE until it is
464                 // successful.
465                 //
466                 // This DB_CREATE polling logic can be simplified under
467                 // some circumstances.  For example, if the application can
468                 // be sure a database is already there, it would never need
469                 // to open it with DB_CREATE.
470                 //
471                 if (!dbh.ensure_open(app_data.is_master))
472                         continue;
473
474                 try {
475                         if (count == 0)
476                                 if (app_data.in_client_sync)
477                                         log(
478     "Cannot read data during client initialization - please try again.");
479                                 else
480                                         print_stocks(dbh);
481                         else if (!app_data.is_master)
482                                 log("Can't update at client");
483                         else {
484                                 const char *symbol = token1.c_str();
485                                 StringDbt key(const_cast<char*>(symbol));
486
487                                 const char *price = token2.c_str();
488                                 StringDbt data(const_cast<char*>(price));
489
490                                 dbh->put(NULL, &key, &data, 0);
491                         }
492                 } catch (DbDeadlockException e) {
493                         log("please retry the operation");
494                         dbh.close();
495                 } catch (DbRepHandleDeadException e) {
496                         log("please retry the operation");
497                         dbh.close();
498                 } catch (DbException e) {
499                         if (e.get_errno() == DB_REP_LOCKOUT) {
500                         log("please retry the operation");
501                         dbh.close();
502                         } else
503                         throw;
504                 }
505         }
506
507         dbh.close();
508 }
509
510 void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
511 {
512         APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
513
514         info = NULL;            /* Currently unused. */
515
516         switch (which) {
517         case DB_EVENT_REP_CLIENT:
518                 app->is_master = 0;
519                 app->in_client_sync = 1;
520                 break;
521         case DB_EVENT_REP_MASTER:
522                 app->is_master = 1;
523                 app->in_client_sync = 0;
524                 break;
525         case DB_EVENT_REP_NEWMASTER:
526                 app->in_client_sync = 1;
527                 break;
528         case DB_EVENT_REP_PERM_FAILED:
529                 // Did not get enough acks to guarantee transaction
530                 // durability based on the configured ack policy.  This 
531                 // transaction will be flushed to the master site's
532                 // local disk storage for durability.
533                 //
534                 if (app->verbose)
535                         log(
536 "EVENT: Insufficient acknowledgements to guarantee transaction durability.");
537                 break;
538
539         case DB_EVENT_REP_STARTUPDONE:
540                 app->in_client_sync = 0;
541                 break;
542
543         case DB_EVENT_PANIC:
544                 if (app->verbose)
545                         log("EVENT: receive panic event");
546                 break;
547
548         case DB_EVENT_REP_CONNECT_BROKEN:
549                 if (app->verbose)
550                         log("EVENT: connection is broken");
551                 break;
552
553         case DB_EVENT_REP_DUPMASTER:
554                 if (app->verbose)
555                         log("EVENT: duplicate master");
556                 break;
557
558         case DB_EVENT_REP_ELECTED:
559                 if (app->verbose)
560                         log("EVENT: election in replication group");
561                 break;
562
563         case DB_EVENT_REP_CONNECT_ESTD:
564                 if (app->verbose)
565                         log("EVENT: establish connection");
566                 break;
567
568         case DB_EVENT_REP_CONNECT_TRY_FAILED:
569                 if (app->verbose)
570                         log("EVENT: fail to try connection");
571                 break;
572
573         case DB_EVENT_REP_INIT_DONE:
574                 if (app->verbose)
575                         log("EVENT: finish initialization");
576                 break;
577
578         case DB_EVENT_REP_LOCAL_SITE_REMOVED:
579                 if (app->verbose)
580                         log("EVENT: remove local site");
581                 break;
582
583         case DB_EVENT_REP_SITE_ADDED:
584                 if (app->verbose)
585                         log("EVENT: add site");
586                 break;
587
588         case DB_EVENT_REP_SITE_REMOVED:
589                 if (app->verbose)
590                         log("EVENT: remote site");
591                 break;
592
593         default:
594                 dbenv->errx("ignoring event %d", which);
595         }
596 }
597
598 void RepQuoteExample::print_stocks(Db *dbp) {
599         StringDbt key, data;
600 #define MAXKEYSIZE      10
601 #define MAXDATASIZE     20
602         char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
603         char *kbuf, *dbuf;
604
605         memset(&key, 0, sizeof(key));
606         memset(&data, 0, sizeof(data));
607         kbuf = keybuf;
608         dbuf = databuf;
609
610         DbcAuto dbc(dbp, 0, 0);
611         cout << "\tSymbol\tPrice" << endl
612                 << "\t======\t=====" << endl;
613
614         for (int ret = dbc->get(&key, &data, DB_FIRST);
615                 ret == 0;
616                 ret = dbc->get(&key, &data, DB_NEXT)) {
617                 key.get_string(&kbuf, MAXKEYSIZE);
618                 data.get_string(&dbuf, MAXDATASIZE);
619
620                 cout << "\t" << keybuf << "\t" << databuf << endl;
621         }
622         cout << endl << flush;
623         dbc.close();
624 }
625
626 static void usage() {
627         cerr << "usage: " << progname << endl << "  -h home -l|-L host:port"
628             << "  [-C|M] [-r host:port] [-R host:port]" << endl
629             << "  [-a all|quorum] [-b] [-p priority] [-v]" << endl;
630
631         cerr << "\t -h home (required; h stands for home directory)" << endl
632             << "\t -l host:port (required unless -L is specified;"
633             << " l stands for local)" << endl
634             << "\t -L host:port (optional, L means group creator)" << endl
635             << "\t -C or -M (optional; start up as client or master)" << endl
636             << "\t -r host:port (optional; r stands for remote; any "
637             << "number of these" << endl
638             << "\t               may be specified)" << endl
639             << "\t -R host:port (optional; R stands for remote peer; only "
640             << "one of" << endl
641             << "\t               these may be specified)" << endl
642             << "\t -a all|quorum (optional; a stands for ack policy)" << endl
643             << "\t -b (optional; b stands for bulk)" << endl
644             << "\t -p priority (optional; defaults to 100)" << endl
645             << "\t -v (optional; v stands for verbose)" << endl;
646
647         exit(EXIT_FAILURE);
648 }
649
650 int main(int argc, char **argv) {
651         RepConfigInfo config;
652         char ch, *portstr, *tmphost;
653         int tmpport;
654         bool tmppeer;
655
656         // Extract the command line parameters
657         while ((ch = getopt(argc, argv, "a:bCh:L:l:Mp:R:r:v")) != EOF) {
658                 tmppeer = false;
659                 switch (ch) {
660                 case 'a':
661                         if (strncmp(optarg, "all", 3) == 0)
662                                 config.ack_policy = DB_REPMGR_ACKS_ALL;
663                         else if (strncmp(optarg, "quorum", 6) != 0)
664                                 usage();
665                         break;
666                 case 'b':
667                         config.bulk = true;
668                         break;
669                 case 'C':
670                         config.start_policy = DB_REP_CLIENT;
671                         break;
672                 case 'h':
673                         config.home = optarg;
674                         break;
675                 case 'L':
676                         config.this_host.creator = true; // FALLTHROUGH
677                 case 'l':
678                         config.this_host.host = strtok(optarg, ":");
679                         if ((portstr = strtok(NULL, ":")) == NULL) {
680                                 cerr << "Bad host specification." << endl;
681                                 usage();
682                         }
683                         config.this_host.port = (unsigned short)atoi(portstr);
684                         config.got_listen_address = true;
685                         break;
686                 case 'M':
687                         config.start_policy = DB_REP_MASTER;
688                         break;
689                 case 'p':
690                         config.priority = atoi(optarg);
691                         break;
692                 case 'R':
693                         tmppeer = true; // FALLTHROUGH
694                 case 'r':
695                         tmphost = strtok(optarg, ":");
696                         if ((portstr = strtok(NULL, ":")) == NULL) {
697                                 cerr << "Bad host specification." << endl;
698                                 usage();
699                         }
700                         tmpport = (unsigned short)atoi(portstr);
701
702                         config.addOtherHost(tmphost, tmpport, tmppeer);
703                         break;
704                 case 'v':
705                         config.verbose = true;
706                         break;
707                 case '?':
708                 default:
709                         usage();
710                 }
711         }
712
713         // Error check command line.
714         if ((!config.got_listen_address) || config.home == NULL)
715                 usage();
716
717         RepQuoteExample runner;
718         try {
719                 runner.init(&config);
720                 runner.doloop();
721         } catch (DbException dbe) {
722                 cerr << "Caught an exception during initialization or"
723                         << " processing: " << dbe.what() << endl;
724         }
725         runner.terminate();
726         return 0;
727 }
728
729 // This is a very simple thread that performs checkpoints at a fixed
730 // time interval.  For a master site, the time interval is one minute
731 // plus the duration of the checkpoint_delay timeout (30 seconds by
732 // default.)  For a client site, the time interval is one minute.
733 //
734 void *checkpoint_thread(void *args)
735 {
736         DbEnv *env;
737         APP_DATA *app;
738         int i, ret;
739
740         env = (DbEnv *)args;
741         app = (APP_DATA *)env->get_app_private();
742
743         for (;;) {
744                 // Wait for one minute, polling once per second to see if
745                 // application has finished.  When application has finished,
746                 // terminate this thread.
747                 //
748                 for (i = 0; i < 60; i++) {
749                         sleep(1);
750                         if (app->app_finished == 1)
751                                 return ((void *)EXIT_SUCCESS);
752                 }
753
754                 // Perform a checkpoint.
755                 if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
756                         env->err(ret, "Could not perform checkpoint.\n");
757                         return ((void *)EXIT_FAILURE);
758                 }
759         }
760 }
761
762 // This is a simple log archive thread.  Once per minute, it removes all but
763 // the most recent 3 logs that are safe to remove according to a call to
764 // DBENV->log_archive().
765 //
766 // Log cleanup is needed to conserve disk space, but aggressive log cleanup
767 // can cause more frequent client initializations if a client lags too far
768 // behind the current master.  This can happen in the event of a slow client,
769 // a network partition, or a new master that has not kept as many logs as the
770 // previous master.
771 //
772 // The approach in this routine balances the need to mitigate against a
773 // lagging client by keeping a few more of the most recent unneeded logs
774 // with the need to conserve disk space by regularly cleaning up log files.
775 // Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE 
776 // flag) is not recommended for replication due to the risk of frequent 
777 // client initializations.
778 //
779 void *log_archive_thread(void *args)
780 {
781         DbEnv *env;
782         APP_DATA *app;
783         char **begin, **list;
784         int i, listlen, logs_to_keep, minlog, ret;
785
786         env = (DbEnv *)args;
787         app = (APP_DATA *)env->get_app_private();
788         logs_to_keep = 3;
789
790         for (;;) {
791                 // Wait for one minute, polling once per second to see if
792                 // application has finished.  When application has finished,
793                 // terminate this thread.
794                 //
795                 for (i = 0; i < 60; i++) {
796                         sleep(1);
797                         if (app->app_finished == 1)
798                                 return ((void *)EXIT_SUCCESS);
799                 }
800
801                 // Get the list of unneeded log files.
802                 if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
803                         env->err(ret, "Could not get log archive list.");
804                         return ((void *)EXIT_FAILURE);
805                 }
806                 if (list != NULL) {
807                         listlen = 0;
808                         // Get the number of logs in the list.
809                         for (begin = list; *begin != NULL; begin++, listlen++);
810                         // Remove all but the logs_to_keep most recent
811                         // unneeded log files.
812                         //
813                         minlog = listlen - logs_to_keep;
814                         for (begin = list, i= 0; i < minlog; list++, i++) {
815                                 if ((ret = unlink(*list)) != 0) {
816                                         env->err(ret,
817                                             "logclean: remove %s", *list);
818                                         env->errx(
819                                             "logclean: Error remove %s", *list);
820                                         free(begin);
821                                         return ((void *)EXIT_FAILURE);
822                                 }
823                         }
824                         free(begin);
825                 }
826         }
827 }