From d73084d01110f3deff4de23d6961d59be5e30950 Mon Sep 17 00:00:00 2001
From: dmt <>
Date: Tue, 12 Nov 2019 18:37:20 +0100
Subject: [PATCH] Parallelize construction and reconstruction.

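Run conceptual construction in a daemonized BackgroundConstructor fed
through multiprocessing queues, and fan reconstruction out over a
worker pool. The knowledge database, relative finder and time-sigma
queue move from deconstruction.py to cml/domain/knowledge.py.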
---
 cml/domain/construction.py   | 144 +++++++++++++----
 cml/domain/deconstruction.py | 303 ++++++-----------------------------
 cml/domain/reconstruction.py | 245 +++++++++++++++++++---------
 3 files changed, 334 insertions(+), 358 deletions(-)

diff --git a/cml/domain/construction.py b/cml/domain/construction.py
index 507642f..7e4f720 100644
--- a/cml/domain/construction.py
+++ b/cml/domain/construction.py
@@ -1,4 +1,8 @@
+import sys
+from queue import Empty
+from copy import deepcopy
 from functools import partial
+from multiprocessing import Process, Queue
 
 from cml.shared.parameter import PROTOCOL_LEVEL
 
@@ -36,20 +40,63 @@ def update_construction(func):
     return wrapper
 
 
-class Constructor:
+class BackgroundConstructor(Process):
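+    """Daemon worker: trains candidate clusterings off the main process."""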
+    def __init__(self, models, logger, settings, input_queue, output_queue):
+        super(BackgroundConstructor, self).__init__(name="BackgroundConstructor")
+        self.models = models
+        self.logger = logger
+        self.settings = settings
+        self.input_queue = input_queue
+        self.output_queue = output_queue
+
+    def run(self):
+        # Consume learnblocks until the input queue stays empty for 30 s.
+        while True:
+            try:
+                learnblock = self.input_queue.get(timeout=30)
+                for block, model, min_category_size in self.prepare_args(learnblock):
+                    if block.labeled:
+                        self.output_queue.put(block)
+                    else:
+                        trained_model = model.train(block)
+                        # Reject the clustering if any cluster falls below
+                        # the minimum category size (for/else idiom).
+                        for cluster, size in trained_model.cluster_sizes.items():
+                            if size < min_category_size:
+                                break
+                        else:
+                            labels = trained_model.get_labels()
+                            labeled_learnblock = block.set_labels(labels)
+                            labeled_learnblock.n_cluster = model.cluster
+                            purpose = "{}{:02}".format(model.abbreviation,
+                                                       model.cluster)
+                            labeled_learnblock.purpose = purpose
+                            self.output_queue.put(labeled_learnblock)
+
+            except Empty:
+                # No work arrived within the timeout: let the worker exit.
+                sys.exit()
 
-    def __init__(self, ml_models, settings):
+    def prepare_args(self, learnblock):
+        # One job per (model, cluster count); deep copies keep trials independent.
+        for ml_model in self.models:
+            for complexity in range(2, self.settings.max_categories + 1):
+                ml_model.cluster = complexity
+                yield (deepcopy(learnblock), deepcopy(ml_model),
+                       self.settings.min_category_size)
+
+
+class Constructor:
+    def __init__(self, mode, ml_models, settings):
+        self.mode = mode
         self.settings = settings
         self.ml_models = ml_models
+        self._construction = self._init_construction(mode)
         self.logger = None
-        self._logging_dir = None
-        self._construction = None
-        self._category = None
+        self.__process = None
+        self.__input_queue = None
+        self.__output_queue = None
 
     @log_construction
     def construct(self, learnblock):
         for block in self._construction(learnblock):
             block.origin = learnblock.origin
             yield block
 
     @property
@@ -86,33 +133,52 @@ class Constructor:
     @max_categories.setter
     def max_categories(self, value):
         self.settings.max_categories = value
-        self.construction_type = self.construction_type
 
-    @property
-    def construction_type(self):
-        return self._category
-
-    @construction_type.setter
-    def construction_type(self, construction_type):
-        if construction_type == "conceptual":
-            self._construction = partial(
-                self._construct_conceptual_knowledge,
-                categorial_complexity=self.settings.max_categories,
-                min_category_size=self.settings.min_category_size,
-            )
-
-        elif construction_type == "procedural":
-            self._construction = partial(
-                self._construct_procedural_knowledge,
-                procedural_complexity=self.settings.max_model_targets,
-                max_target_error=self.settings.max_target_error
-            )
+    def prepare_args(self, learnblock):
+        # Mirrors BackgroundConstructor.prepare_args for in-process use.
+        for ml_model in self.ml_models:
+            for complexity in range(2, self.settings.max_categories + 1):
+                ml_model.cluster = complexity
+                yield (deepcopy(learnblock), deepcopy(ml_model),
+                       self.settings.min_category_size)
 
-        else:
-            # TODO (dmt): Provide proper exception handling.
-            raise Exception("Provide valid construction type.")
+    def prepare_background_process(self):
+        # Lazily (re)start the worker; a dead process gets fresh queues.
+        if not self.__process or not self.__process.is_alive():
+            self.__input_queue = Queue()
+            self.__output_queue = Queue()
+            background = BackgroundConstructor(self.ml_models,
+                                               self.logger,
+                                               self.settings,
+                                               self.__input_queue,
+                                               self.__output_queue)
+            background.daemon = True
+            background.start()
+            self.__process = background
+
+    def construct_parallel(self, learnblock):
+        self.prepare_background_process()
+        self.__input_queue.put(learnblock)
+        # Block for the first result, then drain whatever else is ready.
+        yield self.__output_queue.get()
+        while not self.__output_queue.empty():
+            yield self.__output_queue.get()
 
-        self._category = construction_type
+    def _init_construction(self, mode):
+        if mode == "conceptual":
+            return partial(self._construct_conceptual_knowledge,
+                           categorial_complexity=self.settings.max_categories,
+                           min_category_size=self.settings.min_category_size)
+        if mode == "procedural":
+            return partial(self._construct_procedural_knowledge,
+                           procedural_complexity=self.settings.max_model_targets,
+                           max_target_error=self.settings.max_target_error)
+        raise ValueError("Provide a valid construction mode: "
+                         "'conceptual' or 'procedural'.")
+
 
     def _construct_conceptual_knowledge(self,
                                         learnblock,
@@ -174,3 +240,23 @@ class Constructor:
                             labeled_learnblock = learnblock.set_labels(list(labels))
                             labeled_learnblock.n_cluster = target_number
                             yield labeled_learnblock
+
+
+def construct_conceptual_knowledge(*args):
+    # Pool worker: returns the labeled learnblock, or None when any cluster
+    # is smaller than the minimum category size.
+    learnblock, model, min_category_size = args[0]
+    if learnblock.labeled:
+        return learnblock
+    trained_model = model.train(learnblock)
+    for cluster, size in trained_model.cluster_sizes.items():
+        if size < min_category_size:
+            return None
+    labels = trained_model.get_labels()
+    labeled_learnblock = learnblock.set_labels(labels)
+    labeled_learnblock.n_cluster = model.cluster
+    purpose = "{}{:02}".format(model.abbreviation, model.cluster)
+    labeled_learnblock.purpose = purpose
+    return labeled_learnblock
diff --git a/cml/domain/deconstruction.py b/cml/domain/deconstruction.py
index c22c39b..10b4c5d 100644
--- a/cml/domain/deconstruction.py
+++ b/cml/domain/deconstruction.py
@@ -1,249 +1,21 @@
-from abc import ABC
 from functools import partial
-from collections import defaultdict
-from itertools import count
-from collections import deque
-from typing import Tuple, Optional
 
 import krippendorff
 
-from cml.domain.data_source import DataSource
 from cml.domain.reconstruction import PragmaticMachineLearningModel
+from cml.domain.data_source import DataSource
+from cml.domain.knowledge import KnowledgeDatabase, RelativeFinder, TS_QUEUE
 from cml.shared.settings import DeconstructionSettings
 from cml.shared.errors import (
     DeconstructionFailed,
     NoModelReconstructedError,
-    NotEnoughFeaturesWarning
-)
+    NotEnoughFeaturesWarning)
 
 
 __all__ = (
     "Deconstructor",
-    "KnowledgeDatabase",
-    "RelativeFinder",
-    "ConceptualKnowledgeDatabase",
-    "ProceduralKnowledgeDatabase",
-    "RelativeFinder",
-    "KnowledgeDomain"
 )
 
-TS_QUEUE = deque()
-
-
-def notify_inverted_index(func):
-    def wrapper(self, *args, **kwargs):
-        for obs in self.observer:
-            getattr(obs, func.__name__)(*args, **kwargs)
-        return func(self, *args, **kwargs)
-    return wrapper
-
-
-def notify_deque(func):
-    def wrapper(self, *args, **kwargs):
-        # replace
-        if len(args) == 2:
-            model, *_ = args
-        # remove
-        else: model, = args
-        deque_copy = TS_QUEUE.copy()
-        for tier, learnblock in deque_copy:
-            if model.uid in learnblock.origin:
-                TS_QUEUE.remove((tier, learnblock))
-        return func(self, *args, **kwargs)
-    return wrapper
-
-
-class KnowledgeDatabase(ABC):
-    def __init__(self, highest_tier):
-        self.database = [KnowledgeDomain(i) for i in range(highest_tier)]
-        self.observer = []
-        self.n_models = 0
-        super().__init__()
-
-    def __contains__(self, item: PragmaticMachineLearningModel):
-        if not isinstance(item, PragmaticMachineLearningModel):
-            raise TypeError()
-        try:
-            self.get(item.uid)
-            return True
-        except KeyError:
-            return False
-
-    @notify_inverted_index
-    def insert(self, model: PragmaticMachineLearningModel):
-        if model not in self:
-            self.database[model.tier].insert(model)
-            self.n_models += 1
-
-            ###################################################################
-            self.logger.protocol("{:<20}: {}".format("Inserted", model))
-            ###################################################################
-
-    @notify_deque
-    @notify_inverted_index
-    def remove(self, model: PragmaticMachineLearningModel):
-        self.database[model.tier].remove(model)
-        self.n_models -= 1
-
-        ###################################################################
-        self.logger.protocol("{:<20}: {}".format("Removed", model))
-        ###################################################################
-
-    @notify_deque
-    @notify_inverted_index
-    def replace(self,
-                replaced: PragmaticMachineLearningModel,
-                replacer: PragmaticMachineLearningModel):
-        self.database[replaced.tier].replace(replaced, replacer)
-        ###################################################################
-        self.logger.protocol("{:<20}: {} {:<20}: {}".format(
-            "Replaced", str(replaced), "with", str(replacer)))
-        ###################################################################
-
-    @notify_inverted_index
-    def extend(self): pass
-
-    def get(self, uid: str):
-        _, tier, _ = uid.split(".")
-        return self.database[int(tier)].get(uid)
-
-    def deserialize(self): pass
-
-    def serialize(self): pass
-
-    def model_counter(self):
-        def counts(tier):
-            return self.database[tier].biggest_id + 1
-        return counts
-
-    def remove_dependent_models(self,
-                                relative_model: PragmaticMachineLearningModel):
-        for domain in self.database:
-            for model in domain.knowledge.values():
-                if model.origin == relative_model.origin:
-                    self.remove(model)
-
-    def inject_logger(self, logger):
-        self.logger = logger
-
-
-class ConceptualKnowledgeDatabase(KnowledgeDatabase):
-    def __init__(self, highest_tier):
-        super().__init__(highest_tier)
-
-
-class ProceduralKnowledgeDatabase(KnowledgeDatabase):
-    def __init__(self, highest_tier):
-        super().__init__(highest_tier)
-
-class KnowledgeDomain:
-    def __init__(self, tier: int):
-        self.tier = tier
-        self.knowledge = {}
-        self.biggest_id = 0
-
-    def get(self, uid: str):
-        return self.knowledge[uid]
-
-    def insert(self, model: PragmaticMachineLearningModel):
-        self.knowledge[model] = model
-        self.update_biggest_id(model.counter)
-
-    def remove(self, model: PragmaticMachineLearningModel):
-        del self.knowledge[model]
-
-    def replace(self,
-                replaced: PragmaticMachineLearningModel,
-                replacer: PragmaticMachineLearningModel):
-        del self.knowledge[replaced]
-        self.knowledge[replacer] = replacer
-        self.update_biggest_id(replacer.counter)
-
-    def update_biggest_id(self, new_id):
-        self.biggest_id = max(self.biggest_id, new_id)
-
-
-class RelativeFinder:
-    def __init__(self):
-        self.index_t = defaultdict(set)
-        self.index_z = defaultdict(set)
-        self.index_sigma = defaultdict(set)
-
-    def find(self,
-             pair: Tuple[str, Optional[str]],
-             model: PragmaticMachineLearningModel):
-
-        if pair == ("T", "Z"):
-            set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
-            set_z = self.index_z[model.purpose]
-            relatives = set_t.intersection(set_z)
-        elif pair == ("T", "Sigma"):
-            set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
-            set_s = self.index_sigma[model.subject]
-            relatives = set_t.intersection(set_s)
-        elif pair == ("Sigma", "Z"):
-            set_s = self.index_sigma[model.subject]
-            set_z = self.index_z[model.purpose]
-            relatives = set_s.intersection(set_z)
-        elif pair == ("complete", ):
-            set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
-            set_s = self.index_sigma[model.subject]
-            set_z = self.index_z[model.purpose]
-            relatives = set_t.intersection(set_s).intersection(set_z)
-        else: raise ValueError
-
-        if model in relatives:
-            relatives.remove(model)
-        for relative in relatives:
-            yield relative
-        if not relatives:
-            yield None
-
-    def remove(self, model: PragmaticMachineLearningModel):
-        t_list, s_list, z_list = self.get_index_lists(model,
-                                                      time=True,
-                                                      subject=True,
-                                                      purpose=True)
-        t_list.remove(model)
-        s_list.remove(model)
-        z_list.remove(model)
-
-    def replace(self,
-                replaced: PragmaticMachineLearningModel,
-                replacer: PragmaticMachineLearningModel):
-        t_list, s_list, z_list = self.get_index_lists(replaced,
-                                                      time=True,
-                                                      subject=True,
-                                                      purpose=True)
-        t_list.remove(replaced)
-        s_list.remove(replaced)
-        z_list.remove(replaced)
-        self.index_t[(replacer.min_timestamp, replacer.max_timestamp)].add(
-            replacer)
-        self.index_z[replacer.purpose].add(replacer)
-        self.index_sigma[replacer.subject].add(replacer)
-
-    def insert(self, model: PragmaticMachineLearningModel):
-        self.index_t[(model.min_timestamp, model.max_timestamp)].add(model)
-        self.index_z[model.purpose].add(model)
-        self.index_sigma[model.subject].add(model)
-
-    def get_index_lists(self,
-                        model: PragmaticMachineLearningModel,
-                        time: bool,
-                        subject: bool,
-                        purpose: bool):
-        t_list = s_list = z_list = None
-        if time:
-            t_list = self.index_t[(model.min_timestamp, model.max_timestamp)]
-        if subject:
-            s_list = self.index_sigma[model.subject]
-        if purpose:
-            z_list = self.index_z[model.purpose]
-        return t_list, s_list, z_list
-
-    def extend(self): pass
-
 
 def log(func):
     def wrapper(self, tier, pragmatic, learnblock):
@@ -258,23 +30,24 @@ class Deconstructor:
     TIME_COLUMN = "T"
     SUBJECT_COLUMN = "Sigma"
     PURPOSE_COLUMN = "Z"
-    NEXT_MODEL_COUNTER = None
 
     def __init__(self,
+                 mode,
                  knowledge_database: KnowledgeDatabase,
                  relative_finder: RelativeFinder,
                  source: DataSource,
                  settings: DeconstructionSettings):
-
-        self.knowledge_database = knowledge_database
-        self.relative_finder = relative_finder
-        self.settings = settings
+        self.mode = mode
         self.source = source
+        self.settings = settings
+        self.relative_finder = relative_finder
+        self.knowledge_database = knowledge_database
+        self.next_model_counter = knowledge_database.model_counter()
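+        # model_counter() returns a callable: tier -> next free identifier.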
         self.logger = None
         self.reconstructor = None
 
     def _strategies(self, block):
-        yield (("complete", ), partial(self.deconstruct_complete, block=block))
+        # yield (("complete", ), partial(self.deconstruct_complete, block=block))
         yield (("Sigma", "Z"), partial(self.deconstruct_sigma_zeta, block=block))
         yield (("T", "Sigma"), partial(self.deconstruct_time_sigma, block=block))
         # yield (("T", "Z"), partial(self.deconstruct_time_zeta, block=block))
@@ -328,22 +101,31 @@ class Deconstructor:
                                block):
         success = False
         if r_model and p_model.tier < self.settings.highest_tier-1:
-
             second_block = r_model.trained_with(self.source)
-            overlapping = second_block.new_block_from(block.get_column_values("T"))
-
-            if overlapping.rows >= self.settings.learn_block_minimum:
-                alpha = self.calculate_reliability(p_model.pre_image_labels,
-                                                   r_model.pre_image_labels)
+
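+            # The relative's Z values come back in column "Z_other" of the
+            # overlapping block (see below).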
+            overlapping = block.get_overlapping(second_block)
+            if len(overlapping) >= self.settings.learn_block_minimum:
+                # TODO (dmt): What to do if the pre-images have different sizes?
+
+                alpha = self.calculate_reliability(
+                    overlapping.get_column_values("Z"),
+                    overlapping.get_column_values("Z_other"))
                 alpha_systematic = alpha < 0
                 alpha_weak_reliability = 0 <= alpha < self.settings.min_reliability
                 if (self.settings.allow_weak_reliability and
                    alpha_weak_reliability) or alpha_systematic:
-                    overlapping_b = block.new_block_from(overlapping.get_column_values("T"))
                     overblock = self.source.new_learnblock(
                         values=list(zip(
                             overlapping.get_column_values("Z"),
-                            overlapping_b.get_column_values("Z"),
+                            overlapping.get_column_values("Z_other"),
                             overlapping.get_column_values("T"),
                             ("\"\"" for _ in range(overlapping.rows)),
                             ("\"\"" for _ in range(overlapping.rows)))),
@@ -386,7 +168,7 @@ class Deconstructor:
 
                 # Create new metadata for a pragmatic model
                 new_model = prag_model.fusion(
-                    relative_model, self.NEXT_MODEL_COUNTER(tier))
+                    relative_model, tier, self.next_model_counter(tier))
 
                 # Which models should be used for the reconstruction
                 which_ml_models = new_model.sigma
@@ -428,7 +210,7 @@ class Deconstructor:
             # Check constraint
             if overlapping_block.n_features >= 2:
                 # Model fusion
-                new_model = p_model.fusion(r_model, self.NEXT_MODEL_COUNTER(tier))
+                new_model = p_model.fusion(r_model, tier, self.next_model_counter(tier))
                 which_ml_models = new_model.sigma
                 try:
                     # Reconstruct model
@@ -504,11 +286,13 @@ class Deconstructor:
             # Check feature intersection constraint
             if r_model and self._feature_intersection(p_model, r_model) >= 2:
                 new_model = p_model.fusion(
-                    r_model, self.NEXT_MODEL_COUNTER(tier))
+                    r_model, tier, self.next_model_counter(tier))
 
             # Check time contraint
             elif r_model and self.time_constraint(p_model, r_model, "complete"):
 
                 # Create submodel from TSgima relative samples
                 second_block = r_model.trained_with(self.source)
                 new_block = block.same_features_fusion(second_block)
@@ -516,7 +300,7 @@ class Deconstructor:
                 which_ml_models = p_model.subject + r_model.subject
                 self.reconstructor.reconstruct(
                     tier, ts_relatives, which_ml_models)
-                new_model = p_model.fusion(r_model, self.NEXT_MODEL_COUNTER(tier))
+                new_model = p_model.fusion(r_model, tier, self.next_model_counter(tier))
             else: return
 
             # Create learnblock
@@ -579,10 +363,21 @@ class Deconstructor:
         )
 
     def calculate_reliability(self, predicts_a, predicts_b):
-        predictions = [predicts_a, predicts_b]
-        if self.reconstructor.category == "conceptual":
-            return krippendorff.alpha(predictions, level_of_measurement="nomimal")
-        elif self.reconstructor.category:
+        # TODO (dmt): krippendorff.alpha needs numeric (numpy float64) values,
+        # otherwise it raises a TypeError.
+        predicts_a = [int(i) for i in predicts_a]
+        predicts_b = [int(i) for i in predicts_b]
+        predictions = [predicts_a, predicts_b]
+        if self.reconstructor.mode == "conceptual":
+            return krippendorff.alpha(predictions, level_of_measurement="nominal")
+        elif self.reconstructor.mode == "procedural":
+            return krippendorff.alpha(predictions, level_of_measurement="ratio")
     #
     # def calc_reliability(self,
@@ -600,3 +395,5 @@ class Deconstructor:
     #                                   level_of_measurement="ratio")
     #     else:
     #         raise ValueError()
diff --git a/cml/domain/reconstruction.py b/cml/domain/reconstruction.py
index d6c2964..8d07d8d 100644
--- a/cml/domain/reconstruction.py
+++ b/cml/domain/reconstruction.py
@@ -1,18 +1,17 @@
 from random import sample
 from collections import defaultdict
-from dataclasses import dataclass
 from functools import partial
-from typing import Union, List, Tuple, Generator, Dict
+from multiprocessing import Pool, cpu_count
+from dataclasses import dataclass
+from typing import Union, List, Tuple, Dict
 
 from krippendorff import alpha
 
-from cml.shared.settings import ReconstructionSettings
 from cml.domain.data_source import DataSource
+from cml.shared.settings import ReconstructionSettings
 from cml.shared.errors import (
     NoModelReconstructedError,
-    NotEnoughFeaturesWarning
-)
-
+    NotEnoughFeaturesWarning)
 
 __all__ = (
     "Reconstructor",
@@ -42,12 +41,12 @@ class Metadata:
                f"T min: <{self.t_min}> \n" \
                f"T max: <{self.t_max}> \n" \
                f"Subjects: <{self.sigma}> \n" \
-               f"Puposes: <{self.zeta}> \n"
+               f"Purposes: <{self.zeta}> \n"
 
     def __hash__(self):
         return hash(".".join([self.knowledge_domain,
-                             str(self.knowledge_tier),
-                             str(self.identifier)]))
+                              str(self.knowledge_tier),
+                              str(self.identifier)]))
 
     def __eq__(self, other):
         if not isinstance(other, Metadata):
@@ -61,6 +60,8 @@ class PragmaticMachineLearningModel:
                  meta: Metadata,
                  model,
                  learnblock):
         self.meta = meta
         self.model = model
         self.learnblock = learnblock if self.tier > 1 else None
@@ -123,25 +124,46 @@ class PragmaticMachineLearningModel:
                          str(self.meta.knowledge_tier),
                          str(self.meta.identifier)])
 
+    @property
+    def identifier(self):
+        return self.meta.identifier
+
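+    # Pool workers cannot draw identifiers; the caller sets the winner's
+    # identifier through this setter (see Reconstructor.reconstruct_parallel).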
+    @identifier.setter
+    def identifier(self, value):
+        self.meta.identifier = value
+
     @property
     def counter(self):
         return self.meta.identifier
 
     def fusion(self,
                model: 'PragmaticMachineLearningModel',
+               knowledge_tier: int,
                new_identifier: int) -> Metadata:
 
         return Metadata(self.meta.knowledge_domain,
-                        self.meta.knowledge_tier,
+                        knowledge_tier,
                         new_identifier,
-                        self.pre_image + model.pre_image,
-                        list(set(self.pre_image_features).intersection(
-                             set(model.pre_image_features))),
-                        self.pre_image_labels + model.pre_image_labels,
+                        None,   # pre_image: filled in after reconstruction
+                        None,   # pre_image_features
+                        None,   # pre_image_labels
                         min(self.meta.t_min, model.min_timestamp),
                         max(self.meta.t_max, model.max_timestamp),
-                        self.subject + model.subject,
-                        self.meta.zeta + model.subject)
+                        tuple(set(self.subject + model.subject)),
+                        tuple(set(self.meta.zeta + model.purpose)))
 
     def trained_with(self, source: DataSource):
         if self.origin == "source":
@@ -162,36 +184,60 @@ def log(func):
     return wrapper
 
 
+class SubprocessDTO:
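+    """Picklable bundle of one training job for a pool worker."""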
+    def __init__(self, model, learnblock, tier, abv, settings):
+        self.model = model
+        self.learnblock = learnblock
+        self.tier = tier
+        self.abv = abv
+        self.settings = settings
+
+
 class Reconstructor:
     CONCEPTUAL_KNOWLEDGE_ABBREVIATION = "C"
-    NEXT_MODEL_COUNTER = None
+    PROCEDURAL_KNOWLEDGE_ABBREVIATION = "P"
 
     def __init__(self,
                  settings: ReconstructionSettings,
                  ml_models: List,
-                 knowlege_domain: str):
+                 mode: str):
+        self.mode = mode
         self.settings = settings
         self.ml_models = ml_models
-        self.knowledge_domain = knowlege_domain
+        self.__reconstruction = self._init_reconstruction(mode)
+        self.next_model_counter = None
         self.logger = None
-        self._category = None
-        self.free_id = None
-        self.__reconstruction = None
 
-    @property
-    def category(self) -> str:
-        return self._category
-
-    @category.setter
-    def category(self, value: str) -> None:
-        if value == "conceptual":
-            self.__reconstruction = partial(self._reconstruct_conceptual,
-                                            krippen="nominal")
-        elif value == "procedural":
-            self.__reconstruction = partial(self._reconstruct_procedural,
-                                            krippen="ratio")
-        else: raise ValueError()
-        self._category = value
+    def _init_reconstruction(self, mode: str):
+        if mode == "conceptual":
+            return partial(self._reconstruct_conceptual, krippen="nominal")
+        if mode == "procedural":
+            return partial(self._reconstruct_procedural, krippen="ratio")
+        raise ValueError("Provide a valid reconstruction mode: "
+                         "'conceptual' or 'procedural'.")
+
+    def reconstruct_parallel(self, tier, learnblock, models=None, meta=None):
+        self.logger.protocol("{:^100}".format("Starting Reconstruction"))
+        # Restrict training to the requested subjects, if any were given.
+        candidates = self.ml_models if not models else \
+            [m for m in self.ml_models if m.subject in models]
+        args = [SubprocessDTO(model, learnblock, tier,
+                              self.CONCEPTUAL_KNOWLEDGE_ABBREVIATION,
+                              self.settings)
+                for model in candidates]
+        # Leave two cores free for the main process.
+        pool = Pool(max(1, cpu_count() - 2))
+        self.logger.protocol("{:^100}".format("Mapped"))
+        reliability_model = pool.map(reconstruct_in_parallel, args)
+        pool.close()
+        pool.join()
+        self.logger.protocol("{:^100}".format("Results received"))
+        reliabilities_to_model = defaultdict(list)
+        for r, m in reliability_model:
+            if r and m:
+                reliabilities_to_model[r].append(m)
+        if reliabilities_to_model:
+            winner = determine_winner(reliabilities_to_model)
+            # Workers cannot hand out identifiers; assign the winner's here.
+            winner.identifier = self.next_model_counter(tier)
+            return winner
+        raise NoModelReconstructedError()
 
     @log
     def reconstruct(self,
@@ -231,20 +277,27 @@ class Reconstructor:
             if model.subject not in which_models: continue
 
             # train model
-            train_block, eval_block = self.split(learnblock)
-            trained_model = model.train(
-                train_block.as_numpy_array(),
-                [i for i in train_block.get_column_values("Z")])
-
+            train_block, eval_block = split(learnblock,
+                                            self.settings.reliability_sample)
+            trained_model = model.train(
+                train_block.as_numpy_array(),
+                [i for i in train_block.get_column_values("Z")])
             ###################################################################
             self.logger.protocol("\n{:<20}".format(model.subject))
             self.logger.protocol("{:<20}: {}".format("Accuracy", model.accuracy))
             ###################################################################
 
             # check constraints
-            if self._valid_reconstructed(trained_model, "conceptual"):
-                reliability = self.calc_reliability(trained_model, learnblock,
-                                                    krippen)
+            if valid_reconstructed(trained_model,
+                                   "conceptual",
+                                   self.settings.min_test_accuracy,
+                                   self.settings.max_test_error_avg,
+                                   self.settings.max_test_error_max):
+                reliability = calc_reliability(trained_model, learnblock, krippen)
                 ###################################################################
                 self.logger.protocol("{:<20}: {}".format("Reliability", str(reliability)))
                 ###################################################################
@@ -255,7 +308,7 @@ class Reconstructor:
                                         learnblock.purpose])
                         meta = Metadata(self.CONCEPTUAL_KNOWLEDGE_ABBREVIATION,
                                         tier,
-                                        self.NEXT_MODEL_COUNTER(tier),
+                                        self.next_model_counter(tier),
                                         learnblock.indexes,
                                         learnblock.columns(),
                                         learnblock.get_column_values("Z"),
@@ -263,12 +316,16 @@ class Reconstructor:
                                         learnblock.max_timestamp,
                                         (model.subject, ),
                                         (uid, ))
+                    else:
+                        meta.pre_image_features = learnblock.columns()
+                        meta.pre_image_labels = learnblock.get_column_values("Z")
+                        meta.pre_image = learnblock.indexes
+
                     reliability_to_model[reliability].append(
                         PragmaticMachineLearningModel(
                             meta, trained_model, learnblock
                         )
                     )
-
         return reliability_to_model
 
     def _reconstruct_procedural(self,
@@ -276,7 +333,7 @@ class Reconstructor:
                                 learnblock,
                                 which_models: List,
                                 krippen: str = None,
-                                meta: Metadata= None) -> Dict[float, List]:
+                                meta: Metadata = None) -> Dict[float, List]:
         reliability_to_model = defaultdict(list)
         for model in self.ml_models:
             if model.subject not in which_models: continue
@@ -288,7 +345,7 @@ class Reconstructor:
                 [i for i in train_block.get_column_values("Z")])
 
             # check contraints
-            if self._valid_reconstructed(trained_model, "procedural"):
+            if valid_reconstructed(trained_model,
+                                   "procedural",
+                                   self.settings.min_test_accuracy,
+                                   self.settings.max_test_error_avg,
+                                   self.settings.max_test_error_max):
-                reliability = self.calc_reliability(
-                    trained_model, learnblock, krippen)
+                reliability = calc_reliability(trained_model, learnblock, krippen)
                 if reliability >= self.settings.min_reliability:
@@ -296,7 +353,7 @@ class Reconstructor:
                         prag_meta_data = Metadata(
                             "P",
                             tier,
-                            next(self.free_id),
+                            self.next_model_counter(tier),
                             learnblock.indexes,
                             learnblock.columns(),
                             learnblock.get_column_values("Z"),
@@ -306,39 +363,74 @@ class Reconstructor:
                             (".".join(["C", '1', learnblock.purpose]), )
                         )
                     else:
-                        prag_meta_data = meta
+                        meta.pre_image_features = learnblock.columns()
+                        meta.pre_image_labels = learnblock.get_column_values("Z")
+                        meta.pre_image = learnblock.indexes
+                        prag_meta_data = meta
+
                     reliability_to_model[reliability].append(
                         PragmaticMachineLearningModel(prag_meta_data,
                                                       trained_model,
                                                       learnblock))
-
         return reliability_to_model
 
-    def split(self, learnblock) -> Tuple:
-        indices = learnblock.indexes
-        eval_size = int(learnblock.length * self.settings.reliability_sample)
-        eval_idx = sample(indices, eval_size)
-        train_idx = list(set(indices).difference(set(eval_idx)))
-        return learnblock.new_block_from_rows_index(train_idx), \
-            learnblock.new_block_from_rows_index(eval_idx)
-
-    def calc_reliability(self,
-                         trained_model,
-                         eval_block,
-                         metric: str) -> float:
-        y_pre = trained_model.predict(eval_block.as_numpy_array())
-        y_true = [i for i in eval_block.get_column_values("Z")]
-        reliability_data = [y_pre, y_true]
-        return alpha(reliability_data, level_of_measurement=metric)
-
-    def _valid_reconstructed(self,
-                             model,
-                             knowledge_domain: str) -> bool:
-        if knowledge_domain == "conceptual":
-            return model.accuracy >= self.settings.min_test_accuracy
-        else:
-            return model.mean_error <= self.settings.max_test_error_avg and \
-                model.max_error <= self.settings.max_test_error_max
+
+def split(learnblock, reliability_sample) -> Tuple:
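+    # Randomly hold out a reliability_sample fraction of rows for evaluation.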
+    indices = learnblock.indexes
+    eval_size = int(learnblock.length * reliability_sample)
+    eval_idx = sample(indices, eval_size)
+    train_idx = list(set(indices).difference(set(eval_idx)))
+    return learnblock.new_block_from_rows_index(train_idx), \
+        learnblock.new_block_from_rows_index(eval_idx)
+
+
+def calc_reliability(trained_model, eval_block, metric: str) -> float:
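+    # Krippendorff's alpha between predictions and ground truth.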
+    y_pre = trained_model.predict(eval_block.as_numpy_array())
+    y_true = [i for i in eval_block.get_column_values("Z")]
+    reliability_data = [y_pre, y_true]
+    return alpha(reliability_data, level_of_measurement=metric)
+
+
+def valid_reconstructed(model,
+                        knowledge_domain: str,
+                        min_test_accuracy,
+                        max_test_error_avg,
+                        max_test_error_max) -> bool:
+    if knowledge_domain == "conceptual":
+        return model.accuracy >= min_test_accuracy
+    else:
+        return model.mean_error <= max_test_error_avg and \
+            model.max_error <= max_test_error_max
+
+
+def reconstruct_in_parallel(*args):
+    # Pool worker: trains one model and returns (reliability, model),
+    # or (None, None) when the constraints are not met.
+    dto = args[0]
+    train_block, eval_block = split(dto.learnblock, dto.settings.reliability_sample)
+    trained_model = dto.model.train(
+        train_block.as_numpy_array(),
+        [i for i in train_block.get_column_values("Z")])
+    if valid_reconstructed(trained_model,
+                           "conceptual",
+                           dto.settings.min_test_accuracy,
+                           dto.settings.max_test_error_avg,
+                           dto.settings.max_test_error_max):
+        reliability = calc_reliability(trained_model, dto.learnblock, "nominal")
+        if reliability >= dto.settings.min_reliability:
+            uid = ".".join([dto.abv, str(dto.tier), dto.learnblock.purpose])
+            meta = Metadata(dto.abv,
+                            dto.tier,
+                            0,  # placeholder; the caller assigns the real id
+                            dto.learnblock.indexes,
+                            dto.learnblock.columns(),
+                            dto.learnblock.get_column_values("Z"),
+                            dto.learnblock.min_timestamp,
+                            dto.learnblock.max_timestamp,
+                            (dto.model.subject,),
+                            (uid,))
+            return reliability, PragmaticMachineLearningModel(
+                meta, trained_model, dto.learnblock)
+    return None, None
 
 
 def determine_winner(
@@ -353,3 +445,4 @@ def determine_winner(
             winner = model
 
     return winner
-- 
GitLab