from typing import List
from abc import abstractmethod, ABC

import pandas as pd


class Adapter(ABC):
    """Abstract interface for tabular data adapters.

    NOTE(review): ``drop_colunn_by_name`` (sic) and
    ``get_column_index_by_index`` look like typos of
    ``drop_column_by_name`` / ``get_column_index_by_name``. The names are
    kept because renaming an abstract method changes the contract every
    subclass must fulfil.
    """

    @abstractmethod
    def read_csv(self):
        pass

    @abstractmethod
    def sort(self, *args, **kwargs):
        pass

    @abstractmethod
    def set_column_value(self, *args, **kwargs):
        pass

    @abstractmethod
    def set_column_values(self, *args, **kwargs):
        pass

    @abstractmethod
    def get_column_name_by_index(self, *args, **kwargs):
        pass

    @abstractmethod
    def get_column_index_by_index(self, *args, **kwargs):
        pass

    @abstractmethod
    def get_columns(self, *args, **kwargs):
        pass

    @abstractmethod
    def drop_colunn_by_name(self, *args, **kwargs):
        pass

    @abstractmethod
    def get_column_values_as_list(self, *args, **kwargs):
        pass

    @abstractmethod
    def get_block(self, *args, **kwargs):
        pass


# TODO (dmt): Provide common base class or pandas operations.
class PandasBlock:
    """Wrapper around a ``pandas.DataFrame`` holding one block of data.

    The last ``_LAST_THREE_COLUMNS`` columns of the frame are treated as
    meta columns and excluded from the feature views (``n_features``,
    ``as_numpy_array``, ``__repr__``, ``__getitem__``). Several methods
    address the columns ``"T"``, ``"Z"`` and ``"Sigma"`` by name;
    presumably these are the three trailing columns -- confirm with the
    callers.
    """

    _LAST_THREE_COLUMNS = 3

    def __init__(self, data_block, relatives=None, purpose=None, origin=None):
        self.__data_block = data_block
        self.relatives = relatives
        self.purpose = purpose
        self.n_cluster = None
        self.origin = origin

    def __str__(self):
        return str(self.__data_block)

    def __repr__(self):
        """Return the string form of the feature columns only."""
        # Slice by the number of *columns*; the row count is unrelated to
        # how many feature columns exist.
        feature_columns = self.__data_block.columns[
            :self.cols - self._LAST_THREE_COLUMNS]
        return str(self.__data_block[feature_columns])

    def __getitem__(self, item):
        """Return the positional selection ``item`` cut to feature width."""
        return self.__data_block.iloc[item][
            :len(self.columns()) - self._LAST_THREE_COLUMNS]

    def __len__(self):
        return self.__data_block.shape[0]

    @property
    def data_block(self):
        """The wrapped data frame."""
        return self.__data_block

    def has_nan(self):
        """Return ``True`` if any cell of the block is NaN."""
        # Reduce over the whole frame; indexing the per-column result with
        # ``[0]`` only looked at a single column and depended on the
        # deprecated positional fallback of ``Series.__getitem__``.
        return bool(self.__data_block.isna().values.any())

    @property
    def labeled(self):
        """``True`` unless column ``Z`` holds exactly one distinct value."""
        return self.__data_block.Z.nunique() != 1

    def columns(self):
        """Return all column labels as a list."""
        return list(self.__data_block.columns)

    @property
    def n_features(self):
        """Number of columns without the trailing meta columns."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    def as_numpy_array(self):
        """Return the feature columns as a numpy array."""
        feature_columns = self.__data_block.columns[
            :self.cols - self._LAST_THREE_COLUMNS]
        return self.__data_block[feature_columns].values

    def set_labels(self, labels):
        """Return a copy of this block whose ``Z`` column is ``labels``."""
        data_frame = self.__data_block.copy()
        data_frame["Z"] = labels
        return PandasBlock(data_frame, self.relatives, self.purpose,
                           self.origin)

    def overlapping_rows(self, block, subset=None):
        """Return the rows that occur more than once across both blocks.

        Args:
            block: The other ``PandasBlock``.
            subset: Column labels used to detect duplicates; ``None``
                compares all columns.
        """
        big_df = pd.concat([self.__data_block, block.__data_block],
                           sort=False)
        duplicated_mask = big_df.duplicated(subset=subset, keep=False)
        overlapping_data_frame = big_df[duplicated_mask].drop_duplicates(
            keep="first")
        return PandasBlock(overlapping_data_frame, purpose=self.purpose,
                           origin=self.origin)

    def same_features_fusion(self, block):
        """Concatenate both blocks, keeping only their shared columns."""
        df = pd.concat([self.__data_block, block.__data_block], sort=False,
                       join="inner")
        return PandasBlock(df, purpose=self.purpose, origin=self.origin)

    @property
    def min_timestamp(self):
        """Smallest value of column ``T``."""
        return min(self.__data_block["T"])

    @property
    def max_timestamp(self):
        """Largest value of column ``T``."""
        return max(self.__data_block["T"])

    @property
    def learn_cols(self):
        """Number of columns without the trailing meta columns."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    @property
    def rows(self):
        """Number of rows."""
        return self.__data_block.shape[0]

    @property
    def cols(self):
        """Number of columns, meta columns included."""
        return self.__data_block.shape[1]

    def new_block_from_rows_index(self, indices: List[int]):
        """Return a block with the rows whose index labels are ``indices``."""
        data_from = self.__data_block.loc[indices]
        return PandasBlock(data_from, purpose=self.purpose,
                           origin=self.origin)

    def new_block_from(self, column_values):
        """Return a block with the rows whose ``T`` is in ``column_values``."""
        mask = self.__data_block["T"].isin(column_values)
        data_from = self.__data_block.loc[mask]
        return PandasBlock(data_from, purpose=self.purpose,
                           origin=self.origin)

    def get_duplicated_pairs(self, *args):
        """Yield value pairs of rows duplicated in two columns.

        Args:
            *args: The first two positional arguments are the column
                labels to compare. Rows are flagged from the second
                occurrence of a pair onwards.

        Yields:
            Tuples ``(value_of_args[0], value_of_args[1])``.
        """
        first, second = args[0], args[1]
        bool_series = self.__data_block.duplicated(subset=[first, second])
        duplicates = self.__data_block[bool_series]
        yield from zip(duplicates[first], duplicates[second])

    @property
    def indexes(self):
        """Index labels of the block as a list."""
        return list(self.__data_block.index)

    def get_values(self, **kwargs):
        """Return the rows matching two of ``T``, ``Z`` and ``Sigma``.

        The first available pair wins, in the order ``(T, Z)``,
        ``(T, Sigma)``, ``(Z, Sigma)``.

        Raises:
            ValueError: If fewer than two of the keywords are given.
        """
        t, z, sigma = kwargs.get("T"), kwargs.get("Z"), kwargs.get("Sigma")
        frame = self.__data_block

        if t is not None and z is not None:
            mask = (frame["T"] == t) & (frame["Z"] == z)
        elif t is not None and sigma is not None:
            mask = (frame["T"] == t) & (frame["Sigma"] == sigma)
        elif z is not None and sigma is not None:
            mask = (frame["Z"] == z) & (frame["Sigma"] == sigma)
        else:
            raise ValueError(
                "get_values() needs at least two of the keyword arguments "
                "'T', 'Z' and 'Sigma'")

        return PandasBlock(frame.loc[mask], purpose=self.purpose,
                           origin=self.origin)

    @property
    def length(self):
        """Number of rows."""
        return self.__data_block.shape[0]

    def drop_row(self, index):
        """Remove the row(s) with index label ``index`` in place."""
        self.__data_block.drop(index, inplace=True)

    def get_column_values(self, column_name):
        """Return the values of ``column_name`` as a list."""
        return list(self.__data_block[column_name])

    def get_column_name_by_index(self, index):
        """Return the label of the column at position ``index``."""
        column_names = self.__data_block.columns
        return column_names[index]

    def drop_columns_by_index(self, index):
        """Remove columns in place.

        Args:
            index: A ``set`` of column positions, which are translated to
                labels first; any other value is handed to
                ``DataFrame.drop`` unchanged as column label(s).
        """
        if isinstance(index, set):
            remove_columns = [self.__data_block.columns[i] for i in index]
        else:
            remove_columns = index
        self.__data_block.drop(remove_columns, axis=1, inplace=True)

    def get_overlapping(self, block, on=None):
        """Inner-join both blocks on ``T`` and keep only the labels.

        The shorter block (``self`` on a tie) is the left side of the join;
        its ``Z`` column appears as ``Z_other`` in the result while the
        right side keeps the name ``Z``.

        Args:
            block: The other ``PandasBlock``.
            on: Unused; kept for interface compatibility.

        Returns:
            A ``PandasBlock`` with the columns ``Z``, ``Z_other`` and ``T``
            carrying this block's purpose and origin.
        """
        if len(block) < len(self):
            left, right = block, self
        else:
            left, right = self, block

        join = left.data_block.join(
            right.data_block.set_index("T"), on="T", lsuffix="_other",
            how="inner")
        join = join[["Z", "Z_other", "T"]]
        # TODO (dmt): Make purpose and origin a tuple.
        # Keywords are required here: the second positional parameter of
        # PandasBlock is ``relatives``, not ``purpose``.
        return PandasBlock(join, purpose=self.purpose, origin=self.origin)


