1 # SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
9 # To be loaded dynamically as needed
14 """Netlink spec element.
16 Abstract element of the Netlink spec. Implements the dictionary interface
17 for access to the raw spec. Supports iterative resolution of dependencies
18 across elements and class inheritance levels. The elements of the spec
19 may refer to each other, and although loops should be very rare, having
20 to maintain correct ordering of instantiation is painful, so the resolve()
21 method should be used to perform parts of init which require access to
22 other parts of the spec.
25 yaml raw spec as loaded from the spec file
26 family back reference to the full family
28 name name of the entity as listed in the spec (optional)
29 ident_name name which can be safely used as identifier in code (optional)
# SpecElement constructor (excerpt: the lines between the signature and the
# 'name' check are not visible here).
31 def __init__(self, family, yaml):
# 'name' is optional in the raw spec; ident_name swaps '-' for '_' so the name
# can be used as an identifier in code.
35 if 'name' in self.yaml:
36 self.name = self.yaml['name']
37 self.ident_name = self.name.replace('-', '_')
# Start out unresolved and queue this element on the family for resolution.
39 self._super_resolved = False
40 family.add_unresolved(self)
42 def __getitem__(self, key):
def __contains__(self, key):
    """Return True when *key* is present in the raw spec mapping."""
    return key in self.yaml
def get(self, key, default=None):
    """Look up *key* in the raw spec, dict.get() style.

    Returns the stored value, or *default* (None unless given) when the
    key is absent.
    """
    return self.yaml.get(key, default)
# Guarded by _super_resolved, which is set to True after the first pass, so the
# guarded work appears to run at most once per element. (The statement between
# the check and the flag update is not visible in this excerpt.)
51 def resolve_up(self, up):
52 if not self._super_resolved:
54 self._super_resolved = True
60 class SpecEnumEntry(SpecElement):
61 """ Entry within an enum declared in the Netlink spec.
64 doc documentation string
65 enum_set back reference to the enum
66 value numerical value of this enum (use accessors in most situations!)
69 raw_value raw value, i.e. the id in the enum, unlike user value which is a mask for flags
70 user_value user value, same as raw value for enums, for flags it's the mask
# Build one enum entry. `yaml` may be a plain string or a mapping (the line
# that handles the string case is not visible in this excerpt).
72 def __init__(self, enum_set, yaml, prev, value_start):
73 if isinstance(yaml, str):
75 super().__init__(enum_set.family, yaml)
# A missing 'doc' key defaults to the empty string.
77 self.doc = yaml.get('doc', '')
78 self.enum_set = enum_set
# Three candidate sources for the value: an explicit 'value' key, the previous
# entry's value plus one, or value_start. The conditions that choose between
# them are not visible here.
81 self.value = yaml['value']
83 self.value = prev.value + 1
85 self.value = value_start
# For a 'flags' enum set, or when the caller passes a truthy as_flags, the
# user-facing value is the bit mask 1 << value. The fall-through return is not
# visible in this excerpt.
93 def user_value(self, as_flags=None):
94 if self.enum_set['type'] == 'flags' or as_flags:
95 return 1 << self.value
100 class SpecEnumSet(SpecElement):
103 Represents an enumeration (list of numerical constants)
104 as declared in the "definitions" section of the spec.
108 entries entries by name
109 entries_by_val entries by value
111 get_mask for flags compute the mask of all defined values
# Build the enum set and index its entries both by name and by raw value.
113 def __init__(self, family, yaml):
114 super().__init__(family, yaml)
116 self.type = yaml['type']
# 'value-start' defaults to 0 and is handed to every entry that gets created.
119 value_start = self.yaml.get('value-start', 0)
120 self.entries = dict()
121 self.entries_by_val = dict()
# NOTE(review): prev_entry is not assigned in the visible lines - presumably it
# is initialised before the loop and updated on each iteration; confirm.
122 for entry in self.yaml['entries']:
123 e = self.new_entry(entry, prev_entry, value_start)
124 self.entries[e.name] = e
125 self.entries_by_val[e.raw_value()] = e
def new_entry(self, entry, prev_entry, value_start):
    """Create the entry object for one item of this enum set.

    Thin factory around SpecEnumEntry: the arguments are forwarded
    unchanged, with this set passed as the owning enum set.
    """
    return SpecEnumEntry(self, entry, prev_entry, value_start)
