Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
ClausHolbechArista committed Jul 3, 2024
1 parent ffe6195 commit f2e2080
Show file tree
Hide file tree
Showing 23 changed files with 5,618 additions and 2,682 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ repos:
language: python
files: python-avd/pyavd/[a-z_]+/schema
pass_filenames: false
additional_dependencies: ['deepmerge>=1.1.0', 'PyYAML>=6.0.0', 'pydantic>=2.3.0', 'jsonschema>=4.10.3', 'referencing>=0.35.0']
additional_dependencies: ['deepmerge>=1.1.0', 'PyYAML>=6.0.0', 'pydantic>=2.3.0', 'jsonschema>=4.10.3', 'referencing>=0.35.0', 'isort']

- id: templates
name: Precompile eos_cli_config_gen Jinja2
Expand Down
8 changes: 7 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
[tool.black]
line-length = 160

force-exclude = '''python-avd/pyavd/_cv/api/.*'''
force-exclude = '''
/(
python-avd/pyavd/_cv/api/\.\*
| python-avd/pyavd/_eos_designs/_schema/eos_designs\.py
| python-avd/pyavd/_eos_cli_config_gen/_schema/eos_cli_config_gen\.py
)/
'''

[tool.isort]

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -6697,7 +6697,7 @@
"algorithm": {
"type": "string",
"default": "first_id",
"description": "This variable defines the Multi-chassis Link Aggregation (MLAG) algorithm used.\nEach MLAG link will have a /31* subnet with each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe offset is calculated using one of the following algorithms:\n - first_id: `(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the first node defined under the node_group.\n This allocation method will skip every other /31* subnet making it less space efficient than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to have a node with an odd ID and a node with an even ID.\n - same_subnet: the offset will always be zero.\n This allocation method will cause every MLAG link to be addressed with the same /31* subnet.\n\\* - The prefix length is configurable with a default of /31.",
"description": "This variable defines the Multi-chassis Link Aggregation (MLAG) algorithm used.\nEach MLAG link will have a /31\u00b9 subnet with each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe offset is calculated using one of the following algorithms:\n - first_id: `(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the first node defined under the node_group.\n This allocation method will skip every other /31\u00b9 subnet making it less space efficient than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to have a node with an odd ID and a node with an even ID.\n - same_subnet: the offset will always be zero.\n This allocation method will cause every MLAG link to be addressed with the same /31\u00b9 subnet.\n\u00b9 The prefix length is configurable with a default of /31.",
"enum": [
"first_id",
"odd_id",
Expand Down

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions python-avd/pyavd/_eos_designs/schema/eos_designs.schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1363,17 +1363,17 @@ keys:
type: str
default: first_id
description: "This variable defines the Multi-chassis Link Aggregation
(MLAG) algorithm used.\nEach MLAG link will have a /31* subnet with
(MLAG) algorithm used.\nEach MLAG link will have a /31\xB9 subnet with
each subnet allocated from the relevant MLAG pool via a calculated offset.\nThe
offset is calculated using one of the following algorithms:\n - first_id:
`(mlag_primary_id - 1) * 2` where `mlag_primary_id` is the ID of the
first node defined under the node_group.\n This allocation method
will skip every other /31* subnet making it less space efficient than
`odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group to
have a node with an odd ID and a node with an even ID.\n - same_subnet:
will skip every other /31\xB9 subnet making it less space efficient
than `odd_id`.\n - odd_id: `(odd_id - 1) / 2`. Requires the node_group
to have a node with an odd ID and a node with an even ID.\n - same_subnet:
the offset will always be zero.\n This allocation method will cause
every MLAG link to be addressed with the same /31* subnet.\n\\* - The
prefix length is configurable with a default of /31."
every MLAG link to be addressed with the same /31\xB9 subnet.\n\xB9
The prefix length is configurable with a default of /31."
valid_values:
- first_id
- odd_id
Expand Down
8 changes: 0 additions & 8 deletions python-avd/pyavd/_schema/__init__.py

This file was deleted.

181 changes: 129 additions & 52 deletions python-avd/pyavd/_schema/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,78 +3,156 @@
# that can be found in the LICENSE file.
from __future__ import annotations

from typing import Any, TYPE_CHECKING, get_type_hints, get_origin, get_args, Annotated
from collections import ChainMap
from types import NoneType, UnionType
from typing import TYPE_CHECKING, Annotated, Any, Generator, Optional, Union, get_args, get_origin, get_type_hints
from warnings import warn

from .._utils import get_all
from .types import InvalidKey
from .validator import validator

if TYPE_CHECKING:
from .eos_designs import EosDesigns

from typing import TypeVar

from .._eos_designs.schema.eos_designs import EosDesigns

T = TypeVar("T")


class AvdBase:

_allow_other_keys: bool = False

def __init__(self, *args, **kwargs):
pass

def __repr__(self) -> str:
# cls_name = super().__repr__().split(maxsplit=1)[0]
cls_name = self.__class__.__name__
attrs = [f"{key}={repr(getattr(self, key, None))}" for key in self._get_type_hints() if not key.startswith("_") and key[0].islower()]
attrs = [f"{key}={repr(getattr(self, key, None))}" for key in self.keys()]
return f"<{cls_name}({', '.join(attrs)})>"

def validate(self, data: Any) -> bool:
pass

def _set_with_coercion(self, attr: str, value: Any):
type_hints = self._get_type_hints()[attr]
if isinstance(value, type_hints):
return setattr(self, attr, value)
base_class = self._get_attr_base_class(attr)
print(base_class)
if issubclass(base_class, AvdBase) and isinstance(value, dict):
return setattr(self, attr, base_class(**value))
if base_class is int and isinstance(value, (str, bool)):
value = int(value)
return setattr(self, attr, value)
if base_class is str and isinstance(value, (int, bool, float)):
value = str(value)
return setattr(self, attr, value)
raise TypeError(f"Invalid type '{type(value)}. Expected '{base_class}'")

@classmethod
def _get_type_hints(cls) -> type:
return get_type_hints(cls, include_extras=True)
def keys(cls) -> Generator[str, None, None]:
return (key for key in cls._get_type_hints() if not key.startswith("_") and key[0].islower())

def items(self) -> Generator[tuple[str, Any], None, None]:
return ((key, getattr(self, key, None)) for key in self.keys())

@classmethod
def _get_attr_base_class(cls, attr: str) -> type:
type_hints = cls._get_type_hints()[attr]
type_hints = cls._strip_union_none_from_type_hints(type_hints)
origin = get_origin(type_hints)
if origin is Annotated:
return get_args(type_hints)[0]

return type_hints

@staticmethod
def _strip_union_none_from_type_hints(type_hints: type) -> type:
origin = get_origin(type_hints)
if origin is UnionType:
args = tuple(arg for arg in get_args(type_hints) if arg is not NoneType)
if len(args) > 1:
raise TypeError("Unable to remove union since this field has more non-NonType than one")
return args[0]
return type_hints
def validate(cls, data: dict, path: list | None = None) -> bool: # TODO: Rename all methods to something that cannot collide with schema keys
if warnings := validator(data, cls, path or []):
for warning in warnings:
warn(warning)
return False
return True

# if not isinstance(data, (dict, ChainMap)):
# raise TypeError("Invalid data type '{type(data)}'. Expected dict.")

# valid = True
# if path is None:
# path = []

# base_type_hints = cls._get_type_hints(include_extras=False)
# for key, value in data.items():
# if not isinstance(key, str) or key.startswith("_") or key not in base_type_hints:
# if not cls._allow_other_keys:
# warn(InvalidKey(str([*path, key]), path=[*path, key]))
# valid = False
# continue

# type_hints = base_type_hints[key]
# base_class = cls._get_attr_base_class(key)

# if value is None and cls._is_optional(type_hints):
# # No need to try further validation if the value is None.
# continue

# try:
# if issubclass(base_class, AvdBase):
# avd_class: AvdBase = cls._strip_union_none_from_type_hints(type_hints)
# if not isinstance(value, (avd_class, dict, ChainMap)):
# warn(f"{key}: Invalid type '{type(value)}'. Expected '{avd_class}' or dict or ChainMap. Optional: {cls._is_optional(type_hints)}")
# valid = False
# # No need to try further validation if the type is wrong.
# continue
# # Perform validation of the value on the base_class
# valid = avd_class.validate(value, path=[*path, key]) and valid

# elif not (isinstance(value, base_class) or (base_class in ACCEPTED_COERCION_MAP and isinstance(value, ACCEPTED_COERCION_MAP[base_class]))):
# warn(f"{key}: Invalid type '{type(value)}'. Expected '{base_class}'")
# valid = False
# # No need to try further validation if the type is wrong.
# continue
# except TypeError as e:
# raise TypeError(f"type_hints: {type_hints}, base_class: {base_class}", e)

# return valid

# @staticmethod
# def _is_optional(type_hints: type) -> bool:
# origin = get_origin(type_hints)
# if origin is Optional:
# return True

# if origin is UnionType or origin is Union:
# return NoneType in get_args(type_hints)

# return False

# def _set_with_coercion(self, attr: str, value: Any):
# type_hints = self._get_type_hints(include_extras=False)[attr]
# if isinstance(value, type_hints):
# return setattr(self, attr, value)

# base_class = self._get_attr_base_class(attr)
# print(base_class)

# if issubclass(base_class, AvdBase) and isinstance(value, dict):
# avd_class: AvdBase = self._strip_union_none_from_type_hints(type_hints)
# return setattr(self, attr, avd_class(**value))

# if base_class in ACCEPTED_COERCION_MAP and isinstance(value, ACCEPTED_COERCION_MAP[base_class]):
# value = base_class(value)
# return setattr(self, attr, value)

# raise TypeError(f"Invalid type '{type(value)}. Expected '{base_class}'")

@classmethod
def _get_attr_annotations(cls, attr: str) -> tuple[type]:
type_hints = cls._get_type_hints()[attr]
type_hints = cls._strip_union_none_from_type_hints(type_hints)
origin = get_origin(type_hints)
if origin is Annotated:
return get_args(type_hints)[1:]
def _get_type_hints(cls, include_extras: bool = True) -> dict[str, type]:
return get_type_hints(cls, include_extras=include_extras)

# @classmethod
# def _get_attr_base_class(cls, attr: str) -> type:
# org_type_hints = cls._get_type_hints(include_extras=False)[attr]
# type_hints = cls._strip_union_none_from_type_hints(org_type_hints)
# origin = get_origin(type_hints)
# if origin is None:
# return type_hints
# return origin

# @staticmethod
# def _strip_union_none_from_type_hints(type_hints: type) -> type:
# while True:
# origin = get_origin(type_hints)
# if origin is UnionType or origin is Union:
# args = tuple(arg for arg in get_args(type_hints) if arg is not NoneType)
# if len(args) > 1:
# raise TypeError("Unable to remove union since this field has more non-NonType than one")
# type_hints = args[0]
# continue

# if origin is Optional:
# args = get_args(type_hints)
# if len(args) > 1:
# raise TypeError("Unable to remove Optional since this field has types inside than one")
# type_hints = args[0]
# continue

# break
# return type_hints


class AvdDictBaseModel(AvdBase):
Expand All @@ -86,7 +164,6 @@ class AvdDictBaseModel(AvdBase):
"""

_custom_data: dict[str, Any] | None = None
_allow_other_keys: bool = False

def __init__(self, **kwargs):
"""
Expand All @@ -95,7 +172,7 @@ def __init__(self, **kwargs):
self._custom_data = {key: kwargs.pop(key) for key in kwargs if key.startswith("_")}
if kwargs and not self._allow_other_keys:
for kwarg in kwargs:
warn(kwarg, InvalidKey)
warn(InvalidKey(kwarg, [kwarg]))


class AvdEosDesignsRootDictBase(AvdBase):
Expand Down Expand Up @@ -172,7 +249,7 @@ def _extract_dynamic_keys(self: EosDesigns, kwargs: dict[str, Any]):
model_key: str = dynamic_key_map["model_key"]
model_key_list = []

dynamic_keys = get_all(self, dynamic_keys_path, required=True)
dynamic_keys = get_all(self, dynamic_keys_path)
for dynamic_key in dynamic_keys:
# dynamic_key is one key like "l3leaf".
if kwargs.get(dynamic_key) is None:
Expand Down
Loading

0 comments on commit f2e2080

Please sign in to comment.