# source_adapters.py
from abc import abstractmethod, ABC
from typing import List

import pandas as pd


class Adapter(ABC):
    """Abstract interface for a tabular data source adapter.

    Every method below is abstract; a concrete subclass has to override
    all of them before it can be instantiated.
    """

    @abstractmethod
    def read_csv(self):
        """Load data from a CSV source (to be implemented by subclasses)."""

    @abstractmethod
    def sort(self, *args, **kwargs):
        """Sort the underlying data (to be implemented by subclasses)."""

    @abstractmethod
    def set_column_value(self, *args, **kwargs):
        """Set a value for a column (to be implemented by subclasses)."""

    @abstractmethod
    def set_column_values(self, *args, **kwargs):
        """Set the values of a column (to be implemented by subclasses)."""

    @abstractmethod
    def get_column_name_by_index(self, *args, **kwargs):
        """Look up a column name by index (to be implemented by subclasses)."""

    # NOTE(review): the name reads "index by index"; it presumably was meant
    # to be "index by name". Kept unchanged because subclasses must override
    # exactly this name.
    @abstractmethod
    def get_column_index_by_index(self, *args, **kwargs):
        """Look up a column index (to be implemented by subclasses)."""

    @abstractmethod
    def get_columns(self, *args, **kwargs):
        """Return the columns (to be implemented by subclasses)."""

    # NOTE(review): "colunn" looks like a typo for "column". Kept unchanged
    # because subclasses must override exactly this name.
    @abstractmethod
    def drop_colunn_by_name(self, *args, **kwargs):
        """Drop a column by name (to be implemented by subclasses)."""

    @abstractmethod
    def get_column_values_as_list(self, *args, **kwargs):
        """Return a column's values as a list (to be implemented by subclasses)."""

    @abstractmethod
    def get_block(self, *args, **kwargs):
        """Return a block of the data (to be implemented by subclasses)."""


