1 # SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
9 # To be loaded dynamically as needed
class SpecElement:
    """Netlink spec element.

    Abstract element of the Netlink spec. Implements the dictionary interface
    for access to the raw spec. Supports iterative resolution of dependencies
    across elements and class inheritance levels. The elements of the spec
    may refer to each other, and although loops should be very rare, having
    to maintain correct ordering of instantiation is painful, so the resolve()
    method should be used to perform parts of init which require access to
    other parts of the spec.

    Attributes:
        yaml        raw spec as loaded from the spec file
        family      back reference to the full family

        name        name of the entity as listed in the spec (optional)
        ident_name  name which can be safely used as identifier in code (optional)
    """
    def __init__(self, family, yaml):
        """Store the raw spec and queue this element for later resolution.

        Args:
            family: owning family; must provide add_unresolved().
            yaml:   raw (dict-like) spec fragment describing this element.
        """
        # These must be set before anything below reads self.yaml.
        self.yaml = yaml
        self.family = family

        # 'name' is optional, so name/ident_name only exist for named elements.
        if 'name' in self.yaml:
            self.name = self.yaml['name']
            self.ident_name = self.name.replace('-', '_')

        self._super_resolved = False
        family.add_unresolved(self)

    def __getitem__(self, key):
        """Return the raw spec value stored under key (KeyError if absent)."""
        return self.yaml[key]

    def __contains__(self, key):
        """Report whether key is present in the raw spec."""
        return key in self.yaml

    def get(self, key, default=None):
        """Return the raw spec value for key, or default when it is missing."""
        return self.yaml.get(key, default)

    def resolve_up(self, up):
        """Run the parent-level resolve() exactly once.

        Subclasses call this as self.resolve_up(super()) at the top of their
        own resolve(), so a resolve() that is retried does not re-run the
        levels above it.
        """
        if not self._super_resolved:
            up.resolve()
            self._super_resolved = True

    def resolve(self):
        """Resolve references to other spec elements; the base has none."""
        pass
class SpecEnumEntry(SpecElement):
    """ Entry within an enum declared in the Netlink spec.

    Attributes:
        doc         documentation string
        enum_set    back reference to the enum
        value       numerical value of this enum (use accessors in most situations!)

    Methods:
        raw_value   raw value, i.e. the id in the enum, unlike user value which is a mask for flags
        user_value  user value, same as raw value for enums, for flags it's the mask
    """
    def __init__(self, enum_set, yaml, prev, value_start):
        """Create an entry and assign its numerical value.

        Args:
            enum_set:    the enum set this entry belongs to.
            yaml:        entry spec; either a dict or a bare name string.
            prev:        the previous entry in the set, or None for the first.
            value_start: value given to the first entry when none is explicit.
        """
        # A bare string is shorthand for an entry that only has a name.
        if isinstance(yaml, str):
            yaml = {'name': yaml}
        super().__init__(enum_set.family, yaml)

        self.doc = yaml.get('doc', '')
        self.enum_set = enum_set

        # Explicit value wins; otherwise continue from the previous entry,
        # and fall back to value_start for the very first entry.
        if 'value' in yaml:
            self.value = yaml['value']
        elif prev is not None:
            self.value = prev.value + 1
        else:
            self.value = value_start

    def has_doc(self):
        """Report whether this entry carries a non-empty doc string."""
        # NOTE(review): method restored by inference; confirm name with callers.
        return bool(self.doc)

    def raw_value(self):
        """Return the id of the entry within the enum."""
        return self.value

    def user_value(self, as_flags=None):
        """Return the user-facing value.

        For sets of type 'flags', or when as_flags is truthy, this is the
        bit mask (1 << value); otherwise it is the raw value.
        """
        if self.enum_set['type'] == 'flags' or as_flags:
            return 1 << self.value
        return self.value
