Source code for tests.test_api

# tests/test_api.py
"""Test core API."""
# standard library
import math

# third-party packages
import pandas as pd
import pytest

# local packages
import dcttools


# -------------- dcttools.depth ------------------
[docs]@pytest.mark.parametrize( ("dct", "expected_result"), [ ({}, 0), ({1: 1}, 1), ({1: 1, 2: 1}, 1), ({1: {1: 2}}, 2), ], ) def test_depth(dct, expected_result): """Test correct dcttools.depth functionality.""" assert dcttools.depth(dct) == expected_result
[docs]def test_depth_exception(): """Test the correct dcttools.depth functionality on wrong argument type.""" with pytest.raises(Exception): dcttools.depth([])
# -------------- dcttools.kfltr ------------------ kfltr_kwargs = { "t": {"cat1": 1, "cat2": 2, "cat3": 3}, "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "tweak_txt": "See 'case'", }
[docs]def test_kfltr_simple(): """Test the correct dcttools.kfltr functionality in simple usage.""" expected_result = { "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "tweak_txt": "See 'case'", } results = dcttools.kfltr(dcts=[kfltr_kwargs], fltr="tweak_") assert results[0] == expected_result
[docs]def test_kfltr_kwargs(): """Test the correct dcttools.kfltr functionality using kwargs.""" expected_result = { "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "tweak_txt": "See 'case'", "x": 10, } rslts = dcttools.kfltr(dcts=[kfltr_kwargs], fltr="tweak", xcptns=["x"], x=10) # flat aggregate results, sinnce kfltr puts additional kwargs into # additional dicts by design flat_results = dcttools.flaggregate(rslts) assert flat_results == expected_result
# -------------- dcttools.kfrep ------------------ kfrep_dct = {"txt": "hi", "s": 5, "kwarg": 1} kfrep_dflts = {"mst_hve": "yes", "first_kwarg": 0} kfrep_kwargs = {"first_txt": "there", "first_mst_hve": "no", "first_kwarg": 2}
[docs]def test_kfrep_kwargs_as_dict(): """Test dcttools.kfrep functionality providing kwargs as dict.""" expected_result = [ {"txt": "hi", "s": 5, "kwarg": 1}, {"mst_hve": "yes", "kwarg": 0}, {"txt": "there", "mst_hve": "no", "kwarg": 2}, ] # Include kwargs into the dcts argument and remove the first_ prefices # pylint: disable=unbalanced-tuple-unpacking dct, dflts, kwargs = dcttools.kfrep( dcts=[kfrep_dct, kfrep_dflts, kfrep_kwargs], fnd="first_" ) assert [dct, dflts, kwargs] == expected_result
[docs]def test_kfrep_kwargs_as_dict_not_altering(): """Test dcttools.kfrep not altering input providing kwargs as dict.""" expected_result = [ kfrep_dct.copy(), kfrep_dflts.copy(), kfrep_kwargs.copy(), ] # Include kwargs into the dcts argument and remove the first_ prefices dcttools.kfrep(dcts=[kfrep_dct, kfrep_dflts, kfrep_kwargs], fnd="first_") assert [kfrep_dct, kfrep_dflts, kfrep_kwargs] == expected_result
[docs]def test_kfrep_kwargs_as_kwargs_and_frep(): """Test dcttools.kfrep functionality providing kwargs and kfrepping.""" expected_result = [ {"txt": "hi", "s": 5, "kwarg": 1}, {"mst_hve": "yes", "first_kwarg": 0}, {"first_kwarg": 2, "second_txt": "there", "second_mst_hve": "no"}, ] # Provide kwargs as kwargs and replace first_ prefices with second_ # pylint: disable=unbalanced-tuple-unpacking dct, dflts, kwargs = dcttools.kfrep( dcts=[kfrep_dct, kfrep_dflts], fnd="first_", rplc="second_", xcptns="first_kwarg", **kfrep_kwargs ) assert [dct, dflts, kwargs] == expected_result
[docs]def test_kfrep_kwargs_as_kwargs_and_frep_not_altering(): """Test dcttools.kfrep providing kwargs and kfrepping not altering input.""" expected_result = [ kfrep_dct.copy(), kfrep_dflts.copy(), kfrep_kwargs.copy(), ] # Provide kwargs as kwargs and replace first_ prefices with second_ dcttools.kfrep( dcts=[kfrep_dct, kfrep_dflts], fnd="first_", rplc="second_", xcptns="first_kwarg", **kfrep_kwargs ) assert [kfrep_dct, kfrep_dflts, kfrep_kwargs] == expected_result
[docs]def test_kfrep_design_case(): """Test dcttools.kfrep design case.""" new_kfrep_kwargs = { "t": {"cat1": 1, "cat2": 2, "cat3": 3}, "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "tweak_txt": "See 'case'", } expected_result = { "case": {"cat1": "1", "cat2": "2", "cat3": 1}, "txt": "See 'case'", } # Chaining filter and find and replace utility to only get kwargs # previously containing a tweak_ prefix, deleting this prefix, so # the kwargs are ready to be passed to a third party API. third_party_api_kwargs = dcttools.kfrep( dcts=dcttools.kfltr(dcts=[new_kfrep_kwargs], fltr="tweak_"), fnd="tweak_" ) assert third_party_api_kwargs[0] == expected_result
# -------------- dcttools.kswap ------------------ kswap_nd = {"tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}}
[docs]def test_kswap(): """Test the correct dcttools.kswap functionality.""" expected_result = { "cat1": {"tweak_case": "1"}, "cat2": {"tweak_case": "2"}, "cat3": {"tweak_case": 1}, } results = dcttools.kswap(kswap_nd) assert results == expected_result
[docs]def test_kswap_not_altering(): """Test the correct dcttools.kswap functionality not altering the input.""" expected_result = kswap_nd.copy() dcttools.kswap(kswap_nd) assert kswap_nd == expected_result
[docs]def test_kswap_on_flat(): """Test dcttools.kswap on non-nested dicts.""" assert not dcttools.kswap({"cat1": "1", "cat2": "2", "cat3": 1})
[docs]def test_kswap_similar(): """Test the correct dcttools.kswap functionality on similar nested keys.""" nested_dct = { "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "use_case": {"cat1": 1, "cat2": 2, "cat3": 1}, } expected_result = { "cat1": {"tweak_case": "1", "use_case": 1}, "cat2": {"tweak_case": "2", "use_case": 2}, "cat3": {"tweak_case": 1, "use_case": 1}, } assert dcttools.kswap(nested_dct) == expected_result
[docs]def test_kswap_multiple(): """Test the correct dcttools.kswap functionality on multiple keys.""" nested_dct = { "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "use_case": {"chap1": 1, "chap2": 2, "chap3": 1}, } expected_result = { "cat1": {"tweak_case": "1"}, "cat2": {"tweak_case": "2"}, "cat3": {"tweak_case": 1}, "chap1": {"use_case": 1}, "chap2": {"use_case": 2}, "chap3": {"use_case": 1}, } assert dcttools.kswap(nested_dct) == expected_result
# -------------- dcttools.flaggregate ------------------ flagg_dct = {"txt": "hi", "s": 5, "kwarg": 1} flagg_dflts = {"mst_hve": "yes", "first_kwarg": 0} flagg_kwargs = {"first_txt": "hi there", "first_mst_hve": "no", "first_kwarg": 2}
[docs]def test_flaggregate_kwargs_as_kwargs(): """Test correct dcttools.flaggregate functionality using kwargs as kwargs.""" expected_result = { "first_kwarg": 2, "first_mst_hve": "no", "first_txt": "hi there", "kwarg": 1, "mst_hve": "yes", "s": 5, "txt": "hi", } # Aggregate dct, dflts and kwargs, providing kwargs as kwargs: result = dcttools.flaggregate([flagg_dct, flagg_dflts], **flagg_kwargs) assert result == expected_result
[docs]def test_flaggregate_kwargs_as_dicts(): """Test correct dcttools.flaggregate functionality using kwargs as dicts.""" expected_result = { "first_kwarg": 2, "first_mst_hve": "no", "first_txt": "hi there", "kwarg": 1, "mst_hve": "yes", "s": 5, "txt": "hi", } # Aggregate dct, dflts, and kwargs, providing kwargs as part of dcts: result = dcttools.flaggregate([flagg_dct, flagg_dflts, flagg_kwargs]) assert result == expected_result
[docs]def test_flaggregate_design(): """Test correct dcttools.flaggregate design case functionality.""" expected_result = {"txt": "hi there", "s": 5, "kwarg": 2, "mst_hve": "no"} # Aggregate dct dflts and kwargs with prior filtering. Note how python's # last word policy is kept results = dcttools.flaggregate( dcts=dcttools.kfrep(dcts=[flagg_dct, flagg_dflts], fnd="first_", **flagg_kwargs) ) assert results == expected_result
# -------------- dcttools.naggregate ------------------ nagg_dct = {"n1": {"txt": "hi", "s": 5}, "n3": {"s": 10}} nagg_dct_2 = {"n1": {"txt": "hey", "mst_hve": "yes"}, "n2": {"s": 2}} nagg_dct_3 = { "n2": { "txt": "there", }, "n3": {"s": 3}, "n4": {"s": 4}, }
[docs]def test_naggregate_two(): """Test correct dcttools.naggregate using two dicts.""" expected_result = { "n1": {"mst_hve": "yes", "s": 5, "txt": "hey"}, "n2": {"s": 2}, "n3": {"s": 10}, } # Aggregate two nested dicts. Note how the dct provided last overrides the # other keys: results = dcttools.naggregate(nstd_dcts=[nagg_dct, nagg_dct_2]) assert results == expected_result
[docs]def test_naggregate_three(): """Test correct dcttools.naggregate design case functionality.""" expected_result = { "n1": {"mst_hve": "yes", "s": 5, "txt": "hey"}, "n2": {"s": 2, "txt": "there"}, "n3": {"s": 3}, "n4": {"s": 4}, } # Aggregate three nested dicts. Note how the dct provided last overrides # the other keys: result = dcttools.naggregate(nstd_dcts=[nagg_dct, nagg_dct_2, nagg_dct_3]) assert result == expected_result
list_of_naggs = [nagg_dct, nagg_dct_2, nagg_dct_3]
[docs]@pytest.mark.parametrize( ("dcts", "original", "expected_result"), [ (list_of_naggs, nagg_dct.copy(), nagg_dct), (list_of_naggs, nagg_dct_2.copy(), nagg_dct_2), (list_of_naggs, nagg_dct_3.copy(), nagg_dct_3), ], ) def test_naggregate_not_altering(dcts, original, expected_result): """Test dcttools.naggregate not altering the input.""" dcttools.naggregate(nstd_dcts=dcts) assert original == expected_result
# -------------- dcttools.maggregate ------------------ magg_params = {"cat1": {"txt": "hi", "s": 5}, "cat2": {"txt": "hey", "s": 7}} magg_tlkys = ["cat1", "cat2", "cat3"] magg_kwargs = {"s": {"cat1": 1, "cat2": 2, "cat3": 3}}
[docs]def test_maggregate_defaults_as_dct(): """Test correct dcttools.maggregate using defaults as flat dict.""" expected_result = { "cat1": {"first_step": 13, "s": 5, "txt": "hi", "type": "default"}, "cat2": {"first_step": 13, "s": 7, "txt": "hey", "type": "default"}, } dflts = {"type": "default", "s": 3, "first_step": 13} # Using a non nested dict as defaults to ensure each kwarg is present in a # nested dict (supposedly) provided by an api: result = dcttools.maggregate( tlkys=list(magg_params.keys()), dcts=[dflts], nstd_dcts=[magg_params], ) assert result == expected_result
[docs]def test_maggregate_new_tlkeys_and_updated_values(): """Test correct dcttools.maggregate expaning toplevel and updating values.""" expected_result = { "cat1": {"s": 1, "txt": "hi"}, "cat2": {"s": 2, "txt": "hey"}, "cat3": {"s": 3}, } # Using tlkys for adding new nodes to nested dictionairy (supposedly) # provided by an api while using kwargs to provide and update entries: result = dcttools.maggregate( tlkys=magg_tlkys, nstd_dcts=[magg_params], **magg_kwargs ) assert result == expected_result
[docs]def test_maggregate_no_fall_back_values(): """Test correct dcttools.maggregate not using fallback values.""" expected_result = { "cat1": {"s": 1, "txt": "ovrrdn"}, "cat2": {"s": 2, "txt": "overrdn"}, "cat3": {"s": 3, "txt": None}, } magg_kwargs_fallback = { "s": {"cat1": 1, "cat2": 2, "cat3": 3}, "txt": {"cat1": "ovrrdn", "cat2": "overrdn"}, } # Not providing fall back defaults as flat dicts can lead to None # parameters if kwargs do not provide entries for each top level key: result = dcttools.maggregate( tlkys=magg_tlkys, nstd_dcts=[magg_params], **magg_kwargs_fallback ) assert result == expected_result
[docs]def test_maggregate_design(): """Test correct dcttools.maggregate design functionality.""" # suppose this is provided by the api your using (potentially huge) api_returns = { "cat1": {"txt": "hi", "s": 5}, "cat2": {"txt": "hey", "s": 7}, } # add yout own defaults dflts = {"s": 0} # Manipulate certain top level keys and parameters using kwargs. # By using your own prefix, you can hard code a set of predefined # behaviours unambiguously defined. design_kwargs = { "t": {"cat1": 1, "cat2": 2, "cat3": 3}, "tweak_case": {"cat1": "1", "cat2": "2", "cat3": 1}, "tweak_txt": "See 'case'", } # Filter only the kwargs needed in this call and replace the prefix to # match the api required keywords # pylint: disable=unbalanced-tuple-unpacking (des_kwargs,) = dcttools.kfrep( dcts=dcttools.kfltr(dcts=[design_kwargs], fltr="tweak_"), fnd="tweak_" ) # Aggregate all back into as nested dict as the api expects, using your # own defaults and alterations result = dcttools.maggregate( tlkys=magg_tlkys, dcts=[dflts], nstd_dcts=[api_returns], **des_kwargs ) expected_result = { "cat1": {"case": "1", "s": 5, "txt": "See 'case'"}, "cat2": {"case": "2", "s": 7, "txt": "See 'case'"}, "cat3": {"case": 1, "s": 0, "txt": "See 'case'"}, } assert result == expected_result
[docs]def test_maggregate_nstdkwargs_not_overriding_nstddicts(): """Test dcttools.maggregate not overriding nstd_dicts with nested kwargs.""" parameters = { "cat1": {"txt": "hi", "s": 5}, "cat2": {"txt": "hey", "s": 7}, } tlkys = parameters.keys() kwargs = { "s": { "cat1": 1, } } result = dcttools.maggregate(tlkys=tlkys, nstd_dcts=[parameters], **kwargs) expected_result = { "cat1": {"txt": "hi", "s": 1}, # overriden by kwargs "cat2": {"txt": "hey", "s": 7}, # not provided by kwargs } assert result == expected_result
[docs]def test_maggregate_fallback_on_kwargs_not_overriding(): """Test dcttools.maggregate using dcts fallback when using nested kwargs.""" parameters = { "cat1": {"txt": "hi", "s": 5}, "cat2": {"txt": "hey", "s": 7}, } tlkys = ["cat1", "cat2", "cat3"] kwargs = {"s": {"cat1": 1, "cat2": 2}} dflts = {"s": "default"} # defaults fallback is used, since kwarg does not provide for cat3 which # is required by 'tlkys' result = dcttools.maggregate( tlkys=tlkys, dcts=[dflts], nstd_dcts=[parameters], **kwargs ) expected_result = { "cat1": {"s": 1, "txt": "hi"}, "cat2": {"s": 2, "txt": "hey"}, "cat3": {"s": "default"}, } assert result == expected_result
# -------------- dcttools.to_dataframe ------------------ to_dataframe_mapping = { "flow_costs": 0, "co2_emissions": 0, "installed_capacity": 0, "accumulated_min": None, "accumulated_max": None, }
[docs]def test_to_dataframe_desing_case(): """Test dcttools.to_dataframe functionality.""" result_list = [ ["flow_costs", 0, "accumulated_min", None], ["co2_emissions", 0, "accumulated_max", None], [ "installed_capacity", 0, "empty string", "empty string", ], ] expected_result = pd.DataFrame( result_list, columns=2 * ["key", "value"], index=range(3) ) # Design Case (emulateing an empty string ('') with 'empty string' for # verbosity): result = dcttools.to_dataframe( to_dataframe_mapping, columns=2, fillvalue="empty string" ) assert result.equals(expected_result)
[docs]def test_to_dataframe_fill_values_and_index(): """Test dcttools.to_dataframe functionality.""" result_list = [ ["flow_costs", 0, "installed_capacity", 0.0, "accumulated_max", None], ["co2_emissions", 0, "accumulated_min", None, "es", "es"], ] cols = 3 # Using fill values and adding an Index: result = dcttools.to_dataframe( to_dataframe_mapping, columns=cols, fillvalue="es", index=pd.Index( [ "i" + str(i) for i in range(math.ceil(len(to_dataframe_mapping.keys()) / cols)) ] ), ) expected_result = pd.DataFrame( result_list, columns=3 * ["key", "value"], index=["i0", "i1"] ) assert result.equals(expected_result)