# TODO (dmt): Provide common base class or pandas operations.
class PandasBlock:
    """Thin wrapper around a pandas DataFrame plus some metadata.

    The last ``_LAST_THREE_COLUMNS`` columns of the frame are treated as
    non-feature columns; everything before them counts as a feature.
    Several methods reference the columns ``"T"``, ``"Z"`` and ``"Sigma"``
    by name (presumably these are the three trailing columns -- TODO
    confirm against the callers that build the frame).
    """

    # Number of trailing columns that are not features.
    _LAST_THREE_COLUMNS = 3

    def __init__(self, data_block, relatives=None, purpose=None, origin=None):
        """Store the wrapped frame and its metadata.

        Args:
            data_block: The pandas DataFrame to wrap (stored, not copied).
            relatives: Optional metadata, stored as-is.
            purpose: Optional metadata, stored as-is.
            origin: Optional metadata, stored as-is.
        """
        self.__data_block = data_block
        self.relatives = relatives
        self.purpose = purpose
        self.origin = origin

    def __str__(self):
        """Return the string form of the complete frame."""
        return str(self.__data_block)

    def __repr__(self):
        """Return the string form of the feature columns only."""
        # The column slice has to be bounded by the number of columns; the
        # previous code used the number of rows here by mistake.
        feature_columns = self.__data_block.columns[:self.n_features]
        return str(self.__data_block[feature_columns])

    def __getitem__(self, item):
        """Return the feature values of the row at position ``item``.

        For an integer ``item`` the positional row is selected and its
        trailing non-feature entries are cut off. Other ``iloc`` selectors
        are passed through unchanged and the same positional slice is
        applied to the selection.
        """
        return self.__data_block.iloc[item].iloc[:self.n_features]

    def __len__(self):
        """Return the number of rows."""
        return self.__data_block.shape[0]

    @property
    def data_block(self):
        """The wrapped DataFrame itself (no copy)."""
        return self.__data_block

    def has_nan(self):
        """Return whether the first column contains a missing value.

        NOTE(review): only the first column is inspected, which is what
        the previous positional ``[0]`` lookup did. Confirm whether all
        columns were meant to be checked.
        """
        # ``.iloc`` makes the positional access explicit; a plain ``[0]`` on
        # a label-indexed Series relies on a deprecated fallback.
        return bool(self.__data_block.isna().any().iloc[0])

    @property
    def labeled(self):
        """False if column ``Z`` has exactly one distinct value, else True."""
        return self.__data_block.Z.nunique() != 1

    def columns(self):
        """Return the column labels as a list."""
        return list(self.__data_block.columns)

    @property
    def n_features(self):
        """Number of columns minus the trailing non-feature columns."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    def as_numpy_array(self):
        """Return the feature columns as a NumPy array."""
        feature_columns = self.__data_block.columns[
            :self.cols - self._LAST_THREE_COLUMNS]
        return self.__data_block[feature_columns].values

    def set_labels(self, labels):
        """Return a new block whose ``Z`` column is replaced by ``labels``.

        The wrapped frame is copied first, so this block is left untouched.
        """
        data_frame = self.__data_block.copy()
        data_frame["Z"] = labels
        return PandasBlock(data_frame,
                           self.relatives,
                           self.purpose,
                           self.origin)

    def overlapping_rows(self, block, subset=None):
        """Return a block with the rows duplicated across both blocks.

        Both frames are concatenated; rows flagged as duplicates (judged on
        ``subset`` if given, otherwise on all columns) are kept and then
        de-duplicated on all columns.
        """
        big_df = pd.concat([self.__data_block, block.__data_block], sort=False)
        overlapping_data_frame = big_df[big_df.duplicated(
            subset=subset, keep=False)].drop_duplicates(keep="first")
        return PandasBlock(overlapping_data_frame,
                           purpose=self.purpose,
                           origin=self.origin)

    def same_features_fusion(self, block):
        """Concatenate both blocks, keeping only the columns they share."""
        df = pd.concat([self.__data_block, block.__data_block], sort=False,
                       join="inner")
        return PandasBlock(df, purpose=self.purpose, origin=self.origin)

    @property
    def min_timestamp(self):
        """Smallest value in column ``T``."""
        return min(self.__data_block["T"])

    @property
    def max_timestamp(self):
        """Largest value in column ``T``."""
        return max(self.__data_block["T"])

    @property
    def learn_cols(self):
        """Number of columns minus the trailing non-feature columns."""
        return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS

    @property
    def rows(self):
        """Number of rows."""
        return self.__data_block.shape[0]

    @property
    def cols(self):
        """Number of columns (features and non-features)."""
        return self.__data_block.shape[1]

    def new_block_from_rows_index(self, indices: List[int]):
        """Return a new block with the rows whose index labels are given."""
        data_from = self.__data_block.loc[indices]
        return PandasBlock(data_from, purpose=self.purpose, origin=self.origin)

    def new_block_from(self, column_values):
        """Return a new block with the rows whose ``T`` is in ``column_values``."""
        data_from = self.__data_block.loc[self.__data_block["T"].isin(
            column_values)]

        return PandasBlock(data_from, purpose=self.purpose, origin=self.origin)

    def get_duplicated_pairs(self, *args):
        """Yield value pairs of rows duplicated on two columns.

        Args:
            *args: The first two positional arguments are the names of the
                two columns used to detect duplicates.

        Yields:
            Tuples with the values of both columns for every row flagged as
            a duplicate (the first occurrence is not flagged).
        """
        first, second = args[0], args[1]
        bool_series = self.__data_block.duplicated(subset=[first, second])
        duplicates = self.__data_block[bool_series]
        yield from zip(duplicates[first], duplicates[second])

    @property
    def indexes(self):
        """Index labels of the wrapped frame as a list."""
        return list(self.__data_block.index)

    def get_values(self, **kwargs):
        """Return a new block with the rows matching two column values.

        Keyword Args:
            T: Value to match in column ``"T"``.
            Z: Value to match in column ``"Z"``.
            Sigma: Value to match in column ``"Sigma"``.

        The first applicable pair is used, in the order (T, Z), (T, Sigma),
        (Z, Sigma); a third value, if given, is ignored.

        Raises:
            ValueError: If fewer than two of the keyword arguments are given.
        """
        t, z, sigma = kwargs.get("T"), kwargs.get("Z"), kwargs.get("Sigma")
        frame = self.__data_block

        if t is not None and z is not None:
            mask = (frame["T"] == t) & (frame["Z"] == z)
        elif t is not None and sigma is not None:
            mask = (frame["T"] == t) & (frame["Sigma"] == sigma)
        elif z is not None and sigma is not None:
            mask = (frame["Z"] == z) & (frame["Sigma"] == sigma)
        else:
            raise ValueError(
                "get_values() needs at least two of the keyword arguments "
                "'T', 'Z' and 'Sigma'.")

        return PandasBlock(frame.loc[mask],
                           purpose=self.purpose,
                           origin=self.origin)

    def drop_row(self, index):
        """Drop the row(s) with the given index label(s) in place."""
        self.__data_block.drop(index, inplace=True)

    def get_column_values(self, column_name):
        """Return the values of the named column as a list."""
        return list(self.__data_block[column_name])

    def get_column_name_by_index(self, index):
        """Return the label of the column at position ``index``."""
        column_names = self.__data_block.columns
        return column_names[index]

    def drop_columns_by_index(self, index):
        """Drop columns in place.

        Args:
            index: A ``set`` of column positions, which are translated to
                column labels first; any other value is handed to
                ``DataFrame.drop`` unchanged as the label(s) to remove.
        """
        if isinstance(index, set):
            remove_columns = [self.__data_block.columns[i] for i in index]
        else:
            remove_columns = index

        self.__data_block.drop(remove_columns, axis=1, inplace=True)

    def get_overlapping(self, block, on=None):
        """Return a block pairing the ``Z`` values of rows sharing a ``T``.

        The block with fewer rows (``self`` on a tie) is the left side of an
        inner join on ``"T"``. Its ``Z`` column ends up as ``"Z_other"``,
        while ``"Z"`` comes from the right side.

        Args:
            block: The other block to join with.
            on: Currently unused; the join is always done on ``"T"``.

        Returns:
            A new block with the columns ``["Z", "Z_other", "T"]`` that
            carries this block's ``purpose`` and ``origin``.
        """
        if len(block) < len(self):
            left, right = block, self
        else:
            left, right = self, block

        join = left.data_block.join(
            right.data_block.set_index("T"),
            on="T",
            lsuffix="_other",
            how="inner")

        join = join[["Z", "Z_other", "T"]]
        # TODO (dmt): Make purpose and origin a tuple.
        # Keyword arguments are required here: passed positionally, purpose
        # and origin would be stored as relatives and purpose.
        return PandasBlock(join, purpose=self.purpose, origin=self.origin)


class PandasAdapter:
    """Adapter exposing a pandas DataFrame through a small method API."""

    def __init__(self, data_frame):
        """Wrap ``data_frame`` (stored as-is, not copied)."""
        self.__data_frame = data_frame

    def new_block(self, values, columns, index, origin):
        """Build a new ``PandasBlock`` from raw values.

        Args:
            values: Data passed to the ``pd.DataFrame`` constructor.
            columns: Column labels of the new frame.
            index: Index labels of the new frame.
            origin: Stored as the ``origin`` of the new block.
        """
        data_frame = pd.DataFrame(data=values, columns=columns, index=index)
        return PandasBlock(data_frame, origin=origin)

    @classmethod
    def read_csv(cls, path):
        """Create an adapter from a CSV file.

        Declared as a classmethod so that ``PandasAdapter.read_csv(path)``
        works; calling it on an instance keeps working as well.

        Args:
            path: Anything ``pd.read_csv`` accepts as its first argument.

        Returns:
            A new adapter wrapping the parsed frame.
        """
        data_frame = pd.read_csv(path, index_col=False)
        return cls(data_frame)

    @property
    def length(self):
        """Number of rows in the wrapped frame."""
        return self.__data_frame.shape[0]

    def get_block_via_index(self, indexes, columns=None):
        """Return a block with the rows at the given positions.

        Args:
            indexes: Iterable of row positions.
            columns: Optional column selection; when empty or ``None`` all
                columns are kept.
        """
        rows = self.__data_frame.iloc[list(indexes)]
        if columns:
            return PandasBlock(rows[columns])
        return PandasBlock(rows)

    def get_block(self, start, end=None, step=None, columns: List[str] = None):
        """Return a block with the rows of the slice ``start:end:step``.

        Args:
            start: Slice start.
            end: Slice end (optional).
            step: Slice step (optional).
            columns: Column labels to keep; ``None`` keeps all columns.
        """
        rows = self.__data_frame[start:end:step]
        # Selecting with ``None`` would raise a KeyError, so the default has
        # to bypass the column selection.
        if columns is None:
            return PandasBlock(rows)
        return PandasBlock(rows[columns])

    def get_column_values(self, column_name):
        """Return the named column as stored in the frame."""
        return self.__data_frame[column_name]

    def get_column_values_as_list(self, column_name):
        """Return the values of the named column as a list."""
        return self.__data_frame[column_name].tolist()

    def get_columns(self):
        """Return the column labels as a list."""
        return list(self.__data_frame.columns)

    def drop_column_by_index(self, index):
        """Drop the column at position ``index`` in place."""
        column = self.get_column_name_by_index(index)
        self.__data_frame.drop(columns=[column], inplace=True)

    def drop_column_by_name(self, name):
        """Drop the column labelled ``name`` in place."""
        self.__data_frame.drop(columns=[name], inplace=True)

    def get_column_index_by_name(self, name):
        """Return the location of the column labelled ``name``."""
        return self.__data_frame.columns.get_loc(name)

    def get_column_name_by_index(self, index):
        """Return the label of the column at position ``index``."""
        column_names = self.__data_frame.columns
        return column_names[index]

    def set_column_value(self, column_name, value):
        """Assign ``value`` to the column ``column_name``."""
        self.__data_frame[column_name] = value

    def sort(self, column_name, ascending=True):
        """Sort the wrapped frame in place by ``column_name``."""
        self.__data_frame.sort_values(by=[column_name],
                                      ascending=ascending,
                                      inplace=True)

    def set_column_values(self, column, values):
        """Assign ``values`` to the column ``column``."""
        self.__data_frame[column] = values