132 if 'doc' in self.yaml:
134 for entry in self.entries.values():
# Accumulate user_value(as_flags) over every entry of the set. The
# initialisation of `mask` and the return statement are not visible in this
# excerpt.
139 def get_mask(self, as_flags=None):
141 for e in self.entries.values():
142 mask += e.user_value(as_flags)
146 class SpecAttr(SpecElement):
147 """ Single Netlink attribute type
149 Represents a single attribute type within an attr space.
152 value numerical ID when serialized
153 attr_set Attribute Set containing this attr
154 is_multi bool, attr may repeat multiple times
155 struct_name string, name of struct definition
156 sub_type string, name of sub type
# Cache frequently used properties of one attribute from its raw spec. (Two
# lines after the super() call are not visible in this excerpt.)
158 def __init__(self, family, attr_set, yaml, value):
159 super().__init__(family, yaml)
162 self.attr_set = attr_set
# All of the keys below are optional: 'multi-attr' defaults to False, the
# others to None.
163 self.is_multi = yaml.get('multi-attr', False)
164 self.struct_name = yaml.get('struct')
165 self.sub_type = yaml.get('sub-type')
166 self.byte_order = yaml.get('byte-order')
169 class SpecAttrSet(SpecElement):
170 """ Netlink Attribute Set class.
172 Represents an ID space of attributes within Netlink.
174 Note that unlike other elements, which expose contents of the raw spec
175 via the dictionary interface Attribute Set exposes attributes by name.
178 attrs ordered dict of all attributes (indexed by name)
179 attrs_by_val ordered dict of all attributes (indexed by value)
180 subset_of parent set if this is a subset, otherwise None
# Populate attrs (by name) and attrs_by_val (by numerical value) for this set.
182 def __init__(self, family, yaml):
183 super().__init__(family, yaml)
# None unless the spec declares this set as a subset of another set.
185 self.subset_of = self.yaml.get('subset-of', None)
187 self.attrs = collections.OrderedDict()
188 self.attrs_by_val = collections.OrderedDict()
# Stand-alone set: create a new attribute object for every listed attribute.
# (How `val` is computed is not visible in this excerpt.)
190 if self.subset_of is None:
192 for elem in self.yaml['attributes']:
196 attr = self.new_attr(elem, val)
197 self.attrs[attr.name] = attr
198 self.attrs_by_val[attr.value] = attr
# Subset (presumably the else branch - the `else:` line is not visible): reuse
# the attribute objects of the parent set, looked up by name, rather than
# creating new ones.
201 real_set = family.attr_sets[self.subset_of]
202 for elem in self.yaml['attributes']:
203 attr = real_set[elem['name']]
204 self.attrs[attr.name] = attr
205 self.attrs_by_val[attr.value] = attr
def new_attr(self, elem, value):
    """Create the SpecAttr for raw attribute spec *elem* with numerical *value*.

    The new attribute is bound to this set and to this set's family.
    """
    return SpecAttr(self.family, self, elem, value)
def __getitem__(self, key):
    """Return the attribute named *key*.

    Unlike other spec elements, indexing looks into the attribute table
    (self.attrs), not the raw spec. Raises KeyError for an unknown name.
    """
    return self.attrs[key]
def __contains__(self, key):
    """Return True when an attribute named *key* exists in this set."""
    return key in self.attrs
217 yield from self.attrs
220 return self.attrs.items()
class SpecStructMember(SpecElement):
    """Struct member attribute

    Represents a single struct member attribute.

    Attributes:
        type        string, type of the member attribute
        byte_order  string or None for native byte order
        enum        string, name of the enum definition (None when the
                    member has no 'enum' key)
    """
    def __init__(self, family, yaml):
        super().__init__(family, yaml)
        # 'type' is mandatory (KeyError when absent); the other keys are
        # optional and default to None.
        self.type = yaml['type']
        self.byte_order = yaml.get('byte-order')
        self.enum = yaml.get('enum')
