From 5f9d463b9f74f9babc65e42e95e28ccce2d49d4b Mon Sep 17 00:00:00 2001 From: dmt <> Date: Mon, 28 Oct 2019 21:39:42 +0100 Subject: [PATCH] Refactor deconstruction. --- cml/domain/deconstruction.py | 518 ++++++++++++++++++++++++++--------- 1 file changed, 391 insertions(+), 127 deletions(-) diff --git a/cml/domain/deconstruction.py b/cml/domain/deconstruction.py index a08fd16..65f9487 100644 --- a/cml/domain/deconstruction.py +++ b/cml/domain/deconstruction.py @@ -2,9 +2,20 @@ from abc import ABC from functools import partial from collections import defaultdict from itertools import count +from queue import Queue +from typing import Tuple, Optional +import krippendorff + +from cml.domain.data_source import DataSource +from cml.domain.reconstruction import PragmaticMachineLearningModel +from cml.shared.settings import DeconstructionSettings from cml.shared.parameter import HIGHEST_TIER -from cml.shared.errors import DeconstructionFailed, NoModelReconstructedError +from cml.shared.errors import ( + DeconstructionFailed, + NoModelReconstructedError, + NotEnoughFeaturesWarning +) __all__ = ( @@ -17,12 +28,14 @@ __all__ = ( "KnowledgeDomain" ) +TS_QUEUE = Queue() + def notify_inverted_index(func): def wrapper(self, *args, **kwargs): for obs in self.observer: getattr(obs, func.__name__)(*args, **kwargs) - return func(*args, **kwargs) + return func(self, *args, **kwargs) return wrapper @@ -30,28 +43,56 @@ class KnowledgeDatabase(ABC): def __init__(self): self.database = [KnowledgeDomain(i) for i in range(8)] self.observer = [] + self.n_models = 0 super().__init__() - def generate_free_ids(self): - for i in count(1): - yield i - - def deserialize(self): pass - - def serialize(self): pass + def __contains__(self, item: PragmaticMachineLearningModel): + if not isinstance(item, PragmaticMachineLearningModel): + raise TypeError() + try: + self.get(item.uid) + return True + except KeyError: + return False @notify_inverted_index - def extend(self): pass + def insert(self, model: PragmaticMachineLearningModel): + if model not in self: + self.database[model.tier].insert(model) + self.n_models += 1 @notify_inverted_index - def insert(self, model): - self.database[model.knowledge_domain].insert(model) + def remove(self, model: PragmaticMachineLearningModel): + self.database[model.tier].remove(model) + self.n_models -= 1 @notify_inverted_index - def remove(self): pass + def replace(self, + replaced: PragmaticMachineLearningModel, + replacer: PragmaticMachineLearningModel): + self.database[replaced.tier].replace(replaced, replacer) @notify_inverted_index - def replace(self): pass + def extend(self): pass + + def get(self, uid: str): + _, tier, _ = uid.split(".") + return self.database[int(tier)].get(uid) + + def deserialize(self): pass + + def serialize(self): pass + + def generate_free_ids(self): + for i in count(1): + yield i + + def remove_dependent_models(self, + relative_model: PragmaticMachineLearningModel): + for domain in self.database: + for model in domain.knowledge.values(): + if model.origin == relative_model.origin: + self.remove(model) class ConceptualKnowledgeDatabase(KnowledgeDatabase): @@ -65,30 +106,104 @@ class ProceduralKnowledgeDatabase(KnowledgeDatabase): class KnowledgeDomain: - def __init__(self, tier): + def __init__(self, tier: int): self.tier = tier self.knowledge = {} - def insert(self, model): - self.knowledge[model] = model + def get(self, uid: str): + return self.knowledge[uid] + def insert(self, model: PragmaticMachineLearningModel): + self.knowledge[model] = model -class RelativeFinder: - def __init__(self): - self.index_t = defaultdict(list) - self.index_z = defaultdict(list) - self.index_sigma = defaultdict(list) - - def find_relatives(self): - # TODO (dmt): If something is found return it, if nohtings is found - # then return None and then raise StopIteration! - pass + def remove(self, model: PragmaticMachineLearningModel): + del self.knowledge[model] - def remove(self): pass + def replace(self, + replaced: PragmaticMachineLearningModel, + replacer: PragmaticMachineLearningModel): + del self.knowledge[replaced] + self.knowledge[replacer] = replacer - def replace(self): pass - def insert(self, model): pass +class RelativeFinder: + def __init__(self): + self.index_t = defaultdict(set) + self.index_z = defaultdict(set) + self.index_sigma = defaultdict(set) + + def find(self, + pair: Tuple[str, Optional[str]], + model: PragmaticMachineLearningModel): + + if pair == ("T", "Z"): + set_t = self.index_t[(model.min_timestamp, model.max_timestamp)] + set_z = self.index_z[model.purpose] + relatives = set_t.intersection(set_z) + elif pair == ("T", "Sigma"): + set_t = self.index_t[(model.min_timestamp, model.max_timestamp)] + set_s = self.index_sigma[model.subject] + relatives = set_t.intersection(set_s) + elif pair == ("Sigma", "Z"): + set_s = self.index_sigma[model.subject] + set_z = self.index_z[model.purpose] + relatives = set_s.intersection(set_z) + elif pair == ("complete", ): + set_t = self.index_t[(model.min_timestamp, model.max_timestamp)] + set_s = self.index_sigma[model.subject] + set_z = self.index_z[model.purpose] + relatives = set_t.intersection(set_s).intersection(set_z) + else: raise ValueError + + if model in relatives: + relatives.remove(model) + for relative in relatives: + yield relative + if not relatives: + yield None + + def remove(self, model: PragmaticMachineLearningModel): + t_list, s_list, z_list = self.get_index_lists(model, + time=True, + subject=True, + purpose=True) + t_list.remove(model) + s_list.remove(model) + z_list.remove(model) + + def replace(self, + replaced: PragmaticMachineLearningModel, + replacer: PragmaticMachineLearningModel): + t_list, s_list, z_list = self.get_index_lists(replaced, + time=True, + subject=True, + purpose=True) + t_list.remove(replaced) + s_list.remove(replaced) + z_list.remove(replaced) + self.index_t[(replacer.min_timestamp, replacer.max_timestamp)].add( + replacer) + self.index_z[replacer.purpose].add(replacer) + self.index_sigma[replacer.subject].add(replacer) + + def insert(self, model: PragmaticMachineLearningModel): + self.index_t[(model.min_timestamp, model.max_timestamp)].add(model) + self.index_z[model.purpose].add(model) + self.index_sigma[model.subject].add(model) + + def get_index_lists(self, + model: PragmaticMachineLearningModel, + time: bool, + subject: bool, + purpose: bool): + t_list = s_list = z_list = None + if time: + t_list = self.index_t[(model.min_timestamp, model.max_timestamp)] + if subject: + s_list = self.index_sigma[model.subject] + if purpose: + z_list = self.index_z[model.purpose] + return t_list, s_list, z_list def extend(self): pass @@ -98,99 +213,184 @@ class Deconstructor: SUBJECT_COLUMN = "Sigma" PURPOSE_COLUMN = "Z" - def __init__(self, knowledge_database, relative_finder, settings): + def __init__(self, + knowledge_database: KnowledgeDatabase, + relative_finder: RelativeFinder, + source: DataSource, + settings: DeconstructionSettings): + self.knowledge_database = knowledge_database self.relative_finder = relative_finder self.settings = settings - self.source = None + self.source = source + self.logger = None self.reconstructor = None + self.free_id = None def _strategies(self, block): yield (("T", "Z"), partial(self.deconstruct_time_zeta, block=block)) + yield (("Sigma", "Z"), partial(self.deconstruct_sigma_zeta, block=block)) yield (("T", "Sigma"), partial(self.deconstruct_time_sigma, block=block)) - yield (("Sigma", "Z"), self.deconstruct_sigma_zeta) - yield (("complete", ), self.deconstruct_complete) + yield (("complete", ), partial(self.deconstruct_complete, block=block)) - def deconstruct(self, prag_model, learnblock): + def deconstruct(self, + tier: int, + prag_model: PragmaticMachineLearningModel, + learnblock) -> None: for pair, strategy in self._strategies(learnblock): - for relative in self.relative_finder(pair): + for relative in self.relative_finder.find(pair, prag_model): try: - strategy(prag_model, relative) - # successfull deconstruction + strategy(tier, prag_model, relative) if self.settings.deconst_mode == "minimal": return - except DeconstructionFailed: - # unsuccessfull deconstruction + except (DeconstructionFailed, NoModelReconstructedError): continue - - def deconstruct_time_sigma(self, prag_model, relative_model, block): + except Exception as error: + print(error.with_traceback()) + + def deconstruct_time_sigma(self, + tier: int, + p_model: PragmaticMachineLearningModel, + r_model: PragmaticMachineLearningModel, + block): + success = False + self.knowledge_database.insert(p_model) + if r_model and p_model.tier < HIGHEST_TIER-1: + + # Get learnblock that trained relative model + second_block = r_model.trained_with(self.source) + + # Get samples that have overlapping timestamp + over_block = block.overlapping_rows(second_block, subset=["T"]) + + # Check rows constraint + if over_block.rows >= self.settings.learn_block_minimum: + + # Calculate reliability (which block???) + alpha = self.calc_reliability(r_model, p_model, block) + + #alpha_systematic = alpha < 0 + #alpha_weak_reliability = 0 <= alpha < self.settings.min_reliability + #if (self.settings.allow_weak_reliability and + # alpha_weak_reliability) or alpha_systematic: + + if self.settings.allow_weak_reliability and \ + alpha > self.settings.min_reliability: + + # Create learnblock from the aim values of the overlapping + # samples + data = list(zip(over_block.get_column_values("Z"), + over_block.get_column_values("T"), + ["\"\"" for _ in range(over_block.rows)], + ["\"\"" for _ in range(over_block.rows)])) + feature = ".".join(["0", str(tier+1), "1"]) + columns = [feature, "T", "Sigma", "Z"] + source = self.source.new_learnblock( + values=data, columns=columns, index=over_block.indexes, + origin=[p_model.uid, r_model.uid]) + TS_QUEUE.put((tier+1, source)) + success = True + + if not success: + raise DeconstructionFailed() + + def deconstruct_time_zeta(self, + tier: int, + prag_model: PragmaticMachineLearningModel, + relative_model: PragmaticMachineLearningModel, + block): + success = False self.knowledge_database.insert(prag_model) - if relative_model and prag_model.tier < HIGHEST_TIER: - first_block = self.source.get_block(prag_model.pre_image) - second_block = self.source.get_block(relative_model.pre_image) - times_p = set(first_block.get_column_values(self.TIME_COLUMN)) - times_r = set(second_block.get_column_values(self.TIME_COLUMN)) - if len(times_p.union(times_r)) >= self.settings.min_learnblock: - alpha = self.calc_reliability(relative_model, prag_model, block) - alpha_systematic = alpha < 0 - alpha_weak_reliability = 0 <= alpha < self.settings.min_reliability - if (self.settings.allow_weak_reliability and - alpha_weak_reliability) or alpha_systematic: - new_model = prag_model.fusion(relative_model) - self.knowledge_database.insert(new_model) - # Create a new learnblock from times_p.union(times_r) - # Starte eine neure Folge von Construction, Reconstruction - # Deconstruction from this one! - else: - self.knowledge_database.insert(prag_model) + if relative_model: + # Get learnblock that trained relative model + second_block = relative_model.trained_with(self.source) - def deconstruct_time_zeta(self, prag_model, relative_model, block): - self.knowledge_database.insert(prag_model) - first_block = self.source.get_block(prag_model.pre_image) - second_block = self.source.get_block(relative_model.pre_image) - times_p = set(first_block.get_column_values(self.TIME_COLUMN)) - times_r = set(second_block.get_column_values(self.TIME_COLUMN)) - if len(times_p.union(times_r)) >= self.settings.min_learnblock: - new_model = prag_model.fusion(relative_model) - alpha = self.calc_reliability(relative_model, prag_model, block) - if alpha >= self.settings.min_reliability: - self.knowledge_database.replace(relative_model, new_model) - - def deconstruct_sigma_zeta(self, prag_model, relative_model): - if relative_model and self.time_constraint(prag_model, - relative_model, - "SigmaZ"): - first_block = self.source.get_block(prag_model.pre_image) - second_block = self.source.get_block(relative_model.pre_image) - overlapping_block = first_block.overlapping_rows(second_block) + # Get samples that have overlapping timestamp + over_block = block.overlapping_rows(second_block, subset=["T"]) + if over_block.rows >= self.settings.learn_block_minimum: + + # Create new metadata for a pragmatic model + new_model = prag_model.fusion(relative_model, + next(self.free_id)) + + # Which models should be used for the reconstruction + which_ml_models = new_model.sigma + + # Get learningblock + train_block = block.fusion(second_block) + + # Start the reconstruction + try: + recon_model = self.reconstructor.reconstruct( + tier, train_block, which_ml_models, new_model) + self.knowledge_database.replace(relative_model, recon_model) + success = True + except (NoModelReconstructedError, NotEnoughFeaturesWarning): + pass + except ValueError as error: + print(error.with_traceback()) + + # alpha = self.calc_reliability(relative_model, prag_model, block) + # if alpha >= self.settings.min_reliability: + + if not success: raise DeconstructionFailed() + + def deconstruct_sigma_zeta(self, + tier: int, + p_model: PragmaticMachineLearningModel, + r_model: PragmaticMachineLearningModel, + block): + success = False + + if r_model and self.time_constraint(p_model, r_model, "SigmaZ"): + + # Get learnblock that trained relative model + second_block = r_model.trained_with(self.source) + + # Get samples that have overlapping rows + overlapping_block = block.overlapping_rows(second_block) + # Check constraint if overlapping_block.rows >= 2: - new_model = prag_model.fusion(relative_model) + + # Model fusion + new_model = p_model.fusion(r_model, next(self.free_id)) + which_ml_models = new_model.sigma + + # Get learnblock + train_block = block.fusion(second_block) + try: - new_block = first_block.fusion(second_block) - which_ml_models = new_model.subject - recon_m = self.reconstructor.reconstruct(new_block, - which_ml_models, - new_model) - self.knowledge_database.replace(relative_model, recon_m) - - except NoModelReconstructedError: + # Reconstruct model + recon_m = self.reconstructor.reconstruct( + tier, train_block, which_ml_models, new_model) + + self.knowledge_database.replace(r_model, recon_m) + success = True + + except (NoModelReconstructedError, NotEnoughFeaturesWarning): if self.settings.deconst_mode == "conservative": - self.knowledge_database.remove(prag_model) + self.knowledge_database.remove(p_model) elif self.settings.deconst_mode == "integrative": - self.knowledge_database.insert(prag_model) + self.knowledge_database.insert(p_model) elif self.settings.deconst_mode == "oppurtunistic": - if first_block.rows > second_block.rows: - self.knowledge_database.remove(relative_model) - self.knowledge_database.insert(prag_model) + if block.rows > second_block.rows: + self.knowledge_database.remove(r_model) + self.knowledge_database.insert(p_model) else: - self.knowledge_database.remove(prag_model) - self.knowledge_database.insert(relative_model) - else: - self.knowledge_database.insert(prag_model) + self.knowledge_database.remove(p_model) + self.knowledge_database.insert(r_model) + + if not success: + self.knowledge_database.insert(p_model) + raise DeconstructionFailed() + + def time_constraint(self, + prag_model: PragmaticMachineLearningModel, + relative_model: PragmaticMachineLearningModel, + _type: str): - def time_constraint(self, prag_model, relative_model, _type): if _type == "SigmaZ": if self.settings.deconst_max_distance_t == 0: return self._overlapping_time(prag_model, relative_model) @@ -206,7 +406,10 @@ class Deconstructor: or prag_model.min_timestamp <= relative_model.min_timestamp \ and prag_model.max_timestamp >= relative_model.max_timestamp - def _overlapping_time(self, prag_model, relative_model): + def _overlapping_time(self, + prag_model: PragmaticMachineLearningModel, + relative_model: PragmaticMachineLearningModel): + return relative_model.min_timestamp >= prag_model.max_timestamp or \ prag_model.min_timestamp >= relative_model.max_timestamp @@ -222,41 +425,102 @@ class Deconstructor: relative_m_dash_time or (prag_model.min_timestamp - relative_model.max_timestamp) < relative_m_dash_time - def deconstruct_complete(self, prag_model, relative_model): - if relative_model: - if self.time_constraint(prag_model, relative_model, "complete") and \ - self._feature_intersection(prag_model, relative_model) >= 2: - new_model = prag_model.fusion(relative_model) - else: - new_block = self.source.get_block( - prag_model.pre_image + relative_model.pre_image) - ts_relatives = self.source.time_simga_relatives(new_block) - which_ml_models = prag_model.subject + relative_model.subject - new_model = self.reconstructor.reconstruct(ts_relatives, - which_ml_models) - - new_block = self.source.get_block(new_model.pre_image) - which_ml_models = new_block.subject + def deconstruct_complete(self, + tier: int, + p_model: PragmaticMachineLearningModel, + r_model: PragmaticMachineLearningModel, + block): + success = False + + try: + # Check feature intersection constraint + if r_model and self._feature_intersection(p_model, r_model) >= 2: + new_model = p_model.fusion(r_model, next(self.free_id)) + + # Check time contraint + elif r_model and self.time_constraint(p_model, r_model, "complete"): + + # Create submodel from TSgima relative samples + second_block = r_model.trained_with(self.source) + new_block = block.fusion(second_block) + ts_relatives = self.source.time_sigma_relatives(new_block) + which_ml_models = p_model.subject + r_model.subject + self.reconstructor.reconstruct( + tier, ts_relatives, which_ml_models) + new_model = p_model.fusion(r_model, next(self.free_id)) + else: return + + # Create learnblock + first_block = p_model.trained_with(self.source) + second_block = r_model.trained_with(self.source) + new_block = first_block.fusion(second_block) + which_ml_models = new_model.sigma try: - recon_model = self.reconstructor.reconstruct(new_block, - which_ml_models, - new_model) - self.knowledge_database.remove(relative_model) + # Reconstruct model + recon_model = self.reconstructor.reconstruct( + tier, new_block, which_ml_models, new_model) + self.knowledge_database.remove(r_model) self.knowledge_database.insert(recon_model) + success = True + + except (NoModelReconstructedError, NotEnoughFeaturesWarning): + success = self.model_differentiation(tier, new_block, r_model) + + except (NoModelReconstructedError, NotEnoughFeaturesWarning): + self.knowledge_database.remove(r_model) + self.knowledge_database.insert(p_model) + + finally: + if not success: + raise DeconstructionFailed() + + def model_differentiation(self, + tier: int, + block, + relative_model: PragmaticMachineLearningModel): + success = False + + time_column = block.get_column_values("T") + density = self.source.estimate_density(time_column) + self.source.remove_time_dense_relatives(block, density) + clusters = self.source.cluster(block, density) + for time_values in clusters: + learnblock = block.new_block_from(time_values) + try: + reconstructed_model = self.reconstructor.reconstruct( + tier, learnblock) + self.knowledge_database.insert(reconstructed_model) + success = True except NoModelReconstructedError: - # TODO (dmt): Implement the model differentiation! - pass + self.knowledge_database.remove_dependent_models(relative_model) - else: - self.knowledge_database.insert(prag_model) + if not success: + self.knowledge_database.remove(relative_model) + + return success - def _feature_intersection(self, prag_model, relative_model): - first_block = self.source.get_block(prag_model.pre_image) - second_block = self.source.get_block(relative_model.pre_image) + def _feature_intersection(self, + prag_model: PragmaticMachineLearningModel, + relative_model: PragmaticMachineLearningModel): + first_block = prag_model.trained_with(self.source) + second_block = relative_model.trained_with(self.source) return len( set(first_block.columns).intersection(set(second_block.columns)) ) - def calc_reliability(self, model_a, model_b, block): - return 0 + def calc_reliability(self, + model_a: PragmaticMachineLearningModel, + model_b: PragmaticMachineLearningModel, + block): + y_one = model_a.model.predict(block.as_numpy_array()) + y_two = model_b.model.predict(block.as_numpy_array()) + reliability_data = [y_one, y_two] + if self.reconstructor.category == "conceptual": + return krippendorff.alpha(reliability_data, + level_of_measurement="nominal") + elif self.reconstructor.category == "procedural": + return krippendorff.alpha(reliability_data, + level_of_measurement="ratio") + else: + raise ValueError() -- GitLab