# Source code for scim2_tester.checkers.resource_query_attributes

from typing import Any

from scim2_models import ListResponse
from scim2_models import Mutability
from scim2_models import Required
from scim2_models import Resource
from scim2_models import ResponseParameters
from scim2_models import Returned
from scim2_models import SearchRequest

from ..utils import CheckContext
from ..utils import CheckResult
from ..utils import Status
from ..utils import check_result
from ..utils import checker


def _pick_attribute_names(
    model: type[Resource[Any]],
) -> tuple[str | None, str | None]:
    """Select up to two attributes usable for exercising attribute filtering.

    A field qualifies when it is returned by default, is not read-only, is not
    required, and is none of ``schemas``, ``meta`` or ``id``. Qualifying fields
    are considered in the iteration order of ``model.model_fields``.

    :param model: resource model whose fields are inspected.
    :return: a tuple ``(included, excluded)`` of serialization aliases (the
        field name is used when a field has no alias). A slot is :data:`None`
        when the model does not have enough qualifying fields.
    """
    reserved = ("schemas", "meta", "id")

    def qualifies(name: str) -> bool:
        # All three annotations are looked up before any comparison is made.
        returned = model.get_field_annotation(name, Returned)
        mutability = model.get_field_annotation(name, Mutability)
        required = model.get_field_annotation(name, Required)
        return (
            returned == Returned.default
            and mutability != Mutability.read_only
            and required != Required.true
            and name not in reserved
        )

    aliases = [
        info.serialization_alias or name
        for name, info in model.model_fields.items()
        if qualifies(name)
    ]

    # Pad with None so that unpacking works whatever the number of matches.
    first, second = [*aliases, None, None][:2]
    return first, second


def _check_attribute_filtering(
    response_data: dict[str, Any],
    included: str | None,
    excluded: str | None,
    model_name: str,
    endpoint: str,
) -> tuple[Status, str]:
    """Compare a dumped resource against the expected attribute filtering.

    :param response_data: dumped resource, keyed by attribute name.
    :param included: attribute that must be present, or :data:`None` to skip
        the presence check.
    :param excluded: attribute that must be absent, or :data:`None` to skip
        the absence check.
    :param model_name: model name used in the messages.
    :param endpoint: human-readable endpoint label used in the messages.
    :return: a ``(status, reason)`` pair; the presence check takes precedence
        over the absence check.
    """
    if included is not None and included not in response_data:
        outcome = Status.ERROR
        message = (
            f"{endpoint}: requested attribute '{included}' missing "
            f"from {model_name} response"
        )
    elif excluded is not None and excluded in response_data:
        outcome = Status.ERROR
        message = (
            f"{endpoint}: excluded attribute '{excluded}' still present "
            f"in {model_name} response"
        )
    else:
        outcome = Status.SUCCESS
        message = f"{endpoint}: attribute filtering honoured for {model_name}"

    return outcome, message


def _find_resource_in_list(
    response: ListResponse[Resource[Any]] | Any,
    resource_id: str,
) -> dict[str, Any] | None:
    """Find a resource by id in a list response and return its dumped dict.

    :param response: object returned by a list or search query. Anything that
        is not a :class:`ListResponse` yields :data:`None`.
    :param resource_id: ``id`` of the resource to look for.
    :return: the ``model_dump()`` of the first resource whose ``id`` matches,
        or :data:`None` when there is no match.
    """
    if not isinstance(response, ListResponse):
        return None

    # RFC 7644 section 3.4.2 only requires ``Resources`` when ``totalResults``
    # is non-zero, so the list may be unset; treat that as "no resources"
    # instead of failing on iteration over None.
    for resource in response.resources or []:
        if resource.id == resource_id:
            return resource.model_dump()
    return None


