Refactor code
This commit is contained in:
parent
f6a78cf375
commit
8ee02ab5e2
|
@ -9,8 +9,8 @@ import py
|
|||
from mako.lookup import TemplateLookup
|
||||
|
||||
from .feature import get_features
|
||||
from .scenario import find_argumented_step_function, make_python_docstring, make_python_name, make_string_literal
|
||||
from .steps import get_step_fixture_name
|
||||
from .scenario import make_python_docstring, make_python_name, make_string_literal, patch_argumented_step_functions
|
||||
from .steps import get_parsed_step_fixture_name, get_step_fixture_name
|
||||
from .types import STEP_TYPES
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
@ -131,10 +131,11 @@ def _find_step_fixturedef(
|
|||
fixturedefs = fixturemanager.getfixturedefs(step_fixture_name, item.nodeid)
|
||||
if fixturedefs is not None:
|
||||
return fixturedefs
|
||||
|
||||
step_func_context = find_argumented_step_function(name, type_, fixturemanager)
|
||||
if step_func_context is not None:
|
||||
return fixturemanager.getfixturedefs(step_func_context.name, item.nodeid)
|
||||
with patch_argumented_step_functions(name=name, type_=type_, fixturemanager=fixturemanager, nodeid=item.nodeid):
|
||||
bdd_name = get_parsed_step_fixture_name(name, type_)
|
||||
fixturedefs = fixturemanager.getfixturedefs(bdd_name, item.nodeid)
|
||||
if fixturedefs is not None:
|
||||
return fixturedefs
|
||||
return None
|
||||
|
||||
|
||||
|
|
|
@ -12,14 +12,15 @@ test_publish_article = scenario(
|
|||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import contextlib
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from pprint import pformat
|
||||
from typing import TYPE_CHECKING, Callable, cast
|
||||
|
||||
import pytest
|
||||
from _pytest.fixtures import FixtureDef, FixtureManager, FixtureRequest, call_fixture_func
|
||||
from _pytest.nodes import iterparentnodeids
|
||||
|
||||
from . import exceptions
|
||||
from .feature import get_feature, get_features
|
||||
|
@ -42,8 +43,8 @@ ALPHA_REGEX = re.compile(r"^\d+_*")
|
|||
|
||||
|
||||
def iter_argumented_step_function(
|
||||
name: str, type_: str, fixturemanager: FixtureManager
|
||||
) -> Iterable[tuple[str, FixtureDef[Any], StepFunctionContext, int]]:
|
||||
name: str, type_: str, fixturemanager: FixtureManager, nodeid: str | None = None
|
||||
) -> Iterable[FixtureDef[Any]]:
|
||||
"""Iterate over argumented step functions."""
|
||||
# happens to be that _arg2fixturedefs is changed during the iteration so we use a copy
|
||||
fixture_def_by_name = list(fixturemanager._arg2fixturedefs.items())
|
||||
|
@ -60,37 +61,40 @@ def iter_argumented_step_function(
|
|||
if not match:
|
||||
continue
|
||||
|
||||
yield fixturename, fixturedef, step_func_context, pos
|
||||
if nodeid is not None:
|
||||
if fixturedef not in fixturemanager.getfixturedefs(fixturename, nodeid):
|
||||
continue
|
||||
|
||||
yield fixturedef
|
||||
|
||||
|
||||
def rewrite_argumented_step_functions(name: str, type_, fixturemanager) -> None:
|
||||
@contextlib.contextmanager
|
||||
def patch_argumented_step_functions(name: str, type_, fixturemanager, nodeid: str | None = None) -> None:
|
||||
bdd_name = get_parsed_step_fixture_name(name, type_)
|
||||
|
||||
l = list(iter_argumented_step_function(name=name, type_=type_, fixturemanager=fixturemanager))
|
||||
l = [f for _, f, _, _ in l]
|
||||
# TODO: quick n dirty way to give the right priority to the fixtures, but we should be more sophisticated than this.
|
||||
resorted = sorted(l, key=lambda x: x.baseid)
|
||||
# l are all the fixture names that parse the current step
|
||||
added = {}
|
||||
# TODO: Remove all the injected bdd_name we did here after the execution of the step
|
||||
# or maybe not, it could be used as a cache? would it poison steps for other scenarios?
|
||||
for fixturedef in resorted:
|
||||
existing_defs = fixturemanager._arg2fixturedefs.setdefault(bdd_name, [])
|
||||
if fixturedef not in existing_defs:
|
||||
existing_defs.append(fixturedef)
|
||||
added.setdefault(bdd_name, []).append(fixturedef)
|
||||
fixturedefs = list(
|
||||
iter_argumented_step_function(name=name, type_=type_, fixturemanager=fixturemanager, nodeid=nodeid)
|
||||
)
|
||||
# Sort the fixture definitions by their "path", so that the "pytestbdd_parsed_" fixturedef will
|
||||
# respect the fixture scope
|
||||
fixture_defs_by_path = [(tuple(iterparentnodeids(x.baseid)), x) for x in fixturedefs]
|
||||
resorted = sorted(fixture_defs_by_path, key=lambda x: x[0])
|
||||
|
||||
bdd_step_defs = fixturemanager._arg2fixturedefs[bdd_name] = []
|
||||
for fixture_path, fixturedef in resorted:
|
||||
if fixturedef not in bdd_step_defs:
|
||||
logger.debug("Adding provider for fixture %r}: %s", bdd_name, fixturedef)
|
||||
bdd_step_defs.append(fixturedef)
|
||||
else:
|
||||
logger.warning("%r already added to bdd name %r, SKIPPING", fixturedef, bdd_name)
|
||||
pass
|
||||
if logger.isEnabledFor(logging.DEBUG):
|
||||
logger.debug(f"Added the following fixtures:\n{pformat(added)}")
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
del fixturemanager._arg2fixturedefs[bdd_name]
|
||||
|
||||
|
||||
def find_argumented_step_function(
|
||||
request, name: str, type_: str, fixturemanager: FixtureManager
|
||||
) -> StepFunctionContext | None:
|
||||
def get_argumented_step_function(request, name: str, type_: str) -> StepFunctionContext | None:
|
||||
"""Find argumented step fixture name."""
|
||||
rewrite_argumented_step_functions(name=name, type_=type_, fixturemanager=fixturemanager)
|
||||
bdd_name = get_parsed_step_fixture_name(name, type_)
|
||||
try:
|
||||
return request.getfixturevalue(bdd_name)
|
||||
|
@ -151,7 +155,10 @@ def _execute_scenario(feature: Feature, scenario: Scenario, request: FixtureRequ
|
|||
request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario)
|
||||
|
||||
for step in scenario.steps:
|
||||
context = find_argumented_step_function(request, step.name, step.type, request._fixturemanager)
|
||||
with patch_argumented_step_functions(
|
||||
name=step.name, type_=step.type, fixturemanager=request._fixturemanager, nodeid=request.node.nodeid
|
||||
):
|
||||
context = get_argumented_step_function(request, step.name, step.type)
|
||||
if context is None:
|
||||
exc = exceptions.StepDefinitionNotFoundError(
|
||||
f"Step definition is not found: {step}. "
|
||||
|
|
Loading…
Reference in New Issue