Newer
Older
from typing import List
from abc import abstractmethod, ABC
import pandas as pd
class Adapter(ABC):
    """Abstract interface that concrete data adapters must implement.

    Every method is abstract, so a subclass can only be instantiated once
    it overrides all of them. The base implementations do nothing and
    return ``None``.
    """

    @abstractmethod
    def read_csv(self):
        """Abstract hook; the concrete adapter defines the behaviour."""

    @abstractmethod
    def sort(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def set_column_value(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def set_column_values(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def get_column_name_by_index(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    # NOTE(review): the name reads "index by index"; possibly meant to be
    # ``get_column_index_by_name`` -- confirm against implementers.
    @abstractmethod
    def get_column_index_by_index(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def get_columns(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    # NOTE(review): "colunn" looks like a misspelling of "column"; renaming
    # would break existing implementers, so the name is kept as is.
    @abstractmethod
    def drop_colunn_by_name(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def get_column_values_as_list(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""

    @abstractmethod
    def get_block(self, *args, **kwargs):
        """Abstract hook; arguments are defined by the implementer."""
# TODO (dmt): Provide common base class for pandas operations.
class PandasBlock:
def __init__(self, data_block, relatives=None, purpose=None, origin=None):
self.__data_block = data_block
self.n_cluster = None
def __str__(self):
return str(self.__data_block)
def __repr__(self):
return str(
self.__data_block[
self.__data_block.columns[:self.rows-self._LAST_THREE_COLUMNS]])
return self.__data_block.iloc[
def __len__(self):
return self.__data_block.shape[0]
@property
def data_block(self):
return self.__data_block
def has_nan(self):
return self.__data_block.isna().any()[0]
@property
def labeled(self):
return not self.__data_block.Z.nunique() == 1
def columns(self):
return list(self.__data_block.columns)
@property
def n_features(self):
return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS
def as_numpy_array(self):
return self.__data_block[
self.__data_block.columns[
data_frame = self.__data_block.copy()
data_frame["Z"] = labels
return PandasBlock(data_frame,
self.relatives,
self.purpose,
self.origin)
def overlapping_rows(self, block, subset=None):
big_df = pd.concat([self.__data_block, block.__data_block], sort=False)
overlapping_data_frame = big_df[big_df.duplicated(
subset=subset, keep=False)].drop_duplicates(keep="first")
return PandasBlock(overlapping_data_frame,
purpose=self.purpose,
origin=self.origin)
def same_features_fusion(self, block):
df = pd.concat([self.__data_block, block.__data_block], sort=False,
join="inner")
return PandasBlock(df, purpose=self.purpose, origin=self.origin)
@property
def min_timestamp(self):
return min(self.__data_block["T"])
@property
def max_timestamp(self):
return max(self.__data_block["T"])
@property
def learn_cols(self):
return self.__data_block.shape[1] - self._LAST_THREE_COLUMNS
return self.__data_block.shape[0]
@property
def cols(self):
return self.__data_block.shape[1]
def new_block_from_rows_index(self, indices: List[int]):
data_from = self.__data_block.loc[indices]
return PandasBlock(data_from, purpose=self.purpose, origin=self.origin)
def new_block_from(self, column_values):
data_from = self.__data_block.loc[self.__data_block["T"].isin(
column_values)]
return PandasBlock(data_from, purpose=self.purpose, origin=self.origin)
def get_duplicated_pairs(self, *args):
bool_series = self.__data_block.duplicated(subset=[args[0], args[1]])
duplicates = self.__data_block[bool_series]
for i, j in zip(duplicates[args[0]], duplicates[args[1]]):
yield i, j
return list(self.__data_block.index)
def get_values(self, **kwargs):
t, z, sigma = kwargs.get("T"), kwargs.get("Z"), kwargs.get("Sigma")
if t is not None and z is not None:
data_frame = self.__data_block.loc[
(self.__data_block["T"] == t) & (self.__data_block["Z"] == z)]
elif t is not None and sigma is not None:
data_frame = self.__data_block.loc[
(self.__data_block["T"] == t) & (
self.__data_block["Sigma"] == sigma)]
elif z is not None and sigma is not None:
data_frame = self.__data_block.loc[
(self.__data_block["Z"] == z) & (
self.__data_block["Sigma"] == sigma)]
else:
# TODO (dmt): Write proper error handling.
raise Exception()
return PandasBlock(data_frame, purpose=self.purpose, origin=self.origin)
@property
def length(self):
return self.__data_block.shape[0]
def drop_row(self, index):
self.__data_block.drop(index, inplace=True)
def get_column_values(self, column_name):
return list(self.__data_block[column_name])
def get_column_name_by_index(self, index):
column_names = self.__data_block.column
return column_names[index]
def drop_columns_by_index(self, index):
if isinstance(index, set):
remove_columns = [self.__data_block.columns[i] for i in index]
else:
remove_columns = index
self.__data_block.drop(remove_columns, axis=1, inplace=True)
def get_overlapping(self, block, on=None):
if len(block) < len(self):
biggest = block
block = self
else:
biggest = self
block = block
join = biggest.data_block.join(
block.data_block.set_index("T"),
on="T",
lsuffix="_other",
how="inner")
for column in join.columns:
if column not in ("T", "Z", "Z_other"):
join.drop(column, axis=1, inplace=True)
join = join[["Z", "Z_other", "T"]]
# TODO (dmt): Make purpose and origin a tuple.
print(len(join))
return PandasBlock(join, self.purpose, self.origin)
class PandasAdapter:
def __init__(self, data_frame):
self.__data_frame = data_frame
def new_block(self, values, columns, index, origin):
    """Build a ``PandasBlock`` around a freshly created DataFrame.

    Args:
        values: Data passed to ``pd.DataFrame`` as ``data``.
        columns: Column labels for the new frame.
        index: Row index for the new frame.
        origin: Forwarded to ``PandasBlock`` as ``origin``.

    Returns:
        PandasBlock: The block wrapping the new frame.
    """
    return PandasBlock(
        pd.DataFrame(data=values, columns=columns, index=index),
        origin=origin)
return PandasAdapter(data_frame)
return self.__data_frame.shape[0]
def get_block_via_index(self, indexes, columns=None):
    """Build a block from the rows at the given positions.

    Args:
        indexes: Iterable of row positions (used with ``iloc``).
        columns: Optional column selection; a falsy value keeps all
            columns.

    Returns:
        PandasBlock: The selected rows (and columns).
    """
    picked_rows = self.__data_frame.iloc[list(indexes)]
    if not columns:
        return PandasBlock(picked_rows)
    return PandasBlock(picked_rows[columns])
def get_block(self, start, end=None, step=None, columns: List[str] = None):
    """Build a block from a row slice of the wrapped frame.

    Args:
        start: Slice start.
        end: Slice stop; ``None`` slices to the end.
        step: Slice step; ``None`` means every row.
        columns: Column labels to keep. ``None`` (the default) keeps all
            columns; previously the default raised ``KeyError`` because
            ``None`` was used as a column key.

    Returns:
        PandasBlock: The sliced rows, restricted to ``columns`` if given.
    """
    window = self.__data_frame[start:end:step]
    if columns is not None:
        window = window[columns]
    return PandasBlock(window)
def get_column_values(self, column_name):
return self.__data_frame[column_name]
def get_column_values_as_list(self, column_name):
return self.__data_frame[column_name].tolist()
def get_columns(self):
return list(self.__data_frame.columns)
def drop_column_by_index(self, index):
column = self.get_column_name_by_index(index)
self.__data_frame.drop(columns=[column], inplace=True)
def drop_column_by_name(self, name):
self.__data_frame.drop(columns=[name], inplace=True)
def get_column_index_by_name(self, name):
return self.__data_frame.columns.get_loc(name)
def get_column_name_by_index(self, index):
column_names = self.__data_frame.columns
return column_names[index]
def set_column_value(self, column_name, value):
self.__data_frame[column_name] = value
def sort(self, column_name, ascending=True):
self.__data_frame.sort_values(by=[column_name],
ascending=ascending,
inplace=True)
def set_column_values(self, column, values):
self.__data_frame[column] = values