240 class SpecStruct(SpecElement):
241 """Netlink struct type
243 Represents a C struct definition.
246 members ordered list of struct members
# Wrap every entry of the optional 'members' list (default: empty) via
# new_member() and collect the results in self.members. The initialisation of
# self.members is not visible in this excerpt.
248 def __init__(self, family, yaml):
249 super().__init__(family, yaml)
252 for member in yaml.get('members', []):
253 self.members.append(self.new_member(family, member))
def new_member(self, family, elem):
    """Create the SpecStructMember for raw member spec *elem*."""
    return SpecStructMember(family, elem)
259 yield from self.members
262 return self.members.items()
265 class SpecOperation(SpecElement):
268 Information about a single Netlink operation.
271 value numerical ID when serialized, None if req/rsp values differ
273 req_value numerical ID when serialized, user -> kernel
274 rsp_value numerical ID when serialized, user <- kernel
275 is_call bool, whether the operation is a call
276 is_async bool, whether the operation is a notification
277 is_resv bool, whether the operation does not exist (it's just a reserved ID)
278 attr_set attribute set name
279 fixed_header string, optional name of fixed header struct
281 yaml raw spec as loaded from the spec file
# Record the message IDs and classify the operation from its raw spec.
283 def __init__(self, family, yaml, req_value, rsp_value):
284 super().__init__(family, yaml)
# `value` is only set when both directions share one ID; otherwise it is None.
286 self.value = req_value if req_value == rsp_value else None
287 self.req_value = req_value
288 self.rsp_value = rsp_value
# Classification is by key presence; an op that is neither a call nor async is
# treated as a reserved ID.
290 self.is_call = 'do' in yaml or 'dump' in yaml
291 self.is_async = 'notify' in yaml or 'event' in yaml
292 self.is_resv = not self.is_async and not self.is_call
# A per-operation 'fixed-header' overrides the family-wide default.
293 self.fixed_header = self.yaml.get('fixed-header', family.fixed_header)
# Removes the attr_set attribute from the instance. The lines just before this
# one are not visible; presumably attr_set was assigned there - confirm.
297 delattr(self, "attr_set")
300 self.resolve_up(super())
302 if 'attribute-set' in self.yaml:
303 attr_set_name = self.yaml['attribute-set']
304 elif 'notify' in self.yaml:
305 msg = self.family.msgs[self.yaml['notify']]
306 attr_set_name = msg['attribute-set']
310 raise Exception(f"Can't resolve attribute set for op '{self.name}'")
312 self.attr_set = self.family.attr_sets[attr_set_name]
315 class SpecFamily(SpecElement):
316 """ Netlink Family Spec class.
318 Netlink family information loaded from a spec (e.g. in YAML).
319 Takes care of unfolding implicit information which can be skipped
320 in the spec itself for brevity.
322 The class can be used like a dictionary to access the raw spec
323 elements but that's usually a bad idea.
326 proto protocol type (e.g. genetlink)
327 msg_id_model enum-model for operations (unified, directional etc.)
328 license spec license (loaded from an SPDX tag on the spec)
330 attr_sets dict of attribute sets
331 msgs dict of all messages (index by name)
332 ops dict of all valid requests / responses
333 ntfs dict of all async events
334 consts dict of all constants/enums
335 fixed_header string, optional name of family default fixed header struct
# Load a family spec from spec_path, validate it against a schema and run the
# iterative resolution of all spec elements. Several lines of this method are
# not visible in this excerpt.
337 def __init__(self, spec_path, schema_path=None):
338 with open(spec_path, "r") as stream:
# The first line of the spec file must carry the SPDX tag; whatever follows the
# tag prefix on that line becomes self.license.
339 prefix = '# SPDX-License-Identifier: '
340 first = stream.readline().strip()
341 if not first.startswith(prefix):
342 raise Exception('SPDX license tag required in the spec')
343 self.license = first[len(prefix):]
# readline() above already consumed the first line; the rest is parsed as YAML.
346 spec = yaml.safe_load(stream)
# Must exist before super().__init__(): the base constructor calls
# family.add_unresolved(self), and the family passes itself as its own family.
348 self._resolution_list = []
350 super().__init__(self, spec)
352 self.proto = self.yaml.get('protocol', 'genetlink')
353 self.msg_id_model = self.yaml['operations'].get('enum-model', 'unified')
# Default schema location: "<proto>.yaml" in the grandparent directory of the
# spec file.
355 if schema_path is None:
356 schema_path = os.path.dirname(os.path.dirname(spec_path)) + f'/{self.proto}.yaml'
360 with open(schema_path, "r") as stream:
361 schema = yaml.safe_load(stream)
# jsonschema is imported lazily, on first use.
363 if jsonschema is None:
364 jsonschema = importlib.import_module("jsonschema")
366 jsonschema.validate(self.yaml, schema)
368 self.attr_sets = collections.OrderedDict()
369 self.msgs = collections.OrderedDict()
370 self.req_by_value = collections.OrderedDict()
371 self.rsp_by_value = collections.OrderedDict()
372 self.ops = collections.OrderedDict()
373 self.ntfs = collections.OrderedDict()
374 self.consts = collections.OrderedDict()
# Resolution loop: each pass takes the current queue and starts a fresh one.
# Elements whose resolution raises KeyError/AttributeError are re-queued for a
# later pass; the others are recorded in `resolved`. What happens when a pass
# resolves nothing is not visible in this excerpt.
376 last_exception = None
377 while len(self._resolution_list) > 0:
379 unresolved = self._resolution_list
380 self._resolution_list = []
382 for elem in unresolved:
385 except (KeyError, AttributeError) as e:
386 self._resolution_list.append(elem)
390 resolved.append(elem)
392 if len(resolved) == 0:
def new_enum(self, elem):
    """Create the SpecEnumSet for raw definition *elem* in this family."""
    return SpecEnumSet(self, elem)
