Skip to content
Snippets Groups Projects
source_adapters.py 5.88 KiB
Newer Older
from abc import abstractmethod, ABC

import pandas as pd


class Adapter(ABC):

    @abstractmethod
    def read_csv(self): pass

    @abstractmethod
    def sort(self, *args, **kwargs): pass

    @abstractmethod
    def set_column_value(self, *args, **kwargs): pass

    @abstractmethod
    def set_column_values(self, *args, **kwargs): pass

    @abstractmethod
    def get_column_name_by_index(self, *args, **kwargs): pass

    @abstractmethod
    def get_column_index_by_index(self, *args, **kwargs): pass

    @abstractmethod
    def get_columns(self, *args, **kwargs): pass

    @abstractmethod
    def drop_colunn_by_name(self, *args, **kwargs): pass

    @abstractmethod
    def get_column_values_as_list(self, *args, **kwargs): pass

    @abstractmethod
    def get_block(self, *args, **kwargs): pass


# TODO (dmt): Provide common base class or pandas operations.
class PandasBlock:
    _LAST_THREE_COLUMNS = 3

    def __init__(self, data_block, relatives=None):
        self.__data_block = data_block
        self.relatives = relatives

    def __str__(self):
        return str(self.__data_block)

    def __repr__(self):
        return str(
            self.__data_block[
                self.__data_block.columns[:self.rows-self._LAST_THREE_COLUMNS]])

    def __getitem__(self, item):
        return self.__data_block.iloc[item][:self.rows-self._LAST_THREE_COLUMNS]

    def __len__(self):
        return self.__data_block.shape[0]

    def as_numpy_array(self):
        return self.__data_block[
            self.__data_block.columns[
                :self.rows - self._LAST_THREE_COLUMNS]].values

    def set_labels(self, labels):
        data_frame = self.__data_block.copy()
        data_frame["Z"] = labels
        return PandasBlock(data_frame, self.relatives)

    @property
    def min_timestamp(self):
        return min(self.__data_block["T"])

    @property
    def max_timestamp(self):
        return max(self.__data_block["T"])

    @property
    def learn_rows(self):
        return self.__data_block.shape[1] - 3

    @property
    def rows(self):
        return self.__data_block.shape[1]

    def new_block_from(self, column_values):
        data_from = self.__data_block.loc[self.__data_block["T"].isin(
            column_values)]

        return PandasBlock(data_from)

    def get_duplicated_pairs(self, *args):
        bool_series = self.__data_block.duplicated(subset=[args[0], args[1]])
        duplicates = self.__data_block[bool_series]
        for i, j in zip(duplicates[args[0]], duplicates[args[1]]):
            yield i, j

    @property
    def indexes(self):
        return tuple(self.__data_block.index)

    def get_values(self, **kwargs):
        t, z, sigma = kwargs.get("T"), kwargs.get("Z"), kwargs.get("Sigma")

        if t is not None and z is not None:
            data_frame = self.__data_block.loc[
                (self.__data_block["T"] == t) & (self.__data_block["Z"] == z)]

        elif t is not None and sigma is not None:
            data_frame = self.__data_block.loc[
                (self.__data_block["T"] == t) & (
                        self.__data_block["Sigma"] == sigma)]

        elif z is not None and sigma is not None:
            data_frame = self.__data_block.loc[
                (self.__data_block["Z"] == z) & (
                        self.__data_block["Sigma"] == sigma)]

        else:
            # TODO (dmt): Write proper error handling.
            raise Exception()

        return PandasBlock(data_frame)

    @property
    def length(self):
        return self.__data_block.shape[0]

    def drop_row(self, index):
        self.__data_block.drop(index, inplace=True)

    def get_column_values(self, column_name):
        return self.__data_block[column_name]
    def get_column_name_by_index(self, index):
        column_names = self.__data_block.column
        return column_names[index]

    @property
    def feature_count(self):
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    def drop_columns_by_index(self, index):
        if isinstance(index, set):
            remove_columns = [self.__data_block.columns[i] for i in index]
        else:
            remove_columns = index

        self.__data_block.drop(remove_columns, axis=1, inplace=True)


class PandasAdapter:
    def __init__(self, data_frame):
        self.__data_frame = data_frame

    @classmethod
    def read_csv_data(cls, path):
        data_frame = pd.read_csv(path)
        return PandasAdapter(data_frame)

    @property
    def length(self):
        return self.__data_frame.shape[0]
    
    def get_block_via_index(self, indexes):
        return PandasBlock(self.__data_frame.iloc[indexes])

    def get_block(self, start, end=None, step=None):
        return PandasBlock(self.__data_frame[start:end:step])

    def get_column_values(self, column_name):
        return self.__data_frame[column_name]

    def get_column_values_as_list(self, column_name):
        return self.__data_frame[column_name].tolist()

    def get_columns(self):
        return list(self.__data_frame.columns)

    def drop_column_by_index(self, index):
        column = self.get_column_name_by_index(index)
        self.__data_frame.drop(columns=[column], inplace=True)

    def drop_column_by_name(self, name):
        self.__data_frame.drop(columns=[name], inplace=True)

    def get_column_index_by_name(self, name):
        return self.__data_frame.columns.get_loc(name)

    def get_column_name_by_index(self, index):
        column_names = self.__data_frame.columns
        return column_names[index]

    def set_column_value(self, column_name, value):
        self.__data_frame[column_name] = value

    def sort(self, column_name, ascending=True):
        self.__data_frame.sort_values(by=[column_name],
                                      ascending=ascending,
                                      inplace=True)

    def set_column_values(self, column, values):
        self.__data_frame[column] = values