2 * See the file LICENSE for redistribution information.
4 * Copyright (c) 2006, 2012 Oracle and/or its affiliates. All rights reserved.
/*
 * HEARTBEAT_ACTION is a pointer to a function that takes an ENV * and
 * returns int.  In this file __repmgr_next_timeout selects such an action
 * and __repmgr_check_timeouts invokes it.
 */
13 typedef int (*HEARTBEAT_ACTION) __P((ENV *));
/* Prototypes for the functions that are static (private) to this file. */
15 static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
16 static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
17 static void check_min_log_file __P((ENV *));
18 static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
19 static int prepare_input __P((ENV *, REPMGR_CONNECTION *));
20 static int process_own_msg __P((ENV *, REPMGR_CONNECTION *));
21 static int process_parameters __P((ENV *,
22 REPMGR_CONNECTION *, char *, u_int, u_int32_t, int, u_int32_t));
23 static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
24 static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
25 static int __repmgr_call_election __P((ENV *));
26 static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
27 static void *__repmgr_connector_thread __P((void *));
28 static int __repmgr_next_timeout __P((ENV *,
29 db_timespec *, HEARTBEAT_ACTION *));
30 static int __repmgr_retry_connections __P((ENV *));
31 static int __repmgr_send_heartbeat __P((ENV *));
32 static int __repmgr_try_one __P((ENV *, int));
33 static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
34 static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
/*
 * ONLY_HANDSHAKE(env, conn): when conn->msg_type is not REPMGR_HANDSHAKE,
 * report message 3613 with the message type and the connection state, and
 * return DB_REP_UNAVAIL from the function in which the macro is expanded.
 */
