1 # SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
9 # To be loaded dynamically as needed
14 """Netlink spec element.
16 Abstract element of the Netlink spec. Implements the dictionary interface
17 for access to the raw spec. Supports iterative resolution of dependencies
18 across elements and class inheritance levels. The elements of the spec
19 may refer to each other, and although loops should be very rare, having
20 to maintain correct ordering of instantiation is painful, so the resolve()
21 method should be used to perform parts of init which require access to
22 other parts of the spec.
25 yaml raw spec as loaded from the spec file
26 family back reference to the full family
28 name name of the entity as listed in the spec (optional)
29 ident_name name which can be safely used as identifier in code (optional)
31 def __init__(self, family, yaml):
35 if 'name' in self.yaml:
36 self.name = self.yaml['name']
37 self.ident_name = self.name.replace('-', '_')
39 self._super_resolved = False
40 family.add_unresolved(self)
42 def __getitem__(self, key):
def __contains__(self, key):
    """ Membership test against the raw spec.

    Makes ``key in element`` work; the check is delegated unchanged to
    the raw spec held in self.yaml.

    Returns:
        True when the raw spec has an entry for key, False otherwise.
    """
    return key in self.yaml
def get(self, key, default=None):
    """ Dictionary-style lookup in the raw spec.

    Thin pass-through to the get() method of the raw spec (self.yaml).

    Args:
        key      key to look up in the raw spec
        default  value handed back when key is absent (None if omitted)

    Returns:
        The raw spec value stored under key, or default.
    """
    return self.yaml.get(key, default)
51 def resolve_up(self, up):
52 if not self._super_resolved:
54 self._super_resolved = True
60 class SpecEnumEntry(SpecElement):
61 """ Entry within an enum declared in the Netlink spec.
64 doc documentation string
65 enum_set back reference to the enum
66 value numerical value of this enum (use accessors in most situations!)
69 raw_value raw value, i.e. the id in the enum, unlike user value which is a mask for flags
70 user_value user value, same as raw value for enums, for flags it's the mask
72 def __init__(self, enum_set, yaml, prev, value_start):
73 if isinstance(yaml, str):
75 super().__init__(enum_set.family, yaml)
77 self.doc = yaml.get('doc', '')
78 self.enum_set = enum_set
81 self.value = yaml['value']
83 self.value = prev.value + 1
85 self.value = value_start
94 if self.enum_set['type'] == 'flags':
95 return 1 << self.value
100 class SpecEnumSet(SpecElement):
103 Represents an enumeration (list of numerical constants)
104 as declared in the "definitions" section of the spec.
108 entries entries by name
110 get_mask for flags compute the mask of all defined values
112 def __init__(self, family, yaml):
113 super().__init__(family, yaml)
115 self.type = yaml['type']
118 value_start = self.yaml.get('value-start', 0)
119 self.entries = dict()
120 for entry in self.yaml['entries']:
121 e = self.new_entry(entry, prev_entry, value_start)
122 self.entries[e.name] = e
def new_entry(self, entry, prev_entry, value_start):
    """ Create the element object for a single entry of this enum.

    Called once per item of the 'entries' list while the enum set is
    being built. Kept as its own method, presumably so that subclasses
    can return a more specialised entry type.

    Args:
        entry        raw spec of the entry
        prev_entry   entry created just before this one
        value_start  first value of the enum ('value-start', 0 if unset)

    Returns:
        A SpecEnumEntry with this enum set as its back reference.
    """
    return SpecEnumEntry(self, entry, prev_entry, value_start)