class SpecEnumSet(SpecElement):
    """ Enum type

    Represents an enumeration (list of numerical constants)
    as declared in the "definitions" section of the spec.

    Attributes:
        type            value of the 'type' key of the definition
        entries         entries by name
        entries_by_val  entries by value

    Methods:
        get_mask      for flags compute the mask of all defined values
    """
    def __init__(self, family, yaml):
        """Build all entries of the set, in spec order."""
        super().__init__(family, yaml)

        self.type = yaml['type']

        value_start = self.yaml.get('value-start', 0)
        self.entries = dict()
        self.entries_by_val = dict()

        # Each entry needs its predecessor to auto-number itself.
        prev_entry = None
        for entry in self.yaml['entries']:
            e = self.new_entry(entry, prev_entry, value_start)
            self.entries[e.name] = e
            self.entries_by_val[e.raw_value()] = e
            prev_entry = e

    def new_entry(self, entry, prev_entry, value_start):
        """Factory for entries; subclasses may override to use their own type."""
        return SpecEnumEntry(self, entry, prev_entry, value_start)

    def has_doc(self):
        """Report whether the set or any of its entries is documented."""
        # NOTE(review): method header restored by inference; confirm the name.
        if 'doc' in self.yaml:
            return True
        for entry in self.entries.values():
            if entry.doc:
                return True
        return False

    def get_mask(self, as_flags=None):
        """Return the sum of user_value(as_flags) over all entries."""
        mask = 0
        for e in self.entries.values():
            mask += e.user_value(as_flags)
        return mask
class SpecAttr(SpecElement):
    """ Single Netlink attribute type

    Represents a single attribute type within an attr space.

    Attributes:
        value         numerical ID when serialized
        attr_set      Attribute Set containing this attr
        is_multi      bool, attr may repeat multiple times
        struct_name   string, name of struct definition
        sub_type      string, name of sub type
        byte_order    value of the 'byte-order' key, None when not given
    """
    def __init__(self, family, attr_set, yaml, value):
        """Record the attribute's ID, its owning set and its optional properties."""
        super().__init__(family, yaml)

        # The ID is computed by the owning set and passed in by the caller.
        self.value = value
        self.attr_set = attr_set
        self.is_multi = yaml.get('multi-attr', False)
        self.struct_name = yaml.get('struct')
        self.sub_type = yaml.get('sub-type')
        self.byte_order = yaml.get('byte-order')
class SpecAttrSet(SpecElement):
    """ Netlink Attribute Set class.

    Represents a ID space of attributes within Netlink.

    Note that unlike other elements, which expose contents of the raw spec
    via the dictionary interface Attribute Set exposes attributes by name.

    Attributes:
        attrs         ordered dict of all attributes (indexed by name)
        attrs_by_val  ordered dict of all attributes (indexed by value)
        subset_of     parent set if this is a subset, otherwise None
    """
    def __init__(self, family, yaml):
        """Create the attributes of the set, or borrow them from the parent set.

        Raises:
            KeyError: for a subset whose parent set (or one of the named
                attributes in it) is not registered on the family yet.
        """
        super().__init__(family, yaml)

        self.subset_of = self.yaml.get('subset-of', None)

        self.attrs = collections.OrderedDict()
        self.attrs_by_val = collections.OrderedDict()

        if self.subset_of is None:
            # NOTE(review): initial ID of 1 and the per-attribute increment are
            # restored by inference; an explicit 'value' resets the counter.
            val = 1
            for elem in self.yaml['attributes']:
                if 'value' in elem:
                    val = elem['value']

                attr = self.new_attr(elem, val)
                self.attrs[attr.name] = attr
                self.attrs_by_val[attr.value] = attr
                val += 1
        else:
            # A subset reuses the attribute objects of the set it narrows.
            real_set = family.attr_sets[self.subset_of]
            for elem in self.yaml['attributes']:
                attr = real_set[elem['name']]
                self.attrs[attr.name] = attr
                self.attrs_by_val[attr.value] = attr

    def new_attr(self, elem, value):
        """Factory for attributes; subclasses may override to use their own type."""
        return SpecAttr(self.family, self, elem, value)

    def __getitem__(self, key):
        """Return the attribute named key (not the raw spec entry)."""
        return self.attrs[key]

    def __contains__(self, key):
        """Report whether an attribute named key exists in this set."""
        return key in self.attrs

    def __iter__(self):
        """Iterate over attribute names in definition order."""
        yield from self.attrs

    def items(self):
        """Return (name, attribute) pairs in definition order."""
        return self.attrs.items()
