1 // SPDX-License-Identifier: GPL-2.0-only
2 /******************************************************************************
3 *******************************************************************************
5 ** Copyright (C) Sistina Software, Inc. 1997-2003 All rights reserved.
6 ** Copyright (C) 2004-2005 Red Hat, Inc. All rights reserved.
9 *******************************************************************************
10 ******************************************************************************/
12 #include "dlm_internal.h"
13 #include "lockspace.h"
25 * We use the upper 16 bits of the hash value to select the directory node.
26 * Low bits are used for distribution of rsb's among hash buckets on each node.
28 * To give the exact range wanted (0 to num_nodes-1), we apply a modulus of
29 * num_nodes to the hash value. This value in the desired range is used as an
30 * offset into the sorted list of nodeid's to give the particular nodeid.
/*
 * dlm_hash2nodeid - map a resource hash to the nodeid of its directory node.
 *
 * When the lockspace has exactly one node (ls_num_nodes == 1) the result is
 * dlm_our_nodeid().  Otherwise the upper 16 bits of @hash are reduced modulo
 * ls_total_weight and the result indexes ls_node_array.
 *
 * NOTE(review): the text above speaks of a modulus of num_nodes, but the
 * code uses ls_total_weight; presumably ls_node_array has ls_total_weight
 * slots - confirm against the code that builds the array.  The modulus also
 * assumes ls_total_weight is non-zero on this path.
 */
33 int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash)
37 if (ls->ls_num_nodes == 1)
38 return dlm_our_nodeid();
40 node = (hash >> 16) % ls->ls_total_weight;
41 return ls->ls_node_array[node];
/*
 * dlm_dir_nodeid - return the directory nodeid stored in the rsb
 * (r->res_dir_nodeid).  No hashing is done here; the field is read as is.
 */
45 int dlm_dir_nodeid(struct dlm_rsb *r)
47 return r->res_dir_nodeid;
/*
 * dlm_recover_dir_nodeid - recompute res_dir_nodeid for every rsb on
 * ls_root_list by passing its res_hash to dlm_hash2nodeid().
 *
 * ls_root_sem is held for read for the duration of the list walk.
 */
50 void dlm_recover_dir_nodeid(struct dlm_ls *ls)
54 down_read(&ls->ls_root_sem);
55 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
56 r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash);
58 up_read(&ls->ls_root_sem);
/*
 * dlm_recover_directory - collect resource names from the lockspace members
 * and feed them to dlm_master_lookup().
 *
 * The member list ls_nodes is walked; each member's nodeid is compared with
 * dlm_our_nodeid() (presumably to skip ourselves - the branch body is not
 * visible here).  Names are requested with dlm_rcom_names() and then parsed
 * out of ls_recover_buf->rc_buf as records of a big-endian 16-bit namelen
 * followed by the name bytes.  Each name is passed to dlm_master_lookup()
 * together with memb->nodeid, and the lookup result is classified into the
 * three cases commented further down (mismatching master, matching master,
 * newly added).  Once done, DLM_RS_DIR is set via dlm_set_recover_status().
 *
 * error starts out as -ENOMEM; dlm_recovery_stopped() is polled inside the
 * loop.  NOTE(review): the exit paths and the return statement are not
 * visible in this view - confirm the error codes against the full function.
 */
61 int dlm_recover_directory(struct dlm_ls *ls)
63 struct dlm_member *memb;
64 char *b, *last_name = NULL;
65 int error = -ENOMEM, last_len, nodeid, result;
67 unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0;
69 log_rinfo(ls, "dlm_recover_directory");
71 if (dlm_no_directory(ls))
/*
 * last_name is a DLM_RESNAME_MAXLEN byte buffer; it is zeroed per member
 * and receives a copy of each parsed name (see the memcpy near the end of
 * the loop).  Presumably it is what dlm_rcom_names() is given as the
 * resume point - TODO confirm, the call's trailing arguments are not
 * visible here.
 */
