Source code for dbload.context

# Copyright 2020-2021 Dynatrace LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import defaultdict
from pathlib import Path
import sys
import importlib
from types import FunctionType
from typing import List, Optional

from loguru import logger
from mapz import Mapz

from .config_singleton import get_config
from .query_parser import QueryParser
from .exceptions import (
    EmptyPathToModuleError,
    ImportlibResourcesNotFoundError,
    PredefinedSimulationImportError,
    QueryAlreadyExistsError,
    ScenarioAlreadyExistsError,
    MatchingSqlQueryNotFoundError,
    UnsupportedPredefinedSimulationError,
)


[docs]class Context: """Execution context. Context holds within itself all registered scenarios, queries, and plays. It assembles these objects both from annotated text files with SQL queries and from Python files with decorated objects and functions. Attributes: scenarios (:obj:`Mapz`): Dictionary of parsed and generated scenarios. queries (:obj:`Mapz`): Dictionary of parsed queries. Examples: Create a new context:: from dbload import Context context = Context() """ def __init__(self) -> None: self.scenarios = Mapz() self.queries = Mapz() self._is_infused = False
[docs] def register_query( self, query: FunctionType, name: Optional[str] = None, match: Optional[str] = None, auto: bool = False, ) -> None: """Register a query in the context.""" registration_name = name or query.__name__ logger.debug( f"Registering '{registration_name}' query in the context." ) if registration_name in self.queries: raise QueryAlreadyExistsError(registration_name) self.queries[registration_name] = Mapz( function=query, name=name, match=match, auto=auto )
[docs] def register_scenario( self, scenario: FunctionType, name: Optional[str] = None, infuse: bool = False, auto: bool = False, auto_run_queries: List[str] = [], ) -> None: """Register a scneario in the context.""" registration_name = name or scenario.__name__ logger.debug( f"Registering '{registration_name}' scenario in the context." ) if registration_name in self.scenarios: raise ScenarioAlreadyExistsError(registration_name) self.scenarios[registration_name] = Mapz( function=scenario, name=name, infuse=infuse, auto_run_queries=auto_run_queries, )
[docs] def infuse(self) -> None: if self._is_infused: logger.debug( "Context was already infused. Skipping infuse stage." ) return else: logger.debug("Infusing context.") cfg = get_config() # headers, rows = cfg.to_table() # from prettytable import PrettyTable # pt = PrettyTable(headers) # pt.add_rows(rows) # pt.align = "l" # print(pt) if cfg.predefined: self._load_predefined_simulation() elif cfg.module: self._load_requested_module(cfg.module) parsed = QueryParser.parse(cfg.sources) self._create_implicit_queries(parsed) self._create_implicit_scenarios(parsed) for q in self.queries: self._infuse_query_with_matching_sql(parsed, q) for s in self.scenarios: self._infuse_scenario_with_queries(s) self._is_infused = True
def _load_predefined_simulation(self): cfg = get_config() if cfg.predefined not in cfg.predefined_simulations: raise UnsupportedPredefinedSimulationError(cfg.predefined) # Import helper modules try: import importlib.resources as pkg_resources except ImportError: try: import importlib_resources as pkg_resources except: raise ImportlibResourcesNotFoundError() from None try: # SQL queries file gets imported from resources from dbload import resources sql_source = pkg_resources.read_text(resources, f"{cfg.predefined}.sql") cfg.sources = [sql_source] # Override any read sources # and the scenario file is imported from there as well from dbload.resources import scenarios except Exception as e: logger.error(f"{e}") raise PredefinedSimulationImportError() from None def _load_requested_module(self, path: str) -> None: path = Path(path) if len(path.parts) == 0: raise EmptyPathToModuleError(path) modname = None if len(path.parts) > 1: parent_path = str(path.parent) sys.path.insert(0, parent_path) modname = path.stem else: modname = path.stem importlib.import_module(modname) def _create_implicit_queries(self, parsed: Mapz): """Create pre-requested queries. Generate queries based on the annotated sql statements from the parsed SQL files. """ from .query import query, return_random def _gen(query_name: str): def _empty_query(*args, **kwargs): pass # pragma: no cover _empty_query.__name__ = query_name return _empty_query for query_name in parsed: # Annotated statements can create implicit queries that were not # declared explicitly in an accompanying python module. # We have to register such queries. if query_name not in self.queries: # Create an empty function that does nothing empty_query = _gen(query_name) # Wrap resulting function as query method empty_query = query(name=query_name, auto=True)(empty_query) # Infuse additional optional query modifications for option in parsed[query_name].options: if f"{query_name}_{option}" not in self.queries: if option == "return_random": return_random(auto=True)(empty_query) else: logger.warning( f"Unrecognized option in SQL query: {option}" ) def _create_implicit_scenarios(self, parsed: Mapz): """Create pre-requested scenarios. Generate scenarios based on the "scenario" keyword in query metadata from SQL file. """ from .scenario import scenario def _gen(scenario_name: str): def _empty_scenario(*args, **kwargs): pass # pragma: no cover _empty_scenario.__name__ = scenario_name return _empty_scenario auto_run_queries_per_scenario = defaultdict(list) # Each parsed query for query_name, params in parsed.items(): # Might have several scenarios assigned for name, order in params.scenarios: auto_run_queries_per_scenario[name].append( (query_name, order) ) for name in auto_run_queries_per_scenario: ordered_tuples = sorted( auto_run_queries_per_scenario[name], key=lambda i: i[1] ) ordered_query_names = [t[0] for t in ordered_tuples] # Annotated queries can create implicit scenarios that were not # declared explicitly in an accompanying python module. # We have to register such scenarios. if name not in self.scenarios: empty_scenario = _gen(name) scenario( name=name, infuse=False, auto=True, auto_run_queries=ordered_query_names, )(empty_scenario) # Otherwise, just change the auto_run_queries list for the # already registered scenario. else: self.scenarios[name].auto_run_queries = ordered_query_names def _infuse_query_with_matching_sql(self, parsed: Mapz, query_name: str): """Infuse query object with SQL text attribute. This is a helper method for the ``query`` decorator. """ # Fetch matching SQL query text from parsed queries match = self.queries[query_name].match query_text: Optional[str] = parsed[match].get("text", None) if not query_text: if self.queries[query_name].auto: raise MatchingSqlQueryNotFoundError( self.queries[query_name].function.__name__, query_name ) else: logger.warning( f"'{self.queries[query_name].function.__name__}' has no matching SQL query." ) self.queries[query_name].sql = query_text or None setattr(self.queries[query_name].function, "sql", query_text or None) def _infuse_scenario_with_queries(self, scenario_name: str): if self.scenarios[scenario_name].infuse: logger.info(f"Infusing {scenario_name} with queries") for query_name, q in self.queries.items(): setattr( self.scenarios[scenario_name].function, query_name, q.function, )