# Source code for debusine.db.models.scopes

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db scopes."""

import re
from collections.abc import Collection
from typing import (
    Any,
    Generic,
    TYPE_CHECKING,
    TypeAlias,
    TypeVar,
    Union,
    assert_never,
    cast,
)

from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import QuerySet, UniqueConstraint

from debusine.db.context import context
from debusine.db.models import permissions
from debusine.db.models.permissions import (
    PartialCheckResult,
    PermissionUser,
    ROLES,
    permission_check,
    permission_filter,
)

if TYPE_CHECKING:
    from django.contrib.auth.models import AnonymousUser
    from django_stubs_ext.db.models import TypedModelMeta

    from debusine.db.models.auth import User
else:
    TypedModelMeta = object

#: Name of the fallback scope used for transitioning to scoped models
FALLBACK_SCOPE_NAME = "debusine"

#: Scope names reserved for use in toplevel URL path components
RESERVED_SCOPE_NAMES = frozenset(
    (
        "accounts",
        "admin",
        "api",
        "api-auth",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspace",
    )
)

#: Regexp matching the structure of scope names
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_scope_name(value: str) -> bool:
    """
    Check if value is a valid scope name.

    A valid name is not listed in ``RESERVED_SCOPE_NAMES`` and consists,
    in its entirety, of an ASCII letter followed by any number of ASCII
    letters, digits, ``+``, ``.``, ``_`` or ``-``.

    :param value: candidate scope name
    :returns: True if the name is acceptable, False otherwise
    """
    if value in RESERVED_SCOPE_NAMES:
        return False
    # Use fullmatch rather than match: "$" in the pattern also matches just
    # before a trailing newline, so match() would accept "name\n".
    return scope_name_regex.fullmatch(value) is not None


def validate_scope_name(value: str) -> None:
    """
    Reject values that are not acceptable scope names.

    The decision is delegated to :py:func:`is_valid_scope_name`.

    :param value: candidate scope name
    :raises ValidationError: if the name is not valid
    """
    if is_valid_scope_name(value):
        return
    raise ValidationError(
        "%(value)r is not a valid scope name", params={"value": value}
    )


A = TypeVar("A")


class ScopeQuerySet(QuerySet["Scope", A], Generic[A]):
    """QuerySet for Scope, extended with permission-based filters."""

    @permission_filter
    def can_display(
        self, user: PermissionUser  # noqa: U100
    ) -> "ScopeQuerySet[A]":
        """Restrict to Scopes the user may display: no Scope is excluded."""
        assert user is not None  # Enforced by decorator
        return self

    @permission_filter
    def can_create_workspace(self, user: PermissionUser) -> "ScopeQuerySet[A]":
        """Restrict to Scopes in which the user may create workspaces."""
        assert user is not None  # Enforced by decorator
        if user.is_authenticated:
            # Only owners of a scope may create workspaces in it
            return self.filter(ROLES(user, Scope.Roles.OWNER))
        # Unauthenticated users cannot create workspaces in any scope
        return self.none()


class ScopeManager(models.Manager["Scope"]):
    """Manager for Scope, producing ScopeQuerySet instances."""

    def get_roles_model(self) -> type["ScopeRole"]:
        """Return the model class holding role assignments for scopes."""
        return ScopeRole

    def get_queryset(self) -> ScopeQuerySet[Any]:
        """Build a ScopeQuerySet for this manager's model and database."""
        model, alias = self.model, self._db
        return ScopeQuerySet(model, using=alias)


class ScopeRoles(permissions.Roles):
    """
    Available roles for a Scope.

    Exposed as ``Scope.Roles`` and ``ScopeRole.Roles``; its ``choices``
    populate the ``ScopeRole.role`` field.
    """

    # Role required to create workspaces in a scope (see
    # Scope.can_create_workspace).  Presumably a (value, label) pair --
    # confirm against permissions.Roles.
    OWNER = "owner", "Owner"


[docs] class Scope(models.Model): """ Scope model. This is used to create different distinct sets of groups and workspaces """ Roles: TypeAlias = ScopeRoles objects = ScopeManager.from_queryset(ScopeQuerySet)() name = models.CharField( max_length=255, unique=True, validators=[validate_scope_name], help_text="internal name for the scope", ) label = models.CharField( max_length=255, unique=True, help_text="User-visible name for the scope", ) icon = models.CharField( max_length=255, default="", blank=True, help_text=( "Optional user-visible icon," " resolved via ``{% static %}`` in templates" ), ) def __str__(self) -> str: """Return basic information of Scope.""" return self.name
[docs] @permission_check("{user} cannot display scope {resource}") def can_display(self, user: PermissionUser) -> bool: # noqa: U100 """Check if the scope can be displayed.""" assert user is not None # enforced by decorator return True
[docs] @permission_check("{user} cannot create workspaces in {resource}") def can_create_workspace(self, user: PermissionUser) -> bool: """Check if the user can create workspaces in this scope.""" assert user is not None # enforced by decorator # Token is not taken into account here if not user.is_authenticated: return False # Shortcut to avoid hitting the database for common cases match self.context_has_role(user, Scope.Roles.OWNER): case PartialCheckResult.ALLOW: return True case PartialCheckResult.DENY: return False case PartialCheckResult.PASS: pass case _ as unreachable: assert_never(unreachable) return ( Scope.objects.can_create_workspace(user).filter(pk=self.pk).exists() )
[docs] def context_has_role( self, user: "User", roles: ScopeRoles | Collection[ScopeRoles] ) -> PartialCheckResult: """ Check user roles in the current context. :returns: * ALLOW if the context has enough information to determine that the user has at least one of the given roles * DENY if the context has enough information to determine that the user does not have any of the given roles * PASS if the context does not have enough information to decide """ if not roles: raise ValueError("context_has_role needs at least one role") if context.user != user or context.scope != self: return PartialCheckResult.PASS if isinstance(roles, ScopeRoles): roles = (roles,) for role in roles: if role in context.scope_roles: return PartialCheckResult.ALLOW return PartialCheckResult.DENY
# See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
[docs] def get_roles( self, user: Union["User", "AnonymousUser"] ) -> QuerySet["ScopeRole", "ScopeRoles"]: """Get the roles of the user on this scope.""" if not user.is_authenticated: result = ScopeRole.objects.none().values_list("role", flat=True) else: result = ( ScopeRole.objects.filter(resource=self, group__users=user) .values_list("role", flat=True) .distinct() ) # QuerySet sees a CharField, but we know it's a ScopeRoles enum return cast(QuerySet["ScopeRole", "ScopeRoles"], result)
class ScopeRole(models.Model):
    """Assignment of a scope role to a group."""

    Roles: TypeAlias = ScopeRoles

    # Scope the role applies to; assignments are deleted with the scope
    resource = models.ForeignKey(
        Scope,
        on_delete=models.CASCADE,
        related_name="roles",
    )

    # Group receiving the role; assignments are deleted with the group
    group = models.ForeignKey(
        "Group",
        on_delete=models.CASCADE,
        related_name="scope_roles",
    )

    role = models.CharField(max_length=16, choices=Roles.choices)

    class Meta(TypedModelMeta):
        # A given role can be assigned to a group on a scope only once
        constraints = [
            UniqueConstraint(
                fields=["resource", "group", "role"],
                name="%(app_label)s_%(class)s_unique_resource_group_role",
            ),
        ]

    def __str__(self) -> str:
        """Return the group, role and resource of this assignment."""
        # NOTE(review): the three parts are joined without any separator;
        # confirm this is the intended rendering.
        group, role, resource = self.group, self.role, self.resource
        return f"{group}{role}{resource}"