# Source code for inputs_gatherer
# import section
import glob
import math
import json
import os
from datetime import datetime, timedelta
from time import sleep
import numpy as np
import pandas as pd
import pytz
from influxdb import InfluxDBClient
import constants
[docs]class InputsGatherer:
"""
Class handling the gathering of the inputs needed by a collection of predictors.
There are 3 ways to create a dataframe:
- Read an existing CSV (see method dataframe_reader)
- Define a region composed of measurements and forecast stations, define the signals to be used by each station, then create all possible signals in JSON format and finally create the dataframe by querying InfluxDB (see method dataframe_builder_regions)
- read an existing JSON containing a set of signals and create the dataframe by querying InfluxDB (see method dataframe_builder_custom)
"""
def __init__(self, influxdb_client, forecast_type, cfg, logger, artificial_features):
    """
    Constructor

    :param influxdb_client: InfluxDB client used for all the queries and writes
    :type influxdb_client: InfluxDBClient
    :param forecast_type: Forecast type (MOR | EVE)
    :type forecast_type: str
    :param cfg: Global configuration (folders, InfluxDB measurements, regions, dataset settings, ...)
    :type cfg: dict
    :param logger: Logger
    :type logger: Logger object
    :param artificial_features: Handler of the artificial features signals
    :type artificial_features: ArtificialFeatures
    """
    self.influxdb_client = influxdb_client
    self.forecast_type = forecast_type
    self.cfg = cfg
    self.logger = logger
    self.artificial_features = artificial_features
    # Gathered values of the current day, keyed by signal code
    self.io_data = None
    # Measured values used as forced substitutes of the forecasts, keyed by signal code
    self.io_data_sub = None
    # UTC timestamp (seconds) of 00:00 of the predicted day, set lazily by set_forecast_day()
    self.day_to_predict = None
    # Dict whose key 'signals' holds the list of signal codes to gather
    self.cfg_signals = None
    # Availability flag of every gathered signal, filled by check_inputs_availability()
    self.io_data_availability = None
def build_global_input_dataset(self):
    """
    Build the input dataset of all the models configured for the current forecast type.

    The signal codes are read from every ``inputs_*.json`` file (format ``{"signals": [...]}``) found in the
    model folders ``<models folder>/*<forecast type>`` whose region code (the folder name up to the first
    '_') is configured in cfg['regions']. Duplicated codes are removed, keeping the first occurrence, so the
    order of the queries is deterministic. Each signal is then queried and stored in self.io_data, and
    finally check_inputs_availability() is called.
    """
    self.io_data = dict()
    self.io_data_sub = dict()
    self.day_to_predict = None

    # Create the signals list considering all the couples location-model
    signals = []
    models_pattern = os.path.join(self.cfg['folders']['models'], '*%s' % self.forecast_type)
    for tmp_folder in sorted(glob.glob(models_pattern)):
        # Check if the current folder refers to a location configured for the prediction
        region_code = os.path.basename(tmp_folder).split('_')[0]
        if region_code not in self.cfg['regions'].keys():
            continue
        # Cycle over the input files in the folder (each file corresponds to a model)
        for input_cfg_file in sorted(glob.glob(os.path.join(tmp_folder, 'inputs_*.json'))):
            with open(input_cfg_file) as fh:
                signals.extend(json.load(fh)['signals'])
    # dict.fromkeys removes the duplicates while preserving the insertion order
    self.cfg_signals = dict(signals=list(dict.fromkeys(signals)))

    # get the values in the DB
    n_signals = len(self.cfg_signals['signals'])
    for i, signal in enumerate(self.cfg_signals['signals'], start=1):
        self.logger.info('Try to add input n. %04d/%04d, %s' % (i, n_signals, signal))
        self.add_input_value(signal=signal, forecast_substitution=False)
        self.logger.info('Added input n. %04d/%04d' % (i, n_signals))

    # Check the data availability
    self.check_inputs_availability()
def build_dataset(self, name, input_signals):
    """
    Build the training dataset of a region, day by day, over the configured periods.

    For every year of cfg['datasetSettings']['years'] the days from 'startDay' to 'endDay' (the end day
    belongs to the following year when it precedes the start day) are processed. For each day all the
    input signals (with forecast substitution) and output signals are queried, and the resulting row is
    appended to ``<output folder>/<output folder name>_dataset.csv``. The file is overwritten by the first
    row, so the CSV keeps the progress of a long run.

    :param name: dataset/region name, key of cfg['regions']
    :type name: str
    :param input_signals: input signal codes
    :type input_signals: list
    :return: dataset with columns ['date'] + input_signals + output signals and a default RangeIndex
    :rtype: pandas.DataFrame
    """
    self.io_data = dict()
    settings = self.cfg['datasetSettings']
    fp = self.output_folder_creator(name)
    # The CSV is named after its own folder (fp ends with the path separator)
    file_name_df = os.path.join(fp, os.path.basename(os.path.normpath(fp)) + '_dataset.csv')
    output_signals = self.cfg['regions'][name]['targetColumns']
    columns = ['date'] + input_signals + output_signals
    rows = []
    # True until the first row is written: the first write overwrites an existing CSV and adds the header
    flag_starting_dataset = True
    # In 2020 we lost part of the forecasted signal, so the days until the 17th August are discarded
    # NOTE(review): this O3-specific exception should be checked carefully
    o3_restart_dt = datetime.strptime('2020-08-17', '%Y-%m-%d')

    for year in settings['years']:
        start_day = str(year) + '-' + settings['startDay']
        end_day = str(year) + '-' + settings['endDay']
        start_dt = datetime.strptime(start_day, '%Y-%m-%d')
        end_dt = datetime.strptime(end_day, '%Y-%m-%d')
        # Check if you have to jump to the next year
        if start_dt > end_dt:
            end_day = str(year + 1) + '-' + settings['endDay']
            end_dt = datetime.strptime(end_day, '%Y-%m-%d')
        if output_signals and 'O3' in output_signals[0] and year == 2020:
            if start_dt < o3_restart_dt:
                start_day = '2020-08-17'
            if end_dt < o3_restart_dt:
                continue

        curr_day = start_day
        while True:
            self.cfg['dayToForecast'] = curr_day
            for i, signal in enumerate(input_signals, start=1):
                self.add_input_value(signal=signal, forecast_substitution=True)
                self.logger.info('Added input n. %04d/%04d' % (i, len(input_signals)))
                sleep(settings['sleepTimeBetweenQueries'])
            for i, signal in enumerate(output_signals, start=1):
                self.add_output_value(signal=signal)
                self.logger.info('Added output n. %04d/%04d' % (i, len(output_signals)))
                sleep(settings['sleepTimeBetweenQueries'])

            lcl_data = dict({'date': curr_day}, **self.io_data)
            lcl_df = pd.DataFrame([lcl_data], columns=columns)
            if flag_starting_dataset:
                lcl_df.to_csv(file_name_df, mode='w', header=True, index=False)
                flag_starting_dataset = False
            else:
                lcl_df.to_csv(file_name_df, mode='a', header=False, index=False)
            rows.append(lcl_df)

            # add a day; the last processed day is end_dt
            curr_dt = datetime.strptime(curr_day, '%Y-%m-%d')
            curr_day = (curr_dt + timedelta(days=1)).strftime('%Y-%m-%d')
            if curr_dt >= end_dt:
                break

    # DataFrame.append was removed in pandas 2.0: concatenate all the rows once
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.concat(rows, ignore_index=True)
def check_inputs_availability(self):
    """
    Check the availability of the gathered inputs and save them in the inputs history.

    Every value of self.io_data that is None or NaN is replaced by the mean of its past values
    (see retrieve_past_mean) and flagged as not available in self.io_data_availability.
    The values are then written to InfluxDB (measurement cfg['influxDB']['measurementInputsHistory']),
    tagged with the signal code and the forecast case and timestamped with self.day_to_predict.
    Values that are still not finite after the imputation (e.g. no past values) are skipped: the
    InfluxDB line protocol does not support NaN/Inf floats and the write request would fail.
    """
    from datetime import timezone

    dps = []
    self.io_data_availability = dict()
    for code in list(self.io_data):
        value = self.io_data[code]
        # Check if the value is None or nan
        if value is None or np.isnan(value):
            self.io_data[code] = self.retrieve_past_mean(code=code)
            self.io_data_availability[code] = False
        else:
            self.io_data_availability[code] = True
        hist_value = float(self.io_data[code])
        if not np.isfinite(hist_value):
            self.logger.warning('Input %s not available and not imputable, not saved in the history' % code)
            continue
        dps.append({
            'time': int(self.day_to_predict),
            'measurement': self.cfg['influxDB']['measurementInputsHistory'],
            'fields': dict(value=hist_value),
            'tags': dict(code=code, case=self.forecast_type)
        })
    if len(dps) > 0:
        # day_to_predict is a UTC timestamp: format it in UTC, not in the host time zone
        day_str = datetime.fromtimestamp(self.day_to_predict, tz=timezone.utc).strftime('%Y-%m-%d')
        self.logger.info('Inserted in the history %i inputs values related predictions of day %s, '
                         'case %s' % (len(dps), day_str, self.forecast_type))
        self.influxdb_client.write_points(dps, time_precision=self.cfg['influxDB']['timePrecision'])
def retrieve_past_mean(self, code):
    """
    Return the mean of the past values of an input signal saved in the inputs history.

    The mean considers the values of the current forecast case from
    cfg['predictionGeneralSettings']['startDateForMeanImputation'] (included) to the day to predict (excluded).

    :param code: signal code
    :type code: str
    :return: mean of the past values, NaN if it cannot be calculated
    :rtype: float
    """
    from datetime import timezone

    # day_to_predict is a UTC timestamp: format it in UTC, not in the host time zone
    end_day = datetime.fromtimestamp(self.day_to_predict, tz=timezone.utc).strftime('%Y-%m-%d')
    query = 'SELECT mean(value) FROM %s WHERE code=\'%s\' AND case=\'%s\' AND ' \
            'time>=\'%s\' AND time<\'%s\'' % (self.cfg['influxDB']['measurementInputsHistory'], code,
                                              self.forecast_type,
                                              self.cfg['predictionGeneralSettings']['startDateForMeanImputation'],
                                              end_day)
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    try:
        return float(res.raw['series'][0]['values'][0][1])
    except (KeyError, IndexError, TypeError, ValueError):
        # No series/values in the result, or a null mean
        self.logger.error('Impossible to calculate the mean of the past values')
        return np.nan
def add_input_value(self, signal, forecast_substitution=False, force_substitution=False):
    """
    Add to self.io_data the value related to a given input signal.

    The query is chosen from the signal code (fields separated by '__'):
    exception/artificial signals, meteo forecasts (single step or chunk period), Copernicus forecasts,
    or measurements (daily, chunk, moving average, period or hourly values).

    :param signal: signal code
    :type signal: str
    :param forecast_substitution: if True, a missing forecast is substituted by a correspondent measured value
    :type forecast_substitution: bool
    :param force_substitution: if True, the measured substitute of a forecast is always queried
                               and saved in self.io_data_sub
    :type force_substitution: bool
    """
    # Signals exception (e.g. isWeekend, etc.) and artificial features
    if signal in constants.SIGNAL_EXCEPTIONS or any(s in signal for s in constants.ARTIFICIAL_FEATURES):
        self.handle_exception_signal(signal)
        return

    tmp = signal.split('__')
    # Forecasts data
    if tmp[0] in constants.METEO_FORECAST_STATIONS:
        measurement = self.cfg['influxDB']['measurementInputsForecasts']
        if '__step' in signal:
            self.do_forecast_step_query(signal, measurement, forecast_substitution, force_substitution)
        else:
            self.do_forecast_period_query(signal, measurement, forecast_substitution, force_substitution)
    # Copernicus forecasts data
    elif len(tmp) > 1 and tmp[0] in constants.COPERNICUS_STATIONS and tmp[1] in constants.COPERNICUS_SIGNALS:
        self.do_copernicus_step_query(signal, self.cfg['influxDB']['measurementInputsForecastsCopernicus'])
    # Measurement data
    else:
        measurement = self.cfg['influxDB']['measurementInputsMeasurements']
        if any('__d%i' % day in signal for day in range(6)):
            # check if there are chunks
            if '__chunk' in signal:
                self.do_chunk_query(signal, measurement)
            else:
                self.do_daily_query(signal, measurement)
        elif '__db' in signal:
            self.do_daily_query(signal, measurement)
        elif '__moving_average' in signal:
            self.do_moving_average_query(signal, measurement)
        elif 'h__' in signal:
            # specific period query (e.g. LOC__SIG__24h__max)
            self.do_period_query(signal, measurement, 'measure')
        else:
            self.do_hourly_query(signal, measurement)
def add_output_value(self, signal):
    """
    Add to self.io_data the value related to a given output (target) signal.

    Outputs are daily values of measured signals related to the predicted days
    (see do_daily_query with flag_output_signal=True).

    :param signal: signal code
    :type signal: str
    """
    measurement = self.cfg['influxDB']['measurementInputsMeasurements']
    self.do_daily_query(signal, measurement, flag_output_signal=True)
def do_forecast_period_query(self, signal_data, measurement, forecast_substitution, force_substitution=False):
    """
    Query a forecasted signal over a chunk of forecast steps and aggregate it (see calc_data).

    signal_data has the format LOCATION__SIGNAL__CHUNK__FUNC. The forecast run used is the one of
    03:00 UTC for MOR and of 12:00 UTC for EVE on the forecast day.
    """
    location, signal_code, chunk, func = signal_data.split('__')
    forecast_day = self.set_forecast_day().strftime('%Y-%m-%d')
    run_hour = '03' if self.forecast_type == 'MOR' else '12'
    run_time = '%sT%s:00:00Z' % (forecast_day, run_hour)
    steps_condition = self.create_forecast_chunk_steps_string(chunk, run_hour)
    query = ("SELECT value FROM %s WHERE location='%s' AND signal='%s' AND %s AND time='%s'"
             % (measurement, location, signal_code, steps_condition, run_time))
    self.calc_data(query=query, signal_data=signal_data, func=func, forecast_substitution=forecast_substitution,
                   str_steps=steps_condition, str_dt=run_time, force_substitution=force_substitution)
