# Source code for artificial_features

# import section
from datetime import datetime, timedelta

import numpy as np
import pytz
from influxdb import InfluxDBClient


class ArtificialFeatures:
    """Class handling the forecasting of features calculated from other measurements or forecasts
    (e.g. VOC, Kloten-Lugano pressure gradient, means over specific time slots, and so on...)
    """

    def __init__(self, influxdb_client, forecast_type, cfg, logger):
        """
        Constructor

        :param influxdb_client: InfluxDB client
        :type influxdb_client: InfluxDBClient
        :param forecast_type: Forecast type (MOR | EVE)
        :type forecast_type: str
        :param cfg: configuration (InfluxDB measurement names, VOC constants, regions,
            forecast period / day-to-forecast settings)
        :type cfg: dict
        :param logger: Logger
        :type logger: Logger object
        """
        # set the variables
        self.influxdb_client = influxdb_client
        self.forecast_type = forecast_type
        self.cfg = cfg
        self.logger = logger
        self.input_data = dict()
        self.input_data_desc = []
        self.input_data_values = []
        self.day_to_predict = None
        self.cfg_signals = None
        self.input_df = None

    def get_query_value_global(self, signal):
        """
        Perform a query on signals that are location independent, i.e. it doesn't matter where they
        are measured. Used for the daily total VOC and NOx values.

        :param signal: signal code
        :type signal: str
        :return: single value of the queried signal
        :rtype: float
        """
        measurement = self.cfg["influxDB"]["measurementGlobal"]
        dt = self.set_forecast_day()
        # MOR forecasts query the reference day itself, EVE forecasts the following day
        if self.forecast_type != 'MOR':
            dt = dt + timedelta(days=1)
        query = 'SELECT value FROM %s WHERE signal=\'%s\' AND time=\'%s\'' % (
            measurement, signal, dt.strftime('%Y-%m-%d'))
        self.logger.info('Performing query: %s' % query)
        value = self.influxdb_client.query(query, epoch='s').raw['series'][0]['values'][0][1]
        return value

    def get_query_value_forecast(self, measurement, signal_data, steps, func):
        """
        Perform a query on forecasted signals using a set of forecast steps and a data aggregating
        function. Used by most feature methods except KLO-LUG before 2021-04-01 and the
        past-measurement features.

        :param measurement: InfluxDB measurement code
        :type measurement: str
        :param signal_data: signal code, formatted as ``location__signal__aggregator``
        :type signal_data: str
        :param steps: InfluxQL condition selecting the forecast steps
            (see create_forecast_chunk_steps_string)
        :type steps: str
        :param func: reduction to apply ('min', 'max', 'mean', 'sum' or 'std')
        :type func: str
        :return: single value of the queried signal (NaN when no data is available)
        :rtype: float
        """
        location, signal_code, _aggregator = signal_data.split('__', 2)
        dt = self.set_forecast_day()
        # Select the forecast run timestamped 03:00 UTC (MOR) or 12:00 UTC (EVE)
        if self.forecast_type == 'MOR':
            lcl_dt = '%sT03:00:00Z' % dt.strftime('%Y-%m-%d')
        else:
            lcl_dt = '%sT12:00:00Z' % dt.strftime('%Y-%m-%d')
        query = 'SELECT value FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND %s AND time=\'%s\'' \
                % (measurement, location, signal_code, steps, lcl_dt)
        return self.calc_data(query=query, signal_data=signal_data, func=func)

    def get_query_value_measure(self, measurement, signal_data, start_dt, end_dt, func):
        """
        Perform a query on measured signals between a start and an end time and aggregate the data.

        :param measurement: InfluxDB measurement code
        :type measurement: str
        :param signal_data: signal code, formatted as ``location__signal__...``
        :type signal_data: str
        :param start_dt: timestamp, format '%Y-%m-%dTHH:MM:SSZ' (inclusive)
        :type start_dt: str
        :param end_dt: timestamp, format '%Y-%m-%dTHH:MM:SSZ' (inclusive)
        :type end_dt: str
        :param func: reduction to apply ('min', 'max', 'mean', 'sum' or 'std')
        :type func: str
        :return: single value of the queried signal (NaN when no data is available)
        :rtype: float
        """
        location, signal_code, _aggregator = signal_data.split('__', 2)
        query = 'SELECT value FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND time>=\'%s\' AND ' \
                'time<=\'%s\'' % (measurement, location, signal_code, start_dt, end_dt)
        return self.calc_data(query=query, signal_data=signal_data, func=func)

    def steps_type_forecast(self, mor_start, mor_end, eve_start, eve_end):
        """
        Create the forecast-steps condition for the current forecast type.

        :param mor_start: first step (inclusive) used by MOR forecasts
        :param mor_end: last step (inclusive) used by MOR forecasts
        :param eve_start: first step (inclusive) used by EVE forecasts
        :param eve_end: last step (inclusive) used by EVE forecasts
        :return: InfluxQL condition string
        :rtype: str
        """
        if self.forecast_type == 'MOR':
            steps = self.create_forecast_chunk_steps_string(mor_start, mor_end)
        else:
            steps = self.create_forecast_chunk_steps_string(eve_start, eve_end)
        return steps

    def measurements_start_end(self, days):
        """
        Create start and end times for the measurement signal queries used by do_multiday_query.

        :param days: number of days back in time
        :type days: int
        :return: [start timestamp, end timestamp]
        :rtype: list
        """
        dt = self.set_forecast_day()
        if self.forecast_type == 'MOR':
            start_dt = '%sT05:00:00Z' % (dt - timedelta(days=days)).strftime('%Y-%m-%d')
            end_dt = '%sT04:00:00Z' % dt.strftime('%Y-%m-%d')
        else:
            start_dt = '%sT17:00:00Z' % (dt - timedelta(days=days)).strftime('%Y-%m-%d')
            end_dt = '%sT16:00:00Z' % dt.strftime('%Y-%m-%d')
        return [start_dt, end_dt]

    def analyze_signal(self, signal):
        """
        Parse the signal code and dispatch it to the appropriate method for value calculation.

        Note: the multi-part checks are independent ``if`` statements, so when a signal matches
        several of them the last matching one determines the returned value.

        :param signal: signal code
        :type signal: str
        :return: single value of the queried signal, NaN if the signal is not recognized
        :rtype: float
        """
        val = np.nan
        recognized = False
        tmp = signal.split('__')
        if len(tmp) == 1:
            if 'KLO' in signal:
                # KLO-LUG, KLO-LUG_favonio
                val = self.do_KLO_query(signal)
                recognized = True
            if signal == 'NOx_Totale':
                val = self.get_query_value_global('Total_NOx')
                recognized = True
            # 'VOC_Totale' is no longer handled as a global signal
        elif len(tmp) >= 3:
            # NOTE: do_T_2M_query (e.g. P_BIO__T_2M__12h_mean) is currently not dispatched
            if 'MAX' in tmp[2]:
                # E.g. P_BIO__T_2M__MAX
                measurement = self.cfg['influxDB']['measurementInputsForecasts']
                val = self.do_MAX_query(signal, measurement)
                recognized = True
            if 'transf' in tmp[2]:
                # E.g. P_BIO__TD_2M__transf
                measurement = self.cfg['influxDB']['measurementInputsForecasts']
                val = self.do_transf_query(signal, measurement)
                recognized = True
            if 'NOx__12h' in signal or '24h' in tmp[2] or '48h' in tmp[2] or '72h' in tmp[2]:
                # E.g. BIO__CN__24h__mean, BIO__CN__48h__mean, BIO__CN__72h__mean
                measurement = self.cfg['influxDB']['measurementInputsMeasurements']
                val = self.do_multiday_query(signal, measurement)
                recognized = True
            if 'mean_mor' in tmp[2] or 'mean_eve' in tmp[2]:
                # E.g. P_BIO__GLOB__mean_mor, P_BIO__GLOB__mean_eve, P_BIO__CLCT__mean_mor
                measurement = self.cfg['influxDB']['measurementInputsForecasts']
                val = self.do_mor_eve_query(signal, measurement)
                recognized = True
            if 'TOT_PREC' in tmp[1]:
                # E.g. P_BIO__TOT_PREC__sum
                measurement = self.cfg['influxDB']['measurementInputsForecasts']
                val = self.do_tot_prec_query(signal, measurement)
                recognized = True

        # `val == np.nan` is always False, so track recognition explicitly instead
        if not recognized:
            self.logger.error('Unrecognized artificial feature in function analyze_signal')
        return val

    def calculate_wood_emission(self, region):
        """
        Calculate the VOC emissions from woods for a region, using forecasted radiation and
        temperature at the region's VOC station.

        :param region: region key in cfg['regions']
        :type region: str
        :return: calculated VOC emission value (cfg['VOC']['KG_per_gamma'] * gamma)
        :rtype: float
        """
        # Load necessary constants
        temp_s = self.cfg['VOC']['T_s']
        r = self.cfg['VOC']['R']
        alpha = self.cfg['VOC']['alpha']
        c_l1 = self.cfg['VOC']['C_L1']
        c_temp1 = self.cfg['VOC']['C_T1']
        c_temp2 = self.cfg['VOC']['C_T2']
        T_m = self.cfg['VOC']['T_m']

        # q: converted radiation, temp: raw T_2M forecast (do_T_2M_query treats raw T_2M as Kelvin)
        q, temp = self.get_Q_T_forecasted(region)

        # Light-dependent factor times temperature-dependent factor
        light_factor = alpha * c_l1 * q / np.sqrt(1 + np.power(alpha, 2) * np.power(q, 2))
        temp_factor = (np.exp(c_temp1 * (temp - temp_s) / (r * temp_s * temp))) / \
                      (1 + np.exp(c_temp2 * (temp - T_m) / (r * temp_s * temp)))
        gamma = light_factor * temp_factor
        return self.cfg['VOC']['KG_per_gamma'] * gamma

    def get_Q_T_forecasted(self, region):
        """
        Get the 24-hour mean forecasted radiation (converted with cfg['VOC']['GLOB_to_PAR']) and
        the 24-hour mean forecasted temperature at the region's VOC station.

        :param region: region key in cfg['regions']
        :type region: str
        :return: (q, temp)
        :rtype: tuple
        """
        # Get 24 hours of forecasts; GLOB starts at step01 for MOR because step00 is unusable
        if self.forecast_type == 'MOR':
            steps_G = self.create_forecast_chunk_steps_string(1, 24)
            steps_T = self.create_forecast_chunk_steps_string(0, 23)
        else:
            steps_G = steps_T = self.create_forecast_chunk_steps_string(10, 33)

        station = self.cfg['regions'][region]['VOCStation']
        measurement = self.cfg['influxDB']['measurementInputsForecasts']
        q = self.get_query_value_forecast(measurement, '%s__GLOB__' % station, steps_G, 'mean')
        temp = self.get_query_value_forecast(measurement, '%s__T_2M__' % station, steps_T, 'mean')

        # Transform into the appropriate unit of measurement
        q = q * self.cfg['VOC']['GLOB_to_PAR']
        return q, temp

    def do_VOC_query(self, region):
        """
        Get the global (non-wood) VOC value and the raw and corrected wood VOC emissions.

        :param region: region key in cfg['regions']
        :type region: str
        :return: (voc_other, voc_woods_raw, voc_woods_corrected)
        :rtype: tuple
        """
        voc_other = self.get_query_value_global('Total_VOC')
        voc_woods_raw, voc_woods_corrected = self.calc_voc_woods(region)
        return voc_other, voc_woods_raw, voc_woods_corrected

    def calc_voc_woods(self, region):
        """
        Compute the raw wood VOC emission and its linearly corrected value
        (cfg['VOC']['correction'] slope and intercept).

        :param region: region key in cfg['regions']
        :type region: str
        :return: (voc_woods_raw, voc_woods_corrected)
        :rtype: tuple
        """
        voc_woods_raw = self.calculate_wood_emission(region)
        correction = self.cfg['VOC']['correction']
        voc_woods_corrected = correction['slope'] * voc_woods_raw + correction['intercept']
        return voc_woods_raw, voc_woods_corrected

    def do_mor_eve_query(self, signal_data, measurement):
        """
        Mean of forecasted values over a morning or evening time slot.

        mean_mor: mean of values from 03:00 UTC to 10:00 UTC
        mean_eve: mean of values from 11:00 UTC to 21:00 UTC
        Forecasted signals considered: GLOB, CLCT

        :param signal_data: signal code, e.g. P_BIO__GLOB__mean_mor
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: single value of the queried signal, NaN for an unknown aggregator
        :rtype: float
        """
        location, signal_code, aggregator = signal_data.split('__', 2)
        if aggregator == 'mean_mor':
            # GLOB starts at step01 because step00 is always -99999
            mor_start = 1 if signal_code == 'GLOB' else 0
            steps = self.steps_type_forecast(mor_start=mor_start, mor_end=7, eve_start=15, eve_end=22)
        elif aggregator == 'mean_eve':
            steps = self.steps_type_forecast(mor_start=8, mor_end=18, eve_start=23, eve_end=33)
        else:
            # Previously fell through with `steps` unbound (UnboundLocalError)
            self.logger.error('Something wrong in mean_mor, mean_eve features calculation')
            return np.nan
        return self.get_query_value_forecast(measurement, signal_data, steps, 'mean')

    def do_MAX_query(self, signal_data, measurement):
        """
        Calculate the maximum value of forecast steps 00-40 (e.g. hourly forecasted temperatures).

        :param signal_data: signal code
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: single value of the queried signal
        :rtype: float
        """
        steps = self.create_forecast_chunk_steps_string(0, 40)
        return self.get_query_value_forecast(measurement, signal_data, steps, 'max')

    def do_T_2M_query(self, signal_data, measurement):
        """
        Mean of hourly forecasted temperatures, converted by subtracting 273.1.

        MOR: steps 07-19; EVE: steps 22-33. If the signal code contains 'squared', the squared
        mean is returned instead.

        :param signal_data: signal code, e.g. P_BIO__T_2M__12h_mean(_squared)
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: mean temperature (or its square)
        :rtype: float
        """
        steps = self.steps_type_forecast(mor_start=7, mor_end=19, eve_start=22, eve_end=33)
        val = self.get_query_value_forecast(measurement, signal_data, steps, 'mean') - 273.1
        if 'squared' in signal_data:
            return val ** 2
        return val

    def do_transf_query(self, signal_data, measurement):
        """
        Maximum of forecast steps 00-40 (e.g. Dew Temperatures), plus 20, cubed.

        :param signal_data: signal code, e.g. P_BIO__TD_2M__transf
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: (max + 20) ** 3
        :rtype: float
        """
        steps = self.create_forecast_chunk_steps_string(0, 40)
        res = self.get_query_value_forecast(measurement, signal_data, steps, 'max')
        return (res + 20) ** 3

    def do_tot_prec_query(self, signal_data, measurement):
        """
        Sum of the hourly forecasted precipitations over forecast steps 01-23.

        :param signal_data: signal code, e.g. P_BIO__TOT_PREC__sum
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: summed precipitation
        :rtype: float
        """
        # start steps at 1 instead of 0 because step00 is always -99999
        steps = self.create_forecast_chunk_steps_string(1, 23)
        return self.get_query_value_forecast(measurement, signal_data, steps, 'sum')

    def do_KLO_query(self, signal_data):
        """
        Calculate the pressure gradient between Kloten airport and Lugano, to take into account
        the air current between the South and the North of Switzerland.

        Before 2021-04-01 the measured reduced pressures (P_red) of the previous 24 hours are
        used; afterwards the mean forecasted PMSL over steps 00-40, with the difference divided
        by 100 (presumably Pa -> hPa; confirm).

        :param signal_data: signal code (KLO-LUG or KLO-LUG_favonio)
        :type signal_data: str
        :return: pressure difference, or 1.0/0.0 flag (difference >= 6.0) for 'favonio' signals
        :rtype: float
        """
        func = 'mean'
        dt = self.set_forecast_day()
        if dt < pytz.timezone("UTC").localize(datetime(2021, 4, 1)):
            measurement = self.cfg['influxDB']['measurementInputsMeasurements']
            start_dt = '%sT23:05:00Z' % (dt - timedelta(days=1)).strftime('%Y-%m-%d')
            end_dt = '%sT23:05:00Z' % dt.strftime('%Y-%m-%d')
            res_LUG = self.get_query_value_measure(measurement, 'LUG__P_red__', start_dt, end_dt, func)
            res_KLO = self.get_query_value_measure(measurement, 'KLO__P_red__', start_dt, end_dt, func)
            diff = res_KLO - res_LUG
        else:
            measurement = self.cfg['influxDB']['measurementInputsForecasts']
            steps = self.create_forecast_chunk_steps_string(0, 40)
            res_LUG = self.get_query_value_forecast(measurement, 'LUG__PMSL__0', steps, func)
            res_KLO = self.get_query_value_forecast(measurement, 'KLO__PMSL__0', steps, func)
            diff = (res_KLO - res_LUG) / 100.0

        if 'favonio' in signal_data:
            return 1.0 if diff >= 6.0 else 0.0
        return diff

    def do_multiday_query(self, signal_data, measurement):
        """
        Calculate the aggregate of the last 24, 48 or 72 hourly measurements for measured signals.
        The aggregation function is the last ``__``-separated part of the signal code.

        Special case NOx 12h:
        MOR: hourly measurements from 10:00 to 22:00 of the previous day (previous afternoon)
        EVE: hourly measurements from 22:00 of the previous day to 10:00 of the present day

        :param signal_data: signal code, e.g. BIO__CN__24h__mean
        :type signal_data: str
        :param measurement: InfluxDB measurement code
        :type measurement: str
        :return: single value of the queried signal, NaN for an unknown time window
        :rtype: float
        """
        location, signal_code, aggregator = signal_data.split('__', 2)
        dt = self.set_forecast_day()
        func = signal_data.split('__')[-1]
        if '12h' in aggregator:
            previous_day = (dt - timedelta(days=1)).strftime('%Y-%m-%d')
            if self.forecast_type == 'MOR':
                start_dt = '%sT10:00:00Z' % previous_day
                end_dt = '%sT22:00:00Z' % previous_day
            else:
                start_dt = '%sT22:00:00Z' % previous_day
                end_dt = '%sT10:00:00Z' % dt.strftime('%Y-%m-%d')
        elif '24h' in aggregator:
            start_dt, end_dt = self.measurements_start_end(days=1)
        elif '48h' in aggregator:
            start_dt, end_dt = self.measurements_start_end(days=2)
        elif '72h' in aggregator:
            start_dt, end_dt = self.measurements_start_end(days=3)
        else:
            # Previously fell through with start_dt/end_dt unbound (UnboundLocalError)
            self.logger.error('Unexpected error with 12h, 24h, 48h, 72h artificial signal')
            return np.nan

        return self.get_query_value_measure(measurement, location + '__' + signal_code + '__',
                                            start_dt, end_dt, func)

    def set_forecast_day(self):
        """
        Return the reference datetime (current UTC time, or the configured cfg['dayToForecast']
        when cfg['forecastPeriod']['case'] is not 'current').

        On the first call, also store self.day_to_predict as the integer UNIX timestamp of
        midnight of the predicted day (the reference day for MOR, the following day for EVE).

        :return: reference datetime (timezone-aware, UTC)
        :rtype: datetime
        """
        dt = datetime.now(pytz.utc)

        if self.cfg['forecastPeriod']['case'] != 'current':
            # Customize the prediction date
            (y, m, d) = self.cfg['dayToForecast'].split('-')
            dt = dt.replace(year=int(y), month=int(m), day=int(d))

        if self.day_to_predict is None:
            if self.forecast_type == 'MOR':
                day = dt
            else:
                day = dt + timedelta(days=1)
            day = day.replace(hour=0, minute=0, second=0)
            self.day_to_predict = int(day.timestamp())

        return dt

    def calc_data(self, query, signal_data, func):
        """
        Run an InfluxDB query and reduce the non-null values with the requested function.

        :param query: InfluxQL query
        :type query: str
        :param signal_data: signal code, used as key in self.input_data on failure
        :type signal_data: str
        :param func: reduction to apply ('min', 'max', 'mean', 'sum' or 'std')
        :type func: str
        :return: reduced value; NaN when func is unsupported or the query yielded no usable data
            (an empty but present series follows NumPy semantics, e.g. sum -> 0.0)
        :rtype: float
        """
        reducers = {'min': np.min, 'max': np.max, 'mean': np.mean, 'sum': np.sum, 'std': np.std}
        reducer = reducers.get(func)
        if reducer is None:
            self.logger.error('Unsupported aggregation function %s for %s' % (func, signal_data))
            self.input_data[signal_data] = np.nan
            return np.nan

        self.logger.info('Performing query: %s' % query)
        res = self.influxdb_client.query(query, epoch='s')
        # Best-effort by design: any problem with the result yields NaN rather than a crash
        try:
            rows = res.raw['series'][0]['values']
            vals = [float(row[1]) for row in rows if row[1] is not None]
            return reducer(vals)
        except Exception as e:
            self.logger.error('Forecast not available')
            self.logger.error('No data from query %s' % query)
            self.logger.error('Reason: %s' % e)
            self.input_data[signal_data] = np.nan
            return np.nan

    @staticmethod
    def create_forecast_chunk_steps_string(start, end):
        """
        Build an InfluxQL condition matching forecast steps ``start`` to ``end`` (inclusive),
        e.g. ``(step='step01' OR step='step02')``.

        :param start: first step
        :type start: int
        :param end: last step
        :type end: int
        :return: condition string
        :rtype: str
        """
        return '(%s)' % ' OR '.join('step=\'step%02d\'' % i for i in range(start, end + 1))