Imported Upstream version 5.3.21
[platform/upstream/libdb.git] / src / repmgr / repmgr_sel.c
1 /*-
2  * See the file LICENSE for redistribution information.
3  *
4  * Copyright (c) 2006, 2012 Oracle and/or its affiliates.  All rights reserved.
5  *
6  * $Id$
7  */
8
9 #include "db_config.h"
10
11 #include "db_int.h"
12
13 typedef int (*HEARTBEAT_ACTION) __P((ENV *));
14
15 static int accept_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
16 static int accept_v1_handshake __P((ENV *, REPMGR_CONNECTION *, char *));
17 static void check_min_log_file __P((ENV *));
18 static int dispatch_msgin __P((ENV *, REPMGR_CONNECTION *));
19 static int prepare_input __P((ENV *, REPMGR_CONNECTION *));
20 static int process_own_msg __P((ENV *, REPMGR_CONNECTION *));
21 static int process_parameters __P((ENV *,
22     REPMGR_CONNECTION *, char *, u_int, u_int32_t, int, u_int32_t));
23 static int read_version_response __P((ENV *, REPMGR_CONNECTION *));
24 static int record_permlsn __P((ENV *, REPMGR_CONNECTION *));
25 static int __repmgr_call_election __P((ENV *));
26 static int __repmgr_connector_main __P((ENV *, REPMGR_RUNNABLE *));
27 static void *__repmgr_connector_thread __P((void *));
28 static int __repmgr_next_timeout __P((ENV *,
29     db_timespec *, HEARTBEAT_ACTION *));
30 static int __repmgr_retry_connections __P((ENV *));
31 static int __repmgr_send_heartbeat __P((ENV *));
32 static int __repmgr_try_one __P((ENV *, int));
33 static int resolve_collision __P((ENV *, REPMGR_SITE *, REPMGR_CONNECTION *));
34 static int send_version_response __P((ENV *, REPMGR_CONNECTION *));
35
/*
 * Guard for states in which only a handshake message is acceptable.  If
 * conn->msg_type is anything other than REPMGR_HANDSHAKE, an error (3613) is
 * logged with the message type and connection state, and the *enclosing
 * function* returns DB_REP_UNAVAIL.
 *
 * Because the expansion contains a `return', this can only be used inside a
 * function returning int.  `conn' is expanded more than once and without
 * parentheses, so pass a plain pointer variable.
 */
36 #define ONLY_HANDSHAKE(env, conn) do {                               \
37         if (conn->msg_type != REPMGR_HANDSHAKE) {                    \
38                 __db_errx(env, DB_STR_A("3613",              \
39                     "unexpected msg type %d in state %d", "%d %d"),  \
40                     (int)conn->msg_type, conn->state);               \
41                 return (DB_REP_UNAVAIL);                             \
42         }                                                            \
43 } while (0)
44
45 /*
46  * PUBLIC: void *__repmgr_select_thread __P((void *));
47  */
48 void *
49 __repmgr_select_thread(argsp)
50         void *argsp;
51 {
52         REPMGR_RUNNABLE *args;
53         ENV *env;
54         int ret;
55
56         args = argsp;
57         env = args->env;
58
59         if ((ret = __repmgr_select_loop(env))  != 0) {
60                 __db_err(env, ret, DB_STR("3614", "select loop failed"));
61                 (void)__repmgr_thread_failure(env, ret);
62         }
63         return (NULL);
64 }
65
66 /*
67  * PUBLIC: int __repmgr_bow_out __P((ENV *));
 *
 * Stops the repmgr threads while holding the repmgr mutex, then fires the
 * DB_EVENT_REP_LOCAL_SITE_REMOVED event.  The event is fired whatever
 * __repmgr_stop_threads() returned; that return value is passed back to the
 * caller.
68  */
69 int
70 __repmgr_bow_out(env)
71         ENV *env;
72 {
73         DB_REP *db_rep;
74         int ret;
75
76         db_rep = env->rep_handle;
77         LOCK_MUTEX(db_rep->mutex);
78         ret = __repmgr_stop_threads(env);
79         UNLOCK_MUTEX(db_rep->mutex);
        /* The event is delivered only after the mutex has been released. */
80         DB_EVENT(env, DB_EVENT_REP_LOCAL_SITE_REMOVED, NULL);
81         return (ret);
82 }
83
84 /*
85  * PUBLIC: int __repmgr_accept __P((ENV *));
 *
 * Accepts one pending connection on the listening socket
 * (db_rep->listen_fd), wraps it in a REPMGR_CONNECTION in state
 * CONN_NEGOTIATE, and appends it to db_rep->connections with eid -1 (not yet
 * associated with any site).
 *
 * Returns 0 on success, and also when accept() fails with one of the errors
 * treated as innocuous below (no connection is created in that case).  Any
 * other failure is returned as an error code; a connection struct that was
 * already created is destroyed before returning.
86  */
87 int
88 __repmgr_accept(env)
89         ENV *env;
90 {
91         DB_REP *db_rep;
92         REPMGR_CONNECTION *conn;
93         ACCEPT_ADDR siaddr;
94         socklen_t addrlen;
95         socket_t s;
96         int ret;
97
98         db_rep = env->rep_handle;
99         addrlen = sizeof(siaddr);
100         if ((s = accept(db_rep->listen_fd, (struct sockaddr *)&siaddr,
101             &addrlen)) == -1) {
102                 /*
103                  * Some errors are innocuous and so should be ignored.  MSDN
104                  * Library documents the Windows ones; the Unix ones are
105                  * advocated in Stevens' UNPv1, section 16.6; and Linux
106                  * Application Development, p. 416.
107                  */
108                 switch (ret = net_errno) {
109 #ifdef DB_WIN32
110                 case WSAECONNRESET:
111                 case WSAEWOULDBLOCK:
112 #else
113                 case EINTR:
114                 case EWOULDBLOCK:
115                 case ECONNABORTED:
116                 case ENETDOWN:
117 #ifdef EPROTO
118                 case EPROTO:
119 #endif
120                 case ENOPROTOOPT:
121                 case EHOSTDOWN:
122 #ifdef ENONET
123                 case ENONET:
124 #endif
125                 case EHOSTUNREACH:
126                 case EOPNOTSUPP:
127                 case ENETUNREACH:
128 #endif
                        /* All of the labels above share this "ignore" path. */
129                         VPRINT(env, (env, DB_VERB_REPMGR_MISC,
130                             "accept error %d considered innocuous", ret));
131                         return (0);
132                 default:
133                         __db_err(env, ret, DB_STR("3615", "accept error"));
134                         return (ret);
135                 }
136         }
137         RPRINT(env, (env, DB_VERB_REPMGR_MISC, "accepted a new connection"));
138
139         if ((ret =
140             __repmgr_new_connection(env, &conn, s, CONN_NEGOTIATE)) != 0) {
                /* No connection struct exists yet, so close the socket here. */
141                 (void)closesocket(s);
142                 return (ret);
143         }
        /*
         * From here on, failures are cleaned up through
         * __repmgr_destroy_conn() (presumably that also closes the socket --
         * confirm against its definition).
         */
144         if ((ret = __repmgr_set_keepalive(env, conn)) != 0) {
145                 (void)__repmgr_destroy_conn(env, conn);
146                 return (ret);
147         }
148         if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
149                 __db_err(env, ret, DB_STR("3616",
150                     "can't set nonblock after accept"));
151                 (void)__repmgr_destroy_conn(env, conn);
152                 return (ret);
153         }
154
155         /*
156          * We don't yet know which site this connection is coming from.  So for
157          * now, put it on the "orphans" list; we'll move it to the appropriate
158          * site struct later when we discover who we're talking with, and what
159          * type of connection it is.
160          */
161         conn->eid = -1;
162         TAILQ_INSERT_TAIL(&db_rep->connections, conn, entries);
        /* The list membership counts as a reference to the connection. */
