Commit d73084d0 authored by dmt
Parallelize construction and reconstruction.

parent 0fd5c606
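Note on the change: the heart of this commit is a classic producer/consumer split. Construction moves into a long-lived worker process that reads learnblocks from an input queue, writes labeled blocks to an output queue, and exits after 30 idle seconds. A minimal, self-contained sketch of that pattern, with placeholder names and payloads rather than cml APIs:

from multiprocessing import Process, Queue
from queue import Empty

def work(item):
    return item * 2  # stand-in for "train a model on a learnblock"

def worker(inbox, outbox):
    while True:
        try:
            outbox.put(work(inbox.get(timeout=30)))
        except Empty:
            break  # idle for 30 s -> shut down, like BackgroundConstructor.run

if __name__ == "__main__":
    inbox, outbox = Queue(), Queue()
    Process(target=worker, args=(inbox, outbox), daemon=True).start()
    inbox.put(21)
    print(outbox.get())  # -> 42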
import sys
from queue import Empty
from copy import deepcopy
from functools import partial
from multiprocessing import Process, Queue
from cml.shared.parameter import PROTOCOL_LEVEL
@@ -36,20 +40,63 @@ def update_construction(func):
    return wrapper


class BackgroundConstructor(Process):
def __init__(self, models, logger, settings, input_queue, output_queue):
super(BackgroundConstructor, self).__init__(name="BackgroundConstructor")
self.models = models
self.logger = logger
self.settings = settings
self.input_queue = input_queue
self.output_queue = output_queue
    def run(self):
        while True:
            try:
                learnblock = self.input_queue.get(timeout=30)
                for block, model, min_category_size in self.prepare_args(learnblock):
                    if block.labeled:
                        self.output_queue.put(block)
                    else:
                        trained_model = model.train(block)
                        # for/else: label only if no cluster is too small.
                        for cluster, size in trained_model.cluster_sizes.items():
                            if size < min_category_size:
                                break
                        else:
                            labeled_learnblock = block.set_labels(
                                trained_model.get_labels())
                            labeled_learnblock.n_cluster = model.cluster
                            labeled_learnblock.purpose = "{}{:02}".format(
                                model.abbreviation, model.cluster)
                            self.output_queue.put(labeled_learnblock)
            except Empty:
                # Idle for 30 seconds: let the background process exit.
                sys.exit()
    def prepare_args(self, learnblock):
        # One task per (model, cluster count); deepcopy avoids shared state.
        for ml_model in self.models:
            for complexity in range(2, self.settings.max_categories + 1):
                ml_model.cluster = complexity
                yield deepcopy(learnblock), deepcopy(ml_model), complexity
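Note that run() above leans on Python's for/else: the else branch executes only when the loop over cluster sizes finishes without break, i.e. only when every cluster meets min_category_size. A standalone illustration with toy sizes:

sizes = {"c0": 10, "c1": 7}
for cluster, size in sizes.items():
    if size < 5:
        break  # a cluster is too small -> discard this model
else:
    print("all clusters large enough -> label the learnblock")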
class Constructor:

    def __init__(self, mode, ml_models, settings):
        self.mode = mode
        self.settings = settings
        self.ml_models = ml_models
        self._construction = self._init_construction(mode)
        self.logger = None
        self.__process = None
        self.__input_queue = None
        self.__output_queue = None

    @log_construction
    def construct(self, learnblock):
        for block in self._construction(learnblock):
            block.origin = learnblock.origin
            yield block

    @property
@@ -86,33 +133,52 @@ class Constructor:
    @max_categories.setter
    def max_categories(self, value):
        self.settings.max_categories = value
    def prepare_args(self, learnblock):
        for ml_model in self.ml_models:
            for complexity in range(2, self.settings.max_categories + 1):
                ml_model.cluster = complexity
                yield deepcopy(learnblock), deepcopy(ml_model), complexity

    def prepare_background_process(self):
        if not self.__process or not self.__process.is_alive():
            self.__input_queue = Queue()
            self.__output_queue = Queue()
            background = BackgroundConstructor(self.ml_models,
                                               self.logger,
                                               self.settings,
                                               self.__input_queue,
                                               self.__output_queue)
            background.daemon = True
            background.start()
            self.__process = background

    def construct_parallel(self, learnblock):
        self.prepare_background_process()
        self.__input_queue.put(learnblock)
        # Block for the first result, then drain whatever is already queued.
        yield self.__output_queue.get()
        while not self.__output_queue.empty():
            yield self.__output_queue.get()

    def _init_construction(self, mode):
        return partial(self._construct_conceptual_knowledge,
                       categorial_complexity=self.settings.max_categories,
                       min_category_size=self.settings.min_category_size) \
            if mode == "conceptual" else \
            partial(self._construct_procedural_knowledge,
                    procedural_complexity=self.settings.max_model_targets,
                    max_target_error=self.settings.max_target_error)
#def construct_parallel(self, learnblock):
# pool = Pool(cpu_count()-3)
# results = pool.map(construct_conceptual_knowledge, self.prepare_args(learnblock))
# pool.close()
# pool.join()
# for block in results:
# if block:
# block.origin = learnblock.origin
# yield block
    def _construct_conceptual_knowledge(self,
                                        learnblock,
@@ -174,3 +240,23 @@ class Constructor:
        labeled_learnblock = learnblock.set_labels(list(labels))
        labeled_learnblock.n_cluster = target_number
        yield labeled_learnblock
def construct_conceptual_knowledge(args):
    # Worker for Pool.map: receives one (learnblock, model, min_size) task.
    learnblock, model, min_category_size = args
    if learnblock.labeled:
        return learnblock
    else:
        trained_model = model.train(learnblock)
        # for/else: label only if every cluster is big enough.
        for cluster, size in trained_model.cluster_sizes.items():
            if size < min_category_size:
                return None
        else:
            labels = trained_model.get_labels()
            labeled_learnblock = learnblock.set_labels(labels)
            labeled_learnblock.n_cluster = model.cluster
            purpose = "{}{:02}".format(model.abbreviation, model.cluster)
            labeled_learnblock.purpose = purpose
            return labeled_learnblock
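construct_parallel above blocks for the first result and then drains whatever else has accumulated, so callers see a plain generator. The same drain logic, restated with a single-process queue and toy values:

from queue import Queue  # single-process stand-in for multiprocessing.Queue

def drain(results):
    yield results.get()          # block until at least one result exists
    while not results.empty():   # then take whatever has piled up meanwhile
        yield results.get()

results = Queue()
for i in range(3):
    results.put(i)
print(list(drain(results)))  # -> [0, 1, 2]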
from abc import ABC
from functools import partial
from collections import defaultdict, deque
from itertools import count
from typing import Tuple, Optional

import krippendorff

from cml.domain.reconstruction import PragmaticMachineLearningModel
from cml.domain.data_source import DataSource
from cml.shared.settings import DeconstructionSettings
from cml.shared.errors import (
    DeconstructionFailed,
    NoModelReconstructedError,
    NotEnoughFeaturesWarning)


__all__ = (
    "Deconstructor",
    "KnowledgeDatabase",
    "RelativeFinder",
    "ConceptualKnowledgeDatabase",
    "ProceduralKnowledgeDatabase",
    "KnowledgeDomain"
)


TS_QUEUE = deque()
def notify_inverted_index(func):
    def wrapper(self, *args, **kwargs):
        # Mirror the call to every observer that implements the same hook.
        for obs in self.observer:
            getattr(obs, func.__name__)(*args, **kwargs)
        return func(self, *args, **kwargs)
    return wrapper


def notify_deque(func):
    def wrapper(self, *args, **kwargs):
        if len(args) == 2:
            # replace(replaced, replacer)
            model, *_ = args
        else:
            # remove(model)
            model, = args
        deque_copy = TS_QUEUE.copy()
        for tier, learnblock in deque_copy:
            if model.uid in learnblock.origin:
                TS_QUEUE.remove((tier, learnblock))
        return func(self, *args, **kwargs)
    return wrapper
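The decorators above fan each mutating call out to registered observers by method name before executing it. A compact, runnable rendition of that observer hook (Store and Index are illustrative stand-ins, not cml classes):

def notify(func):
    def wrapper(self, *args, **kwargs):
        for obs in self.observer:
            getattr(obs, func.__name__)(*args, **kwargs)  # same-named hook
        return func(self, *args, **kwargs)
    return wrapper

class Index:
    def insert(self, item):
        print("index saw:", item)

class Store:
    def __init__(self):
        self.observer = [Index()]
        self.items = []

    @notify
    def insert(self, item):
        self.items.append(item)

Store().insert("model-1")  # prints "index saw: model-1" before storing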
class KnowledgeDatabase(ABC):

    def __init__(self, highest_tier):
        self.database = [KnowledgeDomain(i) for i in range(highest_tier)]
        self.observer = []
        self.n_models = 0
        self.logger = None  # injected later via inject_logger
        super().__init__()
def __contains__(self, item: PragmaticMachineLearningModel):
        if not isinstance(item, PragmaticMachineLearningModel):
            raise TypeError("expected a PragmaticMachineLearningModel")
try:
self.get(item.uid)
return True
except KeyError:
return False
@notify_inverted_index
def insert(self, model: PragmaticMachineLearningModel):
if model not in self:
self.database[model.tier].insert(model)
self.n_models += 1
###################################################################
self.logger.protocol("{:<20}: {}".format("Inserted", model))
###################################################################
@notify_deque
@notify_inverted_index
def remove(self, model: PragmaticMachineLearningModel):
self.database[model.tier].remove(model)
self.n_models -= 1
###################################################################
self.logger.protocol("{:<20}: {}".format("Removed", model))
###################################################################
@notify_deque
@notify_inverted_index
def replace(self,
replaced: PragmaticMachineLearningModel,
replacer: PragmaticMachineLearningModel):
self.database[replaced.tier].replace(replaced, replacer)
###################################################################
self.logger.protocol("{:<20}: {} {:<20}: {}".format(
"Replaced", str(replaced), "with", str(replacer)))
###################################################################
@notify_inverted_index
def extend(self): pass
def get(self, uid: str):
_, tier, _ = uid.split(".")
return self.database[int(tier)].get(uid)
def deserialize(self): pass
def serialize(self): pass
def model_counter(self):
def counts(tier):
return self.database[tier].biggest_id + 1
return counts
    def remove_dependent_models(self,
                                relative_model: PragmaticMachineLearningModel):
        for domain in self.database:
            # Copy the values: self.remove() mutates the dict while we iterate.
            for model in list(domain.knowledge.values()):
                if model.origin == relative_model.origin:
                    self.remove(model)
def inject_logger(self, logger):
self.logger = logger
class ConceptualKnowledgeDatabase(KnowledgeDatabase):
def __init__(self, highest_tier):
super().__init__(highest_tier)
class ProceduralKnowledgeDatabase(KnowledgeDatabase):
def __init__(self, highest_tier):
super().__init__(highest_tier)
class KnowledgeDomain:
def __init__(self, tier: int):
self.tier = tier
self.knowledge = {}
self.biggest_id = 0
def get(self, uid: str):
return self.knowledge[uid]
def insert(self, model: PragmaticMachineLearningModel):
self.knowledge[model] = model
self.update_biggest_id(model.counter)
def remove(self, model: PragmaticMachineLearningModel):
del self.knowledge[model]
def replace(self,
replaced: PragmaticMachineLearningModel,
replacer: PragmaticMachineLearningModel):
del self.knowledge[replaced]
self.knowledge[replacer] = replacer
self.update_biggest_id(replacer.counter)
def update_biggest_id(self, new_id):
self.biggest_id = max(self.biggest_id, new_id)
class RelativeFinder:
def __init__(self):
self.index_t = defaultdict(set)
self.index_z = defaultdict(set)
self.index_sigma = defaultdict(set)
def find(self,
pair: Tuple[str, Optional[str]],
model: PragmaticMachineLearningModel):
if pair == ("T", "Z"):
set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
set_z = self.index_z[model.purpose]
relatives = set_t.intersection(set_z)
elif pair == ("T", "Sigma"):
set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
set_s = self.index_sigma[model.subject]
relatives = set_t.intersection(set_s)
elif pair == ("Sigma", "Z"):
set_s = self.index_sigma[model.subject]
set_z = self.index_z[model.purpose]
relatives = set_s.intersection(set_z)
elif pair == ("complete", ):
set_t = self.index_t[(model.min_timestamp, model.max_timestamp)]
set_s = self.index_sigma[model.subject]
set_z = self.index_z[model.purpose]
relatives = set_t.intersection(set_s).intersection(set_z)
        else:
            raise ValueError("unknown relative pair: {}".format(pair))
if model in relatives:
relatives.remove(model)
for relative in relatives:
yield relative
if not relatives:
yield None
def remove(self, model: PragmaticMachineLearningModel):
t_list, s_list, z_list = self.get_index_lists(model,
time=True,
subject=True,
purpose=True)
t_list.remove(model)
s_list.remove(model)
z_list.remove(model)
def replace(self,
replaced: PragmaticMachineLearningModel,
replacer: PragmaticMachineLearningModel):
t_list, s_list, z_list = self.get_index_lists(replaced,
time=True,
subject=True,
purpose=True)
t_list.remove(replaced)
s_list.remove(replaced)
z_list.remove(replaced)
self.index_t[(replacer.min_timestamp, replacer.max_timestamp)].add(
replacer)
self.index_z[replacer.purpose].add(replacer)
self.index_sigma[replacer.subject].add(replacer)
def insert(self, model: PragmaticMachineLearningModel):
self.index_t[(model.min_timestamp, model.max_timestamp)].add(model)
self.index_z[model.purpose].add(model)
self.index_sigma[model.subject].add(model)
def get_index_lists(self,
model: PragmaticMachineLearningModel,
time: bool,
subject: bool,
purpose: bool):
t_list = s_list = z_list = None
if time:
t_list = self.index_t[(model.min_timestamp, model.max_timestamp)]
if subject:
s_list = self.index_sigma[model.subject]
if purpose:
z_list = self.index_z[model.purpose]
return t_list, s_list, z_list
def extend(self): pass
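RelativeFinder is essentially three inverted indexes whose buckets are intersected per query pair. Stripped to its core, with toy keys and model ids in place of real timestamps and PragmaticMachineLearningModel objects:

from collections import defaultdict

index_t = defaultdict(set)      # (t_min, t_max) -> models
index_sigma = defaultdict(set)  # subject -> models

index_t[(0, 10)] |= {"m1", "m2"}
index_sigma["svm"] |= {"m2", "m3"}

# ("T", "Sigma") query: relatives sharing the time span AND the subject
print(index_t[(0, 10)] & index_sigma["svm"])  # -> {'m2'}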
def log(func):
    def wrapper(self, tier, pragmatic, learnblock):
@@ -258,23 +30,24 @@ class Deconstructor:
TIME_COLUMN = "T" TIME_COLUMN = "T"
SUBJECT_COLUMN = "Sigma" SUBJECT_COLUMN = "Sigma"
PURPOSE_COLUMN = "Z" PURPOSE_COLUMN = "Z"
NEXT_MODEL_COUNTER = None
def __init__(self, def __init__(self,
mode,
knowledge_database: KnowledgeDatabase, knowledge_database: KnowledgeDatabase,
relative_finder: RelativeFinder, relative_finder: RelativeFinder,
source: DataSource, source: DataSource,
settings: DeconstructionSettings): settings: DeconstructionSettings):
self.mode = mode
self.knowledge_database = knowledge_database
self.relative_finder = relative_finder
self.settings = settings
self.source = source self.source = source
self.settings = settings
self.relative_finder = relative_finder
self.knowledge_database = knowledge_database
self.next_model_counter = knowledge_database.model_counter()
self.logger = None self.logger = None
self.reconstructor = None self.reconstructor = None
    def _strategies(self, block):
        # yield (("complete", ), partial(self.deconstruct_complete, block=block))
        yield (("Sigma", "Z"), partial(self.deconstruct_sigma_zeta, block=block))
        yield (("T", "Sigma"), partial(self.deconstruct_time_sigma, block=block))
        # yield (("T", "Z"), partial(self.deconstruct_time_zeta, block=block))
@@ -328,22 +101,31 @@ class Deconstructor:
                             block):
        success = False
        if r_model and p_model.tier < self.settings.highest_tier - 1:
            second_block = r_model.trained_with(self.source)
            overlapping = block.get_overlapping(second_block)
            if len(overlapping) >= self.settings.learn_block_minimum:
                # TODO (dmt): What to do if the pre-images have different sizes?
                alpha = self.calculate_reliability(
                    overlapping.get_column_values("Z"),
                    overlapping.get_column_values("Z_other"))
                alpha_systematic = alpha < 0
                alpha_weak_reliability = 0 <= alpha < self.settings.min_reliability
                if (self.settings.allow_weak_reliability and
                        alpha_weak_reliability) or alpha_systematic:
                    overblock = self.source.new_learnblock(
                        values=list(zip(
                            overlapping.get_column_values("Z"),
                            overlapping.get_column_values("Z_other"),
                            overlapping.get_column_values("T"),
                            ("\"\"" for _ in range(overlapping.rows)),
                            ("\"\"" for _ in range(overlapping.rows)))),
@@ -386,7 +168,7 @@ class Deconstructor:
        # Create new metadata for a pragmatic model
        new_model = prag_model.fusion(
            relative_model, tier, self.next_model_counter(tier))

        # Which models should be used for the reconstruction
        which_ml_models = new_model.sigma
@@ -428,7 +210,7 @@ class Deconstructor:
        # Check constraint
        if overlapping_block.n_features >= 2:
            # Model fusion
            new_model = p_model.fusion(r_model, tier, self.next_model_counter(tier))
            which_ml_models = new_model.sigma
            try:
                # Reconstruct model
@@ -504,11 +286,13 @@ class Deconstructor:
        # Check feature intersection constraint
        if r_model and self._feature_intersection(p_model, r_model) >= 2:
            new_model = p_model.fusion(
                r_model, tier, self.next_model_counter(tier))
        # Check time constraint
        elif r_model and self.time_constraint(p_model, r_model, "complete"):
            # target match?
            # Create submodel from T-Sigma relative samples
            second_block = r_model.trained_with(self.source)
            new_block = block.same_features_fusion(second_block)
@@ -516,7 +300,7 @@ class Deconstructor:
            which_ml_models = p_model.subject + r_model.subject
            self.reconstructor.reconstruct(
                tier, ts_relatives, which_ml_models)
            new_model = p_model.fusion(r_model, tier, self.next_model_counter(tier))
        else:
            return

        # Create learnblock
@@ -579,10 +363,21 @@ class Deconstructor:
        )
    def calculate_reliability(self, predicts_a, predicts_b):
        # TODO (dmt): krippendorff.alpha needs numeric (numpy float64) values,
        # otherwise a TypeError is raised.
        predicts_a = [int(i) for i in predicts_a]
        predicts_b = [int(i) for i in predicts_b]
        predictions = [predicts_a, predicts_b]
        if self.reconstructor.mode == "conceptual":
            return krippendorff.alpha(predictions,
                                      level_of_measurement="nominal")
        elif self.reconstructor.mode:
            return krippendorff.alpha(predictions,
                                      level_of_measurement="ratio")
    #
    # def calc_reliability(self,
@@ -600,3 +395,5 @@ class Deconstructor:
    #                      level_of_measurement="ratio")
    #     else:
    #         raise ValueError()
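calculate_reliability hands the two label sequences to krippendorff.alpha as two raters over the same units. A self-contained check with made-up codings (requires the krippendorff package; the values here are illustrative only):

import krippendorff

coder_a = [1, 1, 2, 2, 3]
coder_b = [1, 1, 2, 3, 3]
# Nominal level for categorical labels, as in the conceptual branch above;
# alpha is 1.0 for perfect agreement and drops below 0 for systematic disagreement.
print(krippendorff.alpha([coder_a, coder_b], level_of_measurement="nominal"))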
from random import sample
from collections import defaultdict
from functools import partial
from multiprocessing import Pool, cpu_count
from dataclasses import dataclass
from typing import Union, List, Tuple, Dict

from krippendorff import alpha

from cml.domain.data_source import DataSource
from cml.shared.settings import ReconstructionSettings
from cml.shared.errors import (
    NoModelReconstructedError,
    NotEnoughFeaturesWarning)


__all__ = (
    "Reconstructor",
@@ -42,12 +41,12 @@ class Metadata:
               f"T min: <{self.t_min}> \n" \
               f"T max: <{self.t_max}> \n" \
               f"Subjects: <{self.sigma}> \n" \
               f"Purposes: <{self.zeta}> \n"

    def __hash__(self):
        return hash(".".join([self.knowledge_domain,
                              str(self.knowledge_tier),
                              str(self.identifier)]))

    def __eq__(self, other):
        if not isinstance(other, Metadata):
@@ -61,6 +60,8 @@ class PragmaticMachineLearningModel:
                 meta: Metadata,
                 model,
                 learnblock):
        self.meta = meta
        self.model = model
        self.learnblock = learnblock if self.tier > 1 else None
@@ -123,25 +124,46 @@ class PragmaticMachineLearningModel:
                         str(self.meta.knowledge_tier),
                         str(self.meta.identifier)])
@property
def identifier(self):
return self.meta.identifier
@identifier.setter
def identifier(self, value):
self.meta.identifier = value
    @property
    def counter(self):
        return self.meta.identifier
    def fusion(self,
               model: 'PragmaticMachineLearningModel',
               knowledge_tier: int,
               new_identifier: int) -> Metadata:
        # Pre-image fields start empty; the reconstructor fills them in later.
        return Metadata(self.meta.knowledge_domain,
                        knowledge_tier,
                        new_identifier,
                        None,
                        None,
                        None,
                        min(self.meta.t_min, model.min_timestamp),
                        max(self.meta.t_max, model.max_timestamp),
                        tuple(set(self.subject + model.subject)),
                        tuple(set(self.meta.zeta + model.purpose)))
# return Metadata(self.meta.knowledge_domain,
# self.meta.knowledge_tier,
# new_identifier,
# self.pre_image + model.pre_image,
# list(set(self.pre_image_features).intersection(
# set(model.pre_image_features))),
# self.pre_image_labels + model.pre_image_labels,
# min(self.meta.t_min, model.min_timestamp),
# max(self.meta.t_max, model.max_timestamp),
# tuple(set(self.subject + model.subject)),
# tuple(set(self.meta.zeta + model.purpose)))
#
    def trained_with(self, source: DataSource):
        if self.origin == "source":
@@ -162,36 +184,60 @@ def log(func):
    return wrapper
class SubprocessDTO:
    """Picklable bundle of everything a Pool worker needs."""

    def __init__(self, model, learnblock, tier, abv, settings):
        self.model = model
        self.learnblock = learnblock
        self.tier = tier
        self.abv = abv  # knowledge-domain abbreviation, e.g. "C"
        self.settings = settings
class Reconstructor:

    CONCEPTUAL_KNOWLEDGE_ABBREVIATION = "C"
    PROCEDURAL_KNOWLEDGE_ABBREVIATION = "P"

    def __init__(self,
                 settings: ReconstructionSettings,
                 ml_models: List,
                 mode: str):
        self.mode = mode
        self.settings = settings
        self.ml_models = ml_models
        self.__reconstruction = self._init_reconstruction(mode)
        self.next_model_counter = None
        self.logger = None

    def _init_reconstruction(self, mode: str):
        return partial(self._reconstruct_conceptual, krippen="nominal") \
            if mode == "conceptual" else \
            partial(self._reconstruct_procedural, krippen="ratio")

    def reconstruct_parallel(self, tier, learnblock, models=None, meta=None):
        self.logger.protocol("{:^100}".format("Starting Reconstruction"))
        args = []
        for model in self.ml_models:
            # Mirrors the which_models filter used in reconstruct() below.
            if models and model.subject not in models:
                continue
            args.append(SubprocessDTO(model, learnblock, tier,
                                      self.CONCEPTUAL_KNOWLEDGE_ABBREVIATION,
                                      self.settings))
        pool = Pool(max(1, cpu_count() - 2))
        self.logger.protocol("{:^100}".format("Mapped"))
        reliability_model = pool.map(reconstruct_in_parallel, args)
        pool.close()
        pool.join()
        self.logger.protocol("{:^100}".format("Results received"))
        reliabilities_to_model = defaultdict(list)
        for reliability, model in reliability_model:
            if reliability and model:
                reliabilities_to_model[reliability].append(model)
        if reliabilities_to_model:
            winner = determine_winner(reliabilities_to_model)
            winner.identifier = self.next_model_counter(tier)
            return winner
        raise NoModelReconstructedError()
    @log
    def reconstruct(self,
@@ -231,20 +277,27 @@ class Reconstructor:
            if model.subject not in which_models: continue

            # train model
            train_block, eval_block = split(learnblock,
                                            self.settings.reliability_sample)
            try:
                trained_model = model.train(
                    train_block.as_numpy_array(),
                    [i for i in train_block.get_column_values("Z")])
            except TypeError:
                # Surface the offending learnblock instead of failing silently.
                self.logger.protocol(
                    "Training failed for purpose {}".format(train_block.purpose))
                raise

            ###################################################################
            self.logger.protocol("\n{:<20}".format(model.subject))
            self.logger.protocol("{:<20}: {}".format("Accuracy", model.accuracy))
            ###################################################################

            # check constraints
            if valid_reconstructed(trained_model,
                                   "conceptual",
                                   self.settings.min_test_accuracy,
                                   self.settings.max_test_error_avg,
                                   self.settings.max_test_error_max):
                reliability = calc_reliability(trained_model, learnblock, krippen)
                ###############################################################
                self.logger.protocol("{:<20}: {}".format("Reliability",
                                                         str(reliability)))
                ###############################################################
@@ -255,7 +308,7 @@ class Reconstructor:
                                    learnblock.purpose])
                    meta = Metadata(self.CONCEPTUAL_KNOWLEDGE_ABBREVIATION,
                                    tier,
                                    self.next_model_counter(tier),
                                    learnblock.indexes,
                                    learnblock.columns(),
                                    learnblock.get_column_values("Z"),
@@ -263,12 +316,16 @@ class Reconstructor:
                                    learnblock.max_timestamp,
                                    (model.subject, ),
                                    (uid, ))
                else:
                    # Fusion metadata arrives with empty pre-image fields;
                    # fill them in from the current learnblock.
                    meta.pre_image_features = learnblock.columns()
                    meta.pre_image_labels = learnblock.get_column_values("Z")
                    meta.pre_image = learnblock.indexes
                reliability_to_model[reliability].append(
                    PragmaticMachineLearningModel(
                        meta, trained_model, learnblock
                    )
                )
        return reliability_to_model
    def _reconstruct_procedural(self,
@@ -276,7 +333,7 @@ class Reconstructor:
                                learnblock,
                                which_models: List,
                                krippen: str = None,
                                meta: Metadata = None) -> Dict[float, List]:
        reliability_to_model = defaultdict(list)
        for model in self.ml_models:
            if model.subject not in which_models: continue
@@ -288,7 +345,7 @@ class Reconstructor:
                [i for i in train_block.get_column_values("Z")])

            # check constraints
            if valid_reconstructed(trained_model,
                                   "procedural",
                                   self.settings.min_test_accuracy,
                                   self.settings.max_test_error_avg,
                                   self.settings.max_test_error_max):
                reliability = calc_reliability(
                    trained_model, learnblock, krippen)
                if reliability >= self.settings.min_reliability:
@@ -296,7 +353,7 @@ class Reconstructor:
                    prag_meta_data = Metadata(
                        "P",
                        tier,
                        self.next_model_counter(tier),
                        learnblock.indexes,
                        learnblock.columns(),
                        learnblock.get_column_values("Z"),
@@ -306,39 +363,74 @@ class Reconstructor:
                        (".".join(["C", '1', learnblock.purpose]), )
                    )
                else:
                    # Fusion metadata: fill in the empty pre-image fields.
                    meta.pre_image_features = learnblock.columns()
                    meta.pre_image_labels = learnblock.get_column_values("Z")
                    meta.pre_image = learnblock.indexes
                    prag_meta_data = meta
                reliability_to_model[reliability].append(
                    PragmaticMachineLearningModel(prag_meta_data,
                                                  trained_model,
                                                  learnblock))
        return reliability_to_model
def split(learnblock, reliability_sample) -> Tuple:
    indices = learnblock.indexes
    eval_size = int(learnblock.length * reliability_sample)
    eval_idx = sample(indices, eval_size)
    train_idx = list(set(indices).difference(set(eval_idx)))
    return learnblock.new_block_from_rows_index(train_idx), \
           learnblock.new_block_from_rows_index(eval_idx)


def calc_reliability(trained_model, eval_block, metric: str) -> float:
    y_pre = trained_model.predict(eval_block.as_numpy_array())
    y_true = [i for i in eval_block.get_column_values("Z")]
    reliability_data = [y_pre, y_true]
    return alpha(reliability_data, level_of_measurement=metric)


def valid_reconstructed(model,
                        knowledge_domain: str,
                        min_test_accuracy,
                        max_test_error_avg,
                        max_test_error_max) -> bool:
    if knowledge_domain == "conceptual":
        return model.accuracy >= min_test_accuracy
    else:
        return model.mean_error <= max_test_error_avg and \
               model.max_error <= max_test_error_max
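split, calc_reliability and valid_reconstructed were hoisted to module level so the Pool worker below can call them without pickling a Reconstructor. The sampling logic of split, restated over a plain index list (split_indices is an illustrative name, not a cml function):

from random import sample

def split_indices(indices, reliability_sample):
    eval_idx = sample(indices, int(len(indices) * reliability_sample))
    train_idx = list(set(indices) - set(eval_idx))
    return train_idx, eval_idx

train, evaluation = split_indices(list(range(10)), 0.3)
print(len(train), len(evaluation))  # -> 7 3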
def reconstruct_in_parallel(dto):
    # Pool.map worker: one SubprocessDTO in, (reliability, model) out.
    train_block, eval_block = split(dto.learnblock, dto.settings.reliability_sample)
    trained_model = dto.model.train(
        train_block.as_numpy_array(),
        [i for i in train_block.get_column_values("Z")])
    if valid_reconstructed(trained_model,
                           "conceptual",
                           dto.settings.min_test_accuracy,
                           dto.settings.max_test_error_avg,
                           dto.settings.max_test_error_max):
        reliability = calc_reliability(trained_model, dto.learnblock, "nominal")
        if reliability >= dto.settings.min_reliability:
            uid = ".".join([dto.abv, str(dto.tier), dto.learnblock.purpose])
            meta = Metadata(dto.abv,
                            dto.tier,
                            0,  # placeholder id; the winning model gets a real one
                            dto.learnblock.indexes,
                            dto.learnblock.columns(),
                            dto.learnblock.get_column_values("Z"),
                            dto.learnblock.min_timestamp,
                            dto.learnblock.max_timestamp,
                            (dto.model.subject,),
                            (uid,))
            return reliability, PragmaticMachineLearningModel(
                meta, trained_model, dto.learnblock)
    return None, None
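reconstruct_in_parallel is shaped for Pool.map: each worker receives one picklable DTO and returns a (reliability, model) pair, with (None, None) signalling rejection. The round trip in miniature, where job stands in for train-and-score:

from multiprocessing import Pool, cpu_count

def job(dto):              # Pool.map hands each mapped item in as one argument
    return dto * dto       # stand-in for "train, validate, score"

if __name__ == "__main__":
    with Pool(max(1, cpu_count() - 2)) as pool:
        print(pool.map(job, [1, 2, 3]))  # -> [1, 4, 9]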
def determine_winner(
@@ -353,3 +445,4 @@ def determine_winner(
            winner = model
    return winner