from typing import List
from abc import abstractmethod, ABC

import pandas as pd


class Adapter(ABC):
    """Abstract interface listing the operations a data adapter must offer.

    Every method is abstract and has no body here; the exact contract of each
    operation (arguments, return values) is defined by the implementing
    subclass.
    """

    @abstractmethod
    def read_csv(self):
        """Load data from a CSV source (contract defined by the subclass)."""

    @abstractmethod
    def sort(self, *args, **kwargs):
        """Sort the underlying data (contract defined by the subclass)."""

    @abstractmethod
    def set_column_value(self, *args, **kwargs):
        """Set a value on a column (contract defined by the subclass)."""

    @abstractmethod
    def set_column_values(self, *args, **kwargs):
        """Set the values of a column (contract defined by the subclass)."""

    @abstractmethod
    def get_column_name_by_index(self, *args, **kwargs):
        """Look up a column name (contract defined by the subclass)."""

    @abstractmethod
    def get_column_index_by_index(self, *args, **kwargs):
        """Look up a column index (contract defined by the subclass)."""

    @abstractmethod
    def get_columns(self, *args, **kwargs):
        """Return the columns (contract defined by the subclass)."""

    # NOTE(review): the name looks like a misspelling of
    # ``drop_column_by_name``. It is kept unchanged because renaming an
    # abstract method would break every existing subclass that implements it.
    @abstractmethod
    def drop_colunn_by_name(self, *args, **kwargs):
        """Drop a column (contract defined by the subclass)."""

    @abstractmethod
    def get_column_values_as_list(self, *args, **kwargs):
        """Return column values as a list (contract defined by the subclass)."""

    @abstractmethod
    def get_block(self, *args, **kwargs):
        """Return a block of the data (contract defined by the subclass)."""


# TODO (dmt): Provide common base class or pandas operations.
class PandasBlock:
    """Wrapper around a pandas ``DataFrame`` representing one block of data.

    All columns except the last ``_LAST_THREE_COLUMNS`` are treated as
    feature columns. The methods below address the columns ``"T"``, ``"Z"``
    and ``"Sigma"`` by name; presumably these are the three trailing columns
    (confirm against the data producer).
    """

    # Number of trailing columns that are not features.
    _LAST_THREE_COLUMNS = 3

    def __init__(self, data_block, relatives=None):
        """Store the wrapped frame and optional ``relatives`` metadata.

        Args:
            data_block: The ``DataFrame`` to wrap. It is stored as-is (no
                copy), so in-place operations act on the passed object.
            relatives: Opaque value kept on the instance; not interpreted
                by this class.
        """
        self.__data_block = data_block
        self.relatives = relatives
        self.n_cluster = None

    def __str__(self):
        """Return the string form of the complete wrapped frame."""
        return str(self.__data_block)

    def __repr__(self):
        """Return the string form of the feature columns only."""
        feature_names = self.__data_block.columns[:self.n_features]
        return str(self.__data_block[feature_names])

    def __getitem__(self, item):
        """Select by position via ``iloc`` and cut to the first n_features.

        For an integer ``item`` the result is the row restricted to its
        feature entries.
        """
        return self.__data_block.iloc[item][:self.n_features]

    def __len__(self):
        """Return the number of rows in the block."""
        return self.__data_block.shape[0]

    @property
    def labeled(self):
        """``True`` unless column ``Z`` holds exactly one distinct value."""
        return self.__data_block.Z.nunique() != 1

    @property
    def columns(self):
        """List of all column names of the wrapped frame."""
        return list(self.__data_block.columns)

    @property
    def n_features(self):
        """Number of columns minus the trailing non-feature columns."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    def as_numpy_array(self):
        """Return the feature columns as a NumPy array (``.values``)."""
        feature_names = self.__data_block.columns[:self.n_features]
        return self.__data_block[feature_names].values

    def set_labels(self, labels):
        """Return a new block whose ``Z`` column is replaced by ``labels``.

        The wrapped frame is copied first, so this block is not modified.
        ``relatives`` is carried over to the new block.
        """
        data_frame = self.__data_block.copy()
        data_frame["Z"] = labels
        return PandasBlock(data_frame, self.relatives)

    @property
    def min_timestamp(self):
        """Smallest value in column ``T``."""
        return min(self.__data_block["T"])

    @property
    def max_timestamp(self):
        """Largest value in column ``T``."""
        return max(self.__data_block["T"])

    @property
    def learn_rows(self):
        """Column count minus the three trailing columns.

        Same value as ``n_features``; kept as a separate name for callers.
        """
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    @property
    def rows(self):
        """Total number of columns (``shape[1]``) of the wrapped frame.

        NOTE: despite the name, this is the column count, not the row count.
        """
        return self.__data_block.shape[1]

    def new_block_from_rows_index(self, indices: List[int]):
        """Return a new block with the rows selected by label via ``loc``.

        The new block does not inherit ``relatives``.
        """
        data_from = self.__data_block.loc[indices]
        return PandasBlock(data_from)

    def new_block_from(self, column_values):
        """Return a new block with the rows whose ``T`` is in the values."""
        in_selection = self.__data_block["T"].isin(column_values)
        return PandasBlock(self.__data_block.loc[in_selection])

    def get_duplicated_pairs(self, *args):
        """Yield value pairs of rows duplicated on two columns.

        Args:
            *args: At least two column names; only the first two are used.

        Yields:
            ``(first, second)`` column values for every row that
            ``DataFrame.duplicated`` marks as a repeat of an earlier row
            with respect to those two columns.
        """
        first, second = args[0], args[1]
        is_repeat = self.__data_block.duplicated(subset=[first, second])
        duplicates = self.__data_block[is_repeat]
        yield from zip(duplicates[first], duplicates[second])

    @property
    def indexes(self):
        """Index labels of the wrapped frame as a tuple."""
        return tuple(self.__data_block.index)

    def get_values(self, **kwargs):
        """Return a new block filtered by two of ``T``, ``Z``, ``Sigma``.

        Keyword Args:
            T: Value the ``T`` column must equal.
            Z: Value the ``Z`` column must equal.
            Sigma: Value the ``Sigma`` column must equal.

        The first matching pair in the order (T, Z), (T, Sigma), (Z, Sigma)
        is applied; if all three are given, ``Sigma`` is therefore ignored.

        Raises:
            ValueError: If fewer than two of the filters are provided.
        """
        t, z, sigma = kwargs.get("T"), kwargs.get("Z"), kwargs.get("Sigma")
        frame = self.__data_block

        if t is not None and z is not None:
            mask = (frame["T"] == t) & (frame["Z"] == z)
        elif t is not None and sigma is not None:
            mask = (frame["T"] == t) & (frame["Sigma"] == sigma)
        elif z is not None and sigma is not None:
            mask = (frame["Z"] == z) & (frame["Sigma"] == sigma)
        else:
            # ValueError is still an Exception, so existing broad handlers
            # keep working while the failure is now self-explanatory.
            raise ValueError(
                "get_values requires at least two of the keyword arguments "
                "'T', 'Z' and 'Sigma'")

        return PandasBlock(frame.loc[mask])

    @property
    def length(self):
        """Number of rows in the block (same as ``len(block)``)."""
        return self.__data_block.shape[0]

    def drop_row(self, index):
        """Drop the row(s) with the given index label(s), in place."""
        self.__data_block.drop(index, inplace=True)

    def get_column_values(self, column_name):
        """Return the column ``column_name`` of the wrapped frame."""
        return self.__data_block[column_name]

    def get_column_name_by_index(self, index):
        """Return the name of the column at position ``index``."""
        # The DataFrame attribute is ``columns``; the former ``column``
        # lookup raised AttributeError.
        column_names = self.__data_block.columns
        return column_names[index]

    @property
    def feature_count(self):
        """Number of feature columns (same value as ``n_features``)."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    def drop_columns_by_index(self, index):
        """Drop columns from the wrapped frame, in place.

        Args:
            index: A ``set`` of column positions, which are translated to
                column names first; any other value is handed to
                ``DataFrame.drop`` unchanged as the labels to remove.
        """
        if isinstance(index, set):
            remove_columns = [self.__data_block.columns[i] for i in index]
        else:
            remove_columns = index
        self.__data_block.drop(remove_columns, axis=1, inplace=True)