@staticmethod
def create_forecast_chunk_steps_string(chunk, case, chunks_cfg=None):
    """
    Build the InfluxQL condition selecting all the forecast steps of a chunk.

    E.g. for a chunk from step 1 to step 3: (step='step01' OR step='step02' OR step='step03')

    :param chunk: chunk identifier, key of chunks_cfg[case]
    :param case: hour of the forecast run ('03' or '12')
    :param chunks_cfg: chunk definitions {case: {chunk: {'start': int, 'end': int}}},
                       defaults to constants.CHUNKS_FORECASTS
    :return: condition string
    :raises ValueError: if the chunk contains no step (start > end)
    """
    if chunks_cfg is None:
        chunks_cfg = constants.CHUNKS_FORECASTS
    start = chunks_cfg[case][chunk]['start']
    end = chunks_cfg[case][chunk]['end']
    if start > end:
        # An empty condition would produce an invalid query
        raise ValueError('Chunk %s of case %s has no steps (start=%s, end=%s)' % (chunk, case, start, end))
    return '(%s)' % ' OR '.join('step=\'step%02d\'' % i for i in range(start, end + 1))
def do_copernicus_step_query(self, signal_data, measurement):
    """
    Query a single step of a Copernicus forecast and store it in self.io_data (NaN if not available).

    signal_data has the format LOCATION__SIGNAL__stepN.
    """
    (location, signal_code, step) = signal_data.split('__')
    dt = self.set_forecast_day()
    dt = dt.replace(minute=0, hour=0, second=0, microsecond=0)
    step = 'step%02d' % int(step[4:])
    # MOR case: for the prediction of a generic day D (performed at D_07:30) the latest COPERNICUS forecast
    # is the one available at ~00:30 AM of D, related to D-1,D,D+1,D+2,...
    if self.forecast_type == 'MOR':
        dt = dt - timedelta(days=1)
    # EVE case: for the prediction of a generic day D (performed at D-1_19:30) the latest COPERNICUS forecast
    # is the one available at ~00:30 AM of D-1, related to D-2,D-1,D,D+1,...
    elif self.forecast_type == 'EVE':
        dt = dt - timedelta(days=2)
    str_dt = dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    query = 'SELECT value FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND step=\'%s\' AND ' \
            'time=\'%s\'' % (measurement, location, signal_code, step, str_dt)
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    if 'series' in res.raw.keys() and len(res.raw['series']) > 0:
        try:
            self.io_data[signal_data] = float(res.raw['series'][0]['values'][0][1])
        except (KeyError, IndexError, TypeError, ValueError):
            self.logger.error('Data not available')
            self.io_data[signal_data] = np.nan
    else:
        # Without this the value of a previous day (or no value at all) would be kept
        self.logger.warning('No data from query %s' % query)
        self.io_data[signal_data] = np.nan
def do_forecast_step_query(self, signal_data, measurement, forecast_substitution, force_substitution=False):
    """
    Query a single step of a meteo forecast (COSMO1, or COSMO2 for signals containing '_c2').

    signal_data has the format LOCATION__SIGNAL__stepN (or LOCATION__SIGNAL__CASE__stepN).
    The forecast run is 12:00 UTC for EVE; for MOR it is 03:00 UTC (COSMO1) or 00:00 UTC (COSMO2).

    Unless force_substitution is given, the forecast value is saved in self.io_data. If the forecast
    is missing and forecast_substitution is True, the measured value over the step interval is used
    instead (see do_forecast_substitution). With force_substitution the measured substitute is always
    queried and saved in self.io_data_sub. Unavailable values are saved as NaN.
    """
    fields = signal_data.split('__')
    if len(fields) == 4:
        location, signal_code, _case, step = fields
    else:
        location, signal_code, step = fields
    dt = self.set_forecast_day()

    is_cosmo2 = '_c2' in signal_code
    step_num = int(step[4:])
    # COSMO2 steps: 3 digits, 3 hours wide; COSMO1 steps: 2 digits, 1 hour wide
    if is_cosmo2:
        step = 'step%03d' % step_num
        time_interval = 3
    else:
        step = 'step%02d' % step_num
        time_interval = 1
    # the last forecast has been performed at 00:00/03:00 (MOR) or at 12:00 (EVE)
    if self.forecast_type != 'MOR':
        run_hour = 12
    elif is_cosmo2:
        run_hour = 0
    else:
        run_hour = 3
    dt = dt.replace(minute=0, hour=run_hour, second=0, microsecond=0)
    str_dt = dt.strftime('%Y-%m-%dT%H:%M:%SZ')

    def query_substitute():
        # Measured values over the interval covered by the forecast step
        meas_start = dt + timedelta(hours=step_num)
        meas_end = meas_start + timedelta(hours=time_interval)
        return self.do_forecast_substitution(location, signal_code, meas_start, meas_end, None)

    if force_substitution is not False:
        self.logger.warning('FORCED SUBSTITUTION: The forecast will be substituted by a correspondent measured value')
        res_meas = query_substitute()
        if res_meas is None or 'series' not in res_meas.raw.keys():
            self.io_data_sub[signal_data] = np.nan
            return
        try:
            self.io_data_sub[signal_data] = float(res_meas.raw['series'][0]['values'][0][1])
        except Exception as exc:
            self.logger.error('EXCEPTION: %s' % str(exc))
            self.logger.error('Unable to find a substitute for signal %s' % signal_data)
            self.io_data_sub[signal_data] = np.nan
        return

    query = ("SELECT value FROM %s WHERE location='%s' AND signal='%s' AND step='%s' AND time='%s'"
             % (measurement, location, signal_code, step, str_dt))
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    if 'series' in res.raw.keys() and len(res.raw['series']) > 0:
        try:
            # NOTE: the data in the DB used for the training are in Celsius, no Kelvin conversion is applied
            self.io_data[signal_data] = float(res.raw['series'][0]['values'][0][1])
        except Exception:
            self.logger.error('Data not available')
            self.io_data[signal_data] = np.nan
        return

    self.logger.warning('Forecast not available')
    if forecast_substitution is not True:
        self.logger.warning('No data from query %s' % query)
        self.io_data[signal_data] = np.nan
        return
    self.logger.warning('The forecast will be substituted by a correspondent measured value')
    res_meas = query_substitute()
    try:
        self.io_data[signal_data] = float(res_meas.raw['series'][0]['values'][0][1])
    except Exception:
        self.logger.error('Data not available')
        self.io_data[signal_data] = np.nan
