From 3d80516c317899583fee2041597339cc36007c59 Mon Sep 17 00:00:00 2001 From: dmt <> Date: Mon, 28 Oct 2019 21:39:25 +0100 Subject: [PATCH] Refactor reconstruction. --- cml/domain/reconstruction.py | 259 ++++++++++++++++++++++++----------- 1 file changed, 182 insertions(+), 77 deletions(-) diff --git a/cml/domain/reconstruction.py b/cml/domain/reconstruction.py index fef8f6e..1471d5d 100644 --- a/cml/domain/reconstruction.py +++ b/cml/domain/reconstruction.py @@ -2,10 +2,16 @@ from random import sample from collections import defaultdict from dataclasses import dataclass from functools import partial +from typing import Union, List, Tuple, Generator, Dict -import krippendorff +from krippendorff import alpha -from cml.shared.errors import NoModelReconstructedError +from cml.shared.settings import ReconstructionSettings +from cml.domain.data_source import DataSource +from cml.shared.errors import ( + NoModelReconstructedError, + NotEnoughFeaturesWarning +) __all__ = ( @@ -19,127 +25,198 @@ class Metadata: knowledge_tier: int identifier: int pre_image: list + pre_image_features: list + pre_image_labels: list t_min: int t_max: int - sigma: list - zeta: list + sigma: tuple + zeta: tuple def __str__(self): - return f"Knowledge domain: <{self.knowledge_domain}> " \ - f"Knowledge tier: <{self.knowledge_tier}> " \ - f"Identifier: <{self.identifier}> " \ - f"Pre image: <{self.pre_image}> " \ - f"T min: <{self.t_min}> " \ - f"T max: <{self.t_max}> " \ - f"Subjects: <{self.sigma}> " \ - f"Puposes: <{self.zeta}>" + return f"Knowledge domain: <{self.knowledge_domain}> \n" \ + f"Knowledge tier: <{self.knowledge_tier}> \n" \ + f"Identifier: <{self.identifier}> \n" \ + f"Pre image: <{self.pre_image}> \n" \ + f"Pre image labels: <{self.pre_image_labels}> \n" \ + f"Pre image features: <{self.pre_image_features}> \n" \ + f"T min: <{self.t_min}> \n" \ + f"T max: <{self.t_max}> \n" \ + f"Subjects: <{self.sigma}> \n" \ + f"Puposes: <{self.zeta}> \n" + + def __hash__(self): + return hash("".join([self.knowledge_domain, + str(self.knowledge_tier), + str(self.identifier)])) class PragmaticMachineLearningModel: - def __init__(self, meta, model, learnblock): + def __init__(self, + meta: Metadata, + model, + learnblock): self.meta = meta self.model = model self.domain_size = learnblock.n_features + self.learnblock = learnblock if self.tier > 1 else None self.domain = learnblock.indexes + self.origin = learnblock.origin - def __hash__(self): - return hash(self.uid) + def __str__(self) -> str: + return self.uid + + def __repr__(self) -> str: + return self.uid - def __eq__(self, other): + def __hash__(self) -> int: + return hash(self.meta) + + def __eq__(self, + other: Union['PragmaticMachineLearningModel', str]) -> bool: if isinstance(other, PragmaticMachineLearningModel): return hash(self) == hash(other) + + if isinstance(other, str): + return hash(self) == hash(other) + raise NotImplementedError() @property - def tier(self): + def pre_image_features(self) -> List[str]: + return self.meta.pre_image_features + + @property + def tier(self) -> int: return self.meta.knowledge_tier @property - def min_timestamp(self): + def min_timestamp(self) -> int: return self.meta.t_min @property - def max_timestamp(self): + def max_timestamp(self) -> int: return self.meta.t_max @property - def pre_image(self): + def pre_image(self) -> List[int]: return self.meta.pre_image @property - def subject(self): + def pre_image_labels(self) -> List[Union[int, str]]: + return self.meta.pre_image_labels + + @property + def subject(self) -> Tuple[str]: return self.meta.sigma @property - def purpose(self): + def purpose(self) -> Tuple[str]: return self.meta.zeta @property - def uid(self): + def uid(self) -> str: return ".".join([self.meta.knowledge_domain, str(self.meta.knowledge_tier), str(self.meta.identifier)]) - @property - def sample_times(self): - pass + def fusion(self, + model: 'PragmaticMachineLearningModel', + new_identifier: int) -> Metadata: + + return Metadata(self.meta.knowledge_domain, + self.meta.knowledge_tier, + new_identifier, + self.pre_image + model.pre_image, + list(set(self.pre_image_features).intersection( + set(model.pre_image_features))), + self.pre_image_labels + model.pre_image_labels, + min(self.meta.t_min, model.min_timestamp), + max(self.meta.t_max, model.max_timestamp), + self.subject + model.subject, + self.meta.zeta + model.subject) + + def trained_with(self, source: DataSource): + if self.origin == "source": + block = source.get_block(self.pre_image, + columns=self.pre_image_features) + block = block.set_labels(self.pre_image_labels) + return block - def fusion(self, prag_model): - pass + else: + return self.learnblock class Reconstructor: - def __init__(self, settings, ml_models, knowlege_domain): - self.logger = None + def __init__(self, + settings: ReconstructionSettings, + ml_models: List, + knowlege_domain: str): self.settings = settings self.ml_models = ml_models self.knowledge_domain = knowlege_domain + self.logger = None self._category = None self._free_id = None self.__reconstruction = None - def reconstruct(self, learnblock, which_models=None, meta=None): - if not which_models: - which_models = [m.abbreviation for m in self.ml_models] - - reliabilities_to_model = self.__reconstruction(learnblock, - which_models, - meta) - if reliabilities_to_model.keys(): - return determine_winner(reliabilities_to_model) - raise NoModelReconstructedError() - @property - def category(self): + def category(self) -> str: return self._category @category.setter - def category(self, value): + def category(self, value: str) -> None: if value == "conceptual": self.__reconstruction = partial(self._reconstruct_conceptual, krippen="nominal") elif value == "procedural": self.__reconstruction = partial(self._reconstruct_procedural, krippen="ratio") - else: - raise ValueError() + else: raise ValueError() + self._category = value @property - def free_id(self): + def free_id(self) -> Generator[int, None, None]: return self._free_id @free_id.setter - def free_id(self, value): + def free_id(self, value: Generator[int, None, None]) -> None: self._free_id = iter(value) + def reconstruct(self, + tier: int, + learnblock, + which_models: List = None, + meta: Metadata = None) -> PragmaticMachineLearningModel: + + # Check if learnblock has enough features + if not (learnblock.learn_rows > 0): + raise NotEnoughFeaturesWarning() + + # Specify the models which should be trained + if not which_models: + which_models = [m.subject for m in self.ml_models] + + # Start the reconstruction + reliabilities_to_model = self.__reconstruction(tier, + learnblock, + which_models, + meta=meta) + + # Determine the best pragmatic machine learning model + if reliabilities_to_model.keys(): + return determine_winner(reliabilities_to_model) + raise NoModelReconstructedError() + def _reconstruct_conceptual(self, + tier: int, learnblock, - which_models, - krippen=None, - meta=None): + which_models: List, + krippen: str = None, + meta: Metadata = None): + reliability_to_model = defaultdict(list) for model in self.ml_models: - if model.abbreviation not in which_models: continue + if model.subject not in which_models: continue # train model train_block, eval_block = self.split(learnblock) @@ -149,21 +226,24 @@ class Reconstructor: # check constraints if self._valid_reconstructed(trained_model, "conceptual"): - reliability = self.calc_reliability(trained_model, - learnblock, - krippen) + reliability = self.calc_reliability( + trained_model, learnblock, krippen) if reliability >= self.settings.min_reliability: - # TODO (dmt): Fix the knowledge tier after first iteration! - prag_meta_data = Metadata( - "C", - 1, - next(self.free_id), - learnblock.indexes, - learnblock.min_timestamp, - learnblock.max_timestamp, - [model.subject], - [".".join(["C", '1', learnblock.purpose])] - ) + if not meta: + prag_meta_data = Metadata( + "C", + tier, + next(self.free_id), + learnblock.indexes, + learnblock.columns, + learnblock.get_column_values("Z"), + learnblock.min_timestamp, + learnblock.max_timestamp, + (model.subject, ), + (".".join(["C", '1', learnblock.purpose]), ) + ) + else: + prag_meta_data = meta reliability_to_model[reliability].append( PragmaticMachineLearningModel(prag_meta_data, @@ -171,9 +251,15 @@ class Reconstructor: learnblock)) return reliability_to_model - def _reconstruct_procedural(self, learnblock, krippen=None, meta=None): + def _reconstruct_procedural(self, + tier: int, + learnblock, + which_models: List, + krippen: str = None, + meta: Metadata= None) -> Dict[float, List]: reliability_to_model = defaultdict(list) for model in self.ml_models: + if model.subject not in which_models: continue # train model train_block, eval_block = self.split(learnblock) @@ -183,17 +269,32 @@ class Reconstructor: # check contraints if self._valid_reconstructed(trained_model, "procedural"): - reliability = self.calc_reliability(trained_model, - learnblock, - krippen) + reliability = self.calc_reliability( + trained_model, learnblock, krippen) if reliability >= self.settings.min_reliability: + if not meta: + prag_meta_data = Metadata( + "P", + tier, + next(self.free_id), + learnblock.indexes, + learnblock.columns, + learnblock.get_column_values("Z"), + learnblock.min_timestamp, + learnblock.max_timestamp, + (model.subject, ), + (".".join(["C", '1', learnblock.purpose]), ) + ) + else: + prag_meta_data = meta reliability_to_model[reliability].append( - PragmaticMachineLearningModel(trained_model, + PragmaticMachineLearningModel(prag_meta_data, + trained_model, learnblock)) return reliability_to_model - def split(self, learnblock): + def split(self, learnblock) -> Tuple: indices = learnblock.indexes eval_size = int(learnblock.length * self.settings.reliability_sample) eval_idx = sample(indices, eval_size) @@ -201,14 +302,18 @@ class Reconstructor: return learnblock.new_block_from_rows_index(train_idx), \ learnblock.new_block_from_rows_index(eval_idx) - def calc_reliability(self, trained_model, eval_block, metric): + def calc_reliability(self, + trained_model, + eval_block, + metric: str) -> float: y_pre = trained_model.predict(eval_block.as_numpy_array()) y_true = [i for i in eval_block.get_column_values("Z")] reliability_data = [y_pre, y_true] - return krippendorff.alpha(reliability_data, - level_of_measurement=metric) + return alpha(reliability_data, level_of_measurement=metric) - def _valid_reconstructed(self, model, knowledge_domain): + def _valid_reconstructed(self, + model, + knowledge_domain: str) -> bool: if knowledge_domain == "conceptual": return model.accuracy >= self.settings.min_test_accuracy else: @@ -216,10 +321,10 @@ class Reconstructor: model.max_error <= self.settings.max_test_error_max -def determine_winner(reliability_to_model): +def determine_winner( + reliability_to_model: dict) -> PragmaticMachineLearningModel: sorted_reliabilities = sorted(reliability_to_model.keys(), reverse=True) biggest_reliabilities = reliability_to_model[sorted_reliabilities.pop()] - winner = None min_domain = float("inf") for model in biggest_reliabilities: -- GitLab