Commit 46e4215c authored by Tilmann Sager's avatar Tilmann Sager
Browse files

Implemented pipeline + multiprocessing + a nice config

parent cf248917
__pycache__/
.idea/
*.png
*.csv
snippets
[general]
multiprocessing = 1
[product]
name = MODTBGA
bands = 31, 32
11um = 31
12um = 32
calculate_radiance = 0
band_keyword = BAND
band_suffix = _1
[data]
[path]
hdf = ../data/granules
; flights = ../data/flights/raw  ; disabled: duplicate "flights" key (configparser raises DuplicateOptionError); superseded by the line below
flights = ../data/flights
output = ../data/output
world_map = ../data/maps/ne_10m_admin_0_countries/ne_10m_admin_0_countries.shp
[extraction]
band_keyword = BAND
band_suffix = _1
[transformation]
norm_i = 100
[preprocessing]
norm_interval = 100
gauss_kernel_size = 5
k = 0.1
order = 0
threshold = 0.75
perc_low = 40
perc_high = 95
sigma = 0.35
[detection]
line_len = 100
line_gap = 2
max_size_px = 150
; method = hough  ; disabled: duplicate "method" key in [detection] (configparser raises DuplicateOptionError); "method = straight" below is the live value
filter = scharr
sigma = 1
method = straight
min_px_size = 150
connectivity = 2
split_by = 3
threshold = 90
postprocess = 1
[straight]
threshold = 0.9
[probabilistic]
line_length = 150
line_gap = 3
filter = canny
sigma = 2
[plot]
world_map = ../data/maps/ne_10m_admin_0_countries/ne_10m_admin_0_countries.shp
......@@ -2,160 +2,178 @@
config
"""
import datetime
import os
from configparser import ConfigParser
# NOTE(review): this import rebinds the name `datetime` from the module
# (imported above) to the class, so any later `datetime.date...` attribute
# access breaks — confirm which import the surviving code actually needs.
from datetime import date, datetime
cp = ConfigParser()
# Loads config.ini from the process working directory; ConfigParser.read
# silently ignores files it cannot open.
cp.read(os.path.join(os.getcwd(), 'config.ini'))
from util import fileio
def _today():
return datetime.date.today().__str__()
"""
PRODUCT
"""
prod_key = 'product'


def product_name():
    """String read from the [product] section, option 'name'."""
    option = 'name'
    return cp.get(prod_key, option)


def bands():
    """Ints parsed from the comma-separated [product] 'bands' option."""
    raw_bands = cp.get(prod_key, 'bands')
    return [int(entry.strip()) for entry in raw_bands.split(',')]


def band_11um():
    """Band number for the 11µm channel ([product] option '11um')."""
    return cp.getint(prod_key, '11um')


def band_12um():
    """Band number for the 12µm channel ([product] option '12um')."""
    return cp.getint(prod_key, '12um')
"""
DIRECTORIES
"""
data_key = 'data'


def product_dir():
    """Directory of the HDF granules: [data] 'hdf' joined with the product name."""
    return os.path.join(cp.get(data_key, 'hdf'), product_name())


def flight_input():
    """Root directory of the flight input ([data] option 'flights')."""
    return cp.get(data_key, 'flights')


def processed_flights():
    """Return <flight_input>/processed, creating it if necessary."""
    proc_flight_dir = os.path.join(flight_input(), 'processed')
    # exist_ok avoids the check-then-create race of the original
    # `if not os.path.exists(...): os.makedirs(...)` pattern.
    os.makedirs(proc_flight_dir, exist_ok=True)
    return proc_flight_dir


def output_img():
    """Return <output_dir>/img, creating it if necessary."""
    proc_img_dir = os.path.join(output_dir(), 'img')
    os.makedirs(proc_img_dir, exist_ok=True)
    return proc_img_dir


def output_dir():
    """Return the per-day output directory <output>/<YYYY-MM-DD>, creating it."""
    out_dir = os.path.join(cp.get(data_key, 'output'), _today())
    os.makedirs(out_dir, exist_ok=True)
    return out_dir
"""
EXTRACTION
"""
extrac_key = 'extraction'


def band_keyword():
    """String read from the [extraction] section, option 'band_keyword'."""
    option = 'band_keyword'
    return cp.get(extrac_key, option)


def band_suffix():
    """String read from the [extraction] section, option 'band_suffix'."""
    option = 'band_suffix'
    return cp.get(extrac_key, option)
"""
TRANSFORMATION
"""
trans_key = 'transformation'


def norm_i():
    """Int read from [transformation] 'norm_i'."""
    return cp.getint(trans_key, 'norm_i')


def gauss_kernel_size():
    """Square kernel dimensions (s, s) from [transformation] 'gauss_kernel_size'."""
    size = cp.getint(trans_key, 'gauss_kernel_size')
    return size, size


def k():
    """Float read from [transformation] 'k'."""
    return cp.getfloat(trans_key, 'k')


def order():
    """Int read from [transformation] 'order'."""
    return cp.getint(trans_key, 'order')


def threshold():
    """Float read from [transformation] 'threshold'."""
    return cp.getfloat(trans_key, 'threshold')


def perc_low():
    """Int read from [transformation] 'perc_low'."""
    return cp.getint(trans_key, 'perc_low')


def perc_high():
    """Int read from [transformation] 'perc_high'."""
    return cp.getint(trans_key, 'perc_high')
"""
DETECTION
"""
detect_key = 'detection'


def line_len():
    """Int read from [detection] 'line_len'."""
    return cp.getint(detect_key, 'line_len')


def line_gap():
    """Int read from [detection] 'line_gap'."""
    return cp.getint(detect_key, 'line_gap')


def max_size_px():
    """Int read from [detection] 'max_size_px'."""
    return cp.getint(detect_key, 'max_size_px')


def detection_method():
    """String read from [detection] 'method'."""
    option = 'method'
    return cp.get(detect_key, option)


def filter_method():
    """String read from [detection] 'filter'."""
    option = 'filter'
    return cp.get(detect_key, option)


def sigma():
    """Int read from [detection] 'sigma'."""
    return cp.getint(detect_key, 'sigma')
"""
PLOTS
"""
def world_map():
    """Path to the world-map shapefile ('world_map' option).

    NOTE(review): the lookup uses data_key ([data]); in the checked-in
    config the option lives under [path]/[plot] — confirm the intended
    section before shipping.
    """
    # Removed an unreachable second `return date.today().__str__()` line
    # left behind by a bad merge; it could never execute after this return.
    return cp.get(data_key, 'world_map')
def _now():
return datetime.now().strftime("%Y%m%d-%H%M")
# configparser section names read by init(); the leading underscore marks
# them as module-private.
_general_key = 'general'
_product_key = 'product'
_path_key = 'path'
_pp_key = 'preprocessing'
_detect_key = 'detection'
_probabilistic_key = 'probabilistic'
_straight_key = 'straight'
_plot_key = 'plot'
def init():
    """Read config.ini and return every pipeline parameter as one flat dict.

    Returns:
        dict mapping parameter name -> parsed value (paths resolved/created
        via fileio.check_path, numeric options converted to int/float).
    Raises:
        configparser.NoSectionError / NoOptionError if an option is missing,
        ValueError if a numeric/boolean option cannot be parsed.
    """
    cp = ConfigParser()
    cp.read(fileio.check_path(['./config.ini'], create=False))
    return {
        # General
        # Fix: the original used bool(cp.get(...)), which is True for ANY
        # non-empty string — including '0'. getboolean parses 0/1/yes/no/
        # true/false/on/off correctly.
        'multiprocessing': cp.getboolean(_general_key, 'multiprocessing'),
        # Path
        'hdf_dir': fileio.check_path([cp.get(_path_key, 'hdf'), cp.get(_product_key, 'name')], create=False),
        'flight_raw_dir': fileio.check_path([cp.get(_path_key, 'flights'), 'raw'], create=False),
        'flight_proc_dir': fileio.check_path([cp.get(_path_key, 'flights'), 'processed'], create=True),
        # One timestamped output directory per run.
        'output_dir': fileio.check_path([cp.get(_path_key, 'output'), _now()], create=True),
        # Bands
        'bands': [int(band.strip()) for band in cp.get(_product_key, 'bands').split(',')],
        '11um': cp.getint(_product_key, '11um'),
        '12um': cp.getint(_product_key, '12um'),
        'band_key': cp.get(_product_key, 'band_keyword'),
        'band_suffix': cp.get(_product_key, 'band_suffix'),
        # Preprocessing
        'norm_interval': cp.getint(_pp_key, 'norm_interval'),
        'kernel_size': cp.getint(_pp_key, 'gauss_kernel_size'),
        'k': cp.getfloat(_pp_key, 'k'),
        'order': cp.getint(_pp_key, 'order'),
        'pp_sigma': cp.getfloat(_pp_key, 'sigma'),
        # Detection
        'method': cp.get(_detect_key, 'method'),  # "straight" or "probabilistic"
        # NOTE(review): the dict key says max but the option read is
        # 'min_px_size' — confirm which limit is intended before renaming
        # either side; the key is kept as-is for callers.
        'max_px_size': cp.getint(_detect_key, 'min_px_size'),
        'connectivity': cp.getint(_detect_key, 'connectivity'),
        'split_by': cp.getint(_detect_key, 'split_by'),
        'threshold': cp.getint(_detect_key, 'threshold'),
        # Same bool(cp.get(...)) bug as 'multiprocessing' — fixed here too.
        'postprocess': cp.getboolean(_detect_key, 'postprocess'),
        # Detection - Probabilistic
        'prob_line_length': cp.getint(_probabilistic_key, 'line_length'),
        'prob_line_gap': cp.getint(_probabilistic_key, 'line_gap'),
        'prob_sigma': cp.getint(_probabilistic_key, 'sigma'),
        'prob_filter': cp.get(_probabilistic_key, 'filter'),
        # Detection - Straight
        'straight_threshold': cp.getfloat(_straight_key, 'threshold'),
        # Plots
        'world_map': cp.get(_plot_key, 'world_map')
    }
# """
# DIRECTORIES
# """
#
# path_key = 'data'
# def product_dir():
# return os.path.join(cp.get(data_key, 'hdf'), product_name())
#
#
# def flight_input():
# return cp.get(data_key, 'flights')
# def processed_flights():
# proc_flight_dir = os.path.join(flight_input(), 'processed')
# if not os.path.exists(proc_flight_dir):
# os.makedirs(proc_flight_dir)
# return proc_flight_dir
#
#
# def output_img():
# proc_img_dir = os.path.join(output_dir(), 'img')
# if not os.path.exists(proc_img_dir):
# os.makedirs(proc_img_dir)
# return proc_img_dir
#
#
# def output_dir():
# out_dir = os.path.join(cp.get(path_key, 'output'), _today())
# if not os.path.exists(out_dir):
# os.makedirs(out_dir)
# return out_dir
# """
# EXTRACTION
# """
#
# extrac_key = 'extraction'
#
#
# def band_keyword():
# return cp.get(extrac_key, 'band_keyword')
#
#
# def band_suffix():
# return cp.get(extrac_key, 'band_suffix')
# """
# TRANSFORMATION
# """
# def threshold():
# return float(cp.get(trans_key, 'threshold'))
#
#
# def perc_low():
# return int(cp.get(trans_key, 'perc_low'))
#
#
# def perc_high():
# return int(cp.get(trans_key, 'perc_high'))
# """
# DETECTION
# """
# detect_key = 'detection'
#
#
# def line_len():
# return int(cp.get(detect_key, 'line_len'))
#
#
# def line_gap():
# return int(cp.get(detect_key, 'line_gap'))
#
#
# def max_size_px():
# return int(cp.get(detect_key, 'max_size_px'))
#
#
# def detection_method():
# return cp.get(detect_key, 'method')
#
#
# def filter_method():
# return cp.get(detect_key, 'filter')
#
#
# def sigma():
# return int(cp.get(detect_key, 'sigma'))
# """
# PLOTS
# """
#
#
# def world_map():
# return cp.get(path_key, 'world_map')
import numpy as np
class Columns:
hdf = 'hdf'
name = 'name'
......@@ -29,6 +26,8 @@ class Columns:
rel_flight_px = 'flight_to_pixel_relation'
contrail_mask = 'contrail_mask'
contrail_mask_post = 'contrail_masked_processed'
h = 'horizontal_bounding'
v = 'vertical_bounding'
columns_to_save = [Columns.hdf,
......@@ -43,48 +42,16 @@ columns_to_save = [Columns.hdf,
Columns.west,
Columns.east,
Columns.north,
Columns.south]
def columns_as_list():
    """Return the string-valued column names declared on the Columns class.

    Fix: the original filtered ``Columns.__dict__.values()`` with
    ``type(column) == str`` only, which wrongly includes class machinery
    such as ``__module__`` and ``__qualname__`` (both are strings). Skip
    underscore-prefixed attributes and use isinstance for the type check.
    """
    return [value for name, value in vars(Columns).items()
            if not name.startswith('_') and isinstance(value, str)]
# Column -> dtype mapping for reading the flight CSVs (presumably passed as a
# read_csv dtype= argument — verify against the caller).
# Fix: np.int and np.object were deprecated aliases of the builtin types
# (removed in NumPy 1.24); use the builtins directly. np.double (float64)
# is still a valid NumPy scalar type.
DTYPES = {
    'FLIGHT_ID': int,
    'SEGMENT_NO': int,
    'LATITUDE': np.double,
    'LONGITUDE': np.double,
    'ALTITUDE': np.double,
    'SEGMENT_MONTH': int,
    'SEGMENT_DAY': int,
    'SEGMENT_YEAR': int,
    'SEGMENT_HOUR': int,
    'SEGMENT_MIN': int,
    'SEGMENT_SEC': np.double,
    'EMISSIONS_MODE': np.double,
    'TEMPERATURE': np.double,
    'PRESSURE': np.double,
    'HUMIDITY': np.double,
    'SPEED': np.double,
    'SEGMENT_TIME': object,
    'TRACK_DISTANCE': np.double,
    'THRUST': np.double,
    'WEIGHT': np.double,
    'FUELBURN': np.double,
    'CO': np.double,
    'HC': np.double,
    'NOX': np.double,
    'PMNV': np.double,
    'PMSO': np.double,
    'PMFO': np.double,
    'CO2': np.double,
    'H2O': np.double,
    'SOX': np.double
}
Columns.south,
Columns.h,
Columns.v]
# def columns_as_list():
# columns = []
#
# for column in columns_to_save:
# if type(column) == str:
# columns.append(column)
#
# return columns
import os
from tqdm import tqdm
from glob import glob
from multiprocessing import Pool
import analysis
import config
import plots
import pipeline
from constants import columns_to_save
from stages import prepare, detection, flight_filtering, stats, preprocessing, postprocessing
from util import check_and_repair_csv
from glob import glob
from util.fileio import write_results, write_config, create_path
tqdm.pandas()  # registers DataFrame.progress_apply used by the stages below
"""
INPUT
"""
# NOTE(review): this script mixes the old sequential API
# (config.product_dir(), config.output_dir(), row-wise progress_apply stages)
# with the new config.init()/pipeline.run multiprocessing entry point further
# down — only one config interface survives the rewrite; confirm which path
# is live.
input_files = glob(config.product_dir() + '/*.hdf')
# input_files = [os.path.join(config.product_dir(), 'MODTBGA.A2012094.h15v03.006.2015240000438.hdf')]
# TODO: collecting system information (cpu cores, current time)
# TODO: multiprocessing
# TODO: implement opt-out/in stages
# TODO: improve logging
# TODO: filter time per day
## URGENT TODO
# split picture into chunks
# streamline model
# filter
print('Creating job')
# One row per input granule; subsequent stages transform rows in place.
job_df = prepare.create_job(input_files)
# TODO: print status
"""
CHECK FLIGHT CSV
"""
# print('Checking flight files')
# check_and_repair_csv(job_df)
# TODO: print status
"""
FILTER FLIGHT CSV
"""
# print('Filter flights')
# job_df = job_df.progress_apply(flight_filtering.filter_flights, axis=1)
"""
PREPROCESSING
"""
# TODO: split picture into chunks
# TODO: get geoinformation/projection
"""
TRANSFORMATION
"""
print('Running CDA')
job_df = job_df.progress_apply(preprocessing.cda_preprocess, axis=1)
job_df = job_df.progress_apply(preprocessing.rescale_intensity, axis=1)
# print('Running thresholding')
# job_df = job_df.progress_apply(transformation.threshold, axis=1)
"""
DETECTION
"""
print('Running detection')
# job_df = job_df.progress_apply(detection.probabilistic_hough, axis=1)
job_df = job_df.progress_apply(detection.straight_hough, axis=1)
"""
SEGMENTATION
"""
print('Running postprocessing')
job_df = job_df.progress_apply(postprocessing.postprocess, axis=1)
"""
ASSEMBLING INFORMATION
"""
# print('Assembling information')
# job_df = job_df.progress_apply(stats.collect, axis=1)
"""
ANALYSIS
"""
# print('Running analysis')
# job_df = analysis.flight_pixel_relation(job_df)
"""
RESULTS
"""
print('Saving results')
job_df.to_csv(os.path.join(config.output_dir(), 'output.csv'), columns=columns_to_save, index=False)
params = config.init()  # flat dict of all pipeline parameters from config.ini
"""
PLOTS
"""
# TODO: one granule per day
# NOTE(review): worker count is hard-coded to 4 — consider os.cpu_count()
# when multiprocessing is enabled.
if params.get('multiprocessing'):
    num_processes = 4
