2 * See the file LICENSE for redistribution information.
4 * Copyright (c) 2001, 2012 Oracle and/or its affiliates. All rights reserved.
10 * In this application, we specify all communication via the command line. In
11 * a real application, we would expect that information about the other sites
12 * in the system would be maintained in some sort of configuration file. The
13 * critical part of this interface is that we assume at startup that we can
15 * 1) what our Berkeley DB home environment is,
16 * 2) what host/port we wish to listen on for connections; and
17 * 3) an optional list of other sites we should attempt to connect to.
19 * These pieces of information are expressed by the following flags.
20 * -h home (required; h stands for home directory)
21 * -l host:port (required unless -L is specified; l stands for local)
22 * -L host:port (optional, L means group creator)
23 * -C or -M (optional; start up as client or master)
24 * -r host:port (optional; r stands for remote; any number of these may be
26 * -R host:port (optional; R stands for remote peer; only one of these may
28 * -a all|quorum (optional; a stands for ack policy)
29 * -b (optional; b stands for bulk)
30 * -p priority (optional; defaults to 100)
31 * -v (optional; v stands for verbose)
42 #include "RepConfigInfo.h"
51 using std::istringstream;
55 #define CACHESIZE (10 * 1024 * 1024)
56 #define DATABASE "quote.db"
58 const char *progname = "excxx_repquote";
62 #define WIN32_LEAN_AND_MEAN
64 #define snprintf _snprintf
65 #define sleep(s) Sleep(1000 * (s))
68 extern int getopt(int, char * const *, const char *);
// Thread shim built on Win32 API calls: a thread is identified by its HANDLE
// and its exit status is a DWORD.
72 typedef HANDLE thread_t;
73 typedef DWORD thread_exit_status_t;
// thread_create stores the CreateThread() result in *thrp and evaluates to -1
// when that result is NULL, 0 otherwise. The attr argument is accepted but
// does not appear in the expansion; func is cast to LPTHREAD_START_ROUTINE.
74 #define thread_create(thrp, attr, func, arg) \
75 (((*(thrp) = CreateThread(NULL, 0, \
76 (LPTHREAD_START_ROUTINE)(func), (arg), 0, NULL)) == NULL) ? -1 : 0)
// thread_join waits without a timeout (INFINITE) on the handle and then
// fetches the exit code into *statusp. It evaluates to 0 only when the wait
// returns WAIT_OBJECT_0 and GetExitCodeThread() returns non-zero; else -1.
77 #define thread_join(thr, statusp) \
78 ((WaitForSingleObject((thr), INFINITE) == WAIT_OBJECT_0) && \
79 GetExitCodeThread((thr), (LPDWORD)(statusp)) ? 0 : -1)
// Thread shim built on pthreads: thread_create and thread_join forward their
// arguments unchanged to pthread_create() and pthread_join(), so they evaluate
// to whatever those calls return.
83 typedef pthread_t thread_t;
84 typedef void* thread_exit_status_t;
85 #define thread_create(thrp, attr, func, arg) \
86 pthread_create((thrp), (attr), (func), (arg))
87 #define thread_join(thr, statusp) pthread_join((thr), (statusp))
90 // Struct used to store information in the DbEnv app_private field.
98 static void log(const char *);
99 void *checkpoint_thread (void *);
100 void *log_archive_thread (void *);
102 class RepQuoteExample {
105 void init(RepConfigInfo* config);
109 static void event_callback(DbEnv* dbenv, u_int32_t which, void *info);
112 // Disable copying (copy constructor and assignment operator).
113 RepQuoteExample(const RepQuoteExample &);
114 void operator = (const RepQuoteExample &);
116 // internal data members.
118 RepConfigInfo *app_config;
124 void print_stocks(Db *dbp);
130 DbHolder(DbEnv *env) : env(env) {
138 // Ignore: this may mean another exception is pending
142 bool ensure_open(bool creating) {
145 dbp = new Db(env, 0);
147 u_int32_t flags = DB_AUTO_COMMIT;
151 dbp->open(NULL, DATABASE, NULL, DB_BTREE, flags, 0);
153 } catch (DbDeadlockException e) {
154 } catch (DbRepHandleDeadException e) {
155 } catch (DbException e) {
156 if (e.get_errno() == DB_REP_LOCKOUT) {
157 // Just fall through.
158 } else if (e.get_errno() == ENOENT && !creating) {
159 // Provide a bit of extra explanation.
160 log("Stock DB does not yet exist");
165 // (All retryable errors fall through to here.)
166 log("please retry the operation");
198 class StringDbt : public Dbt {
200 #define GET_STRING_OK 0
201 #define GET_STRING_INVALID_PARAM 1
202 #define GET_STRING_SMALL_BUFFER 2
203 #define GET_STRING_EMPTY_DATA 3
204 int get_string(char **buf, size_t buf_len)
207 int ret = GET_STRING_OK;
209 cerr << "Invalid input buffer to get_string" << endl;
210 return GET_STRING_INVALID_PARAM;
213 // make sure the string is null terminated.
214 memset(*buf, 0, buf_len);
216 // if there is no string, just return.
217 if (get_data() == NULL || get_size() == 0)
218 return GET_STRING_OK;
220 if (get_size() >= buf_len) {
221 ret = GET_STRING_SMALL_BUFFER;
222 copy_len = buf_len - 1; // save room for a terminator.
224 copy_len = get_size();
225 memcpy(*buf, get_data(), copy_len);
229 size_t get_string_length()
233 return strlen((char *)get_data());
235 void set_string(char *string)
238 set_size((u_int32_t)strlen(string));
// Build a Dbt over an existing C string: the pointer is handed to the Dbt
// constructor together with strlen(string), so the size excludes the
// terminating NUL.
241 StringDbt(char *string) :
242 Dbt(string, (u_int32_t)strlen(string)) {};
// Default construction simply default-constructs the Dbt base.
243 StringDbt() : Dbt() {};
246 // Don't add extra data to this sub-class since we want it to remain
247 // compatible with Dbt objects created internally by Berkeley DB.
250 RepQuoteExample::RepQuoteExample() : app_config(0), cur_env(0) {
251 app_data.app_finished = 0;
252 app_data.in_client_sync = 0;
253 app_data.is_master = 0; // assume I start out as client
254 app_data.verbose = 0;
257 void RepQuoteExample::init(RepConfigInfo *config) {
263 cur_env.set_app_private(&app_data);
264 cur_env.set_errfile(stderr);
265 cur_env.set_errpfx(progname);
266 cur_env.set_event_notify(event_callback);
268 // Configure bulk transfer to send groups of records to clients
269 // in a single network transfer. This is useful for master sites
270 // and clients participating in client-to-client synchronization.
272 if (app_config->bulk)
273 cur_env.rep_set_config(DB_REP_CONF_BULK, 1);
275 // Turn on debugging and informational output if requested.
276 if (app_config->verbose) {
277 cur_env.set_verbose(DB_VERB_REPLICATION, 1);
278 app_data.verbose = 1;
281 // Set replication group election priority for this environment.
282 // An election first selects the site with the most recent log
283 // records as the new master. If multiple sites have the most
284 // recent log records, the site with the highest priority value
285 // is selected as master.
287 cur_env.rep_set_priority(app_config->priority);
289 // Set the policy that determines how master and client sites
290 // handle acknowledgement of replication messages needed for
291 // permanent records. The default policy of "quorum" requires only
292 // a quorum of electable peers sufficient to ensure a permanent
293 // record remains durable if an election is held. The "all" option
294 // requires all clients to acknowledge a permanent replication
297 cur_env.repmgr_set_ack_policy(app_config->ack_policy);
299 // Set the threshold for the minimum and maximum time the client
300 // waits before requesting retransmission of a missing message.
301 // Base these values on the performance and load characteristics
302 // of the master and client host platforms as well as the round
303 // trip message time.
305 cur_env.rep_set_request(20000, 500000);
307 // Configure deadlock detection to ensure that any deadlocks
308 // are broken by having one of the conflicting lock requests
309 // rejected. DB_LOCK_DEFAULT uses the lock policy specified
310 // at environment creation time or DB_LOCK_RANDOM if none was
313 cur_env.set_lk_detect(DB_LOCK_DEFAULT);
315 // The following base replication features may also be useful to your
316 // application. See Berkeley DB documentation for more details.
317 // - Master leases: Provide stricter consistency for data reads
319 // - Timeouts: Customize the amount of time Berkeley DB waits
320 // for such things as an election to be concluded or a master
321 // lease to be granted.
322 // - Delayed client synchronization: Manage the master site's
323 // resources by spreading out resource-intensive client
325 // - Blocked client operations: Return immediately with an error
326 // instead of waiting indefinitely if a client operation is
327 // blocked by an ongoing client synchronization.
328 cur_env.repmgr_site(app_config->this_host.host,
329 app_config->this_host.port, &dbsite, 0);
330 dbsite->set_config(DB_LOCAL_SITE, 1);
331 if (app_config->this_host.creator)
332 dbsite->set_config(DB_GROUP_CREATOR, 1);
336 for ( REP_HOST_INFO *cur = app_config->other_hosts;
337 cur != NULL && i <= app_config->nrsites;
338 cur = cur->next, i++) {
339 cur_env.repmgr_site(cur->host, cur->port, &dbsite, 0);
340 dbsite->set_config(DB_BOOTSTRAP_HELPER, 1);
342 dbsite->set_config(DB_REPMGR_PEER, 1);
346 // Configure heartbeat timeouts so that repmgr monitors the
347 // health of the TCP connection. Master sites broadcast a heartbeat
348 // at the frequency specified by the DB_REP_HEARTBEAT_SEND timeout.
349 // Client sites wait for message activity the length of the
350 // DB_REP_HEARTBEAT_MONITOR timeout before concluding that the
351 // connection to the master is lost. The DB_REP_HEARTBEAT_MONITOR
352 // timeout should be longer than the DB_REP_HEARTBEAT_SEND timeout.
354 cur_env.rep_set_timeout(DB_REP_HEARTBEAT_SEND, 5000000);
355 cur_env.rep_set_timeout(DB_REP_HEARTBEAT_MONITOR, 10000000);
357 // The following repmgr features may also be useful to your
358 // application. See Berkeley DB documentation for more details.
359 // - Two-site strict majority rule - In a two-site replication
360 // group, require both sites to be available to elect a new
362 // - Timeouts - Customize the amount of time repmgr waits
363 // for such things as waiting for acknowledgements or attempting
364 // to reconnect to other sites.
365 // - Site list - return a list of sites currently known to repmgr.
367 // We can now open our environment, although we're not ready to
368 // begin replicating. However, we want to have a dbenv around
369 // so that we can send it into any of our message handlers.
371 cur_env.set_cachesize(0, CACHESIZE, 0);
372 cur_env.set_flags(DB_TXN_NOSYNC, 1);
374 cur_env.open(app_config->home, DB_CREATE | DB_RECOVER |
375 DB_THREAD | DB_INIT_REP | DB_INIT_LOCK | DB_INIT_LOG |
376 DB_INIT_MPOOL | DB_INIT_TXN, 0);
378 // Start checkpoint and log archive support threads.
379 (void)thread_create(&ckp_thr, NULL, checkpoint_thread, &cur_env);
380 (void)thread_create(&lga_thr, NULL, log_archive_thread, &cur_env);
382 cur_env.repmgr_start(3, app_config->start_policy);
385 int RepQuoteExample::terminate() {
387 // Wait for checkpoint and log archive threads to finish.
388 // Windows does not allow a NULL pointer for the exit code variable.
389 thread_exit_status_t exstat;
391 (void)thread_join(lga_thr, &exstat);
392 (void)thread_join(ckp_thr, &exstat);
394 // We have used the DB_TXN_NOSYNC environment flag for
395 // improved performance without the usual sacrifice of
396 // transactional durability, as discussed in the
397 // "Transactional guarantees" page of the Reference
398 // Guide: if one replication site crashes, we can
399 // expect the data to exist at another site. However,
400 // in case we shut down all sites gracefully, we push
401 // out the end of the log here so that the most
402 // recent transactions don't mysteriously disappear.
404 cur_env.log_flush(NULL);
407 } catch (DbException dbe) {
408 cout << "error closing environment: " << dbe.what() << endl;
413 void RepQuoteExample::prompt() {
414 cout << "QUOTESERVER";
415 if (!app_data.is_master)
416 cout << "(read-only)";
417 cout << "> " << flush;
420 void log(const char *msg) {
424 // Simple command-line user interface:
425 // - enter "<stock symbol> <price>" to insert or update a record in the
427 // - just press Return (i.e., blank input line) to print out the contents of
429 // - enter "quit" or "exit" to quit.
431 void RepQuoteExample::doloop() {
432 DbHolder dbh(&cur_env);
435 while (prompt(), getline(cin, input)) {
436 istringstream is(input);
437 string token1, token2;
439 // Read 0, 1 or 2 tokens from the input.
449 if (token1 == "exit" || token1 == "quit") {
450 app_data.app_finished = 1;
453 log("Format: <stock> <price>");
458 // Here we know count is either 0 or 2, so we're about to try a
461 // Open database with DB_CREATE only if this is a master
462 // database. A client database uses polling to attempt
463 // to open the database without DB_CREATE until it is
466 // This DB_CREATE polling logic can be simplified under
467 // some circumstances. For example, if the application can
468 // be sure a database is already there, it would never need
469 // to open it with DB_CREATE.
471 if (!dbh.ensure_open(app_data.is_master))
476 if (app_data.in_client_sync)
478 "Cannot read data during client initialization - please try again.");
481 else if (!app_data.is_master)
482 log("Can't update at client");
484 const char *symbol = token1.c_str();
485 StringDbt key(const_cast<char*>(symbol));
487 const char *price = token2.c_str();
488 StringDbt data(const_cast<char*>(price));
490 dbh->put(NULL, &key, &data, 0);
492 } catch (DbDeadlockException e) {
493 log("please retry the operation");
495 } catch (DbRepHandleDeadException e) {
496 log("please retry the operation");
498 } catch (DbException e) {
499 if (e.get_errno() == DB_REP_LOCKOUT) {
500 log("please retry the operation");
510 void RepQuoteExample::event_callback(DbEnv* dbenv, u_int32_t which, void *info)
512 APP_DATA *app = (APP_DATA*)dbenv->get_app_private();
514 info = NULL; /* Currently unused. */
517 case DB_EVENT_REP_CLIENT:
519 app->in_client_sync = 1;
521 case DB_EVENT_REP_MASTER:
523 app->in_client_sync = 0;
525 case DB_EVENT_REP_NEWMASTER:
526 app->in_client_sync = 1;
528 case DB_EVENT_REP_PERM_FAILED:
529 // Did not get enough acks to guarantee transaction
530 // durability based on the configured ack policy. This
531 // transaction will be flushed to the master site's
532 // local disk storage for durability.
536 "EVENT: Insufficient acknowledgements to guarantee transaction durability.");
539 case DB_EVENT_REP_STARTUPDONE:
540 app->in_client_sync = 0;
545 log("EVENT: receive panic event");
548 case DB_EVENT_REP_CONNECT_BROKEN:
550 log("EVENT: connection is broken");
553 case DB_EVENT_REP_DUPMASTER:
555 log("EVENT: duplicate master");
558 case DB_EVENT_REP_ELECTED:
560 log("EVENT: election in replication group");
563 case DB_EVENT_REP_CONNECT_ESTD:
565 log("EVENT: establish connection");
568 case DB_EVENT_REP_CONNECT_TRY_FAILED:
570 log("EVENT: fail to try connection");
573 case DB_EVENT_REP_INIT_DONE:
575 log("EVENT: finish initialization");
578 case DB_EVENT_REP_LOCAL_SITE_REMOVED:
580 log("EVENT: remove local site");
583 case DB_EVENT_REP_SITE_ADDED:
585 log("EVENT: add site");
588 case DB_EVENT_REP_SITE_REMOVED:
590 log("EVENT: remote site");
594 dbenv->errx("ignoring event %d", which);
598 void RepQuoteExample::print_stocks(Db *dbp) {
600 #define MAXKEYSIZE 10
601 #define MAXDATASIZE 20
602 char keybuf[MAXKEYSIZE + 1], databuf[MAXDATASIZE + 1];
605 memset(&key, 0, sizeof(key));
606 memset(&data, 0, sizeof(data));
610 DbcAuto dbc(dbp, 0, 0);
611 cout << "\tSymbol\tPrice" << endl
612 << "\t======\t=====" << endl;
614 for (int ret = dbc->get(&key, &data, DB_FIRST);
616 ret = dbc->get(&key, &data, DB_NEXT)) {
617 key.get_string(&kbuf, MAXKEYSIZE);
618 data.get_string(&dbuf, MAXDATASIZE);
620 cout << "\t" << keybuf << "\t" << databuf << endl;
622 cout << endl << flush;
626 static void usage() {
627 cerr << "usage: " << progname << endl << " -h home -l|-L host:port"
628 << " [-C|M] [-r host:port] [-R host:port]" << endl
629 << " [-a all|quorum] [-b] [-p priority] [-v]" << endl;
631 cerr << "\t -h home (required; h stands for home directory)" << endl
632 << "\t -l host:port (required unless -L is specified;"
633 << " l stands for local)" << endl
634 << "\t -L host:port (optional, L means group creator)" << endl
635 << "\t -C or -M (optional; start up as client or master)" << endl
636 << "\t -r host:port (optional; r stands for remote; any "
637 << "number of these" << endl
638 << "\t may be specified)" << endl
639 << "\t -R host:port (optional; R stands for remote peer; only "
641 << "\t these may be specified)" << endl
642 << "\t -a all|quorum (optional; a stands for ack policy)" << endl
643 << "\t -b (optional; b stands for bulk)" << endl
644 << "\t -p priority (optional; defaults to 100)" << endl
645 << "\t -v (optional; v stands for verbose)" << endl;
650 int main(int argc, char **argv) {
651 RepConfigInfo config;
652 char ch, *portstr, *tmphost;
656 // Extract the command line parameters
657 while ((ch = getopt(argc, argv, "a:bCh:L:l:Mp:R:r:v")) != EOF) {
661 if (strncmp(optarg, "all", 3) == 0)
662 config.ack_policy = DB_REPMGR_ACKS_ALL;
663 else if (strncmp(optarg, "quorum", 6) != 0)
670 config.start_policy = DB_REP_CLIENT;
673 config.home = optarg;
676 config.this_host.creator = true; // FALLTHROUGH
678 config.this_host.host = strtok(optarg, ":");
679 if ((portstr = strtok(NULL, ":")) == NULL) {
680 cerr << "Bad host specification." << endl;
683 config.this_host.port = (unsigned short)atoi(portstr);
684 config.got_listen_address = true;
687 config.start_policy = DB_REP_MASTER;
690 config.priority = atoi(optarg);
693 tmppeer = true; // FALLTHROUGH
695 tmphost = strtok(optarg, ":");
696 if ((portstr = strtok(NULL, ":")) == NULL) {
697 cerr << "Bad host specification." << endl;
700 tmpport = (unsigned short)atoi(portstr);
702 config.addOtherHost(tmphost, tmpport, tmppeer);
705 config.verbose = true;
713 // Error check command line.
714 if ((!config.got_listen_address) || config.home == NULL)
717 RepQuoteExample runner;
719 runner.init(&config);
721 } catch (DbException dbe) {
722 cerr << "Caught an exception during initialization or"
723 << " processing: " << dbe.what() << endl;
729 // This is a very simple thread that performs checkpoints at a fixed
730 // time interval. For a master site, the time interval is one minute
731 // plus the duration of the checkpoint_delay timeout (30 seconds by
732 // default). For a client site, the time interval is one minute.
734 void *checkpoint_thread(void *args)
741 app = (APP_DATA *)env->get_app_private();
744 // Wait for one minute, polling once per second to see if
745 // application has finished. When application has finished,
746 // terminate this thread.
748 for (i = 0; i < 60; i++) {
750 if (app->app_finished == 1)
751 return ((void *)EXIT_SUCCESS);
754 // Perform a checkpoint.
755 if ((ret = env->txn_checkpoint(0, 0, 0)) != 0) {
756 env->err(ret, "Could not perform checkpoint.\n");
757 return ((void *)EXIT_FAILURE);
762 // This is a simple log archive thread. Once per minute, it removes all but
763 // the most recent 3 logs that are safe to remove according to a call to
764 // DBENV->log_archive().
766 // Log cleanup is needed to conserve disk space, but aggressive log cleanup
767 // can cause more frequent client initializations if a client lags too far
768 // behind the current master. This can happen in the event of a slow client,
769 // a network partition, or a new master that has not kept as many logs as the
772 // The approach in this routine balances the need to mitigate against a
773 // lagging client by keeping a few more of the most recent unneeded logs
774 // with the need to conserve disk space by regularly cleaning up log files.
775 // Use of automatic log removal (DBENV->log_set_config() DB_LOG_AUTO_REMOVE
776 // flag) is not recommended for replication due to the risk of frequent
777 // client initializations.
779 void *log_archive_thread(void *args)
783 char **begin, **list;
784 int i, listlen, logs_to_keep, minlog, ret;
787 app = (APP_DATA *)env->get_app_private();
791 // Wait for one minute, polling once per second to see if
792 // application has finished. When application has finished,
793 // terminate this thread.
795 for (i = 0; i < 60; i++) {
797 if (app->app_finished == 1)
798 return ((void *)EXIT_SUCCESS);
801 // Get the list of unneeded log files.
802 if ((ret = env->log_archive(&list, DB_ARCH_ABS)) != 0) {
803 env->err(ret, "Could not get log archive list.");
804 return ((void *)EXIT_FAILURE);
808 // Get the number of logs in the list.
809 for (begin = list; *begin != NULL; begin++, listlen++);
810 // Remove all but the logs_to_keep most recent
811 // unneeded log files.
813 minlog = listlen - logs_to_keep;
814 for (begin = list, i= 0; i < minlog; list++, i++) {
815 if ((ret = unlink(*list)) != 0) {
817 "logclean: remove %s", *list);
819 "logclean: Error remove %s", *list);
821 return ((void *)EXIT_FAILURE);