def do_chunk_query(self, signal_data, measurement):
    """
    Query the hourly means of a measured signal over a 6-hour chunk of a past day and aggregate them.

    signal_data has the format LOCATION__SIGNAL__dX__chunkN__FUNC, where X is the number of days before
    the forecast day. The chunks are (UTC): chunk1 23:00-04:59 (ending the following day),
    chunk2 17:00-22:59, chunk3 11:00-16:59, chunk4 05:00-10:59.

    :raises ValueError: if the chunk is not one of chunk1..chunk4
    """
    (location, signal_code, day, chunk, func) = signal_data.split('__')
    dt = self.set_forecast_day().date()
    # chunk: (start time, end time, True if the chunk ends on the following day)
    chunk_periods = {
        'chunk1': ('23:00:00', '04:59:00', True),
        'chunk2': ('17:00:00', '22:59:00', False),
        'chunk3': ('11:00:00', '16:59:00', False),
        'chunk4': ('05:00:00', '10:59:00', False),
    }
    if chunk not in chunk_periods:
        raise ValueError('Unknown chunk %s in signal %s' % (chunk, signal_data))
    start_tm, end_tm, ends_next_day = chunk_periods[chunk]
    start_dt = dt - timedelta(int(day[-1]))
    end_dt = start_dt + timedelta(1) if ends_next_day else start_dt
    query = 'SELECT mean(value) FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND ' \
            'time>=\'%s\' AND time<=\'%s\' ' \
            'GROUP BY time(1h)' % (measurement, location, signal_code,
                                   '%sT%sZ' % (start_dt.strftime('%Y-%m-%d'), start_tm),
                                   '%sT%sZ' % (end_dt.strftime('%Y-%m-%d'), end_tm))
    # calc_data requires the substitution arguments, which are not used for measured signals
    self.calc_data(query=query, signal_data=signal_data, func=func, forecast_substitution=False,
                   str_steps=None, str_dt=None)
def do_daily_query(self, signal_data, measurement, flag_output_signal=False):
    """
    Query the hourly means of a measured signal over a whole day and aggregate them (see calc_data).

    signal_data has the format LOCATION__SIGNAL__dX__FUNC, or LOCATION__SIGNAL__dX (e.g. past YO3 and
    YO3_index) in which case the mean is used. For inputs the day is X days before the forecast day,
    for outputs X days after it; in the EVE case both are shifted one day later (the predicted day
    is the next one).
    """
    fields = signal_data.split('__')
    if len(fields) == 4:
        location, signal_code, day, func = fields
    else:
        # cases of past YO3 and YO3_index
        location, signal_code, day = fields
        func = 'mean'
    dt = self.set_forecast_day()
    n_days = int(day[-1])
    eve_shift = 1 if self.forecast_type == 'EVE' else 0
    if flag_output_signal is True:
        # OUTPUT: D --> D+x
        day_date = dt + timedelta(n_days + eve_shift)
    else:
        # INPUT: D-x <-- D
        day_date = dt - timedelta(n_days - eve_shift)
    day_str = day_date.strftime('%Y-%m-%d')
    query = ("SELECT mean(value) FROM %s WHERE location='%s' AND signal='%s' AND time>='%s' AND time<='%s' "
             "GROUP BY time(1h)" % (measurement, location, signal_code,
                                    day_str + 'T00:00:00Z', day_str + 'T23:59:59Z'))
    self.calc_data(query=query, signal_data=signal_data, func=func, forecast_substitution=False, str_steps=None,
                   str_dt=None)
@staticmethod
def set_MOR_EVE_daytime(forecast_type, dt):
    """
    Return dt with the reference hour of the forecast case: 04 UTC for MOR, 16 UTC for EVE.

    Any other forecast type gives None.
    """
    if forecast_type not in ('MOR', 'EVE'):
        return None
    return dt.replace(hour=4 if forecast_type == 'MOR' else 16)
def do_moving_average_query(self, signal_data, measurement):
    """
    Compute the mean of a measured signal over a window of hours and store it in self.io_data.

    signal_data has the format LOCATION__SIGNAL__moving_averageW__mH: the window of W hours ends
    H hours before the reference hour of the forecast case (see set_MOR_EVE_daytime).
    NaN is stored if the query returns no data.
    """
    location, signal_code, moving_average_func, hours = signal_data.split('__')
    reference_dt = self.set_MOR_EVE_daytime(self.forecast_type, self.set_forecast_day())
    window_end = reference_dt - timedelta(hours=int(hours[1:]))
    window_start = window_end - timedelta(hours=int(moving_average_func.replace('moving_average', '')))
    query = ("SELECT value FROM %s WHERE location='%s' AND signal='%s' AND time>='%s' AND time<='%s'"
             % (measurement, location, signal_code,
                window_start.strftime('%Y-%m-%dT%H') + ':30:00Z',
                window_end.strftime('%Y-%m-%dT%H') + ':00:00Z'))
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    try:
        # Consider all the available data
        self.io_data[signal_data] = np.mean([point[1] for point in res.raw['series'][0]['values']])
    except Exception:
        self.logger.error('No data from query %s' % query)
        self.io_data[signal_data] = np.nan
def do_hourly_query(self, signal_data, measurement):
    """
    Query the mean of a measured signal in a single hour and store it in self.io_data.

    signal_data has the format LOCATION__SIGNAL__mH (H hours before the reference hour of the
    forecast case, see set_MOR_EVE_daytime) or LOCATION__SIGNAL__<x>H (H hours after it).
    Missing data is stored as NaN, except for the irradiance (Gl) of BIO, stored as 0.
    """
    location, signal_code, hours = signal_data.split('__')
    target_dt = self.set_MOR_EVE_daytime(self.forecast_type, self.set_forecast_day())
    backwards = hours[0] == 'm'
    offset = timedelta(hours=int(hours[1:]))
    target_dt = target_dt - offset if backwards else target_dt + offset
    hour_str = target_dt.strftime('%Y-%m-%dT%H')
    query = ("SELECT mean(value) FROM %s WHERE location='%s' AND signal='%s' AND time>='%s' AND time<='%s'"
             % (measurement, location, signal_code, hour_str + ':00:00Z', hour_str + ':59:59Z'))
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    try:
        self.io_data[signal_data] = res.raw['series'][0]['values'][0][1]
    except Exception:
        self.logger.error('Forecast not available')
        self.logger.error('No data from query %s' % query)
        # Nocturnal irradiance in BIO is missing rather than 0, so fixing it here
        is_bio_irradiance = location == 'BIO' and signal_code == 'Gl'
        self.io_data[signal_data] = 0.0 if is_bio_irradiance else np.nan