36 #define ONLY_HANDSHAKE(env, conn) do { \
37 if (conn->msg_type != REPMGR_HANDSHAKE) { \
38 __db_errx(env, DB_STR_A("3613", \
39 "unexpected msg type %d in state %d", "%d %d"), \
40 (int)conn->msg_type, conn->state); \
41 return (DB_REP_UNAVAIL); \
/*
 * Start routine of the select thread.  Runs __repmgr_select_loop; if that
 * returns nonzero, the error is reported (message 3614) and handed to
 * __repmgr_thread_failure.
 */
46 * PUBLIC: void *__repmgr_select_thread __P((void *));
49 __repmgr_select_thread(argsp)
52 REPMGR_RUNNABLE *args;
59 if ((ret = __repmgr_select_loop(env)) != 0) {
60 __db_err(env, ret, DB_STR("3614", "select loop failed"));
61 (void)__repmgr_thread_failure(env, ret);
/*
 * Stops the repmgr threads while holding db_rep->mutex, then raises the
 * DB_EVENT_REP_LOCAL_SITE_REMOVED event (with no event data).
 */
67 * PUBLIC: int __repmgr_bow_out __P((ENV *));
76 db_rep = env->rep_handle;
/* __repmgr_stop_threads is called with the mutex held. */
77 LOCK_MUTEX(db_rep->mutex);
78 ret = __repmgr_stop_threads(env);
79 UNLOCK_MUTEX(db_rep->mutex);
80 DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
/*
 * Accepts one pending connection on db_rep->listen_fd.
 *
 * When accept() fails, the socket error code is examined: some codes are
 * only logged at verbose level as innocuous, others are reported through
 * __db_err (message 3615).
 *
 * An accepted socket is wrapped by __repmgr_new_connection in CONN_NEGOTIATE
 * state.  If that call fails the raw socket is closed here.  The connection
 * then goes through __repmgr_set_keepalive and __repmgr_set_nonblock_conn;
 * if either fails the connection is destroyed.  Finally the connection is
 * appended to the db_rep->connections list.
 */
85 * PUBLIC: int __repmgr_accept __P((ENV *));
92 REPMGR_CONNECTION *conn;
98 db_rep = env->rep_handle;
99 addrlen = sizeof(siaddr);
/* Take one pending connection from the listening socket. */
100 if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
103 * Some errors are innocuous and so should be ignored. MSDN
104 * Library documents the Windows ones; the Unix ones are
105 * advocated in Stevens' UNPv1, section 16.6; and Linux
106 * Application Development, p. 416.
108 switch (ret = net_errno) {
129 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
130 "accept error %d considered innocuous", ret));
133 __db_err(env, ret, DB_STR("3615", "accept error"));
137 RPRINT(env, (env, DB_VERB_REPMGR_MISC, "accepted a new connection"));
140 __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
141 (void)closesocket(s);
144 if ((ret = __repmgr_set_keepalive(env, conn)) != 0) {
145 (void)__repmgr_destroy_conn(env, conn);
148 if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
149 __db_err(env, ret, DB_STR("3616",
150 "can't set nonblock after accept"));
151 (void)__repmgr_destroy_conn(env, conn);
156 * We don't yet know which site this connection is coming from. So for
157 * now, put it on the "orphans" list; we'll move it to the appropriate
158 * site struct later when we discover who we're talking with, and what
159 * type of connection it is.
162 TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
/*
 * Parameters:
 *   env     - environment handle; db_rep is taken from env->rep_handle.
 *   timeout - output; cleared to zero when the chosen deadline is at or
 *             before the current time, and otherwise adjusted by
 *             subtracting the current time.
 * Returns the have_timeout flag.
 *
 * The deadline comes from __repmgr_next_timeout (heartbeat-related) and
 * from the first entry of db_rep->retries; where both are considered, the
 * earlier of the two is kept.
 */
169 * Computes how long we should wait for input, in other words how long until we
170 * have to wake up and do something. Returns TRUE if timeout is set; FALSE if
171 * there is nothing to wait for.
173 * Note that the resulting timeout could be zero; but it can't be negative.
175 * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
178 __repmgr_compute_timeout(env, timeout)
180 db_timespec *timeout;
187 db_rep = env->rep_handle;
190 * There are two factors to consider: are heartbeats in use? and, do we
191 * have any sites with broken connections that we ought to retry?
193 have_timeout = __repmgr_next_timeout(env, &t, NULL);
195 /* List items are in order, so we only have to examine the first one. */
196 if (!TAILQ_EMPTY(&db_rep->retries)) {
197 retry = TAILQ_FIRST(&db_rep->retries);
199 /* Choose earliest timeout deadline. */
200 t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
208 __os_gettime(env, &now, 1);
209 if (timespeccmp(&now, &t, >=))
210 timespecclear(timeout);
213 timespecsub(timeout, &now);
217 return (have_timeout);
/*
 * Parameters:
 *   env      - environment handle.
 *   deadline - output for the time at which the action is due.
 *   action   - output for the function to call at that time; callers in
 *              this file pass either NULL or the address of a variable.
 *
 * Two cases are visible below:
 *   1. This site is the master (rep->master_id equals db_rep->self_eid) and
 *      heartbeat_frequency is positive: the time is db_rep->last_bcast plus
 *      heartbeat_frequency, and the action is __repmgr_send_heartbeat.
 *   2. Otherwise, there is a connected master, this process is not
 *      subordinate, heartbeat_monitor_timeout is positive, and the highest
 *      version among the ready incoming/outgoing connections of the master
 *      is at least HEARTBEAT_MIN_VERSION: the time is the last_rcvd_timestamp
 *      of the master plus heartbeat_monitor_timeout, and the action is
 *      __repmgr_call_election.
 */
221 * Figures out the next heartbeat-related thing to be done, and when it should
222 * be done. The code is factored this way because this computation needs to be
223 * done both before each select() call, and after (when we're checking for timer
227 __repmgr_next_timeout(env, deadline, action)
229 db_timespec *deadline;
230 HEARTBEAT_ACTION *action;
234 HEARTBEAT_ACTION my_action;
235 REPMGR_CONNECTION *conn;
240 db_rep = env->rep_handle;
241 rep = db_rep->region;
243 if (rep->master_id == db_rep->self_eid &&
244 rep->heartbeat_frequency > 0) {
245 t = db_rep->last_bcast;
246 TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
247 my_action = __repmgr_send_heartbeat;
248 } else if ((master = __repmgr_connected_master(env)) != NULL &&
249 !IS_SUBORDINATE(db_rep) &&
250 rep->heartbeat_monitor_timeout > 0) {
252 if ((conn = master->ref.conn.in) != NULL &&
253 IS_READY_STATE(conn->state))
254 version = conn->version;
255 if ((conn = master->ref.conn.out) != NULL &&
256 IS_READY_STATE(conn->state) &&
257 conn->version > version)
258 version = conn->version;
259 if (version >= HEARTBEAT_MIN_VERSION) {
261 * If we have a working connection to a heartbeat-aware
262 * master, let's monitor it. Otherwise there's really
265 t = master->last_rcvd_timestamp;
266 TIMESPEC_ADD_DB_TIMEOUT(&t,
267 rep->heartbeat_monitor_timeout);
268 my_action = __repmgr_call_election;
/*
 * Builds a permlsn payload from rep->gen and the LSN obtained from
 * __rep_get_maxpermlsn, marshals it into buf, and broadcasts it with
 * __repmgr_send_broadcast as a REPMGR_HEARTBEAT message.  The control DBT
 * has size __REPMGR_PERMLSN_SIZE and the rec DBT is empty (NULL, 0).
 * The value of __repmgr_send_broadcast is returned.
 */
281 * Sends a heartbeat message.
283 * repmgr also uses the heartbeat facility to manage rerequests. We
284 * send the master's current generation and max_perm_lsn with the heartbeat
285 * message to help a client determine whether it has all master transactions.
286 * When a client receives a heartbeat message, it also checks whether it
287 * needs to rerequest anything. Note that heartbeats must be enabled for
288 * this rerequest processing to occur.
291 __repmgr_send_heartbeat(env)
297 __repmgr_permlsn_args permlsn;
298 u_int8_t buf[__REPMGR_PERMLSN_SIZE];
299 u_int unused1, unused2;
302 db_rep = env->rep_handle;
303 rep = db_rep->region;
305 permlsn.generation = rep->gen;
306 if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
308 __repmgr_permlsn_marshal(env, &permlsn, buf);
310 control.size = __REPMGR_PERMLSN_SIZE;
312 DB_INIT_DBT(rec, NULL, 0);
313 return (__repmgr_send_broadcast(env,
314 REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
/*
 * Looks up the site recorded as master in the replication region.  The
 * visible checks are that master_id is a known remote site and that the
 * site found for it is in SITE_CONNECTED state.  Callers in this file
 * compare the result against NULL.
 * NOTE(review): the return statements are not visible here; presumably the
 * site is returned only when both checks pass -- confirm.
 */
318 * PUBLIC: REPMGR_SITE *__repmgr_connected_master __P((ENV *));
321 __repmgr_connected_master(env)
328 db_rep = env->rep_handle;
329 master_id = db_rep->region->master_id;
331 if (!IS_KNOWN_REMOTE_SITE(master_id))
333 master = SITE_FROM_EID(master_id);
334 if (master->state == SITE_CONNECTED)
/*
 * Action used when the heartbeat monitor deadline passes (it is selected in
 * __repmgr_next_timeout).  Logs the expiry, counts a connection drop in the
 * region statistics, and calls __repmgr_bust_connection on the incoming and
 * then the outgoing connection of the connected master, when present.
 */
340 __repmgr_call_election(env)
343 REPMGR_CONNECTION *conn;
347 master = __repmgr_connected_master(env);
350 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
351 "heartbeat monitor timeout expired"));
352 STAT(env->rep_handle->region->mstat.st_connection_drop++);
/* Bust whichever of the two connections to the master exist. */
353 if ((conn = master->ref.conn.in) != NULL &&
354 (ret = __repmgr_bust_connection(env, conn)) != 0)
356 if ((conn = master->ref.conn.out) != NULL &&
357 (ret = __repmgr_bust_connection(env, conn)) != 0)
/*
 * Asks __repmgr_next_timeout for the next heartbeat-related deadline and
 * action.  When a deadline exists and is at or before the current time,
 * the action is called.  The function ends by returning the result of
 * __repmgr_retry_connections.
 */
363 * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
366 * Assumes caller holds the mutex.
369 __repmgr_check_timeouts(env)
372 db_timespec when, now;
373 HEARTBEAT_ACTION action;
377 * Figure out the next heartbeat-related thing to be done. Then, if
378 * it's time to do it, do so.
380 if (__repmgr_next_timeout(env, &when, &action)) {
381 __os_gettime(env, &now, 1);
382 if (timespeccmp(&when, &now, <=) &&
383 (ret = (*action)(env)) != 0)
387 return (__repmgr_retry_connections(env));
/*
 * Walks db_rep->retries from the front.  Each entry whose time is earlier
 * than the current time is removed from the list and freed; the site for
 * its eid is asserted to be in SITE_PAUSING state, and if the site
 * membership is SITE_PRESENT a connection attempt is started with
 * __repmgr_try_one.
 */
391 * Initiates connection attempts for any sites on the idle list whose retry
392 * times have expired.
395 __repmgr_retry_connections(env)
404 db_rep = env->rep_handle;
405 __os_gettime(env, &now, 1);
407 while (!TAILQ_EMPTY(&db_rep->retries)) {
408 retry = TAILQ_FIRST(&db_rep->retries);
/* An entry that is not yet due ends the scan. */
409 if (timespeccmp(&retry->time, &now, >=))
410 break; /* since items are in time order */
412 TAILQ_REMOVE(&db_rep->retries, retry, entries);
415 __os_free(env, retry);
416 DB_ASSERT(env, IS_VALID_EID(eid));
417 site = SITE_FROM_EID(eid);
418 DB_ASSERT(env, site->state == SITE_PAUSING);
/*
 * A present site gets a new attempt.  NOTE(review): the SITE_IDLE
 * assignment below appears to be the alternative branch -- confirm.
 */
420 if (site->membership == SITE_PRESENT) {
421 if ((ret = __repmgr_try_one(env, eid)) != 0)
424 site->state = SITE_IDLE;
/*
 * Iterates over every remote site index and starts a connection attempt
 * with __repmgr_try_one for each site that is in SITE_IDLE state and whose
 * membership is SITE_PRESENT.
 */
430 * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
433 * Assumes caller holds the mutex.
436 __repmgr_first_try_connections(env)
443 db_rep = env->rep_handle;
444 FOR_EACH_REMOTE_SITE_INDEX(eid) {
445 site = SITE_FROM_EID(eid);
447 * Normally all sites would be IDLE here. But if a user thread
448 * triggered an auto-start in a subordinate process, our send()
449 * function may have found new sites when it sync'ed site
450 * addresses, and that action causes connection attempts to be
451 * scheduled (resulting in PAUSING state here, or conceivably
452 * even CONNECTING or CONNECTED).
454 if (site->state == SITE_IDLE &&
455 site->membership == SITE_PRESENT &&
456 (ret = __repmgr_try_one(env, eid)) != 0)
/*
 * Parameters:
 *   env - environment handle.
 *   eid - id of the site to connect to; asserted valid.
 *
 * Uses the REPMGR_RUNNABLE stored in site->connector.  Once a runnable is
 * available the site is put in SITE_CONNECTING state and the thread is
 * started with __repmgr_connector_thread as its run function.
 */
463 * Starts a thread to open a connection to the site at the given EID.
466 __repmgr_try_one(env, eid)
475 db_rep = env->rep_handle;
476 DB_ASSERT(env, IS_VALID_EID(eid));
477 site = SITE_FROM_EID(eid);
478 th = site->connector;
/* Allocate a runnable and remember it on the site. */
480 if ((ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &th)) != 0)
482 site->connector = th;
/* A previous connector has finished: join it before reuse. */
483 } else if (th->finished) {
484 if ((ret = __repmgr_thread_join(th)) != 0)
/* A previous connector is still running: schedule another attempt. */
487 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
488 "eid %lu previous connector thread still running; will retry",
490 return (__repmgr_schedule_connection_attempt(env,
494 site->state = SITE_CONNECTING;
496 th->run = __repmgr_connector_thread;
/* If the thread cannot be started, clear the connector pointer. */
498 if ((ret = __repmgr_thread_start(env, th)) != 0) {
500 site->connector = NULL;
/*
 * Start routine of a connector thread.  Logs the eid taken from th->args,
 * runs __repmgr_connector_main, and when that returns nonzero reports
 * message 3617 and calls __repmgr_thread_failure.  Logs again on exit.
 */
506 __repmgr_connector_thread(argsp)
516 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
517 "starting connector thread, eid %u", th->args.eid));
518 if ((ret = __repmgr_connector_main(env, th)) != 0) {
519 __db_err(env, ret, DB_STR("3617", "connector thread failed"));
520 (void)__repmgr_thread_failure(env, ret);
522 RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
/*
 * Body of a connector thread for the site th->args.eid.
 *
 * Under db_rep->mutex the site is looked up and its net_addr copied; the
 * mutex is then released around the call to __repmgr_connect.
 *
 * On success: DB_EVENT_REP_CONNECT_ESTD is raised, the mutex is retaken,
 * the connection is made non-blocking and typed REP_CONNECTION.  The site
 * state and the stopped status are re-checked; the code after that check
 * records the connection as the outgoing connection of the site, sets
 * SITE_CONNECTED, stamps last_rcvd_timestamp and wakes the main thread.
 *
 * On DB_REP_UNAVAIL: DB_EVENT_REP_CONNECT_TRY_FAILED is raised, the
 * connect-failure statistic is incremented, and under the mutex (after the
 * same re-check) another attempt is scheduled with
 * __repmgr_schedule_connection_attempt.
 *
 * The mutex is released near the end of the function.
 */
529 __repmgr_connector_main(env, th)
535 REPMGR_CONNECTION *conn;
536 DB_REPMGR_CONN_ERR info;
537 repmgr_netaddr_t netaddr;
538 SITE_STRING_BUFFER site_string;
541 db_rep = env->rep_handle;
544 LOCK_MUTEX(db_rep->mutex);
545 DB_ASSERT(env, IS_VALID_EID(th->args.eid));
546 site = SITE_FROM_EID(th->args.eid);
/*
 * NOTE(review): this test joins the two conditions with && while the two
 * later re-checks of the same pair use ||; confirm which is intended.
 */
547 if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
551 * Drop the mutex during operations that could block. During those
552 * times, the site struct could move (if we had to grow the sites
553 * array), but host wouldn't.
555 * Also, during those times we might receive an incoming connection from
556 * the site, which would change its state. So, check state each time we
557 * reacquire the mutex, and quit if the state of the world changed while
560 netaddr = site->net_addr;
561 RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connecting to %s",
562 __repmgr_format_site_loc(site, site_string)));
563 UNLOCK_MUTEX(db_rep->mutex);
565 if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
566 DB_EVENT(env, DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
567 LOCK_MUTEX(db_rep->mutex);
568 if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
569 __db_err(env, ret, DB_STR("3618",
570 "set_nonblock in connnect thread"));
573 conn->type = REP_CONNECTION;
574 site = SITE_FROM_EID(th->args.eid);
575 if (site->state != SITE_CONNECTING ||
576 db_rep->repmgr_status == stopped)
579 conn->eid = th->args.eid;
580 site = SITE_FROM_EID(th->args.eid);
581 site->ref.conn.out = conn;
582 site->state = SITE_CONNECTED;
583 __os_gettime(env, &site->last_rcvd_timestamp, 1);
584 ret = __repmgr_wake_main_thread(env);
585 } else if (ret == DB_REP_UNAVAIL) {
586 /* Retryable error while trying to connect: retry later. */
587 info.eid = th->args.eid;
589 DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
590 STAT(db_rep->region->mstat.st_connect_fail++);
592 LOCK_MUTEX(db_rep->mutex);
593 site = SITE_FROM_EID(th->args.eid);
594 if (site->state != SITE_CONNECTING ||
595 db_rep->repmgr_status == stopped) {
599 ret = __repmgr_schedule_connection_attempt(env,
600 th->args.eid, FALSE);
606 if ((t_ret = __repmgr_destroy_conn(env, conn)) != 0 &&
612 UNLOCK_MUTEX(db_rep->mutex);
/*
 * Sends a handshake in the version 1 layout on conn.
 *
 * The control part is a zero-filled DB_REPMGR_V1_HANDSHAKE carrying the
 * region priority (converted with htonl) and the port from the net_addr of
 * the local site.  The rec part has size len.
 * NOTE(review): presumably buf supplies the rec data -- that assignment is
 * not visible here; confirm.
 *
 * The message is sent with __repmgr_send_one as type REPMGR_HANDSHAKE with
 * a final argument of 0, and the result of that call is returned.
 */
618 * PUBLIC: int __repmgr_send_v1_handshake __P((ENV *,
619 * PUBLIC: REPMGR_CONNECTION *, void *, size_t));
622 __repmgr_send_v1_handshake(env, conn, buf, len)
624 REPMGR_CONNECTION *conn;
630 repmgr_netaddr_t *my_addr;
631 DB_REPMGR_V1_HANDSHAKE buffer;
634 db_rep = env->rep_handle;
635 rep = db_rep->region;
636 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
639 * We're about to send from a structure that has padding holes in it.
640 * Initializing it keeps Valgrind happy, plus we really shouldn't be
641 * sending out random garbage anyway (pro forma privacy issue).
643 memset(&buffer, 0, sizeof(buffer));
645 buffer.priority = htonl(rep->priority);
646 buffer.port = my_addr->port;
647 cntrl.data = &buffer;
648 cntrl.size = sizeof(buffer);
651 rec.size = (u_int32_t)len;
654 * It would of course be disastrous to block the select() thread, so
655 * pass the "maxblock" argument as 0. Fortunately blocking should
656 * never be necessary here, because the hand-shake is always the first
657 * thing we send. Which is a good thing, because it would be almost as
658 * disastrous if we allowed ourselves to drop a handshake.
660 return (__repmgr_send_one(env,
661 conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0));
/*
 * Reads input for conn through __repmgr_read_conn and switches on the
 * result.  The visible outcomes are:
 *   - an end-of-file path: a connection error event is fired with code 0,
 *     the connection-drop statistic is incremented, and DB_REP_UNAVAIL is
 *     returned;
 *   - a completed-read path: when conn->eid is valid the last_rcvd_timestamp
 *     of the site is refreshed, then prepare_input is called if the
 *     connection is in SIZES_PHASE and dispatch_msgin otherwise;
 *   - an error path: ret is asserted not to be EBADF, a connection error
 *     event is fired with ret, the drop statistic is incremented, and
 *     DB_REP_UNAVAIL is returned.
 * A DB_REPMGR_EAGAIN case exists only when that macro is defined and
 * differs from WOULDBLOCK.
 */
665 * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
668 * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
671 __repmgr_read_from_site(env, conn)
673 REPMGR_CONNECTION *conn;
679 db_rep = env->rep_handle;
682 * Loop, just in case we get EINTR and need to restart the I/O. (All
683 * other branches return.)
686 switch ((ret = __repmgr_read_conn(conn))) {
692 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
693 case DB_REPMGR_EAGAIN:
699 /* Error 0 is understood to mean EOF. */
700 __repmgr_fire_conn_err_event(env, conn, 0);
701 STAT(env->rep_handle->
702 region->mstat.st_connection_drop++);
703 return (DB_REP_UNAVAIL);
706 if (IS_VALID_EID(conn->eid)) {
707 site = SITE_FROM_EID(conn->eid);
709 &site->last_rcvd_timestamp, 1);
711 return (conn->reading_phase == SIZES_PHASE ?
712 prepare_input(env, conn) :
713 dispatch_msgin(env, conn));
717 DB_ASSERT(env, ret != EBADF);
719 __repmgr_fire_conn_err_event(env, conn, ret);
720 STAT(db_rep->region->mstat.st_connection_drop++);
721 return (DB_REP_UNAVAIL);
/*
 * Calls __repmgr_readv on conn->fd for the vectors of conn->iovecs starting
 * at the current offset, then passes the number of bytes read to
 * __repmgr_update_consumed, which reports when the phase has been read in
 * full.  A DB_REP_UNAVAIL return is visible below.
 */
727 * Reads in the current input phase, as defined by the connection's IOVECS
730 * Returns DB_REP_UNAVAIL for EOF.
732 * Makes no assumption about synchronization: it's up to the caller to hold
733 * mutex if necessary.
735 * PUBLIC: int __repmgr_read_conn __P((REPMGR_CONNECTION *));
738 __repmgr_read_conn(conn)
739 REPMGR_CONNECTION *conn;
745 * Keep reading pieces as long as we're making some progress, or until
746 * we complete the current read phase as defined in iovecs.
749 if ((ret = __repmgr_readv(conn->fd,
750 &conn->iovecs.vectors[conn->iovecs.offset],
751 conn->iovecs.count - conn->iovecs.offset, &nr)) != 0)
755 return (DB_REP_UNAVAIL);
757 if (__repmgr_update_consumed(&conn->iovecs, nr)) {
758 /* We've fully read as much as we wanted. */
/*
 * Called while conn is in SIZES_PHASE (asserted).  Unmarshals the header
 * from conn->msg_hdr_buf, re-initialises conn->iovecs, stores the message
 * type in conn->msg_type and sets up input buffers according to that type:
 *
 *   REPMGR_HEARTBEAT, REPMGR_REP_MESSAGE: one allocation holds a
 *     REPMGR_MESSAGE followed by the control data and, aligned to MEM_ALIGN,
 *     the rec data; the DBTs are added to the iovecs.  A zero control size
 *     on a message that is not a heartbeat is reported (message 3619) and
 *     DB_REP_UNAVAIL is returned.
 *   REPMGR_APP_MESSAGE: one allocation holds the REPMGR_MESSAGE extended by
 *     the segment DBT array, followed by the message buffer.
 *   Own-format messages: one allocation holds the REPMGR_MESSAGE followed
 *     by the request buffer; a zero buffer size is reported (message 3680)
 *     with DB_REP_UNAVAIL.
 *   REPMGR_APP_RESPONSE: the response tag selects an entry of
 *     conn->responses; the body is read into the DBT of that entry, which
 *     is allocated here when needed.
 *   REPMGR_RESP_ERROR: the error code from the header is stored negated in
 *     the response entry and waiters are woken.
 *   REPMGR_HANDSHAKE (and any other case sharing its code): buffers are set
 *     up by __repmgr_prepare_simple_input.
 *   Anything else: message 3676 and DB_REP_UNAVAIL.
 *
 * At the end the connection is either reset for reading the next header or
 * moved to DATA_PHASE.
 */
765 * Having finished reading the 9-byte message header, figure out what kind of
766 * message we're about to receive, and prepare input buffers accordingly. The
767 * header includes enough information for us to figure out how much buffer space
768 * we need to allocate (though in some cases we need to do a bit of computation
769 * to arrive at the answer).
771 * Caller must hold mutex.
774 prepare_input(env, conn)
776 REPMGR_CONNECTION *conn;
778 #define MEM_ALIGN sizeof(double)
780 __repmgr_msg_hdr_args msg_hdr;
781 REPMGR_RESPONSE *resp;
782 u_int32_t control_size, rec_size, size;
783 size_t memsize, control_offset, rec_offset;
787 DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
790 * We can only get here after having read the full 9 bytes that we
791 * expect, so this can't fail.
793 ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
794 conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
795 DB_ASSERT(env, ret == 0);
797 __repmgr_iovec_init(&conn->iovecs);
/* Record the message type on the connection and branch on it. */
800 switch ((conn->msg_type = msg_hdr.type)) {
801 case REPMGR_HEARTBEAT:
803 * The underlying byte-receiving mechanism will already have
804 * noted the fact that we got some traffic on this connection,
805 * which is all that is needed to monitor the heartbeat. But
806 * we also put the heartbeat message on the message queue so
807 * that it will perform rerequest processing.
809 case REPMGR_REP_MESSAGE:
810 env->rep_handle->seen_repmsg = TRUE;
811 control_size = REP_MSG_CONTROL_SIZE(msg_hdr);
812 rec_size = REP_MSG_REC_SIZE(msg_hdr);
813 if (control_size == 0) {
814 if (conn->msg_type == REPMGR_HEARTBEAT) {
816 * Got an old-style heartbeat without payload,
822 __db_errx(env, DB_STR("3619",
823 "illegal size for rep msg"));
824 return (DB_REP_UNAVAIL);
828 * Allocate a block of memory large enough to hold a
829 * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
830 * data areas that it points to. Start by calculating
831 * the total memory needed.
833 memsize = DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
834 control_offset = memsize;
835 memsize += control_size;
837 memsize = DB_ALIGN(memsize, MEM_ALIGN);
838 rec_offset = memsize;
841 COMPQUIET(rec_offset, 0);
842 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
844 conn->input.rep_message = membase;
845 conn->input.rep_message->msg_hdr = msg_hdr;
846 conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
848 DB_INIT_DBT(conn->input.rep_message->v.repmsg.control,
849 (u_int8_t*)membase + control_offset, control_size);
850 __repmgr_add_dbt(&conn->iovecs,
851 &conn->input.rep_message->v.repmsg.control);
854 DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
856 (u_int8_t*)membase + rec_offset : NULL),
858 __repmgr_add_dbt(&conn->iovecs,
859 &conn->input.rep_message->v.repmsg.rec);
861 DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
865 case REPMGR_APP_MESSAGE:
867 * We need a buffer big enough to hold the REPMGR_MESSAGE struct
868 * and the data that we expect to receive on the wire. We must
869 * extend the struct size for the variable-length DBT array at
872 size = DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
873 APP_MSG_SEGMENT_COUNT(msg_hdr) * sizeof(DBT)),
875 memsize = size + APP_MSG_BUFFER_SIZE(msg_hdr);
876 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
878 conn->input.rep_message = membase;
879 conn->input.rep_message->msg_hdr = msg_hdr;
880 conn->input.rep_message->v.appmsg.conn = conn;
882 DB_INIT_DBT(conn->input.rep_message->v.appmsg.buf,
883 (u_int8_t*)membase + size,
884 APP_MSG_BUFFER_SIZE(msg_hdr));
885 __repmgr_add_dbt(&conn->iovecs,
886 &conn->input.rep_message->v.appmsg.buf);
890 size = sizeof(REPMGR_MESSAGE) + REPMGR_OWN_BUF_SIZE(msg_hdr);
891 if ((ret = __os_malloc(env, size, &membase)) != 0)
893 conn->input.rep_message = membase;
894 conn->input.rep_message->msg_hdr = msg_hdr;
897 * Save "conn" pointer in case this turns out to be a one-shot
898 * request. If it isn't, it won't matter.
901 * An OWN msg that arrives in PARAMETERS state has bypassed the
902 * final handshake, implying that this connection is to be used
903 * for a one-shot GMDB request.
/*
 * NOTE(review): this error return comes after the buffer was allocated and
 * stored in conn->input.rep_message; confirm that the buffer is released
 * when the connection is torn down.
 */
905 if (REPMGR_OWN_BUF_SIZE(msg_hdr) == 0) {
906 __db_errx(env, DB_STR_A("3680",
907 "invalid own buf size %lu in prepare_input", "%lu"),
908 (u_long)REPMGR_OWN_BUF_SIZE(msg_hdr));
909 return (DB_REP_UNAVAIL);
911 DB_INIT_DBT(conn->input.rep_message->v.gmdb_msg.request,
912 (u_int8_t*)membase + sizeof(REPMGR_MESSAGE),
913 REPMGR_OWN_BUF_SIZE(msg_hdr));
914 __repmgr_add_dbt(&conn->iovecs,
915 &conn->input.rep_message->v.gmdb_msg.request);
918 case REPMGR_APP_RESPONSE:
919 size = APP_RESP_BUFFER_SIZE(msg_hdr);
/* The tag indexes conn->responses; tags at or beyond aresp are rejected. */
920 conn->cur_resp = APP_RESP_TAG(msg_hdr);
921 if (conn->cur_resp >= conn->aresp) {
922 __db_errx(env, DB_STR_A("3681",
923 "invalid cur resp %lu in prepare_input", "%lu"),
924 (u_long)conn->cur_resp);
925 return (DB_REP_UNAVAIL);
927 resp = &conn->responses[conn->cur_resp];
928 DB_ASSERT(env, F_ISSET(resp, RESP_IN_USE));
933 * Prepare to read message body into either the user-supplied
934 * buffer, or one we allocate here.
937 if (!F_ISSET(resp, RESP_THREAD_WAITING)) {
938 /* Caller already timed out; allocate dummy buffer. */
940 memset(dbt, 0, sizeof(*dbt));
941 ret = __os_malloc(env, size, &dbt->data);
942 F_SET(resp, RESP_DUMMY_BUF);
944 F_CLR(resp, RESP_IN_USE);
945 } else if (F_ISSET(dbt, DB_DBT_MALLOC))
946 ret = __os_umalloc(env, size, &dbt->data);
947 else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
948 if (dbt->data == NULL || dbt->size < size)
949 ret = __os_urealloc(env, size, &dbt->data);
950 } else if (F_ISSET(dbt, DB_DBT_USERMEM)) {
951 /* Recipient should have checked size limit. */
952 DB_ASSERT(env, size <= dbt->ulen);
959 __repmgr_add_dbt(&conn->iovecs, dbt);
960 F_SET(resp, RESP_READING);
963 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
964 F_SET(resp, RESP_COMPLETE);
965 if ((ret = __repmgr_wake_waiters(env,
966 &conn->response_waiters)) != 0)
972 case REPMGR_RESP_ERROR:
973 DB_ASSERT(env, RESP_ERROR_TAG(msg_hdr) < conn->aresp &&
974 conn->responses != NULL);
975 resp = &conn->responses[RESP_ERROR_TAG(msg_hdr)];
976 DB_ASSERT(env, !F_ISSET(resp, RESP_READING));
977 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
978 F_SET(resp, RESP_COMPLETE);
981 * DB errors are always negative, but we only send
982 * unsigned values on the wire.
984 resp->ret = -((int)RESP_ERROR_CODE(msg_hdr));
985 if ((ret = __repmgr_wake_waiters(env,
986 &conn->response_waiters)) != 0)
989 F_CLR(resp, RESP_IN_USE);
993 case REPMGR_HANDSHAKE:
995 if ((ret = __repmgr_prepare_simple_input(env,
996 conn, &msg_hdr)) != 0)
1001 __db_errx(env, DB_STR_A("3676",
1002 "unexpected msg type %lu in prepare_input", "%lu"),
1003 (u_long)conn->msg_type);
1004 return (DB_REP_UNAVAIL);
1009 * We can skip the DATA_PHASE, because the current message type
1010 * only has a header, no following data.
1012 __repmgr_reset_for_reading(conn);
1014 conn->reading_phase = DATA_PHASE;
/*
 * Sets up the two input DBTs of conn->input.repmgr_msg (cntrl and rec) from
 * the control and rec sizes in *msg_hdr.  For each part with a nonzero size
 * a buffer of that size is allocated and the DBT is added to conn->iovecs.
 * If the rec allocation fails, the control buffer is freed again.
 */
1020 * PUBLIC: int __repmgr_prepare_simple_input __P((ENV *,
1021 * PUBLIC: REPMGR_CONNECTION *, __repmgr_msg_hdr_args *));
1024 __repmgr_prepare_simple_input(env, conn, msg_hdr)
1026 REPMGR_CONNECTION *conn;
1027 __repmgr_msg_hdr_args *msg_hdr;
1030 u_int32_t control_size, rec_size;
1033 control_size = REP_MSG_CONTROL_SIZE(*msg_hdr);
1034 rec_size = REP_MSG_REC_SIZE(*msg_hdr);
/* Control part. */
1036 dbt = &conn->input.repmgr_msg.cntrl;
1037 if ((dbt->size = control_size) > 0) {
1038 if ((ret = __os_malloc(env,
1039 dbt->size, &dbt->data)) != 0)
1041 __repmgr_add_dbt(&conn->iovecs, dbt);
/* Rec part. */
1044 dbt = &conn->input.repmgr_msg.rec;
1045 if ((dbt->size = rec_size) > 0) {
1046 if ((ret = __os_malloc(env,
1047 dbt->size, &dbt->data)) != 0) {
/* Second allocation failed: release the control buffer. */
1048 dbt = &conn->input.repmgr_msg.cntrl;
1050 __os_free(env, dbt->data);
1053 __repmgr_add_dbt(&conn->iovecs, dbt);
/*
 * Called with conn in DATA_PHASE (asserted), that is, once a whole message
 * has been read.  What happens depends on conn->state:
 *
 *   CONN_CONNECTED: the message must be a handshake.  A finished connector
 *     thread of the site is joined and the connector pointer of the site is
 *     set to NULL; then read_version_response is called.
 *   CONN_NEGOTIATE: the message must be a handshake; send_version_response
 *     is called.
 *   CONN_PARAMETERS: a handshake has its rec data NUL-terminated and passed
 *     as host name to accept_handshake, after which the state becomes
 *     CONN_READY.  An own-format message is given a pointer to conn, the
 *     connection is removed from db_rep->connections and the message is
 *     queued.  Any other type is reported (message 3620) with
 *     DB_REP_UNAVAIL.
 *   CONN_CONGESTED (and any case sharing its code): by message type --
 *     REPMGR_PERMLSN goes to record_permlsn; heartbeat, application and
 *     replication messages are put on the message queue; an own-format
 *     message has its conn pointer cleared and goes to process_own_msg;
 *     REPMGR_APP_RESPONSE completes the response entry (waking waiters, or
 *     freeing the dummy buffer when no thread waits); other types are
 *     reported (message 3621) with DB_REP_UNAVAIL.
 *
 * Afterwards, for handshake and permlsn messages the cntrl and rec buffers
 * are freed, and the connection is reset for reading the next message.
 *
 * NOTE(review): in the CONN_PARAMETERS handshake case the rec buffer is
 * indexed at size-1 with no visible check that the size is nonzero;
 * confirm that a zero-length rec cannot reach that line.
 */
1059 * Processes an incoming message, depending on our current state.
1061 * Caller must hold mutex.
1064 dispatch_msgin(env, conn)
1066 REPMGR_CONNECTION *conn;
1070 REPMGR_RUNNABLE *th;
1071 REPMGR_RESPONSE *resp;
1076 DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
1077 db_rep = env->rep_handle;
1079 switch (conn->state) {
1080 case CONN_CONNECTED:
1082 * In this state, we know we're working with an outgoing
1083 * connection. We've sent a version proposal, and now expect
1084 * the response (which could be a dumb old V1 handshake).
1086 ONLY_HANDSHAKE(env, conn);
1089 * Here is a good opportunity to clean up this site's connector
1090 * thread, because we generally come through here after making
1091 * an outgoing connection, yet we're out of the main loop, so we
1092 * don't hit this often.
1095 DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1096 site = SITE_FROM_EID(eid);
1097 th = site->connector;
1098 if (th != NULL && th->finished) {
1099 if ((ret = __repmgr_thread_join(th)) != 0)
1102 site->connector = NULL;
1105 if ((ret = read_version_response(env, conn)) != 0)
1109 case CONN_NEGOTIATE:
1111 * Since we're in this state, we know we're working with an
1112 * incoming connection, and this is the first message we've
1113 * received. So it must be a version negotiation proposal (or a
1114 * legacy V1 handshake). (We'll verify this of course.)
1116 ONLY_HANDSHAKE(env, conn);
1117 if ((ret = send_version_response(env, conn)) != 0)
1121 case CONN_PARAMETERS:
1123 * We've previously agreed on a (>1) version, so we expect
1124 * either the other side's parameters handshake, or possibly a
1125 * GMDB request on a one-shot, dedicated connection.
1127 switch (conn->msg_type) {
1128 case REPMGR_HANDSHAKE:
1129 dbt = &conn->input.repmgr_msg.rec;
1130 hostname = dbt->data;
1131 hostname[dbt->size-1] = '\0';
1132 if ((ret = accept_handshake(env, conn, hostname)) != 0)
1134 conn->state = CONN_READY;
1136 case REPMGR_OWN_MSG:
1138 * GM change requests arrive in their own dedicated
1139 * connections, and when they're served the entire
1140 * connection isn't needed any more. So the message
1141 * processing thread will do the entire job of serving
1142 * the request and finishing off the connection; so we
1143 * don't have to read it any more. Note that normally
1144 * whenever we remove a connection from our list we
1145 * decrement the reference count; but we also increment
1146 * it whenever we pass a reference over to the message
1147 * processing threads' queue. So in this case it's a
1150 conn->input.rep_message->v.gmdb_msg.conn = conn;
1151 TAILQ_REMOVE(&db_rep->connections, conn, entries);
1152 if ((ret = __repmgr_queue_put(env,
1153 conn->input.rep_message)) != 0)
1158 __db_errx(env, DB_STR_A("3620",
1159 "unexpected msg type %d in PARAMETERS state", "%d"),
1160 (int)conn->msg_type);
1161 return (DB_REP_UNAVAIL);
1167 case CONN_CONGESTED:
1169 * We have a complete message, so process it. Acks and
1170 * handshakes get processed here, in line. Regular rep messages
1171 * get posted to a queue, to be handled by a thread from the
1172 * message thread pool.
1174 switch (conn->msg_type) {
1175 case REPMGR_PERMLSN:
1176 if ((ret = record_permlsn(env, conn)) != 0)
1180 case REPMGR_HEARTBEAT:
1181 case REPMGR_APP_MESSAGE:
1182 case REPMGR_REP_MESSAGE:
1183 if ((ret = __repmgr_queue_put(env,
1184 conn->input.rep_message)) != 0)
1187 * The queue has taken over responsibility for the
1188 * rep_message buffer, and will free it later.
1190 if (conn->msg_type == REPMGR_APP_MESSAGE)
1194 case REPMGR_OWN_MSG:
1196 * Since we're in one of the "ready" states we know this
1197 * isn't a one-shot request, so we are not giving
1198 * ownership of this connection over to the message
1199 * thread queue; we're going to keep reading on it
1200 * ourselves. The message thread that processes this
1201 * request has no need for a connection anyway, since
1202 * there is no response that needs to be returned.
1204 conn->input.rep_message->v.gmdb_msg.conn = NULL;
1205 if ((ret = process_own_msg(env, conn)) != 0)
1209 case REPMGR_APP_RESPONSE:
1210 DB_ASSERT(env, conn->cur_resp < conn->aresp &&
1211 conn->responses != NULL);
1212 resp = &conn->responses[conn->cur_resp];
1213 DB_ASSERT(env, F_ISSET(resp, RESP_READING));
1214 F_CLR(resp, RESP_READING);
1215 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
1216 F_SET(resp, RESP_COMPLETE);
1217 if ((ret = __repmgr_wake_waiters(env,
1218 &conn->response_waiters)) != 0)
1222 * If the calling thread is no longer with us,
1223 * yet we're reading, it can only mean we're
1224 * reading into a dummy buffer, so free it now.
1226 DB_ASSERT(env, F_ISSET(resp, RESP_DUMMY_BUF));
1227 __os_free(env, resp->dbt.data);
1228 F_CLR(resp, RESP_IN_USE);
1232 case REPMGR_RESP_ERROR:
1234 __db_errx(env, DB_STR_A("3621",
1235 "unexpected msg type rcvd in ready state: %d",
1236 "%d"), (int)conn->msg_type);
1237 return (DB_REP_UNAVAIL);
1245 DB_ASSERT(env, FALSE);
1248 switch (conn->msg_type) {
1249 case REPMGR_HANDSHAKE:
1250 case REPMGR_PERMLSN:
1251 dbt = &conn->input.repmgr_msg.cntrl;
1253 __os_free(env, dbt->data);
1254 dbt = &conn->input.repmgr_msg.rec;
1256 __os_free(env, dbt->data);
1260 * Some messages in REPMGR_OWN_MSG format are also handled
1264 __repmgr_reset_for_reading(conn);
/*
 * Handles the own-format message held in conn->input.rep_message, by the
 * own-message type in its header:
 *
 *   REPMGR_CONNECT_REJECT: the rejection is unmarshalled and the cited
 *     gen/version is logged and compared with __repmgr_gmdb_version_cmp.
 *     When the comparison is positive and no replication message has been
 *     seen yet, a REPMGR_REJOIN operation is deferred.  ret is asserted to
 *     be nonzero at the end of this case.
 *   REPMGR_SHARING: the message is put on the message queue.
 *   REPMGR_PARM_REFRESH: the parameters are unmarshalled; the ack policy is
 *     stored on the site of conn->eid, SITE_ELECTABLE is set or cleared
 *     from the ELECTABLE_SITE flag, and SITE_HAS_PRIO is set.
 *   The remaining listed types and anything else: message 3677 and
 *     DB_REP_UNAVAIL.
 *
 * An unmarshalling failure returns DB_REP_UNAVAIL.  The message buffer is
 * freed near the end of the function.
 */
1269 * Process one of repmgr's "own" message types, and one that occurs on a regular
1270 * (not one-shot) connection.
1273 process_own_msg(env, conn)
1275 REPMGR_CONNECTION *conn;
1280 REPMGR_MESSAGE *msg;
1281 __repmgr_connect_reject_args reject;
1282 __repmgr_parm_refresh_args parms;
1287 * Set "msg" to point to the message struct. If we do all necessary
1288 * processing here now, leave it set so that it can be freed. On the
1289 * other hand, if we pass it off to the message queue for later
1290 * processing by a message thread, we want to avoid freeing the memory
1291 * here, so clear the pointer in such a case.
1293 switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
1294 case REPMGR_CONNECT_REJECT:
1295 dbt = &msg->v.gmdb_msg.request;
1296 if ((ret = __repmgr_connect_reject_unmarshal(env,
1297 &reject, dbt->data, dbt->size, NULL)) != 0)
1298 return (DB_REP_UNAVAIL);
1301 * If we're being rejected by someone who has more up-to-date
1302 * membership information than we do, it means we have been
1303 * removed from the group. If we've just gotten started, we can
1304 * make one attempt at automatically rejoining; otherwise we bow
1307 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1308 "got rejection msg citing version %lu/%lu",
1309 (u_long)reject.gen, (u_long)reject.version));
1311 if (__repmgr_gmdb_version_cmp(env,
1312 reject.gen, reject.version) > 0) {
1313 if (env->rep_handle->seen_repmsg)
1315 else if ((ret = __repmgr_defer_op(env,
1316 REPMGR_REJOIN)) == 0)
1317 ret = DB_REP_UNAVAIL;
1319 ret = DB_REP_UNAVAIL;
1320 DB_ASSERT(env, ret != 0);
1323 case REPMGR_SHARING:
1324 if ((ret = __repmgr_queue_put(env, msg)) != 0)
1326 /* Show that we no longer own this memory. */
1330 case REPMGR_PARM_REFRESH:
1331 dbt = &conn->input.rep_message->v.gmdb_msg.request;
1332 if ((ret = __repmgr_parm_refresh_unmarshal(env,
1333 &parms, dbt->data, dbt->size, NULL)) != 0)
1334 return (DB_REP_UNAVAIL);
1335 db_rep = env->rep_handle;
1336 DB_ASSERT(env, conn->type == REP_CONNECTION &&
1337 IS_KNOWN_REMOTE_SITE(conn->eid));
1338 site = SITE_FROM_EID(conn->eid);
1339 site->ack_policy = (int)parms.ack_policy;
1340 if (F_ISSET(&parms, ELECTABLE_SITE))
1341 F_SET(site, SITE_ELECTABLE);
1343 F_CLR(site, SITE_ELECTABLE);
1344 F_SET(site, SITE_HAS_PRIO);
1347 case REPMGR_GM_FAILURE:
1348 case REPMGR_GM_FORWARD:
1349 case REPMGR_JOIN_REQUEST:
1350 case REPMGR_JOIN_SUCCESS:
1351 case REPMGR_REMOVE_REQUEST:
1352 case REPMGR_RESOLVE_LIMBO:
1354 __db_errx(env, DB_STR_A("3677",
1355 "unexpected msg type %lu in process_own_msg", "%lu"),
1356 (u_long)REPMGR_OWN_MSG_TYPE(msg->msg_hdr));
1357 return (DB_REP_UNAVAIL);
1360 * If we haven't given ownership of the msg buffer to another thread,
1364 __os_free(env, msg);
/*
 * Handles the first handshake received on an incoming connection.
 *
 * __repmgr_find_version_info extracts the version information (vi) from
 * the message.  Two paths follow:
 *   - No version information: the rec data is taken as the host name and
 *     passed to accept_v1_handshake, a version 1 handshake carrying the
 *     local host name (including its terminating NUL) is sent back, and the
 *     connection state becomes CONN_READY.
 *   - Version proposal present: it is unmarshalled (failure returns
 *     DB_REP_UNAVAIL) and a version is chosen.  The choice is stored in
 *     conn->version, marshalled into buf, followed by a NUL byte, and sent
 *     with __repmgr_send_handshake; the state becomes CONN_PARAMETERS.
 *     When no version is acceptable, message 3622 is reported and
 *     DB_REP_UNAVAIL is returned.
 */
1369 * Examine and verify the incoming version proposal message, and send an
1370 * appropriate response.
1373 send_version_response(env, conn)
1375 REPMGR_CONNECTION *conn;
1378 __repmgr_version_proposal_args versions;
1379 __repmgr_version_confirmation_args conf;
1380 repmgr_netaddr_t *my_addr;
1382 u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
1386 db_rep = env->rep_handle;
1387 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1389 if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1392 /* No version info, so we must be talking to a v1 site. */
1393 hostname = conn->input.repmgr_msg.rec.data;
1394 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1396 if ((ret = __repmgr_send_v1_handshake(env,
1397 conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
1399 conn->state = CONN_READY;
1401 if ((ret = __repmgr_version_proposal_unmarshal(env,
1402 &versions, vi.data, vi.size, NULL)) != 0)
1403 return (DB_REP_UNAVAIL);
/*
 * Use DB_REPMGR_VERSION when it lies within the proposed range; otherwise
 * use the proposed maximum when that lies between DB_REPMGR_MIN_VERSION
 * and DB_REPMGR_VERSION.
 */
1405 if (DB_REPMGR_VERSION >= versions.min &&
1406 DB_REPMGR_VERSION <= versions.max)
1407 conf.version = DB_REPMGR_VERSION;
1408 else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1409 versions.max <= DB_REPMGR_VERSION)
1410 conf.version = versions.max;
1413 * User must have wired up a combination of versions
1414 * exceeding what we said we'd support.
1416 __db_errx(env, DB_STR_A("3622",
1417 "No available version between %lu and %lu",
1418 "%lu %lu"), (u_long)versions.min,
1419 (u_long)versions.max);
1420 return (DB_REP_UNAVAIL);
1422 conn->version = conf.version;
1424 __repmgr_version_confirmation_marshal(env, &conf, buf);
1425 buf[__REPMGR_VERSION_CONFIRMATION_SIZE] = '\0';
1426 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1427 if ((ret = __repmgr_send_handshake(env,
1428 conn, buf, sizeof(buf), 0)) != 0)
1431 conn->state = CONN_PARAMETERS;
1437 * Sends a version-aware handshake to the remote site, only after we've verified
1438 * that it is indeed version-aware. We can send a v2, v3, or newer handshake,
1439 * depending on the connection's version.
1441 * PUBLIC: int __repmgr_send_handshake __P((ENV *,
1442 * PUBLIC: REPMGR_CONNECTION *, void *, size_t, u_int32_t));
/*
 * __repmgr_send_handshake --
 *	Build and send a REPMGR_HANDSHAKE message on conn.  The control part is
 *	the marshalled handshake structure whose layout is selected by
 *	conn->version; the rec part is our host name with its terminating NUL,
 *	followed by optlen bytes of opt when opt is non-NULL.  An unrecognized
 *	conn->version is reported and yields DB_REP_UNAVAIL.  The return value
 *	of __repmgr_send_one() is stored in ret.
 */
1445 __repmgr_send_handshake(env, conn, opt, optlen, flags)
1447 REPMGR_CONNECTION *conn;
1455 __repmgr_handshake_args hs;
1456 __repmgr_v2handshake_args v2hs;
1457 __repmgr_v3handshake_args v3hs;
1458 repmgr_netaddr_t *my_addr;
1459 size_t hostname_len, rec_len;
1462 u_int32_t cntrl_len;
1465 db_rep = env->rep_handle;
1466 rep = db_rep->region;
1467 my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1470 * The cntrl part has various parameters (varies by version). The rec
1471 * part has the host name, followed by whatever optional extra data was
1474 * Version awareness was introduced with protocol version 2 (so version
1475 * 1 is handled elsewhere).
/* First pass over the version: determine how large the control part will be. */
1477 switch (conn->version) {
1479 cntrl_len = __REPMGR_V2HANDSHAKE_SIZE;
1482 cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
1485 cntrl_len = __REPMGR_HANDSHAKE_SIZE;
1488 __db_errx(env, DB_STR_A("3678",
1489 "unexpected conn version %lu in send_handshake", "%lu"),
1490 (u_long)conn->version);
1491 return (DB_REP_UNAVAIL);
/* rec part: host name plus its NUL, plus the optional data when supplied. */
1493 hostname_len = strlen(my_addr->host);
1494 rec_len = hostname_len + 1 +
1495 (opt == NULL ? 0 : optlen);
/* A single allocation holds both parts: cntrl first, rec immediately after it. */
1497 if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1500 cntrl.data = p = buf;
/* Second pass over the version: marshal the matching structure at the start of buf. */
1501 switch (conn->version) {
1503 /* Not allowed to use multi-process feature in v2 group. */
1504 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1505 v2hs.port = my_addr->port;
1506 v2hs.priority = rep->priority;
1507 __repmgr_v2handshake_marshal(env, &v2hs, p);
1510 v3hs.port = my_addr->port;
1511 v3hs.priority = rep->priority;
1513 __repmgr_v3handshake_marshal(env, &v3hs, p);
1516 hs.port = my_addr->port;
1517 hs.alignment = MEM_ALIGN;
1518 hs.ack_policy = (u_int32_t)rep->perm_policy;
/* This format carries electability as a flag bit, set when our priority is positive. */
1520 if (rep->priority > 0)
1521 F_SET(&hs, ELECTABLE_SITE);
1522 __repmgr_handshake_marshal(env, &hs, p);
1525 DB_ASSERT(env, FALSE);
1528 cntrl.size = cntrl_len;
/* The rec part starts right after the control part, within the same buffer. */
1530 p = rec.data = &p[cntrl_len];
1531 (void)strcpy((char*)p, my_addr->host);
1532 p += hostname_len + 1;
1534 memcpy(p, opt, optlen);
/* p points just past the last byte written, so this is the actual rec length. */
1537 rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1539 /* Never block on select thread: pass maxblock as 0. */
1540 ret = __repmgr_send_one(env,
1541 conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0);
/* The scratch buffer is released as soon as __repmgr_send_one() has returned. */
1542 __os_free(env, buf);
/*
 * read_version_response --
 *	Process the handshake a peer sent in answer to our version proposal.
 *	A handshake without version information is accepted as a v1 handshake.
 *	Otherwise the version confirmation is unmarshalled and checked against
 *	the range [DB_REPMGR_MIN_VERSION, DB_REPMGR_VERSION], the handshake is
 *	accepted, and our own handshake (with no optional data) is sent back.
 *	The connection state is then set to CONN_READY.  DB_REP_UNAVAIL is
 *	returned for an unparsable confirmation or an out-of-range version.
 */
1547 read_version_response(env, conn)
1549 REPMGR_CONNECTION *conn;
1552 __repmgr_version_confirmation_args conf;
1558 db_rep = env->rep_handle;
1560 if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
/* The rec part begins with the peer's host name. */
1562 hostname = conn->input.repmgr_msg.rec.data;
1564 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1567 if ((ret = __repmgr_version_confirmation_unmarshal(env,
1568 &conf, vi.data, vi.size, NULL)) != 0)
1569 return (DB_REP_UNAVAIL);
/* Adopt the confirmed version only if it lies within the range we support. */
1570 if (conf.version >= DB_REPMGR_MIN_VERSION &&
1571 conf.version <= DB_REPMGR_VERSION)
1572 conn->version = conf.version;
1575 * Remote site "confirmed" a version outside of the
1576 * range we proposed. It should never do that.
1578 __db_errx(env, DB_STR_A("3623",
1579 "Can't support confirmed version %lu", "%lu"),
1580 (u_long)conf.version);
1581 return (DB_REP_UNAVAIL);
1584 if ((ret = accept_handshake(env, conn, hostname)) != 0)
/* Pass REPMGR_SUBORDINATE in the handshake flags when IS_SUBORDINATE(db_rep) holds. */
1586 flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
1587 if ((ret = __repmgr_send_handshake(env,
1588 conn, NULL, 0, flags)) != 0)
1591 conn->state = CONN_READY;
1596 * Examine the rec part of a handshake message to see if it has any version
1597 * information in it. This is the magic that lets us allow version-aware sites
1598 * to exchange information, and yet avoids tripping up v1 sites, which don't
1599 * know how to look for it.
1601 * PUBLIC: int __repmgr_find_version_info __P((ENV *,
1602 * PUBLIC: REPMGR_CONNECTION *, DBT *));
/*
 * __repmgr_find_version_info --
 *	Inspect the rec part of the handshake currently held in conn's input
 *	buffer.  The rec part starts with a NUL-terminated host name; when more
 *	bytes follow it, vi is pointed at them (excluding the final byte of the
 *	rec part).  An empty rec part is reported and yields DB_REP_UNAVAIL.
 *	Note that the final byte of the rec part is overwritten in place.
 */
1605 __repmgr_find_version_info(env, conn, vi)
1607 REPMGR_CONNECTION *conn;
1612 u_int32_t hostname_len;
1614 dbt = &conn->input.repmgr_msg.rec;
1615 if (dbt->size == 0) {
1616 __db_errx(env, DB_STR("3624",
1617 "handshake is missing rec part"));
1618 return (DB_REP_UNAVAIL);
/*
 * Force NUL termination by overwriting the last byte of the rec part, so
 * the strlen() below cannot read beyond the received data.
 */
1620 hostname = dbt->data;
1621 hostname[dbt->size-1] = '\0';
1622 hostname_len = (u_int32_t)strlen(hostname);
/* Host name plus its NUL accounts for the whole rec part: nothing else was sent. */
1623 if (hostname_len + 1 == dbt->size) {
1625 * The rec DBT held only the host name. This is a simple legacy
1626 * V1 handshake; it contains no version information.
1631 * There's more data than just the host name. The remainder is
1632 * available to be treated as a normal byte buffer (and read in
1633 * by one of the unmarshal functions). Note that the remaining
1634 * length should not include the padding byte that we have
1635 * already clobbered.
1637 vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1638 vi->size = (dbt->size - (hostname_len+1)) - 1;
/*
 * accept_handshake --
 *	Unmarshal the control part of a version-aware handshake using the
 *	structure layout that matches conn->version, derive the peer's
 *	parameters from it, and hand them to process_parameters() together with
 *	the supplied host name.  DB_REP_UNAVAIL is returned when the control
 *	part cannot be unmarshalled or conn->version is not handled here.
 */
1644 accept_handshake(env, conn, hostname)
1646 REPMGR_CONNECTION *conn;
1649 __repmgr_handshake_args hs;
1650 __repmgr_v2handshake_args hs2;
1651 __repmgr_v3handshake_args hs3;
1653 u_int32_t ack, flags;
1656 switch (conn->version) {
1658 if (__repmgr_v2handshake_unmarshal(env, &hs2,
1659 conn->input.repmgr_msg.cntrl.data,
1660 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1661 return (DB_REP_UNAVAIL);
/* This format carries a priority; a positive priority means the site is electable. */
1663 electable = hs2.priority > 0;
1667 if (__repmgr_v3handshake_unmarshal(env, &hs3,
1668 conn->input.repmgr_msg.cntrl.data,
1669 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1670 return (DB_REP_UNAVAIL);
/* Same rule as above: electable exactly when the advertised priority is positive. */
1672 electable = hs3.priority > 0;
1677 if (__repmgr_handshake_unmarshal(env, &hs,
1678 conn->input.repmgr_msg.cntrl.data,
1679 conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1680 return (DB_REP_UNAVAIL);
/* This format carries electability as a flag bit, and also the peer's ack policy. */
1682 electable = F_ISSET(&hs, ELECTABLE_SITE);
1684 ack = hs.ack_policy;
1687 __db_errx(env, DB_STR_A("3679",
1688 "unexpected conn version %lu in accept_handshake", "%lu"),
1689 (u_long)conn->version);
1690 return (DB_REP_UNAVAIL);
1693 return (process_parameters(env,
1694 conn, hostname, port, ack, electable, flags));
/*
 * accept_v1_handshake --
 *	Validate a legacy v1 handshake: the control part must be exactly the
 *	size of DB_REPMGR_V1_HANDSHAKE and its version field must be 1;
 *	otherwise DB_REP_UNAVAIL is returned.  The peer's parameters are then
 *	passed to process_parameters() with an ack value of 0 and no flags.
 */
1698 accept_v1_handshake(env, conn, hostname)
1700 REPMGR_CONNECTION *conn;
1703 DB_REPMGR_V1_HANDSHAKE *handshake;
/* The control part is interpreted directly as the v1 handshake structure. */
1707 handshake = conn->input.repmgr_msg.cntrl.data;
1708 if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1709 handshake->version != 1) {
1710 __db_errx(env, DB_STR("3625", "malformed V1 handshake"));
1711 return (DB_REP_UNAVAIL);
/* Priority is converted with ntohl(); a positive value marks the site electable. */
1715 prio = ntohl(handshake->priority);
1716 electable = prio > 0;
1717 return (process_parameters(env,
1718 conn, hostname, handshake->port, 0, electable, 0));
1721 /* Caller must hold mutex. */
/*
 * process_parameters --
 *	Apply the parameters learned from a peer's handshake to the connection
 *	and to the site it belongs to.  For a connection still in state
 *	CONN_CONNECTED (one we initiated) the site is taken from conn->eid.
 *	For an incoming connection the type is set to APP_CONNECTION or
 *	REP_CONNECTION according to the APP_CHANNEL_CONNECTION flag, and the
 *	site is looked up by host and port.  If no site with membership
 *	SITE_PRESENT is found, a REPMGR_CONNECT_REJECT message carrying our
 *	membership version and generation is sent and DB_REP_UNAVAIL is
 *	returned.  The site's electability flag and ack policy are recorded,
 *	and the check_election condition is signalled when no master is known
 *	and neither side is a subordinate.
 */
1723 process_parameters(env, conn, host, port, ack, electable, flags)
1725 REPMGR_CONNECTION *conn;
1729 u_int32_t ack, flags;
1732 REPMGR_RETRY *retry;
1734 __repmgr_connect_reject_args reject;
1735 u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
1738 db_rep = env->rep_handle;
1740 /* Connection state can be used to discern incoming versus outgoing. */
1741 if (conn->state == CONN_CONNECTED) {
1743 * Since we initiated this as an outgoing connection, we
1744 * obviously already know the host, port and site. We just need
1745 * the other site's electability flag (which we'll grab below,
1746 * after the big "else" clause).
1748 DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1749 site = SITE_FROM_EID(conn->eid);
1750 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1751 "handshake from connection to %s:%lu EID %u",
1752 site->net_addr.host,
1753 (u_long)site->net_addr.port, conn->eid));
1755 DB_ASSERT(env, conn->state == CONN_NEGOTIATE ||
1756 conn->state == CONN_PARAMETERS);
1758 * Incoming connection: until now we haven't known what kind of
1759 * connection we're dealing with (and in the case of a
1760 * REP_CONNECTION, what its EID is); so it must be on the
1761 * "orphans" list. But now that we've received the parameters
1762 * we'll be able to figure all that out.
1764 if (LF_ISSET(APP_CHANNEL_CONNECTION)) {
1765 conn->type = APP_CONNECTION;
1768 conn->type = REP_CONNECTION;
1771 * Now that we've been given the host and port, use them to find
/* Only a site that is known and currently a present group member is accepted. */
1774 if ((site = __repmgr_lookup_site(env, host, port)) != NULL &&
1775 site->membership == SITE_PRESENT) {
1776 TAILQ_REMOVE(&db_rep->connections, conn, entries);
1779 eid = EID_FROM_SITE(site);
1780 if (LF_ISSET(REPMGR_SUBORDINATE)) {
1782 * Accept it, as a supplementary source of
1783 * input, but nothing else.
1785 TAILQ_INSERT_TAIL(&site->sub_conns,
1790 DB_EVENT_REP_CONNECT_ESTD, &eid);
1791 switch (site->state) {
1793 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1794 "handshake from paused site %s:%u EID %u",
/* Cancel the site's pending retry: unlink it from the retries list and free it. */
1796 retry = site->ref.retry;
1797 TAILQ_REMOVE(&db_rep->retries,
1799 __os_free(env, retry);
1801 case SITE_CONNECTED:
1803 * We got an incoming connection for a
1804 * site we were already connected to; at
1805 * least we thought we were.
1807 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1808 "connection from %s:%u EID %u while already connected",
1810 if ((ret = resolve_collision(env,
1814 case SITE_CONNECTING:
1815 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1816 "handshake from connecting site %s:%u EID %u",
1819 * Connector thread will give up when it
1820 * sees this site's state change, so we
1821 * don't have to do anything else here.
1825 DB_ASSERT(env, FALSE);
/* The site is now connected, with this connection as its incoming one. */
1828 site->state = SITE_CONNECTED;
1829 site->ref.conn.in = conn;
1831 &site->last_rcvd_timestamp, 1);
1834 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1835 "rejecting connection from unknown or provisional site %s:%u",
/* Tell the rejected peer which membership version and generation we hold. */
1837 reject.version = db_rep->membership_version;
1838 reject.gen = db_rep->member_version_gen;
1839 __repmgr_connect_reject_marshal(env,
1840 &reject, reject_buf);
1842 if ((ret = __repmgr_send_own_msg(env, conn,
1843 REPMGR_CONNECT_REJECT, reject_buf,
1844 __REPMGR_CONNECT_REJECT_SIZE)) != 0)
1848 * Since we haven't set conn->eid, bust_connection will
1849 * not schedule a retry for this "failure", which is
1850 * exactly what we want.
1852 return (DB_REP_UNAVAIL);
1857 F_SET(site, SITE_ELECTABLE);
1859 F_CLR(site, SITE_ELECTABLE);
/* NOTE(review): presumably records that the site's electability is now known -- confirm. */
1860 F_SET(site, SITE_HAS_PRIO);
1861 site->ack_policy = (int)ack;
1864 * If we're moping around wishing we knew who the master was, then
1865 * getting in touch with another site might finally provide sufficient
1866 * connectivity to find out.
1868 if (!IS_SUBORDINATE(db_rep) && /* us */
1869 !__repmgr_master_is_known(env) &&
1870 !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
1871 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1872 "handshake with no known master to wake election thread"));
1873 db_rep->new_connection = TRUE;
1874 if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
/*
 * resolve_collision --
 *	Used by process_parameters() when a handshake arrives on a new incoming
 *	connection for a site whose state is already SITE_CONNECTED.  Any
 *	existing incoming connection of the site is disabled and its reference
 *	cleared.  The existing outgoing connection is disabled as well, but
 *	only when the new connection speaks at least CONN_COLLISION_VERSION and
 *	__repmgr_is_server(env, site) is true.
 */
1882 resolve_collision(env, site, conn)
1885 REPMGR_CONNECTION *conn;
1890 * No need for site-oriented recovery, since we now have a replacement
1891 * connection; so skip bust_connection() and call disable_conn()
1894 * If we already had an incoming connection, this new one always
1895 * replaces it. Whether it also/alternatively replaces an outgoing
1896 * connection depends on whether we're client or server (so as to avoid
1897 * connection collisions resulting in no remaining connections). (If
1898 * it's an older version that doesn't know about our collision
1899 * resolution protocol, it will behave like a client.)
1901 if (site->ref.conn.in != NULL) {
1902 ret = __repmgr_disable_connection(env, site->ref.conn.in);
/* The reference is cleared right after the disable call, before ret is examined. */
1903 site->ref.conn.in = NULL;
/* Peers older than CONN_COLLISION_VERSION keep the outgoing connection intact. */
1907 if (site->ref.conn.out != NULL &&
1908 conn->version >= CONN_COLLISION_VERSION &&
1909 __repmgr_is_server(env, site)) {
1910 ret = __repmgr_disable_connection(env, site->ref.conn.out);
1911 site->ref.conn.out = NULL;
/*
 * record_permlsn --
 *	Process an acknowledgement message received on conn.  The message is
 *	refused with DB_REP_UNAVAIL when the connection has version 0, is not
 *	in a ready state, or has no valid EID, and likewise when the ack is
 *	malformed.  An ack whose generation is older than the region's current
 *	generation is treated as stale.  An ack for the current generation that
 *	is ahead of the site's max_ack is copied into site->max_ack; the code
 *	also calls check_min_log_file() and __repmgr_wake_waiters() on
 *	db_rep->ack_waiters.
 */
1919 record_permlsn(env, conn)
1921 REPMGR_CONNECTION *conn;
1925 __repmgr_permlsn_args *ackp, ack;
1926 SITE_STRING_BUFFER location;
1931 db_rep = env->rep_handle;
/* Sanity-check the connection before using conn->eid to find the site. */
1934 if (conn->version == 0 ||
1935 !IS_READY_STATE(conn->state) || !IS_VALID_EID(conn->eid)) {
1936 __db_errx(env, DB_STR("3682",
1937 "unexpected connection info in record_permlsn"));
1938 return (DB_REP_UNAVAIL);
1940 site = SITE_FROM_EID(conn->eid);
1943 * Extract the LSN. Save it only if it is an improvement over what the
1944 * site has already ack'ed.
1946 if (conn->version == 1) {
/* v1: the control part is used in place as the ack structure, after a size check. */
1947 ackp = conn->input.repmgr_msg.cntrl.data;
1948 if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1949 conn->input.repmgr_msg.rec.size != 0) {
1950 __db_errx(env, DB_STR("3627", "bad ack msg size"));
1951 return (DB_REP_UNAVAIL);
/* Other versions: the control part is unmarshalled instead. */
1955 if ((ret = __repmgr_permlsn_unmarshal(env, ackp,
1956 conn->input.repmgr_msg.cntrl.data,
1957 conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1958 return (DB_REP_UNAVAIL);
1961 /* Ignore stale acks. */
1962 gen = db_rep->region->gen;
1963 if (ackp->generation < gen) {
1964 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1965 "ignoring stale ack (%lu<%lu), from %s",
1966 (u_long)ackp->generation, (u_long)gen,
1967 __repmgr_format_site_loc(site, location)));
1970 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1971 "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1972 (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1973 __repmgr_format_site_loc(site, location)));
/* Record the ack only if it is for the current generation and ahead of max_ack. */
1975 if (ackp->generation == gen &&
1976 LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) {
1978 * If file number for this site changed, check lowest log
1979 * file needed after recording new permlsn for this site.
/* The file numbers must be compared before max_ack is overwritten just below. */
1981 if (ackp->lsn.file > site->max_ack.file)
1983 memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1985 check_min_log_file(env);
1986 if ((ret = __repmgr_wake_waiters(env,
1987 &db_rep->ack_waiters)) != 0)
1994 * Maintains lowest log file still needed by the repgroup. This is stored
1995 * in shared rep region so that it is accessible to repmgr subordinate
1996 * processes that may not themselves have connections to other sites
1997 * (e.g. a separate db_archive process.)
/*
 * check_min_log_file --
 *	Scan the remote sites and compute min_log, the smallest max_ack.file
 *	among sites that qualify (see the condition in the loop).  The result
 *	is written to rep->min_log_file only when it is non-zero and differs
 *	from the value already stored there.
 */
2000 check_min_log_file(env)
2005 REPMGR_CONNECTION *conn;
2010 db_rep = env->rep_handle;
2011 rep = db_rep->region;
2015 * Record the lowest log file number from all connected sites. If this
2016 * is a client, ignore the master because the master does not maintain
2017 * nor send out its repmgr perm LSN in this way. Consider connections
2018 * so that we don't allow a site that has been down a long time to
2019 * indefinitely prevent log archiving.
2021 FOR_EACH_REMOTE_SITE_INDEX(eid) {
/* The master's entry is not taken into account. */
2022 if (eid == rep->master_id)
2024 site = SITE_FROM_EID(eid);
/*
 * A site qualifies when it is SITE_CONNECTED, has an incoming or outgoing
 * connection in CONN_READY state, and has a non-zero max_ack.  A min_log
 * of 0 is treated as "no candidate yet", so the first qualifying site
 * always sets it.
 */
2025 if (site->state == SITE_CONNECTED &&
2026 (((conn = site->ref.conn.in) != NULL &&
2027 conn->state == CONN_READY) ||
2028 ((conn = site->ref.conn.out) != NULL &&
2029 conn->state == CONN_READY)) &&
2030 !IS_ZERO_LSN(site->max_ack) &&
2031 (min_log == 0 || site->max_ack.file < min_log))
2032 min_log = site->max_ack.file;
2035 * During normal operation min_log should increase over time, but it
2036 * is possible if a site returns after being disconnected for a while
2037 * that min_log could decrease.
/* If no site qualified (min_log still 0), the stored value is left untouched. */
2039 if (min_log != 0 && min_log != rep->min_log_file)
2040 rep->min_log_file = min_log;
2044 * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
2047 __repmgr_write_some(env, conn)
2049 REPMGR_CONNECTION *conn;
2051 QUEUED_OUTPUT *output;
2055 while (!STAILQ_EMPTY(&conn->outbound_queue)) {
2056 output = STAILQ_FIRST(&conn->outbound_queue);
2058 if ((bytes = sendsocket(conn->fd, &msg->data[output->offset],
2059 msg->length - output->offset, 0)) == SOCKET_ERROR) {
2060 switch (ret = net_errno) {
2062 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
2063 case DB_REPMGR_EAGAIN:
2067 __repmgr_fire_conn_err_event(env, conn, ret);
2068 STAT(env->rep_handle->
2069 region->mstat.st_connection_drop++);
2070 return (DB_REP_UNAVAIL);
2074 if ((output->offset += (size_t)bytes) >= msg->length) {
2075 STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
2076 __os_free(env, output);
2077 conn->out_queue_length--;
2078 if (--msg->ref_count <= 0)
2079 __os_free(env, msg);
2082 * We've achieved enough movement to free up at least
2083 * one space in the outgoing queue. Wake any message
2084 * threads that may be waiting for space. Leave
2085 * CONGESTED state so that when the queue reaches the
2086 * high-water mark again, the filling thread will be
2087 * allowed to try waiting again.
2089 conn->state = CONN_READY;
2090 if ((ret = __repmgr_signal(&conn->drained)) != 0)