class SpecStructMember(SpecElement):
    """Struct member attribute

    Represents a single struct member attribute.

    Attributes:
        type        string, type of the member attribute
        byte_order  string or None for native byte order
        enum        string, name of the enum definition
    """
    def __init__(self, family, yaml):
        """Read the member's type (required) and its optional properties."""
        super().__init__(family, yaml)
        self.type = yaml['type']
        self.byte_order = yaml.get('byte-order')
        self.enum = yaml.get('enum')
class SpecStruct(SpecElement):
    """Netlink struct type

    Represents a C struct definition.

    Attributes:
        members   ordered list of struct members
    """
    def __init__(self, family, yaml):
        """Create one member object per entry of the optional 'members' list."""
        super().__init__(family, yaml)

        self.members = [
            self.new_member(family, member)
            for member in yaml.get('members', [])
        ]

    def new_member(self, family, elem):
        """Factory for members; subclasses may override to use their own type."""
        return SpecStructMember(family, elem)

    def __iter__(self):
        """Iterate over the members in definition order."""
        yield from self.members

    def items(self):
        """Return (name, member) pairs in definition order.

        members is a list, so the previous self.members.items() always raised
        AttributeError. Pairs are keyed by member name to mirror
        SpecAttrSet.items().
        """
        return [(member.name, member) for member in self.members]
class SpecOperation(SpecElement):
    """Netlink Operation

    Information about a single Netlink operation.

    Attributes:
        value           numerical ID when serialized, None if req/rsp values differ

        req_value       numerical ID when serialized, user -> kernel
        rsp_value       numerical ID when serialized, user <- kernel
        is_call         bool, whether the operation is a call
        is_async        bool, whether the operation is a notification
        is_resv         bool, whether the operation does not exist (it's just a reserved ID)
        attr_set        attribute set name
        fixed_header    string, optional name of fixed header struct

        yaml            raw spec as loaded from the spec file
    """
    def __init__(self, family, yaml, req_value, rsp_value):
        """Record the operation IDs and classify the operation.

        Args:
            family:    owning family; its fixed_header is the default.
            yaml:      raw spec of the operation.
            req_value: ID used user -> kernel (may be None).
            rsp_value: ID used user <- kernel (may be None).
        """
        super().__init__(family, yaml)

        self.value = req_value if req_value == rsp_value else None
        self.req_value = req_value
        self.rsp_value = rsp_value

        self.is_call = 'do' in yaml or 'dump' in yaml
        self.is_async = 'notify' in yaml or 'event' in yaml
        self.is_resv = not self.is_async and not self.is_call
        self.fixed_header = self.yaml.get('fixed-header', family.fixed_header)

        # Added by resolve: declare the attribute, then remove it again so
        # that early access raises AttributeError, which the family's
        # resolution loop treats as "not resolved yet".
        self.attr_set = None
        delattr(self, "attr_set")

    def resolve(self):
        """Look up the attribute set this operation uses.

        Raises:
            KeyError: when the referenced message or attribute set is not
                registered on the family yet (the family retries later).
            Exception: when no attribute set can be determined at all.
        """
        self.resolve_up(super())

        if 'attribute-set' in self.yaml:
            attr_set_name = self.yaml['attribute-set']
        elif 'notify' in self.yaml:
            # A notification shares the attribute set of the op it mirrors.
            msg = self.family.msgs[self.yaml['notify']]
            attr_set_name = msg['attribute-set']
        elif self.is_resv:
            # NOTE(review): branch restored by inference - a reserved ID has
            # no attribute set, so attr_set stays unset for it.
            attr_set_name = ''
        else:
            raise Exception(f"Can't resolve attribute set for op '{self.name}'")

        if attr_set_name:
            self.attr_set = self.family.attr_sets[attr_set_name]
