Skip to content
Snippets Groups Projects
Commit 3d80516c authored by dmt's avatar dmt
Browse files

Refactor reconstruction.

parent 4cd2e12d
No related branches found
No related tags found
No related merge requests found
...@@ -2,10 +2,16 @@ from random import sample ...@@ -2,10 +2,16 @@ from random import sample
from collections import defaultdict from collections import defaultdict
from dataclasses import dataclass from dataclasses import dataclass
from functools import partial from functools import partial
from typing import Union, List, Tuple, Generator, Dict
import krippendorff from krippendorff import alpha
from cml.shared.errors import NoModelReconstructedError from cml.shared.settings import ReconstructionSettings
from cml.domain.data_source import DataSource
from cml.shared.errors import (
NoModelReconstructedError,
NotEnoughFeaturesWarning
)
__all__ = ( __all__ = (
...@@ -19,127 +25,198 @@ class Metadata: ...@@ -19,127 +25,198 @@ class Metadata:
knowledge_tier: int knowledge_tier: int
identifier: int identifier: int
pre_image: list pre_image: list
pre_image_features: list
pre_image_labels: list
t_min: int t_min: int
t_max: int t_max: int
sigma: list sigma: tuple
zeta: list zeta: tuple
def __str__(self):
    """Return a multi-line, human-readable listing of the metadata fields.

    Every field is rendered as ``<label>: <value> `` followed by a newline,
    in a fixed order (domain, tier, identifier, pre image, pre image labels,
    pre image features, t_min, t_max, subjects, purposes).
    """
    labelled_fields = (
        ("Knowledge domain", self.knowledge_domain),
        ("Knowledge tier", self.knowledge_tier),
        ("Identifier", self.identifier),
        ("Pre image", self.pre_image),
        ("Pre image labels", self.pre_image_labels),
        ("Pre image features", self.pre_image_features),
        ("T min", self.t_min),
        ("T max", self.t_max),
        ("Subjects", self.sigma),
        # Label was misspelled "Puposes" before.
        ("Purposes", self.zeta),
    )
    return "".join(f"{label}: <{value}> \n"
                   for label, value in labelled_fields)
def __hash__(self):
    """Hash the metadata by its dotted ``domain.tier.identifier`` key.

    The three parts are joined with "." rather than concatenated directly:
    without a separator, distinct triples such as ("C", 1, 12) and
    ("C", 11, 2) would both collapse to "C112" and hash identically.
    """
    return hash(".".join([self.knowledge_domain,
                          str(self.knowledge_tier),
                          str(self.identifier)]))
class PragmaticMachineLearningModel:
    """A trained model together with the metadata describing it.

    The model is identified by its ``uid`` (``domain.tier.identifier``);
    equality and hashing are both based on that uid.
    """

    def __init__(self,
                 meta: 'Metadata',
                 model,
                 learnblock):
        """Store the model, its metadata and facts read from *learnblock*.

        Args:
            meta: Metadata record describing the model.
            model: The trained model object.
            learnblock: Block the model was trained on; ``n_features``,
                ``indexes`` and ``origin`` are read from it.
        """
        self.meta = meta
        self.model = model
        self.domain_size = learnblock.n_features
        # The block itself is only retained for tiers above 1.
        self.learnblock = learnblock if self.tier > 1 else None
        self.domain = learnblock.indexes
        self.origin = learnblock.origin

    def __str__(self) -> str:
        return self.uid

    def __repr__(self) -> str:
        return self.uid

    def __hash__(self) -> int:
        # Hash the uid so that hashing agrees with __eq__, including the
        # comparison against plain uid strings.
        return hash(self.uid)

    def __eq__(self,
               other: Union['PragmaticMachineLearningModel', str]) -> bool:
        """Compare by uid against another model or a uid string.

        Any other operand type yields ``NotImplemented`` so Python can fall
        back to its default comparison instead of raising.
        """
        if isinstance(other, PragmaticMachineLearningModel):
            return self.uid == other.uid
        if isinstance(other, str):
            return self.uid == other
        return NotImplemented

    @property
    def pre_image_features(self) -> List[str]:
        return self.meta.pre_image_features

    @property
    def tier(self) -> int:
        return self.meta.knowledge_tier

    @property
    def min_timestamp(self) -> int:
        return self.meta.t_min

    @property
    def max_timestamp(self) -> int:
        return self.meta.t_max

    @property
    def pre_image(self) -> List[int]:
        return self.meta.pre_image

    @property
    def pre_image_labels(self) -> List[Union[int, str]]:
        return self.meta.pre_image_labels

    @property
    def subject(self) -> Tuple[str, ...]:
        return self.meta.sigma

    @property
    def purpose(self) -> Tuple[str, ...]:
        return self.meta.zeta

    @property
    def uid(self) -> str:
        """Dotted identifier ``<knowledge_domain>.<tier>.<identifier>``."""
        return ".".join([self.meta.knowledge_domain,
                         str(self.meta.knowledge_tier),
                         str(self.meta.identifier)])

    def fusion(self,
               model: 'PragmaticMachineLearningModel',
               new_identifier: int) -> 'Metadata':
        """Build the metadata of the fusion of this model with *model*.

        Domain and tier are taken from this model. Pre-images, labels,
        subjects and purposes are concatenated (self first); the feature
        list is the intersection of both feature lists; the time span is
        the union of both spans.

        Args:
            model: The model to fuse with.
            new_identifier: Identifier assigned to the fused metadata.

        Returns:
            A new Metadata instance; neither model is modified.
        """
        other_features = set(model.pre_image_features)
        # Keep this model's feature order (deduplicated) so the result is
        # deterministic; a plain set intersection has arbitrary order.
        shared_features = [name
                           for name in dict.fromkeys(self.pre_image_features)
                           if name in other_features]
        return Metadata(self.meta.knowledge_domain,
                        self.meta.knowledge_tier,
                        new_identifier,
                        self.pre_image + model.pre_image,
                        shared_features,
                        self.pre_image_labels + model.pre_image_labels,
                        min(self.min_timestamp, model.min_timestamp),
                        max(self.max_timestamp, model.max_timestamp),
                        self.subject + model.subject,
                        # Purposes are merged with the other model's
                        # purposes (previously its subjects were appended).
                        self.purpose + model.purpose)

    def trained_with(self, source: 'DataSource'):
        """Return the block this model was trained with.

        When the model's origin is ``"source"``, the block is fetched from
        *source* (rows ``pre_image``, columns ``pre_image_features``) and
        labelled with ``pre_image_labels``. Otherwise the learnblock stored
        at construction is returned, which is ``None`` for tiers <= 1.
        """
        if self.origin != "source":
            return self.learnblock
        block = source.get_block(self.pre_image,
                                 columns=self.pre_image_features)
        return block.set_labels(self.pre_image_labels)
class Reconstructor: class Reconstructor:
def __init__(self,
             settings: 'ReconstructionSettings',
             ml_models: List,
             knowlege_domain: str):
    """Store the configuration and reset all lazily configured state.

    Args:
        settings: Reconstruction settings object.
        ml_models: Candidate machine learning models.
        knowlege_domain: Knowledge domain this reconstructor works in.
    """
    # Not configured yet: logger, category, id source and the
    # category-specific reconstruction callable.
    self.logger = self._category = self._free_id = None
    self.__reconstruction = None

    self.knowledge_domain = knowlege_domain
    self.ml_models = ml_models
    self.settings = settings
def reconstruct(self, learnblock, which_models=None, meta=None):
if not which_models:
which_models = [m.abbreviation for m in self.ml_models]
reliabilities_to_model = self.__reconstruction(learnblock,
which_models,
meta)
if reliabilities_to_model.keys():
return determine_winner(reliabilities_to_model)
raise NoModelReconstructedError()
@property
def category(self) -> str:
    """The currently selected knowledge category, or None if unset."""
    return self._category

@category.setter
def category(self, value: str) -> None:
    """Select the knowledge category and bind the matching reconstruction.

    ``"conceptual"`` binds ``_reconstruct_conceptual`` with
    ``krippen="nominal"``; ``"procedural"`` binds
    ``_reconstruct_procedural`` with ``krippen="ratio"``.

    Raises:
        ValueError: If *value* is neither of the two categories. The
            previously selected category stays in effect.
    """
    if value == "conceptual":
        self.__reconstruction = partial(self._reconstruct_conceptual,
                                        krippen="nominal")
    elif value == "procedural":
        self.__reconstruction = partial(self._reconstruct_procedural,
                                        krippen="ratio")
    else:
        raise ValueError(
            f"unknown knowledge category {value!r}; "
            "expected 'conceptual' or 'procedural'")
    # Only recorded once a reconstruction routine has been bound.
    self._category = value
@property
def free_id(self) -> Generator[int, None, None]:
    """Iterator handing out free identifiers, or None if unset."""
    return self._free_id

@free_id.setter
def free_id(self, value: Generator[int, None, None]) -> None:
    """Store an iterator over *value* as the source of free identifiers."""
    id_iterator = iter(value)
    self._free_id = id_iterator
def reconstruct(self,
                tier: int,
                learnblock,
                which_models: List = None,
                meta: Metadata = None) -> PragmaticMachineLearningModel:
    """Reconstruct candidates from *learnblock* and return the winner.

    Args:
        tier: Knowledge tier forwarded to the reconstruction routine.
        learnblock: Block to learn from.
        which_models: Subjects of the models to train; when empty or None,
            the subjects of all configured models are used.
        meta: Optional metadata forwarded to the reconstruction routine.

    Returns:
        The model selected by ``determine_winner``.

    Raises:
        NotEnoughFeaturesWarning: If ``learnblock.learn_rows`` is not
            greater than zero.
        NoModelReconstructedError: If the reconstruction produced no
            candidate.
    """
    # NOTE(review): the check reads ``learn_rows`` although the raised
    # warning is named after features - confirm which is intended.
    if not (learnblock.learn_rows > 0):
        raise NotEnoughFeaturesWarning()

    # Default to every configured model.
    selected = which_models or [m.subject for m in self.ml_models]

    # Run the reconstruction routine bound by the ``category`` setter.
    candidates = self.__reconstruction(tier, learnblock, selected, meta=meta)

    if not candidates.keys():
        raise NoModelReconstructedError()
    return determine_winner(candidates)
def _reconstruct_conceptual(self, def _reconstruct_conceptual(self,
tier: int,
learnblock, learnblock,
which_models, which_models: List,
krippen=None, krippen: str = None,
meta=None): meta: Metadata = None):
reliability_to_model = defaultdict(list) reliability_to_model = defaultdict(list)
for model in self.ml_models: for model in self.ml_models:
if model.abbreviation not in which_models: continue if model.subject not in which_models: continue
# train model # train model
train_block, eval_block = self.split(learnblock) train_block, eval_block = self.split(learnblock)
...@@ -149,21 +226,24 @@ class Reconstructor: ...@@ -149,21 +226,24 @@ class Reconstructor:
# check constraints # check constraints
if self._valid_reconstructed(trained_model, "conceptual"): if self._valid_reconstructed(trained_model, "conceptual"):
reliability = self.calc_reliability(trained_model, reliability = self.calc_reliability(
learnblock, trained_model, learnblock, krippen)
krippen)
if reliability >= self.settings.min_reliability: if reliability >= self.settings.min_reliability:
# TODO (dmt): Fix the knowledge tier after first iteration! if not meta:
prag_meta_data = Metadata( prag_meta_data = Metadata(
"C", "C",
1, tier,
next(self.free_id), next(self.free_id),
learnblock.indexes, learnblock.indexes,
learnblock.min_timestamp, learnblock.columns,
learnblock.max_timestamp, learnblock.get_column_values("Z"),
[model.subject], learnblock.min_timestamp,
[".".join(["C", '1', learnblock.purpose])] learnblock.max_timestamp,
) (model.subject, ),
(".".join(["C", '1', learnblock.purpose]), )
)
else:
prag_meta_data = meta
reliability_to_model[reliability].append( reliability_to_model[reliability].append(
PragmaticMachineLearningModel(prag_meta_data, PragmaticMachineLearningModel(prag_meta_data,
...@@ -171,9 +251,15 @@ class Reconstructor: ...@@ -171,9 +251,15 @@ class Reconstructor:
learnblock)) learnblock))
return reliability_to_model return reliability_to_model
def _reconstruct_procedural(self, learnblock, krippen=None, meta=None): def _reconstruct_procedural(self,
tier: int,
learnblock,
which_models: List,
krippen: str = None,
meta: Metadata= None) -> Dict[float, List]:
reliability_to_model = defaultdict(list) reliability_to_model = defaultdict(list)
for model in self.ml_models: for model in self.ml_models:
if model.subject not in which_models: continue
# train model # train model
train_block, eval_block = self.split(learnblock) train_block, eval_block = self.split(learnblock)
...@@ -183,17 +269,32 @@ class Reconstructor: ...@@ -183,17 +269,32 @@ class Reconstructor:
# check constraints # check constraints
if self._valid_reconstructed(trained_model, "procedural"): if self._valid_reconstructed(trained_model, "procedural"):
reliability = self.calc_reliability(trained_model, reliability = self.calc_reliability(
learnblock, trained_model, learnblock, krippen)
krippen)
if reliability >= self.settings.min_reliability: if reliability >= self.settings.min_reliability:
if not meta:
prag_meta_data = Metadata(
"P",
tier,
next(self.free_id),
learnblock.indexes,
learnblock.columns,
learnblock.get_column_values("Z"),
learnblock.min_timestamp,
learnblock.max_timestamp,
(model.subject, ),
(".".join(["C", '1', learnblock.purpose]), )
)
else:
prag_meta_data = meta
reliability_to_model[reliability].append( reliability_to_model[reliability].append(
PragmaticMachineLearningModel(trained_model, PragmaticMachineLearningModel(prag_meta_data,
trained_model,
learnblock)) learnblock))
return reliability_to_model return reliability_to_model
def split(self, learnblock): def split(self, learnblock) -> Tuple:
indices = learnblock.indexes indices = learnblock.indexes
eval_size = int(learnblock.length * self.settings.reliability_sample) eval_size = int(learnblock.length * self.settings.reliability_sample)
eval_idx = sample(indices, eval_size) eval_idx = sample(indices, eval_size)
...@@ -201,14 +302,18 @@ class Reconstructor: ...@@ -201,14 +302,18 @@ class Reconstructor:
return learnblock.new_block_from_rows_index(train_idx), \ return learnblock.new_block_from_rows_index(train_idx), \
learnblock.new_block_from_rows_index(eval_idx) learnblock.new_block_from_rows_index(eval_idx)
def calc_reliability(self,
                     trained_model,
                     eval_block,
                     metric: str) -> float:
    """Return the agreement between predictions and the "Z" column.

    The model predicts on ``eval_block.as_numpy_array()``; predictions and
    the values of column "Z" are passed as two rows to ``alpha``.

    Args:
        trained_model: Object exposing ``predict``.
        eval_block: Block providing the inputs and the "Z" column.
        metric: Passed to ``alpha`` as ``level_of_measurement``.
    """
    predicted = trained_model.predict(eval_block.as_numpy_array())
    expected = list(eval_block.get_column_values("Z"))
    return alpha([predicted, expected], level_of_measurement=metric)
def _valid_reconstructed(self, model, knowledge_domain): def _valid_reconstructed(self,
model,
knowledge_domain: str) -> bool:
if knowledge_domain == "conceptual": if knowledge_domain == "conceptual":
return model.accuracy >= self.settings.min_test_accuracy return model.accuracy >= self.settings.min_test_accuracy
else: else:
...@@ -216,10 +321,10 @@ class Reconstructor: ...@@ -216,10 +321,10 @@ class Reconstructor:
model.max_error <= self.settings.max_test_error_max model.max_error <= self.settings.max_test_error_max
def determine_winner(reliability_to_model): def determine_winner(
reliability_to_model: dict) -> PragmaticMachineLearningModel:
sorted_reliabilities = sorted(reliability_to_model.keys(), reverse=True) sorted_reliabilities = sorted(reliability_to_model.keys(), reverse=True)
biggest_reliabilities = reliability_to_model[sorted_reliabilities.pop()] biggest_reliabilities = reliability_to_model[sorted_reliabilities.pop()]
winner = None winner = None
min_domain = float("inf") min_domain = float("inf")
for model in biggest_reliabilities: for model in biggest_reliabilities:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment