Source code for data_manager

# import section
import os
import datetime
import zipfile

import tailhead
import copy
import pytz
import time
import ftplib
import numpy as np
import pandas as pd


[docs]def warn(*args, **kwargs): pass
import warnings warnings.warn = warn import sklearn.metrics as metrics from zipfile import ZipFile from datetime import date, datetime, timedelta from influxdb import InfluxDBClient import constants
[docs]class DataManager: """ Generic interface for data (measures and forecasts) management """ def __init__(self, influxdb_client, cfg, logger, influx_df_client=None): """ Constructor :param influxdb_client: InfluxDB client :type influxdb_client: InfluxDBClient :param cfg: FTP parameters for the files exchange :type cfg: dict :param logger: Logger :type logger: Logger object """ # set the variables self.influxdb_client = influxdb_client self.influx_df_client = influx_df_client self.cfg = cfg self.logger = logger self.files_correctly_downloaded = [] self.files_not_correctly_downloaded = dict() self.files_correctly_handled = [] self.files_not_correctly_handled = dict() self.ftp = None # Define the time zone self.tz_local = pytz.timezone(self.cfg['local']['timeZone'])
[docs] def open_ftp_connection(self): # perform FTP connection and login self.ftp = ftplib.FTP(self.cfg['ftp']['host']) self.ftp.login(self.cfg['ftp']['user'], self.cfg['ftp']['password'])
[docs] def close_ftp_connection(self): # close the FTP connection self.ftp.close()
[docs] def upload_file(self, file_to_send): curr_wd = os.getcwd() os.chdir('%s%s%s' % (os.getcwd(), os.sep, self.cfg['ftp']['localFolders']['tmp'])) try: self.ftp.cwd(self.cfg['ftp']['remoteFolders']['results']) with open(file_to_send, 'rb') as f: self.ftp.storbinary('STOR %s' % file_to_send, f) os.chdir(curr_wd) return True except Exception as e: self.logger.error('Exception: %s' % str(e)) os.chdir(curr_wd) return False
[docs] def download_remote_files(self): if os.path.isdir(self.cfg['ftp']['localFolders']['tmp']) is False: os.mkdir(self.cfg['ftp']['localFolders']['tmp']) self.files_correctly_downloaded = [] self.files_not_correctly_downloaded = dict() try: # todo Check if also the forecast folder is effectively used for ftp_dir in [self.cfg['ftp']['remoteFolders']['measures'], self.cfg['ftp']['remoteFolders']['forecasts']]: self.logger.info('Getting files via FTP from %s/%s' % (self.cfg['ftp']['host'], ftp_dir)) self.ftp.cwd('/%s' % ftp_dir) # cycle over the remote files for file_name in self.ftp.nlst('*'): tmp_local_file = os.path.join(self.cfg['ftp']['localFolders']['tmp'] + "/", file_name) try: self.logger.info('%s/%s -> %s/%s/%s' % (ftp_dir, file_name, os.getcwd(), self.cfg['ftp']['localFolders']['tmp'], file_name)) # get the file from the server with open(tmp_local_file, 'wb') as f: def callback(data): f.write(data) self.ftp.retrbinary("RETR " + file_name, callback) self.files_correctly_downloaded.append(file_name) except Exception as e: self.logger.error('Downloading exception: %s' % str(e)) self.files_not_correctly_downloaded[file_name] = str(e) except Exception as e: self.logger.error('Connection exception: %s' % str(e))
[docs] def is_meteosuisse_forecast_file(self, file_name): if 'VNXA51' in file_name or 'VQCA19' in file_name or 'VNYA34' in file_name or 'VNYA32' in file_name or 'VNXA54' in file_name: return True else: return False
[docs] def delete_remote_files(self): if os.path.isdir(self.cfg['ftp']['localFolders']['tmp']) is False: os.mkdir(self.cfg['ftp']['localFolders']['tmp']) try: # cycle over the remote files for file_to_delete in self.files_correctly_handled: # Set the remote folder if self.is_meteosuisse_forecast_file(file_to_delete): self.ftp.cwd('/%s' % self.cfg['ftp']['remoteFolders']['forecasts']) else: self.ftp.cwd('/%s' % self.cfg['ftp']['remoteFolders']['measures']) try: # delete the remote file self.logger.info('Delete remote file %s' % file_to_delete) self.ftp.delete(file_to_delete) except Exception as e: self.logger.error('Unable to delete remote file %s' % file_to_delete) self.logger.error('Exception: %s' % str(e)) except Exception as e: self.logger.error('Connection exception: %s' % str(e))
[docs] def insert_data(self): self.logger.info('Started data inserting into DB') file_names = os.listdir(self.cfg['ftp']['localFolders']['tmp']) dps = [] self.files_correctly_handled = [] self.files_not_correctly_handled = dict() for file_name in file_names: file_path = '%s/%s' % (self.cfg['ftp']['localFolders']['tmp'], file_name) self.logger.info('Getting data from %s' % file_path) try: # Meteo forecasts provided by Meteosuisse if self.is_meteosuisse_forecast_file(file_name): dps = self.handle_meteo_forecasts_file(file_path, dps) # OASI/ARPA/Meteosuisse measurements else: dps = self.handle_measures_file(file_path, file_name, dps) # Archive file self.archive_file(file_name) self.files_correctly_handled.append(file_name) except Exception as e: self.logger.error('EXCEPTION: %s, file %s' % (str(e), file_name)) self.files_not_correctly_handled[file_name] = str(e) # Delete the raw file os.unlink('%s%s%s' % (self.cfg['ftp']['localFolders']['tmp'], os.sep, file_name)) # Send remaining points to InfluxDB self.logger.info('Sent %i points to InfluxDB server' % len(dps)) self.influxdb_client.write_points(dps, time_precision=self.cfg['influxDB']['timePrecision'])
[docs] def archive_file(self, file_name): if self.is_meteosuisse_forecast_file(file_name): archive_folder = '%s%s%s' % ( self.cfg['ftp']['localFolders']['archive'], os.sep, file_name.split('.')[1][0:8]) self.check_folder(archive_folder) else: if 'meteosvizzera' in file_name or 'arpal' in file_name or 'nabel' in file_name: archive_folder = '%s%s%s' % ( self.cfg['ftp']['localFolders']['archive'], os.sep, file_name.split('-')[2]) self.check_folder(archive_folder) else: archive_folder = '%s%s%s' % ( self.cfg['ftp']['localFolders']['archive'], os.sep, file_name.split('-')[1]) self.check_folder(archive_folder) # Zip the raw file zip_obj = ZipFile('%s%s%s.zip' % (archive_folder, os.sep, file_name), 'w') zip_obj.write(('%s%s%s' % (self.cfg['ftp']['localFolders']['tmp'], os.sep, file_name)), compress_type=zipfile.ZIP_DEFLATED, arcname=file_name) zip_obj.close() # Delete the raw file os.unlink('%s%s%s' % (self.cfg['ftp']['localFolders']['tmp'], os.sep, file_name))
[docs] @staticmethod def check_folder(folder): if os.path.exists(folder) is False: os.mkdir(folder)
[docs] def handle_measures_file(self, file_path, file_name, dps): # Open the file f = open(file_path, 'rb') # Read the first line raw = f.readline() # Check the datasets cases, I apologize for the hard-coding if 'arpa' in file_name: # ARPA case str_locations = raw.decode(constants.ENCODING) arpa_locations_keys = str_locations[:-1].split(';')[1:] measurement = self.cfg['influxDB']['measurementInputsMeasurements'] else: if 'meteosvizzera' in file_name: # MeteoSuisse case [key1, key2, _, _] = file_name.split('-') oasi_location_key = '%s-%s' % (key1, key2) measurement = self.cfg['influxDB']['measurementInputsMeasurements'] else: if 'nabel' in file_name: # Nabel OASI case [key1, key2, _, _] = file_name.split('-') oasi_location_key = '%s-%s' % (key1, key2) else: # Simple OASI case [oasi_location_key, _, _] = file_name.split('-') measurement = self.cfg['influxDB']['measurementInputsMeasurements'] # Signals raw = f.readline() str_signals = raw.decode(constants.ENCODING) signals = str_signals[:-1].split(';')[1:] # measure units, not used f.readline() # cycling over the data for raw_row in f: # decode raw bytes row = raw_row.decode(constants.ENCODING) row = row[:-1] # get raw date time dt = row.split(';')[0] # get data array data = row.split(';')[1:] # timestamp management naive_time = datetime.strptime(dt, self.cfg['local']['timeFormatMeasures']) local_dt = self.tz_local.localize(naive_time) # add an hour to delete the DST influence (OASI timestamps are always in UTC+1 format) utc_dt = local_dt.astimezone(pytz.utc) + local_dt.dst() # calculate the UTC timestamp utc_ts = int(utc_dt.timestamp()) for i in range(0, len(data)): # Currently, the status are not taken into account if self.is_float(data[i]) is True and signals[i] != 'status': if 'arpa' in file_name: location_tag = constants.LOCATIONS[arpa_locations_keys[i]] else: location_tag = constants.LOCATIONS[oasi_location_key] point = { 'time': utc_ts, 'measurement': measurement, 'fields': dict(value=float(data[i])), 'tags': dict(signal=signals[i], location=location_tag, case='MEASURED') } dps = self.point_handling(dps, point) return dps
@staticmethod def is_float(s): try: float(s) return True except ValueError: return False
[docs] def handle_meteo_forecasts_file(self, input_file, dps): """ Save measures data (OASI and ARPA) in InfluxDB :param input_file: input file :type input_file: string """ self.logger.info('Getting data from %s' % input_file) (code, dt_code, ext) = input_file.split('.') f = open(input_file, 'rb') # Meteosuisse forecasts for specific locations (VNXA51 case: COSMO1, VNYA34 case: COSMO2) if 'VNYA34' in code or 'VNXA51' in code: dt_run = None signals = None for raw_row in f: row = raw_row.decode(constants.ENCODING) row = row[:-1] # check if the signal dict is defined if signals is not None: data = row.split(';') # set the running time if dt_run is None: dt_run = data[1] for i in range(3, len(data) - 1): utc_time = datetime.strptime(dt_run, self.cfg['local']['timeFormatForecasts']) utc_dt = pytz.utc.localize(utc_time) # The Meteosuisse temperatures are in Kelvin degrees if signals[i] in ['TD_2M', 'T_2M']: val = float(data[i]) - 273.1 else: val = float(data[i]) # Check if the data is available if val != -99999: # Find the : position colon = data[2].find(':') # Step and COSMO2 suffix handling step = int(data[2].split(':')[0]) if 'VNYA34' in code: str_step = 'step%03d' % step signal_suffix = '_c2' else: str_step = 'step%02d' % step signal_suffix = '' point = { 'time': int(utc_dt.timestamp()), 'measurement': self.cfg['influxDB']['measurementInputsForecasts'], 'fields': dict(value=val), 'tags': dict(signal='%s%s' % (signals[i], signal_suffix), location=data[0], step=str_step) } dps = self.point_handling(dps, point) # define signals if row[0:3] == 'stn': signals = row.split(';') # skip the line related to unit measures f.readline() # VQCA19 case: global forecast about meteorological situation in Ticino elif 'VQCA19' in code: for raw_row in f: row = raw_row.decode(constants.ENCODING) row = row[:-1] if 'RHW' in row: # row = row.replace(' ', '') (signal, day, value) = row.replace(' ', ',').replace(' ', '').replace(',,', ',')[0:-2].split(',') utc_day = datetime.strptime(day, self.cfg['local']['timeFormatGlobal']) utc_day = pytz.utc.localize(utc_day) point = { 'time': int(utc_day.timestamp()), 'measurement': self.cfg['influxDB']['measurementGlobal'], 'fields': dict(value=float(value)), 'tags': dict(signal=signal) } dps.append(copy.deepcopy(point)) # VNYA32 and VNXA54 cases: vertical gradients elif 'VNYA32' in code or 'VNXA54' in code: dt_run = None single_sigs = self.get_single_signals(f) for raw_row in f: row = raw_row.decode(constants.ENCODING) row = row.replace('\n', '') row = row[:-1] data = row.split(';') # set the running time if dt_run is None: dt_run = data[1] for i in range(3, len(data)): utc_time = datetime.strptime(dt_run, self.cfg['local']['timeFormatForecasts']) utc_dt = pytz.utc.localize(utc_time) # Step and COSMO2 suffix handling step = int(data[2].split(':')[0]) if 'VNYA32' in code: str_step = 'step%03d' % step signal_suffix = '_c2' else: str_step = 'step%02d' % step signal_suffix = '' point = { 'time': int(utc_dt.timestamp()), 'measurement': self.cfg['influxDB']['measurementInputsForecasts'], 'fields': dict(value=float(data[i])), 'tags': dict(signal='%s%s' % (single_sigs[i-3], signal_suffix), location=data[0], step=str_step) } dps = self.point_handling(dps, point) # close the file f.close() # os.unlink(file_path) return dps
[docs] def get_single_signals(self, fr): str_sigs = False str_quotes = False while str_sigs is False or str_quotes is False: raw_row = fr.readline() row = raw_row.decode(constants.ENCODING) row = row.replace('\n', '') row = row[:-1] if 'stn;' in row: str_sigs = row if ';level;' in row: str_quotes = row sigs = str_sigs.split(';') quotes = str_quotes.split(';') if len(sigs) == len(quotes): new_sigs = [] for i in range(3, len(sigs)): new_sigs.append('%s%s' % (sigs[i], quotes[i])) return new_sigs
[docs] def point_handling(self, dps, point): """ Add a point and (eventually) store the entire dataset in InfluxDB :param point: point to add :type point: dict :param dps: input data points :type dps: list :return: output data points :rtype: list """ dps.append(copy.deepcopy(point)) if len(dps) >= int(self.cfg['influxDB']['maxLinesPerInsert']): self.logger.info('Sent %i points to InfluxDB server' % len(dps)) self.influxdb_client.write_points(dps, time_precision=self.cfg['influxDB']['timePrecision']) dps = [] time.sleep(0.1) return dps
[docs] def update_training_datasets(self, new_value, location): """ Update the output training dataset :param new_value: new output value :type new_value: float :param location: location related to the new value :type location: string """ # define the files paths inputs_tmp_file = '%s/tmp/%s_%s.csv' % (self.cfg['local']['trainingDatasets'], location, self.forecast_type) inputs_file = '%s/inputs/%s_%s.csv' % (self.cfg['local']['trainingDatasets'], location, self.forecast_type) outputs_file = '%s/outputs/%s_%s.csv' % (self.cfg['local']['trainingDatasets'], location, self.forecast_type) # check if the temporary inputs file exists yesterday = datetime.strftime(datetime.now() - timedelta(days=1), '%Y-%m-%d') if os.path.isfile(inputs_tmp_file): # get the last data appended to the inputs dataset data_tail = tailhead.tail(open(inputs_tmp_file, 'rb'), 1) last_data_inputs_tmp = data_tail[0].decode('utf-8') # get the last data appended to the input dataset data_tail = tailhead.tail(open(inputs_file, 'rb'), 1) last_data_inputs = data_tail[0].decode('utf-8') # get the last data appended to the output dataset data_tail = tailhead.tail(open(outputs_file, 'rb'), 1) last_data_outputs = data_tail[0].decode('utf-8') # check if the last inputs string in the temporary file is related to yesterday and # no data about yesterday were already saved in the datasets if yesterday in last_data_inputs_tmp and yesterday not in last_data_inputs and \ yesterday not in last_data_outputs: # append inputs with open(inputs_file, 'a') as fw: fw.write('%s\n' % last_data_inputs_tmp) fw.close() # append outputs with open(outputs_file, 'a') as fw: fw.write('%s,%.3f\n' % (yesterday, new_value)) fw.close() # delete the temporary file os.unlink(inputs_tmp_file) else: self.logger.warning('No inputs available for station %s, case %s, day %s' % (location, self.forecast_type, yesterday))
[docs] @staticmethod def is_float(s): try: float(s) return True except ValueError: return False
[docs] def calc_kpis(self): # get the date to analyze dt = self.get_previous_day() dt = dt.replace(hour=0, minute=0, second=0) # cycle over the starting dates for start_date in self.cfg['kpis']['startingDates']: dt_start_date = datetime.strptime(start_date, '%Y-%m-%d') dt_start_date = pytz.utc.localize(dt_start_date) end_date = dt.strftime('%Y-%m-%d') # end_date = '2019-06-15' # get measurement data query = 'SELECT mean(value) FROM %s WHERE signal=\'YO3\' AND time>=\'%sT00:00:00Z\' AND ' \ 'time<=\'%sT23:59:59Z\' GROUP BY time(1d), location, signal' % ( self.cfg['influxDB']['measurementOASI'], start_date, end_date) self.logger.info('Performing query: %s' % query) res = self.influxdb_client.query(query, epoch='s') dfs_meas = dict() for series in res.raw['series']: ts = [] vals = [] for i in range(0, len(series['values'])): if series['values'][i][1] is not None: ts.append(series['values'][i][0]) vals.append(series['values'][i][1]) df = pd.DataFrame.from_dict({'ts': ts, series['tags']['location']: vals}) df = df.set_index('ts') dfs_meas[series['tags']['location']] = df # get forecasts data query = 'SELECT mean(forecast), mean(forecastRF) FROM %s WHERE time>=\'%sT00:00:00Z\' AND time<=\'%sT23:59:59Z\' ' \ 'GROUP BY time(1d), location, case, predictor' % (self.cfg['influxDB']['measurementForecasts'], start_date, end_date) self.logger.info('Performing query: %s' % query) res = self.influxdb_client.query(query, epoch='s') dfs_forecasts_ens = dict() dfs_forecasts_rf = dict() for series in res.raw['series']: ts = [] vals_ens = [] vals_rf = [] for i in range(0, len(series['values'])): if series['values'][i][1] is not None: ts.append(series['values'][i][0]) vals_ens.append(series['values'][i][1]) vals_rf.append(series['values'][i][2]) id = '%s__%s__%s' % (series['tags']['location'], series['tags']['case'], series['tags']['predictor']) df_ens = pd.DataFrame.from_dict({'ts': ts, id: vals_ens}) df_ens = df_ens.set_index('ts') dfs_forecasts_ens[id] = df_ens df_rf = pd.DataFrame.from_dict({'ts': ts, id: vals_rf}) df_rf = df_rf.set_index('ts') dfs_forecasts_rf[id] = df_rf # calculate the kpis dps = [] for id in dfs_forecasts_ens.keys(): (location, case, predictor) = id.split('__') # merge the dataframes mdf_ens = pd.concat([dfs_forecasts_ens[id], dfs_meas[location]], axis=1, join='inner') mdf_rf = pd.concat([dfs_forecasts_rf[id], dfs_meas[location]], axis=1, join='inner') # additional simple checking on the datasets if len(mdf_ens[location]) == len(mdf_ens[id]) and len(mdf_rf[location]) == len(mdf_rf[id]): rmse_ens = np.sqrt(metrics.mean_squared_error(mdf_ens[location], mdf_ens[id])) mae_ens = metrics.mean_absolute_error(mdf_ens[location], mdf_ens[id]) point = { 'time': int(dt_start_date.timestamp()), 'measurement': self.cfg['influxDB']['measurementForecastsKPIs'], 'fields': dict(rmse=float(rmse_ens), mae=float(mae_ens)), 'tags': dict(location=location, case=case, predictor=predictor, start_date='sd_%s' % start_date, type='ENS') } dps = self.point_handling(dps, point) rmse_rf = np.sqrt(metrics.mean_squared_error(mdf_rf[location], mdf_rf[id])) mae_rf = metrics.mean_absolute_error(mdf_rf[location], mdf_rf[id]) point = { 'time': int(dt_start_date.timestamp()), 'measurement': self.cfg['influxDB']['measurementForecastsKPIs'], 'fields': dict(rmse=float(rmse_rf), mae=float(mae_rf)), 'tags': dict(location=location, case=case, predictor=predictor, start_date='sd_%s' % start_date, type='RF') } dps = self.point_handling(dps, point) # calculate the errors mdf_ens['abs_err'] = abs(mdf_ens[id] - mdf_ens[location]) mdf_ens['err'] = mdf_ens[id] - mdf_ens[location] mdf_rf['abs_err'] = abs(mdf_rf[id] - mdf_rf[location]) mdf_rf['err'] = mdf_rf[id] - mdf_rf[location] # insert the errors data # ensemble case for index, row in mdf_ens.iterrows(): point = { 'time': int(index), 'measurement': self.cfg['influxDB']['measurementForecastsKPIs'], 'fields': dict(err=float(row['err']), abs_err=float(row['abs_err'])), 'tags': dict(location=location, case=case, predictor=predictor, type='ENS')} dps = self.point_handling(dps, point) # random forest for index, row in mdf_rf.iterrows(): point = { 'time': int(index), 'measurement': self.cfg['influxDB']['measurementForecastsKPIs'], 'fields': dict(err=float(row['err']), abs_err=float(row['abs_err'])), 'tags': dict(location=location, case=case, predictor=predictor, type='RF')} dps = self.point_handling(dps, point) else: self.logger.info('Unable to calculate the kpis') # Send data points to InfluxDB self.logger.info('Sent %i points to InfluxDB server' % len(dps)) self.influxdb_client.write_points(dps, time_precision=self.cfg['influxDB']['timePrecision'])
[docs] @staticmethod def mean_absolute_percentage_error(y_true, y_pred): return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
[docs] def get_start_end_dates(self): if self.cfg['calculatedInputsSection']['period'] == 'custom': start_date = self.cfg['calculatedInputsSection']['from'] end_date = self.cfg['calculatedInputsSection']['to'] else: last_hours = int(self.cfg['calculatedInputsSection']['period'].replace('last', '').replace('h', '')) start_date = (datetime.now() - timedelta(hours=last_hours)).strftime('%Y-%m-%dT%H:00:00Z') end_date = datetime.now().strftime('%Y-%m-%dT%H:59:59Z') return start_date, end_date
[docs] def calculate_artificial_data(self): if self.cfg['intervalSettings']['period'] == 'custom': start_day = datetime.strptime(self.cfg['intervalSettings']['from'], '%Y-%m-%d') end_day = datetime.strptime(self.cfg['intervalSettings']['to'], '%Y-%m-%d') # Cycle over the days curr_day = start_day while curr_day <= end_day: curr_str = curr_day.strftime('%Y-%m-%d') self.calc_artificial_data_for_given_period(curr_str, curr_str) curr_day += timedelta(days=1) else: last_hours = int(self.cfg['intervalSettings']['period'].replace('last', '').replace('h', '')) start_str = (datetime.now() - timedelta(hours=last_hours)).strftime('%Y-%m-%d') end_str = datetime.now().strftime('%Y-%m-%d') self.calc_artificial_data_for_given_period(start_str, end_str)
[docs] def create_aggregated_data(self): if self.cfg['intervalSettings']['period'] == 'custom': start_day = datetime.strptime(self.cfg['intervalSettings']['from'], '%Y-%m-%d') end_day = datetime.strptime(self.cfg['intervalSettings']['to'], '%Y-%m-%d') # Cycle over the days curr_day = start_day while curr_day <= end_day: curr_str = curr_day.strftime('%Y-%m-%d') self.calc_aggregated_data_for_given_period(curr_str, curr_str) curr_day += timedelta(days=1) else: last_hours = int(self.cfg['intervalSettings']['period'].replace('last', '').replace('h', '')) start_str = (datetime.now() - timedelta(hours=last_hours)).strftime('%Y-%m-%d') end_str = datetime.now().strftime('%Y-%m-%d') self.calc_aggregated_data_for_given_period(start_str, end_str)
[docs] def calc_artificial_data_for_given_period(self, from_str, to_str): self.logger.info('Calculate artificial data for period [%s:%s]' % (from_str, to_str)) # To be safe go from the beginning of the "from" date to the end of "to" date from_str = '%sT00:00:00Z' % from_str to_str = '%sT23:59:59Z' % to_str for asig_data in self.cfg['calculatedInputsSection']['inputs']: try: if asig_data['forecast'] is False: input1 = self.get_measure_data(asig_data['locations'][0], asig_data['signals'][0], from_str, to_str) input2 = self.get_measure_data(asig_data['locations'][1], asig_data['signals'][1], from_str, to_str) tag_loc, tag_sig = self.get_loc_sig(asig_data['locations'], asig_data['signals'], asig_data['function']) # Check if input data are meaningful if input1 is not None and input2 is not None and input1.index.equals(input2.index) is True: output = self.apply_function_to_measures(asig_data['function'], input1, input2) # Apply gain and offset output['value'] = output['value'] * asig_data['gain'] + asig_data['offset'] if asig_data['booleanOutputThreshold'] is not False: output = self.digitalize_output(output, asig_data['booleanOutputThreshold']) tag_sig = 'B%s' % tag_sig res = self.influx_df_client.write_points(output, self.cfg['influxDB']['measurementInputsMeasurements'], tags={'location': tag_loc, 'signal': tag_sig}, protocol='line') if res is not True: self.logger.warning('Failed measure inserting for couple [%s:%s]' % (tag_loc, tag_sig)) else: self.logger.info('Inserted measure data for couple [%s:%s]' % (tag_loc, tag_sig)) else: self.logger.warning('Data missing or index mismatch: measure not inserted for couple [%s:%s]' % (tag_loc, tag_sig)) else: input1 = self.get_forecast_data(asig_data['locations'][0], asig_data['signals'][0], from_str, to_str) input2 = self.get_forecast_data(asig_data['locations'][1], asig_data['signals'][1], from_str, to_str) tag_loc, tag_sig = self.get_loc_sig(asig_data['locations'], asig_data['signals'], asig_data['function']) # Check if input data are meaningful if input1 is not None and input2 is not None and input1.index.equals(input2.index) is True: output = self.apply_function_to_forecast(asig_data['function'], input1, input2) # Apply gain and offset output['value'] = output['value'] * asig_data['gain'] + asig_data['offset'] res = self.influx_df_client.write_points(output, self.cfg['influxDB']['measurementInputsForecasts'], tags={'location': tag_loc, 'signal': tag_sig}, tag_columns=['step'], protocol='line') if res is not True: self.logger.warning('Failed forecast inserting for couple [%s:%s]' % (tag_loc, tag_sig)) else: self.logger.info('Inserted forecast data for couple [%s:%s]' % (tag_loc, tag_sig)) else: self.logger.warning('Data missing or index mismatch: forecast not inserted for couple [%s:%s]' % (tag_loc, tag_sig)) except Exception as e: self.logger.error('EXCEPTION: %s' % str(e)) self.logger.info('Failed data calculation for couple [%s:%s]' % (tag_loc, tag_sig))
[docs] def calc_aggregated_data_for_given_period(self, from_str, to_str): self.logger.info('Created aggregated data for period [%s:%s]' % (from_str, to_str)) # To be safe go from the beginning of the "from" date to the end of "to" date from_str = '%sT00:00:00Z' % from_str to_str = '%sT23:59:59Z' % to_str for asig_data in self.cfg['aggregatedInputsSection']['inputs']: try: if asig_data['forecast'] is False: inputs = [] for i in range(0, len(asig_data['locations'])): tmp_df = self.get_measure_data(asig_data['locations'][i], asig_data['signals'][i], from_str, to_str) tmp_df = tmp_df.rename(columns={'value': 'value_%i' % i}) inputs.append(tmp_df) input_df = pd.concat(inputs, axis=1, join="inner") agg_df = self.aggregate_dataframes(input_df, asig_data['function']) output = pd.concat([agg_df], axis=1) output = output.rename(columns={0: 'value'}) res = self.influx_df_client.write_points(output, self.cfg['influxDB']['measurementInputsMeasurements'], tags={'location': asig_data['location'], 'signal': asig_data['signal']}, protocol='line') if res is not True: self.logger.warning('Failed measure inserting for couple [%s:%s]' % (asig_data['location'], asig_data['signal'])) else: self.logger.info('Inserted measure data for couple [%s:%s]' % (asig_data['location'], asig_data['signal'])) else: inputs = [] for i in range(0, len(asig_data['locations'])): tmp_df = self.get_forecast_data(asig_data['locations'][i], asig_data['signals'][i], from_str, to_str) if i == 0: idx_step_df = tmp_df idx_step_df = idx_step_df.drop(['value'], axis=1) tmp_df = tmp_df.drop(['index', 'step'], axis=1) tmp_df = tmp_df.rename(columns={'value': 'value_%i' % i}) inputs.append(tmp_df) input_df = pd.concat(inputs, axis=1, join='inner') agg_df = self.aggregate_dataframes(input_df, asig_data['function']) output = pd.concat([agg_df, idx_step_df], axis=1) output = output.rename(columns={0: 'value'}) output = output.set_index('index') output = output.reindex(columns=['value', 'step']) res = self.influx_df_client.write_points(output, self.cfg['influxDB']['measurementInputsForecasts'], tags={'location': asig_data['location'], 'signal': asig_data['signal']}, tag_columns=['step'], protocol='line') if res is not True: self.logger.warning('Failed forecast inserting for couple [%s:%s]' % (asig_data['location'], asig_data['signal'])) else: self.logger.info('Inserted forecast data for couple [%s:%s]' % (asig_data['location'], asig_data['signal'])) except Exception as e: self.logger.error('EXCEPTION: %s' % str(e)) self.logger.info('Failed data calculation for couple [%s:%s]' % (asig_data['location'], asig_data['signal']))
[docs] @staticmethod def aggregate_dataframes(inputs, func): if func == 'max': return inputs.max(axis=1) elif func == 'mean': return inputs.mean(axis=1) elif func == 'min': return inputs.min(axis=1)
[docs] @staticmethod def digitalize_output(output, pars): # Currently it works only with > and < operators op = pars[0] th = float(pars[1:]) if op == '>': output['value'] = [1.0 if x > th else 0.0 for x in output['value']] elif op == '<': output['value'] = [1.0 if x < th else 0.0 for x in output['value']] return output
[docs] def get_measure_data(self, loc, sig, st_date, end_date): query = 'SELECT mean(value) AS value FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND time>=\'%s\' AND ' \ 'time<=\'%s\' GROUP BY time(30m)' % (self.cfg['influxDB']['measurementInputsMeasurements'], loc, sig, st_date, end_date) res = self.influx_df_client.query(query=query) if self.cfg['influxDB']['measurementInputsMeasurements'] in res.keys(): return res[self.cfg['influxDB']['measurementInputsMeasurements']] else: return None
[docs] def get_forecast_data(self, loc, sig, st_date, end_date): query = 'SELECT value, step FROM %s WHERE location=\'%s\' AND signal=\'%s\' AND time>=\'%s\' AND ' \ 'time<=\'%s\'' % (self.cfg['influxDB']['measurementInputsForecasts'], loc, sig, st_date, end_date) res = self.influx_df_client.query(query=query) df = res[self.cfg['influxDB']['measurementInputsForecasts']] df.reset_index(inplace=True) df['idx'] = df['index'].astype(str) + "-" + df['step'].astype(str) df = df.set_index('idx') return df
[docs] @staticmethod def apply_function_to_measures(func, i1, i2): if func == 'sum': return i1+i2 elif func == 'diff': return i1-i2 elif func == 'mul': return i1*i2 elif func == 'ratio': return i1/i2
[docs] @staticmethod def apply_function_to_forecast(func, i1, i2): if func == 'sum': output_series = i1['value']+i2['value'] elif func == 'diff': output_series = i1['value']-i2['value'] elif func == 'mul': output_series = i1['value']*i2['value'] elif func == 'ratio': output_series = i1['value']/i2['value'] output = copy.deepcopy(i1) output['value'] = output_series return output.set_index('index')
[docs] @staticmethod def get_loc_sig(locations, signals, func): return '%s_%s' % (locations[0], locations[1]), '%s_%s_%s' % (func[0].upper(), signals[0], signals[1])