def do_period_query(self, signal_data, measurement, case):
    """
    Aggregate a signal over a period of hours with an InfluxQL function and store it in self.io_data.

    signal_data has the format LOCATION__SIGNAL__<N>h__FUNC. For case 'measure' the period is the
    N hours before the reference hour of the forecast case (see set_MOR_EVE_daytime), otherwise
    the N hours after it. NaN is stored if the query returns no data.
    """
    location, signal_code, period, func = signal_data.split('__')
    reference_dt = self.set_MOR_EVE_daytime(self.forecast_type, self.set_forecast_day())
    span = timedelta(hours=int(period[0:-1]))
    if case == 'measure':
        period_start, period_end = reference_dt - span, reference_dt
    else:
        period_start, period_end = reference_dt, reference_dt + span
    query = ("SELECT %s(value) FROM %s WHERE location='%s' AND signal='%s' AND time>='%s' AND time<='%s'"
             % (func, measurement, location, signal_code,
                period_start.strftime('%Y-%m-%dT%H') + ':00:00Z',
                period_end.strftime('%Y-%m-%dT%H') + ':59:59Z'))
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    try:
        self.io_data[signal_data] = res.raw['series'][0]['values'][0][1]
    except Exception:
        self.logger.error('Forecast not available')
        self.logger.error('No data from query %s' % query)
        self.io_data[signal_data] = np.nan
def calc_data(self, query, signal_data, func, forecast_substitution, str_steps, str_dt, force_substitution=False):
    """
    Run a query and aggregate the returned values (min, max, mean or std), storing the result in self.io_data.

    Null values are ignored. If the query gives no usable data and forecast_substitution is True, the
    value is replaced by the measured substitute over the steps contained in str_steps, starting from the
    forecast run time str_dt (see do_forecast_substitution); otherwise NaN is stored. With force_substitution
    the query is not performed and the measured substitute is stored in self.io_data_sub.
    """
    fields = signal_data.split('__')

    def query_substitute():
        # e.g. "(step='step01' OR step='step02')" -> ['', '01', '02']
        steps = str_steps.replace(' OR ', '').replace('(', '').replace(')', '').replace('\'', '').split('step=step')
        run_dt = datetime.strptime(str_dt, '%Y-%m-%dT%H:%M:%SZ')
        first_step = int(steps[1])
        last_step = int(steps[-1])
        return self.do_forecast_substitution(fields[0], fields[1], run_dt + timedelta(hours=first_step),
                                             run_dt + timedelta(hours=last_step), fields[-1])

    if force_substitution is not False:
        self.logger.warning('FORCE SUBSTITUTION: The forecast will be substituted by a correspondent measured value')
        res_meas = query_substitute()
        try:
            self.io_data_sub[signal_data] = float(res_meas.raw['series'][0]['values'][0][1])
        except Exception:
            self.logger.error('Data not available')
            self.io_data_sub[signal_data] = np.nan
        return

    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')
    aggregators = {'min': np.min, 'max': np.max, 'mean': np.mean, 'std': np.std}
    try:
        # NOTE: the training data are in Celsius, no Kelvin conversion is applied to temperatures
        vals = [float(row[1]) for row in res.raw['series'][0]['values'] if row[1] is not None]
        if func in aggregators:
            self.io_data[signal_data] = aggregators[func](vals)
    except Exception:
        if forecast_substitution is True:
            # Manage the substitution
            self.logger.warning('The forecast will be substituted by a correspondent measured value')
            res_meas = query_substitute()
            try:
                self.io_data[signal_data] = float(res_meas.raw['series'][0]['values'][0][1])
            except Exception:
                self.logger.error('Data not available')
                self.io_data[signal_data] = np.nan
        else:
            self.logger.error('Forecast not available')
            self.logger.error('No data from query %s' % query)
            self.io_data[signal_data] = np.nan
def do_forecast_substitution(self, location, signal, dt_start_meas, dt_end_meas, func_override):
    """
    Query the measured values that substitute a missing forecast.

    The substitute station and signal come from cfg['forecastedSubstitutes']. The aggregation function is
    func_override when given, otherwise the configured one; a configured 'none' selects the raw values.
    The interval is [dt_start_meas, dt_end_meas).

    :return: the query result, or None if the location has no substitute station
    """
    substitutes = self.cfg['forecastedSubstitutes']
    if location not in substitutes['stations'].keys():
        return None
    substitute_location = substitutes['stations'][location]
    signal_cfg = substitutes['signals'][signal]
    if signal_cfg['func'] == 'none':
        selection = 'value'
    else:
        selection = '%s(value)' % (signal_cfg['func'] if func_override is None else func_override)
    query_meas = ("SELECT %s FROM %s WHERE location='%s' AND signal='%s' AND time>='%s' AND time<'%s'"
                  % (selection, self.cfg['influxDB']['measurementInputsMeasurements'],
                     substitute_location, signal_cfg['id'],
                     dt_start_meas.strftime('%Y-%m-%dT%H:%M:%SZ'),
                     dt_end_meas.strftime('%Y-%m-%dT%H:%M:%SZ')))
    self.logger.info('Performing query to substitute the forecast: %s' % query_meas)
    return self.influxdb_client.query(query_meas, epoch='s')
def handle_exception_signal(self, signal_data):
    """
    Store in self.io_data the value of a signal that is not a plain measurement/forecast.

    Handles RHW signals (RHW__dX, read from cfg['influxDB']['measurementGlobal'] at 00:00 of X days before
    the forecast day), artificial features (delegated to self.artificial_features) and the calendar
    signals DayWeek, IsWeekend and IsHolyday of the predicted day.
    """
    dt = self.set_forecast_day()

    if 'RHW' in signal_data:
        signal_code, day = signal_data.split('__')
        str_date = (dt - timedelta(int(day[-1]))).strftime('%Y-%m-%d')
        query = ("SELECT value FROM %s WHERE signal='%s' AND time='%sT00:00:00Z'"
                 % (self.cfg['influxDB']['measurementGlobal'], signal_code, str_date))
        self.logger.info('Performing query: %s' % query)
        res = self.influxdb_client.query(query, epoch='s')
        try:
            self.io_data[signal_data] = res.raw['series'][0]['values'][0][1]
        except Exception:
            self.logger.error('Forecast not available')
            self.logger.error('No data from query %s' % query)
            self.io_data[signal_data] = np.nan
        return

    if any([feature in signal_data for feature in constants.ARTIFICIAL_FEATURES]):
        self.io_data[signal_data] = self.artificial_features.analyze_signal(signal_data)
        return

    # if EVE case the day to predict is the next one
    if self.forecast_type == 'EVE':
        dt = dt + timedelta(1)
    if signal_data == 'DayWeek':
        self.io_data[signal_data] = float(dt.weekday())
    elif signal_data == 'IsWeekend':
        # Saturday (5) and Sunday (6) are weekend days
        self.io_data[signal_data] = 1.0 if dt.weekday() >= 5 else 0.0
    elif signal_data == 'IsHolyday':
        self.io_data[signal_data] = 1.0 if dt.strftime('%m-%d') in constants.HOLYDAYS else 0.0
def set_forecast_day(self):
    """
    Return the reference datetime (UTC) of the forecast and initialise self.day_to_predict.

    The datetime is the current UTC time, or the same time of day on cfg['dayToForecast']
    (YYYY-MM-DD) when cfg['forecastPeriod']['case'] is not 'current'. The first time it is called,
    self.day_to_predict is set to the timestamp of 00:00 of the predicted day: the reference day
    for MOR, the following day for EVE.
    """
    dt = datetime.now(pytz.utc)
    if self.cfg['forecastPeriod']['case'] != 'current':
        # Customize the prediction date
        year, month, day = self.cfg['dayToForecast'].split('-')
        dt = dt.replace(year=int(year), month=int(month), day=int(day))
    if self.day_to_predict is None:
        predicted_day = dt if self.forecast_type == 'MOR' else dt + timedelta(days=1)
        self.day_to_predict = int(predicted_day.replace(hour=0, minute=0, second=0).timestamp())
    return dt