74 last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS);
78 list_for_each_entry(memb, &ls->ls_nodes, list) {
79 if (memb->nodeid == dlm_our_nodeid())
82 memset(last_name, 0, DLM_RESNAME_MAXLEN);
87 if (dlm_recovery_stopped(ls)) {
92 error = dlm_rcom_names(ls, memb->nodeid,
100 * pick namelen/name pairs out of received buffer
/*
 * b walks the payload; left is the payload size, i.e. the little-endian
 * h_length from the rcom header minus sizeof(struct dlm_rcom).
 */
103 b = ls->ls_recover_buf->rc_buf;
104 left = le16_to_cpu(ls->ls_recover_buf->rc_header.h_length);
105 left -= sizeof(struct dlm_rcom);
/*
 * NOTE(review): sizeof() yields size_t, so if left is a signed int (its
 * declaration is not visible here) this comparison is carried out in
 * unsigned arithmetic and a negative left would not be rejected - confirm.
 */
111 if (left < sizeof(__be16))
/* namelen is copied out with memcpy and converted from big-endian. */
114 memcpy(&v, b, sizeof(__be16));
115 namelen = be16_to_cpu(v);
117 left -= sizeof(__be16);
119 /* namelen of 0xFFFF marks end of names for
120 this node; namelen of 0 marks end of the
123 if (namelen == 0xFFFF)
131 if (namelen > DLM_RESNAME_MAXLEN)
134 error = dlm_master_lookup(ls, memb->nodeid,
139 log_error(ls, "recover_dir lookup %d",
144 /* The name was found in rsbtbl, but the
145 * master nodeid is different from
146 * memb->nodeid which says it is the master.
147 * This should not happen. */
149 if (result == DLM_LU_MATCH &&
150 nodeid != memb->nodeid) {
152 log_error(ls, "recover_dir lookup %d "
153 "nodeid %d memb %d bad %u",
154 result, nodeid, memb->nodeid,
156 print_hex_dump_bytes("dlm_recover_dir ",
161 /* The name was found in rsbtbl, and the
162 * master nodeid matches memb->nodeid. */
164 if (result == DLM_LU_MATCH &&
165 nodeid == memb->nodeid) {
169 /* The name was not found in rsbtbl and was
170 * added with memb->nodeid as the master. */
172 if (result == DLM_LU_ADD) {
/* Remember the name just processed in last_name. */
177 memcpy(last_name, b, namelen);
189 dlm_set_recover_status(ls, DLM_RS_DIR);
191 log_rinfo(ls, "dlm_recover_directory %u in %u new",
/*
 * find_rsb_root - look up the rsb whose name is @name (@len bytes).
 *
 * The name is hashed with jhash(name, len, 0) and the bucket is chosen by
 * masking with ls_rsbtbl_size - 1 (which presumes a power-of-two table
 * size - confirm).  With the bucket spinlock held, the bucket's keep tree
 * and toss tree are searched with dlm_search_rsb_tree().
 *
 * The fallback is a linear scan of ls_root_list under ls_root_sem (read),
 * matching on res_length and a memcmp of res_name; a hit on this path is
 * reported through log_debug ("revert to root_list").
 *
 * NOTE(review): the caller visible in this file, dlm_copy_master_names(),
 * calls in with ls_root_sem already held for read, so the down_read() below
 * is a nested read acquisition of the same rw_semaphore.  That can deadlock
 * if a writer queues between the two acquisitions - confirm whether the
 * inner lock is needed.
 *
 * NOTE(review): the return statements are not visible in this view;
 * presumably the found rsb is returned, or NULL when nothing matches.
 */
199 static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len)
202 uint32_t hash, bucket;
205 hash = jhash(name, len, 0);
206 bucket = hash & (ls->ls_rsbtbl_size - 1);
208 spin_lock(&ls->ls_rsbtbl[bucket].lock);
209 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
211 rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
213 spin_unlock(&ls->ls_rsbtbl[bucket].lock);
218 down_read(&ls->ls_root_sem);
219 list_for_each_entry(r, &ls->ls_root_list, res_root_list) {
220 if (len == r->res_length && !memcmp(name, r->res_name, len)) {
/* The semaphore is dropped before logging the match. */
221 up_read(&ls->ls_root_sem);
222 log_debug(ls, "find_rsb_root revert to root_list %s",
227 up_read(&ls->ls_root_sem);
231 /* Find the rsb where we left off (or start again), then send rsb names
232 for rsb's we're master of and whose directory node matches the requesting
233 node. inbuf is the rsb name last sent, inlen is the name's length */
/*
 * dlm_copy_master_names - pack resource names for @nodeid into @outbuf.
 *
 * @ls:     lockspace; its ls_root_list is walked with ls_root_sem held for
 *          read from entry until the final up_read()
 * @inbuf:  name last sent, looked up with find_rsb_root() to find where to
 *          resume.  Not read-only: on the lookup-failure path its last byte
 *          (inbuf[inlen - 1]) is overwritten with NUL before logging.
 * @inlen:  length of @inbuf in bytes
 * @outbuf: destination buffer for the records
 * @outlen: capacity of @outbuf in bytes
 * @nodeid: requesting node, compared with dlm_dir_nodeid() of each rsb
 *
 * Records are written as a big-endian 16-bit length followed by that many
 * bytes of res_name.  Two special lengths carry no name:
 *   0x0000 - end-of-block, written when the next name plus a trailing
 *            end-of-block record would not fit in the remaining space
 *   0xFFFF - terminating record, written when the walk reached the list
 *            head and two more bytes still fit
 *
 * ls_recover_dir_sent_res is incremented per name written, and
 * ls_recover_dir_sent_msg per end-of-block or terminating record written.
 */
235 void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen,
236 char *outbuf, int outlen, int nodeid)
238 struct list_head *list;
240 int offset = 0, dir_nodeid;
243 down_read(&ls->ls_root_sem);
246 r = find_rsb_root(ls, inbuf, inlen);
/*
 * NUL-terminate in place so the name can be printed with %s; this
 * clobbers the last byte of the caller's buffer.
 */
248 inbuf[inlen - 1] = '\0';
249 log_error(ls, "copy_master_names from %d start %d %s",
250 nodeid, inlen, inbuf);
/* Resume after the rsb that was found ... */
253 list = r->res_root_list.next;
/* ... or start from the first entry of the root list. */
255 list = ls->ls_root_list.next;
258 for (offset = 0; list != &ls->ls_root_list; list = list->next) {
259 r = list_entry(list, struct dlm_rsb, res_root_list);
263 dir_nodeid = dlm_dir_nodeid(r);
264 if (dir_nodeid != nodeid)
268 * The block ends when we can't fit the following in the
269 * remaining buffer space:
270 * namelen (uint16_t) +
271 * name (r->res_length) +
272 * end-of-block record 0x0000 (uint16_t)
/*
 * NOTE(review): offset and outlen are int, but the sizeof terms make
 * this comparison unsigned; it relies on outlen never being negative -
 * confirm against the caller.
 */
275 if (offset + sizeof(uint16_t)*2 + r->res_length > outlen) {
276 /* Write end-of-block record */
277 be_namelen = cpu_to_be16(0);
278 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
279 offset += sizeof(__be16);
280 ls->ls_recover_dir_sent_msg++;
/* Regular record: big-endian length, then the name bytes. */
284 be_namelen = cpu_to_be16(r->res_length);
285 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
286 offset += sizeof(__be16);
287 memcpy(outbuf + offset, r->res_name, r->res_length);
288 offset += r->res_length;
289 ls->ls_recover_dir_sent_res++;
293 * If we've reached the end of the list (and there's room) write a
294 * terminating record.
297 if ((list == &ls->ls_root_list) &&
298 (offset + sizeof(uint16_t) <= outlen)) {
299 be_namelen = cpu_to_be16(0xFFFF);
300 memcpy(outbuf + offset, &be_namelen, sizeof(__be16));
301 offset += sizeof(__be16);
302 ls->ls_recover_dir_sent_msg++;
305 up_read(&ls->ls_root_sem);