# Source code for grid_searcher

import os

import numpy as np
import pandas as pd

from multiprocessing import Queue, Process

# Module-level queue shared by the grid-search workers: every gs_cell_process
# call puts one result dict on it and GridSearcher.search_weights reads the
# dicts back in the parent process.
queue_results = Queue()


def gs_cell_process(mt, q, features, region, target_column, df_x, df_y, weights):
    """
    Evaluate one cell of the weights grid and publish the outcome on a queue.

    :param mt: object exposing ``training_cross_validated_fs``
    :param q: queue-like object; only its ``put`` method is used
    :param features: forwarded unchanged to ``training_cross_validated_fs``
    :param region: forwarded unchanged to ``training_cross_validated_fs``
    :param target_column: forwarded unchanged to ``training_cross_validated_fs``
    :param df_x: forwarded unchanged to ``training_cross_validated_fs``
    :param df_y: forwarded unchanged to ``training_cross_validated_fs``
    :param weights: forwarded unchanged to ``training_cross_validated_fs``
    :return: None; the result is delivered through ``q`` as a dict with the
             keys ``'lcl_kpis'`` and ``'lcl_prediction'``
    """
    kpis, prediction = mt.training_cross_validated_fs(
        features, region, target_column, df_x, df_y, weights
    )

    # Hand the pair back to the caller through the queue
    payload = dict(lcl_kpis=kpis, lcl_prediction=prediction)
    q.put(payload)
class GridSearcher:
    """
    Given a dataset with a target column this class performs grid search over the weights w1, w2 and w3 as specified
    in the config files
    """

    # Columns of the KPIs file; the hyper-parameter columns are only present
    # when cfg[...]['gridSearcher']['hyperParsOptimizationNGB'] is not None
    _WEIGHT_COLUMNS = ['w1', 'w2', 'w3']
    _HYPERPARS_COLUMNS = ['ne', 'lr']
    _KPI_COLUMNS = ['Accuracy_1', 'Accuracy_2', 'Accuracy_3', 'Accuracy',
                    'RMSE1', 'RMSE2', 'RMSE3', 'RMSE',
                    'MAE1', 'MAE2', 'MAE3', 'MAE', 'ConfMat']

    def __init__(self, features_analyzer, input_gatherer, model_trainer, forecast_type, cfg, logger):
        """
        Constructor

        :param features_analyzer: object providing the ``dataFrames`` attribute and ``dataset_splitter``
        :type features_analyzer: FeaturesAnalyzer (presumably; type not visible in this module)
        :param input_gatherer: object providing ``output_folder_creator``
        :type input_gatherer: InputsGatherer
        :param model_trainer: object handed to the worker processes, where
                              ``training_cross_validated_fs`` is called on it
        :type model_trainer: ModelTrainer (presumably; type not visible in this module)
        :param forecast_type: Forecast type (MOR | EVE)
        :type forecast_type: str
        :param cfg: configuration; ``cfg['regions'][region]['gridSearcher']`` is read by ``search_weights``
        :type cfg: dict
        :param logger: Logger
        :type logger: Logger object
        """
        # set the variables
        self.features_analyzer = features_analyzer
        self.input_gatherer = input_gatherer
        self.model_trainer = model_trainer
        self.forecast_type = forecast_type
        self.cfg = cfg
        self.logger = logger
        self.dataFrames = None

    def get_datasets(self):
        """
        Copy the reference to the features analyzer's ``dataFrames`` into this instance
        """
        self.dataFrames = self.features_analyzer.dataFrames

    def _results_header(self, gs_cfg):
        """Return the column names of the KPIs file for the given grid-search configuration."""
        if gs_cfg['hyperParsOptimizationNGB'] is None:
            return self._WEIGHT_COLUMNS + self._KPI_COLUMNS
        return self._WEIGHT_COLUMNS + self._HYPERPARS_COLUMNS + self._KPI_COLUMNS

    @staticmethod
    def _weight_values(gs_cfg, name):
        """Return the values of weight ``name`` ('w1' | 'w2' | 'w3') from ``<name>_start/_end/_step``."""
        # The +1 makes the configured end value part of the range for unit steps
        return np.arange(gs_cfg[name + '_start'], gs_cfg[name + '_end'] + 1, gs_cfg[name + '_step'])

    def search_weights(self, region, target_column, cfg_file_name):
        """
        Iterate over the weights as specified in the config file and, for each combination, run a cross-validated
        training in a dedicated process; the KPIs of every combination are appended to a CSV file.

        The workers also return their predictions through the queue, but only the KPIs are written to file here.

        :param region: key of ``cfg['regions']`` whose ``gridSearcher`` section drives the search
        :type region: str
        :param target_column: target column, passed to ``dataset_splitter`` and used in the output file name
        :type target_column: str
        :param cfg_file_name: path of the config file; its last component, without '.json', is used in the
                              output file name
        :type cfg_file_name: str
        """
        self.get_datasets()

        for key, df in self.dataFrames.items():
            fp = self.input_gatherer.output_folder_creator(key)
            fn = fp + 'GS_KPIs_' + target_column + '_' + cfg_file_name.split(os.sep)[-1].replace('.json', '') + '.csv'

            gs_cfg = self.cfg['regions'][region]['gridSearcher']

            # Initialize the file with the header only (append mode). Data will be inserted once the
            # grid search is completed
            pd.DataFrame([], columns=self._results_header(gs_cfg)).to_csv(fn, mode='a', header=True, index=False)

            l1 = self._weight_values(gs_cfg, 'w1')
            l2 = self._weight_values(gs_cfg, 'w2')
            l3 = self._weight_values(gs_cfg, 'w3')

            _, _, features, df_x, df_y = self.features_analyzer.dataset_splitter(key, df, target_column)

            # Prepare one process for each cell of the grid
            procs = []
            for w1 in l1:
                for w2 in l2:
                    for w3 in l3:
                        self.logger.info('Region: %s, target: %s -> weights = [%i, %i, %i]' % (region, target_column,
                                                                                               w1, w2, w3))
                        weights = {'w1': w1, 'w2': w2, 'w3': w3}
                        procs.append(Process(target=gs_cell_process,
                                             args=(self.model_trainer, queue_results, features, region,
                                                   target_column, df_x, df_y, weights)))

            self.logger.info('Start the processes (n=%i)' % len(procs))
            for proc in procs:
                proc.start()

            # Read from the queue BEFORE joining: a child that has put data on a multiprocessing queue does not
            # terminate until its buffered items have been flushed to the underlying pipe, so joining first can
            # deadlock when the results are large. Exactly one item per process is expected, hence the bounded
            # loop (which also makes an empty grid a no-op instead of blocking forever).
            # NOTE: get() has no timeout, so a worker that dies without publishing a result still blocks here.
            results = []
            for i in range(1, len(procs) + 1):
                results.append(queue_results.get())
                self.logger.warning('Read data from the queue, added item n. %i' % i)

            self.logger.info('Join the processes together')
            for proc in procs:
                proc.join()

            # Rows are appended in the order the workers delivered them, not in grid order
            self.logger.warning('Save the results on file %s' % fn)
            for result in results:
                # quoting=2 is csv.QUOTE_NONNUMERIC
                result['lcl_kpis'].to_csv(fn, mode='a', header=False, index=False, quoting=2)