Skip to content
Snippets Groups Projects
Commit 56fda637 authored by dmt's avatar dmt
Browse files

Implement parts of deconstruction process.

parent 68391a29
No related branches found
No related tags found
No related merge requests found
from abc import ABC
from functools import partial
from collections import defaultdict
from itertools import count
from cml.shared.parameter import HIGHEST_TIER
from cml.shared.errors import DeconstructionFailed, NoModelReconstructedError
__all__ = (
__all__ = (
"Deconstructor",
"KnowledgeDatabase",
"RelativeFinder",
"ConceptualKnowledgeDatabase",
"ProceduralKnowledgeDatabase",
"RelativeFinder",
"KnowledgeDomain"
) )
def notify_inverted_index(func):
def wrapper(self, *args, **kwargs):
for obs in self.observer:
getattr(obs, func.__name__)(*args, **kwargs)
return func(*args, **kwargs)
return wrapper
class KnowledgeDatabase(ABC):
def __init__(self):
self.database = [KnowledgeDomain(i) for i in range(8)]
self.observer = []
super().__init__()
def generate_free_ids(self):
for i in count(1):
yield i
def deserialize(self): pass
def serialize(self): pass
@notify_inverted_index
def extend(self): pass
@notify_inverted_index
def insert(self, model):
self.database[model.knowledge_domain].insert(model)
@notify_inverted_index
def remove(self): pass
@notify_inverted_index
def replace(self): pass
class ConceptualKnowledgeDatabase(KnowledgeDatabase):
def __init__(self):
super().__init__()
class ProceduralKnowledgeDatabase(KnowledgeDatabase):
def __init__(self):
super().__init__()
class KnowledgeDomain:
def __init__(self, tier):
self.tier = tier
self.knowledge = {}
def insert(self, model):
self.knowledge[model] = model
class RelativeFinder:
def __init__(self):
self.index_t = defaultdict(list)
self.index_z = defaultdict(list)
self.index_sigma = defaultdict(list)
def find_relatives(self):
# TODO (dmt): If something is found return it, if nohtings is found
# then return None and then raise StopIteration!
pass
def remove(self): pass
def replace(self): pass
def insert(self, model): pass
def extend(self): pass
class Deconstructor:
TIME_COLUMN = "T"
SUBJECT_COLUMN = "Sigma"
PUPORSE_COLUMN = "Z"
def __init__(self, knowledge_database, relative_finder, settings):
self.knowledge_database = knowledge_database
self.relative_finder = relative_finder
self.settings = settings
self.source = None
self.reconstructor = None
def _strategies(self, block):
yield (("T", "Z"), partial(self.deconstruct_time_zeta, block=block))
yield (("T", "Sigma"), partial(self.deconstruct_time_sigma, block=block))
yield (("Sigma", "Z"), self.deconstruct_sigma_zeta)
yield (("complete", ), self.deconstruct_complete)
def deconstruct(self, prag_model, learnblock):
for pair, strategy in self._strategies(learnblock):
for relative in self.relative_finder(pair):
try:
strategy(prag_model, relative)
# successfull deconstruction
if self.settings.deconst_mode == "minimal": return
except DeconstructionFailed:
# unsuccessfull deconstruction
continue
def deconstruct_time_sigma(self, prag_model, relative_model, block):
self.knowledge_database.insert(prag_model)
if relative_model and prag_model.tier < HIGHEST_TIER:
first_block = self.source.get_block(prag_model.pre_image)
second_block = self.source.get_block(relative_model.pre_image)
times_p = set(first_block.get_column_values(self.TIME_COLUMN))
times_r = set(second_block.get_column_values(self.TIME_COLUMN))
if len(times_p.union(times_r)) >= self.settings.min_learnblock:
alpha = self.calc_reliability(relative_model, prag_model, block)
alpha_systematic = alpha < 0
alpha_weak_reliability = 0 <= alpha < self.settings.min_reliability
if (self.settings.allow_weak_reliability and
alpha_weak_reliability) or alpha_systematic:
new_model = prag_model.fusion(relative_model)
self.knowledge_database.insert(new_model)
# Create a new learnblock from times_p.union(times_r)
# Starte eine neure Folge von Construction, Reconstruction
# Deconstruction from this one!
else:
self.knowledge_database.insert(prag_model)
def deconstruct_time_zeta(self, prag_model, relative_model, block):
self.knowledge_database.insert(prag_model)
first_block = self.source.get_block(prag_model.pre_image)
second_block = self.source.get_block(relative_model.pre_image)
times_p = set(first_block.get_column_values(self.TIME_COLUMN))
times_r = set(second_block.get_column_values(self.TIME_COLUMN))
if len(times_p.union(times_r)) >= self.settings.min_learnblock:
new_model = prag_model.fusion(relative_model)
alpha = self.calc_reliability(relative_model, prag_model, block)
if alpha >= self.settings.min_reliability:
self.knowledge_database.replace(relative_model, new_model)
def deconstruct_sigma_zeta(self, prag_model, relative_model):
if relative_model and self.time_constraint(prag_model,
relative_model,
"SigmaZ"):
first_block = self.source.get_block(prag_model.pre_image)
second_block = self.source.get_block(relative_model.pre_image)
overlapping_block = first_block.overlapping_rows(second_block)
if overlapping_block.rows >= 2:
new_model = prag_model.fusion(relative_model)
try:
new_block = first_block.fusion(second_block)
which_ml_models = new_model.subject
recon_m = self.reconstructor.reconstruct(new_block,
which_ml_models,
new_model)
self.knowledge_database.replace(relative_model, recon_m)
except NoModelReconstructedError:
if self.settings.deconst_mode == "conservative":
self.knowledge_database.remove(prag_model)
elif self.settings.deconst_mode == "integrative":
self.knowledge_database.insert(prag_model)
elif self.settings.deconst_mode == "oppurtunistic":
if first_block.rows > second_block.rows:
self.knowledge_database.remove(relative_model)
self.knowledge_database.insert(prag_model)
else:
self.knowledge_database.remove(prag_model)
self.knowledge_database.insert(relative_model)
else:
self.knowledge_database.insert(prag_model)
def time_constraint(self, prag_model, relative_model, _type):
if _type == "SigmaZ":
if self.settings.deconst_max_distance_t == 0:
return self._overlapping_time(prag_model, relative_model)
elif 0 < self.settings.deconst_max_distance_t < 1:
return self._enclosing_time(prag_model, relative_model)
elif self.settings.deconst_max_distance_t == 1:
return True
elif _type == "complete":
return prag_model.min_timestamp >= relative_model.max_timestamp \
>= prag_model.max_timestamp \
or prag_model.min_timestamp <= relative_model.min_timestamp \
and prag_model.max_timestamp >= relative_model.max_timestamp
def _overlapping_time(self, prag_model, relative_model):
return relative_model.min_timestamp >= prag_model.max_timestamp or \
prag_model.min_timestamp >= relative_model.max_timestamp
def _enclosing_time(self, prag_model, relative_model):
m_dash_max_time = max(prag_model.max_timestamp,
relative_model.max_timestamp)
m_dash_min_time = min(prag_model.min_timestamp,
relative_model.min_timestamp)
relative_m_dash_time = self.settings.deconst_max_distance_t*(
m_dash_max_time-m_dash_min_time)
return (relative_model.min_timestamp - prag_model.max_timestamp) < \
relative_m_dash_time or (prag_model.min_timestamp -
relative_model.max_timestamp) < relative_m_dash_time
def deconstruct_complete(self, prag_model, relative_model):
if relative_model:
if self.time_constraint(prag_model, relative_model, "complete") and \
self._feature_intersection(prag_model, relative_model) >= 2:
new_model = prag_model.fusion(relative_model)
else:
new_block = self.source.get_block(
prag_model.pre_image + relative_model.pre_image)
ts_relatives = self.source.time_simga_relatives(new_block)
which_ml_models = prag_model.subject + relative_model.subject
new_model = self.reconstructor.reconstruct(ts_relatives,
which_ml_models)
new_block = self.source.get_block(new_model.pre_image)
which_ml_models = new_block.subject
try:
recon_model = self.reconstructor.reconstruct(new_block,
which_ml_models,
new_model)
self.knowledge_database.remove(relative_model)
self.knowledge_database.insert(recon_model)
except NoModelReconstructedError:
# TODO (dmt): Implement the model differentiation!
pass
else:
self.knowledge_database.insert(prag_model)
def _feature_intersection(self, prag_model, relative_model):
first_block = self.source.get_block(prag_model.pre_image)
second_block = self.source.get_block(relative_model.pre_image)
return len(
set(first_block.columns).intersection(set(second_block.columns))
)
def calc_reliability(self, model_a, model_b, block):
return 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment