"""Reconstruct pragmatic machine learning models from a learnblock.

Candidate models are trained on a random part of a learnblock, filtered by
the configured quality constraints, scored with Krippendorff's alpha and the
best scoring model with the smallest feature count is selected.
"""
from collections import defaultdict
from functools import partial
from random import sample

# NOTE(review): the diff hunks hide the line between the parentheses of
# ``__all__``; "Reconstructor" is assumed here - confirm against the full
# file before merging.
__all__ = (
    "Reconstructor",
)


class PragmaticMachineLearningModel:
    """A trained model together with the feature count of its learnblock."""

    def __init__(self, model, learnblock):
        """Store ``model`` and remember ``learnblock.n_features``.

        Args:
            model: The trained model object.
            learnblock: Block the model was built from; only its
                ``n_features`` attribute is read.
        """
        self.model = model
        self.domain_size = learnblock.n_features


class Reconstructor:
    """Train candidate models on a learnblock and pick the most reliable one.

    ``category`` must be assigned ("conceptual" or "procedural") before
    ``reconstruct`` is called; until then no reconstruction strategy is
    bound and ``reconstruct`` fails with ``TypeError``.
    """

    def __init__(self, settings, ml_models, knowlege_domain):
        """Store the collaborators; no strategy is selected yet.

        Args:
            settings: Object providing ``min_reliability``,
                ``reliability_sample``, ``min_test_accuracy``,
                ``max_test_error_avg`` and ``max_test_error_max``.
            ml_models: Iterable of objects offering ``train(x, y)``.
            knowlege_domain: Stored as ``self.knowledge_domain``. The
                misspelt parameter name is kept so that existing keyword
                callers continue to work.
        """
        self.logger = None
        self.settings = settings
        self.ml_models = ml_models
        self.knowledge_domain = knowlege_domain
        self._category = None
        self.__reconstruction = None

    def reconstruct(self, learnblock):
        """Return the winning model reconstructed from ``learnblock``.

        Args:
            learnblock: Block handed to the strategy chosen via ``category``.

        Returns:
            The ``PragmaticMachineLearningModel`` chosen by
            ``determine_winner``.

        Raises:
            NoModelReconstructedError: If no trained model passed the
                constraint and reliability checks.
            TypeError: If ``category`` was never assigned.
        """
        reliabilities_to_model = self.__reconstruction(learnblock)
        if reliabilities_to_model:
            return determine_winner(reliabilities_to_model)

        # Imported where it is raised so that importing this module does not
        # require the error module to be importable.
        from cml.shared.errors import NoModelReconstructedError
        raise NoModelReconstructedError()

    @property
    def category(self):
        """The active category, or ``None`` while none has been assigned."""
        return self._category

    @category.setter
    def category(self, value):
        """Select the reconstruction strategy for ``value``.

        "conceptual" scores with the "nominal" level of measurement,
        "procedural" with "ratio".

        Raises:
            ValueError: If ``value`` is neither of the two; the previous
                category and strategy are left untouched in that case.
        """
        if value == "conceptual":
            strategy = partial(self._reconstruct_conceptual,
                               krippen="nominal")
        elif value == "procedural":
            strategy = partial(self._reconstruct_procedural,
                               krippen="ratio")
        else:
            raise ValueError(
                "unknown category {!r}; expected 'conceptual' or "
                "'procedural'".format(value))

        # Remember the value so that the getter reflects the assignment.
        self._category = value
        self.__reconstruction = strategy

    def _reconstruct_conceptual(self, learnblock, krippen=None):
        """Reconstruct with the "conceptual" validity constraints."""
        return self._reconstruct(learnblock, "conceptual", krippen)

    def _reconstruct_procedural(self, learnblock, krippen=None):
        """Reconstruct with the "procedural" validity constraints."""
        return self._reconstruct(learnblock, "procedural", krippen)

    def _reconstruct(self, learnblock, knowledge_domain, krippen):
        """Train every candidate and group the accepted ones by reliability.

        Args:
            learnblock: Block to split, train on and score against.
            knowledge_domain: "conceptual" or anything else (treated as
                procedural by ``_valid_reconstructed``).
            krippen: Level of measurement passed to ``calc_reliability``.

        Returns:
            ``defaultdict(list)`` mapping a reliability value to the
            ``PragmaticMachineLearningModel`` instances that reached it.
            Only reliabilities ``>= settings.min_reliability`` appear.
        """
        reliability_to_model = defaultdict(list)
        for model in self.ml_models:
            # A fresh random split is drawn for every candidate model.
            # NOTE(review): the held-out part of the split is never used;
            # reliability below is computed on the complete learnblock,
            # which includes the training rows. Confirm whether the
            # evaluation block was meant to be scored instead.
            train_block, _ = self.split(learnblock)
            trained_model = model.train(
                train_block.as_numpy_array(),
                list(train_block.get_column_values("Z")))

            if not self._valid_reconstructed(trained_model,
                                             knowledge_domain):
                continue

            reliability = self.calc_reliability(trained_model,
                                                learnblock,
                                                krippen)
            if reliability >= self.settings.min_reliability:
                reliability_to_model[reliability].append(
                    PragmaticMachineLearningModel(trained_model,
                                                  learnblock))
        return reliability_to_model

    def split(self, learnblock):
        """Randomly split ``learnblock`` into a train and an eval block.

        ``int(learnblock.length * settings.reliability_sample)`` row indices
        are drawn without replacement for the eval block; the remaining
        indices form the train block.

        Returns:
            Tuple ``(train_block, eval_block)`` built with
            ``learnblock.new_block_from_rows_index``.
        """
        # list() so that random.sample always receives a sequence.
        indices = list(learnblock.indexes)
        eval_size = int(learnblock.length * self.settings.reliability_sample)
        eval_idx = sample(indices, eval_size)
        train_idx = list(set(indices).difference(eval_idx))
        return (learnblock.new_block_from_rows_index(train_idx),
                learnblock.new_block_from_rows_index(eval_idx))

    def calc_reliability(self, trained_model, eval_block, metric):
        """Return Krippendorff's alpha between predictions and "Z" values.

        Args:
            trained_model: Object offering ``predict``.
            eval_block: Block whose array is predicted and whose "Z" column
                supplies the reference values.
            metric: Passed on as ``level_of_measurement``.
        """
        # Imported at call time: only this method needs the package.
        import krippendorff

        y_pre = trained_model.predict(eval_block.as_numpy_array())
        y_true = list(eval_block.get_column_values("Z"))
        return krippendorff.alpha([y_pre, y_true],
                                  level_of_measurement=metric)

    def _valid_reconstructed(self, model, knowledge_domain):
        """Check a trained model against the configured constraints.

        For "conceptual" the model's ``accuracy`` must reach
        ``settings.min_test_accuracy``. For any other value both
        ``mean_error`` and ``max_error`` must stay within
        ``settings.max_test_error_avg`` / ``settings.max_test_error_max``.
        """
        if knowledge_domain == "conceptual":
            return model.accuracy >= self.settings.min_test_accuracy
        return (model.mean_error <= self.settings.max_test_error_avg
                and model.max_error <= self.settings.max_test_error_max)


def determine_winner(reliability_to_model):
    """Pick the model with the highest reliability and smallest domain.

    Args:
        reliability_to_model: Mapping of reliability value to a list of
            objects exposing ``domain_size``.

    Returns:
        Among the models stored under the largest key, the first one with
        the smallest ``domain_size``; ``None`` if that list is empty.

    Raises:
        ValueError: If ``reliability_to_model`` is empty.
    """
    if not reliability_to_model:
        raise ValueError("reliability_to_model must not be empty")

    # max() selects the highest reliability; popping the tail of a
    # descending sort would yield the lowest one instead.
    candidates = reliability_to_model[max(reliability_to_model)]
    return min(candidates, key=lambda model: model.domain_size, default=None)