class PandasAdapter:
    """Thin wrapper around a ``pandas.DataFrame`` that hands out blocks."""

    def __init__(self, data_frame):
        self.__data_frame = data_frame

    def new_block(self, values, columns, index, origin):
        """Build a ``PandasBlock`` from raw values, columns and index."""
        data_frame = pd.DataFrame(data=values, columns=columns, index=index)
        block = PandasBlock(data_frame, origin=origin)
        return block

    @classmethod
    def read_csv(cls, path):
        """Create an adapter from the CSV file at ``path``."""
        data_frame = pd.read_csv(path, index_col=False)
        # ``cls`` instead of the hard-coded class keeps subclasses intact.
        return cls(data_frame)

    @property
    def length(self):
        """Number of rows."""
        return self.__data_frame.shape[0]

    def get_block_via_index(self, indexes, columns=None):
        """Return a block with the rows at the positions ``indexes``.

        Args:
            indexes: Iterable of row positions.
            columns: Optional column labels; a falsy value selects all
                columns.
        """
        if columns:
            return PandasBlock(self.__data_frame.iloc[list(indexes)][columns])
        return PandasBlock(self.__data_frame.iloc[list(indexes)])

    def get_block(self, start, end=None, step=None,
                  columns: List[str] = None):
        """Return a block with the row slice ``start:end:step``.

        Args:
            start: Slice start.
            end: Slice end (exclusive), ``None`` for the end of the frame.
            step: Slice step.
            columns: Column labels to keep; ``None`` keeps all columns.
        """
        rows = self.__data_frame[start:end:step]
        # Indexing a frame with ``None`` raises KeyError, so the default
        # has to be handled explicitly.
        if columns is not None:
            rows = rows[columns]
        return PandasBlock(rows)

    def get_column_values(self, column_name):
        """Return the column ``column_name`` as a ``pandas.Series``."""
        return self.__data_frame[column_name]

    def get_column_values_as_list(self, column_name):
        """Return the values of ``column_name`` as a list."""
        return self.__data_frame[column_name].tolist()

    def get_columns(self):
        """Return all column labels as a list."""
        return list(self.__data_frame.columns)

    def drop_column_by_index(self, index):
        """Remove the column at position ``index`` in place."""
        column = self.get_column_name_by_index(index)
        self.__data_frame.drop(columns=[column], inplace=True)

    def drop_column_by_name(self, name):
        """Remove the column labelled ``name`` in place."""
        self.__data_frame.drop(columns=[name], inplace=True)

    def get_column_index_by_name(self, name):
        """Return the position of the column labelled ``name``."""
        return self.__data_frame.columns.get_loc(name)

    def get_column_name_by_index(self, index):
        """Return the label of the column at position ``index``."""
        column_names = self.__data_frame.columns
        return column_names[index]

    def set_column_value(self, column_name, value):
        """Assign ``value`` to the column ``column_name``."""
        self.__data_frame[column_name] = value

    def sort(self, column_name, ascending=True):
        """Sort the frame in place by ``column_name``."""
        self.__data_frame.sort_values(by=[column_name], ascending=ascending,
                                      inplace=True)

    def set_column_values(self, column, values):
        """Assign ``values`` to the column ``column``."""
        self.__data_frame[column] = values