def get_previous_day(self):
    """
    Return the UTC datetime one day before the reference day.

    The reference is the current time when cfg['forecastPeriod']['case'] is 'current', otherwise
    00:00 UTC of cfg['dayToForecast'].
    """
    if self.cfg['forecastPeriod']['case'] == 'current':
        reference = datetime.now(pytz.utc)
    else:
        reference = pytz.utc.localize(datetime.strptime(self.cfg['dayToForecast'], '%Y-%m-%d'))
    return reference - timedelta(days=1)
def get_location_with_output_signal(self, region, os):
    """
    Return the measuring stations of a region that measure the signal ``os``.

    Note: the parameter ``os`` (output signal code) shadows the os module inside this method.
    """
    return [station for station in self.cfg['regions'][region]['measureStations']
            if os in self.cfg['measuredSignalsStations'][station]]
def calc_yesterday_output_daily_values(self, region, os):
    """
    Calculate and save the daily maximum of an output signal (e.g. O3) related to yesterday.

    The hourly means of the signal measured yesterday by the stations of the region that provide it
    (see get_location_with_output_signal) are queried. Their maximum over all the stations is written
    as signal 'Y<os>' of location ``region``, timestamped at 00:00 UTC of yesterday. Nothing is
    written if no station provides the signal or no value is available.

    Note: the parameter ``os`` (output signal code) shadows the os module inside this method; its name
    is kept for backward compatibility with keyword callers.

    :param region: region code
    :type region: str
    :param os: output signal code
    :type os: str
    """
    # get the date to analyze
    dt = self.get_previous_day()
    locations = self.get_location_with_output_signal(region, os)
    if not locations:
        # An empty location list would produce an invalid query
        self.logger.warning('No station of region %s measures signal %s, daily value not calculated' % (region, os))
        return
    str_locs = '(%s)' % ' OR '.join('location=\'%s\'' % loc for loc in locations)
    day_str = dt.strftime('%Y-%m-%d')
    query = 'SELECT mean(value) FROM %s WHERE signal=\'%s\' AND %s AND ' \
            'time>=\'%sT00:00:00Z\' AND ' \
            'time<=\'%sT23:59:59Z\' GROUP BY time(1h), location, signal' % (
                self.cfg['influxDB']['measurementInputsMeasurements'], os, str_locs, day_str, day_str)
    self.logger.info('Performing query: %s' % query)
    res = self.influxdb_client.query(query, epoch='s')

    vals = [row[1]
            for series in res.raw.get('series', [])
            for row in series.get('values', [])
            if row[1] is not None]
    if not vals:
        self.logger.warning('No data from query %s, daily value not saved' % query)
        return

    # One point with the maximum over all the stations (all the points would share time and tags anyway);
    # the timestamp is normalised to 00:00 UTC also when get_previous_day() returns the current time of day
    utc_ts = int(dt.replace(hour=0, minute=0, second=0, microsecond=0).timestamp())
    dps = [{
        'time': utc_ts,
        'measurement': self.cfg['influxDB']['measurementInputsMeasurements'],
        'fields': dict(value=float(max(vals))),
        'tags': dict(signal='Y%s' % os, location=region)
    }]
    self.logger.info('Sent %i points to InfluxDB server' % len(dps))
    self.influxdb_client.write_points(dps, time_precision=self.cfg['influxDB']['timePrecision'])
def hourly_measured_signals(self, measurementStation, measuredSignal):
    """Return the codes of the hourly values of a measured signal, from 0 to 23 hours back (``__m<i>``)."""
    return [measurementStation + '__' + measuredSignal + '__m' + str(hour) for hour in range(24)]
def hourly_mean_avgs_measured_signals(self, measurementStation, measuredSignal, hours_back):
    """Return the codes of the moving averages over ``hours_back`` hours, ending 0 to 23 hours back."""
    return [measurementStation + '__' + measuredSignal + '__moving_average' + hours_back + '__m' + str(hour)
            for hour in range(24)]
def hourly_forecasted_signals(self, forecastStation, forecastedSignal, start, end, step):
    """Return the codes of the forecast steps in range(start, end, step) of a forecasted signal."""
    return [forecastStation + '__' + forecastedSignal + '__step' + str(n) for n in range(start, end, step)]
def chunks_forecasted_signals(self, forecastStation, forecastedSignal):
    """Return the codes of the min/max/mean aggregations of a forecasted signal over chunks 1-4."""
    return [forecastStation + '__' + forecastedSignal + '__chunk' + str(chunk_id) + '__' + modifier
            for chunk_id in range(1, 5)
            for modifier in ['min', 'max', 'mean']]
def past_days_measured_signals_aggregations(self, measurementStation, measuredSignal, cases=False, agg_func=None):
    """
    Return the codes of the aggregations of a measured signal over past periods.

    The periods are ``cases`` (default 24h, 48h and 72h). The functions are mean, max, min and std when
    agg_func is None or 'all', otherwise agg_func only.
    """
    periods = ['24h', '48h', '72h'] if cases is False else cases
    if agg_func == 'all' or agg_func is None:
        functions = ['mean', 'max', 'min', 'std']
    else:
        functions = [agg_func]
    return [measurementStation + '__' + measuredSignal + '__' + str(period) + '__' + function
            for period in periods
            for function in functions]
def artificial_features_forecasted_signals(self, forecastStation):
    """Return the codes of the artificial features computed from the forecasts of a station."""
    templates = ['CLCT%s__mean_mor', 'CLCT%s__mean_eve', 'GLOB%s__mean_mor', 'GLOB%s__mean_eve',
                 'TOT_PREC%s__sum', 'T_2M%s__12h_mean', 'T_2M%s__12h_mean_squared', 'T_2M%s__MAX',
                 'TD_2M%s__MAX', 'TD_2M%s__transf']
    # Only COSMO1 signals are considered (COSMO2 would use the '_c2' suffix)
    return [forecastStation + '__' + template % cosmo_case
            for cosmo_case in ['']
            for template in templates]