def new_attr_set(self, elem):
    """Create the SpecAttrSet for raw attribute-set spec *elem* in this family."""
    return SpecAttrSet(self, elem)
def new_struct(self, elem):
    """Create the SpecStruct for raw struct definition *elem* in this family."""
    return SpecStruct(self, elem)
def new_operation(self, elem, req_val, rsp_val):
    """Create the SpecOperation for raw operation spec *elem*.

    *req_val* and *rsp_val* are the request and response message IDs and
    are forwarded unchanged.
    """
    return SpecOperation(self, elem, req_val, rsp_val)
def add_unresolved(self, elem):
    """Queue *elem* on the family's list of elements awaiting resolution."""
    self._resolution_list.append(elem)
# Build self.msgs for the 'unified' ID model: the same value is passed as both
# request and response ID of each operation. How `val` is computed is not
# visible in this excerpt.
410 def _dictify_ops_unified(self):
# Family-wide default fixed header; None when the spec does not set one.
411 self.fixed_header = self.yaml['operations'].get('fixed-header')
413 for elem in self.yaml['operations']['list']:
417 op = self.new_operation(elem, val, val)
420 self.msgs[op.name] = op
# Build self.msgs for the 'directional' ID model: request and response IDs are
# tracked as two separate counters, both starting at 1. Several lines of this
# method are not visible in this excerpt.
422 def _dictify_ops_directional(self):
# Family-wide default fixed header; None when the spec does not set one.
423 self.fixed_header = self.yaml['operations'].get('fixed-header')
424 req_val = rsp_val = 1
425 for elem in self.yaml['operations']['list']:
# First branch (its condition is not visible): the response ID comes from
# elem['value'] and only the response counter advances.
428 rsp_val = elem['value']
429 req_val_next = req_val
430 rsp_val_next = rsp_val + 1
# Call-type operation: 'do' takes precedence over 'dump' as the source of
# explicit values.
432 elif 'do' in elem or 'dump' in elem:
433 mode = elem['do'] if 'do' in elem else elem['dump']
# Optional explicit IDs under 'request' / 'reply' (how `v` is applied is not
# visible here).
435 v = mode.get('request', {}).get('value', None)
438 v = mode.get('reply', {}).get('value', None)
# The request counter always advances; the response counter only advances when
# the mode has a 'reply'.
442 rsp_inc = 1 if 'reply' in mode else 0
443 req_val_next = req_val + 1
444 rsp_val_next = rsp_val + rsp_inc
446 raise Exception("Can't parse directional ops")
# Special handling for a direction whose counter did not advance (the bodies
# of these two checks are not visible).
448 if req_val == req_val_next:
450 if rsp_val == rsp_val_next:
452 op = self.new_operation(elem, req_val, rsp_val)
453 req_val = req_val_next
454 rsp_val = rsp_val_next
456 self.msgs[op.name] = op
# Linear scan over the raw 'operations' list of the spec, comparing each
# entry's 'name' with the requested one. The return statements are not visible
# in this excerpt.
458 def find_operation(self, name):
460 For a given operation name, find and return operation spec.
462 for op in self.yaml['operations']['list']:
463 if name == op['name']:
468 self.resolve_up(super())
470 definitions = self.yaml.get('definitions', [])
471 for elem in definitions:
472 if elem['type'] == 'enum' or elem['type'] == 'flags':
473 self.consts[elem['name']] = self.new_enum(elem)
474 elif elem['type'] == 'struct':
475 self.consts[elem['name']] = self.new_struct(elem)
477 self.consts[elem['name']] = elem
479 for elem in self.yaml['attribute-sets']:
480 attr_set = self.new_attr_set(elem)
481 self.attr_sets[elem['name']] = attr_set
483 if self.msg_id_model == 'unified':
484 self._dictify_ops_unified()
485 elif self.msg_id_model == 'directional':
486 self._dictify_ops_directional()
488 for op in self.msgs.values():
489 if op.req_value is not None:
490 self.req_by_value[op.req_value] = op
491 if op.rsp_value is not None:
492 self.rsp_by_value[op.rsp_value] = op
493 if not op.is_async and 'attribute-set' in op:
494 self.ops[op.name] = op
496 self.ntfs[op.name] = op