Source code for datopy.modeling

"""
Tools for data modeling, validation, and raw data processing, including
auto-generated data models and a flexible framework for ETL workflows.
"""

import json
import pprint
import doctest
import pandas as pd
from jsonschema import validate
from pydantic import BaseModel, Field, PositiveInt, ValidationError
from typing import (
    Annotated, Any, Callable, Collection, Dict, Iterable, List,
    NamedTuple, TypeVar,
)
from typing import TYPE_CHECKING

# import datopy._settings
from datopy.workflow import doctest_function

# Custom types
# (recursively) nested dict with arbitrary depth and pre-defined node type
# TODO check this!
NestedDict = dict[str, "NestedDict" | List[str] | None]
GenericNestedDict = dict[object, object]

# Define TypeVars
# XXX remove unused
# for dictionary (key/value type)
# _KT = TypeVar('_KT')
# _VT = TypeVar('_VT')


# ----------------------------------------
# --- Data dictionary generation utils ---
# ----------------------------------------

[docs] def list_to_dict(obj: list[object] | tuple[object] | set[object], max_items: int | None = None) -> dict[int, object]: """ Provide a dictionary representation of a list or other non-dictionary or string-like iterable, using indices as keys. Parameters ---------- obj : list A list to convert to a dictionary representation. max_items : int, default=None Option to impose a limit on the number of elements to iterate over. Intended use: constructing pattern-based data models from a sample. Returns ------- res : dict The supplied list's dictionary representation. Examples -------- >>> from datopy.modeling import list_to_dict >>> my_list = [1, 'two', [3], {'four': 5}] >>> list_to_dict(my_list) {1: 1, 2: 'two', 3: [3], 4: {'four': 5}} >>> my_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] >>> list_to_dict(my_list, max_items=5) {1: 1, 2: 2, 3: 3, 4: 4, 5: 5} >>> my_dict = dict(a=1, b='two') >>> list_to_dict(my_dict) Not running conversion since obj is already a dictionary. {'a': 1, 'b': 'two'} """ if isinstance(obj, dict): print("Not running conversion since obj", # type: ignore [unreachable] "is already a dictionary.") return obj else: return {(key + 1): value for key, value in enumerate(obj) if (max_items is None) or (key < max_items)}
[docs] def compare_dict_keys( dict1: GenericNestedDict | object, dict2: GenericNestedDict | object ) -> GenericNestedDict | str | None: """ Recursively compare two dictionaries and identify missing keys. Parameters ---------- dict1 : dict The reference dictionary. dict2 : dict The comparison dictionary to be checked against `dict1`. Returns ------- result : dict | List[str] | None The nested dictionary of fields missing from `dict2` relative `dict1`. Examples -------- Setup >>> from datopy.modeling import compare_dict_keys >>> import copy >>> dict1 = {'a1': 1, 'a2': 'two', 'a3': [3], ... 'b1': {'b11': 1, 'b12': 'two', 'b13': [3]}, ... 'c1': {'c11': {'c111': 1, 'c112': 'two', 'c113': [3]}} ... } >>> from datopy.modeling import compare_dict_keys Identical dictionaries >>> dict2 = copy.deepcopy(dict1) >>> compare_dict_keys(dict1, dict2) Missing nesting level 0 key >>> del dict2['a1'] >>> compare_dict_keys(dict1, dict2) {'missing_keys': ['a1']} Missing nesting level 1 key >>> dict2 = copy.deepcopy(dict1) >>> del dict2['b1']['b12'] >>> compare_dict_keys(dict1, dict2) {'nested_diff': {'b1': {'missing_keys': ['b12']}}} Missing nesting level 2 key >>> dict2 = copy.deepcopy(dict1) >>> del dict2['c1']['c11']['c113'] >>> compare_dict_keys(dict1, dict2) {'nested_diff': {'c1': {'nested_diff': {'c11': {'missing_keys': ['c113']}}}}} """ if isinstance(dict1, dict) and not isinstance(dict2, dict): return "missing nested dictionary" if not (isinstance(dict1, dict) and isinstance(dict2, dict)): return None missing_keys = set(dict1.keys()) - set(dict2.keys()) shared_keys = set(dict1.keys()).intersection(set(dict2.keys())) # Initialize difference dictionary diff_dict: dict[object, object] = {} for key in shared_keys: nested_diff = compare_dict_keys(dict1[key], dict2[key]) # Add any differences to the difference if nested_diff is not None: diff_dict[key] = nested_diff # Return result if no missing keys or no diffs in nested dicts found if missing_keys or diff_dict: result: dict[object, object] = {} if missing_keys: result['missing_keys'] = list(missing_keys) if diff_dict: result['nested_diff'] = diff_dict return result # Return None if no missing keys or differences found return None
[docs] def apply_recursive(func: Callable[..., Any], obj) -> dict[str | int, Any] | Any: """ Convert a nested data structure (with explicit or implied key/value pairs) into a tree-like dictionary, applying a given function to terminal values. Parameters ---------- func : Callable[..., Any] _description_ obj : _description_ Returns ------- dict: _description_ Examples -------- >>> from datopy.modeling import apply_recursive Define the data >>> nested_data = {'type': 'album', 'url': 'link.com', 'audio_features': [ ... {'loudness': -11.4, 'duration_ms': 251}, ... {'loudness': -15.5, 'duration_ms': 284}]} >>> print(nested_data) {'type': 'album', 'url': 'link.com', 'audio_features': [{'loudness': -11.4, 'duration_ms': 251}, {'loudness': -15.5, 'duration_ms': 284}]} Convert to json-friendly representation >>> serialized = apply_recursive(str, nested_data) >>> print(serialized) {'type': 'album', 'url': 'link.com', 'audio_features': {1: {'loudness': '-11.4', 'duration_ms': '251'}, 2: {'loudness': '-15.5', 'duration_ms': '284'}}} Convert to field/type pairs >>> schema = apply_recursive(lambda x: type(x).__name__, nested_data) >>> print(schema) {'type': 'str', 'url': 'str', 'audio_features': {1: {'loudness': 'float', 'duration_ms': 'int'}, 2: {'loudness': 'float', 'duration_ms': 'int'}}} """ # Handle dictionary-like objects if hasattr(obj, 'items'): return {key: apply_recursive(func, value) for key, value in obj.items()} # Handle list-like objects elif isinstance(obj, (list, tuple, set)): return {key: apply_recursive(func, value) for key, value in list_to_dict(obj, max_items=5).items()} # Handle base cases elif isinstance(obj, str): return func(obj) else: return func(obj)
[docs] def schema_jsonify(obj: GenericNestedDict) -> GenericNestedDict: r""" _summary_ Parameters ---------- schema : dict _description_ Returns ------- dict : _description_ Examples -------- >>> import pprint >>> from datopy.modeling import schema_jsonify >>> original_schema = {'name': 'str', 'quantity': 'int', 'features': {1: {'volume': 'str', 'duration': 'float'}, 2: {'volume': 'str', 'duration': 'float'}}, 'creator': {'person': {'name': 'str'}, 'company': {'name': 'str', 'location': 'str'}}} >>> schema = schema_jsonify(original_schema) >>> schema = {**{"title": "title", "description": "description"}, **schema} >>> pprint.pp(schema, compact=True, depth=3) {'title': 'title', 'description': 'description', 'type': 'object', 'properties': {'name': {'type': 'string'}, 'quantity': {'type': 'number'}, 'features': {'type': 'array', 'minItems': 1, 'maxItems': 2, 'uniqueItems': True, 'items': {...}}, 'creator': {'type': 'object', 'properties': {...}, 'required': [...]}}, 'required': ['name', 'quantity', 'features', 'creator']} """ schema: GenericNestedDict = {} is_dict = isinstance(obj, dict) # Case 1 (array-like) if obj and is_dict and isinstance(list(obj.keys())[0], int): field_len = list(obj.keys())[-1] schema = { "type": 'array', # coerced to object; includes tuple/list "minItems": 1, "maxItems": field_len, "uniqueItems": True } # Recurse on first item, assuming homogeneity for simplicity schema["items"] = schema_jsonify(obj[1]) # type: ignore [arg-type] return schema # Case 2 (dictionary) elif obj and is_dict: schema["type"] = "object" schema["properties"] = {} # Require all by default to easily edit later schema["required"] = list(obj.keys()) for key, val in obj.items(): # Recurse on each value schema["properties"][key] = schema_jsonify(val) # type: ignore [index, arg-type] return schema # Base cases (non-container types) # "str" -> "string" elif obj == "str": # type: ignore [comparison-overlap] schema["type"] = "string" return schema # "int"/"float" -> "number" elif obj in ("int", "float"): # type: ignore [comparison-overlap] schema["type"] = "number" return schema else: schema["type"] = "null" return schema
# -------------------------------------------- # --- Data processing base types and class --- # --------------------------------------------
[docs] class CustomTypes: """ Reusable custom field types. Whitespace around commas should be stripped before analysis. """ CSVstr = Annotated[str, Field(pattern=r'^[a-z, ]+$', description="Custom lowercase comma-separated string type. Excludes num and special chars")] CSVnumstr = Annotated[str, Field(pattern=r'^[a-z0-9,.! ]+$', description="Allows numerics")] CSVnumsent = Annotated[str, Field(pattern=r'^[a-z0-9,.! ]+$')]
# TODO implement BaseProcessor
[docs] class BaseProcessor: """_summary_ Parameters ---------- model : BaseModel _description_ query : NamedTuple _description_ """ def __init__(self, model: BaseModel, query: NamedTuple): self.query = query self.model = model
[docs] def retrieve(self): """ Retrieve data for the query from the API of the supplied model. Raises ------ NotImplementedError: _description_ """ ### Retrieval routine goes here ### raise NotImplementedError
# include return here? self assignment?
[docs] def process(self): """ Process (extract/clean) retrieved data. Raises ------ NotImplementedError: _description_ """ ### Processing routine goes here ### # TODO raise NotRetrieved error (try model.obj) raise NotImplementedError
def _validate(self): """ Validate the processed data against the supplied model. Raises ------ ValidationError: _description_ """ model = self.model model # try: # model(**self.data) # except ValidationError as e: # pprint.pp(e.errors()) print("Validated") return None
[docs] def to_df(self): """ Load the data into a dataframe for further processing or analysis. """ # Validate before loading self._validate() df = pd.DataFrame([self.data]) return df
if __name__ == "__main__": # Comment out (2) to run all tests in script; (1) to run specific tests doctest.testmod(verbose=True) # doctest_function(get_film_metadata, globs=globals()) ## One-off tests # type checks that compiler does not see/understand (run mypy on module) if TYPE_CHECKING: # reveal_type((1, 'hello')) pass