def _run_single_attribute_check(
    context: CheckContext,
    model: type[Resource[Any]],
    test_obj: Resource[Any],
    query_parameters: ResponseParameters,
    included: str | None,
    excluded: str | None,
    query_fn: Any,
    endpoint: str,
) -> CheckResult:
    """Issue one query and verify the attribute filtering of its result.

    :param context: check context forwarded to :func:`check_result`.
    :param model: model of the resource under test, used in messages.
    :param test_obj: resource expected in the response; its ``id`` is used to
        locate it inside list responses.
    :param query_parameters: parameters handed to ``query_fn``.
    :param included: attribute expected to be present, or :data:`None`.
    :param excluded: attribute expected to be absent, or :data:`None`.
    :param query_fn: callable that takes a :class:`ResponseParameters` and returns
        a single :class:`Resource` or a :class:`ListResponse`.
    :param endpoint: human-readable endpoint label for messages.
    :return: the check result; an error result is produced when the response
        has an unexpected type or does not contain ``test_obj``.
    """
    response = query_fn(query_parameters)

    # List responses are handled first, so the single-resource branch needs
    # no explicit "not a ListResponse" test.
    if isinstance(response, ListResponse):
        payload = _find_resource_in_list(response, test_obj.id)
        if payload is None:
            return check_result(
                context,
                status=Status.ERROR,
                reason=f"{endpoint}: could not find {model.__name__} "
                f"with id {test_obj.id} in filtered results",
                data=response,
            )
    elif isinstance(response, Resource):
        payload = response.model_dump()
    else:
        return check_result(
            context,
            status=Status.ERROR,
            reason=f"{endpoint}: unexpected response type {type(response).__name__}",
            data=response,
        )

    verdict, explanation = _check_attribute_filtering(
        payload, included, excluded, model.__name__, endpoint
    )
    return check_result(context, status=verdict, reason=explanation, data=response)


def _run_attribute_checks(
    context: CheckContext,
    model: type[Resource[Any]],
    test_obj: Resource[Any],
    included: str | None,
    excluded: str | None,
    query_fn: Any,
    endpoint: str,
) -> list[CheckResult]:
    """Run the inclusion check, then the exclusion check, for one endpoint.

    Each check is skipped when its attribute name is :data:`None`, so the
    returned list holds between zero and two results.

    :param context: check context forwarded to the individual checks.
    :param model: model of the resource under test.
    :param test_obj: resource expected to be found in the responses.
    :param included: attribute requested through ``attributes``, or :data:`None`.
    :param excluded: attribute hidden through ``excluded_attributes``, or
        :data:`None`.
    :param query_fn: callable that takes a :class:`ResponseParameters` and returns
        a single :class:`Resource` or a :class:`ListResponse`.
    :param endpoint: human-readable endpoint label for messages.
    :return: the results, inclusion first.
    """

    def run(
        parameters: ResponseParameters,
        *,
        present: str | None,
        absent: str | None,
    ) -> CheckResult:
        # Everything except the parameters and expectations is shared.
        return _run_single_attribute_check(
            context,
            model,
            test_obj,
            parameters,
            included=present,
            excluded=absent,
            query_fn=query_fn,
            endpoint=endpoint,
        )

    outcomes: list[CheckResult] = []

    if included is not None:
        only_included = ResponseParameters(attributes=[included])
        outcomes.append(run(only_included, present=included, absent=None))

    if excluded is not None:
        without_excluded = ResponseParameters(excluded_attributes=[excluded])
        outcomes.append(run(without_excluded, present=None, absent=excluded))

    return outcomes


@checker("crud:read:attributes")
def object_query_with_attributes(
    context: CheckContext, model: type[Resource[Any]]
) -> list[CheckResult]:
    """Validate that GET on a single resource honours ``attributes`` and ``excludedAttributes``.

    Creates a resource with all writable fields populated, then retrieves it
    twice: once with ``attributes`` restricting the response to a single
    attribute, and once with ``excludedAttributes`` hiding another attribute.

    **Status:**

    - :attr:`~scim2_tester.Status.SUCCESS`: Server correctly filters response attributes
    - :attr:`~scim2_tester.Status.ERROR`: Server ignores attribute filtering parameters
    - :attr:`~scim2_tester.Status.SKIPPED`: Model has no suitable attributes to test

    .. pull-quote:: :rfc:`RFC 7644 Section 3.4.1 <7644#section-3.4.1>`

       "Clients MAY request a partial resource representation on any
       operation that returns a resource within the response by specifying
       either of the mutually exclusive URL query parameters ``attributes``
       or ``excludedAttributes``."

    :param context: check context providing the client, the configuration and
        the resource manager.
    :param model: resource model to create and query.
    :return: a single skipped result when no attribute is suitable, otherwise
        the results of the inclusion and exclusion checks.
    """
    included, excluded = _pick_attribute_names(model)
    if included is None and excluded is None:
        return [
            check_result(
                context,
                status=Status.SKIPPED,
                reason=f"No suitable attributes to test filtering on {model.__name__}",
            )
        ]

    test_obj = context.resource_manager.create_and_register(model, fill_all=True)

    def query_fn(query_parameters: ResponseParameters) -> Any:
        # Query the created resource by id; 200 is expected unless the
        # configuration provides its own list of status codes.
        return context.client.query(
            model,
            test_obj.id,
            query_parameters=query_parameters,
            expected_status_codes=context.conf.expected_status_codes or [200],
        )

    return _run_attribute_checks(
        context, model, test_obj, included, excluded, query_fn, "GET /Resource/{id}"
    )