def generate_input_signals_codes(self, region):
    """
    Generate all the known input signal codes of a region (e.g. Ticino).

    The codes are built from the measuring, forecasting and Copernicus stations of cfg['regions'][region]
    and from the signals configured for each station; cfg['globalSignals'] are appended at the end.

    :param region: region code, key of cfg['regions']
    :type region: str
    :return: signal codes
    :rtype: list
    """
    region_cfg = self.cfg["regions"][region]
    codes = []

    # Measured signals
    for station in region_cfg["measureStations"]:
        for measured, signal_cfg in self.cfg["measuredSignalsStations"][station].items():
            if signal_cfg == 'all':
                codes.extend(self.past_days_measured_signals_aggregations(station, measured, False, None))
                codes.extend(self.hourly_measured_signals(station, measured))
                continue
            aggregations = signal_cfg['aggregations']
            daily = aggregations['daily']
            if daily == 'all':
                codes.extend(self.past_days_measured_signals_aggregations(station, measured, False,
                                                                          aggregations['daily_function']))
            elif type(daily) is list:
                codes.extend(self.past_days_measured_signals_aggregations(
                    station, measured, daily, aggregations.get('daily_function', 'all')))
            hourly = aggregations['hourly']
            if hourly == 'all':
                codes.extend(self.hourly_measured_signals(station, measured))
            if 'moving_average' in hourly:
                # e.g. 'moving_average_3-6' -> moving averages over 3 and 6 hours
                for hours_back in hourly.replace('moving_average_', '').split('-'):
                    codes.extend(self.hourly_mean_avgs_measured_signals(station, measured, hours_back))

    # Forecast signals (COSMO2 signals have the _c2 suffix)
    for station in region_cfg["forecastStations"]:
        for forecasted in self.cfg["forecastedSignalsStations"][station]:
            if forecasted[-2:] == 'c2':
                # COSMO2: up to 120 hours ahead every 3 hours, the first 33 hours are covered by COSMO1
                codes.extend(self.hourly_forecasted_signals(station, forecasted, 33, 120 + 1, 3))
            else:
                # COSMO1: up to 33 hours ahead every hour
                codes.extend(self.hourly_forecasted_signals(station, forecasted, 0, 33 + 1, 1))
            # Chunk aggregation
            codes.extend(self.chunks_forecasted_signals(station, forecasted))

    # Copernicus forecast signals: 96 steps
    for station in region_cfg["copernicusStations"]:
        for copernicus_signal in self.cfg["copernicusSignalsStations"][station]:
            codes.extend(self.hourly_forecasted_signals(station, copernicus_signal, 0, 95 + 1, 1))

    codes.extend(self.cfg['globalSignals'])
    return codes
def output_folder_creator(self, dataset_name):
    """
    Create (if needed) and return the output folder of a dataset.

    The folder is ``<outputFolder><dataset_name>_<forecast type>_<first year><startDay>_<last year><endDay><sep>``
    with the dashes of the days removed, e.g. ``output/TI_MOR_20190501_20200930/``. When startDay follows
    endDay in the calendar each period crosses the new year, so the last year is incremented by one.
    Only the dates lose their dashes: the output folder and the dataset name are kept untouched.

    :param dataset_name: dataset name
    :type dataset_name: str
    :return: folder path, ending with the path separator
    :rtype: str
    """
    settings = self.cfg['datasetSettings']
    # Check if start day > end day (it means they are related to different years)
    start_dt = datetime.strptime('1970-%s' % settings['startDay'], '%Y-%m-%d')
    end_dt = datetime.strptime('1970-%s' % settings['endDay'], '%Y-%m-%d')
    inc_year = 1 if start_dt > end_dt else 0
    period = '%s%s_%s%s' % (settings['years'][0], settings['startDay'].replace('-', ''),
                            settings['years'][-1] + inc_year, settings['endDay'].replace('-', ''))
    folder_path = '%s%s_%s_%s%s' % (self.cfg['outputFolder'], dataset_name, self.forecast_type, period, os.sep)
    os.makedirs(folder_path, exist_ok=True)
    return folder_path
def dataframe_builder_regions(self):
    """Build the dataset of every configured region using all its known input signals."""
    for region_name in self.cfg['regions']:
        self.build_dataset(name=region_name, input_signals=self.generate_input_signals_codes(region_name))
def dataframe_builder_custom(self):
    """
    Build a dataset for every custom JSON signals file in cfg['datasetSettings']['customJSONSignals'].

    Each file is read from cfg['datasetSettings']['loadSignalsFolder']; the dataset name is the file name
    up to the first dot. build_dataset() has no ``signals_file`` parameter, so the signals are loaded
    here and passed as ``input_signals``.
    NOTE(review): the JSON is assumed to have the format of the models inputs files, {"signals": [...]},
    and build_dataset() expects the dataset name to be a key of cfg['regions'] — confirm.
    """
    settings = self.cfg['datasetSettings']
    for dataset in settings['customJSONSignals']:
        name = dataset['filename'].split('.')[0]
        fn = settings["loadSignalsFolder"] + dataset['filename']
        with open(fn) as fh:
            input_signals = json.load(fh)['signals']
        self.build_dataset(name=name, input_signals=input_signals)
def get_ngb_prediction(self, region, predictor, case, signal, start_date, end_date):
    """
    Query InfluxDB for the daily mean of the values predicted by a predictor

    :param region: Location code
    :type region: str
    :param predictor: Predictor identifier
    :type predictor: str
    :param case: Forecast case (e.g. the forecast type)
    :type case: str
    :param signal: Predicted signal code
    :type signal: str
    :param start_date: First day of the period (its string form is used, e.g. 'YYYY-MM-DD')
    :type start_date: str or date
    :param end_date: Last day of the period (its string form is used, e.g. 'YYYY-MM-DD')
    :type end_date: str or date
    :return: Result of ``influxdb_client.query``, grouped by day, location and predictor
    """
    template = ("select mean(PredictedValue) as prediction from %(measurement)s "
                "where signal='%(signal)s' and location='%(location)s' and "
                "predictor='%(predictor)s' and case='%(case)s' and time>='%(start)sT00:00:00Z' and "
                "time<='%(end)sT23:59:59Z' "
                "group by time(1d), location, predictor")
    fields = {
        'measurement': self.cfg['influxDB']['measurementOutputSingleForecast'],
        'signal': signal,
        'location': region,
        'predictor': predictor,
        'case': case,
        'start': start_date,
        'end': end_date,
    }
    return self.influxdb_client.query(template % fields)
def get_target_measure(self, region, signal, start_date, end_date):
    """
    Query InfluxDB for the daily mean of a measured signal

    :param region: Location code
    :type region: str
    :param signal: Measured signal code
    :type signal: str
    :param start_date: First day of the period (its string form is used, e.g. 'YYYY-MM-DD')
    :type start_date: str or date
    :param end_date: Last day of the period (its string form is used, e.g. 'YYYY-MM-DD')
    :type end_date: str or date
    :return: Result of ``influxdb_client.query``, grouped by day and location
    """
    template = ("select mean(value) as measure from %(measurement)s "
                "where signal='%(signal)s' and location='%(location)s' and "
                "time>='%(start)sT00:00:00Z' and time<='%(end)sT23:59:59Z' "
                "group by time(1d), location")
    fields = {
        'measurement': self.cfg['influxDB']['measurementInputsMeasurements'],
        'signal': signal,
        'location': region,
        'start': start_date,
        'end': end_date,
    }
    return self.influxdb_client.query(template % fields)