163         conn->ref_count++;
164
165         return (0);
166 }
167
168 /*
169  * Computes how long we should wait for input, in other words how long until we
170  * have to wake up and do something.  Returns TRUE if timeout is set; FALSE if
171  * there is nothing to wait for.
172  *
173  * Note that the resulting timeout could be zero; but it can't be negative.
174  *
175  * PUBLIC: int __repmgr_compute_timeout __P((ENV *, db_timespec *));
176  */
177 int
178 __repmgr_compute_timeout(env, timeout)
179         ENV *env;
180         db_timespec *timeout;
181 {
182         DB_REP *db_rep;
183         REPMGR_RETRY *retry;
184         db_timespec now, t;
185         int have_timeout;
186
187         db_rep = env->rep_handle;
188
189         /*
190          * There are two factors to consider: are heartbeats in use?  and, do we
191          * have any sites with broken connections that we ought to retry?
192          */
193         have_timeout = __repmgr_next_timeout(env, &t, NULL);
194
195         /* List items are in order, so we only have to examine the first one. */
196         if (!TAILQ_EMPTY(&db_rep->retries)) {
197                 retry = TAILQ_FIRST(&db_rep->retries);
198                 if (have_timeout) {
199                         /* Choose earliest timeout deadline. */
200                         t = timespeccmp(&retry->time, &t, <) ? retry->time : t;
201                 } else {
202                         t = retry->time;
203                         have_timeout = TRUE;
204                 }
205         }
206
207         if (have_timeout) {
208                 __os_gettime(env, &now, 1);
209                 if (timespeccmp(&now, &t, >=))
210                         timespecclear(timeout);
211                 else {
212                         *timeout = t;
213                         timespecsub(timeout, &now);
214                 }
215         }
216
217         return (have_timeout);
218 }
219
220 /*
221  * Figures out the next heartbeat-related thing to be done, and when it should
222  * be done.  The code is factored this way because this computation needs to be
223  * done both before each select() call, and after (when we're checking for timer
224  * expiration).
 *
 * Two situations produce a deadline:
 *  - this site is the master and heartbeat_frequency is non-zero: the
 *    deadline is last_bcast + heartbeat_frequency and the action is
 *    __repmgr_send_heartbeat;
 *  - there is a connected master, IS_SUBORDINATE(db_rep) is false,
 *    heartbeat_monitor_timeout is non-zero, and a ready connection to the
 *    master has version >= HEARTBEAT_MIN_VERSION: the deadline is the
 *    master's last_rcvd_timestamp + heartbeat_monitor_timeout and the action
 *    is __repmgr_call_election.
 *
 * Returns TRUE and fills in *deadline (and *action, if action is non-NULL)
 * when there is something to wait for; otherwise returns FALSE and leaves
 * both untouched.
225  */
226 static int
227 __repmgr_next_timeout(env, deadline, action)
228         ENV *env;
229         db_timespec *deadline;
230         HEARTBEAT_ACTION *action;
231 {
232         DB_REP *db_rep;
233         REP *rep;
234         HEARTBEAT_ACTION my_action;
235         REPMGR_CONNECTION *conn;
236         REPMGR_SITE *master;
237         db_timespec t;
238         u_int32_t version;
239
240         db_rep = env->rep_handle;
241         rep = db_rep->region;
242
243         if (rep->master_id == db_rep->self_eid &&
244             rep->heartbeat_frequency > 0) {
                /* We are the master: next heartbeat is due after last send. */
245                 t = db_rep->last_bcast;
246                 TIMESPEC_ADD_DB_TIMEOUT(&t, rep->heartbeat_frequency);
247                 my_action = __repmgr_send_heartbeat;
248         } else if ((master = __repmgr_connected_master(env)) != NULL &&
249             !IS_SUBORDINATE(db_rep) &&
250             rep->heartbeat_monitor_timeout > 0) {
                /*
                 * Take the highest version among the ready connections
                 * (incoming and outgoing) to the master; 0 if none is ready.
                 */
251                 version = 0;
252                 if ((conn = master->ref.conn.in) != NULL &&
253                     IS_READY_STATE(conn->state))
254                         version = conn->version;
255                 if ((conn = master->ref.conn.out) != NULL &&
256                     IS_READY_STATE(conn->state) &&
257                     conn->version > version)
258                         version = conn->version;
259                 if (version >= HEARTBEAT_MIN_VERSION) {
260                         /*
261                          * If we have a working connection to a heartbeat-aware
262                          * master, let's monitor it.  Otherwise there's really
263                          * nothing we can do.
264                          */
265                         t = master->last_rcvd_timestamp;
266                         TIMESPEC_ADD_DB_TIMEOUT(&t,
267                             rep->heartbeat_monitor_timeout);
268                         my_action = __repmgr_call_election;
269                 } else
270                         return (FALSE);
271         } else
272                 return (FALSE);
273
        /* Only reached when one of the branches above set t and my_action. */
274         *deadline = t;
275         if (action != NULL)
276                 *action = my_action;
277         return (TRUE);
278 }
279
280 /*
281  * Sends a heartbeat message.
282  *
283  * repmgr also uses the heartbeat facility to manage rerequests.  We
284  * send the master's current generation and max_perm_lsn with the heartbeat
285  * message to help a client determine whether it has all master transactions.
286  * When a client receives a heartbeat message, it also checks whether it
287  * needs to rerequest anything.  Note that heartbeats must be enabled for
288  * this rerequest processing to occur.
 *
 * Returns the error from __rep_get_maxpermlsn() if the max perm LSN cannot be
 * obtained; otherwise returns the result of __repmgr_send_broadcast().
289  */
290 static int
291 __repmgr_send_heartbeat(env)
292         ENV *env;
293 {
294         DB_REP *db_rep;
295         REP *rep;
296         DBT control, rec;
297         __repmgr_permlsn_args permlsn;
298         u_int8_t buf[__REPMGR_PERMLSN_SIZE];
299         u_int unused1, unused2;
300         int ret, unused3;
301
302         db_rep = env->rep_handle;
303         rep = db_rep->region;
304
        /* The payload is the marshaled (generation, max perm LSN) pair. */
305         permlsn.generation = rep->gen;
306         if ((ret = __rep_get_maxpermlsn(env, &permlsn.lsn)) != 0)
307                 return (ret);
308         __repmgr_permlsn_marshal(env, &permlsn, buf);
        /* Only the data and size fields of the control DBT are set here. */
309         control.data = buf;
310         control.size = __REPMGR_PERMLSN_SIZE;
311
        /*
         * The record part is empty.  The three trailing output arguments of
         * __repmgr_send_broadcast() are not examined by this function.
         */
312         DB_INIT_DBT(rec, NULL, 0);
313         return (__repmgr_send_broadcast(env,
314             REPMGR_HEARTBEAT, &control, &rec, &unused1, &unused2, &unused3));
315 }
316
317 /*
318  * PUBLIC: REPMGR_SITE *__repmgr_connected_master __P((ENV *));
319  */
320 REPMGR_SITE *
321 __repmgr_connected_master(env)
322         ENV *env;
323 {
324         DB_REP *db_rep;
325         REPMGR_SITE *master;
326         int master_id;
327
328         db_rep = env->rep_handle;
329         master_id = db_rep->region->master_id;
330
331         if (!IS_KNOWN_REMOTE_SITE(master_id))
332                 return (NULL);
333         master = SITE_FROM_EID(master_id);
334         if (master->state == SITE_CONNECTED)
335                 return (master);
336         return (NULL);
337 }
338
339 static int
340 __repmgr_call_election(env)
341         ENV *env;
342 {
343         REPMGR_CONNECTION *conn;
344         REPMGR_SITE *master;
345         int ret;
346
347         master = __repmgr_connected_master(env);
348         if (master == NULL)
349                 return (0);
350         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
351             "heartbeat monitor timeout expired"));
352         STAT(env->rep_handle->region->mstat.st_connection_drop++);
353         if ((conn = master->ref.conn.in) != NULL &&
354             (ret = __repmgr_bust_connection(env, conn)) != 0)
355                 return (ret);
356         if ((conn = master->ref.conn.out) != NULL &&
357             (ret = __repmgr_bust_connection(env, conn)) != 0)
358                 return (ret);
359         return (0);
360 }
361
362 /*
363  * PUBLIC: int __repmgr_check_timeouts __P((ENV *));
364  *
365  * !!!
366  * Assumes caller holds the mutex.
367  */
368 int
369 __repmgr_check_timeouts(env)
370         ENV *env;
371 {
372         db_timespec when, now;
373         HEARTBEAT_ACTION action;
374         int ret;
375
376         /*
377          * Figure out the next heartbeat-related thing to be done.  Then, if
378          * it's time to do it, do so.
379          */
380         if (__repmgr_next_timeout(env, &when, &action)) {
381                 __os_gettime(env, &now, 1);
382                 if (timespeccmp(&when, &now, <=) &&
383                     (ret = (*action)(env)) != 0)
384                         return (ret);
385         }
386
387         return (__repmgr_retry_connections(env));
388 }
389
390 /*
391  * Initiates connection attempts for any sites on the idle list whose retry
392  * times have expired.
 *
 * Each expired entry is removed from db_rep->retries and freed.  If the site
 * is still a group member (SITE_PRESENT) a connection attempt is started;
 * otherwise the site is simply put back into SITE_IDLE state.
 *
 * Returns 0, or the first error from __repmgr_try_one() (remaining entries
 * are then left on the list).
393  */
394 static int
395 __repmgr_retry_connections(env)
396         ENV *env;
397 {
398         DB_REP *db_rep;
399         REPMGR_SITE *site;
400         REPMGR_RETRY *retry;
401         db_timespec now;
402         int eid, ret;
403
404         db_rep = env->rep_handle;
        /* One timestamp is taken up front and used for the whole scan. */
405         __os_gettime(env, &now, 1);
406
407         while (!TAILQ_EMPTY(&db_rep->retries)) {
408                 retry = TAILQ_FIRST(&db_rep->retries);
                /* An entry is due only once its time is strictly before now. */
409                 if (timespeccmp(&retry->time, &now, >=))
410                         break;  /* since items are in time order */
411
412                 TAILQ_REMOVE(&db_rep->retries, retry, entries);
413
                /* Save the eid before the retry entry is freed. */
414                 eid = retry->eid;
415                 __os_free(env, retry);
416                 DB_ASSERT(env, IS_VALID_EID(eid));
417                 site = SITE_FROM_EID(eid);
418                 DB_ASSERT(env, site->state == SITE_PAUSING);
419
420                 if (site->membership == SITE_PRESENT) {
421                         if ((ret = __repmgr_try_one(env, eid)) != 0)
422                                 return (ret);
423                 } else
424                         site->state = SITE_IDLE;
425         }
426         return (0);
427 }
428
429 /*
430  * PUBLIC: int __repmgr_first_try_connections __P((ENV *));
431  *
432  * !!!
433  * Assumes caller holds the mutex.
 *
 * Starts a connection attempt to every remote site that is currently
 * SITE_IDLE and is a group member (SITE_PRESENT).  Sites in any other state
 * are skipped.
 *
 * Returns 0, or the first error from __repmgr_try_one() (later sites are then
 * not attempted).
434  */
435 int
436 __repmgr_first_try_connections(env)
437         ENV *env;
438 {
439         DB_REP *db_rep;
440         REPMGR_SITE *site;
441         int eid, ret;
442
        /*
         * db_rep is not named again below; presumably the site macros
         * (FOR_EACH_REMOTE_SITE_INDEX, SITE_FROM_EID) refer to it implicitly
         * -- confirm against their definitions before renaming it.
         */
443         db_rep = env->rep_handle;
444         FOR_EACH_REMOTE_SITE_INDEX(eid) {
445                 site = SITE_FROM_EID(eid);
446                 /*
447                  * Normally all sites would be IDLE here.  But if a user thread
448                  * triggered an auto-start in a subordinate process, our send()
449                  * function may have found new sites when it sync'ed site
450                  * addresses, and that action causes connection attempts to be
451                  * scheduled (resulting in PAUSING state here, or conceivably
452                  * even CONNECTING or CONNECTED).
453                  */
454                 if (site->state == SITE_IDLE &&
455                     site->membership == SITE_PRESENT &&
456                     (ret = __repmgr_try_one(env, eid)) != 0)
457                         return (ret);
458         }
459         return (0);
460 }
461
462 /*
463  * Starts a thread to open a connection to the site at the given EID.
 *
 * The site's connector runnable (site->connector) is reused across attempts:
 *  - none yet: one is allocated and attached to the site;
 *  - previous one has finished: it is joined and then reused;
 *  - previous one is still running: no new thread is started; instead another
 *    attempt is scheduled via __repmgr_schedule_connection_attempt() and its
 *    result is returned.
 *
 * On the first two paths the site is put into SITE_CONNECTING state and the
 * thread is started.  Returns 0 on success or an error code from allocation,
 * join, or thread start.
464  */
465 static int
466 __repmgr_try_one(env, eid)
467         ENV *env;
468         int eid;
469 {
470         DB_REP *db_rep;
471         REPMGR_SITE *site;
472         REPMGR_RUNNABLE *th;
473         int ret;
474
        /*
         * db_rep is not named again below; presumably SITE_FROM_EID refers
         * to it implicitly -- confirm before renaming or removing it.
         */
475         db_rep = env->rep_handle;
476         DB_ASSERT(env, IS_VALID_EID(eid));
477         site = SITE_FROM_EID(eid);
478         th = site->connector;
479         if (th == NULL) {
480                 if ((ret = __os_malloc(env, sizeof(REPMGR_RUNNABLE), &th)) != 0)
481                         return (ret);
482                 site->connector = th;
483         } else if (th->finished) {
                /* Reap the previous thread before reusing its runnable. */
484                 if ((ret = __repmgr_thread_join(th)) != 0)
485                         return (ret);
486         } else {
487                 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
488                   "eid %lu previous connector thread still running; will retry",
489                     (u_long)eid));
490                 return (__repmgr_schedule_connection_attempt(env,
491                         eid, FALSE));
492         }
493
494         site->state = SITE_CONNECTING;
495
496         th->run = __repmgr_connector_thread;
497         th->args.eid = eid;
498         if ((ret = __repmgr_thread_start(env, th)) != 0) {
                /*
                 * NOTE(review): the runnable is released here, but
                 * site->state is left as SITE_CONNECTING -- confirm that
                 * callers treat this error as fatal.
                 */
499                 __os_free(env, th);
500                 site->connector = NULL;
501         }
502         return (ret);
503 }
504
/*
 * Thread entry point for a connector thread.  The argument is the thread's
 * REPMGR_RUNNABLE, which carries the environment and the target eid.  Runs
 * __repmgr_connector_main(); a non-zero result is logged and reported via
 * __repmgr_thread_failure().  Always returns NULL.
 */