else:
    num_processes = 1
print('Creating plots')
# all_flights = pd.DataFrame()
# for flight_csv in glob(config.processed_flights() + '/*.csv'):
# all_flights = all_flights.append(pd.read_csv(flight_csv))
results_file = create_path([params.get('output_dir'), 'results.csv'])
config_file = create_path([params.get('output_dir'), 'config.json'])
input_files = glob(params.get('hdf_dir') + '/*.hdf')
# plots.boxplot(job_df)
# plots.flights(all_flights)
# plots.granules(job_df)
# plots.contrails(job_df)
# plots.density_map(job_df, include=['granules', 'contrails'])
# job_df.apply(plots.plot_coordinates, axis=1)
# job_df.progress_apply(plots.export_shapefile, axis=1)
# NOTE(review): the Pool is never closed/joined — prefer
# `with Pool(processes=num_processes) as pool:` so workers are released.
pool = Pool(processes=num_processes)
jobs = [(input_file, params) for input_file in input_files]
job_df.progress_apply(plots.plot_contrail_mask, axis=1)
job_df.progress_apply(plots.save_contrail_masks, axis=1)
# One (filepath, params) tuple per granule, fanned out across the pool.
results = pool.starmap(pipeline.run, jobs)
write_results(results_file, results, columns_to_save)
write_config(config_file, params)
from stages import transformation, detection, segmentation
from plots import plots
from stages import extraction, preprocessing, detection, flight_filtering
def run(granule):
def run(filepath: str, params: {}):
print(filepath)
"""
TRANSFORMATION
EXTRACTION
"""
granule = transformation.cda_preprocess(granule)
granule = transformation.rescale_intensity(granule)
granule = extraction.run(filepath)
"""
DETECTION
FLIGHT FILTERING
"""
granule = flight_filtering.run(granule, params)
"""
PREPROCESSING
"""
granule = detection.probabilistic_hough(granule)
granule = preprocessing.run(granule, params)
"""