class PandasAdapter:
    """Thin wrapper exposing a fixed set of operations on a ``DataFrame``."""

    def __init__(self, data_frame):
        """Wrap ``data_frame``; it is stored as-is, without copying."""
        self.__data_frame = data_frame

    @classmethod
    def read_csv_data(cls, path):
        """Read the CSV at ``path`` and return it wrapped in an adapter.

        The file is read with ``index_col=False``.
        """
        data_frame = pd.read_csv(path, index_col=False)
        return PandasAdapter(data_frame)

    @property
    def length(self):
        """Number of rows in the wrapped frame."""
        return self.__data_frame.shape[0]

    def get_block_via_index(self, indexes):
        """Return a ``PandasBlock`` of the rows at the given positions."""
        return PandasBlock(self.__data_frame.iloc[indexes])

    def get_block(self, start, end=None, step=None):
        """Return a ``PandasBlock`` for the slice ``[start:end:step]``."""
        return PandasBlock(self.__data_frame[start:end:step])

    def get_column_values(self, column_name):
        """Return the column ``column_name`` of the wrapped frame."""
        return self.__data_frame[column_name]

    def get_column_values_as_list(self, column_name):
        """Return the values of ``column_name`` as a Python list."""
        return self.__data_frame[column_name].tolist()

    def get_columns(self):
        """Return all column names as a list."""
        return list(self.__data_frame.columns)

    def drop_column_by_index(self, index):
        """Drop the column at position ``index``, in place."""
        column = self.get_column_name_by_index(index)
        self.__data_frame.drop(columns=[column], inplace=True)

    def drop_column_by_name(self, name):
        """Drop the column called ``name``, in place."""
        self.__data_frame.drop(columns=[name], inplace=True)

    def get_column_index_by_name(self, name):
        """Return the location of column ``name`` (``columns.get_loc``)."""
        return self.__data_frame.columns.get_loc(name)

    def get_column_name_by_index(self, index):
        """Return the name of the column at position ``index``."""
        column_names = self.__data_frame.columns
        return column_names[index]

    def set_column_value(self, column_name, value):
        """Assign ``value`` to the column ``column_name``."""
        self.__data_frame[column_name] = value

    def sort(self, column_name, ascending=True):
        """Sort the wrapped frame by ``column_name``, in place."""
        self.__data_frame.sort_values(
            by=[column_name], ascending=ascending, inplace=True)

    def set_column_values(self, column, values):
        """Assign ``values`` to the column ``column``."""
        self.__data_frame[column] = values