505 static void *
506 __repmgr_connector_thread(argsp)
507         void *argsp;
508 {
509         REPMGR_RUNNABLE *th;
510         ENV *env;
511         int ret;
512
513         th = argsp;
514         env = th->env;
515
516         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
517             "starting connector thread, eid %u", th->args.eid));
518         if ((ret = __repmgr_connector_main(env, th)) != 0) {
519                 __db_err(env, ret, DB_STR("3617", "connector thread failed"));
520                 (void)__repmgr_thread_failure(env, ret);
521         }
522         RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connector thread is exiting"));
523
        /*
         * Set last: __repmgr_try_one() checks this flag to decide whether the
         * runnable may be joined and reused.
         */
524         th->finished = TRUE;
525         return (NULL);
526 }
527
528 static int
529 __repmgr_connector_main(env, th)
530         ENV *env;
531         REPMGR_RUNNABLE *th;
532 {
533         DB_REP *db_rep;
534         REPMGR_SITE *site;
535         REPMGR_CONNECTION *conn;
536         DB_REPMGR_CONN_ERR info;
537         repmgr_netaddr_t netaddr;
538         SITE_STRING_BUFFER site_string;
539         int err, ret, t_ret;
540
541         db_rep = env->rep_handle;
542         ret = 0;
543
544         LOCK_MUTEX(db_rep->mutex);
545         DB_ASSERT(env, IS_VALID_EID(th->args.eid));
546         site = SITE_FROM_EID(th->args.eid);
547         if (site->state != SITE_CONNECTING && db_rep->repmgr_status == stopped)
548                 goto unlock;
549
550         /*
551          * Drop the mutex during operations that could block.  During those
552          * times, the site struct could move (if we had to grow the sites
553          * array), but host wouldn't.
554          *
555          * Also, during those times we might receive an incoming connection from
556          * the site, which would change its state.  So, check state each time we
557          * reacquire the mutex, and quit if the state of the world changed while
558          * we were away.
559          */
560         netaddr = site->net_addr;
561         RPRINT(env, (env, DB_VERB_REPMGR_MISC, "connecting to %s",
562                 __repmgr_format_site_loc(site, site_string)));
563         UNLOCK_MUTEX(db_rep->mutex);
564
565         if ((ret = __repmgr_connect(env, &netaddr, &conn, &err)) == 0) {
566                 DB_EVENT(env,  DB_EVENT_REP_CONNECT_ESTD, &th->args.eid);
567                 LOCK_MUTEX(db_rep->mutex);
568                 if ((ret = __repmgr_set_nonblock_conn(conn)) != 0) {
569                         __db_err(env, ret, DB_STR("3618",
570                             "set_nonblock in connnect thread"));
571                         goto cleanup;
572                 }
573                 conn->type = REP_CONNECTION;
574                 site = SITE_FROM_EID(th->args.eid);
575                 if (site->state != SITE_CONNECTING ||
576                     db_rep->repmgr_status == stopped)
577                         goto cleanup;
578
579                 conn->eid = th->args.eid;
580                 site = SITE_FROM_EID(th->args.eid);
581                 site->ref.conn.out = conn;
582                 site->state = SITE_CONNECTED;
583                 __os_gettime(env, &site->last_rcvd_timestamp, 1);
584                 ret = __repmgr_wake_main_thread(env);
585         } else if (ret == DB_REP_UNAVAIL) {
586                 /* Retryable error while trying to connect: retry later. */
587                 info.eid = th->args.eid;
588                 info.error = err;
589                 DB_EVENT(env, DB_EVENT_REP_CONNECT_TRY_FAILED, &info);
590                 STAT(db_rep->region->mstat.st_connect_fail++);
591
592                 LOCK_MUTEX(db_rep->mutex);
593                 site = SITE_FROM_EID(th->args.eid);
594                 if (site->state != SITE_CONNECTING ||
595                     db_rep->repmgr_status == stopped) {
596                         ret = 0;
597                         goto unlock;
598                 }
599                 ret = __repmgr_schedule_connection_attempt(env,
600                     th->args.eid, FALSE);
601         } else
602                 goto out;
603
604         if (0) {
605 cleanup:
606                 if ((t_ret = __repmgr_destroy_conn(env, conn)) != 0 &&
607                     ret == 0)
608                         ret = t_ret;
609         }
610
611 unlock:
612         UNLOCK_MUTEX(db_rep->mutex);
613 out:
614         return (ret);
615 }
616
617 /*
618  * PUBLIC: int __repmgr_send_v1_handshake __P((ENV *,
619  * PUBLIC:     REPMGR_CONNECTION *, void *, size_t));
 *
 * Sends a version-1 handshake on the given connection.  The control part is
 * a DB_REPMGR_V1_HANDSHAKE carrying version 1, this site's priority and its
 * port; the record part is the caller-supplied buffer (buf, len).
 *
 * Returns the result of __repmgr_send_one().
620  */
621 int
622 __repmgr_send_v1_handshake(env, conn, buf, len)
623         ENV *env;
624         REPMGR_CONNECTION *conn;
625         void *buf;
626         size_t len;
627 {
628         DB_REP *db_rep;
629         REP *rep;
630         repmgr_netaddr_t *my_addr;
631         DB_REPMGR_V1_HANDSHAKE buffer;
632         DBT cntrl, rec;
633
634         db_rep = env->rep_handle;
635         rep = db_rep->region;
636         my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
637
638         /*
639          * We're about to send from a structure that has padding holes in it.
640          * Initializing it keeps Valgrind happy, plus we really shouldn't be
641          * sending out random garbage anyway (pro forma privacy issue).
642          */
643         memset(&buffer, 0, sizeof(buffer));
644         buffer.version = 1;
645         buffer.priority = htonl(rep->priority);
        /*
         * NOTE(review): unlike priority, the port is copied without a byte
         * order conversion -- confirm this matches what v1 peers expect.
         */
646         buffer.port = my_addr->port;
647         cntrl.data = &buffer;
648         cntrl.size = sizeof(buffer);
649
650         rec.data = buf;
651         rec.size = (u_int32_t)len;
652
653         /*
654          * It would of course be disastrous to block the select() thread, so
655          * pass the "maxblock" argument as 0.  Fortunately blocking should
656          * never be necessary here, because the hand-shake is always the first
657          * thing we send.  Which is a good thing, because it would be almost as
658          * disastrous if we allowed ourselves to drop a handshake.
659          */
660         return (__repmgr_send_one(env,
661             conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0));
662 }
663
664 /*
665  * PUBLIC: int __repmgr_read_from_site __P((ENV *, REPMGR_CONNECTION *));
666  *
667  * !!!
668  * Caller is assumed to hold repmgr->mutex, 'cuz we call queue_put() from here.
 *
 * Reads input for the connection's current phase.  When the phase completes,
 * the site's last_rcvd_timestamp is refreshed (if the connection has a valid
 * eid) and processing continues with prepare_input() after the sizes phase,
 * or dispatch_msgin() otherwise.
 *
 * Returns 0 when no more data is available right now (would block);
 * DB_REP_UNAVAIL on EOF or on any other read error, after firing a
 * connection error event and counting a dropped connection; otherwise the
 * result of prepare_input()/dispatch_msgin().
669  */
670 int
671 __repmgr_read_from_site(env, conn)
672         ENV *env;
673         REPMGR_CONNECTION *conn;
674 {
675         DB_REP *db_rep;
676         REPMGR_SITE *site;
677         int ret;
678
679         db_rep = env->rep_handle;
680
681         /*
682          * Loop, just in case we get EINTR and need to restart the I/O.  (All
683          * other branches return.)
684          */
685         for (;;) {
686                 switch ((ret = __repmgr_read_conn(conn))) {
687 #ifndef DB_WIN32
688                 case EINTR:
689                         continue;
690 #endif
691
                /*
                 * The extra label is compiled in only where DB_REPMGR_EAGAIN
                 * is defined and differs from WOULDBLOCK, which avoids a
                 * duplicate case value.
                 */
692 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
693                 case DB_REPMGR_EAGAIN:
694 #endif
695                 case WOULDBLOCK:
696                         return (0);
697
698                 case DB_REP_UNAVAIL:
699                         /* Error 0 is understood to mean EOF. */
700                         __repmgr_fire_conn_err_event(env, conn, 0);
701                         STAT(env->rep_handle->
702                             region->mstat.st_connection_drop++);
703                         return (DB_REP_UNAVAIL);
704
705                 case 0:
                        /* A full phase was read: note the traffic, then act. */
706                         if (IS_VALID_EID(conn->eid)) {
707                                 site = SITE_FROM_EID(conn->eid);
708                                 __os_gettime(env,
709                                     &site->last_rcvd_timestamp, 1);
710                         }
711                         return (conn->reading_phase == SIZES_PHASE ?
712                             prepare_input(env, conn) :
713                             dispatch_msgin(env, conn));
714
715                 default:
                        /* Any other error is reported as a lost connection. */
716 #ifdef EBADF
717                         DB_ASSERT(env, ret != EBADF);
718 #endif
719                         __repmgr_fire_conn_err_event(env, conn, ret);
720                         STAT(db_rep->region->mstat.st_connection_drop++);
721                         return (DB_REP_UNAVAIL);
722                 }
723         }
724 }
725
726 /*
727  * Reads in the current input phase, as defined by the connection's IOVECS
728  * struct.
729  *
730  * Returns DB_REP_UNAVAIL for EOF.
731  *
732  * Makes no assumption about synchronization: it's up to the caller to hold
733  * mutex if necessary.
734  *
735  * PUBLIC: int __repmgr_read_conn __P((REPMGR_CONNECTION *));
736  */
737 int
738 __repmgr_read_conn(conn)
739         REPMGR_CONNECTION *conn;
740 {
741         size_t nr;
742         int ret;
743
744         /*
745          * Keep reading pieces as long as we're making some progress, or until
746          * we complete the current read phase as defined in iovecs.
747          */
748         for (;;) {
749                 if ((ret = __repmgr_readv(conn->fd,
750                     &conn->iovecs.vectors[conn->iovecs.offset],
751                     conn->iovecs.count - conn->iovecs.offset, &nr)) != 0)
752                         return (ret);
753
754                 if (nr == 0)
755                         return (DB_REP_UNAVAIL);
756
757                 if (__repmgr_update_consumed(&conn->iovecs, nr)) {
758                         /* We've fully read as much as we wanted. */
759                         return (0);
760                 }
761         }
762 }
763
764 /*
765  * Having finished reading the 9-byte message header, figure out what kind of
766  * message we're about to receive, and prepare input buffers accordingly.  The
767  * header includes enough information for us to figure out how much buffer space
768  * we need to allocate (though in some cases we need to do a bit of computation
769  * to arrive at the answer).
770  *
771  * Caller must hold mutex.
772  */
773 static int
774 prepare_input(env, conn)
775         ENV *env;
776         REPMGR_CONNECTION *conn;
777 {
778 #define MEM_ALIGN sizeof(double)
779         DBT *dbt;
780         __repmgr_msg_hdr_args msg_hdr;
781         REPMGR_RESPONSE *resp;
782         u_int32_t control_size, rec_size, size;
783         size_t memsize, control_offset, rec_offset;
784         void *membase;
785         int ret, skip;
786
787         DB_ASSERT(env, conn->reading_phase == SIZES_PHASE);
788
789         /*
790          * We can only get here after having read the full 9 bytes that we
791          * expect, so this can't fail.
792          */
793         ret = __repmgr_msg_hdr_unmarshal(env, &msg_hdr,
794             conn->msg_hdr_buf, __REPMGR_MSG_HDR_SIZE, NULL);
795         DB_ASSERT(env, ret == 0);
796
797         __repmgr_iovec_init(&conn->iovecs);
798         skip = FALSE;
799
800         switch ((conn->msg_type = msg_hdr.type)) {
801         case REPMGR_HEARTBEAT:
802                 /*
803                  * The underlying byte-receiving mechanism will already have
804                  * noted the fact that we got some traffic on this connection,
805                  * which is all that is needed to monitor the heartbeat.  But
806                  * we also put the heartbeat message on the message queue so
807                  * that it will perform rerequest processing.
808                  */
809         case REPMGR_REP_MESSAGE:
810                 env->rep_handle->seen_repmsg = TRUE;
811                 control_size = REP_MSG_CONTROL_SIZE(msg_hdr);
812                 rec_size = REP_MSG_REC_SIZE(msg_hdr);
813                 if (control_size == 0) {
814                         if (conn->msg_type == REPMGR_HEARTBEAT) {
815                                 /*
816                                  * Got an old-style heartbeat without payload,
817                                  * nothing to do.
818                                  */
819                                 skip = TRUE;
820                                 break;
821                         } else {
822                                 __db_errx(env, DB_STR("3619",
823                                     "illegal size for rep msg"));
824                                 return (DB_REP_UNAVAIL);
825                         }
826                 }
827                 /*
828                  * Allocate a block of memory large enough to hold a
829                  * DB_REPMGR_MESSAGE wrapper, plus the (one or) two DBT
830                  * data areas that it points to.  Start by calculating
831                  * the total memory needed.
832                  */
833                 memsize = DB_ALIGN(sizeof(REPMGR_MESSAGE), MEM_ALIGN);
834                 control_offset = memsize;
835                 memsize += control_size;
836                 if (rec_size > 0) {
837                         memsize = DB_ALIGN(memsize, MEM_ALIGN);
838                         rec_offset = memsize;
839                         memsize += rec_size;
840                 } else
841                         COMPQUIET(rec_offset, 0);
842                 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
843                         return (ret);
844                 conn->input.rep_message = membase;
845                 conn->input.rep_message->msg_hdr = msg_hdr;
846                 conn->input.rep_message->v.repmsg.originating_eid = conn->eid;
847
848                 DB_INIT_DBT(conn->input.rep_message->v.repmsg.control,
849                     (u_int8_t*)membase + control_offset, control_size);
850                 __repmgr_add_dbt(&conn->iovecs,
851                     &conn->input.rep_message->v.repmsg.control);
852
853                 if (rec_size > 0) {
854                         DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
855                             (rec_size > 0 ?
856                                 (u_int8_t*)membase + rec_offset : NULL),
857                             rec_size);
858                         __repmgr_add_dbt(&conn->iovecs,
859                             &conn->input.rep_message->v.repmsg.rec);
860                 } else
861                         DB_INIT_DBT(conn->input.rep_message->v.repmsg.rec,
862                             NULL, 0);
863                 break;
864
865         case REPMGR_APP_MESSAGE:
866                 /*
867                  * We need a buffer big enough to hold the REPMGR_MESSAGE struct
868                  * and the data that we expect to receive on the wire.  We must
869                  * extend the struct size for the variable-length DBT array at
870                  * the end.
871                  */
872                 size = DB_ALIGN((size_t)(sizeof(REPMGR_MESSAGE) +
873                     APP_MSG_SEGMENT_COUNT(msg_hdr) * sizeof(DBT)),
874                     MEM_ALIGN);
875                 memsize = size + APP_MSG_BUFFER_SIZE(msg_hdr);
876                 if ((ret = __os_malloc(env, memsize, &membase)) != 0)
877                         return (ret);
878                 conn->input.rep_message = membase;
879                 conn->input.rep_message->msg_hdr = msg_hdr;
880                 conn->input.rep_message->v.appmsg.conn = conn;
881
882                 DB_INIT_DBT(conn->input.rep_message->v.appmsg.buf,
883                     (u_int8_t*)membase + size,
884                     APP_MSG_BUFFER_SIZE(msg_hdr));
885                 __repmgr_add_dbt(&conn->iovecs,
886                     &conn->input.rep_message->v.appmsg.buf);
887                 break;
888
889         case REPMGR_OWN_MSG:
890                 size = sizeof(REPMGR_MESSAGE) + REPMGR_OWN_BUF_SIZE(msg_hdr);
891                 if ((ret = __os_malloc(env, size, &membase)) != 0)
892                         return (ret);
893                 conn->input.rep_message = membase;
894                 conn->input.rep_message->msg_hdr = msg_hdr;
895
896                 /*
897                  * Save "conn" pointer in case this turns out to be a one-shot
898                  * request.  If it isn't, it won't matter.
899                  */
900                 /*
901                  * An OWN msg that arrives in PARAMETERS state has bypassed the
902                  * final handshake, implying that this connection is to be used
903                  * for a one-shot GMDB request.
904                  */
905                 if (REPMGR_OWN_BUF_SIZE(msg_hdr) == 0) {
906                         __db_errx(env, DB_STR_A("3680",
907                             "invalid own buf size %lu in prepare_input", "%lu"),
908                             (u_long)REPMGR_OWN_BUF_SIZE(msg_hdr));
909                         return (DB_REP_UNAVAIL);
910                 }
911                 DB_INIT_DBT(conn->input.rep_message->v.gmdb_msg.request,
912                     (u_int8_t*)membase + sizeof(REPMGR_MESSAGE),
913                     REPMGR_OWN_BUF_SIZE(msg_hdr));
914                 __repmgr_add_dbt(&conn->iovecs,
915                     &conn->input.rep_message->v.gmdb_msg.request);
916                 break;
917
918         case REPMGR_APP_RESPONSE:
919                 size = APP_RESP_BUFFER_SIZE(msg_hdr);
920                 conn->cur_resp = APP_RESP_TAG(msg_hdr);
921                 if (conn->cur_resp >= conn->aresp) {
922                         __db_errx(env, DB_STR_A("3681",
923                             "invalid cur resp %lu in prepare_input", "%lu"),
924                             (u_long)conn->cur_resp);
925                         return (DB_REP_UNAVAIL);
926                 }
927                 resp = &conn->responses[conn->cur_resp];
928                 DB_ASSERT(env, F_ISSET(resp, RESP_IN_USE));
929
930                 dbt = &resp->dbt;
931
932                 /*
933                  * Prepare to read message body into either the user-supplied
934                  * buffer, or one we allocate here.
935                  */
936                 ret = 0;
937                 if (!F_ISSET(resp, RESP_THREAD_WAITING)) {
938                         /* Caller already timed out; allocate dummy buffer. */
939                         if (size > 0) {
940                                 memset(dbt, 0, sizeof(*dbt));
941                                 ret = __os_malloc(env, size, &dbt->data);
942                                 F_SET(resp, RESP_DUMMY_BUF);
943                         } else
944                                 F_CLR(resp, RESP_IN_USE);
945                 } else if (F_ISSET(dbt, DB_DBT_MALLOC))
946                         ret = __os_umalloc(env, size, &dbt->data);
947                 else if (F_ISSET(dbt, DB_DBT_REALLOC)) {
948                         if (dbt->data == NULL || dbt->size < size)
949                                 ret = __os_urealloc(env, size, &dbt->data);
950                 } else if (F_ISSET(dbt, DB_DBT_USERMEM)) {
951                         /* Recipient should have checked size limit. */
952                         DB_ASSERT(env, size <= dbt->ulen);
953                 }
954                 dbt->size = size;
955                 if (ret != 0)
956                         return (ret);
957
958                 if (size > 0) {
959                         __repmgr_add_dbt(&conn->iovecs, dbt);
960                         F_SET(resp, RESP_READING);
961                 } else {
962                         skip = TRUE;
963                         if (F_ISSET(resp, RESP_THREAD_WAITING)) {
964                                 F_SET(resp, RESP_COMPLETE);
965                                 if ((ret = __repmgr_wake_waiters(env,
966                                     &conn->response_waiters)) != 0)
967                                         return (ret);
968                         }
969                 }
970                 break;
971
972         case REPMGR_RESP_ERROR:
973                 DB_ASSERT(env, RESP_ERROR_TAG(msg_hdr) < conn->aresp &&
974                     conn->responses != NULL);
975                 resp = &conn->responses[RESP_ERROR_TAG(msg_hdr)];
976                 DB_ASSERT(env, !F_ISSET(resp, RESP_READING));
977                 if (F_ISSET(resp, RESP_THREAD_WAITING)) {
978                         F_SET(resp, RESP_COMPLETE);
979
980                         /*
981                          * DB errors are always negative, but we only send
982                          * unsigned values on the wire.
983                          */
984                         resp->ret = -((int)RESP_ERROR_CODE(msg_hdr));
985                         if ((ret = __repmgr_wake_waiters(env,
986                             &conn->response_waiters)) != 0)
987                                 return (ret);
988                 } else
989                         F_CLR(resp, RESP_IN_USE);
990                 skip = TRUE;
991                 break;
992
993         case REPMGR_HANDSHAKE:
994         case REPMGR_PERMLSN:
995                 if ((ret = __repmgr_prepare_simple_input(env,
996                     conn, &msg_hdr)) != 0)
997                         return (ret);
998                 break;
999
1000         default:
1001                 __db_errx(env, DB_STR_A("3676",
1002                     "unexpected msg type %lu in prepare_input", "%lu"),
1003                     (u_long)conn->msg_type);
1004                 return (DB_REP_UNAVAIL);
1005         }
1006
1007         if (skip) {
1008                 /*
1009                  * We can skip the DATA_PHASE, because the current message type
1010                  * only has a header, no following data.
1011                  */
1012                 __repmgr_reset_for_reading(conn);
1013         } else
1014                 conn->reading_phase = DATA_PHASE;
1015
1016         return (0);
1017 }
1018
1019 /*
1020  * PUBLIC: int __repmgr_prepare_simple_input __P((ENV *,
1021  * PUBLIC:     REPMGR_CONNECTION *, __repmgr_msg_hdr_args *));
1022  */
1023 int
1024 __repmgr_prepare_simple_input(env, conn, msg_hdr)
1025         ENV *env;
1026         REPMGR_CONNECTION *conn;
1027         __repmgr_msg_hdr_args *msg_hdr;
1028 {
1029         DBT *dbt;
1030         u_int32_t control_size, rec_size;
1031         int ret;
1032
1033         control_size = REP_MSG_CONTROL_SIZE(*msg_hdr);
1034         rec_size = REP_MSG_REC_SIZE(*msg_hdr);
1035
1036         dbt = &conn->input.repmgr_msg.cntrl;
1037         if ((dbt->size = control_size) > 0) {
1038                 if ((ret = __os_malloc(env,
1039                     dbt->size, &dbt->data)) != 0)
1040                         return (ret);
1041                 __repmgr_add_dbt(&conn->iovecs, dbt);
1042         }
1043
1044         dbt = &conn->input.repmgr_msg.rec;
1045         if ((dbt->size = rec_size) > 0) {
1046                 if ((ret = __os_malloc(env,
1047                     dbt->size, &dbt->data)) != 0) {
1048                         dbt = &conn->input.repmgr_msg.cntrl;
1049                         if (dbt->size > 0)
1050                                 __os_free(env, dbt->data);
1051                         return (ret);
1052                 }
1053                 __repmgr_add_dbt(&conn->iovecs, dbt);
1054         }
1055         return (0);
1056 }
1057
1058 /*
1059  * Processes an incoming message, depending on our current state.
1060  *
1061  * Caller must hold mutex.
1062  */
1063 static int
1064 dispatch_msgin(env, conn)
1065         ENV *env;
1066         REPMGR_CONNECTION *conn;
1067 {
1068         DB_REP *db_rep;
1069         REPMGR_SITE *site;
1070         REPMGR_RUNNABLE *th;
1071         REPMGR_RESPONSE *resp;
1072         DBT *dbt;
1073         char *hostname;
1074         int eid, ret;
1075
1076         DB_ASSERT(env, conn->reading_phase == DATA_PHASE);
1077         db_rep = env->rep_handle;
1078
1079         switch (conn->state) {
1080         case CONN_CONNECTED:
1081                 /*
1082                  * In this state, we know we're working with an outgoing
1083                  * connection.  We've sent a version proposal, and now expect
1084                  * the response (which could be a dumb old V1 handshake).
1085                  */
1086                 ONLY_HANDSHAKE(env, conn);
1087
1088                 /*
1089                  * Here is a good opportunity to clean up this site's connector
1090                  * thread, because we generally come through here after making
1091                  * an outgoing connection, yet we're out of the main loop, so we
1092                  * don't hit this often.
1093                  */
1094                 eid = conn->eid;
1095                 DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1096                 site = SITE_FROM_EID(eid);
1097                 th = site->connector;
1098                 if (th != NULL && th->finished) {
1099                         if ((ret = __repmgr_thread_join(th)) != 0)
1100                                 return (ret);
1101                         __os_free(env, th);
1102                         site->connector = NULL;
1103                 }
1104
1105                 if ((ret = read_version_response(env, conn)) != 0)
1106                         return (ret);
1107                 break;
1108
1109         case CONN_NEGOTIATE:
1110                 /*
1111                  * Since we're in this state, we know we're working with an
1112                  * incoming connection, and this is the first message we've
1113                  * received.  So it must be a version negotiation proposal (or a
1114                  * legacy V1 handshake).  (We'll verify this of course.)
1115                  */
1116                 ONLY_HANDSHAKE(env, conn);
1117                 if ((ret = send_version_response(env, conn)) != 0)
1118                         return (ret);
1119                 break;
1120
1121         case CONN_PARAMETERS:
1122                 /*
1123                  * We've previously agreed on a (>1) version, so we expect
1124                  * either the other side's parameters handshake, or possibly a
1125                  * GMDB request on a one-shot, dedicated connection.
1126                  */
1127                 switch (conn->msg_type) {
1128                 case REPMGR_HANDSHAKE:
1129                         dbt = &conn->input.repmgr_msg.rec;
1130                         hostname = dbt->data;
1131                         hostname[dbt->size-1] = '\0';
1132                         if ((ret = accept_handshake(env, conn, hostname)) != 0)
1133                                 return (ret);
1134                         conn->state = CONN_READY;
1135                         break;
1136                 case REPMGR_OWN_MSG:
1137                         /*
1138                          * GM change requests arrive in their own dedicated
1139                          * connections, and when they're served the entire
1140                          * connection isn't needed any more.  So the message
1141                          * processing thread will do the entire job of serving
1142                          * the request and finishing off the connection; so we
1143                          * don't have to read it any more.  Note that normally
1144                          * whenever we remove a connection from our list we
1145                          * decrement the reference count; but we also increment
1146                          * it whenever we pass a reference over to the message
1147                          * processing threads' queue.  So in this case it's a
1148                          * wash.
1149                          */
1150                         conn->input.rep_message->v.gmdb_msg.conn = conn;
1151                         TAILQ_REMOVE(&db_rep->connections, conn, entries);
1152                         if ((ret = __repmgr_queue_put(env,
1153                             conn->input.rep_message)) != 0)
1154                                 return (ret);
1155                         break;
1156
1157                 default:
1158                         __db_errx(env, DB_STR_A("3620",
1159                             "unexpected msg type %d in PARAMETERS state", "%d"),
1160                             (int)conn->msg_type);
1161                         return (DB_REP_UNAVAIL);
1162                 }
1163
1164                 break;
1165
1166         case CONN_READY:
1167         case CONN_CONGESTED:
1168                 /*
1169                  * We have a complete message, so process it.  Acks and
1170                  * handshakes get processed here, in line.  Regular rep messages
1171                  * get posted to a queue, to be handled by a thread from the
1172                  * message thread pool.
1173                  */
1174                 switch (conn->msg_type) {
1175                 case REPMGR_PERMLSN:
1176                         if ((ret = record_permlsn(env, conn)) != 0)
1177                                 return (ret);
1178                         break;
1179
1180                 case REPMGR_HEARTBEAT:
1181                 case REPMGR_APP_MESSAGE:
1182                 case REPMGR_REP_MESSAGE:
1183                         if ((ret = __repmgr_queue_put(env,
1184                             conn->input.rep_message)) != 0)
1185                                 return (ret);
1186                         /*
1187                          * The queue has taken over responsibility for the
1188                          * rep_message buffer, and will free it later.
1189                          */
1190                         if (conn->msg_type == REPMGR_APP_MESSAGE)
1191                                 conn->ref_count++;
1192                         break;
1193
1194                 case REPMGR_OWN_MSG:
1195                         /*
1196                          * Since we're in one of the "ready" states we know this
1197                          * isn't a one-shot request, so we are not giving
1198                          * ownership of this connection over to the message
1199                          * thread queue; we're going to keep reading on it
1200                          * ourselves.  The message thread that processes this
1201                          * request has no need for a connection anyway, since
1202                          * there is no response that needs to be returned.
1203                          */
1204                         conn->input.rep_message->v.gmdb_msg.conn = NULL;
1205                         if ((ret = process_own_msg(env, conn)) != 0)
1206                                 return (ret);
1207                         break;
1208
1209                 case REPMGR_APP_RESPONSE:
1210                         DB_ASSERT(env, conn->cur_resp < conn->aresp &&
1211                             conn->responses != NULL);
1212                         resp = &conn->responses[conn->cur_resp];
1213                         DB_ASSERT(env, F_ISSET(resp, RESP_READING));
1214                         F_CLR(resp, RESP_READING);
1215                         if (F_ISSET(resp, RESP_THREAD_WAITING)) {
1216                                 F_SET(resp, RESP_COMPLETE);
1217                                 if ((ret = __repmgr_wake_waiters(env,
1218                                     &conn->response_waiters)) != 0)
1219                                         return (ret);
1220                         } else {
1221                                 /*
1222                                  * If the calling thread is no longer with us,
1223                                  * yet we're reading, it can only mean we're
1224                                  * reading into a dummy buffer, so free it now.
1225                                  */
1226                                 DB_ASSERT(env, F_ISSET(resp, RESP_DUMMY_BUF));
1227                                 __os_free(env, resp->dbt.data);
1228                                 F_CLR(resp, RESP_IN_USE);
1229                         }
1230                         break;
1231
1232                 case REPMGR_RESP_ERROR:
1233                 default:
1234                         __db_errx(env, DB_STR_A("3621",
1235                             "unexpected msg type rcvd in ready state: %d",
1236                             "%d"), (int)conn->msg_type);
1237                         return (DB_REP_UNAVAIL);
1238                 }
1239                 break;
1240
1241         case CONN_DEFUNCT:
1242                 break;
1243
1244         default:
1245                 DB_ASSERT(env, FALSE);
1246         }
1247
1248         switch (conn->msg_type) {
1249         case REPMGR_HANDSHAKE:
1250         case REPMGR_PERMLSN:
1251                 dbt = &conn->input.repmgr_msg.cntrl;
1252                 if (dbt->size > 0)
1253                         __os_free(env, dbt->data);
1254                 dbt = &conn->input.repmgr_msg.rec;
1255                 if (dbt->size > 0)
1256                         __os_free(env, dbt->data);
1257                 break;
1258         default:
1259                 /*
1260                  * Some messages in REPMGR_OWN_MSG format are also handled
1261                  */
1262                 break;
1263         }
1264         __repmgr_reset_for_reading(conn);
1265         return (0);
1266 }
1267
1268 /*
1269  * Process one of repmgr's "own" message types, and one that occurs on a regular
1270  * (not one-shot) connection.
1271  */
1272 static int
1273 process_own_msg(env, conn)
1274         ENV *env;
1275         REPMGR_CONNECTION *conn;
1276 {
1277         DB_REP *db_rep;
1278         DBT *dbt;
1279         REPMGR_SITE *site;
1280         REPMGR_MESSAGE *msg;
1281         __repmgr_connect_reject_args reject;
1282         __repmgr_parm_refresh_args parms;
1283         int ret;
1284
1285         ret = 0;
1286         /*
1287          * Set "msg" to point to the message struct.  If we do all necessary
1288          * processing here now, leave it set so that it can be freed.  On the
1289          * other hand, if we pass it off to the message queue for later
1290          * processing by a message thread, we want to avoid freeing the memory
1291          * here, so clear the pointer in such a case.
1292          */
1293         switch (REPMGR_OWN_MSG_TYPE((msg = conn->input.rep_message)->msg_hdr)) {
1294         case REPMGR_CONNECT_REJECT:
1295                 dbt = &msg->v.gmdb_msg.request;
1296                 if ((ret = __repmgr_connect_reject_unmarshal(env,
1297                     &reject, dbt->data, dbt->size, NULL)) != 0)
1298                         return (DB_REP_UNAVAIL);
1299
1300                 /*
1301                  * If we're being rejected by someone who has more up-to-date
1302                  * membership information than we do, it means we have been
1303                  * removed from the group.  If we've just gotten started, we can
1304                  * make one attempt at automatically rejoining; otherwise we bow
1305                  * out gracefully.
1306                  */
1307                 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1308                         "got rejection msg citing version %lu/%lu",
1309                         (u_long)reject.gen, (u_long)reject.version));
1310
1311                 if (__repmgr_gmdb_version_cmp(env,
1312                     reject.gen, reject.version) > 0) {
1313                         if (env->rep_handle->seen_repmsg)
1314                                 ret = DB_DELETED;
1315                         else if ((ret = __repmgr_defer_op(env,
1316                             REPMGR_REJOIN)) == 0)
1317                                 ret = DB_REP_UNAVAIL;
1318                 } else
1319                         ret = DB_REP_UNAVAIL;
1320                 DB_ASSERT(env, ret != 0);
1321                 return (ret);
1322
1323         case REPMGR_SHARING:
1324                 if ((ret = __repmgr_queue_put(env, msg)) != 0)
1325                         return (ret);
1326                 /* Show that we no longer own this memory. */
1327                 msg = NULL;
1328                 break;
1329
1330         case REPMGR_PARM_REFRESH:
1331                 dbt = &conn->input.rep_message->v.gmdb_msg.request;
1332                 if ((ret = __repmgr_parm_refresh_unmarshal(env,
1333                     &parms, dbt->data, dbt->size, NULL)) != 0)
1334                         return (DB_REP_UNAVAIL);
1335                 db_rep = env->rep_handle;
1336                 DB_ASSERT(env, conn->type == REP_CONNECTION &&
1337                     IS_KNOWN_REMOTE_SITE(conn->eid));
1338                 site = SITE_FROM_EID(conn->eid);
1339                 site->ack_policy = (int)parms.ack_policy;
1340                 if (F_ISSET(&parms, ELECTABLE_SITE))
1341                         F_SET(site, SITE_ELECTABLE);
1342                 else
1343                         F_CLR(site, SITE_ELECTABLE);
1344                 F_SET(site, SITE_HAS_PRIO);
1345                 break;
1346
1347         case REPMGR_GM_FAILURE:
1348         case REPMGR_GM_FORWARD:
1349         case REPMGR_JOIN_REQUEST:
1350         case REPMGR_JOIN_SUCCESS:
1351         case REPMGR_REMOVE_REQUEST:
1352         case REPMGR_RESOLVE_LIMBO:
1353         default:
1354                 __db_errx(env, DB_STR_A("3677",
1355                     "unexpected msg type %lu in process_own_msg", "%lu"),
1356                     (u_long)REPMGR_OWN_MSG_TYPE(msg->msg_hdr));
1357                 return (DB_REP_UNAVAIL);
1358         }
1359         /*
1360          * If we haven't given ownership of the msg buffer to another thread,
1361          * free it now.
1362          */
1363         if (msg != NULL)
1364                 __os_free(env, msg);
1365         return (ret);
1366 }
1367
1368 /*
1369  * Examine and verify the incoming version proposal message, and send an
1370  * appropriate response.
1371  */
1372 static int
1373 send_version_response(env, conn)
1374         ENV *env;
1375         REPMGR_CONNECTION *conn;
1376 {
1377         DB_REP *db_rep;
1378         __repmgr_version_proposal_args versions;
1379         __repmgr_version_confirmation_args conf;
1380         repmgr_netaddr_t *my_addr;
1381         char *hostname;
1382         u_int8_t buf[__REPMGR_VERSION_CONFIRMATION_SIZE+1];
1383         DBT vi;
1384         int ret;
1385
1386         db_rep = env->rep_handle;
1387         my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1388
1389         if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1390                 return (ret);
1391         if (vi.size == 0) {
1392                 /* No version info, so we must be talking to a v1 site. */
1393                 hostname = conn->input.repmgr_msg.rec.data;
1394                 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1395                         return (ret);
1396                 if ((ret = __repmgr_send_v1_handshake(env,
1397                     conn, my_addr->host, strlen(my_addr->host) + 1)) != 0)
1398                         return (ret);
1399                 conn->state = CONN_READY;
1400         } else {
1401                 if ((ret = __repmgr_version_proposal_unmarshal(env,
1402                     &versions, vi.data, vi.size, NULL)) != 0)
1403                         return (DB_REP_UNAVAIL);
1404
1405                 if (DB_REPMGR_VERSION >= versions.min &&
1406                     DB_REPMGR_VERSION <= versions.max)
1407                         conf.version = DB_REPMGR_VERSION;
1408                 else if (versions.max >= DB_REPMGR_MIN_VERSION &&
1409                     versions.max <= DB_REPMGR_VERSION)
1410                         conf.version = versions.max;
1411                 else {
1412                         /*
1413                          * User must have wired up a combination of versions
1414                          * exceeding what we said we'd support.
1415                          */
1416                         __db_errx(env, DB_STR_A("3622",
1417                             "No available version between %lu and %lu",
1418                             "%lu %lu"), (u_long)versions.min,
1419                             (u_long)versions.max);
1420                         return (DB_REP_UNAVAIL);
1421                 }
1422                 conn->version = conf.version;
1423
1424                 __repmgr_version_confirmation_marshal(env, &conf, buf);
1425                 buf[__REPMGR_VERSION_CONFIRMATION_SIZE] = '\0';
1426                 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1427                 if ((ret = __repmgr_send_handshake(env,
1428                      conn, buf, sizeof(buf), 0)) != 0)
1429                         return (ret);
1430
1431                 conn->state = CONN_PARAMETERS;
1432         }
1433         return (ret);
1434 }
1435
1436 /*
1437  * Sends a version-aware handshake to the remote site, only after we've verified
1438  * that it is indeed version-aware.  We can send either v2 or v3 handshake,
1439  * depending on the connection's version.
1440  *
1441  * PUBLIC: int __repmgr_send_handshake __P((ENV *,
1442  * PUBLIC:     REPMGR_CONNECTION *, void *, size_t, u_int32_t));
1443  */
1444 int
1445 __repmgr_send_handshake(env, conn, opt, optlen, flags)
1446         ENV *env;
1447         REPMGR_CONNECTION *conn;
1448         void *opt;
1449         size_t optlen;
1450         u_int32_t flags;
1451 {
1452         DB_REP *db_rep;
1453         REP *rep;
1454         DBT cntrl, rec;
1455         __repmgr_handshake_args hs;
1456         __repmgr_v2handshake_args v2hs;
1457         __repmgr_v3handshake_args v3hs;
1458         repmgr_netaddr_t *my_addr;
1459         size_t hostname_len, rec_len;
1460         void *buf;
1461         u_int8_t *p;
1462         u_int32_t cntrl_len;
1463         int ret;
1464
1465         db_rep = env->rep_handle;
1466         rep = db_rep->region;
1467         my_addr = &SITE_FROM_EID(db_rep->self_eid)->net_addr;
1468
1469         /*
1470          * The cntrl part has various parameters (varies by version).  The rec
1471          * part has the host name, followed by whatever optional extra data was
1472          * passed to us.
1473          *
1474          * Version awareness was introduced with protocol version 2 (so version
1475          * 1 is handled elsewhere).
1476          */
1477         switch (conn->version) {
1478         case 2:
1479                 cntrl_len = __REPMGR_V2HANDSHAKE_SIZE;
1480                 break;
1481         case 3:
1482                 cntrl_len = __REPMGR_V3HANDSHAKE_SIZE;
1483                 break;
1484         case 4:
1485                 cntrl_len = __REPMGR_HANDSHAKE_SIZE;
1486                 break;
1487         default:
1488                 __db_errx(env, DB_STR_A("3678",
1489                     "unexpected conn version %lu in send_handshake", "%lu"),
1490                     (u_long)conn->version);
1491                 return (DB_REP_UNAVAIL);
1492         }
1493         hostname_len = strlen(my_addr->host);
1494         rec_len = hostname_len + 1 +
1495             (opt == NULL ? 0 : optlen);
1496
1497         if ((ret = __os_malloc(env, cntrl_len + rec_len, &buf)) != 0)
1498                 return (ret);
1499
1500         cntrl.data = p = buf;
1501         switch (conn->version) {
1502         case 2:
1503                 /* Not allowed to use multi-process feature in v2 group. */
1504                 DB_ASSERT(env, !IS_SUBORDINATE(db_rep));
1505                 v2hs.port = my_addr->port;
1506                 v2hs.priority = rep->priority;
1507                 __repmgr_v2handshake_marshal(env, &v2hs, p);
1508                 break;
1509         case 3:
1510                 v3hs.port = my_addr->port;
1511                 v3hs.priority = rep->priority;
1512                 v3hs.flags = flags;
1513                 __repmgr_v3handshake_marshal(env, &v3hs, p);
1514                 break;
1515         case 4:
1516                 hs.port = my_addr->port;
1517                 hs.alignment = MEM_ALIGN;
1518                 hs.ack_policy = (u_int32_t)rep->perm_policy;
1519                 hs.flags = flags;
1520                 if (rep->priority > 0)
1521                         F_SET(&hs, ELECTABLE_SITE);
1522                 __repmgr_handshake_marshal(env, &hs, p);
1523                 break;
1524         default:
1525                 DB_ASSERT(env, FALSE);
1526                 break;
1527         }
1528         cntrl.size = cntrl_len;
1529
1530         p = rec.data = &p[cntrl_len];
1531         (void)strcpy((char*)p, my_addr->host);
1532         p += hostname_len + 1;
1533         if (opt != NULL) {
1534                 memcpy(p, opt, optlen);
1535                 p += optlen;
1536         }
1537         rec.size = (u_int32_t)(p - (u_int8_t*)rec.data);
1538
1539         /* Never block on select thread: pass maxblock as 0. */
1540         ret = __repmgr_send_one(env,
1541             conn, REPMGR_HANDSHAKE, &cntrl, &rec, 0);
1542         __os_free(env, buf);
1543         return (ret);
1544 }
1545
1546 static int
1547 read_version_response(env, conn)
1548         ENV *env;
1549         REPMGR_CONNECTION *conn;
1550 {
1551         DB_REP *db_rep;
1552         __repmgr_version_confirmation_args conf;
1553         DBT vi;
1554         char *hostname;
1555         u_int32_t flags;
1556         int ret;
1557
1558         db_rep = env->rep_handle;
1559
1560         if ((ret = __repmgr_find_version_info(env, conn, &vi)) != 0)
1561                 return (ret);
1562         hostname = conn->input.repmgr_msg.rec.data;
1563         if (vi.size == 0) {
1564                 if ((ret = accept_v1_handshake(env, conn, hostname)) != 0)
1565                         return (ret);
1566         } else {
1567                 if ((ret = __repmgr_version_confirmation_unmarshal(env,
1568                     &conf, vi.data, vi.size, NULL)) != 0)
1569                         return (DB_REP_UNAVAIL);
1570                 if (conf.version >= DB_REPMGR_MIN_VERSION &&
1571                     conf.version <= DB_REPMGR_VERSION)
1572                         conn->version = conf.version;
1573                 else {
1574                         /*
1575                          * Remote site "confirmed" a version outside of the
1576                          * range we proposed.  It should never do that.
1577                          */
1578                         __db_errx(env, DB_STR_A("3623",
1579                             "Can't support confirmed version %lu", "%lu"),
1580                             (u_long)conf.version);
1581                         return (DB_REP_UNAVAIL);
1582                 }
1583
1584                 if ((ret = accept_handshake(env, conn, hostname)) != 0)
1585                         return (ret);
1586                 flags = IS_SUBORDINATE(db_rep) ? REPMGR_SUBORDINATE : 0;
1587                 if ((ret = __repmgr_send_handshake(env,
1588                     conn, NULL, 0, flags)) != 0)
1589                         return (ret);
1590         }
1591         conn->state = CONN_READY;
1592         return (ret);
1593 }
1594
1595 /*
1596  * Examine the rec part of a handshake message to see if it has any version
1597  * information in it.  This is the magic that lets us allows version-aware sites
1598  * to exchange information, and yet avoids tripping up v1 sites, which don't
1599  * know how to look for it.
1600  *
1601  * PUBLIC: int __repmgr_find_version_info __P((ENV *,
1602  * PUBLIC:     REPMGR_CONNECTION *, DBT *));
1603  */
1604 int
1605 __repmgr_find_version_info(env, conn, vi)
1606         ENV *env;
1607         REPMGR_CONNECTION *conn;
1608         DBT *vi;
1609 {
1610         DBT *dbt;
1611         char *hostname;
1612         u_int32_t hostname_len;
1613
1614         dbt = &conn->input.repmgr_msg.rec;
1615         if (dbt->size == 0) {
1616                 __db_errx(env, DB_STR("3624",
1617                     "handshake is missing rec part"));
1618                 return (DB_REP_UNAVAIL);
1619         }
1620         hostname = dbt->data;
1621         hostname[dbt->size-1] = '\0';
1622         hostname_len = (u_int32_t)strlen(hostname);
1623         if (hostname_len + 1 == dbt->size) {
1624                 /*
1625                  * The rec DBT held only the host name.  This is a simple legacy
1626                  * V1 handshake; it contains no version information.
1627                  */
1628                 vi->size = 0;
1629         } else {
1630                 /*
1631                  * There's more data than just the host name.  The remainder is
1632                  * available to be treated as a normal byte buffer (and read in
1633                  * by one of the unmarshal functions).  Note that the remaining
1634                  * length should not include the padding byte that we have
1635                  * already clobbered.
1636                  */
1637                 vi->data = &((u_int8_t *)dbt->data)[hostname_len + 1];
1638                 vi->size = (dbt->size - (hostname_len+1)) - 1;
1639         }
1640         return (0);
1641 }
1642
1643 static int
1644 accept_handshake(env, conn, hostname)
1645         ENV *env;
1646         REPMGR_CONNECTION *conn;
1647         char *hostname;
1648 {
1649         __repmgr_handshake_args hs;
1650         __repmgr_v2handshake_args hs2;
1651         __repmgr_v3handshake_args hs3;
1652         u_int port;
1653         u_int32_t ack, flags;
1654         int electable;
1655
1656         switch (conn->version) {
1657         case 2:
1658                 if (__repmgr_v2handshake_unmarshal(env, &hs2,
1659                     conn->input.repmgr_msg.cntrl.data,
1660                     conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1661                         return (DB_REP_UNAVAIL);
1662                 port = hs2.port;
1663                 electable = hs2.priority > 0;
1664                 ack = flags = 0;
1665                 break;
1666         case 3:
1667                 if (__repmgr_v3handshake_unmarshal(env, &hs3,
1668                    conn->input.repmgr_msg.cntrl.data,
1669                    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1670                         return (DB_REP_UNAVAIL);
1671                 port = hs3.port;
1672                 electable = hs3.priority > 0;
1673                 flags = hs3.flags;
1674                 ack = 0;
1675                 break;
1676         case 4:
1677                 if (__repmgr_handshake_unmarshal(env, &hs,
1678                    conn->input.repmgr_msg.cntrl.data,
1679                    conn->input.repmgr_msg.cntrl.size, NULL) != 0)
1680                         return (DB_REP_UNAVAIL);
1681                 port = hs.port;
1682                 electable = F_ISSET(&hs, ELECTABLE_SITE);
1683                 flags = hs.flags;
1684                 ack = hs.ack_policy;
1685                 break;
1686         default:
1687                 __db_errx(env, DB_STR_A("3679",
1688                     "unexpected conn version %lu in accept_handshake", "%lu"),
1689                     (u_long)conn->version);
1690                 return (DB_REP_UNAVAIL);
1691         }
1692
1693         return (process_parameters(env,
1694             conn, hostname, port, ack, electable, flags));
1695 }
1696
1697 static int
1698 accept_v1_handshake(env, conn, hostname)
1699         ENV *env;
1700         REPMGR_CONNECTION *conn;
1701         char *hostname;
1702 {
1703         DB_REPMGR_V1_HANDSHAKE *handshake;
1704         u_int32_t prio;
1705         int electable;
1706
1707         handshake = conn->input.repmgr_msg.cntrl.data;
1708         if (conn->input.repmgr_msg.cntrl.size != sizeof(*handshake) ||
1709             handshake->version != 1) {
1710                 __db_errx(env, DB_STR("3625", "malformed V1 handshake"));
1711                 return (DB_REP_UNAVAIL);
1712         }
1713
1714         conn->version = 1;
1715         prio = ntohl(handshake->priority);
1716         electable = prio > 0;
1717         return (process_parameters(env,
1718             conn, hostname, handshake->port, 0, electable, 0));
1719 }
1720
1721 /* Caller must hold mutex. */
1722 static int
1723 process_parameters(env, conn, host, port, ack, electable, flags)
1724         ENV *env;
1725         REPMGR_CONNECTION *conn;
1726         char *host;
1727         u_int port;
1728         int electable;
1729         u_int32_t ack, flags;
1730 {
1731         DB_REP *db_rep;
1732         REPMGR_RETRY *retry;
1733         REPMGR_SITE *site;
1734         __repmgr_connect_reject_args reject;
1735         u_int8_t reject_buf[__REPMGR_CONNECT_REJECT_SIZE];
1736         int eid, ret;
1737
1738         db_rep = env->rep_handle;
1739
1740         /* Connection state can be used to discern incoming versus outgoing. */
1741         if (conn->state == CONN_CONNECTED) {
1742                 /*
1743                  * Since we initiated this as an outgoing connection, we
1744                  * obviously already know the host, port and site.  We just need
1745                  * the other site's electability flag (which we'll grab below,
1746                  * after the big "else" clause).
1747                  */
1748                 DB_ASSERT(env, IS_KNOWN_REMOTE_SITE(conn->eid));
1749                 site = SITE_FROM_EID(conn->eid);
1750                 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1751                     "handshake from connection to %s:%lu EID %u",
1752                     site->net_addr.host,
1753                     (u_long)site->net_addr.port, conn->eid));
1754         } else {
1755                 DB_ASSERT(env, conn->state == CONN_NEGOTIATE ||
1756                     conn->state == CONN_PARAMETERS);
1757                 /*
1758                  * Incoming connection: until now we haven't known what kind of
1759                  * connection we're dealing with (and in the case of a
1760                  * REP_CONNECTION, what its EID is); so it must be on the
1761                  * "orphans" list.  But now that we've received the parameters
1762                  * we'll be able to figure all that out.
1763                  */
1764                 if (LF_ISSET(APP_CHANNEL_CONNECTION)) {
1765                         conn->type = APP_CONNECTION;
1766                         return (0);
1767                 } else
1768                         conn->type = REP_CONNECTION;
1769
1770                 /*
1771                  * Now that we've been given the host and port, use them to find
1772                  * the site.
1773                  */
1774                 if ((site = __repmgr_lookup_site(env, host, port)) != NULL &&
1775                     site->membership == SITE_PRESENT) {
1776                         TAILQ_REMOVE(&db_rep->connections, conn, entries);
1777                         conn->ref_count--;
1778
1779                         eid = EID_FROM_SITE(site);
1780                         if (LF_ISSET(REPMGR_SUBORDINATE)) {
1781                                 /*
1782                                  * Accept it, as a supplementary source of
1783                                  * input, but nothing else.
1784                                  */
1785                                 TAILQ_INSERT_TAIL(&site->sub_conns,
1786                                     conn, entries);
1787                                 conn->eid = eid;
1788                         } else {
1789                                 DB_EVENT(env,
1790                                     DB_EVENT_REP_CONNECT_ESTD, &eid);
1791                                 switch (site->state) {
1792                                 case SITE_PAUSING:
1793                                         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1794                                       "handshake from paused site %s:%u EID %u",
1795                                             host, port, eid));
1796                                         retry = site->ref.retry;
1797                                         TAILQ_REMOVE(&db_rep->retries,
1798                                             retry, entries);
1799                                         __os_free(env, retry);
1800                                         break;
1801                                 case SITE_CONNECTED:
1802                                         /*
1803                                          * We got an incoming connection for a
1804                                          * site we were already connected to; at
1805                                          * least we thought we were.
1806                                          */
1807                                         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1808                          "connection from %s:%u EID %u while already connected",
1809                                             host, port, eid));
1810                                         if ((ret = resolve_collision(env,
1811                                             site, conn)) != 0)
1812                                                 return (ret);
1813                                         break;
1814                                 case SITE_CONNECTING:
1815                                         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1816                                   "handshake from connecting site %s:%u EID %u",
1817                                             host, port, eid));
1818                                         /*
1819                                          * Connector thread will give up when it
1820                                          * sees this site's state change, so we
1821                                          * don't have to do anything else here.
1822                                          */
1823                                         break;
1824                                 default:
1825                                         DB_ASSERT(env, FALSE);
1826                                 }
1827                                 conn->eid = eid;
1828                                 site->state = SITE_CONNECTED;
1829                                 site->ref.conn.in = conn;
1830                                 __os_gettime(env,
1831                                     &site->last_rcvd_timestamp, 1);
1832                         }
1833                 } else {
1834                         RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1835                   "rejecting connection from unknown or provisional site %s:%u",
1836                             host, port));
1837                         reject.version = db_rep->membership_version;
1838                         reject.gen = db_rep->member_version_gen;
1839                         __repmgr_connect_reject_marshal(env,
1840                             &reject, reject_buf);
1841
1842                         if ((ret = __repmgr_send_own_msg(env, conn,
1843                             REPMGR_CONNECT_REJECT, reject_buf,
1844                             __REPMGR_CONNECT_REJECT_SIZE)) != 0)
1845                                 return (ret);
1846
1847                         /*
1848                          * Since we haven't set conn->eid, bust_connection will
1849                          * not schedule a retry for this "failure", which is
1850                          * exactly what we want.
1851                          */
1852                         return (DB_REP_UNAVAIL);
1853                 }
1854         }
1855
1856         if (electable)
1857                 F_SET(site, SITE_ELECTABLE);
1858         else
1859                 F_CLR(site, SITE_ELECTABLE);
1860         F_SET(site, SITE_HAS_PRIO);
1861         site->ack_policy = (int)ack;
1862
1863         /*
1864          * If we're moping around wishing we knew who the master was, then
1865          * getting in touch with another site might finally provide sufficient
1866          * connectivity to find out.
1867          */
1868         if (!IS_SUBORDINATE(db_rep) && /* us */
1869             !__repmgr_master_is_known(env) &&
1870             !LF_ISSET(REPMGR_SUBORDINATE)) { /* the remote site */
1871                 RPRINT(env, (env, DB_VERB_REPMGR_MISC,
1872                     "handshake with no known master to wake election thread"));
1873                 db_rep->new_connection = TRUE;
1874                 if ((ret = __repmgr_signal(&db_rep->check_election)) != 0)
1875                         return (ret);
1876         }
1877
1878         return (0);
1879 }
1880
1881 static int
1882 resolve_collision(env, site, conn)
1883         ENV *env;
1884         REPMGR_SITE *site;
1885         REPMGR_CONNECTION *conn;
1886 {
1887         int ret;
1888
1889         /*
1890          * No need for site-oriented recovery, since we now have a replacement
1891          * connection; so skip bust_connection() and call disable_conn()
1892          * directly.
1893          *
1894          * If we already had an incoming connection, this new one always
1895          * replaces it.  Whether it also/alternatively replaces an outgoing
1896          * connection depends on whether we're client or server (so as to avoid
1897          * connection collisions resulting in no remaining connections).  (If
1898          * it's an older version that doesn't know about our collision
1899          * resolution protocol, it will behave like a client.)
1900          */
1901         if (site->ref.conn.in != NULL) {
1902                 ret = __repmgr_disable_connection(env, site->ref.conn.in);
1903                 site->ref.conn.in = NULL;
1904                 if (ret != 0)
1905                         return (ret);
1906         }
1907         if (site->ref.conn.out != NULL &&
1908             conn->version >= CONN_COLLISION_VERSION &&
1909             __repmgr_is_server(env, site)) {
1910                 ret = __repmgr_disable_connection(env, site->ref.conn.out);
1911                 site->ref.conn.out = NULL;
1912                 if (ret != 0)
1913                         return (ret);
1914         }
1915         return (0);
1916 }
1917
1918 static int
1919 record_permlsn(env, conn)
1920         ENV *env;
1921         REPMGR_CONNECTION *conn;
1922 {
1923         DB_REP *db_rep;
1924         REPMGR_SITE *site;
1925         __repmgr_permlsn_args *ackp, ack;
1926         SITE_STRING_BUFFER location;
1927         u_int32_t gen;
1928         int ret;
1929         u_int do_log_check;
1930
1931         db_rep = env->rep_handle;
1932         do_log_check = 0;
1933
1934         if (conn->version == 0 ||
1935             !IS_READY_STATE(conn->state) || !IS_VALID_EID(conn->eid)) {
1936                 __db_errx(env, DB_STR("3682",
1937                     "unexpected connection info in record_permlsn"));
1938                 return (DB_REP_UNAVAIL);
1939         }
1940         site = SITE_FROM_EID(conn->eid);
1941
1942         /*
1943          * Extract the LSN.  Save it only if it is an improvement over what the
1944          * site has already ack'ed.
1945          */
1946         if (conn->version == 1) {
1947                 ackp = conn->input.repmgr_msg.cntrl.data;
1948                 if (conn->input.repmgr_msg.cntrl.size != sizeof(ack) ||
1949                     conn->input.repmgr_msg.rec.size != 0) {
1950                         __db_errx(env, DB_STR("3627", "bad ack msg size"));
1951                         return (DB_REP_UNAVAIL);
1952                 }
1953         } else {
1954                 ackp = &ack;
1955                 if ((ret = __repmgr_permlsn_unmarshal(env, ackp,
1956                          conn->input.repmgr_msg.cntrl.data,
1957                          conn->input.repmgr_msg.cntrl.size, NULL)) != 0)
1958                         return (DB_REP_UNAVAIL);
1959         }
1960
1961         /* Ignore stale acks. */
1962         gen = db_rep->region->gen;
1963         if (ackp->generation < gen) {
1964                 VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1965                     "ignoring stale ack (%lu<%lu), from %s",
1966                      (u_long)ackp->generation, (u_long)gen,
1967                      __repmgr_format_site_loc(site, location)));
1968                 return (0);
1969         }
1970         VPRINT(env, (env, DB_VERB_REPMGR_MISC,
1971             "got ack [%lu][%lu](%lu) from %s", (u_long)ackp->lsn.file,
1972             (u_long)ackp->lsn.offset, (u_long)ackp->generation,
1973             __repmgr_format_site_loc(site, location)));
1974
1975         if (ackp->generation == gen &&
1976             LOG_COMPARE(&ackp->lsn, &site->max_ack) == 1) {
1977                 /*
1978                  * If file number for this site changed, check lowest log
1979                  * file needed after recording new permlsn for this site.
1980                  */
1981                 if (ackp->lsn.file > site->max_ack.file)
1982                         do_log_check = 1;
1983                 memcpy(&site->max_ack, &ackp->lsn, sizeof(DB_LSN));
1984                 if (do_log_check)
1985                         check_min_log_file(env);
1986                 if ((ret = __repmgr_wake_waiters(env,
1987                     &db_rep->ack_waiters)) != 0)
1988                         return (ret);
1989         }
1990         return (0);
1991 }
1992
1993 /*
1994  * Maintains lowest log file still needed by the repgroup.  This is stored
1995  * in shared rep region so that it is accessible to repmgr subordinate
1996  * processes that may not themselves have connections to other sites
1997  * (e.g. a separate db_archive process.)
1998  */
1999 static void
2000 check_min_log_file(env)
2001         ENV *env;
2002 {
2003         DB_REP *db_rep;
2004         REP *rep;
2005         REPMGR_CONNECTION *conn;
2006         REPMGR_SITE *site;
2007         u_int32_t min_log;
2008         int eid;
2009
2010         db_rep = env->rep_handle;
2011         rep = db_rep->region;
2012         min_log = 0;
2013
2014         /*
2015          * Record the lowest log file number from all connected sites.  If this
2016          * is a client, ignore the master because the master does not maintain
2017          * nor send out its repmgr perm LSN in this way.  Consider connections
2018          * so that we don't allow a site that has been down a long time to
2019          * indefinitely prevent log archiving.
2020          */
2021         FOR_EACH_REMOTE_SITE_INDEX(eid) {
2022                 if (eid == rep->master_id)
2023                         continue;
2024                 site = SITE_FROM_EID(eid);
2025                 if (site->state == SITE_CONNECTED &&
2026                     (((conn = site->ref.conn.in) != NULL &&
2027                     conn->state == CONN_READY) ||
2028                     ((conn = site->ref.conn.out) != NULL &&
2029                     conn->state == CONN_READY)) &&
2030                     !IS_ZERO_LSN(site->max_ack) &&
2031                     (min_log == 0 || site->max_ack.file < min_log))
2032                         min_log = site->max_ack.file;
2033         }
2034         /*
2035          * During normal operation min_log should increase over time, but it
2036          * is possible if a site returns after being disconnected for a while
2037          * that min_log could decrease.
2038          */
2039         if (min_log != 0 && min_log != rep->min_log_file)
2040                 rep->min_log_file = min_log;
2041 }
2042
2043 /*
2044  * PUBLIC: int __repmgr_write_some __P((ENV *, REPMGR_CONNECTION *));
2045  */
2046 int
2047 __repmgr_write_some(env, conn)
2048         ENV *env;
2049         REPMGR_CONNECTION *conn;
2050 {
2051         QUEUED_OUTPUT *output;
2052         REPMGR_FLAT *msg;
2053         int bytes, ret;
2054
2055         while (!STAILQ_EMPTY(&conn->outbound_queue)) {
2056                 output = STAILQ_FIRST(&conn->outbound_queue);
2057                 msg = output->msg;
2058                 if ((bytes = sendsocket(conn->fd, &msg->data[output->offset],
2059                     msg->length - output->offset, 0)) == SOCKET_ERROR) {
2060                         switch (ret = net_errno) {
2061                         case WOULDBLOCK:
2062 #if defined(DB_REPMGR_EAGAIN) && DB_REPMGR_EAGAIN != WOULDBLOCK
2063                         case DB_REPMGR_EAGAIN:
2064 #endif
2065                                 return (0);
2066                         default:
2067                                 __repmgr_fire_conn_err_event(env, conn, ret);
2068                                 STAT(env->rep_handle->
2069                                     region->mstat.st_connection_drop++);
2070                                 return (DB_REP_UNAVAIL);
2071                         }
2072                 }
2073
2074                 if ((output->offset += (size_t)bytes) >= msg->length) {
2075                         STAILQ_REMOVE_HEAD(&conn->outbound_queue, entries);
2076                         __os_free(env, output);
2077                         conn->out_queue_length--;
2078                         if (--msg->ref_count <= 0)
2079                                 __os_free(env, msg);
2080
2081                         /*
2082                          * We've achieved enough movement to free up at least
2083                          * one space in the outgoing queue.  Wake any message
2084                          * threads that may be waiting for space.  Leave
2085                          * CONGESTED state so that when the queue reaches the
2086                          * high-water mark again, the filling thread will be
2087                          * allowed to try waiting again.
2088                          */
2089                         conn->state = CONN_READY;
2090                         if ((ret = __repmgr_signal(&conn->drained)) != 0)
2091                                 return (ret);
2092                 }
2093         }
2094
2095         return (0);
2096 }