@checker("crud:read:attributes")
def object_list_with_attributes(
    context: CheckContext, model: type[Resource[Any]]
) -> list[CheckResult]:
    """Validate that GET on the collection endpoint honours ``attributes`` and ``excludedAttributes``.

    Creates a resource with all writable fields populated, then lists the
    collection twice: once with ``attributes`` and once with
    ``excludedAttributes``. Verifies that the created resource appears in the
    list and that its serialized form respects the filtering parameters.

    **Status:**

    - :attr:`~scim2_tester.Status.SUCCESS`: Server correctly filters list response attributes
    - :attr:`~scim2_tester.Status.ERROR`: Server ignores attribute filtering on list endpoint
    - :attr:`~scim2_tester.Status.SKIPPED`: Model has no suitable attributes to test

    .. pull-quote:: :rfc:`RFC 7644 Section 3.4.2 <7644#section-3.4.2>`

       "Clients MAY use the ``attributes`` query parameter to request
       particular attributes be included in a query response."

    :param context: check context providing the client, the configuration and
        the resource manager.
    :param model: resource model to create and list.
    :return: a single skipped result when no attribute is suitable, otherwise
        the results of the inclusion and exclusion checks.
    """
    included, excluded = _pick_attribute_names(model)
    if included is None and excluded is None:
        return [
            check_result(
                context,
                status=Status.SKIPPED,
                reason=f"No suitable attributes to test filtering on {model.__name__}",
            )
        ]

    test_obj = context.resource_manager.create_and_register(model, fill_all=True)

    def query_fn(query_parameters: ResponseParameters) -> Any:
        # No resource id is passed, so the whole collection is queried; 200 is
        # expected unless the configuration provides its own status codes.
        return context.client.query(
            model,
            query_parameters=query_parameters,
            expected_status_codes=context.conf.expected_status_codes or [200],
        )

    return _run_attribute_checks(
        context, model, test_obj, included, excluded, query_fn, "GET /Resource"
    )
@checker("crud:read:attributes")
def search_with_attributes(
    context: CheckContext, model: type[Resource[Any]]
) -> list[CheckResult]:
    """Validate that POST ``/.search`` honours ``attributes`` and ``excludedAttributes``.

    Creates a resource with all writable fields populated, then issues
    ``/.search`` requests with attribute filtering. Verifies that the created
    resource appears in the results and respects the filtering.

    **Status:**

    - :attr:`~scim2_tester.Status.SUCCESS`: Server correctly filters search response attributes
    - :attr:`~scim2_tester.Status.ERROR`: Server ignores attribute filtering on search endpoint
    - :attr:`~scim2_tester.Status.SKIPPED`: Model has no suitable attributes to test

    .. pull-quote:: :rfc:`RFC 7644 Section 3.4.3 <7644#section-3.4.3>`

       "Clients MAY execute queries without passing parameters on the URL by
       using the HTTP POST verb combined with the ``/.search`` path
       extension."

    :param context: check context providing the client, the configuration and
        the resource manager.
    :param model: resource model to create before searching.
    :return: a single skipped result when no attribute is suitable, otherwise
        the results of the inclusion and exclusion checks.
    """
    included, excluded = _pick_attribute_names(model)
    if included is None and excluded is None:
        return [
            check_result(
                context,
                status=Status.SKIPPED,
                reason=f"No suitable attributes to test filtering on {model.__name__}",
            )
        ]

    test_obj = context.resource_manager.create_and_register(model, fill_all=True)

    def query_fn(query_parameters: ResponseParameters) -> Any:
        # Carry the filtering parameters over into a search request body; 200
        # is expected unless the configuration provides its own status codes.
        return context.client.search(
            search_request=SearchRequest(
                attributes=query_parameters.attributes,
                excluded_attributes=query_parameters.excluded_attributes,
            ),
            expected_status_codes=context.conf.expected_status_codes or [200],
        )

    return _run_attribute_checks(
        context, model, test_obj, included, excluded, query_fn, "POST /.search"
    )