129 if 'doc' in self.yaml:
131 for entry in self.entries.values():
138 idx = self.yaml.get('value-start', 0)
139 for _ in self.entries.values():
145 class SpecAttr(SpecElement):
146 """ Single Netlink attribute type
148 Represents a single attribute type within an attr space.
151 value numerical ID when serialized
152 attr_set Attribute Set containing this attr
154 def __init__(self, family, attr_set, yaml, value):
155 super().__init__(family, yaml)
158 self.attr_set = attr_set
159 self.is_multi = yaml.get('multi-attr', False)
162 class SpecAttrSet(SpecElement):
163 """ Netlink Attribute Set class.
165 Represents an ID space of attributes within Netlink.
167 Note that unlike other elements, which expose contents of the raw spec
168 via the dictionary interface, Attribute Set exposes attributes by name.
171 attrs ordered dict of all attributes (indexed by name)
172 attrs_by_val ordered dict of all attributes (indexed by value)
173 subset_of parent set if this is a subset, otherwise None
175 def __init__(self, family, yaml):
176 super().__init__(family, yaml)
178 self.subset_of = self.yaml.get('subset-of', None)
180 self.attrs = collections.OrderedDict()
181 self.attrs_by_val = collections.OrderedDict()
183 if self.subset_of is None:
185 for elem in self.yaml['attributes']:
189 attr = self.new_attr(elem, val)
190 self.attrs[attr.name] = attr
191 self.attrs_by_val[attr.value] = attr
194 real_set = family.attr_sets[self.subset_of]
195 for elem in self.yaml['attributes']:
196 attr = real_set[elem['name']]
197 self.attrs[attr.name] = attr
198 self.attrs_by_val[attr.value] = attr
def new_attr(self, elem, value):
    """ Create the element object for one attribute of this set.

    Kept as its own method, presumably so that subclasses can return a
    more specialised attribute type.

    Args:
        elem   raw spec of the attribute
        value  numerical ID of the attribute

    Returns:
        A SpecAttr tied to this set and to the family this set belongs to.
    """
    return SpecAttr(self.family, self, elem, value)
def __getitem__(self, key):
    """ Look up an attribute of this set by name.

    Indexing goes to the attribute table (self.attrs), not to the raw
    spec.

    Raises:
        KeyError  when no attribute called key is present in self.attrs.
    """
    return self.attrs[key]
def __contains__(self, key):
    """ Tell whether this set has an attribute with the given name.

    The membership test is made against the attribute table
    (self.attrs), not against the raw spec.
    """
    return key in self.attrs