def build_meteo_forecast_dataset_for_checking(self, fw_data):
    """
    Compare the weather-forecast inputs with their substitutes and relate them to the output error

    For every folder ``<cfg['folders']['models']>/*<forecast_type>`` whose region code (the folder
    name before the first '_') is configured in ``cfg['regions']``, each ``inputs_*.json`` file is
    processed as follows:

    1. its signals are filtered with ``filter_forecast``;
    2. every signal is requested with ``add_input_value`` and then requested again with
       ``force_substitution=True``. The values are read back from ``self.io_data`` and
       ``self.io_data_sub``, which ``add_input_value`` (defined elsewhere) is expected to fill;
    3. the input error (forecast - substitute) is computed for each signal with a non-NaN substitute;
    4. the output error (daily mean prediction - daily mean measure) is read from InfluxDB;
       if either query returns no data, a warning is logged and the file is skipped;
    5. for each signal code, ``fw_data`` receives the line
       ``date,code,output_error,mean(|input_error|),mean(input_error),std(input_error)``,
       computed over the first ``cfg['firstCasesToConsider']`` filtered signals only;
    6. the individual input errors of the first ``cfg['firstCasesToConsider']`` signals with
       an error are appended to ``<cfg['outputFolder']>/resume_all.csv``.

    :param fw_data: Writable text stream receiving the per-signal-code summary lines
    :type fw_data: file-like object
    """
    self.io_data = dict()
    self.day_to_predict = None
    # Create the signals list considering all the couples location-model
    self.cfg_signals = dict(signals=[])
    n_first_cases = self.cfg['firstCasesToConsider']
    # Cycle over the folders
    for tmp_folder in glob.glob('%s%s*%s' % (self.cfg['folders']['models'], os.sep, self.forecast_type)):
        # Check if the current folder refers to a location configured for the prediction
        region_code = tmp_folder.split(os.sep)[-1].split('_')[0]
        if region_code not in self.cfg['regions']:
            continue
        # Cycle over the input files in the folder (each file corresponds to a model)
        for input_cfg_file in glob.glob(os.path.join(tmp_folder, 'inputs_*.json')):
            diff_all = dict()
            tmp_cfg_signals, _ = self.filter_forecast(input_cfg_file)
            signals = tmp_cfg_signals['signals']
            self.cfg_signals['signals'] = signals
            n_signals = len(signals)

            # Get the values in the DB
            for i, signal in enumerate(signals, start=1):
                self.logger.info('Try to add input n. %04d/%04d, %s' % (i, n_signals, signal))
                self.add_input_value(signal=signal, forecast_substitution=False, force_substitution=False)
                self.logger.info('Added input n. %04d/%04d' % (i, n_signals))

            # Get the substitute values. Iterate over a snapshot of the keys so that a callee adding
            # entries to self.io_data cannot break the iteration.
            # NOTE(review): self.io_data is reset only once per call, so keys of the previously
            # processed input files are substituted again here.
            self.io_data_sub = {}
            for k_fs in list(self.io_data):
                self.add_input_value(signal=k_fs, forecast_substitution=False, force_substitution=True)

            str_date = datetime.fromtimestamp(self.day_to_predict).date()
            diff_signals = {}
            for idx, k_sig in enumerate(signals):
                # Signal codes are '<location>__<signal code>[__...]'
                sig_code = k_sig.split('__')[1]
                sig_diffs = diff_signals.setdefault(sig_code, [])
                if k_sig in self.io_data and k_sig in self.io_data_sub and not math.isnan(self.io_data_sub[k_sig]):
                    # ERROR = WEATHER FORECAST - MEASURE (substitute)
                    diff = self.io_data[k_sig] - self.io_data_sub[k_sig]
                    diff_all[k_sig] = diff
                    # idx is 0-based: exactly the first n_first_cases signals enter the statistics
                    if idx < n_first_cases:
                        sig_diffs.append(diff)

            # Get the output predictions and measures of the day
            _, target_signal, predictor = input_cfg_file.split(os.sep)[-1].split('.')[0].split('_')
            # todo to configure
            res_pred = self.get_ngb_prediction(region_code, predictor, self.forecast_type, target_signal,
                                               str_date, str_date)
            res_meas = self.get_target_measure(region_code, self.cfg['measuredSignal'], str_date, str_date)
            pred_series = res_pred.raw.get('series', [])
            meas_series = res_meas.raw.get('series', [])
            if not pred_series or not meas_series:
                self.logger.warning('No prediction or measure available for %s on %s, file %s skipped' %
                                    (region_code, str_date, input_cfg_file))
                continue
            pred = pred_series[0]['values']
            meas = meas_series[0]['values']
            # ERROR = OUTPUT FORECAST (e.g. ozone) - MEASURE
            err_output = pred[0][1] - meas[0][1]

            for sig_code, sig_diffs in diff_signals.items():
                if sig_diffs:
                    fw_data.write('%s,%s,%.1f,%.1f,%.1f,%.1f\n' % (str_date, sig_code, err_output,
                                                                   np.mean(np.abs(sig_diffs)),
                                                                   np.mean(sig_diffs),
                                                                   np.std(sig_diffs)))

            tmp = input_cfg_file.replace('.json', '').split(os.sep)
            region, case = tmp[-2].split('_')
            predictor = tmp[-1].split('_')[-1]
            # The context manager closes the file even if a write fails
            with open('%s%sresume_all.csv' % (self.cfg['outputFolder'], os.sep), 'a') as fw_all:
                for k_sig in list(diff_all)[:n_first_cases]:
                    fw_all.write('%s,%s,%s,%s,%s,%s,%.1f,%.1f\n' % (str_date, region, case, predictor, k_sig,
                                                                    signals.index(k_sig) + 1,
                                                                    err_output, diff_all[k_sig]))
def filter_forecast(self, sigs_file, stations=None):
    """
    Keep only the signals of a JSON signals file that belong to meteorological forecast stations

    Signal codes have the form ``<location>__<signal>[__...]``. A signal is kept when its location
    is one of ``stations`` and its signal part does not contain ``'_c2'``. Codes without the
    ``'__'`` separator are skipped.

    :param sigs_file: Path of a JSON file containing a ``signals`` list of signal codes
    :type sigs_file: str
    :param stations: Locations considered as forecast stations, defaults to
                     ``constants.METEO_FORECAST_STATIONS``
    :type stations: collection of str
    :return: dictionary ``{'signals': kept codes}`` and the 1-based positions of the kept codes in the file
    :rtype: tuple(dict, list)
    """
    if stations is None:
        stations = constants.METEO_FORECAST_STATIONS
    # The context manager guarantees the file is closed
    with open(sigs_file) as fr:
        all_sig_data = json.load(fr)
    forecasts = []
    rank = []
    for position, sig in enumerate(all_sig_data['signals'], start=1):
        parts = sig.split('__')
        # A code without the '__' separator has no signal part: skip it instead of failing
        if len(parts) < 2:
            continue
        location, k_sig = parts[0], parts[1]
        if location in stations and '_c2' not in k_sig:
            forecasts.append(sig)
            rank.append(position)
    return {'signals': forecasts}, rank