class SpecFamily(SpecElement):
    """ Netlink Family Spec class.

    Netlink family information loaded from a spec (e.g. in YAML).
    Takes care of unfolding implicit information which can be skipped
    in the spec itself for brevity.

    The class can be used like a dictionary to access the raw spec
    elements but that's usually a bad idea.

    Attributes:
        proto         protocol type (e.g. genetlink)
        msg_id_model  enum-model for operations (unified, directional etc.)
        license       spec license (loaded from an SPDX tag on the spec)

        attr_sets     dict of attribute sets
        msgs          dict of all messages (index by name)
        req_by_value  dict of messages indexed by request value
        rsp_by_value  dict of messages indexed by response value
        ops           dict of all valid requests / responses
        consts        dict of all constants/enums
        fixed_header  string, optional name of family default fixed header struct
    """
    def __init__(self, spec_path, schema_path=None):
        """Load, optionally validate, and fully resolve a family spec.

        Args:
            spec_path:   path of the YAML spec file.
            schema_path: path of the JSON schema to validate against. None
                derives '<two dirs above spec>/<proto>.yaml'; any other
                falsy value skips validation.

        Raises:
            Exception: when the spec does not start with an SPDX tag.
            KeyError, AttributeError: the last resolution error, when a
                full pass over the pending elements resolves nothing.
        """
        with open(spec_path, "r") as stream:
            prefix = '# SPDX-License-Identifier: '
            first = stream.readline().strip()
            if not first.startswith(prefix):
                raise Exception('SPDX license tag required in the spec')
            self.license = first[len(prefix):]

            # NOTE(review): rewind restored by inference so the parser sees
            # the whole file, not just what follows the first line.
            stream.seek(0)
            spec = yaml.safe_load(stream)

        # Must exist before super().__init__(), which calls add_unresolved().
        self._resolution_list = []

        super().__init__(self, spec)

        self.proto = self.yaml.get('protocol', 'genetlink')
        self.msg_id_model = self.yaml['operations'].get('enum-model', 'unified')

        if schema_path is None:
            schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
        if schema_path:
            # NOTE(review): guard and 'global' restored by inference; this
            # relies on a module-level 'jsonschema = None' placeholder so the
            # optional dependency is only imported when validation is wanted.
            global jsonschema

            with open(schema_path, "r") as stream:
                schema = yaml.safe_load(stream)

            if jsonschema is None:
                jsonschema = importlib.import_module("jsonschema")

            jsonschema.validate(self.yaml, schema)

        self.attr_sets = collections.OrderedDict()
        self.msgs = collections.OrderedDict()
        self.req_by_value = collections.OrderedDict()
        self.rsp_by_value = collections.OrderedDict()
        self.ops = collections.OrderedDict()
        self.consts = collections.OrderedDict()

        # Iterative resolution: keep retrying elements whose dependencies
        # were not available yet. Elements created while resolving register
        # themselves through add_unresolved() onto the fresh list.
        last_exception = None
        while self._resolution_list:
            resolved = []
            unresolved = self._resolution_list
            self._resolution_list = []

            for elem in unresolved:
                try:
                    elem.resolve()
                except (KeyError, AttributeError) as e:
                    self._resolution_list.append(elem)
                    last_exception = e
                    continue

                resolved.append(elem)

            # A pass without progress means the rest can never resolve.
            if not resolved:
                raise last_exception

    def new_enum(self, elem):
        """Factory for enum sets; subclasses may override to use their own type."""
        return SpecEnumSet(self, elem)

    def new_attr_set(self, elem):
        """Factory for attribute sets; subclasses may override."""
        return SpecAttrSet(self, elem)

    def new_struct(self, elem):
        """Factory for struct definitions; subclasses may override."""
        return SpecStruct(self, elem)

    def new_operation(self, elem, req_val, rsp_val):
        """Factory for operations; subclasses may override."""
        return SpecOperation(self, elem, req_val, rsp_val)

    def add_unresolved(self, elem):
        """Queue elem so that its resolve() is called by the resolution loop."""
        self._resolution_list.append(elem)

    def _dictify_ops_unified(self):
        """Fill msgs for the 'unified' model: one ID shared by both directions."""
        self.fixed_header = self.yaml['operations'].get('fixed-header')
        # NOTE(review): initial ID of 1 and the increment are restored by
        # inference; an explicit 'value' resets the counter.
        val = 1
        for elem in self.yaml['operations']['list']:
            if 'value' in elem:
                val = elem['value']

            op = self.new_operation(elem, val, val)
            val += 1

            self.msgs[op.name] = op

    def _dictify_ops_directional(self):
        """Fill msgs for the 'directional' model: separate request/response IDs.

        Raises:
            Exception: for an operation that is neither a notification/event
                nor a do/dump call.
        """
        self.fixed_header = self.yaml['operations'].get('fixed-header')
        req_val = rsp_val = 1
        for elem in self.yaml['operations']['list']:
            if 'notify' in elem or 'event' in elem:
                # Notifications only consume an ID in the response space.
                if 'value' in elem:
                    rsp_val = elem['value']
                req_val_next = req_val
                rsp_val_next = rsp_val + 1
                req_val = None
            elif 'do' in elem or 'dump' in elem:
                mode = elem['do'] if 'do' in elem else elem['dump']

                # NOTE(review): the tests guarding these assignments were
                # missing; 'is not None' keeps an explicit value of 0.
                v = mode.get('request', {}).get('value', None)
                if v is not None:
                    req_val = v
                v = mode.get('reply', {}).get('value', None)
                if v is not None:
                    rsp_val = v

                rsp_inc = 1 if 'reply' in mode else 0
                req_val_next = req_val + 1
                rsp_val_next = rsp_val + rsp_inc
            else:
                raise Exception("Can't parse directional ops")

            # A direction whose counter does not advance is not used by this
            # operation, so it gets no ID on that side.
            if req_val == req_val_next:
                req_val = None
            if rsp_val == rsp_val_next:
                rsp_val = None
            op = self.new_operation(elem, req_val, rsp_val)
            req_val = req_val_next
            rsp_val = rsp_val_next

            self.msgs[op.name] = op

    def find_operation(self, name):
        """
        For a given operation name, find and return operation spec.

        Returns the raw spec dict of the operation, or None if no operation
        has that name.
        """
        for op in self.yaml['operations']['list']:
            if name == op['name']:
                return op
        return None

    def resolve(self):
        """Instantiate definitions, attribute sets and operations of the family."""
        self.resolve_up(super())

        definitions = self.yaml.get('definitions', [])
        for elem in definitions:
            if elem['type'] == 'enum' or elem['type'] == 'flags':
                self.consts[elem['name']] = self.new_enum(elem)
            elif elem['type'] == 'struct':
                self.consts[elem['name']] = self.new_struct(elem)
            else:
                # Other definition types are kept as raw spec entries.
                self.consts[elem['name']] = elem

        for elem in self.yaml['attribute-sets']:
            attr_set = self.new_attr_set(elem)
            self.attr_sets[elem['name']] = attr_set

        if self.msg_id_model == 'unified':
            self._dictify_ops_unified()
        elif self.msg_id_model == 'directional':
            self._dictify_ops_directional()

        for op in self.msgs.values():
            if op.req_value is not None:
                self.req_by_value[op.req_value] = op
            if op.rsp_value is not None:
                self.rsp_by_value[op.rsp_value] = op
            if not op.is_async and 'attribute-set' in op:
                self.ops[op.name] = op