corosync  2.4.2
totemsrp.c
1 /*
2  * Copyright (c) 2003-2006 MontaVista Software, Inc.
3  * Copyright (c) 2006-2009 Red Hat, Inc.
4  *
5  * All rights reserved.
6  *
7  * Author: Steven Dake (sdake@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 /*
37  * The first version of this code was based upon Yair Amir's PhD thesis:
38  * http://www.cs.jhu.edu/~yairamir/phd.ps (chapters 4 and 5).
39  *
40  * The current version of totemsrp implements the Totem protocol specified in:
41  * http://citeseer.ist.psu.edu/amir95totem.html
42  *
43  * The deviations from the above published protocols are:
44  * - encryption of message contents with NSS
45  * - authentication of message contents with SHA1/HMAC
46  * - token hold mode where the token doesn't rotate on an unused ring - reduces CPU
47  * usage on a 1.6 GHz Xeon from 35% to less than 0.1% as measured by top
48  */
49 
50 #include <config.h>
51 
52 #include <assert.h>
53 #ifdef HAVE_ALLOCA_H
54 #include <alloca.h>
55 #endif
56 #include <sys/mman.h>
57 #include <sys/types.h>
58 #include <sys/stat.h>
59 #include <sys/socket.h>
60 #include <netdb.h>
61 #include <sys/un.h>
62 #include <sys/ioctl.h>
63 #include <sys/param.h>
64 #include <netinet/in.h>
65 #include <arpa/inet.h>
66 #include <unistd.h>
67 #include <fcntl.h>
68 #include <stdlib.h>
69 #include <stdio.h>
70 #include <errno.h>
71 #include <sched.h>
72 #include <time.h>
73 #include <sys/time.h>
74 #include <sys/poll.h>
75 #include <sys/uio.h>
76 #include <limits.h>
77 
78 #include <qb/qbdefs.h>
79 #include <qb/qbutil.h>
80 #include <qb/qbloop.h>
81 
82 #include <corosync/swab.h>
83 #include <corosync/sq.h>
84 #include <corosync/list.h>
85 
86 #define LOGSYS_UTILS_ONLY 1
87 #include <corosync/logsys.h>
88 
89 #include "totemsrp.h"
90 #include "totemrrp.h"
91 #include "totemnet.h"
92 
93 #include "cs_queue.h"
94 
95 #define LOCALHOST_IP inet_addr("127.0.0.1")
96 #define QUEUE_RTR_ITEMS_SIZE_MAX 16384 /* allow 16384 retransmit items */
97 #define RETRANS_MESSAGE_QUEUE_SIZE_MAX 16384 /* allow 16384 messages to be queued */
98 #define RECEIVED_MESSAGE_QUEUE_SIZE_MAX 500 /* allow 500 messages to be queued */
99 #define MAXIOVS 5
100 #define RETRANSMIT_ENTRIES_MAX 30
101 #define TOKEN_SIZE_MAX 64000 /* bytes */
102 #define LEAVE_DUMMY_NODEID 0
103 
104 /*
105  * Rollover handling:
106  * SEQNO_START_MSG is the starting sequence number after a new configuration
107  * This should remain zero, unless testing overflow in which case
108  * 0x7ffff000 and 0xfffff000 are good starting values.
109  *
110  * SEQNO_START_TOKEN is the starting sequence number after a new configuration
111  * for a token. This should remain zero, unless testing overflow in which
112  * case 0x7fffff00 or 0xffffff00 are good starting values.
113  */
114 #define SEQNO_START_MSG 0x0
115 #define SEQNO_START_TOKEN 0x0
116 
117 /*
118  * These can be used to test different rollover points
119  * #define SEQNO_START_MSG 0xfffffe00
120  * #define SEQNO_START_TOKEN 0xfffffe00
121  */
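/*
 * Editor's illustration (not part of the original totemsrp.c): the start
 * values above can be pushed close to the 32-bit wrap point for overflow
 * testing because sequence numbers are compared with wrap-around aware
 * arithmetic (in the spirit of sq_lt_compare() from corosync/sq.h).
 * Minimal sketch of that idea; the helper name is hypothetical:
 */
static inline int seqno_lt_example (unsigned int a, unsigned int b)
{
 /*
  * "a precedes b" in modulo 2^32 space: the unsigned difference b - a,
  * reinterpreted as a signed value, is positive.
  */
 return ((int)(b - a) > 0);
}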
122 
123 /*
124  * These can be used to test the error recovery algorithms
125  * #define TEST_DROP_ORF_TOKEN_PERCENTAGE 30
126  * #define TEST_DROP_COMMIT_TOKEN_PERCENTAGE 30
127  * #define TEST_DROP_MCAST_PERCENTAGE 50
128  * #define TEST_RECOVERY_MSG_COUNT 300
129  */
130 
131 /*
132  * We compare the endian_detector of incoming messages to determine if the
133  * sender's byte order differs from ours - if so, the message is converted.
134  *
135  * do not change this value
136  */
137 #define ENDIAN_LOCAL 0xff22
138 
139 enum message_type {
140  MESSAGE_TYPE_ORF_TOKEN = 0, /* Ordering, Reliability, Flow (ORF) control Token */
141  MESSAGE_TYPE_MCAST = 1, /* ring ordered multicast message */
142  MESSAGE_TYPE_MEMB_MERGE_DETECT = 2, /* merge rings if there are available rings */
143  MESSAGE_TYPE_MEMB_JOIN = 3, /* membership join message */
144  MESSAGE_TYPE_MEMB_COMMIT_TOKEN = 4, /* membership commit token */
145  MESSAGE_TYPE_TOKEN_HOLD_CANCEL = 5, /* cancel the holding of the token */
146 };
147 
148 enum encapsulation_type {
149  MESSAGE_ENCAPSULATED = 1,
150  MESSAGE_NOT_ENCAPSULATED = 2
151 };
152 
153 /*
154  * New membership algorithm local variables
155  */
156 struct consensus_list_item {
157  struct srp_addr addr;
158  int set;
159 };
160 
161 
162 struct token_callback_instance {
163  struct list_head list;
164  int (*callback_fn) (enum totem_callback_token_type type, const void *);
166  int delete;
167  void *data;
168 };
169 
170 
171 struct totemsrp_socket {
172  int mcast;
173  int token;
174 };
175 
176 struct message_header {
177  char type;
178  char encapsulated;
179  unsigned short endian_detector;
180  unsigned int nodeid;
181 } __attribute__((packed));
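/*
 * Editor's illustration (not part of the original totemsrp.c): every on-wire
 * message starts with the header above, and the sender stores ENDIAN_LOCAL in
 * endian_detector.  A receiver that reads any other value knows the sender's
 * byte order differs and must run the matching *_endian_convert() routine
 * before using the message.  Sketch only; the helper name is hypothetical:
 */
static inline int message_endian_mismatch_example (
 const struct message_header *header)
{
 return (header->endian_detector != ENDIAN_LOCAL);
}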
182 
183 
184 struct mcast {
187  unsigned int seq;
190  unsigned int node_id;
192 } __attribute__((packed));
193 
194 
195 struct rtr_item {
197  unsigned int seq;
198 }__attribute__((packed));
199 
200 
201 struct orf_token {
203  unsigned int seq;
204  unsigned int token_seq;
205  unsigned int aru;
206  unsigned int aru_addr;
208  unsigned int backlog;
209  unsigned int fcc;
212  struct rtr_item rtr_list[0];
213 }__attribute__((packed));
214 
215 
216 struct memb_join {
219  unsigned int proc_list_entries;
220  unsigned int failed_list_entries;
221  unsigned long long ring_seq;
222  unsigned char end_of_memb_join[0];
223 /*
224  * These parts of the data structure are dynamic:
225  * struct srp_addr proc_list[];
226  * struct srp_addr failed_list[];
227  */
228 } __attribute__((packed));
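/*
 * Editor's illustration (not part of the original totemsrp.c): the dynamic
 * lists documented above start at end_of_memb_join, with the failed list
 * packed immediately after the proc list.  Hypothetical helper mirroring how
 * the join handling code addresses the trailing data:
 */
static inline void memb_join_lists_get_example (
 const struct memb_join *memb_join,
 const struct srp_addr **proc_list,
 const struct srp_addr **failed_list)
{
 *proc_list = (const struct srp_addr *)memb_join->end_of_memb_join;
 *failed_list = *proc_list + memb_join->proc_list_entries;
}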
229 
230 
235 } __attribute__((packed));
236 
237 
241 } __attribute__((packed));
242 
243 
244 struct memb_commit_token_memb_entry {
245  struct memb_ring_id ring_id;
246  unsigned int aru;
247  unsigned int high_delivered;
248  unsigned int received_flg;
249 }__attribute__((packed));
250 
251 
252 struct memb_commit_token {
253  struct message_header header;
254  unsigned int token_seq;
256  unsigned int retrans_flg;
259  unsigned char end_of_commit_token[0];
260 /*
261  * These parts of the data structure are dynamic:
262  *
263  * struct srp_addr addr[PROCESSOR_COUNT_MAX];
264  * struct memb_commit_token_memb_entry memb_list[PROCESSOR_COUNT_MAX];
265  */
266 }__attribute__((packed));
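/*
 * Editor's illustration (not part of the original totemsrp.c): the commit
 * token carries its address list and per-member list after
 * end_of_commit_token, as documented in the comment above.  Hypothetical
 * helper; the address entry count normally comes from a field of the commit
 * token that this listing does not show, so it is passed in explicitly here:
 */
static inline void memb_commit_token_lists_get_example (
 struct memb_commit_token *commit_token,
 int addr_entries,
 struct srp_addr **addr,
 struct memb_commit_token_memb_entry **memb_list)
{
 *addr = (struct srp_addr *)commit_token->end_of_commit_token;
 *memb_list = (struct memb_commit_token_memb_entry *)(*addr + addr_entries);
}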
267 
268 struct message_item {
269  struct mcast *mcast;
270  unsigned int msg_len;
271 };
272 
273 struct sort_queue_item {
274  struct mcast *mcast;
275  unsigned int msg_len;
276 };
277 
278 enum memb_state {
279  MEMB_STATE_OPERATIONAL = 1,
280  MEMB_STATE_GATHER = 2,
281  MEMB_STATE_COMMIT = 3,
282  MEMB_STATE_RECOVERY = 4
283 };
284 
285 struct totemsrp_instance {
287 
289 
290  /*
291  * Flow control mcasts and remcasts on last and current orf_token
292  */
294 
296 
298 
300 
302 
303  struct srp_addr my_id;
304 
306 
308 
310 
312 
314 
316 
318 
320 
322 
324 
326 
328 
330 
332 
334 
336 
338 
340 
342 
344 
345  unsigned int my_last_aru;
346 
348 
350 
351  unsigned int my_high_seq_received;
352 
353  unsigned int my_install_seq;
354 
356 
358 
360 
362 
364 
365  /*
366  * Queues used to order, deliver, and recover messages
367  */
369 
371 
373 
375 
377 
378  /*
379  * Received up to and including
380  */
381  unsigned int my_aru;
382 
383  unsigned int my_high_delivered;
384 
386 
388 
390 
392 
393  unsigned int my_token_seq;
394 
395  /*
396  * Timers
397  */
398  qb_loop_timer_handle timer_pause_timeout;
399 
400  qb_loop_timer_handle timer_orf_token_timeout;
401 
403 
405 
406  qb_loop_timer_handle timer_merge_detect_timeout;
407 
409 
411 
412  qb_loop_timer_handle memb_timer_state_commit_timeout;
413 
414  qb_loop_timer_handle timer_heartbeat_timeout;
415 
416  /*
417  * Function and data used to log messages
418  */
420 
422 
424 
426 
428 
430 
432 
434  int level,
435  int subsys,
436  const char *function,
437  const char *file,
438  int line,
439  const char *format, ...)__attribute__((format(printf, 6, 7)));
440 
442 
443 //TODO struct srp_addr next_memb;
444 
446 
448 
450  unsigned int nodeid,
451  const void *msg,
452  unsigned int msg_len,
453  int endian_conversion_required);
454 
456  enum totem_configuration_type configuration_type,
457  const unsigned int *member_list, size_t member_list_entries,
458  const unsigned int *left_list, size_t left_list_entries,
459  const unsigned int *joined_list, size_t joined_list_entries,
460  const struct memb_ring_id *ring_id);
461 
463 
465  int waiting_trans_ack);
466 
468  struct memb_ring_id *memb_ring_id,
469  const struct totem_ip_address *addr);
470 
472  const struct memb_ring_id *memb_ring_id,
473  const struct totem_ip_address *addr);
474 
476 
478 
479  unsigned long long token_ring_id_seq;
480 
481  unsigned int last_released;
482 
483  unsigned int set_aru;
484 
486 
488 
490 
491  unsigned int my_last_seq;
492 
493  struct timeval tv_old;
494 
496 
498 
499  unsigned int use_heartbeat;
500 
501  unsigned int my_trc;
502 
503  unsigned int my_pbl;
504 
505  unsigned int my_cbl;
506 
507  uint64_t pause_timestamp;
508 
510 
512 
514 
516 
518 
520 
521  int flushing;
522 
525  char commit_token_storage[40000];
526 };
527 
528 struct message_handlers {
529  int count;
530  int (*handler_functions[6]) (
531  struct totemsrp_instance *instance,
532  const void *msg,
533  size_t msg_len,
534  int endian_conversion_needed);
535 };
536 
555 };
556 
557 const char* gather_state_from_desc [] = {
558  [TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT] = "consensus timeout",
559  [TOTEMSRP_GSFROM_GATHER_MISSING1] = "MISSING",
560  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE] = "The token was lost in the OPERATIONAL state.",
561  [TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED] = "The consensus timeout expired.",
562  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE] = "The token was lost in the COMMIT state.",
563  [TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE] = "The token was lost in the RECOVERY state.",
564  [TOTEMSRP_GSFROM_FAILED_TO_RECEIVE] = "failed to receive",
565  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE] = "foreign message in operational state",
566  [TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE] = "foreign message in gather state",
567  [TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE] = "merge during operational state",
568  [TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE] = "merge during gather state",
569  [TOTEMSRP_GSFROM_MERGE_DURING_JOIN] = "merge during join",
570  [TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE] = "join during operational state",
571  [TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE] = "join during commit state",
572  [TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY] = "join during recovery",
573  [TOTEMSRP_GSFROM_INTERFACE_CHANGE] = "interface change",
574 };
575 
576 /*
577  * forward decls
578  */
579 static int message_handler_orf_token (
580  struct totemsrp_instance *instance,
581  const void *msg,
582  size_t msg_len,
583  int endian_conversion_needed);
584 
585 static int message_handler_mcast (
586  struct totemsrp_instance *instance,
587  const void *msg,
588  size_t msg_len,
589  int endian_conversion_needed);
590 
591 static int message_handler_memb_merge_detect (
592  struct totemsrp_instance *instance,
593  const void *msg,
594  size_t msg_len,
595  int endian_conversion_needed);
596 
597 static int message_handler_memb_join (
598  struct totemsrp_instance *instance,
599  const void *msg,
600  size_t msg_len,
601  int endian_conversion_needed);
602 
603 static int message_handler_memb_commit_token (
604  struct totemsrp_instance *instance,
605  const void *msg,
606  size_t msg_len,
607  int endian_conversion_needed);
608 
609 static int message_handler_token_hold_cancel (
610  struct totemsrp_instance *instance,
611  const void *msg,
612  size_t msg_len,
613  int endian_conversion_needed);
614 
615 static void totemsrp_instance_initialize (struct totemsrp_instance *instance);
616 
617 static unsigned int main_msgs_missing (void);
618 
619 static void main_token_seqid_get (
620  const void *msg,
621  unsigned int *seqid,
622  unsigned int *token_is);
623 
624 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src);
625 
626 static void srp_addr_to_nodeid (
627  unsigned int *nodeid_out,
628  struct srp_addr *srp_addr_in,
629  unsigned int entries);
630 
631 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b);
632 
633 static void memb_leave_message_send (struct totemsrp_instance *instance);
634 
635 static void token_callbacks_execute (struct totemsrp_instance *instance, enum totem_callback_token_type type);
636 static void memb_state_gather_enter (struct totemsrp_instance *instance, enum gather_state_from gather_from);
637 static void messages_deliver_to_app (struct totemsrp_instance *instance, int skip, unsigned int end_point);
638 static int orf_token_mcast (struct totemsrp_instance *instance, struct orf_token *oken,
639  int fcc_mcasts_allowed);
640 static void messages_free (struct totemsrp_instance *instance, unsigned int token_aru);
641 
642 static void memb_ring_id_set (struct totemsrp_instance *instance,
643  const struct memb_ring_id *ring_id);
644 static void target_set_completed (void *context);
645 static void memb_state_commit_token_update (struct totemsrp_instance *instance);
646 static void memb_state_commit_token_target_set (struct totemsrp_instance *instance);
647 static int memb_state_commit_token_send (struct totemsrp_instance *instance);
648 static int memb_state_commit_token_send_recovery (struct totemsrp_instance *instance, struct memb_commit_token *memb_commit_token);
649 static void memb_state_commit_token_create (struct totemsrp_instance *instance);
650 static int token_hold_cancel_send (struct totemsrp_instance *instance);
651 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out);
652 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out);
653 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out);
654 static void mcast_endian_convert (const struct mcast *in, struct mcast *out);
655 static void memb_merge_detect_endian_convert (
656  const struct memb_merge_detect *in,
657  struct memb_merge_detect *out);
658 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in);
659 static void timer_function_orf_token_timeout (void *data);
660 static void timer_function_pause_timeout (void *data);
661 static void timer_function_heartbeat_timeout (void *data);
662 static void timer_function_token_retransmit_timeout (void *data);
663 static void timer_function_token_hold_retransmit_timeout (void *data);
664 static void timer_function_merge_detect_timeout (void *data);
665 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance);
666 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr);
667 static const char* gsfrom_to_msg(enum gather_state_from gsfrom);
668 
669 void main_deliver_fn (
670  void *context,
671  const void *msg,
672  unsigned int msg_len);
673 
675  void *context,
676  const struct totem_ip_address *iface_address,
677  unsigned int iface_no);
678 
679 struct message_handlers totemsrp_message_handlers = {
680  6,
681  {
682  message_handler_orf_token, /* MESSAGE_TYPE_ORF_TOKEN */
683  message_handler_mcast, /* MESSAGE_TYPE_MCAST */
684  message_handler_memb_merge_detect, /* MESSAGE_TYPE_MEMB_MERGE_DETECT */
685  message_handler_memb_join, /* MESSAGE_TYPE_MEMB_JOIN */
686  message_handler_memb_commit_token, /* MESSAGE_TYPE_MEMB_COMMIT_TOKEN */
687  message_handler_token_hold_cancel /* MESSAGE_TYPE_TOKEN_HOLD_CANCEL */
688  }
689 };
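/*
 * Editor's illustration (not part of the original totemsrp.c): incoming
 * packets are dispatched through the table above using the type byte of the
 * common message header, and the endianness check derived from
 * endian_detector is handed to the handler.  Sketch of that dispatch step
 * (validation of msg_len and of the type byte is omitted here); the helper
 * name is hypothetical:
 */
static inline int totemsrp_dispatch_example (
 struct totemsrp_instance *instance,
 const struct message_handlers *handlers,
 const void *msg,
 unsigned int msg_len)
{
 const struct message_header *header = msg;

 return (handlers->handler_functions[(int)header->type] (
  instance, msg, msg_len,
  header->endian_detector != ENDIAN_LOCAL));
}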
690 
691 #define log_printf(level, format, args...) \
692 do { \
693  instance->totemsrp_log_printf ( \
694  level, instance->totemsrp_subsys_id, \
695  __FUNCTION__, __FILE__, __LINE__, \
696  format, ##args); \
697 } while (0);
698 #define LOGSYS_PERROR(err_num, level, fmt, args...) \
699 do { \
700  char _error_str[LOGSYS_MAX_PERROR_MSG_LEN]; \
701  const char *_error_ptr = qb_strerror_r(err_num, _error_str, sizeof(_error_str)); \
702  instance->totemsrp_log_printf ( \
703  level, instance->totemsrp_subsys_id, \
704  __FUNCTION__, __FILE__, __LINE__, \
705  fmt ": %s (%d)\n", ##args, _error_ptr, err_num); \
706  } while(0)
707 
708 static const char* gsfrom_to_msg(enum gather_state_from gsfrom)
709 {
710  if (gsfrom <= TOTEMSRP_GSFROM_MAX) {
711  return gather_state_from_desc[gsfrom];
712  }
713  else {
714  return "UNKNOWN";
715  }
716 }
717 
718 static void totemsrp_instance_initialize (struct totemsrp_instance *instance)
719 {
720  memset (instance, 0, sizeof (struct totemsrp_instance));
721 
722  list_init (&instance->token_callback_received_listhead);
723 
724  list_init (&instance->token_callback_sent_listhead);
725 
726  instance->my_received_flg = 1;
727 
728  instance->my_token_seq = SEQNO_START_TOKEN - 1;
729 
731 
732  instance->set_aru = -1;
733 
734  instance->my_aru = SEQNO_START_MSG;
735 
737 
739 
740  instance->orf_token_discard = 0;
741 
742  instance->originated_orf_token = 0;
743 
744  instance->commit_token = (struct memb_commit_token *)instance->commit_token_storage;
745 
746  instance->my_id.no_addrs = INTERFACE_MAX;
747 
748  instance->waiting_trans_ack = 1;
749 }
750 
751 static void main_token_seqid_get (
752  const void *msg,
753  unsigned int *seqid,
754  unsigned int *token_is)
755 {
756  const struct orf_token *token = msg;
757 
758  *seqid = 0;
759  *token_is = 0;
760  if (token->header.type == MESSAGE_TYPE_ORF_TOKEN) {
761  *seqid = token->token_seq;
762  *token_is = 1;
763  }
764 }
765 
766 static unsigned int main_msgs_missing (void)
767 {
768 // TODO
769  return (0);
770 }
771 
772 static int pause_flush (struct totemsrp_instance *instance)
773 {
774  uint64_t now_msec;
775  uint64_t timestamp_msec;
776  int res = 0;
777 
778  now_msec = (qb_util_nano_current_get () / QB_TIME_NS_IN_MSEC);
779  timestamp_msec = instance->pause_timestamp / QB_TIME_NS_IN_MSEC;
780 
781  if ((now_msec - timestamp_msec) > (instance->totem_config->token_timeout / 2)) {
783  "Process pause detected for %d ms, flushing membership messages.", (unsigned int)(now_msec - timestamp_msec));
784  /*
785  * -1 indicates an error from recvmsg
786  */
787  do {
789  } while (res == -1);
790  }
791  return (res);
792 }
793 
794 static int token_event_stats_collector (enum totem_callback_token_type type, const void *void_instance)
795 {
796  struct totemsrp_instance *instance = (struct totemsrp_instance *)void_instance;
797  uint32_t time_now;
798  unsigned long long nano_secs = qb_util_nano_current_get ();
799 
800  time_now = (nano_secs / QB_TIME_NS_IN_MSEC);
801 
802  if (type == TOTEM_CALLBACK_TOKEN_RECEIVED) {
803  /* increment the latest token index */
804  if (instance->stats.latest_token == (TOTEM_TOKEN_STATS_MAX - 1))
805  instance->stats.latest_token = 0;
806  else
807  instance->stats.latest_token++;
808 
809  if (instance->stats.earliest_token == instance->stats.latest_token) {
810  /* we have filled up the array, start overwriting */
811  if (instance->stats.earliest_token == (TOTEM_TOKEN_STATS_MAX - 1))
812  instance->stats.earliest_token = 0;
813  else
814  instance->stats.earliest_token++;
815 
816  instance->stats.token[instance->stats.earliest_token].rx = 0;
817  instance->stats.token[instance->stats.earliest_token].tx = 0;
818  instance->stats.token[instance->stats.earliest_token].backlog_calc = 0;
819  }
820 
821  instance->stats.token[instance->stats.latest_token].rx = time_now;
822  instance->stats.token[instance->stats.latest_token].tx = 0; /* in case we drop the token */
823  } else {
824  instance->stats.token[instance->stats.latest_token].tx = time_now;
825  }
826  return 0;
827 }
828 
829 /*
830  * Exported interfaces
831  */
832 int totemsrp_initialize (
833  qb_loop_t *poll_handle,
834  void **srp_context,
835  struct totem_config *totem_config,
836  totempg_stats_t *stats,
837 
838  void (*deliver_fn) (
839  unsigned int nodeid,
840  const void *msg,
841  unsigned int msg_len,
842  int endian_conversion_required),
843 
844  void (*confchg_fn) (
845  enum totem_configuration_type configuration_type,
846  const unsigned int *member_list, size_t member_list_entries,
847  const unsigned int *left_list, size_t left_list_entries,
848  const unsigned int *joined_list, size_t joined_list_entries,
849  const struct memb_ring_id *ring_id),
850  void (*waiting_trans_ack_cb_fn) (
851  int waiting_trans_ack))
852 {
853  struct totemsrp_instance *instance;
854 
855  instance = malloc (sizeof (struct totemsrp_instance));
856  if (instance == NULL) {
857  goto error_exit;
858  }
859 
860  totemsrp_instance_initialize (instance);
861 
862  instance->totemsrp_waiting_trans_ack_cb_fn = waiting_trans_ack_cb_fn;
863  instance->totemsrp_waiting_trans_ack_cb_fn (1);
864 
865  stats->srp = &instance->stats;
866  instance->stats.latest_token = 0;
867  instance->stats.earliest_token = 0;
868 
869  instance->totem_config = totem_config;
870 
871  /*
872  * Configure logging
873  */
882 
883  /*
884  * Configure totem store and load functions
885  */
887  instance->memb_ring_id_store = totem_config->totem_memb_ring_id_store;
888 
889  /*
890  * Initialize local variables for totemsrp
891  */
892  totemip_copy (&instance->mcast_address, &totem_config->interfaces[0].mcast_addr);
893 
894  /*
895  * Display totem configuration
896  */
898  "Token Timeout (%d ms) retransmit timeout (%d ms)",
899  totem_config->token_timeout, totem_config->token_retransmit_timeout);
901  "token hold (%d ms) retransmits before loss (%d retrans)",
902  totem_config->token_hold_timeout, totem_config->token_retransmits_before_loss_const);
904  "join (%d ms) send_join (%d ms) consensus (%d ms) merge (%d ms)",
905  totem_config->join_timeout,
906  totem_config->send_join_timeout,
907  totem_config->consensus_timeout,
908 
909  totem_config->merge_timeout);
911  "downcheck (%d ms) fail to recv const (%d msgs)",
912  totem_config->downcheck_timeout, totem_config->fail_to_recv_const);
914  "seqno unchanged const (%d rotations) Maximum network MTU %d", totem_config->seqno_unchanged_const, totem_config->net_mtu);
915 
917  "window size per rotation (%d messages) maximum messages per rotation (%d messages)",
918  totem_config->window_size, totem_config->max_messages);
919 
921  "missed count const (%d messages)",
922  totem_config->miss_count_const);
923 
925  "send threads (%d threads)", totem_config->threads);
927  "RRP token expired timeout (%d ms)",
928  totem_config->rrp_token_expired_timeout);
930  "RRP token problem counter (%d ms)",
931  totem_config->rrp_problem_count_timeout);
933  "RRP threshold (%d problem count)",
934  totem_config->rrp_problem_count_threshold);
936  "RRP multicast threshold (%d problem count)",
937  totem_config->rrp_problem_count_mcast_threshold);
939  "RRP automatic recovery check timeout (%d ms)",
940  totem_config->rrp_autorecovery_check_timeout);
942  "RRP mode set to %s.", instance->totem_config->rrp_mode);
943 
945  "heartbeat_failures_allowed (%d)", totem_config->heartbeat_failures_allowed);
947  "max_network_delay (%d ms)", totem_config->max_network_delay);
948 
949 
950  cs_queue_init (&instance->retrans_message_queue, RETRANS_MESSAGE_QUEUE_SIZE_MAX,
951  sizeof (struct message_item), instance->threaded_mode_enabled);
952 
953  sq_init (&instance->regular_sort_queue,
954  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
955 
956  sq_init (&instance->recovery_sort_queue,
957  QUEUE_RTR_ITEMS_SIZE_MAX, sizeof (struct sort_queue_item), 0);
958 
959  instance->totemsrp_poll_handle = poll_handle;
960 
961  instance->totemsrp_deliver_fn = deliver_fn;
962 
963  instance->totemsrp_confchg_fn = confchg_fn;
964  instance->use_heartbeat = 1;
965 
966  timer_function_pause_timeout (instance);
967 
968  if ( totem_config->heartbeat_failures_allowed == 0 ) {
970  "HeartBeat is Disabled. To enable set heartbeat_failures_allowed > 0");
971  instance->use_heartbeat = 0;
972  }
973 
974  if (instance->use_heartbeat) {
975  instance->heartbeat_timeout
976  = (totem_config->heartbeat_failures_allowed) * totem_config->token_retransmit_timeout
977  + totem_config->max_network_delay;
978 
979  if (instance->heartbeat_timeout >= totem_config->token_timeout) {
981  "total heartbeat_timeout (%d ms) is not less than token timeout (%d ms)",
982  instance->heartbeat_timeout,
983  totem_config->token_timeout);
985  "heartbeat_timeout = heartbeat_failures_allowed * token_retransmit_timeout + max_network_delay");
987  "heartbeat timeout should be less than the token timeout. Heartbeat is disabled!!");
988  instance->use_heartbeat = 0;
989  }
990  else {
992  "total heartbeat_timeout (%d ms)", instance->heartbeat_timeout);
993  }
994  }
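 /*
  * Editor's note (not part of the original totemsrp.c), worked example with
  * purely illustrative numbers: with heartbeat_failures_allowed = 3,
  * token_retransmit_timeout = 238 ms and max_network_delay = 50 ms the
  * heartbeat_timeout computed above is 3 * 238 + 50 = 764 ms, which is kept
  * only if it is below token_timeout (e.g. 1000 ms); otherwise the branch
  * above disables the heartbeat.
  */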
995 
997  poll_handle,
998  &instance->totemrrp_context,
999  totem_config,
1000  stats->srp,
1001  instance,
1004  main_token_seqid_get,
1005  main_msgs_missing,
1006  target_set_completed);
1007 
1008  /*
1009  * Must have net_mtu adjusted by totemrrp_initialize first
1010  */
1011  cs_queue_init (&instance->new_message_queue,
1013  sizeof (struct message_item), instance->threaded_mode_enabled);
1014 
1015  cs_queue_init (&instance->new_message_queue_trans,
1017  sizeof (struct message_item), instance->threaded_mode_enabled);
1018 
1020  &instance->token_recv_event_handle,
1022  0,
1023  token_event_stats_collector,
1024  instance);
1026  &instance->token_sent_event_handle,
1028  0,
1029  token_event_stats_collector,
1030  instance);
1031  *srp_context = instance;
1032  return (0);
1033 
1034 error_exit:
1035  return (-1);
1036 }
1037 
1038 void totemsrp_finalize (
1039  void *srp_context)
1040 {
1041  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1042 
1043 
1044  memb_leave_message_send (instance);
1045  totemrrp_finalize (instance->totemrrp_context);
1046  cs_queue_free (&instance->new_message_queue);
1047  cs_queue_free (&instance->new_message_queue_trans);
1048  cs_queue_free (&instance->retrans_message_queue);
1049  sq_free (&instance->regular_sort_queue);
1050  sq_free (&instance->recovery_sort_queue);
1051  free (instance);
1052 }
1053 
1054 /*
1055  * Return configured interfaces. interfaces is an array of totem_ip addresses allocated by the caller,
1056  * with interfaces_size items. iface_count is the final number of interfaces filled in by this
1057  * function.
1058  *
1059  * The function returns 0 on success, -2 if the interfaces array is not big enough, and -1 if
1060  * the requested node was not found.
1061  */
1062 int totemsrp_ifaces_get (
1063  void *srp_context,
1064  unsigned int nodeid,
1065  struct totem_ip_address *interfaces,
1066  unsigned int interfaces_size,
1067  char ***status,
1068  unsigned int *iface_count)
1069 {
1070  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1071  int res = 0;
1072  unsigned int found = 0;
1073  unsigned int i;
1074 
1075  for (i = 0; i < instance->my_memb_entries; i++) {
1076  if (instance->my_memb_list[i].addr[0].nodeid == nodeid) {
1077  found = 1;
1078  break;
1079  }
1080  }
1081 
1082  if (found) {
1083  *iface_count = instance->totem_config->interface_count;
1084 
1085  if (interfaces_size >= *iface_count) {
1086  memcpy (interfaces, instance->my_memb_list[i].addr,
1087  sizeof (struct totem_ip_address) * *iface_count);
1088  } else {
1089  res = -2;
1090  }
1091 
1092  goto finish;
1093  }
1094 
1095  for (i = 0; i < instance->my_left_memb_entries; i++) {
1096  if (instance->my_left_memb_list[i].addr[0].nodeid == nodeid) {
1097  found = 1;
1098  break;
1099  }
1100  }
1101 
1102  if (found) {
1103  *iface_count = instance->totem_config->interface_count;
1104 
1105  if (interfaces_size >= *iface_count) {
1106  memcpy (interfaces, instance->my_left_memb_list[i].addr,
1107  sizeof (struct totem_ip_address) * *iface_count);
1108  } else {
1109  res = -2;
1110  }
1111  } else {
1112  res = -1;
1113  }
1114 
1115 finish:
1116  totemrrp_ifaces_get (instance->totemrrp_context, status, NULL);
1117  return (res);
1118 }
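/*
 * Editor's illustration (not part of the original totemsrp.c): a caller-side
 * sketch of the contract documented above.  The helper name is hypothetical;
 * INTERFACE_MAX and struct totem_ip_address come from the totem headers
 * already pulled in by totemsrp.h.
 */
static inline int totemsrp_ifaces_get_usage_example (
 void *srp_context,
 unsigned int nodeid)
{
 struct totem_ip_address ifaces[INTERFACE_MAX];
 char **status;
 unsigned int iface_count;
 int res;

 res = totemsrp_ifaces_get (srp_context, nodeid, ifaces, INTERFACE_MAX,
  &status, &iface_count);
 if (res == -1) {
  /* nodeid is in neither the current nor the left membership */
  return (res);
 }
 if (res == -2) {
  /* the ifaces array was too small for iface_count entries */
  return (res);
 }
 /* res == 0: the first iface_count entries of ifaces[] are valid */
 return (0);
}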
1119 
1120 int totemsrp_crypto_set (
1121  void *srp_context,
1122  const char *cipher_type,
1123  const char *hash_type)
1124 {
1125  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1126  int res;
1127 
1128  res = totemrrp_crypto_set(instance->totemrrp_context, cipher_type, hash_type);
1129 
1130  return (res);
1131 }
1132 
1133 
1134 unsigned int totemsrp_my_nodeid_get (
1135  void *srp_context)
1136 {
1137  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1138  unsigned int res;
1139 
1140  res = instance->totem_config->interfaces[0].boundto.nodeid;
1141 
1142  return (res);
1143 }
1144 
1145 int totemsrp_my_family_get (
1146  void *srp_context)
1147 {
1148  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1149  int res;
1150 
1151  res = instance->totem_config->interfaces[0].boundto.family;
1152 
1153  return (res);
1154 }
1155 
1156 
1158  void *srp_context)
1159 {
1160  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
1161 
1163  instance->totem_config->interface_count);
1164 
1165  return (0);
1166 }
1167 
1168 
1169 /*
1170  * Set operations for use by the membership algorithm
1171  */
1172 static int srp_addr_equal (const struct srp_addr *a, const struct srp_addr *b)
1173 {
1174  unsigned int i;
1175  unsigned int res;
1176 
1177  for (i = 0; i < 1; i++) {
1178  res = totemip_equal (&a->addr[i], &b->addr[i]);
1179  if (res == 0) {
1180  return (0);
1181  }
1182  }
1183  return (1);
1184 }
1185 
1186 static void srp_addr_copy (struct srp_addr *dest, const struct srp_addr *src)
1187 {
1188  unsigned int i;
1189 
1190  dest->no_addrs = src->no_addrs;
1191 
1192  for (i = 0; i < INTERFACE_MAX; i++) {
1193  totemip_copy (&dest->addr[i], &src->addr[i]);
1194  }
1195 }
1196 
1197 static void srp_addr_to_nodeid (
1198  unsigned int *nodeid_out,
1199  struct srp_addr *srp_addr_in,
1200  unsigned int entries)
1201 {
1202  unsigned int i;
1203 
1204  for (i = 0; i < entries; i++) {
1205  nodeid_out[i] = srp_addr_in[i].addr[0].nodeid;
1206  }
1207 }
1208 
1209 static void srp_addr_copy_endian_convert (struct srp_addr *out, const struct srp_addr *in)
1210 {
1211  int i;
1212 
1213  for (i = 0; i < INTERFACE_MAX; i++) {
1214  totemip_copy_endian_convert (&out->addr[i], &in->addr[i]);
1215  }
1216 }
1217 
1218 static void memb_consensus_reset (struct totemsrp_instance *instance)
1219 {
1220  instance->consensus_list_entries = 0;
1221 }
1222 
1223 static void memb_set_subtract (
1224  struct srp_addr *out_list, int *out_list_entries,
1225  struct srp_addr *one_list, int one_list_entries,
1226  struct srp_addr *two_list, int two_list_entries)
1227 {
1228  int found = 0;
1229  int i;
1230  int j;
1231 
1232  *out_list_entries = 0;
1233 
1234  for (i = 0; i < one_list_entries; i++) {
1235  for (j = 0; j < two_list_entries; j++) {
1236  if (srp_addr_equal (&one_list[i], &two_list[j])) {
1237  found = 1;
1238  break;
1239  }
1240  }
1241  if (found == 0) {
1242  srp_addr_copy (&out_list[*out_list_entries], &one_list[i]);
1243  *out_list_entries = *out_list_entries + 1;
1244  }
1245  found = 0;
1246  }
1247 }
1248 
1249 /*
1250  * Set consensus for a specific processor
1251  */
1252 static void memb_consensus_set (
1253  struct totemsrp_instance *instance,
1254  const struct srp_addr *addr)
1255 {
1256  int found = 0;
1257  int i;
1258 
1259  if (addr->addr[0].nodeid == LEAVE_DUMMY_NODEID)
1260  return;
1261 
1262  for (i = 0; i < instance->consensus_list_entries; i++) {
1263  if (srp_addr_equal(addr, &instance->consensus_list[i].addr)) {
1264  found = 1;
1265  break; /* found entry */
1266  }
1267  }
1268  srp_addr_copy (&instance->consensus_list[i].addr, addr);
1269  instance->consensus_list[i].set = 1;
1270  if (found == 0) {
1271  instance->consensus_list_entries++;
1272  }
1273  return;
1274 }
1275 
1276 /*
1277  * Is consensus set for a specific processor
1278  */
1279 static int memb_consensus_isset (
1280  struct totemsrp_instance *instance,
1281  const struct srp_addr *addr)
1282 {
1283  int i;
1284 
1285  for (i = 0; i < instance->consensus_list_entries; i++) {
1286  if (srp_addr_equal (addr, &instance->consensus_list[i].addr)) {
1287  return (instance->consensus_list[i].set);
1288  }
1289  }
1290  return (0);
1291 }
1292 
1293 /*
1294  * Is consensus agreed upon based upon consensus database
1295  */
1296 static int memb_consensus_agreed (
1297  struct totemsrp_instance *instance)
1298 {
1299  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
1300  int token_memb_entries = 0;
1301  int agreed = 1;
1302  int i;
1303 
1304  memb_set_subtract (token_memb, &token_memb_entries,
1305  instance->my_proc_list, instance->my_proc_list_entries,
1306  instance->my_failed_list, instance->my_failed_list_entries);
1307 
1308  for (i = 0; i < token_memb_entries; i++) {
1309  if (memb_consensus_isset (instance, &token_memb[i]) == 0) {
1310  agreed = 0;
1311  break;
1312  }
1313  }
1314 
1315  if (agreed && instance->failed_to_recv == 1) {
1316  /*
1317  * Both nodes agreed on our failure. We don't care how many proc list items are left because we
1318  * will create a single ring anyway.
1319  */
1320 
1321  return (agreed);
1322  }
1323 
1324  assert (token_memb_entries >= 1);
1325 
1326  return (agreed);
1327 }
1328 
1329 static void memb_consensus_notset (
1330  struct totemsrp_instance *instance,
1331  struct srp_addr *no_consensus_list,
1332  int *no_consensus_list_entries,
1333  struct srp_addr *comparison_list,
1334  int comparison_list_entries)
1335 {
1336  int i;
1337 
1338  *no_consensus_list_entries = 0;
1339 
1340  for (i = 0; i < instance->my_proc_list_entries; i++) {
1341  if (memb_consensus_isset (instance, &instance->my_proc_list[i]) == 0) {
1342  srp_addr_copy (&no_consensus_list[*no_consensus_list_entries], &instance->my_proc_list[i]);
1343  *no_consensus_list_entries = *no_consensus_list_entries + 1;
1344  }
1345  }
1346 }
1347 
1348 /*
1349  * Is set1 equal to set2? Entries can be in different orders.
1350  */
1351 static int memb_set_equal (
1352  struct srp_addr *set1, int set1_entries,
1353  struct srp_addr *set2, int set2_entries)
1354 {
1355  int i;
1356  int j;
1357 
1358  int found = 0;
1359 
1360  if (set1_entries != set2_entries) {
1361  return (0);
1362  }
1363  for (i = 0; i < set2_entries; i++) {
1364  for (j = 0; j < set1_entries; j++) {
1365  if (srp_addr_equal (&set1[j], &set2[i])) {
1366  found = 1;
1367  break;
1368  }
1369  }
1370  if (found == 0) {
1371  return (0);
1372  }
1373  found = 0;
1374  }
1375  return (1);
1376 }
1377 
1378 /*
1379  * Is subset fully contained in fullset
1380  */
1381 static int memb_set_subset (
1382  const struct srp_addr *subset, int subset_entries,
1383  const struct srp_addr *fullset, int fullset_entries)
1384 {
1385  int i;
1386  int j;
1387  int found = 0;
1388 
1389  if (subset_entries > fullset_entries) {
1390  return (0);
1391  }
1392  for (i = 0; i < subset_entries; i++) {
1393  for (j = 0; j < fullset_entries; j++) {
1394  if (srp_addr_equal (&subset[i], &fullset[j])) {
1395  found = 1;
1396  }
1397  }
1398  if (found == 0) {
1399  return (0);
1400  }
1401  found = 0;
1402  }
1403  return (1);
1404 }
1405 /*
1406  * merge subset into fullset taking care not to add duplicates
1407  */
1408 static void memb_set_merge (
1409  const struct srp_addr *subset, int subset_entries,
1410  struct srp_addr *fullset, int *fullset_entries)
1411 {
1412  int found = 0;
1413  int i;
1414  int j;
1415 
1416  for (i = 0; i < subset_entries; i++) {
1417  for (j = 0; j < *fullset_entries; j++) {
1418  if (srp_addr_equal (&fullset[j], &subset[i])) {
1419  found = 1;
1420  break;
1421  }
1422  }
1423  if (found == 0) {
1424  srp_addr_copy (&fullset[*fullset_entries], &subset[i]);
1425  *fullset_entries = *fullset_entries + 1;
1426  }
1427  found = 0;
1428  }
1429  return;
1430 }
1431 
1432 static void memb_set_and_with_ring_id (
1433  struct srp_addr *set1,
1434  struct memb_ring_id *set1_ring_ids,
1435  int set1_entries,
1436  struct srp_addr *set2,
1437  int set2_entries,
1438  struct memb_ring_id *old_ring_id,
1439  struct srp_addr *and,
1440  int *and_entries)
1441 {
1442  int i;
1443  int j;
1444  int found = 0;
1445 
1446  *and_entries = 0;
1447 
1448  for (i = 0; i < set2_entries; i++) {
1449  for (j = 0; j < set1_entries; j++) {
1450  if (srp_addr_equal (&set1[j], &set2[i])) {
1451  if (memcmp (&set1_ring_ids[j], old_ring_id, sizeof (struct memb_ring_id)) == 0) {
1452  found = 1;
1453  }
1454  break;
1455  }
1456  }
1457  if (found) {
1458  srp_addr_copy (&and[*and_entries], &set1[j]);
1459  *and_entries = *and_entries + 1;
1460  }
1461  found = 0;
1462  }
1463  return;
1464 }
1465 
1466 #ifdef CODE_COVERAGE
1467 static void memb_set_print (
1468  char *string,
1469  struct srp_addr *list,
1470  int list_entries)
1471 {
1472  int i;
1473  int j;
1474  printf ("List '%s' contains %d entries:\n", string, list_entries);
1475 
1476  for (i = 0; i < list_entries; i++) {
1477  printf ("Address %d with %d rings\n", i, list[i].no_addrs);
1478  for (j = 0; j < list[i].no_addrs; j++) {
1479  printf ("\tiface %d %s\n", j, totemip_print (&list[i].addr[j]));
1480  printf ("\tfamily %d\n", list[i].addr[j].family);
1481  }
1482  }
1483 }
1484 #endif
1485 static void my_leave_memb_clear(
1486  struct totemsrp_instance *instance)
1487 {
1488  memset(instance->my_leave_memb_list, 0, sizeof(instance->my_leave_memb_list));
1489  instance->my_leave_memb_entries = 0;
1490 }
1491 
1492 static unsigned int my_leave_memb_match(
1493  struct totemsrp_instance *instance,
1494  unsigned int nodeid)
1495 {
1496  int i;
1497  unsigned int ret = 0;
1498 
1499  for (i = 0; i < instance->my_leave_memb_entries; i++){
1500  if (instance->my_leave_memb_list[i] == nodeid){
1501  ret = nodeid;
1502  break;
1503  }
1504  }
1505  return ret;
1506 }
1507 
1508 static void my_leave_memb_set(
1509  struct totemsrp_instance *instance,
1510  unsigned int nodeid)
1511 {
1512  int i, found = 0;
1513  for (i = 0; i < instance->my_leave_memb_entries; i++){
1514  if (instance->my_leave_memb_list[i] == nodeid){
1515  found = 1;
1516  break;
1517  }
1518  }
1519  if (found == 1) {
1520  return;
1521  }
1522  if (instance->my_leave_memb_entries < (PROCESSOR_COUNT_MAX - 1)) {
1523  instance->my_leave_memb_list[instance->my_leave_memb_entries] = nodeid;
1524  instance->my_leave_memb_entries++;
1525  } else {
1527  "Cannot set LEAVE nodeid=%d", nodeid);
1528  }
1529 }
1530 
1531 
1532 static void *totemsrp_buffer_alloc (struct totemsrp_instance *instance)
1533 {
1534  assert (instance != NULL);
1535  return totemrrp_buffer_alloc (instance->totemrrp_context);
1536 }
1537 
1538 static void totemsrp_buffer_release (struct totemsrp_instance *instance, void *ptr)
1539 {
1540  assert (instance != NULL);
1541  totemrrp_buffer_release (instance->totemrrp_context, ptr);
1542 }
1543 
1544 static void reset_token_retransmit_timeout (struct totemsrp_instance *instance)
1545 {
1546  int32_t res;
1547 
1548  qb_loop_timer_del (instance->totemsrp_poll_handle,
1550  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1551  QB_LOOP_MED,
1552  instance->totem_config->token_retransmit_timeout*QB_TIME_NS_IN_MSEC,
1553  (void *)instance,
1554  timer_function_token_retransmit_timeout,
1556  if (res != 0) {
1557  log_printf(instance->totemsrp_log_level_error, "reset_token_retransmit_timeout - qb_loop_timer_add error : %d", res);
1558  }
1559 
1560 }
1561 
1562 static void start_merge_detect_timeout (struct totemsrp_instance *instance)
1563 {
1564  int32_t res;
1565 
1566  if (instance->my_merge_detect_timeout_outstanding == 0) {
1567  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1568  QB_LOOP_MED,
1569  instance->totem_config->merge_timeout*QB_TIME_NS_IN_MSEC,
1570  (void *)instance,
1571  timer_function_merge_detect_timeout,
1572  &instance->timer_merge_detect_timeout);
1573  if (res != 0) {
1574  log_printf(instance->totemsrp_log_level_error, "start_merge_detect_timeout - qb_loop_timer_add error : %d", res);
1575  }
1576 
1578  }
1579 }
1580 
1581 static void cancel_merge_detect_timeout (struct totemsrp_instance *instance)
1582 {
1583  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_merge_detect_timeout);
1585 }
1586 
1587 /*
1588  * ring_state_* is used to save and restore the sort queue
1589  * state when a recovery operation fails (and enters gather)
1590  */
1591 static void old_ring_state_save (struct totemsrp_instance *instance)
1592 {
1593  if (instance->old_ring_state_saved == 0) {
1594  instance->old_ring_state_saved = 1;
1595  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
1596  sizeof (struct memb_ring_id));
1597  instance->old_ring_state_aru = instance->my_aru;
1600  "Saving state aru %x high seq received %x",
1601  instance->my_aru, instance->my_high_seq_received);
1602  }
1603 }
1604 
1605 static void old_ring_state_restore (struct totemsrp_instance *instance)
1606 {
1607  instance->my_aru = instance->old_ring_state_aru;
1610  "Restoring instance->my_aru %x my high seq received %x",
1611  instance->my_aru, instance->my_high_seq_received);
1612 }
1613 
1614 static void old_ring_state_reset (struct totemsrp_instance *instance)
1615 {
1617  "Resetting old ring state");
1618  instance->old_ring_state_saved = 0;
1619 }
1620 
1621 static void reset_pause_timeout (struct totemsrp_instance *instance)
1622 {
1623  int32_t res;
1624 
1625  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_pause_timeout);
1626  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1627  QB_LOOP_MED,
1628  instance->totem_config->token_timeout * QB_TIME_NS_IN_MSEC / 5,
1629  (void *)instance,
1630  timer_function_pause_timeout,
1631  &instance->timer_pause_timeout);
1632  if (res != 0) {
1633  log_printf(instance->totemsrp_log_level_error, "reset_pause_timeout - qb_loop_timer_add error : %d", res);
1634  }
1635 }
1636 
1637 static void reset_token_timeout (struct totemsrp_instance *instance) {
1638  int32_t res;
1639 
1640  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1641  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1642  QB_LOOP_MED,
1643  instance->totem_config->token_timeout*QB_TIME_NS_IN_MSEC,
1644  (void *)instance,
1645  timer_function_orf_token_timeout,
1646  &instance->timer_orf_token_timeout);
1647  if (res != 0) {
1648  log_printf(instance->totemsrp_log_level_error, "reset_token_timeout - qb_loop_timer_add error : %d", res);
1649  }
1650 }
1651 
1652 static void reset_heartbeat_timeout (struct totemsrp_instance *instance) {
1653  int32_t res;
1654 
1655  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1656  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1657  QB_LOOP_MED,
1658  instance->heartbeat_timeout*QB_TIME_NS_IN_MSEC,
1659  (void *)instance,
1660  timer_function_heartbeat_timeout,
1661  &instance->timer_heartbeat_timeout);
1662  if (res != 0) {
1663  log_printf(instance->totemsrp_log_level_error, "reset_heartbeat_timeout - qb_loop_timer_add error : %d", res);
1664  }
1665 }
1666 
1667 
1668 static void cancel_token_timeout (struct totemsrp_instance *instance) {
1669  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_timeout);
1670 }
1671 
1672 static void cancel_heartbeat_timeout (struct totemsrp_instance *instance) {
1673  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_heartbeat_timeout);
1674 }
1675 
1676 static void cancel_token_retransmit_timeout (struct totemsrp_instance *instance)
1677 {
1678  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->timer_orf_token_retransmit_timeout);
1679 }
1680 
1681 static void start_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1682 {
1683  int32_t res;
1684 
1685  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1686  QB_LOOP_MED,
1687  instance->totem_config->token_hold_timeout*QB_TIME_NS_IN_MSEC,
1688  (void *)instance,
1689  timer_function_token_hold_retransmit_timeout,
1691  if (res != 0) {
1692  log_printf(instance->totemsrp_log_level_error, "start_token_hold_retransmit_timeout - qb_loop_timer_add error : %d", res);
1693  }
1694 }
1695 
1696 static void cancel_token_hold_retransmit_timeout (struct totemsrp_instance *instance)
1697 {
1698  qb_loop_timer_del (instance->totemsrp_poll_handle,
1700 }
1701 
1702 static void memb_state_consensus_timeout_expired (
1703  struct totemsrp_instance *instance)
1704 {
1705  struct srp_addr no_consensus_list[PROCESSOR_COUNT_MAX];
1706  int no_consensus_list_entries;
1707 
1708  instance->stats.consensus_timeouts++;
1709  if (memb_consensus_agreed (instance)) {
1710  memb_consensus_reset (instance);
1711 
1712  memb_consensus_set (instance, &instance->my_id);
1713 
1714  reset_token_timeout (instance); // REVIEWED
1715  } else {
1716  memb_consensus_notset (
1717  instance,
1718  no_consensus_list,
1719  &no_consensus_list_entries,
1720  instance->my_proc_list,
1721  instance->my_proc_list_entries);
1722 
1723  memb_set_merge (no_consensus_list, no_consensus_list_entries,
1724  instance->my_failed_list, &instance->my_failed_list_entries);
1725  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_CONSENSUS_TIMEOUT);
1726  }
1727 }
1728 
1729 static void memb_join_message_send (struct totemsrp_instance *instance);
1730 
1731 static void memb_merge_detect_transmit (struct totemsrp_instance *instance);
1732 
1733 /*
1734  * Timers used for various states of the membership algorithm
1735  */
1736 static void timer_function_pause_timeout (void *data)
1737 {
1738  struct totemsrp_instance *instance = data;
1739 
1740  instance->pause_timestamp = qb_util_nano_current_get ();
1741  reset_pause_timeout (instance);
1742 }
1743 
1744 static void memb_recovery_state_token_loss (struct totemsrp_instance *instance)
1745 {
1746  old_ring_state_restore (instance);
1747  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_RECOVERY_STATE);
1748  instance->stats.recovery_token_lost++;
1749 }
1750 
1751 static void timer_function_orf_token_timeout (void *data)
1752 {
1753  struct totemsrp_instance *instance = data;
1754 
1755  switch (instance->memb_state) {
1756  case MEMB_STATE_OPERATIONAL:
1758  "The token was lost in the OPERATIONAL state.");
1760  "A processor failed, forming new configuration.");
1762  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_OPERATIONAL_STATE);
1763  instance->stats.operational_token_lost++;
1764  break;
1765 
1766  case MEMB_STATE_GATHER:
1768  "The consensus timeout expired.");
1769  memb_state_consensus_timeout_expired (instance);
1770  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED);
1771  instance->stats.gather_token_lost++;
1772  break;
1773 
1774  case MEMB_STATE_COMMIT:
1776  "The token was lost in the COMMIT state.");
1777  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_THE_TOKEN_WAS_LOST_IN_THE_COMMIT_STATE);
1778  instance->stats.commit_token_lost++;
1779  break;
1780 
1781  case MEMB_STATE_RECOVERY:
1783  "The token was lost in the RECOVERY state.");
1784  memb_recovery_state_token_loss (instance);
1785  instance->orf_token_discard = 1;
1786  break;
1787  }
1788 }
1789 
1790 static void timer_function_heartbeat_timeout (void *data)
1791 {
1792  struct totemsrp_instance *instance = data;
1794  "HeartBeat Timer expired Invoking token loss mechanism in state %d ", instance->memb_state);
1795  timer_function_orf_token_timeout(data);
1796 }
1797 
1798 static void memb_timer_function_state_gather (void *data)
1799 {
1800  struct totemsrp_instance *instance = data;
1801  int32_t res;
1802 
1803  switch (instance->memb_state) {
1804  case MEMB_STATE_OPERATIONAL:
1805  case MEMB_STATE_RECOVERY:
1806  assert (0); /* this should never happen */
1807  break;
1808  case MEMB_STATE_GATHER:
1809  case MEMB_STATE_COMMIT:
1810  memb_join_message_send (instance);
1811 
1812  /*
1813  * Restart the join timeout
1814  */
1815  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
1816 
1817  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
1818  QB_LOOP_MED,
1819  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
1820  (void *)instance,
1821  memb_timer_function_state_gather,
1823 
1824  if (res != 0) {
1825  log_printf(instance->totemsrp_log_level_error, "memb_timer_function_state_gather - qb_loop_timer_add error : %d", res);
1826  }
1827  break;
1828  }
1829 }
1830 
1831 static void memb_timer_function_gather_consensus_timeout (void *data)
1832 {
1833  struct totemsrp_instance *instance = data;
1834  memb_state_consensus_timeout_expired (instance);
1835 }
1836 
1837 static void deliver_messages_from_recovery_to_regular (struct totemsrp_instance *instance)
1838 {
1839  unsigned int i;
1840  struct sort_queue_item *recovery_message_item;
1841  struct sort_queue_item regular_message_item;
1842  unsigned int range = 0;
1843  int res;
1844  void *ptr;
1845  struct mcast *mcast;
1846 
1848  "recovery to regular %x-%x", SEQNO_START_MSG + 1, instance->my_aru);
1849 
1850  range = instance->my_aru - SEQNO_START_MSG;
1851  /*
1852  * Move messages from recovery to regular sort queue
1853  */
1854 // todo should i be initialized to 0 or 1 ?
1855  for (i = 1; i <= range; i++) {
1856  res = sq_item_get (&instance->recovery_sort_queue,
1857  i + SEQNO_START_MSG, &ptr);
1858  if (res != 0) {
1859  continue;
1860  }
1861  recovery_message_item = ptr;
1862 
1863  /*
1864  * Convert recovery message into regular message
1865  */
1866  mcast = recovery_message_item->mcast;
1867  if (mcast->header.encapsulated == MESSAGE_ENCAPSULATED) {
1868  /*
1869  * Message is a recovery message encapsulated
1870  * in a new ring message
1871  */
1872  regular_message_item.mcast =
1873  (struct mcast *)(((char *)recovery_message_item->mcast) + sizeof (struct mcast));
1874  regular_message_item.msg_len =
1875  recovery_message_item->msg_len - sizeof (struct mcast);
1876  mcast = regular_message_item.mcast;
1877  } else {
1878  /*
1879  * TODO this case shouldn't happen
1880  */
1881  continue;
1882  }
1883 
1885  "comparing if ring id is for this processor's old ring, seqno %d",
1886  mcast->seq);
1887 
1888  /*
1889  * Only add this message to the regular sort
1890  * queue if it was originated with the same ring
1891  * id as the previous ring
1892  */
1893  if (memcmp (&instance->my_old_ring_id, &mcast->ring_id,
1894  sizeof (struct memb_ring_id)) == 0) {
1895 
1896  res = sq_item_inuse (&instance->regular_sort_queue, mcast->seq);
1897  if (res == 0) {
1898  sq_item_add (&instance->regular_sort_queue,
1899  &regular_message_item, mcast->seq);
1900  if (sq_lt_compare (instance->old_ring_state_high_seq_received, mcast->seq)) {
1901  instance->old_ring_state_high_seq_received = mcast->seq;
1902  }
1903  }
1904  } else {
1906  "-not adding msg with seq no %x", mcast->seq);
1907  }
1908  }
1909 }
1910 
1911 /*
1912  * Change states in the state machine of the membership algorithm
1913  */
1914 static void memb_state_operational_enter (struct totemsrp_instance *instance)
1915 {
1916  struct srp_addr joined_list[PROCESSOR_COUNT_MAX];
1917  int joined_list_entries = 0;
1918  unsigned int aru_save;
1919  unsigned int joined_list_totemip[PROCESSOR_COUNT_MAX];
1920  unsigned int trans_memb_list_totemip[PROCESSOR_COUNT_MAX];
1921  unsigned int new_memb_list_totemip[PROCESSOR_COUNT_MAX];
1922  unsigned int left_list[PROCESSOR_COUNT_MAX];
1923  unsigned int i;
1924  unsigned int res;
1925  char left_node_msg[1024];
1926  char joined_node_msg[1024];
1927  char failed_node_msg[1024];
1928 
1929  instance->originated_orf_token = 0;
1930 
1931  memb_consensus_reset (instance);
1932 
1933  old_ring_state_reset (instance);
1934 
1935  deliver_messages_from_recovery_to_regular (instance);
1936 
1938  "Delivering to app %x to %x",
1939  instance->my_high_delivered + 1, instance->old_ring_state_high_seq_received);
1940 
1941  aru_save = instance->my_aru;
1942  instance->my_aru = instance->old_ring_state_aru;
1943 
1944  messages_deliver_to_app (instance, 0, instance->old_ring_state_high_seq_received);
1945 
1946  /*
1947  * Calculate joined and left list
1948  */
1949  memb_set_subtract (instance->my_left_memb_list,
1950  &instance->my_left_memb_entries,
1951  instance->my_memb_list, instance->my_memb_entries,
1952  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1953 
1954  memb_set_subtract (joined_list, &joined_list_entries,
1955  instance->my_new_memb_list, instance->my_new_memb_entries,
1956  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1957 
1958  /*
1959  * Install new membership
1960  */
1961  instance->my_memb_entries = instance->my_new_memb_entries;
1962  memcpy (&instance->my_memb_list, instance->my_new_memb_list,
1963  sizeof (struct srp_addr) * instance->my_memb_entries);
1964  instance->last_released = 0;
1965  instance->my_set_retrans_flg = 0;
1966 
1967  /*
1968  * Inform RRP about transitional change
1969  */
1971  instance->totemrrp_context,
1973  instance->my_trans_memb_list, instance->my_trans_memb_entries,
1974  instance->my_left_memb_list, instance->my_left_memb_entries,
1975  NULL, 0,
1976  &instance->my_ring_id);
1977  /*
1978  * Deliver transitional configuration to application
1979  */
1980  srp_addr_to_nodeid (left_list, instance->my_left_memb_list,
1981  instance->my_left_memb_entries);
1982  srp_addr_to_nodeid (trans_memb_list_totemip,
1983  instance->my_trans_memb_list, instance->my_trans_memb_entries);
1985  trans_memb_list_totemip, instance->my_trans_memb_entries,
1986  left_list, instance->my_left_memb_entries,
1987  0, 0, &instance->my_ring_id);
1988  instance->waiting_trans_ack = 1;
1989  instance->totemsrp_waiting_trans_ack_cb_fn (1);
1990 
1991 // TODO we need to filter to ensure we only deliver those
1992 // messages which are part of instance->my_deliver_memb
1993  messages_deliver_to_app (instance, 1, instance->old_ring_state_high_seq_received);
1994 
1995  instance->my_aru = aru_save;
1996 
1997  /*
1998  * Inform RRP about regular membership change
1999  */
2001  instance->totemrrp_context,
2003  instance->my_new_memb_list, instance->my_new_memb_entries,
2004  NULL, 0,
2005  joined_list, joined_list_entries,
2006  &instance->my_ring_id);
2007  /*
2008  * Deliver regular configuration to application
2009  */
2010  srp_addr_to_nodeid (new_memb_list_totemip,
2011  instance->my_new_memb_list, instance->my_new_memb_entries);
2012  srp_addr_to_nodeid (joined_list_totemip, joined_list,
2013  joined_list_entries);
2015  new_memb_list_totemip, instance->my_new_memb_entries,
2016  0, 0,
2017  joined_list_totemip, joined_list_entries, &instance->my_ring_id);
2018 
2019  /*
2020  * The recovery sort queue now becomes the regular
2021  * sort queue. It is necessary to copy the state
2022  * into the regular sort queue.
2023  */
2024  sq_copy (&instance->regular_sort_queue, &instance->recovery_sort_queue);
2025  instance->my_last_aru = SEQNO_START_MSG;
2026 
2027  /* When making my_proc_list smaller, ensure that the
2028  * now-unused entries are zeroed out. There are some suspect
2029  * asserts that assume there are always 2 entries in the list.
2030  * These fail when my_proc_list is reduced to 1 entry (and the
2031  * valid [0] entry is the same as the 'unused' [1] entry).
2032  */
2033  memset(instance->my_proc_list, 0,
2034  sizeof (struct srp_addr) * instance->my_proc_list_entries);
2035 
2036  instance->my_proc_list_entries = instance->my_new_memb_entries;
2037  memcpy (instance->my_proc_list, instance->my_new_memb_list,
2038  sizeof (struct srp_addr) * instance->my_memb_entries);
2039 
2040  instance->my_failed_list_entries = 0;
2041  /*
2042  * TODO Not exactly to spec
2043  *
2044  * At the entry to this function all messages without a gap are
2045  * delivered.
2046  *
2047  * This code throws away messages from the last gap in the sort queue
2048  * to my_high_seq_received
2049  *
2050  * What should really happen is we should deliver all messages up to
2051  * a gap, then deliver the transitional configuration, then deliver
2052  * the messages between the first gap and my_high_seq_received, and
2053  * then deliver the regular configuration
2054  *
2055  *
2056  * Unfortunately totempg doesn't appear to like this operating mode
2057  * which needs more inspection
2058  */
2059  i = instance->my_high_seq_received + 1;
2060  do {
2061  void *ptr;
2062 
2063  i -= 1;
2064  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2065  if (i == 0) {
2066  break;
2067  }
2068  } while (res);
2069 
2070  instance->my_high_delivered = i;
2071 
2072  for (i = 0; i <= instance->my_high_delivered; i++) {
2073  void *ptr;
2074 
2075  res = sq_item_get (&instance->regular_sort_queue, i, &ptr);
2076  if (res == 0) {
2077  struct sort_queue_item *regular_message;
2078 
2079  regular_message = ptr;
2080  free (regular_message->mcast);
2081  }
2082  }
2083  sq_items_release (&instance->regular_sort_queue, instance->my_high_delivered);
2084  instance->last_released = instance->my_high_delivered;
2085 
2086  if (joined_list_entries) {
2087  int sptr = 0;
2088  sptr += snprintf(joined_node_msg, sizeof(joined_node_msg)-sptr, " joined:");
2089  for (i=0; i< joined_list_entries; i++) {
2090  sptr += snprintf(joined_node_msg+sptr, sizeof(joined_node_msg)-sptr, " %u", joined_list_totemip[i]);
2091  }
2092  }
2093  else {
2094  joined_node_msg[0] = '\0';
2095  }
2096 
2097  if (instance->my_left_memb_entries) {
2098  int sptr = 0;
2099  int sptr2 = 0;
2100  sptr += snprintf(left_node_msg, sizeof(left_node_msg)-sptr, " left:");
2101  for (i=0; i< instance->my_left_memb_entries; i++) {
2102  sptr += snprintf(left_node_msg+sptr, sizeof(left_node_msg)-sptr, " %u", left_list[i]);
2103  }
2104  for (i=0; i< instance->my_left_memb_entries; i++) {
2105  if (my_leave_memb_match(instance, left_list[i]) == 0) {
2106  if (sptr2 == 0) {
2107  sptr2 += snprintf(failed_node_msg, sizeof(failed_node_msg)-sptr2, " failed:");
2108  }
2109  sptr2 += snprintf(failed_node_msg+sptr2, sizeof(left_node_msg)-sptr2, " %u", left_list[i]);
2110  }
2111  }
2112  if (sptr2 == 0) {
2113  failed_node_msg[0] = '\0';
2114  }
2115  }
2116  else {
2117  left_node_msg[0] = '\0';
2118  failed_node_msg[0] = '\0';
2119  }
2120 
2121  my_leave_memb_clear(instance);
2122 
2124  "entering OPERATIONAL state.");
2126  "A new membership (%s:%lld) was formed. Members%s%s",
2127  totemip_print (&instance->my_ring_id.rep),
2128  instance->my_ring_id.seq,
2129  joined_node_msg,
2130  left_node_msg);
2131 
2132  if (strlen(failed_node_msg)) {
2134  "Failed to receive the leave message.%s",
2135  failed_node_msg);
2136  }
2137 
2138  instance->memb_state = MEMB_STATE_OPERATIONAL;
2139 
2140  instance->stats.operational_entered++;
2141  instance->stats.continuous_gather = 0;
2142 
2143  instance->my_received_flg = 1;
2144 
2145  reset_pause_timeout (instance);
2146 
2147  /*
2148  * Save ring id information from this configuration to determine
2149  * which processors are transitioning from old regular configuration
2150  * into the new regular configuration on the next configuration change
2151  */
2152  memcpy (&instance->my_old_ring_id, &instance->my_ring_id,
2153  sizeof (struct memb_ring_id));
2154 
2155  return;
2156 }
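
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * The do/while loop above walks backwards from my_high_seq_received
 * until it finds a sequence number that is actually present in the
 * regular sort queue (or reaches 0); everything at or below that point
 * is then treated as delivered. A minimal stand-alone version of that
 * backward scan, using a hypothetical item_present() lookup:
 */
static unsigned int find_last_present (
	unsigned int high_seq,
	int (*item_present) (unsigned int seq))
{
	unsigned int i = high_seq + 1;

	do {
		i -= 1;
		if (i == 0) {
			break;
		}
	} while (item_present (i) == 0);

	return (i);
}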
2157 
2158 static void memb_state_gather_enter (
2159  struct totemsrp_instance *instance,
2160  enum gather_state_from gather_from)
2161 {
2162  int32_t res;
2163 
2164  instance->orf_token_discard = 1;
2165 
2166  instance->originated_orf_token = 0;
2167 
2168  memb_set_merge (
2169  &instance->my_id, 1,
2170  instance->my_proc_list, &instance->my_proc_list_entries);
2171 
2172  memb_join_message_send (instance);
2173 
2174  /*
2175  * Restart the join timeout
2176  */
2177  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2178 
2179  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2180  QB_LOOP_MED,
2181  instance->totem_config->join_timeout*QB_TIME_NS_IN_MSEC,
2182  (void *)instance,
2183  memb_timer_function_state_gather,
2184  &instance->memb_timer_state_gather_join_timeout);
2185  if (res != 0) {
2186  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(1) : %d", res);
2187  }
2188 
2189  /*
2190  * Restart the consensus timeout
2191  */
2192  qb_loop_timer_del (instance->totemsrp_poll_handle,
2193  instance->memb_timer_state_gather_consensus_timeout);
2194 
2195  res = qb_loop_timer_add (instance->totemsrp_poll_handle,
2196  QB_LOOP_MED,
2197  instance->totem_config->consensus_timeout*QB_TIME_NS_IN_MSEC,
2198  (void *)instance,
2199  memb_timer_function_gather_consensus_timeout,
2200  &instance->memb_timer_state_gather_consensus_timeout);
2201  if (res != 0) {
2202  log_printf(instance->totemsrp_log_level_error, "memb_state_gather_enter - qb_loop_timer_add error(2) : %d", res);
2203  }
2204 
2205  /*
2206  * Cancel the token loss and token retransmission timeouts
2207  */
2208  cancel_token_retransmit_timeout (instance); // REVIEWED
2209  cancel_token_timeout (instance); // REVIEWED
2210  cancel_merge_detect_timeout (instance);
2211 
2212  memb_consensus_reset (instance);
2213 
2214  memb_consensus_set (instance, &instance->my_id);
2215 
2217  "entering GATHER state from %d(%s).",
2218  gather_from, gsfrom_to_msg(gather_from));
2219 
2220  instance->memb_state = MEMB_STATE_GATHER;
2221  instance->stats.gather_entered++;
2222 
2223  if (gather_from == TOTEMSRP_GSFROM_THE_CONSENSUS_TIMEOUT_EXPIRED) {
2224  /*
2225  * State 3 means gather, so we are continuously gathering.
2226  */
2227  instance->stats.continuous_gather++;
2228  }
2229 
2230  return;
2231 }
2232 
2233 static void timer_function_token_retransmit_timeout (void *data);
2234 
2235 static void target_set_completed (
2236  void *context)
2237 {
2238  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
2239 
2240  memb_state_commit_token_send (instance);
2241 
2242 }
2243 
2244 static void memb_state_commit_enter (
2245  struct totemsrp_instance *instance)
2246 {
2247  old_ring_state_save (instance);
2248 
2249  memb_state_commit_token_update (instance);
2250 
2251  memb_state_commit_token_target_set (instance);
2252 
2253  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_join_timeout);
2254 
2256 
2257  qb_loop_timer_del (instance->totemsrp_poll_handle, instance->memb_timer_state_gather_consensus_timeout);
2258 
2260 
2261  memb_ring_id_set (instance, &instance->commit_token->ring_id);
2262  instance->memb_ring_id_store (&instance->my_ring_id, &instance->my_id.addr[0]);
2263 
2264  instance->token_ring_id_seq = instance->my_ring_id.seq;
2265 
2267  "entering COMMIT state.");
2268 
2269  instance->memb_state = MEMB_STATE_COMMIT;
2270  reset_token_retransmit_timeout (instance); // REVIEWED
2271  reset_token_timeout (instance); // REVIEWED
2272 
2273  instance->stats.commit_entered++;
2274  instance->stats.continuous_gather = 0;
2275 
2276  /*
2277  * reset all flow control variables since we are starting a new ring
2278  */
2279  instance->my_trc = 0;
2280  instance->my_pbl = 0;
2281  instance->my_cbl = 0;
2282  /*
2283  * commit token sent after callback that token target has been set
2284  */
2285 }
2286 
2287 static void memb_state_recovery_enter (
2288  struct totemsrp_instance *instance,
2290 {
2291  int i;
2292  int local_received_flg = 1;
2293  unsigned int low_ring_aru;
2294  unsigned int range = 0;
2295  unsigned int messages_originated = 0;
2296  const struct srp_addr *addr;
2297  struct memb_commit_token_memb_entry *memb_list;
2298  struct memb_ring_id my_new_memb_ring_id_list[PROCESSOR_COUNT_MAX];
2299 
2300  addr = (const struct srp_addr *)commit_token->end_of_commit_token;
2301  memb_list = (struct memb_commit_token_memb_entry *)(addr + commit_token->addr_entries);
2302 
2304  "entering RECOVERY state.");
2305 
2306  instance->orf_token_discard = 0;
2307 
2308  instance->my_high_ring_delivered = 0;
2309 
2310  sq_reinit (&instance->recovery_sort_queue, SEQNO_START_MSG);
2311  cs_queue_reinit (&instance->retrans_message_queue);
2312 
2313  low_ring_aru = instance->old_ring_state_high_seq_received;
2314 
2315  memb_state_commit_token_send_recovery (instance, commit_token);
2316 
2317  instance->my_token_seq = SEQNO_START_TOKEN - 1;
2318 
2319  /*
2320  * Build regular configuration
2321  */
2323  instance->totemrrp_context,
2324  commit_token->addr_entries);
2325 
2326  /*
2327  * Build transitional configuration
2328  */
2329  for (i = 0; i < instance->my_new_memb_entries; i++) {
2330  memcpy (&my_new_memb_ring_id_list[i],
2331  &memb_list[i].ring_id,
2332  sizeof (struct memb_ring_id));
2333  }
2334  memb_set_and_with_ring_id (
2335  instance->my_new_memb_list,
2336  my_new_memb_ring_id_list,
2337  instance->my_new_memb_entries,
2338  instance->my_memb_list,
2339  instance->my_memb_entries,
2340  &instance->my_old_ring_id,
2341  instance->my_trans_memb_list,
2342  &instance->my_trans_memb_entries);
2343 
2344  for (i = 0; i < instance->my_trans_memb_entries; i++) {
2346  "TRANS [%d] member %s:", i, totemip_print (&instance->my_trans_memb_list[i].addr[0]));
2347  }
2348  for (i = 0; i < instance->my_new_memb_entries; i++) {
2350  "position [%d] member %s:", i, totemip_print (&addr[i].addr[0]));
2352  "previous ring seq %llx rep %s",
2353  memb_list[i].ring_id.seq,
2354  totemip_print (&memb_list[i].ring_id.rep));
2355 
2357  "aru %x high delivered %x received flag %d",
2358  memb_list[i].aru,
2359  memb_list[i].high_delivered,
2360  memb_list[i].received_flg);
2361 
2362  // assert (totemip_print (&memb_list[i].ring_id.rep) != 0);
2363  }
2364  /*
2365  * Determine if any received flag is false
2366  */
2367  for (i = 0; i < commit_token->addr_entries; i++) {
2368  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2369  instance->my_trans_memb_list, instance->my_trans_memb_entries) &&
2370 
2371  memb_list[i].received_flg == 0) {
2372  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
2373  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
2374  sizeof (struct srp_addr) * instance->my_trans_memb_entries);
2375  local_received_flg = 0;
2376  break;
2377  }
2378  }
2379  if (local_received_flg == 1) {
2380  goto no_originate;
2381  } /* Else originate messages if we should */
2382 
2383  /*
2384  * Calculate my_low_ring_aru, instance->my_high_ring_delivered for the transitional membership
2385  */
2386  for (i = 0; i < commit_token->addr_entries; i++) {
2387  if (memb_set_subset (&instance->my_new_memb_list[i], 1,
2388  instance->my_deliver_memb_list,
2389  instance->my_deliver_memb_entries) &&
2390 
2391  memcmp (&instance->my_old_ring_id,
2392  &memb_list[i].ring_id,
2393  sizeof (struct memb_ring_id)) == 0) {
2394 
2395  if (sq_lt_compare (memb_list[i].aru, low_ring_aru)) {
2396 
2397  low_ring_aru = memb_list[i].aru;
2398  }
2399  if (sq_lt_compare (instance->my_high_ring_delivered, memb_list[i].high_delivered)) {
2400  instance->my_high_ring_delivered = memb_list[i].high_delivered;
2401  }
2402  }
2403  }
2404 
2405  /*
2406  * Copy all old ring messages to instance->retrans_message_queue
2407  */
2408  range = instance->old_ring_state_high_seq_received - low_ring_aru;
2409  if (range == 0) {
2410  /*
2411  * No messages to copy
2412  */
2413  goto no_originate;
2414  }
2415  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2416 
2418  "copying all old ring messages from %x-%x.",
2419  low_ring_aru + 1, instance->old_ring_state_high_seq_received);
2420 
2421  for (i = 1; i <= range; i++) {
2422  struct sort_queue_item *sort_queue_item;
2423  struct message_item message_item;
2424  void *ptr;
2425  int res;
2426 
2427  res = sq_item_get (&instance->regular_sort_queue,
2428  low_ring_aru + i, &ptr);
2429  if (res != 0) {
2430  continue;
2431  }
2432  sort_queue_item = ptr;
2433  messages_originated++;
2434  memset (&message_item, 0, sizeof (struct message_item));
2435  // TODO LEAK
2436  message_item.mcast = totemsrp_buffer_alloc (instance);
2437  assert (message_item.mcast);
2439  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2441  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2442  assert (message_item.mcast->header.nodeid);
2444  memcpy (&message_item.mcast->ring_id, &instance->my_ring_id,
2445  sizeof (struct memb_ring_id));
2446  message_item.msg_len = sort_queue_item->msg_len + sizeof (struct mcast);
2447  memcpy (((char *)message_item.mcast) + sizeof (struct mcast),
2448  sort_queue_item->mcast,
2449  sort_queue_item->msg_len);
2450  cs_queue_item_add (&instance->retrans_message_queue, &message_item);
2451  }
2453  "Originated %d messages in RECOVERY.", messages_originated);
2454  goto originated;
2455 
2456 no_originate:
2458  "Did not need to originate any messages in recovery.");
2459 
2460 originated:
2461  instance->my_aru = SEQNO_START_MSG;
2462  instance->my_aru_count = 0;
2463  instance->my_seq_unchanged = 0;
2465  instance->my_install_seq = SEQNO_START_MSG;
2466  instance->last_released = SEQNO_START_MSG;
2467 
2468  reset_token_timeout (instance); // REVIEWED
2469  reset_token_retransmit_timeout (instance); // REVIEWED
2470 
2471  instance->memb_state = MEMB_STATE_RECOVERY;
2472  instance->stats.recovery_entered++;
2473  instance->stats.continuous_gather = 0;
2474 
2475  return;
2476 }
2477 
2478 void totemsrp_event_signal (void *srp_context, enum totem_event_type type, int value)
2479 {
2480  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2481 
2482  token_hold_cancel_send (instance);
2483 
2484  return;
2485 }
2486 
2487 int totemsrp_mcast (
2488  void *srp_context,
2489  struct iovec *iovec,
2490  unsigned int iov_len,
2491  int guarantee)
2492 {
2493  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2494  int i;
2495  struct message_item message_item;
2496  char *addr;
2497  unsigned int addr_idx;
2498  struct cs_queue *queue_use;
2499 
2500  if (instance->waiting_trans_ack) {
2501  queue_use = &instance->new_message_queue_trans;
2502  } else {
2503  queue_use = &instance->new_message_queue;
2504  }
2505 
2506  if (cs_queue_is_full (queue_use)) {
2507  log_printf (instance->totemsrp_log_level_debug, "queue full");
2508  return (-1);
2509  }
2510 
2511  memset (&message_item, 0, sizeof (struct message_item));
2512 
2513  /*
2514  * Allocate pending item
2515  */
2516  message_item.mcast = totemsrp_buffer_alloc (instance);
2517  if (message_item.mcast == 0) {
2518  goto error_mcast;
2519  }
2520 
2521  /*
2522  * Set mcast header
2523  */
2524  memset(message_item.mcast, 0, sizeof (struct mcast));
2525  message_item.mcast->header.type = MESSAGE_TYPE_MCAST;
2526  message_item.mcast->header.endian_detector = ENDIAN_LOCAL;
2528  message_item.mcast->header.nodeid = instance->my_id.addr[0].nodeid;
2529  assert (message_item.mcast->header.nodeid);
2530 
2531  message_item.mcast->guarantee = guarantee;
2532  srp_addr_copy (&message_item.mcast->system_from, &instance->my_id);
2533 
2534  addr = (char *)message_item.mcast;
2535  addr_idx = sizeof (struct mcast);
2536  for (i = 0; i < iov_len; i++) {
2537  memcpy (&addr[addr_idx], iovec[i].iov_base, iovec[i].iov_len);
2538  addr_idx += iovec[i].iov_len;
2539  }
2540 
2541  message_item.msg_len = addr_idx;
2542 
2543  log_printf (instance->totemsrp_log_level_trace, "mcasted message added to pending queue");
2544  instance->stats.mcast_tx++;
2545  cs_queue_item_add (queue_use, &message_item);
2546 
2547  return (0);
2548 
2549 error_mcast:
2550  return (-1);
2551 }
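
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * totemsrp_mcast() above flattens the caller's io vector into a single
 * contiguous buffer directly behind the struct mcast header and uses
 * the final write offset as the message length. The same gather step in
 * isolation (hypothetical helper; the header length is passed in):
 */
static size_t iovec_flatten (
	char *buf,
	size_t header_len,
	const struct iovec *iov,
	unsigned int iov_len)
{
	size_t offset = header_len;
	unsigned int i;

	for (i = 0; i < iov_len; i++) {
		memcpy (&buf[offset], iov[i].iov_base, iov[i].iov_len);
		offset += iov[i].iov_len;
	}

	return (offset); /* total message length, header included */
}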
2552 
2553 /*
2554  * Determine if there is room to queue a new message
2555  */
2556 int totemsrp_avail (void *srp_context)
2557 {
2558  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
2559  int avail;
2560  struct cs_queue *queue_use;
2561 
2562  if (instance->waiting_trans_ack) {
2563  queue_use = &instance->new_message_queue_trans;
2564  } else {
2565  queue_use = &instance->new_message_queue;
2566  }
2567  cs_queue_avail (queue_use, &avail);
2568 
2569  return (avail);
2570 }
2571 
2572 /*
2573  * ORF Token Management
2574  */
2575 /*
2576  * Recast message to mcast group if it is available
2577  */
2578 static int orf_token_remcast (
2579  struct totemsrp_instance *instance,
2580  int seq)
2581 {
2582  struct sort_queue_item *sort_queue_item;
2583  int res;
2584  void *ptr;
2585 
2586  struct sq *sort_queue;
2587 
2588  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2589  sort_queue = &instance->recovery_sort_queue;
2590  } else {
2591  sort_queue = &instance->regular_sort_queue;
2592  }
2593 
2594  res = sq_in_range (sort_queue, seq);
2595  if (res == 0) {
2596  log_printf (instance->totemsrp_log_level_debug, "sq not in range");
2597  return (-1);
2598  }
2599 
2600  /*
2601  * Get RTR item at seq, if not available, return
2602  */
2603  res = sq_item_get (sort_queue, seq, &ptr);
2604  if (res != 0) {
2605  return -1;
2606  }
2607 
2608  sort_queue_item = ptr;
2609 
2611  instance->totemrrp_context,
2612  sort_queue_item->mcast,
2613  sort_queue_item->msg_len);
2614 
2615  return (0);
2616 }
2617 
2618 
2619 /*
2620  * Free all freeable messages from ring
2621  */
2622 static void messages_free (
2623  struct totemsrp_instance *instance,
2624  unsigned int token_aru)
2625 {
2626  struct sort_queue_item *regular_message;
2627  unsigned int i;
2628  int res;
2629  int log_release = 0;
2630  unsigned int release_to;
2631  unsigned int range = 0;
2632 
2633  release_to = token_aru;
2634  if (sq_lt_compare (instance->my_last_aru, release_to)) {
2635  release_to = instance->my_last_aru;
2636  }
2637  if (sq_lt_compare (instance->my_high_delivered, release_to)) {
2638  release_to = instance->my_high_delivered;
2639  }
2640 
2641  /*
2642  * Ensure we don't try to release before an already released point
2643  */
2644  if (sq_lt_compare (release_to, instance->last_released)) {
2645  return;
2646  }
2647 
2648  range = release_to - instance->last_released;
2649  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2650 
2651  /*
2652  * Release retransmit list items if group aru indicates they are transmitted
2653  */
2654  for (i = 1; i <= range; i++) {
2655  void *ptr;
2656 
2657  res = sq_item_get (&instance->regular_sort_queue,
2658  instance->last_released + i, &ptr);
2659  if (res == 0) {
2660  regular_message = ptr;
2661  totemsrp_buffer_release (instance, regular_message->mcast);
2662  }
2663  sq_items_release (&instance->regular_sort_queue,
2664  instance->last_released + i);
2665 
2666  log_release = 1;
2667  }
2668  instance->last_released += range;
2669 
2670  if (log_release) {
2672  "releasing messages up to and including %x", release_to);
2673  }
2674 }
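
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * The release point above is clamped with sq_lt_compare(), which must
 * order sequence numbers that wrap around 32 bits. One common way to
 * express such a wrap-aware "less than" (a sketch of the idea, not a
 * copy of the sq.h implementation) is a signed-distance comparison:
 */
static int seq_lt_sketch (unsigned int a, unsigned int b)
{
	/* a precedes b when the wrapped distance from a to b is positive */
	return (((int)(b - a)) > 0);
}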
2675 
2676 static void update_aru (
2677  struct totemsrp_instance *instance)
2678 {
2679  unsigned int i;
2680  int res;
2681  struct sq *sort_queue;
2682  unsigned int range;
2683  unsigned int my_aru_saved = 0;
2684 
2685  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2686  sort_queue = &instance->recovery_sort_queue;
2687  } else {
2688  sort_queue = &instance->regular_sort_queue;
2689  }
2690 
2691  range = instance->my_high_seq_received - instance->my_aru;
2692 
2693  my_aru_saved = instance->my_aru;
2694  for (i = 1; i <= range; i++) {
2695 
2696  void *ptr;
2697 
2698  res = sq_item_get (sort_queue, my_aru_saved + i, &ptr);
2699  /*
2700  * If hole, stop updating aru
2701  */
2702  if (res != 0) {
2703  break;
2704  }
2705  }
2706  instance->my_aru += i - 1;
2707 }
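
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * update_aru() above advances my_aru over every consecutively present
 * message and stops at the first hole in the sort queue. The same idea
 * over a plain presence bitmap (hypothetical helper, for illustration):
 */
static unsigned int aru_advance (
	unsigned int aru,
	unsigned int high_seq,
	const unsigned char *present) /* present[seq] != 0 when received */
{
	while (aru < high_seq && present[aru + 1]) {
		aru++;
	}
	return (aru);
}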
2708 
2709 /*
2710  * Multicasts pending messages onto the ring (requires orf_token possession)
2711  */
2712 static int orf_token_mcast (
2713  struct totemsrp_instance *instance,
2714  struct orf_token *token,
2715  int fcc_mcasts_allowed)
2716 {
2717  struct message_item *message_item = 0;
2718  struct cs_queue *mcast_queue;
2719  struct sq *sort_queue;
2720  struct sort_queue_item sort_queue_item;
2721  struct mcast *mcast;
2722  unsigned int fcc_mcast_current;
2723 
2724  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2725  mcast_queue = &instance->retrans_message_queue;
2726  sort_queue = &instance->recovery_sort_queue;
2727  reset_token_retransmit_timeout (instance); // REVIEWED
2728  } else {
2729  if (instance->waiting_trans_ack) {
2730  mcast_queue = &instance->new_message_queue_trans;
2731  } else {
2732  mcast_queue = &instance->new_message_queue;
2733  }
2734 
2735  sort_queue = &instance->regular_sort_queue;
2736  }
2737 
2738  for (fcc_mcast_current = 0; fcc_mcast_current < fcc_mcasts_allowed; fcc_mcast_current++) {
2739  if (cs_queue_is_empty (mcast_queue)) {
2740  break;
2741  }
2742  message_item = (struct message_item *)cs_queue_item_get (mcast_queue);
2743 
2744  message_item->mcast->seq = ++token->seq;
2745  message_item->mcast->this_seqno = instance->global_seqno++;
2746 
2747  /*
2748  * Build IO vector
2749  */
2750  memset (&sort_queue_item, 0, sizeof (struct sort_queue_item));
2751  sort_queue_item.mcast = message_item->mcast;
2752  sort_queue_item.msg_len = message_item->msg_len;
2753 
2754  mcast = sort_queue_item.mcast;
2755 
2756  memcpy (&mcast->ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
2757 
2758  /*
2759  * Add message to retransmit queue
2760  */
2761  sq_item_add (sort_queue, &sort_queue_item, message_item->mcast->seq);
2762 
2764  instance->totemrrp_context,
2765  message_item->mcast,
2766  message_item->msg_len);
2767 
2768  /*
2769  * Delete item from pending queue
2770  */
2771  cs_queue_item_remove (mcast_queue);
2772 
2773  /*
2774  * If messages mcasted, deliver any new messages to totempg
2775  */
2776  instance->my_high_seq_received = token->seq;
2777  }
2778 
2779  update_aru (instance);
2780 
2781  /*
2782  * Return 1 if more messages are available for single node clusters
2783  */
2784  return (fcc_mcast_current);
2785 }
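
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * orf_token_mcast() above drains at most fcc_mcasts_allowed pending
 * messages, stamping each one with the next token sequence number so
 * that every processor on the ring agrees on a single total order.
 * Reduced to its core (hypothetical queue callbacks, for illustration):
 */
static unsigned int drain_pending (
	unsigned int *token_seq,
	unsigned int allowed,
	int (*queue_empty) (void),
	void (*send_with_seq) (unsigned int seq))
{
	unsigned int sent;

	for (sent = 0; sent < allowed; sent++) {
		if (queue_empty ()) {
			break;
		}
		send_with_seq (++(*token_seq));
	}
	return (sent); /* how many messages actually went out */
}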
2786 
2787 /*
2788  * Remulticasts messages in orf_token's retransmit list (requires orf_token)
2789  * Modifies orf_token's rtr list to include retransmits required by this process
2790  */
2791 static int orf_token_rtr (
2792  struct totemsrp_instance *instance,
2793  struct orf_token *orf_token,
2794  unsigned int *fcc_allowed)
2795 {
2796  unsigned int res;
2797  unsigned int i, j;
2798  unsigned int found;
2799  struct sq *sort_queue;
2800  struct rtr_item *rtr_list;
2801  unsigned int range = 0;
2802  char retransmit_msg[1024];
2803  char value[64];
2804 
2805  if (instance->memb_state == MEMB_STATE_RECOVERY) {
2806  sort_queue = &instance->recovery_sort_queue;
2807  } else {
2808  sort_queue = &instance->regular_sort_queue;
2809  }
2810 
2811  rtr_list = &orf_token->rtr_list[0];
2812 
2813  strcpy (retransmit_msg, "Retransmit List: ");
2814  if (orf_token->rtr_list_entries) {
2816  "Retransmit List %d", orf_token->rtr_list_entries);
2817  for (i = 0; i < orf_token->rtr_list_entries; i++) {
2818  sprintf (value, "%x ", rtr_list[i].seq);
2819  strcat (retransmit_msg, value);
2820  }
2821  strcat (retransmit_msg, "");
2823  "%s", retransmit_msg);
2824  }
2825 
2826  /*
2827  * Retransmit messages on orf_token's RTR list from RTR queue
2828  */
2829  for (instance->fcc_remcast_current = 0, i = 0;
2830  instance->fcc_remcast_current < *fcc_allowed && i < orf_token->rtr_list_entries;) {
2831 
2832  /*
2833  * If this retransmit request isn't from this configuration,
2834  * try next rtr entry
2835  */
2836  if (memcmp (&rtr_list[i].ring_id, &instance->my_ring_id,
2837  sizeof (struct memb_ring_id)) != 0) {
2838 
2839  i += 1;
2840  continue;
2841  }
2842 
2843  res = orf_token_remcast (instance, rtr_list[i].seq);
2844  if (res == 0) {
2845  /*
2846  * Multicasted message, so no need to copy to new retransmit list
2847  */
2848  orf_token->rtr_list_entries -= 1;
2849  assert (orf_token->rtr_list_entries >= 0);
2850  memmove (&rtr_list[i], &rtr_list[i + 1],
2851  sizeof (struct rtr_item) * (orf_token->rtr_list_entries - i));
2852 
2853  instance->stats.mcast_retx++;
2854  instance->fcc_remcast_current++;
2855  } else {
2856  i += 1;
2857  }
2858  }
2859  *fcc_allowed = *fcc_allowed - instance->fcc_remcast_current;
2860 
2861  /*
2862  * Add messages that need retransmitting to the RTR list,
2863  * but only if there is room in the retransmit list
2864  */
2865 
2866  range = orf_token->seq - instance->my_aru;
2867  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
2868 
2869  for (i = 1; (orf_token->rtr_list_entries < RETRANSMIT_ENTRIES_MAX) &&
2870  (i <= range); i++) {
2871 
2872  /*
2873  * Ensure message is within the sort queue range
2874  */
2875  res = sq_in_range (sort_queue, instance->my_aru + i);
2876  if (res == 0) {
2877  break;
2878  }
2879 
2880  /*
2881  * Find if a message is missing from this processor
2882  */
2883  res = sq_item_inuse (sort_queue, instance->my_aru + i);
2884  if (res == 0) {
2885  /*
2886  * Determine how many times we have missed receiving
2887  * this sequence number. sq_item_miss_count increments
2888  * a counter for the sequence number. The miss count
2889  * will be returned and compared. This allows time for
2890  * delayed multicast messages to be received before
2891  * declaring the message is missing and requesting a
2892  * retransmit.
2893  */
2894  res = sq_item_miss_count (sort_queue, instance->my_aru + i);
2895  if (res < instance->totem_config->miss_count_const) {
2896  continue;
2897  }
2898 
2899  /*
2900  * Determine if missing message is already in retransmit list
2901  */
2902  found = 0;
2903  for (j = 0; j < orf_token->rtr_list_entries; j++) {
2904  if (instance->my_aru + i == rtr_list[j].seq) {
2905  found = 1;
2906  }
2907  }
2908  if (found == 0) {
2909  /*
2910  * Missing message not found in current retransmit list so add it
2911  */
2912  memcpy (&rtr_list[orf_token->rtr_list_entries].ring_id,
2913  &instance->my_ring_id, sizeof (struct memb_ring_id));
2914  rtr_list[orf_token->rtr_list_entries].seq = instance->my_aru + i;
2915  orf_token->rtr_list_entries++;
2916  }
2917  }
2918  }
2919  return (instance->fcc_remcast_current);
2920 }
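
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * As the comment above explains, a hole in the sort queue is only
 * advertised on the token's RTR list after it has been observed missing
 * miss_count_const times, so multicasts that are merely delayed are not
 * re-requested. The gate in isolation (hypothetical counter storage):
 */
static int should_request_retransmit (
	unsigned int *miss_count, /* per-sequence miss counter */
	unsigned int miss_count_const)
{
	*miss_count += 1;
	return (*miss_count >= miss_count_const);
}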
2921 
2922 static void token_retransmit (struct totemsrp_instance *instance)
2923 {
2925  instance->orf_token_retransmit,
2926  instance->orf_token_retransmit_size);
2927 }
2928 
2929 /*
2930  * Retransmit the regular token to the next processor if no mcast
2931  * or token has been received within the token retransmit timeout
2932  * period
2933  */
2934 static void timer_function_token_retransmit_timeout (void *data)
2935 {
2936  struct totemsrp_instance *instance = data;
2937 
2938  switch (instance->memb_state) {
2939  case MEMB_STATE_GATHER:
2940  break;
2941  case MEMB_STATE_COMMIT:
2943  case MEMB_STATE_RECOVERY:
2944  token_retransmit (instance);
2945  reset_token_retransmit_timeout (instance); // REVIEWED
2946  break;
2947  }
2948 }
2949 
2950 static void timer_function_token_hold_retransmit_timeout (void *data)
2951 {
2952  struct totemsrp_instance *instance = data;
2953 
2954  switch (instance->memb_state) {
2955  case MEMB_STATE_GATHER:
2956  break;
2957  case MEMB_STATE_COMMIT:
2958  break;
2960  case MEMB_STATE_RECOVERY:
2961  token_retransmit (instance);
2962  break;
2963  }
2964 }
2965 
2966 static void timer_function_merge_detect_timeout(void *data)
2967 {
2968  struct totemsrp_instance *instance = data;
2969 
2971 
2972  switch (instance->memb_state) {
2974  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
2975  memb_merge_detect_transmit (instance);
2976  }
2977  break;
2978  case MEMB_STATE_GATHER:
2979  case MEMB_STATE_COMMIT:
2980  case MEMB_STATE_RECOVERY:
2981  break;
2982  }
2983 }
2984 
2985 /*
2986  * Send orf_token to next member (requires orf_token)
2987  */
2988 static int token_send (
2989  struct totemsrp_instance *instance,
2990  struct orf_token *orf_token,
2991  int forward_token)
2992 {
2993  int res = 0;
2994  unsigned int orf_token_size;
2995 
2996  orf_token_size = sizeof (struct orf_token) +
2997  (orf_token->rtr_list_entries * sizeof (struct rtr_item));
2998 
2999  orf_token->header.nodeid = instance->my_id.addr[0].nodeid;
3000  memcpy (instance->orf_token_retransmit, orf_token, orf_token_size);
3001  instance->orf_token_retransmit_size = orf_token_size;
3002  assert (orf_token->header.nodeid);
3003 
3004  if (forward_token == 0) {
3005  return (0);
3006  }
3007 
3009  orf_token,
3010  orf_token_size);
3011 
3012  return (res);
3013 }
3014 
3015 static int token_hold_cancel_send (struct totemsrp_instance *instance)
3016 {
3017  struct token_hold_cancel token_hold_cancel;
3018 
3019  /*
3020  * Only cancel if the token is currently held
3021  */
3022  if (instance->my_token_held == 0) {
3023  return (0);
3024  }
3025  instance->my_token_held = 0;
3026 
3027  /*
3028  * Build message
3029  */
3034  memcpy (&token_hold_cancel.ring_id, &instance->my_ring_id,
3035  sizeof (struct memb_ring_id));
3036  assert (token_hold_cancel.header.nodeid);
3037 
3038  instance->stats.token_hold_cancel_tx++;
3039 
3041  sizeof (struct token_hold_cancel));
3042 
3043  return (0);
3044 }
3045 
3046 static int orf_token_send_initial (struct totemsrp_instance *instance)
3047 {
3048  struct orf_token orf_token;
3049  int res;
3050 
3051  orf_token.header.type = MESSAGE_TYPE_ORF_TOKEN;
3052  orf_token.header.endian_detector = ENDIAN_LOCAL;
3053  orf_token.header.encapsulated = 0;
3054  orf_token.header.nodeid = instance->my_id.addr[0].nodeid;
3055  assert (orf_token.header.nodeid);
3056  orf_token.seq = SEQNO_START_MSG;
3057  orf_token.token_seq = SEQNO_START_TOKEN;
3058  orf_token.retrans_flg = 1;
3059  instance->my_set_retrans_flg = 1;
3060  instance->stats.orf_token_tx++;
3061 
3062  if (cs_queue_is_empty (&instance->retrans_message_queue) == 1) {
3063  orf_token.retrans_flg = 0;
3064  instance->my_set_retrans_flg = 0;
3065  } else {
3066  orf_token.retrans_flg = 1;
3067  instance->my_set_retrans_flg = 1;
3068  }
3069 
3070  orf_token.aru = 0;
3071  orf_token.aru = SEQNO_START_MSG - 1;
3072  orf_token.aru_addr = instance->my_id.addr[0].nodeid;
3073 
3074  memcpy (&orf_token.ring_id, &instance->my_ring_id, sizeof (struct memb_ring_id));
3075  orf_token.fcc = 0;
3076  orf_token.backlog = 0;
3077 
3078  orf_token.rtr_list_entries = 0;
3079 
3080  res = token_send (instance, &orf_token, 1);
3081 
3082  return (res);
3083 }
3084 
3085 static void memb_state_commit_token_update (
3086  struct totemsrp_instance *instance)
3087 {
3088  struct srp_addr *addr;
3089  struct memb_commit_token_memb_entry *memb_list;
3090  unsigned int high_aru;
3091  unsigned int i;
3092 
3093  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3094  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3095 
3096  memcpy (instance->my_new_memb_list, addr,
3097  sizeof (struct srp_addr) * instance->commit_token->addr_entries);
3098 
3099  instance->my_new_memb_entries = instance->commit_token->addr_entries;
3100 
3101  memcpy (&memb_list[instance->commit_token->memb_index].ring_id,
3102  &instance->my_old_ring_id, sizeof (struct memb_ring_id));
3103 
3104  memb_list[instance->commit_token->memb_index].aru = instance->old_ring_state_aru;
3105  /*
3106  * TODO high delivered is really instance->my_aru, but with safe
3107  * delivery this could change?
3108  */
3109  instance->my_received_flg =
3110  (instance->my_aru == instance->my_high_seq_received);
3111 
3112  memb_list[instance->commit_token->memb_index].received_flg = instance->my_received_flg;
3113 
3114  memb_list[instance->commit_token->memb_index].high_delivered = instance->my_high_delivered;
3115  /*
3116  * Find the high aru up to the current memb_index for all matching ring ids.
3117  * If any entry with a ring id matching memb_index has an aru less than the
3118  * high aru, set the received flag for that entry to false.
3119  */
3120  high_aru = memb_list[instance->commit_token->memb_index].aru;
3121  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3122  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3123  &memb_list[i].ring_id,
3124  sizeof (struct memb_ring_id)) == 0) {
3125 
3126  if (sq_lt_compare (high_aru, memb_list[i].aru)) {
3127  high_aru = memb_list[i].aru;
3128  }
3129  }
3130  }
3131 
3132  for (i = 0; i <= instance->commit_token->memb_index; i++) {
3133  if (memcmp (&memb_list[instance->commit_token->memb_index].ring_id,
3134  &memb_list[i].ring_id,
3135  sizeof (struct memb_ring_id)) == 0) {
3136 
3137  if (sq_lt_compare (memb_list[i].aru, high_aru)) {
3138  memb_list[i].received_flg = 0;
3139  if (i == instance->commit_token->memb_index) {
3140  instance->my_received_flg = 0;
3141  }
3142  }
3143  }
3144  }
3145 
3146  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3147  instance->commit_token->memb_index += 1;
3148  assert (instance->commit_token->memb_index <= instance->commit_token->addr_entries);
3149  assert (instance->commit_token->header.nodeid);
3150 }
3151 
3152 static void memb_state_commit_token_target_set (
3153  struct totemsrp_instance *instance)
3154 {
3155  struct srp_addr *addr;
3156  unsigned int i;
3157 
3158  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3159 
3160  for (i = 0; i < instance->totem_config->interface_count; i++) {
3162  instance->totemrrp_context,
3163  &addr[instance->commit_token->memb_index %
3164  instance->commit_token->addr_entries].addr[i],
3165  i);
3166  }
3167 }
3168 
3169 static int memb_state_commit_token_send_recovery (
3170  struct totemsrp_instance *instance,
3171  struct memb_commit_token *commit_token)
3172 {
3173  unsigned int commit_token_size;
3174 
3175  commit_token->token_seq++;
3176  commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3177  commit_token_size = sizeof (struct memb_commit_token) +
3178  ((sizeof (struct srp_addr) +
3179  sizeof (struct memb_commit_token_memb_entry)) * commit_token->addr_entries);
3180  /*
3181  * Make a copy for retransmission if necessary
3182  */
3183  memcpy (instance->orf_token_retransmit, commit_token, commit_token_size);
3184  instance->orf_token_retransmit_size = commit_token_size;
3185 
3186  instance->stats.memb_commit_token_tx++;
3187 
3189  commit_token,
3190  commit_token_size);
3191 
3192  /*
3193  * Request retransmission of the commit token in case it is lost
3194  */
3195  reset_token_retransmit_timeout (instance);
3196  return (0);
3197 }
3198 
3199 static int memb_state_commit_token_send (
3200  struct totemsrp_instance *instance)
3201 {
3202  unsigned int commit_token_size;
3203 
3204  instance->commit_token->token_seq++;
3205  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3206  commit_token_size = sizeof (struct memb_commit_token) +
3207  ((sizeof (struct srp_addr) +
3208  sizeof (struct memb_commit_token_memb_entry)) * instance->commit_token->addr_entries);
3209  /*
3210  * Make a copy for retransmission if necessary
3211  */
3212  memcpy (instance->orf_token_retransmit, instance->commit_token, commit_token_size);
3213  instance->orf_token_retransmit_size = commit_token_size;
3214 
3215  instance->stats.memb_commit_token_tx++;
3216 
3218  instance->commit_token,
3219  commit_token_size);
3220 
3221  /*
3222  * Request retransmission of the commit token in case it is lost
3223  */
3224  reset_token_retransmit_timeout (instance);
3225  return (0);
3226 }
3227 
3228 
3229 static int memb_lowest_in_config (struct totemsrp_instance *instance)
3230 {
3231  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3232  int token_memb_entries = 0;
3233  int i;
3234  struct totem_ip_address *lowest_addr;
3235 
3236  memb_set_subtract (token_memb, &token_memb_entries,
3237  instance->my_proc_list, instance->my_proc_list_entries,
3238  instance->my_failed_list, instance->my_failed_list_entries);
3239 
3240  /*
3241  * find representative by searching for smallest identifier
3242  */
3243 
3244  lowest_addr = &token_memb[0].addr[0];
3245  for (i = 1; i < token_memb_entries; i++) {
3246  if (totemip_compare(lowest_addr, &token_memb[i].addr[0]) > 0) {
3247  totemip_copy (lowest_addr, &token_memb[i].addr[0]);
3248  }
3249  }
3250  return (totemip_compare (lowest_addr, &instance->my_id.addr[0]) == 0);
3251 }
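
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * memb_lowest_in_config() above elects the representative by picking
 * the smallest address among the non-failed processors and asking
 * "is that me?". The same selection over a plain array of node ids
 * (hypothetical helper, for illustration):
 */
static int i_am_lowest (
	const unsigned int *nodeids,
	int entries,
	unsigned int my_nodeid)
{
	unsigned int lowest = nodeids[0];
	int i;

	for (i = 1; i < entries; i++) {
		if (nodeids[i] < lowest) {
			lowest = nodeids[i];
		}
	}
	return (lowest == my_nodeid);
}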
3252 
3253 static int srp_addr_compare (const void *a, const void *b)
3254 {
3255  const struct srp_addr *srp_a = (const struct srp_addr *)a;
3256  const struct srp_addr *srp_b = (const struct srp_addr *)b;
3257 
3258  return (totemip_compare (&srp_a->addr[0], &srp_b->addr[0]));
3259 }
3260 
3261 static void memb_state_commit_token_create (
3262  struct totemsrp_instance *instance)
3263 {
3264  struct srp_addr token_memb[PROCESSOR_COUNT_MAX];
3265  struct srp_addr *addr;
3266  struct memb_commit_token_memb_entry *memb_list;
3267  int token_memb_entries = 0;
3268 
3270  "Creating commit token because I am the rep.");
3271 
3272  memb_set_subtract (token_memb, &token_memb_entries,
3273  instance->my_proc_list, instance->my_proc_list_entries,
3274  instance->my_failed_list, instance->my_failed_list_entries);
3275 
3276  memset (instance->commit_token, 0, sizeof (struct memb_commit_token));
3279  instance->commit_token->header.encapsulated = 0;
3280  instance->commit_token->header.nodeid = instance->my_id.addr[0].nodeid;
3281  assert (instance->commit_token->header.nodeid);
3282 
3283  totemip_copy(&instance->commit_token->ring_id.rep, &instance->my_id.addr[0]);
3284 
3285  instance->commit_token->ring_id.seq = instance->token_ring_id_seq + 4;
3286 
3287  /*
3288  * This qsort is necessary to ensure the commit token traverses
3289  * the ring in the proper order
3290  */
3291  qsort (token_memb, token_memb_entries, sizeof (struct srp_addr),
3292  srp_addr_compare);
3293 
3294  instance->commit_token->memb_index = 0;
3295  instance->commit_token->addr_entries = token_memb_entries;
3296 
3297  addr = (struct srp_addr *)instance->commit_token->end_of_commit_token;
3298  memb_list = (struct memb_commit_token_memb_entry *)(addr + instance->commit_token->addr_entries);
3299 
3300  memcpy (addr, token_memb,
3301  token_memb_entries * sizeof (struct srp_addr));
3302  memset (memb_list, 0,
3303  sizeof (struct memb_commit_token_memb_entry) * token_memb_entries);
3304 }
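
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * The commit token is a fixed struct memb_commit_token header followed
 * by addr_entries srp_addr slots and then addr_entries per-member
 * entries, which is why the send routines above size it as
 * header + entries * (srp_addr + memb_entry). The same arithmetic as a
 * small helper:
 */
static size_t commit_token_size_sketch (unsigned int addr_entries)
{
	return (sizeof (struct memb_commit_token) +
		(sizeof (struct srp_addr) +
		sizeof (struct memb_commit_token_memb_entry)) * addr_entries);
}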
3305 
3306 static void memb_join_message_send (struct totemsrp_instance *instance)
3307 {
3308  char memb_join_data[40000];
3309  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3310  char *addr;
3311  unsigned int addr_idx;
3312  size_t msg_len;
3313 
3314  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3315  memb_join->header.endian_detector = ENDIAN_LOCAL;
3316  memb_join->header.encapsulated = 0;
3317  memb_join->header.nodeid = instance->my_id.addr[0].nodeid;
3318  assert (memb_join->header.nodeid);
3319 
3320  msg_len = sizeof(struct memb_join) +
3321  ((instance->my_proc_list_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3322 
3323  if (msg_len > sizeof(memb_join_data)) {
3325  "memb_join_message too long. Ignoring message.");
3326 
3327  return ;
3328  }
3329 
3330  memb_join->ring_seq = instance->my_ring_id.seq;
3331  memb_join->proc_list_entries = instance->my_proc_list_entries;
3332  memb_join->failed_list_entries = instance->my_failed_list_entries;
3333  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3334 
3335  /*
3336  * This mess adds the joined and failed processor lists into the join
3337  * message
3338  */
3339  addr = (char *)memb_join;
3340  addr_idx = sizeof (struct memb_join);
3341  memcpy (&addr[addr_idx],
3342  instance->my_proc_list,
3343  instance->my_proc_list_entries *
3344  sizeof (struct srp_addr));
3345  addr_idx +=
3346  instance->my_proc_list_entries *
3347  sizeof (struct srp_addr);
3348  memcpy (&addr[addr_idx],
3349  instance->my_failed_list,
3350  instance->my_failed_list_entries *
3351  sizeof (struct srp_addr));
3352  addr_idx +=
3353  instance->my_failed_list_entries *
3354  sizeof (struct srp_addr);
3355 
3356  if (instance->totem_config->send_join_timeout) {
3357  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3358  }
3359 
3360  instance->stats.memb_join_tx++;
3361 
3363  instance->totemrrp_context,
3364  memb_join,
3365  addr_idx);
3366 }
3367 
3368 static void memb_leave_message_send (struct totemsrp_instance *instance)
3369 {
3370  char memb_join_data[40000];
3371  struct memb_join *memb_join = (struct memb_join *)memb_join_data;
3372  char *addr;
3373  unsigned int addr_idx;
3374  int active_memb_entries;
3375  struct srp_addr active_memb[PROCESSOR_COUNT_MAX];
3376  size_t msg_len;
3377 
3379  "sending join/leave message");
3380 
3381  /*
3382  * add us to the failed list, and remove us from
3383  * the members list
3384  */
3385  memb_set_merge(
3386  &instance->my_id, 1,
3387  instance->my_failed_list, &instance->my_failed_list_entries);
3388 
3389  memb_set_subtract (active_memb, &active_memb_entries,
3390  instance->my_proc_list, instance->my_proc_list_entries,
3391  &instance->my_id, 1);
3392 
3393  msg_len = sizeof(struct memb_join) +
3394  ((active_memb_entries + instance->my_failed_list_entries) * sizeof(struct srp_addr));
3395 
3396  if (msg_len > sizeof(memb_join_data)) {
3398  "memb_leave message too long. Ignoring message.");
3399 
3400  return ;
3401  }
3402 
3403  memb_join->header.type = MESSAGE_TYPE_MEMB_JOIN;
3404  memb_join->header.endian_detector = ENDIAN_LOCAL;
3405  memb_join->header.encapsulated = 0;
3406  memb_join->header.nodeid = LEAVE_DUMMY_NODEID;
3407 
3408  memb_join->ring_seq = instance->my_ring_id.seq;
3409  memb_join->proc_list_entries = active_memb_entries;
3410  memb_join->failed_list_entries = instance->my_failed_list_entries;
3411  srp_addr_copy (&memb_join->system_from, &instance->my_id);
3412  memb_join->system_from.addr[0].nodeid = LEAVE_DUMMY_NODEID;
3413 
3414  // TODO: CC Maybe use the actual join send routine.
3415  /*
3416  * This mess adds the joined and failed processor lists into the join
3417  * message
3418  */
3419  addr = (char *)memb_join;
3420  addr_idx = sizeof (struct memb_join);
3421  memcpy (&addr[addr_idx],
3422  active_memb,
3423  active_memb_entries *
3424  sizeof (struct srp_addr));
3425  addr_idx +=
3426  active_memb_entries *
3427  sizeof (struct srp_addr);
3428  memcpy (&addr[addr_idx],
3429  instance->my_failed_list,
3430  instance->my_failed_list_entries *
3431  sizeof (struct srp_addr));
3432  addr_idx +=
3433  instance->my_failed_list_entries *
3434  sizeof (struct srp_addr);
3435 
3436 
3437  if (instance->totem_config->send_join_timeout) {
3438  usleep (random() % (instance->totem_config->send_join_timeout * 1000));
3439  }
3440  instance->stats.memb_join_tx++;
3441 
3443  instance->totemrrp_context,
3444  memb_join,
3445  addr_idx);
3446 }
3447 
3448 static void memb_merge_detect_transmit (struct totemsrp_instance *instance)
3449 {
3450  struct memb_merge_detect memb_merge_detect;
3451 
3456  srp_addr_copy (&memb_merge_detect.system_from, &instance->my_id);
3457  memcpy (&memb_merge_detect.ring_id, &instance->my_ring_id,
3458  sizeof (struct memb_ring_id));
3459  assert (memb_merge_detect.header.nodeid);
3460 
3461  instance->stats.memb_merge_detect_tx++;
3464  sizeof (struct memb_merge_detect));
3465 }
3466 
3467 static void memb_ring_id_set (
3468  struct totemsrp_instance *instance,
3469  const struct memb_ring_id *ring_id)
3470 {
3471 
3472  memcpy (&instance->my_ring_id, ring_id, sizeof (struct memb_ring_id));
3473 }
3474 
3475 int totemsrp_callback_token_create (
3476  void *srp_context,
3477  void **handle_out,
3478  enum totem_callback_token_type type,
3479  int delete,
3480  int (*callback_fn) (enum totem_callback_token_type type, const void *),
3481  const void *data)
3482 {
3483  struct totemsrp_instance *instance = (struct totemsrp_instance *)srp_context;
3484  struct token_callback_instance *callback_handle;
3485 
3486  token_hold_cancel_send (instance);
3487 
3488  callback_handle = malloc (sizeof (struct token_callback_instance));
3489  if (callback_handle == 0) {
3490  return (-1);
3491  }
3492  *handle_out = (void *)callback_handle;
3493  list_init (&callback_handle->list);
3494  callback_handle->callback_fn = callback_fn;
3495  callback_handle->data = (void *) data;
3496  callback_handle->callback_type = type;
3497  callback_handle->delete = delete;
3498  switch (type) {
3500  list_add (&callback_handle->list, &instance->token_callback_received_listhead);
3501  break;
3503  list_add (&callback_handle->list, &instance->token_callback_sent_listhead);
3504  break;
3505  }
3506 
3507  return (0);
3508 }
3509 
3510 void totemsrp_callback_token_destroy (void *srp_context, void **handle_out)
3511 {
3512  struct token_callback_instance *h;
3513 
3514  if (*handle_out) {
3515  h = (struct token_callback_instance *)*handle_out;
3516  list_del (&h->list);
3517  free (h);
3518  h = NULL;
3519  *handle_out = 0;
3520  }
3521 }
3522 
3523 static void token_callbacks_execute (
3524  struct totemsrp_instance *instance,
3525  enum totem_callback_token_type type)
3526 {
3527  struct list_head *list;
3528  struct list_head *list_next;
3529  struct list_head *callback_listhead = 0;
3530  struct token_callback_instance *token_callback_instance;
3531  int res;
3532  int del;
3533 
3534  switch (type) {
3536  callback_listhead = &instance->token_callback_received_listhead;
3537  break;
3539  callback_listhead = &instance->token_callback_sent_listhead;
3540  break;
3541  default:
3542  assert (0);
3543  }
3544 
3545  for (list = callback_listhead->next; list != callback_listhead;
3546  list = list_next) {
3547 
3548  token_callback_instance = list_entry (list, struct token_callback_instance, list);
3549 
3550  list_next = list->next;
3551  del = token_callback_instance->delete;
3552  if (del == 1) {
3553  list_del (list);
3554  }
3555 
3556  res = token_callback_instance->callback_fn (
3557  token_callback_instance->callback_type,
3558  token_callback_instance->data);
3559  /*
3560  * This callback failed to execute, try it again on the next token
3561  */
3562  if (res == -1 && del == 1) {
3563  list_add (list, callback_listhead);
3564  } else if (del) {
3565  free (token_callback_instance);
3566  }
3567  }
3568 }
3569 
3570 /*
3571  * Flow control functions
3572  */
3573 static unsigned int backlog_get (struct totemsrp_instance *instance)
3574 {
3575  unsigned int backlog = 0;
3576  struct cs_queue *queue_use = NULL;
3577 
3578  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
3579  if (instance->waiting_trans_ack) {
3580  queue_use = &instance->new_message_queue_trans;
3581  } else {
3582  queue_use = &instance->new_message_queue;
3583  }
3584  } else
3585  if (instance->memb_state == MEMB_STATE_RECOVERY) {
3586  queue_use = &instance->retrans_message_queue;
3587  }
3588 
3589  if (queue_use != NULL) {
3590  backlog = cs_queue_used (queue_use);
3591  }
3592 
3593  instance->stats.token[instance->stats.latest_token].backlog_calc = backlog;
3594  return (backlog);
3595 }
3596 
3597 static int fcc_calculate (
3598  struct totemsrp_instance *instance,
3599  struct orf_token *token)
3600 {
3601  unsigned int transmits_allowed;
3602  unsigned int backlog_calc;
3603 
3604  transmits_allowed = instance->totem_config->max_messages;
3605 
3606  if (transmits_allowed > instance->totem_config->window_size - token->fcc) {
3607  transmits_allowed = instance->totem_config->window_size - token->fcc;
3608  }
3609 
3610  instance->my_cbl = backlog_get (instance);
3611 
3612  /*
3613  * Only do the backlog calculation if there is a backlog, otherwise
3614  * we would divide by zero
3615  */
3616  if (token->backlog + instance->my_cbl - instance->my_pbl) {
3617  backlog_calc = (instance->totem_config->window_size * instance->my_pbl) /
3618  (token->backlog + instance->my_cbl - instance->my_pbl);
3619  if (backlog_calc > 0 && transmits_allowed > backlog_calc) {
3620  transmits_allowed = backlog_calc;
3621  }
3622  }
3623 
3624  return (transmits_allowed);
3625 }
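
/*
 * Editor's note -- worked example only, not part of totemsrp.c.
 * Assuming (made-up numbers) window_size = 50, max_messages = 17,
 * token->fcc = 10, token->backlog = 30, my_cbl = 20 and my_pbl = 10,
 * the calculation above gives:
 *
 *   transmits_allowed = min (17, 50 - 10)          = 17
 *   backlog_calc      = (50 * 10) / (30 + 20 - 10) = 12
 *   transmits_allowed = min (17, 12)               = 12
 *
 * i.e. a processor holding a small share of the total backlog is
 * throttled to a proportional share of the transmit window.
 */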
3626 
3627 /*
3628  * don't overflow the RTR sort queue
3629  */
3630 static void fcc_rtr_limit (
3631  struct totemsrp_instance *instance,
3632  struct orf_token *token,
3633  unsigned int *transmits_allowed)
3634 {
3635  int check = QUEUE_RTR_ITEMS_SIZE_MAX;
3636  check -= (*transmits_allowed + instance->totem_config->window_size);
3637  assert (check >= 0);
3638  if (sq_lt_compare (instance->last_released +
3639  QUEUE_RTR_ITEMS_SIZE_MAX - *transmits_allowed -
3640  instance->totem_config->window_size,
3641 
3642  token->seq)) {
3643 
3644  *transmits_allowed = 0;
3645  }
3646 }
3647 
3648 static void fcc_token_update (
3649  struct totemsrp_instance *instance,
3650  struct orf_token *token,
3651  unsigned int msgs_transmitted)
3652 {
3653  token->fcc += msgs_transmitted - instance->my_trc;
3654  token->backlog += instance->my_cbl - instance->my_pbl;
3655  instance->my_trc = msgs_transmitted;
3656  instance->my_pbl = instance->my_cbl;
3657 }
3658 
3659 /*
3660  * Sanity checkers
3661  */
3662 static int check_totemip_sanity(
3663  const struct totemsrp_instance *instance,
3664  const struct totem_ip_address *addr,
3665  int endian_conversion_needed)
3666 {
3667  unsigned short family;
3668 
3669  family = addr->family;
3670  if (endian_conversion_needed) {
3671  family = swab16(family);
3672  }
3673 
3674  if (family != AF_INET && family != AF_INET6) {
3676  "Received message corrupted... ignoring.");
3677 
3678  return (-1);
3679  }
3680 
3681  return (0);
3682 }
3683 
3684 static int check_srpaddr_sanity(
3685  const struct totemsrp_instance *instance,
3686  const struct srp_addr *addr,
3687  int endian_conversion_needed)
3688 {
3689  int i;
3690 
3691  if (addr->no_addrs < 1 || addr->no_addrs > INTERFACE_MAX) {
3692  return (-1);
3693  }
3694 
3695  for (i = 0; i < addr->no_addrs; i++) {
3696  if (i == 0 || addr->addr[i].family != 0) {
3697  if (check_totemip_sanity(instance, &addr->addr[i], endian_conversion_needed) == -1) {
3698  return (-1);
3699  }
3700  }
3701  }
3702 
3703  return (0);
3704 }
3705 
3706 static int check_orf_token_sanity(
3707  const struct totemsrp_instance *instance,
3708  const void *msg,
3709  size_t msg_len,
3710  int endian_conversion_needed)
3711 {
3712  int rtr_entries;
3713  const struct orf_token *token = (const struct orf_token *)msg;
3714  size_t required_len;
3715  int i;
3716 
3717  if (msg_len < sizeof(struct orf_token)) {
3719  "Received orf_token message is too short... ignoring.");
3720 
3721  return (-1);
3722  }
3723 
3724  if (check_totemip_sanity(instance, &token->ring_id.rep, endian_conversion_needed) == -1) {
3725  return (-1);
3726  }
3727 
3728  if (endian_conversion_needed) {
3729  rtr_entries = swab32(token->rtr_list_entries);
3730  } else {
3731  rtr_entries = token->rtr_list_entries;
3732  }
3733 
3734  required_len = sizeof(struct orf_token) + rtr_entries * sizeof(struct rtr_item);
3735  if (msg_len < required_len) {
3737  "Received orf_token message is too short... ignoring.");
3738 
3739  return (-1);
3740  }
3741 
3742  for (i = 0; i < rtr_entries; i++) {
3743  if (check_totemip_sanity(instance, &token->rtr_list[i].ring_id.rep,
3744  endian_conversion_needed) == -1) {
3745  return (-1);
3746  }
3747  }
3748 
3749  return (0);
3750 }
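
/*
 * Editor's note -- illustrative sketch only, not part of totemsrp.c.
 * The sanity checkers in this block share a two-step pattern: first make
 * sure the fixed header fits, then recompute the required length from
 * the (possibly byte-swapped) entry count before touching the
 * variable-length tail. The pattern in isolation:
 */
static int variable_length_msg_ok (
	size_t msg_len,
	size_t fixed_len,
	size_t entry_len,
	unsigned int entries)
{
	if (msg_len < fixed_len) {
		return (0); /* fixed header truncated */
	}
	return (msg_len >= fixed_len + (size_t)entries * entry_len);
}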
3751 
3752 static int check_mcast_sanity(
3753  struct totemsrp_instance *instance,
3754  const void *msg,
3755  size_t msg_len,
3756  int endian_conversion_needed)
3757 {
3758  const struct mcast *mcast_msg = (const struct mcast *)msg;
3759 
3760  if (msg_len < sizeof(struct mcast)) {
3762  "Received mcast message is too short... ignoring.");
3763 
3764  return (-1);
3765  }
3766 
3767  if ((check_totemip_sanity(instance, &mcast_msg->ring_id.rep, endian_conversion_needed) == -1) ||
3768  (check_srpaddr_sanity(instance, &mcast_msg->system_from, endian_conversion_needed) == -1)) {
3769  return (-1);
3770  }
3771 
3772  return (0);
3773 }
3774 
3775 static int check_memb_merge_detect_sanity(
3776  struct totemsrp_instance *instance,
3777  const void *msg,
3778  size_t msg_len,
3779  int endian_conversion_needed)
3780 {
3781  const struct memb_merge_detect *mmd_msg = (const struct memb_merge_detect *)msg;
3782 
3783  if (msg_len < sizeof(struct memb_merge_detect)) {
3785  "Received memb_merge_detect message is too short... ignoring.");
3786 
3787  return (-1);
3788  }
3789 
3790  if ((check_totemip_sanity(instance, &mmd_msg->ring_id.rep, endian_conversion_needed) == -1) ||
3791  (check_srpaddr_sanity(instance, &mmd_msg->system_from, endian_conversion_needed) == -1)) {
3792  return (-1);
3793  }
3794 
3795  return (0);
3796 }
3797 
3798 static int check_memb_join_sanity(
3799  struct totemsrp_instance *instance,
3800  const void *msg,
3801  size_t msg_len,
3802  int endian_conversion_needed)
3803 {
3804  const struct memb_join *mj_msg = (const struct memb_join *)msg;
3805  unsigned int proc_list_entries;
3806  unsigned int failed_list_entries;
3807  size_t required_len;
3808  const struct srp_addr *proc_list;
3809  const struct srp_addr *failed_list;
3810  int i;
3811 
3812  if (msg_len < sizeof(struct memb_join)) {
3814  "Received memb_join message is too short... ignoring.");
3815 
3816  return (-1);
3817  }
3818 
3819  if (check_srpaddr_sanity(instance, &mj_msg->system_from, endian_conversion_needed) == -1) {
3820  return (-1);
3821  }
3822 
3823  proc_list_entries = mj_msg->proc_list_entries;
3824  failed_list_entries = mj_msg->failed_list_entries;
3825 
3826  if (endian_conversion_needed) {
3827  proc_list_entries = swab32(proc_list_entries);
3828  failed_list_entries = swab32(failed_list_entries);
3829  }
3830 
3831  required_len = sizeof(struct memb_join) + ((proc_list_entries + failed_list_entries) * sizeof(struct srp_addr));
3832  if (msg_len < required_len) {
3834  "Received memb_join message is too short... ignoring.");
3835 
3836  return (-1);
3837  }
3838 
3839  proc_list = (struct srp_addr *)mj_msg->end_of_memb_join;
3840  failed_list = proc_list + proc_list_entries;
3841 
3842  for (i = 0; i < proc_list_entries; i++) {
3843  if (check_srpaddr_sanity(instance, &proc_list[i], endian_conversion_needed) == -1) {
3844  return (-1);
3845  }
3846  }
3847 
3848  for (i = 0; i < failed_list_entries; i++) {
3849  if (check_srpaddr_sanity(instance, &failed_list[i], endian_conversion_needed) == -1) {
3850  return (-1);
3851  }
3852  }
3853 
3854  return (0);
3855 }
3856 
3857 static int check_memb_commit_token_sanity(
3858  struct totemsrp_instance *instance,
3859  const void *msg,
3860  size_t msg_len,
3861  int endian_conversion_needed)
3862 {
3863  const struct memb_commit_token *mct_msg = (const struct memb_commit_token *)msg;
3864  unsigned int addr_entries;
3865  const struct srp_addr *addr;
3866  const struct memb_commit_token_memb_entry *memb_list;
3867  size_t required_len;
3868  int i;
3869 
3870  if (msg_len < sizeof(struct memb_commit_token)) {
3872  "Received memb_commit_token message is too short... ignoring.");
3873 
3874  return (0);
3875  }
3876 
3877  if (check_totemip_sanity(instance, &mct_msg->ring_id.rep, endian_conversion_needed) == -1) {
3878  return (-1);
3879  }
3880 
3881  addr_entries = mct_msg->addr_entries;
3882  if (endian_conversion_needed) {
3883  addr_entries = swab32(addr_entries);
3884  }
3885 
3886  required_len = sizeof(struct memb_commit_token) +
3887  (addr_entries * (sizeof(struct srp_addr) + sizeof(struct memb_commit_token_memb_entry)));
3888  if (msg_len < required_len) {
3890  "Received memb_commit_token message is too short... ignoring.");
3891 
3892  return (-1);
3893  }
3894 
3895  addr = (const struct srp_addr *)mct_msg->end_of_commit_token;
3896  memb_list = (const struct memb_commit_token_memb_entry *)(addr + addr_entries);
3897 
3898  for (i = 0; i < addr_entries; i++) {
3899  if (check_srpaddr_sanity(instance, &addr[i], endian_conversion_needed) == -1) {
3900  return (-1);
3901  }
3902 
3903  if (memb_list[i].ring_id.rep.family != 0) {
3904  if (check_totemip_sanity(instance, &memb_list[i].ring_id.rep,
3905  endian_conversion_needed) == -1) {
3906  return (-1);
3907  }
3908  }
3909  }
3910 
3911  return (0);
3912 }
3913 
3914 static int check_token_hold_cancel_sanity(
3915  struct totemsrp_instance *instance,
3916  const void *msg,
3917  size_t msg_len,
3918  int endian_conversion_needed)
3919 {
3920  const struct token_hold_cancel *thc_msg = (const struct token_hold_cancel *)msg;
3921 
3922  if (msg_len < sizeof(struct token_hold_cancel)) {
3924  "Received token_hold_cancel message is too short... ignoring.");
3925 
3926  return (-1);
3927  }
3928 
3929  if (check_totemip_sanity(instance, &thc_msg->ring_id.rep, endian_conversion_needed) == -1) {
3930  return (-1);
3931  }
3932 
3933  return (0);
3934 }
3935 
3936 /*
3937  * Message Handlers
3938  */
3939 
3940 unsigned long long int tv_old;
3941 /*
3942  * message handler called when TOKEN message type received
3943  */
3944 static int message_handler_orf_token (
3945  struct totemsrp_instance *instance,
3946  const void *msg,
3947  size_t msg_len,
3948  int endian_conversion_needed)
3949 {
3950  char token_storage[1500];
3951  char token_convert[1500];
3952  struct orf_token *token = NULL;
3953  int forward_token;
3954  unsigned int transmits_allowed;
3955  unsigned int mcasted_retransmit;
3956  unsigned int mcasted_regular;
3957  unsigned int last_aru;
3958 
3959 #ifdef GIVEINFO
3960  unsigned long long tv_current;
3961  unsigned long long tv_diff;
3962 
3963  tv_current = qb_util_nano_current_get ();
3964  tv_diff = tv_current - tv_old;
3965  tv_old = tv_current;
3966 
3968  "Time since last token %0.4f ms", ((float)tv_diff) / 1000000.0);
3969 #endif
3970 
3971  if (check_orf_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
3972  return (0);
3973  }
3974 
3975  if (instance->orf_token_discard) {
3976  return (0);
3977  }
3978 #ifdef TEST_DROP_ORF_TOKEN_PERCENTAGE
3979  if (random()%100 < TEST_DROP_ORF_TOKEN_PERCENTAGE) {
3980  return (0);
3981  }
3982 #endif
3983 
3984  if (endian_conversion_needed) {
3985  orf_token_endian_convert ((struct orf_token *)msg,
3986  (struct orf_token *)token_convert);
3987  msg = (struct orf_token *)token_convert;
3988  }
3989 
3990  /*
3991  * Make copy of token and retransmit list in case we have
3992  * to flush incoming messages from the kernel queue
3993  */
3994  token = (struct orf_token *)token_storage;
3995  memcpy (token, msg, sizeof (struct orf_token));
3996  memcpy (&token->rtr_list[0], (char *)msg + sizeof (struct orf_token),
3997  sizeof (struct rtr_item) * RETRANSMIT_ENTRIES_MAX);
3998 
3999 
4000  /*
4001  * Handle merge detection timeout
4002  */
4003  if (token->seq == instance->my_last_seq) {
4004  start_merge_detect_timeout (instance);
4005  instance->my_seq_unchanged += 1;
4006  } else {
4007  cancel_merge_detect_timeout (instance);
4008  cancel_token_hold_retransmit_timeout (instance);
4009  instance->my_seq_unchanged = 0;
4010  }
4011 
4012  instance->my_last_seq = token->seq;
4013 
4014 #ifdef TEST_RECOVERY_MSG_COUNT
4015  if (instance->memb_state == MEMB_STATE_OPERATIONAL && token->seq > TEST_RECOVERY_MSG_COUNT) {
4016  return (0);
4017  }
4018 #endif
4019  instance->flushing = 1;
4020  totemrrp_recv_flush (instance->totemrrp_context);
4021  instance->flushing = 0;
4022 
4023  /*
4024  * Determine if we should hold (in reality drop) the token
4025  */
4026  instance->my_token_held = 0;
4027  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
4028  instance->my_seq_unchanged > instance->totem_config->seqno_unchanged_const) {
4029  instance->my_token_held = 1;
4030  } else
4031  if (!totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0]) &&
4032  instance->my_seq_unchanged >= instance->totem_config->seqno_unchanged_const) {
4033  instance->my_token_held = 1;
4034  }
4035 
4036  /*
4037  * Hold onto token when there is no activity on ring and
4038  * this processor is the ring rep
4039  */
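 /*
  * A held token is simply not forwarded by the representative; the ring is
  * kept alive by the token hold retransmit timeout (started near the end of
  * this handler), which greatly reduces token traffic on an idle ring.
  */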
4040  forward_token = 1;
4041  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
4042  if (instance->my_token_held) {
4043  forward_token = 0;
4044  }
4045  }
4046 
4047  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_RECEIVED);
4048 
4049  switch (instance->memb_state) {
4050  case MEMB_STATE_COMMIT:
4051  /* Discard token */
4052  break;
4053 
4054  case MEMB_STATE_OPERATIONAL:
4055  messages_free (instance, token->aru);
4056  /*
4057  * Do NOT add break, this case should also execute code in gather case.
4058  */
4059 
4060  case MEMB_STATE_GATHER:
4061  /*
4062  * DO NOT add break, we use different free mechanism in recovery state
4063  */
4064 
4065  case MEMB_STATE_RECOVERY:
4066  /*
4067  * Discard tokens from another configuration
4068  */
4069  if (memcmp (&token->ring_id, &instance->my_ring_id,
4070  sizeof (struct memb_ring_id)) != 0) {
4071 
4072  if ((forward_token)
4073  && instance->use_heartbeat) {
4074  reset_heartbeat_timeout(instance);
4075  }
4076  else {
4077  cancel_heartbeat_timeout(instance);
4078  }
4079 
4080  return (0); /* discard token */
4081  }
4082 
4083  /*
4084  * Discard retransmitted tokens
4085  */
4086  if (sq_lte_compare (token->token_seq, instance->my_token_seq)) {
4087  return (0); /* discard token */
4088  }
4089  last_aru = instance->my_last_aru;
4090  instance->my_last_aru = token->aru;
4091 
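 /*
  * Flow control, in outline: fcc_calculate() determines how many new
  * multicasts this processor may originate on this token visit,
  * orf_token_rtr() first services the retransmit requests carried in the
  * token, fcc_rtr_limit() caps what is left, orf_token_mcast() sends queued
  * new messages, and fcc_token_update() records the amount sent in the token.
  */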
4092  transmits_allowed = fcc_calculate (instance, token);
4093  mcasted_retransmit = orf_token_rtr (instance, token, &transmits_allowed);
4094 
4095  if (instance->my_token_held == 1 &&
4096  (token->rtr_list_entries > 0 || mcasted_retransmit > 0)) {
4097  instance->my_token_held = 0;
4098  forward_token = 1;
4099  }
4100 
4101  fcc_rtr_limit (instance, token, &transmits_allowed);
4102  mcasted_regular = orf_token_mcast (instance, token, transmits_allowed);
4103 /*
4104 if (mcasted_regular) {
4105 printf ("mcasted regular %d\n", mcasted_regular);
4106 printf ("token seq %d\n", token->seq);
4107 }
4108 */
4109  fcc_token_update (instance, token, mcasted_retransmit +
4110  mcasted_regular);
4111 
4112  if (sq_lt_compare (instance->my_aru, token->aru) ||
4113  instance->my_id.addr[0].nodeid == token->aru_addr ||
4114  token->aru_addr == 0) {
4115 
4116  token->aru = instance->my_aru;
4117  if (token->aru == token->seq) {
4118  token->aru_addr = 0;
4119  } else {
4120  token->aru_addr = instance->my_id.addr[0].nodeid;
4121  }
4122  }
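 /*
  * my_aru_count counts consecutive token rotations on which the aru carried
  * by the token did not advance while some processor (aru_addr) still has
  * messages outstanding; it drives the fail-to-receive detection below.
  */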
4123  if (token->aru == last_aru && token->aru_addr != 0) {
4124  instance->my_aru_count += 1;
4125  } else {
4126  instance->my_aru_count = 0;
4127  }
4128 
4129  /*
4130  * We don't strictly follow the specification here. In the specification, OTHER nodes
4131  * detect the failure of a node (based on aru_count) and my_id is NEVER added
4132  * to the failed list (so a node never marks itself as failed).
4133  */
4134  if (instance->my_aru_count > instance->totem_config->fail_to_recv_const &&
4135  token->aru_addr == instance->my_id.addr[0].nodeid) {
4136 
4137  log_printf (instance->totemsrp_log_level_error,
4138  "FAILED TO RECEIVE");
4139 
4140  instance->failed_to_recv = 1;
4141 
4142  memb_set_merge (&instance->my_id, 1,
4143  instance->my_failed_list,
4144  &instance->my_failed_list_entries);
4145 
4146  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FAILED_TO_RECEIVE);
4147  } else {
4148  instance->my_token_seq = token->token_seq;
4149  token->token_seq += 1;
4150 
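 /*
  * Recovery exit, in outline: the retrans flag stays set while any processor
  * still has messages to retransmit; after two rotations with the flag clear
  * the install sequence is latched, the deliver membership is captured once
  * this processor has recovered up to it, and after further clean rotations
  * memb_state_operational_enter() installs the new ring.
  */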
4151  if (instance->memb_state == MEMB_STATE_RECOVERY) {
4152  /*
4153  * instance->my_aru == instance->my_high_seq_received means this processor
4154  * has recovered all messages it can recover
4155  * (ie: its retrans queue is empty)
4156  */
4157  if (cs_queue_is_empty (&instance->retrans_message_queue) == 0) {
4158 
4159  if (token->retrans_flg == 0) {
4160  token->retrans_flg = 1;
4161  instance->my_set_retrans_flg = 1;
4162  }
4163  } else
4164  if (token->retrans_flg == 1 && instance->my_set_retrans_flg) {
4165  token->retrans_flg = 0;
4166  instance->my_set_retrans_flg = 0;
4167  }
4168  log_printf (instance->totemsrp_log_level_debug,
4169  "token retrans flag is %d my set retrans flag%d retrans queue empty %d count %d, aru %x",
4170  token->retrans_flg, instance->my_set_retrans_flg,
4171  cs_queue_is_empty (&instance->retrans_message_queue),
4172  instance->my_retrans_flg_count, token->aru);
4173  if (token->retrans_flg == 0) {
4174  instance->my_retrans_flg_count += 1;
4175  } else {
4176  instance->my_retrans_flg_count = 0;
4177  }
4178  if (instance->my_retrans_flg_count == 2) {
4179  instance->my_install_seq = token->seq;
4180  }
4181  log_printf (instance->totemsrp_log_level_debug,
4182  "install seq %x aru %x high seq received %x",
4183  instance->my_install_seq, instance->my_aru, instance->my_high_seq_received);
4184  if (instance->my_retrans_flg_count >= 2 &&
4185  instance->my_received_flg == 0 &&
4186  sq_lte_compare (instance->my_install_seq, instance->my_aru)) {
4187  instance->my_received_flg = 1;
4188  instance->my_deliver_memb_entries = instance->my_trans_memb_entries;
4189  memcpy (instance->my_deliver_memb_list, instance->my_trans_memb_list,
4190  sizeof (struct totem_ip_address) * instance->my_trans_memb_entries);
4191  }
4192  if (instance->my_retrans_flg_count >= 3 &&
4193  sq_lte_compare (instance->my_install_seq, token->aru)) {
4194  instance->my_rotation_counter += 1;
4195  } else {
4196  instance->my_rotation_counter = 0;
4197  }
4198  if (instance->my_rotation_counter == 2) {
4199  log_printf (instance->totemsrp_log_level_debug,
4200  "retrans flag count %x token aru %x install seq %x aru %x %x",
4201  instance->my_retrans_flg_count, token->aru, instance->my_install_seq,
4202  instance->my_aru, token->seq);
4203 
4204  memb_state_operational_enter (instance);
4205  instance->my_rotation_counter = 0;
4206  instance->my_retrans_flg_count = 0;
4207  }
4208  }
4209 
4210  totemrrp_send_flush (instance->totemrrp_context);
4211  token_send (instance, token, forward_token);
4212 
4213 #ifdef GIVEINFO
4214  tv_current = qb_util_nano_current_get ();
4215  tv_diff = tv_current - tv_old;
4216  tv_old = tv_current;
4217  log_printf (instance->totemsrp_log_level_debug,
4218  "I held %0.4f ms",
4219  ((float)tv_diff) / 1000000.0);
4220 #endif
4221  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4222  messages_deliver_to_app (instance, 0,
4223  instance->my_high_seq_received);
4224  }
4225 
4226  /*
4227  * Deliver messages after token has been transmitted
4228  * to improve performance
4229  */
4230  reset_token_timeout (instance); // REVIEWED
4231  reset_token_retransmit_timeout (instance); // REVIEWED
4232  if (totemip_equal(&instance->my_id.addr[0], &instance->my_ring_id.rep) &&
4233  instance->my_token_held == 1) {
4234 
4235  start_token_hold_retransmit_timeout (instance);
4236  }
4237 
4238  token_callbacks_execute (instance, TOTEM_CALLBACK_TOKEN_SENT);
4239  }
4240  break;
4241  }
4242 
4243  if ((forward_token)
4244  && instance->use_heartbeat) {
4245  reset_heartbeat_timeout(instance);
4246  }
4247  else {
4248  cancel_heartbeat_timeout(instance);
4249  }
4250 
4251  return (0);
4252 }
4253 
4254 static void messages_deliver_to_app (
4255  struct totemsrp_instance *instance,
4256  int skip,
4257  unsigned int end_point)
4258 {
4259  struct sort_queue_item *sort_queue_item_p;
4260  unsigned int i;
4261  int res;
4262  struct mcast *mcast_in;
4263  struct mcast mcast_header;
4264  unsigned int range = 0;
4265  int endian_conversion_required;
4266  unsigned int my_high_delivered_stored = 0;
4267 
4268 
4269  range = end_point - instance->my_high_delivered;
4270 
4271  if (range) {
4272  log_printf (instance->totemsrp_log_level_debug,
4273  "Delivering %x to %x", instance->my_high_delivered,
4274  end_point);
4275  }
4276  assert (range < QUEUE_RTR_ITEMS_SIZE_MAX);
4277  my_high_delivered_stored = instance->my_high_delivered;
4278 
4279  /*
4280  * Deliver messages in order from rtr queue to pending delivery queue
4281  */
4282  for (i = 1; i <= range; i++) {
4283 
4284  void *ptr = 0;
4285 
4286  /*
4287  * If out of range of sort queue, stop assembly
4288  */
4289  res = sq_in_range (&instance->regular_sort_queue,
4290  my_high_delivered_stored + i);
4291  if (res == 0) {
4292  break;
4293  }
4294 
4295  res = sq_item_get (&instance->regular_sort_queue,
4296  my_high_delivered_stored + i, &ptr);
4297  /*
4298  * If hole, stop assembly
4299  */
4300  if (res != 0 && skip == 0) {
4301  break;
4302  }
4303 
4304  instance->my_high_delivered = my_high_delivered_stored + i;
4305 
4306  if (res != 0) {
4307  continue;
4308 
4309  }
4310 
4311  sort_queue_item_p = ptr;
4312 
4313  mcast_in = sort_queue_item_p->mcast;
4314  assert (mcast_in != (struct mcast *)0xdeadbeef);
4315 
4316  endian_conversion_required = 0;
4317  if (mcast_in->header.endian_detector != ENDIAN_LOCAL) {
4318  endian_conversion_required = 1;
4319  mcast_endian_convert (mcast_in, &mcast_header);
4320  } else {
4321  memcpy (&mcast_header, mcast_in, sizeof (struct mcast));
4322  }
4323 
4324  /*
4325  * Skip messages not originated in instance->my_deliver_memb
4326  */
4327  if (skip &&
4328  memb_set_subset (&mcast_header.system_from,
4329  1,
4330  instance->my_deliver_memb_list,
4331  instance->my_deliver_memb_entries) == 0) {
4332 
4333  instance->my_high_delivered = my_high_delivered_stored + i;
4334 
4335  continue;
4336  }
4337 
4338  /*
4339  * Message found
4340  */
4341  log_printf (instance->totemsrp_log_level_debug,
4342  "Delivering MCAST message with seq %x to pending delivery queue",
4343  mcast_header.seq);
4344 
4345  /*
4346  * Message is locally originated multicast
4347  */
4348  instance->totemsrp_deliver_fn (
4349  mcast_header.header.nodeid,
4350  ((char *)sort_queue_item_p->mcast) + sizeof (struct mcast),
4351  sort_queue_item_p->msg_len - sizeof (struct mcast),
4352  endian_conversion_required);
4353  }
4354 }
4355 
4356 /*
4357  * recv message handler called when MCAST message type received
4358  */
4359 static int message_handler_mcast (
4360  struct totemsrp_instance *instance,
4361  const void *msg,
4362  size_t msg_len,
4363  int endian_conversion_needed)
4364 {
4365  struct sort_queue_item sort_queue_item;
4366  struct sq *sort_queue;
4367  struct mcast mcast_header;
4368 
4369  if (check_mcast_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4370  return (0);
4371  }
4372 
4373  if (endian_conversion_needed) {
4374  mcast_endian_convert (msg, &mcast_header);
4375  } else {
4376  memcpy (&mcast_header, msg, sizeof (struct mcast));
4377  }
4378 
4379  if (mcast_header.header.encapsulated == MESSAGE_ENCAPSULATED) {
4380  sort_queue = &instance->recovery_sort_queue;
4381  } else {
4382  sort_queue = &instance->regular_sort_queue;
4383  }
4384 
4385  assert (msg_len <= FRAME_SIZE_MAX);
4386 
4387 #ifdef TEST_DROP_MCAST_PERCENTAGE
4388  if (random()%100 < TEST_DROP_MCAST_PERCENTAGE) {
4389  return (0);
4390  }
4391 #endif
4392 
4393  /*
4394  * If the message is foreign execute the switch below
4395  */
4396  if (memcmp (&instance->my_ring_id, &mcast_header.ring_id,
4397  sizeof (struct memb_ring_id)) != 0) {
4398 
4399  switch (instance->memb_state) {
4400  case MEMB_STATE_OPERATIONAL:
4401  memb_set_merge (
4402  &mcast_header.system_from, 1,
4403  instance->my_proc_list, &instance->my_proc_list_entries);
4404  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_OPERATIONAL_STATE);
4405  break;
4406 
4407  case MEMB_STATE_GATHER:
4408  if (!memb_set_subset (
4409  &mcast_header.system_from,
4410  1,
4411  instance->my_proc_list,
4412  instance->my_proc_list_entries)) {
4413 
4414  memb_set_merge (&mcast_header.system_from, 1,
4415  instance->my_proc_list, &instance->my_proc_list_entries);
4416  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_FOREIGN_MESSAGE_IN_GATHER_STATE);
4417  return (0);
4418  }
4419  break;
4420 
4421  case MEMB_STATE_COMMIT:
4422  /* discard message */
4423  instance->stats.rx_msg_dropped++;
4424  break;
4425 
4426  case MEMB_STATE_RECOVERY:
4427  /* discard message */
4428  instance->stats.rx_msg_dropped++;
4429  break;
4430  }
4431  return (0);
4432  }
4433 
4434  log_printf (instance->totemsrp_log_level_debug,
4435  "Received ringid(%s:%lld) seq %x",
4436  totemip_print (&mcast_header.ring_id.rep),
4437  mcast_header.ring_id.seq,
4438  mcast_header.seq);
4439 
4440  /*
4441  * Add mcast message to rtr queue if not already in rtr queue
4442  * otherwise free io vectors
4443  */
4444  if (msg_len > 0 && msg_len <= FRAME_SIZE_MAX &&
4445  sq_in_range (sort_queue, mcast_header.seq) &&
4446  sq_item_inuse (sort_queue, mcast_header.seq) == 0) {
4447 
4448  /*
4449  * Allocate new multicast memory block
4450  */
4451 // TODO LEAK
4452  sort_queue_item.mcast = totemsrp_buffer_alloc (instance);
4453  if (sort_queue_item.mcast == NULL) {
4454  return (-1); /* error here is corrected by the algorithm */
4455  }
4456  memcpy (sort_queue_item.mcast, msg, msg_len);
4457  sort_queue_item.msg_len = msg_len;
4458 
4459  if (sq_lt_compare (instance->my_high_seq_received,
4460  mcast_header.seq)) {
4461  instance->my_high_seq_received = mcast_header.seq;
4462  }
4463 
4464  sq_item_add (sort_queue, &sort_queue_item, mcast_header.seq);
4465  }
4466 
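 /*
  * Refresh my_aru now that the new message is queued; in operational state
  * any newly contiguous messages are delivered to the application right away
  * instead of waiting for the next token rotation.
  */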
4467  update_aru (instance);
4468  if (instance->memb_state == MEMB_STATE_OPERATIONAL) {
4469  messages_deliver_to_app (instance, 0, instance->my_high_seq_received);
4470  }
4471 
4472 /* TODO remove from retrans message queue for old ring in recovery state */
4473  return (0);
4474 }
4475 
4476 static int message_handler_memb_merge_detect (
4477  struct totemsrp_instance *instance,
4478  const void *msg,
4479  size_t msg_len,
4480  int endian_conversion_needed)
4481 {
4482  struct memb_merge_detect memb_merge_detect;
4483
4484  if (check_memb_merge_detect_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4485  return (0);
4486  }
4487 
4488  if (endian_conversion_needed) {
4489  memb_merge_detect_endian_convert (msg, &memb_merge_detect);
4490  } else {
4491  memcpy (&memb_merge_detect, msg,
4492  sizeof (struct memb_merge_detect));
4493  }
4494 
4495  /*
4496  * do nothing if this is a merge detect from this configuration
4497  */
4498  if (memcmp (&instance->my_ring_id, &memb_merge_detect.ring_id,
4499  sizeof (struct memb_ring_id)) == 0) {
4500 
4501  return (0);
4502  }
4503 
4504  /*
4505  * Execute merge operation
4506  */
4507  switch (instance->memb_state) {
4508  case MEMB_STATE_OPERATIONAL:
4509  memb_set_merge (&memb_merge_detect.system_from, 1,
4510  instance->my_proc_list, &instance->my_proc_list_entries);
4511  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_OPERATIONAL_STATE);
4512  break;
4513 
4514  case MEMB_STATE_GATHER:
4515  if (!memb_set_subset (
4516  &memb_merge_detect.system_from,
4517  1,
4518  instance->my_proc_list,
4519  instance->my_proc_list_entries)) {
4520 
4521  memb_set_merge (&memb_merge_detect.system_from, 1,
4522  instance->my_proc_list, &instance->my_proc_list_entries);
4523  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_GATHER_STATE);
4524  return (0);
4525  }
4526  break;
4527 
4528  case MEMB_STATE_COMMIT:
4529  /* do nothing in commit */
4530  break;
4531 
4532  case MEMB_STATE_RECOVERY:
4533  /* do nothing in recovery */
4534  break;
4535  }
4536  return (0);
4537 }
4538 
4539 static void memb_join_process (
4540  struct totemsrp_instance *instance,
4541  const struct memb_join *memb_join)
4542 {
4543  struct srp_addr *proc_list;
4544  struct srp_addr *failed_list;
4545  int gather_entered = 0;
4546  int fail_minus_memb_entries = 0;
4547  struct srp_addr fail_minus_memb[PROCESSOR_COUNT_MAX];
4548 
4549  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4550  failed_list = proc_list + memb_join->proc_list_entries;
4551 
4552 /*
4553  memb_set_print ("proclist", proc_list, memb_join->proc_list_entries);
4554  memb_set_print ("faillist", failed_list, memb_join->failed_list_entries);
4555  memb_set_print ("my_proclist", instance->my_proc_list, instance->my_proc_list_entries);
4556  memb_set_print ("my_faillist", instance->my_failed_list, instance->my_failed_list_entries);
4557 -*/
4558 
4559  if (memb_join->header.type == MESSAGE_TYPE_MEMB_JOIN) {
4560  if (instance->flushing) {
4561  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4562  log_printf (instance->totemsrp_log_level_warning,
4563  "Discarding LEAVE message during flush, nodeid=%u",
4564  memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4565  if (memb_join->failed_list_entries > 0) {
4566  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4567  }
4568  } else {
4569  log_printf (instance->totemsrp_log_level_warning,
4570  "Discarding JOIN message during flush, nodeid=%d", memb_join->header.nodeid);
4571  }
4572  return;
4573  } else {
4574  if (memb_join->header.nodeid == LEAVE_DUMMY_NODEID) {
4575  log_printf (instance->totemsrp_log_level_debug,
4576  "Received LEAVE message from %u", memb_join->failed_list_entries > 0 ? failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid : LEAVE_DUMMY_NODEID);
4577  if (memb_join->failed_list_entries > 0) {
4578  my_leave_memb_set(instance, failed_list[memb_join->failed_list_entries - 1 ].addr[0].nodeid);
4579  }
4580  }
4581  }
4582 
4583  }
4584 
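 /*
  * Consensus handling: a join whose proc and failed lists exactly match our
  * own view counts toward consensus; a join that is only a subset of our
  * view, or that comes from a processor on our failed list, is ignored;
  * anything else merges its lists into ours and (re)enters the gather state.
  */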
4585  if (memb_set_equal (proc_list,
4586  memb_join->proc_list_entries,
4587  instance->my_proc_list,
4588  instance->my_proc_list_entries) &&
4589 
4590  memb_set_equal (failed_list,
4591  memb_join->failed_list_entries,
4592  instance->my_failed_list,
4593  instance->my_failed_list_entries)) {
4594 
4595  memb_consensus_set (instance, &memb_join->system_from);
4596 
4597  if (memb_consensus_agreed (instance) && instance->failed_to_recv == 1) {
4598  instance->failed_to_recv = 0;
4599  srp_addr_copy (&instance->my_proc_list[0],
4600  &instance->my_id);
4601  instance->my_proc_list_entries = 1;
4602  instance->my_failed_list_entries = 0;
4603 
4604  memb_state_commit_token_create (instance);
4605 
4606  memb_state_commit_enter (instance);
4607  return;
4608  }
4609  if (memb_consensus_agreed (instance) &&
4610  memb_lowest_in_config (instance)) {
4611 
4612  memb_state_commit_token_create (instance);
4613 
4614  memb_state_commit_enter (instance);
4615  } else {
4616  goto out;
4617  }
4618  } else
4619  if (memb_set_subset (proc_list,
4620  memb_join->proc_list_entries,
4621  instance->my_proc_list,
4622  instance->my_proc_list_entries) &&
4623 
4624  memb_set_subset (failed_list,
4625  memb_join->failed_list_entries,
4626  instance->my_failed_list,
4627  instance->my_failed_list_entries)) {
4628 
4629  goto out;
4630  } else
4631  if (memb_set_subset (&memb_join->system_from, 1,
4632  instance->my_failed_list, instance->my_failed_list_entries)) {
4633 
4634  goto out;
4635  } else {
4636  memb_set_merge (proc_list,
4637  memb_join->proc_list_entries,
4638  instance->my_proc_list, &instance->my_proc_list_entries);
4639 
4640  if (memb_set_subset (
4641  &instance->my_id, 1,
4642  failed_list, memb_join->failed_list_entries)) {
4643 
4644  memb_set_merge (
4645  &memb_join->system_from, 1,
4646  instance->my_failed_list, &instance->my_failed_list_entries);
4647  } else {
4648  if (memb_set_subset (
4649  &memb_join->system_from, 1,
4650  instance->my_memb_list,
4651  instance->my_memb_entries)) {
4652 
4653  if (memb_set_subset (
4654  &memb_join->system_from, 1,
4655  instance->my_failed_list,
4656  instance->my_failed_list_entries) == 0) {
4657 
4658  memb_set_merge (failed_list,
4659  memb_join->failed_list_entries,
4660  instance->my_failed_list, &instance->my_failed_list_entries);
4661  } else {
4662  memb_set_subtract (fail_minus_memb,
4663  &fail_minus_memb_entries,
4664  failed_list,
4665  memb_join->failed_list_entries,
4666  instance->my_memb_list,
4667  instance->my_memb_entries);
4668 
4669  memb_set_merge (fail_minus_memb,
4670  fail_minus_memb_entries,
4671  instance->my_failed_list,
4672  &instance->my_failed_list_entries);
4673  }
4674  }
4675  }
4676  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_MERGE_DURING_JOIN);
4677  gather_entered = 1;
4678  }
4679 
4680 out:
4681  if (gather_entered == 0 &&
4682  instance->memb_state == MEMB_STATE_OPERATIONAL) {
4683 
4684  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_OPERATIONAL_STATE);
4685  }
4686 }
4687 
4688 static void memb_join_endian_convert (const struct memb_join *in, struct memb_join *out)
4689 {
4690  int i;
4691  struct srp_addr *in_proc_list;
4692  struct srp_addr *in_failed_list;
4693  struct srp_addr *out_proc_list;
4694  struct srp_addr *out_failed_list;
4695 
4696  out->header.type = in->header.type;
4697  out->header.endian_detector = ENDIAN_LOCAL;
4698  out->header.nodeid = swab32 (in->header.nodeid);
4699  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4700  out->proc_list_entries = swab32 (in->proc_list_entries);
4701  out->failed_list_entries = swab32 (in->failed_list_entries);
4702  out->ring_seq = swab64 (in->ring_seq);
4703 
4704  in_proc_list = (struct srp_addr *)in->end_of_memb_join;
4705  in_failed_list = in_proc_list + out->proc_list_entries;
4706  out_proc_list = (struct srp_addr *)out->end_of_memb_join;
4707  out_failed_list = out_proc_list + out->proc_list_entries;
4708 
4709  for (i = 0; i < out->proc_list_entries; i++) {
4710  srp_addr_copy_endian_convert (&out_proc_list[i], &in_proc_list[i]);
4711  }
4712  for (i = 0; i < out->failed_list_entries; i++) {
4713  srp_addr_copy_endian_convert (&out_failed_list[i], &in_failed_list[i]);
4714  }
4715 }
4716 
4717 static void memb_commit_token_endian_convert (const struct memb_commit_token *in, struct memb_commit_token *out)
4718 {
4719  int i;
4720  struct srp_addr *in_addr = (struct srp_addr *)in->end_of_commit_token;
4721  struct srp_addr *out_addr = (struct srp_addr *)out->end_of_commit_token;
4722  struct memb_commit_token_memb_entry *in_memb_list;
4723  struct memb_commit_token_memb_entry *out_memb_list;
4724 
4725  out->header.type = in->header.type;
4726  out->header.endian_detector = ENDIAN_LOCAL;
4727  out->header.nodeid = swab32 (in->header.nodeid);
4728  out->token_seq = swab32 (in->token_seq);
4729  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4730  out->ring_id.seq = swab64 (in->ring_id.seq);
4731  out->retrans_flg = swab32 (in->retrans_flg);
4732  out->memb_index = swab32 (in->memb_index);
4733  out->addr_entries = swab32 (in->addr_entries);
4734 
4735  in_memb_list = (struct memb_commit_token_memb_entry *)(in_addr + out->addr_entries);
4736  out_memb_list = (struct memb_commit_token_memb_entry *)(out_addr + out->addr_entries);
4737  for (i = 0; i < out->addr_entries; i++) {
4738  srp_addr_copy_endian_convert (&out_addr[i], &in_addr[i]);
4739 
4740  /*
4741  * Only convert the memb entry if it has been set
4742  */
4743  if (in_memb_list[i].ring_id.rep.family != 0) {
4744  totemip_copy_endian_convert (&out_memb_list[i].ring_id.rep,
4745  &in_memb_list[i].ring_id.rep);
4746 
4747  out_memb_list[i].ring_id.seq =
4748  swab64 (in_memb_list[i].ring_id.seq);
4749  out_memb_list[i].aru = swab32 (in_memb_list[i].aru);
4750  out_memb_list[i].high_delivered = swab32 (in_memb_list[i].high_delivered);
4751  out_memb_list[i].received_flg = swab32 (in_memb_list[i].received_flg);
4752  }
4753  }
4754 }
4755 
4756 static void orf_token_endian_convert (const struct orf_token *in, struct orf_token *out)
4757 {
4758  int i;
4759 
4760  out->header.type = in->header.type;
4761  out->header.endian_detector = ENDIAN_LOCAL;
4762  out->header.nodeid = swab32 (in->header.nodeid);
4763  out->seq = swab32 (in->seq);
4764  out->token_seq = swab32 (in->token_seq);
4765  out->aru = swab32 (in->aru);
4766  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4767  out->aru_addr = swab32(in->aru_addr);
4768  out->ring_id.seq = swab64 (in->ring_id.seq);
4769  out->fcc = swab32 (in->fcc);
4770  out->backlog = swab32 (in->backlog);
4771  out->retrans_flg = swab32 (in->retrans_flg);
4772  out->rtr_list_entries = swab32 (in->rtr_list_entries);
4773  for (i = 0; i < out->rtr_list_entries; i++) {
4774  totemip_copy_endian_convert (&out->rtr_list[i].ring_id.rep, &in->rtr_list[i].ring_id.rep);
4775  out->rtr_list[i].ring_id.seq = swab64 (in->rtr_list[i].ring_id.seq);
4776  out->rtr_list[i].seq = swab32 (in->rtr_list[i].seq);
4777  }
4778 }
4779 
4780 static void mcast_endian_convert (const struct mcast *in, struct mcast *out)
4781 {
4782  out->header.type = in->header.type;
4783  out->header.endian_detector = ENDIAN_LOCAL;
4784  out->header.nodeid = swab32 (in->header.nodeid);
4785  out->header.encapsulated = in->header.encapsulated;
4786
4787  out->seq = swab32 (in->seq);
4788  out->this_seqno = swab32 (in->this_seqno);
4789  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4790  out->ring_id.seq = swab64 (in->ring_id.seq);
4791  out->node_id = swab32 (in->node_id);
4792  out->guarantee = swab32 (in->guarantee);
4793  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4794 }
4795 
4796 static void memb_merge_detect_endian_convert (
4797  const struct memb_merge_detect *in,
4798  struct memb_merge_detect *out)
4799 {
4800  out->header.type = in->header.type;
4801  out->header.endian_detector = ENDIAN_LOCAL;
4802  out->header.nodeid = swab32 (in->header.nodeid);
4803  totemip_copy_endian_convert (&out->ring_id.rep, &in->ring_id.rep);
4804  out->ring_id.seq = swab64 (in->ring_id.seq);
4805  srp_addr_copy_endian_convert (&out->system_from, &in->system_from);
4806 }
4807 
4808 static int ignore_join_under_operational (
4809  struct totemsrp_instance *instance,
4810  const struct memb_join *memb_join)
4811 {
4812  struct srp_addr *proc_list;
4813  struct srp_addr *failed_list;
4814  unsigned long long ring_seq;
4815 
4816  proc_list = (struct srp_addr *)memb_join->end_of_memb_join;
4817  failed_list = proc_list + memb_join->proc_list_entries;
4818  ring_seq = memb_join->ring_seq;
4819 
4820  if (memb_set_subset (&instance->my_id, 1,
4821  failed_list, memb_join->failed_list_entries)) {
4822  return (1);
4823  }
4824 
4825  /*
4826  * In operational state, my_proc_list is exactly the same as
4827  * my_memb_list.
4828  */
4829  if ((memb_set_subset (&memb_join->system_from, 1,
4830  instance->my_memb_list, instance->my_memb_entries)) &&
4831  (ring_seq < instance->my_ring_id.seq)) {
4832  return (1);
4833  }
4834 
4835  return (0);
4836 }
4837 
4838 static int message_handler_memb_join (
4839  struct totemsrp_instance *instance,
4840  const void *msg,
4841  size_t msg_len,
4842  int endian_conversion_needed)
4843 {
4844  const struct memb_join *memb_join;
4845  struct memb_join *memb_join_convert = alloca (msg_len);
4846 
4847  if (check_memb_join_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4848  return (0);
4849  }
4850 
4851  if (endian_conversion_needed) {
4852  memb_join = memb_join_convert;
4853  memb_join_endian_convert (msg, memb_join_convert);
4854 
4855  } else {
4856  memb_join = msg;
4857  }
4858  /*
4859  * If the process paused because it wasn't scheduled in a timely
4860  * fashion, flush the join messages because they may be queued
4861  * entries
4862  */
4863  if (pause_flush (instance)) {
4864  return (0);
4865  }
4866 
4867  if (instance->token_ring_id_seq < memb_join->ring_seq) {
4868  instance->token_ring_id_seq = memb_join->ring_seq;
4869  }
4870  switch (instance->memb_state) {
4871  case MEMB_STATE_OPERATIONAL:
4872  if (!ignore_join_under_operational (instance, memb_join)) {
4873  memb_join_process (instance, memb_join);
4874  }
4875  break;
4876 
4877  case MEMB_STATE_GATHER:
4878  memb_join_process (instance, memb_join);
4879  break;
4880 
4881  case MEMB_STATE_COMMIT:
4882  if (memb_set_subset (&memb_join->system_from,
4883  1,
4884  instance->my_new_memb_list,
4885  instance->my_new_memb_entries) &&
4886 
4887  memb_join->ring_seq >= instance->my_ring_id.seq) {
4888 
4889  memb_join_process (instance, memb_join);
4890  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_COMMIT_STATE);
4891  }
4892  break;
4893 
4894  case MEMB_STATE_RECOVERY:
4895  if (memb_set_subset (&memb_join->system_from,
4896  1,
4897  instance->my_new_memb_list,
4898  instance->my_new_memb_entries) &&
4899 
4900  memb_join->ring_seq >= instance->my_ring_id.seq) {
4901 
4902  memb_join_process (instance, memb_join);
4903  memb_recovery_state_token_loss (instance);
4904  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_JOIN_DURING_RECOVERY);
4905  }
4906  break;
4907  }
4908  return (0);
4909 }
4910 
4911 static int message_handler_memb_commit_token (
4912  struct totemsrp_instance *instance,
4913  const void *msg,
4914  size_t msg_len,
4915  int endian_conversion_needed)
4916 {
4917  struct memb_commit_token *memb_commit_token_convert = alloca (msg_len);
4918  struct memb_commit_token *memb_commit_token;
4919  struct srp_addr sub[PROCESSOR_COUNT_MAX];
4920  int sub_entries;
4921 
4922  struct srp_addr *addr;
4923 
4924  log_printf (instance->totemsrp_log_level_debug,
4925  "got commit token");
4926 
4927  if (check_memb_commit_token_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
4928  return (0);
4929  }
4930 
4931  if (endian_conversion_needed) {
4932  memb_commit_token_endian_convert (msg, memb_commit_token_convert);
4933  } else {
4934  memcpy (memb_commit_token_convert, msg, msg_len);
4935  }
4936  memb_commit_token = memb_commit_token_convert;
4937  addr = (struct srp_addr *)memb_commit_token->end_of_commit_token;
4938 
4939 #ifdef TEST_DROP_COMMIT_TOKEN_PERCENTAGE
4940  if (random()%100 < TEST_DROP_COMMIT_TOKEN_PERCENTAGE) {
4941  return (0);
4942  }
4943 #endif
4944  switch (instance->memb_state) {
4945  case MEMB_STATE_OPERATIONAL:
4946  /* discard token */
4947  break;
4948 
4949  case MEMB_STATE_GATHER:
4950  memb_set_subtract (sub, &sub_entries,
4951  instance->my_proc_list, instance->my_proc_list_entries,
4952  instance->my_failed_list, instance->my_failed_list_entries);
4953 
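 /*
  * Accept the commit token only if its address list matches our current
  * consensus set (proc list minus failed list) and it belongs to a newer
  * ring than the one we are currently on.
  */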
4954  if (memb_set_equal (addr,
4955  memb_commit_token->addr_entries,
4956  sub,
4957  sub_entries) &&
4958 
4959  memb_commit_token->ring_id.seq > instance->my_ring_id.seq) {
4960  memcpy (instance->commit_token, memb_commit_token, msg_len);
4961  memb_state_commit_enter (instance);
4962  }
4963  break;
4964 
4965  case MEMB_STATE_COMMIT:
4966  /*
4967  * If retransmitted commit tokens are sent on this ring
4968  * filter them out and only enter recovery once the
4969  * commit token has traversed the array. This is
4970  * determined by :
4971  * memb_commit_token->memb_index == memb_commit_token->addr_entries
4972  */
4973  if (memb_commit_token->ring_id.seq == instance->my_ring_id.seq &&
4974  memb_commit_token->memb_index == memb_commit_token->addr_entries) {
4975  memb_state_recovery_enter (instance, memb_commit_token);
4976  }
4977  break;
4978 
4979  case MEMB_STATE_RECOVERY:
4980  if (totemip_equal (&instance->my_id.addr[0], &instance->my_ring_id.rep)) {
4981 
4982  /* Filter out duplicated tokens */
4983  if (instance->originated_orf_token) {
4984  break;
4985  }
4986 
4987  instance->originated_orf_token = 1;
4988 
4989  log_printf (instance->totemsrp_log_level_debug,
4990  "Sending initial ORF token");
4991 
4992  // TODO convert instead of initiate
4993  orf_token_send_initial (instance);
4994  reset_token_timeout (instance); // REVIEWED
4995  reset_token_retransmit_timeout (instance); // REVIEWED
4996  }
4997  break;
4998  }
4999  return (0);
5000 }
5001 
5002 static int message_handler_token_hold_cancel (
5003  struct totemsrp_instance *instance,
5004  const void *msg,
5005  size_t msg_len,
5006  int endian_conversion_needed)
5007 {
5008  const struct token_hold_cancel *token_hold_cancel = msg;
5009 
5010  if (check_token_hold_cancel_sanity(instance, msg, msg_len, endian_conversion_needed) == -1) {
5011  return (0);
5012  }
5013 
5014  if (memcmp (&token_hold_cancel->ring_id, &instance->my_ring_id,
5015  sizeof (struct memb_ring_id)) == 0) {
5016 
5017  instance->my_seq_unchanged = 0;
5018  if (totemip_equal(&instance->my_ring_id.rep, &instance->my_id.addr[0])) {
5019  timer_function_token_retransmit_timeout (instance);
5020  }
5021  }
5022  return (0);
5023 }
5024 
5025 void main_deliver_fn (
5026  void *context,
5027  const void *msg,
5028  unsigned int msg_len)
5029 {
5030  struct totemsrp_instance *instance = context;
5031  const struct message_header *message_header = msg;
5032 
5033  if (msg_len < sizeof (struct message_header)) {
5034  log_printf (instance->totemsrp_log_level_security,
5035  "Received message is too short... ignoring %u.",
5036  (unsigned int)msg_len);
5037  return;
5038  }
5039 
5040 
5041  switch (message_header->type) {
5042  case MESSAGE_TYPE_ORF_TOKEN:
5043  instance->stats.orf_token_rx++;
5044  break;
5045  case MESSAGE_TYPE_MCAST:
5046  instance->stats.mcast_rx++;
5047  break;
5048  case MESSAGE_TYPE_MEMB_MERGE_DETECT:
5049  instance->stats.memb_merge_detect_rx++;
5050  break;
5051  case MESSAGE_TYPE_MEMB_JOIN:
5052  instance->stats.memb_join_rx++;
5053  break;
5054  case MESSAGE_TYPE_MEMB_COMMIT_TOKEN:
5055  instance->stats.memb_commit_token_rx++;
5056  break;
5057  case MESSAGE_TYPE_TOKEN_HOLD_CANCEL:
5058  instance->stats.token_hold_cancel_rx++;
5059  break;
5060  default:
5061  log_printf (instance->totemsrp_log_level_security, "Type of received message is wrong... ignoring %d.\n", (int)message_header->type);
5062 printf ("wrong message type\n");
5063  instance->stats.rx_msg_dropped++;
5064  return;
5065  }
5066  /*
5067  * Handle incoming message
5068  */
5069  totemsrp_message_handlers.handler_functions[(int)message_header->type] (
5070  instance,
5071  msg,
5072  msg_len,
5073  message_header->endian_detector != ENDIAN_LOCAL);
5074 }
5075 
5076 void main_iface_change_fn (
5077  void *context,
5078  const struct totem_ip_address *iface_addr,
5079  unsigned int iface_no)
5080 {
5081  struct totemsrp_instance *instance = context;
5082  int i;
5083 
5084  totemip_copy (&instance->my_id.addr[iface_no], iface_addr);
5085  assert (instance->my_id.addr[iface_no].nodeid);
5086 
5087  totemip_copy (&instance->my_memb_list[0].addr[iface_no], iface_addr);
5088 
5089  if (instance->iface_changes++ == 0) {
5090  instance->memb_ring_id_create_or_load (&instance->my_ring_id,
5091  &instance->my_id.addr[0]);
5092  instance->token_ring_id_seq = instance->my_ring_id.seq;
5093  log_printf (
5094  instance->totemsrp_log_level_debug,
5095  "Created or loaded sequence id %llx.%s for this ring.",
5096  instance->my_ring_id.seq,
5097  totemip_print (&instance->my_ring_id.rep));
5098 
5099  if (instance->totemsrp_service_ready_fn) {
5100  instance->totemsrp_service_ready_fn ();
5101  }
5102 
5103  }
5104 
5105  for (i = 0; i < instance->totem_config->interfaces[iface_no].member_count; i++) {
5106  totemsrp_member_add (instance,
5107  &instance->totem_config->interfaces[iface_no].member_list[i],
5108  iface_no);
5109  }
5110 
5111  if (instance->iface_changes >= instance->totem_config->interface_count) {
5112  memb_state_gather_enter (instance, TOTEMSRP_GSFROM_INTERFACE_CHANGE);
5113  }
5114 }
5115 
5116 void totemsrp_net_mtu_adjust (struct totem_config *totem_config) {
5117  totem_config->net_mtu -= sizeof (struct mcast);
5118 }
5119 
5120 void totemsrp_service_ready_register (
5121  void *context,
5122  void (*totem_service_ready) (void))
5123 {
5124  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5125 
5126  instance->totemsrp_service_ready_fn = totem_service_ready;
5127 }
5128 
5129 int totemsrp_member_add (
5130  void *context,
5131  const struct totem_ip_address *member,
5132  int ring_no)
5133 {
5134  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5135  int res;
5136 
5137  res = totemrrp_member_add (instance->totemrrp_context, member, ring_no);
5138 
5139  return (res);
5140 }
5141 
5142 int totemsrp_member_remove (
5143  void *context,
5144  const struct totem_ip_address *member,
5145  int ring_no)
5146 {
5147  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5148  int res;
5149 
5150  res = totemrrp_member_remove (instance->totemrrp_context, member, ring_no);
5151 
5152  return (res);
5153 }
5154 
5155 void totemsrp_threaded_mode_enable (void *context)
5156 {
5157  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5158 
5159  instance->threaded_mode_enabled = 1;
5160 }
5161 
5162 void totemsrp_trans_ack (void *context)
5163 {
5164  struct totemsrp_instance *instance = (struct totemsrp_instance *)context;
5165 
5166  instance->waiting_trans_ack = 0;
5167  instance->totemsrp_waiting_trans_ack_cb_fn (0);
5168 }