10 // QueryParam is provided to Query() to configure the parameters of the
11 // query. If not provided, sane defaults will be used.
12 type QueryParam struct {
13 // If provided, we restrict the nodes that should respond to those
14 // with names in this list
17 // FilterTags maps a tag name to a regular expression that is applied
18 // to restrict the nodes that should respond
19 FilterTags map[string]string
21 // If true, we are requesting an delivery acknowledgement from
22 // every node that meets the filter requirement. This means nodes
23 // the receive the message but do not pass the filters, will not
27 // The timeout limits how long the query is left open. If not provided,
28 // then a default timeout is used based on the configuration of Serf
32 // DefaultQueryTimeout returns the default timeout value for a query
33 // Computed as GossipInterval * QueryTimeoutMult * log(N+1)
34 func (s *Serf) DefaultQueryTimeout() time.Duration {
35 n := s.memberlist.NumMembers()
36 timeout := s.config.MemberlistConfig.GossipInterval
37 timeout *= time.Duration(s.config.QueryTimeoutMult)
38 timeout *= time.Duration(math.Ceil(math.Log10(float64(n + 1))))
42 // DefaultQueryParam is used to return the default query parameters
43 func (s *Serf) DefaultQueryParams() *QueryParam {
48 Timeout: s.DefaultQueryTimeout(),
52 // encodeFilters is used to convert the filters into the wire format
53 func (q *QueryParam) encodeFilters() ([][]byte, error) {
56 // Add the node filter
57 if len(q.FilterNodes) > 0 {
58 if buf, err := encodeFilter(filterNodeType, q.FilterNodes); err != nil {
61 filters = append(filters, buf)
65 // Add the tag filters
66 for tag, expr := range q.FilterTags {
67 filt := filterTag{tag, expr}
68 if buf, err := encodeFilter(filterTagType, &filt); err != nil {
71 filters = append(filters, buf)
78 // QueryResponse is returned for each new Query. It is used to collect
79 // Ack's as well as responses and to provide those back to a client.
80 type QueryResponse struct {
81 // ackCh is used to send the name of a node for which we've received an ack
84 // deadline is the query end time (start + query timeout)
90 // Stores the LTime of the query
93 // respCh is used to send a response from a node
94 respCh chan NodeResponse
100 // newQueryResponse is used to construct a new query response
101 func newQueryResponse(n int, q *messageQuery) *QueryResponse {
102 resp := &QueryResponse{
103 deadline: time.Now().Add(q.Timeout),
106 respCh: make(chan NodeResponse, n),
109 resp.ackCh = make(chan string, n)
114 // Close is used to close the query, which will close the underlying
115 // channels and prevent further deliveries
116 func (r *QueryResponse) Close() {
118 defer r.closeLock.Unlock()
131 // Deadline returns the ending deadline of the query
132 func (r *QueryResponse) Deadline() time.Time {
136 // Finished returns if the query is finished running
137 func (r *QueryResponse) Finished() bool {
138 return r.closed || time.Now().After(r.deadline)
141 // AckCh returns a channel that can be used to listen for acks
142 // Channel will be closed when the query is finished. This is nil,
143 // if the query did not specify RequestAck.
144 func (r *QueryResponse) AckCh() <-chan string {
148 // ResponseCh returns a channel that can be used to listen for responses.
149 // Channel will be closed when the query is finished.
150 func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
154 // NodeResponse is used to represent a single response from a node
155 type NodeResponse struct {
160 // shouldProcessQuery checks if a query should be proceeded given
162 func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
163 for _, filter := range filters {
164 switch filterType(filter[0]) {
168 if err := decodeMessage(filter[1:], &nodes); err != nil {
169 s.logger.Printf("[WARN] serf: failed to decode filterNodeType: %v", err)
173 // Check if we are being targeted
175 for _, n := range nodes {
176 if n == s.config.NodeName {
188 if err := decodeMessage(filter[1:], &filt); err != nil {
189 s.logger.Printf("[WARN] serf: failed to decode filterTagType: %v", err)
193 // Check if we match this regex
194 tags := s.config.Tags
195 matched, err := regexp.MatchString(filt.Expr, tags[filt.Tag])
197 s.logger.Printf("[WARN] serf: failed to compile filter regex (%s): %v", filt.Expr, err)
205 s.logger.Printf("[WARN] serf: query has unrecognized filter type: %d", filter[0])