210 yield from self.attrs
213 return self.attrs.items()
216 class SpecOperation(SpecElement):
219 Information about a single Netlink operation.
222 value numerical ID when serialized, None if req/rsp values differ
224 req_value numerical ID when serialized, user -> kernel
225 rsp_value numerical ID when serialized, user <- kernel
226 is_call bool, whether the operation is a call
227 is_async bool, whether the operation is a notification
228 is_resv bool, whether the operation does not exist (it's just a reserved ID)
229 attr_set attribute set of the operation (looked up in the family during resolve)
231 yaml raw spec as loaded from the spec file
233 def __init__(self, family, yaml, req_value, rsp_value):
234 super().__init__(family, yaml)
236 self.value = req_value if req_value == rsp_value else None
237 self.req_value = req_value
238 self.rsp_value = rsp_value
240 self.is_call = 'do' in yaml or 'dump' in yaml
241 self.is_async = 'notify' in yaml or 'event' in yaml
242 self.is_resv = not self.is_async and not self.is_call
246 delattr(self, "attr_set")
249 self.resolve_up(super())
251 if 'attribute-set' in self.yaml:
252 attr_set_name = self.yaml['attribute-set']
253 elif 'notify' in self.yaml:
254 msg = self.family.msgs[self.yaml['notify']]
255 attr_set_name = msg['attribute-set']
259 raise Exception(f"Can't resolve attribute set for op '{self.name}'")
261 self.attr_set = self.family.attr_sets[attr_set_name]
264 class SpecFamily(SpecElement):
265 """ Netlink Family Spec class.
267 Netlink family information loaded from a spec (e.g. in YAML).
268 Takes care of unfolding implicit information which can be skipped
269 in the spec itself for brevity.
271 The class can be used like a dictionary to access the raw spec
272 elements but that's usually a bad idea.
275 proto protocol type (e.g. genetlink)
277 attr_sets dict of attribute sets
278 msgs dict of all messages (indexed by name)
279 req_by_value / rsp_by_value dicts of all request / response messages (indexed by value)
280 ops dict of all valid requests / responses
281 consts dict of all constants/enums
283 def __init__(self, spec_path, schema_path=None):
284 with open(spec_path, "r") as stream:
285 spec = yaml.safe_load(stream)
287 self._resolution_list = []
289 super().__init__(self, spec)
291 self.proto = self.yaml.get('protocol', 'genetlink')
293 if schema_path is None:
294 schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
298 with open(schema_path, "r") as stream:
299 schema = yaml.safe_load(stream)
301 if jsonschema is None:
302 jsonschema = importlib.import_module("jsonschema")
304 jsonschema.validate(self.yaml, schema)
306 self.attr_sets = collections.OrderedDict()
307 self.msgs = collections.OrderedDict()
308 self.req_by_value = collections.OrderedDict()
309 self.rsp_by_value = collections.OrderedDict()
310 self.ops = collections.OrderedDict()
311 self.consts = collections.OrderedDict()
313 last_exception = None
314 while len(self._resolution_list) > 0:
316 unresolved = self._resolution_list
317 self._resolution_list = []
319 for elem in unresolved:
322 except (KeyError, AttributeError) as e:
323 self._resolution_list.append(elem)
327 resolved.append(elem)
329 if len(resolved) == 0:
def new_enum(self, elem):
    """ Create the element object for an enum / flags definition.

    Used for entries of the 'definitions' section whose type is 'enum'
    or 'flags'. Kept as its own method, presumably so that subclasses
    can return a more specialised enum type.

    Args:
        elem  raw spec of the definition

    Returns:
        A SpecEnumSet belonging to this family.
    """
    return SpecEnumSet(self, elem)
def new_attr_set(self, elem):
    """ Create the element object for one attribute set.

    Used for every entry of the 'attribute-sets' section. Kept as its
    own method, presumably so that subclasses can return a more
    specialised attribute set type.

    Args:
        elem  raw spec of the attribute set

    Returns:
        A SpecAttrSet belonging to this family.
    """
    return SpecAttrSet(self, elem)
def new_operation(self, elem, req_val, rsp_val):
    """ Create the element object for one operation.

    Kept as its own method, presumably so that subclasses can return a
    more specialised operation type.

    Args:
        elem     raw spec of the operation
        req_val  numerical ID of the message, user -> kernel
        rsp_val  numerical ID of the message, user <- kernel

    Returns:
        A SpecOperation belonging to this family.
    """
    return SpecOperation(self, elem, req_val, rsp_val)
def add_unresolved(self, elem):
    """ Queue an element for resolution.

    Every spec element registers itself through this method when it is
    constructed; the element is appended to the family's pending list
    (self._resolution_list).

    Args:
        elem  spec element which still has to be resolved
    """
    self._resolution_list.append(elem)
344 def _dictify_ops_unified(self):
346 for elem in self.yaml['operations']['list']:
350 op = self.new_operation(elem, val, val)
353 self.msgs[op.name] = op
355 def _dictify_ops_directional(self):
356 req_val = rsp_val = 1
357 for elem in self.yaml['operations']['list']:
360 rsp_val = elem['value']
361 req_val_next = req_val
362 rsp_val_next = rsp_val + 1
364 elif 'do' in elem or 'dump' in elem:
365 mode = elem['do'] if 'do' in elem else elem['dump']
367 v = mode.get('request', {}).get('value', None)
370 v = mode.get('reply', {}).get('value', None)
374 rsp_inc = 1 if 'reply' in mode else 0
375 req_val_next = req_val + 1
376 rsp_val_next = rsp_val + rsp_inc
378 raise Exception("Can't parse directional ops")
380 op = self.new_operation(elem, req_val, rsp_val)
381 req_val = req_val_next
382 rsp_val = rsp_val_next
384 self.msgs[op.name] = op
387 self.resolve_up(super())
389 for elem in self.yaml['definitions']:
390 if elem['type'] == 'enum' or elem['type'] == 'flags':
391 self.consts[elem['name']] = self.new_enum(elem)
393 self.consts[elem['name']] = elem
395 for elem in self.yaml['attribute-sets']:
396 attr_set = self.new_attr_set(elem)
397 self.attr_sets[elem['name']] = attr_set
399 msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
400 if msg_id_model == 'unified':
401 self._dictify_ops_unified()
402 elif msg_id_model == 'directional':
403 self._dictify_ops_directional()
405 for op in self.msgs.values():
406 if op.req_value is not None:
407 self.req_by_value[op.req_value] = op
408 if op.rsp_value is not None:
409 self.rsp_by_value[op.rsp_value] = op
410 if not op.is_async and 'attribute-set' in op:
411 self.ops[op.name] = op