Tizen_4.0 base
[platform/upstream/docker-engine.git] / vendor / github.com / hashicorp / serf / serf / query.go
1 package serf
2
3 import (
4         "math"
5         "regexp"
6         "sync"
7         "time"
8 )
9
10 // QueryParam is provided to Query() to configure the parameters of the
11 // query. If not provided, sane defaults will be used.
12 type QueryParam struct {
13         // If provided, we restrict the nodes that should respond to those
14         // with names in this list
15         FilterNodes []string
16
17         // FilterTags maps a tag name to a regular expression that is applied
18         // to restrict the nodes that should respond
19         FilterTags map[string]string
20
21         // If true, we are requesting an delivery acknowledgement from
22         // every node that meets the filter requirement. This means nodes
23         // the receive the message but do not pass the filters, will not
24         // send an ack.
25         RequestAck bool
26
27         // The timeout limits how long the query is left open. If not provided,
28         // then a default timeout is used based on the configuration of Serf
29         Timeout time.Duration
30 }
31
32 // DefaultQueryTimeout returns the default timeout value for a query
33 // Computed as GossipInterval * QueryTimeoutMult * log(N+1)
34 func (s *Serf) DefaultQueryTimeout() time.Duration {
35         n := s.memberlist.NumMembers()
36         timeout := s.config.MemberlistConfig.GossipInterval
37         timeout *= time.Duration(s.config.QueryTimeoutMult)
38         timeout *= time.Duration(math.Ceil(math.Log10(float64(n + 1))))
39         return timeout
40 }
41
42 // DefaultQueryParam is used to return the default query parameters
43 func (s *Serf) DefaultQueryParams() *QueryParam {
44         return &QueryParam{
45                 FilterNodes: nil,
46                 FilterTags:  nil,
47                 RequestAck:  false,
48                 Timeout:     s.DefaultQueryTimeout(),
49         }
50 }
51
52 // encodeFilters is used to convert the filters into the wire format
53 func (q *QueryParam) encodeFilters() ([][]byte, error) {
54         var filters [][]byte
55
56         // Add the node filter
57         if len(q.FilterNodes) > 0 {
58                 if buf, err := encodeFilter(filterNodeType, q.FilterNodes); err != nil {
59                         return nil, err
60                 } else {
61                         filters = append(filters, buf)
62                 }
63         }
64
65         // Add the tag filters
66         for tag, expr := range q.FilterTags {
67                 filt := filterTag{tag, expr}
68                 if buf, err := encodeFilter(filterTagType, &filt); err != nil {
69                         return nil, err
70                 } else {
71                         filters = append(filters, buf)
72                 }
73         }
74
75         return filters, nil
76 }
77
78 // QueryResponse is returned for each new Query. It is used to collect
79 // Ack's as well as responses and to provide those back to a client.
80 type QueryResponse struct {
81         // ackCh is used to send the name of a node for which we've received an ack
82         ackCh chan string
83
84         // deadline is the query end time (start + query timeout)
85         deadline time.Time
86
87         // Query ID
88         id uint32
89
90         // Stores the LTime of the query
91         lTime LamportTime
92
93         // respCh is used to send a response from a node
94         respCh chan NodeResponse
95
96         closed    bool
97         closeLock sync.Mutex
98 }
99
100 // newQueryResponse is used to construct a new query response
101 func newQueryResponse(n int, q *messageQuery) *QueryResponse {
102         resp := &QueryResponse{
103                 deadline: time.Now().Add(q.Timeout),
104                 id:       q.ID,
105                 lTime:    q.LTime,
106                 respCh:   make(chan NodeResponse, n),
107         }
108         if q.Ack() {
109                 resp.ackCh = make(chan string, n)
110         }
111         return resp
112 }
113
114 // Close is used to close the query, which will close the underlying
115 // channels and prevent further deliveries
116 func (r *QueryResponse) Close() {
117         r.closeLock.Lock()
118         defer r.closeLock.Unlock()
119         if r.closed {
120                 return
121         }
122         r.closed = true
123         if r.ackCh != nil {
124                 close(r.ackCh)
125         }
126         if r.respCh != nil {
127                 close(r.respCh)
128         }
129 }
130
131 // Deadline returns the ending deadline of the query
132 func (r *QueryResponse) Deadline() time.Time {
133         return r.deadline
134 }
135
136 // Finished returns if the query is finished running
137 func (r *QueryResponse) Finished() bool {
138         return r.closed || time.Now().After(r.deadline)
139 }
140
141 // AckCh returns a channel that can be used to listen for acks
142 // Channel will be closed when the query is finished. This is nil,
143 // if the query did not specify RequestAck.
144 func (r *QueryResponse) AckCh() <-chan string {
145         return r.ackCh
146 }
147
148 // ResponseCh returns a channel that can be used to listen for responses.
149 // Channel will be closed when the query is finished.
150 func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
151         return r.respCh
152 }
153
154 // NodeResponse is used to represent a single response from a node
155 type NodeResponse struct {
156         From    string
157         Payload []byte
158 }
159
160 // shouldProcessQuery checks if a query should be proceeded given
161 // a set of filers.
162 func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
163         for _, filter := range filters {
164                 switch filterType(filter[0]) {
165                 case filterNodeType:
166                         // Decode the filter
167                         var nodes filterNode
168                         if err := decodeMessage(filter[1:], &nodes); err != nil {
169                                 s.logger.Printf("[WARN] serf: failed to decode filterNodeType: %v", err)
170                                 return false
171                         }
172
173                         // Check if we are being targeted
174                         found := false
175                         for _, n := range nodes {
176                                 if n == s.config.NodeName {
177                                         found = true
178                                         break
179                                 }
180                         }
181                         if !found {
182                                 return false
183                         }
184
185                 case filterTagType:
186                         // Decode the filter
187                         var filt filterTag
188                         if err := decodeMessage(filter[1:], &filt); err != nil {
189                                 s.logger.Printf("[WARN] serf: failed to decode filterTagType: %v", err)
190                                 return false
191                         }
192
193                         // Check if we match this regex
194                         tags := s.config.Tags
195                         matched, err := regexp.MatchString(filt.Expr, tags[filt.Tag])
196                         if err != nil {
197                                 s.logger.Printf("[WARN] serf: failed to compile filter regex (%s): %v", filt.Expr, err)
198                                 return false
199                         }
200                         if !matched {
201                                 return false
202                         }
203
204                 default:
205                         s.logger.Printf("[WARN] serf: query has unrecognized filter type: %d", filter[0])
206                         return false
207                 }
208         }
209         return true
210 }