4. About the data pipeline

The data pipeline consists of three steps:

4.1. Extract

This step takes the data from the NetCDF files and creates a dataframe with time as the index and the values in the NetCDF file as the only column.
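
The following is a minimal sketch of this step using xarray and pandas, assuming each NetCDF file holds a single measurement channel along a time dimension; the function, file, and variable names are placeholders, not the project's actual extraction code.

import xarray as xr

def extract_channel(netcdf_path, channel_name):
    # Open the NetCDF file and pull out the single measurement variable;
    # the time coordinate becomes the dataframe index.
    with xr.open_dataset(netcdf_path) as ds:
        df = ds[channel_name].to_dataframe()
    # Keep only the channel values as the single column.
    return df[[channel_name]]

# Example (placeholder names): extract_channel("MeterA_2019-01-01.nc", "PhaseA_Voltage")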

4.2. Transform

This step takes the extracted dataframe for each channel, down-samples the data to a lower resolution (1 minute by default), and concatenates the other channels to the dataframe, keeping the same time index for each meter. The transformation step also performs data imputation, i.e. it fills in the missing values; the section Data Imputation describes this process in more detail.

The implementation of extraction and transformation is coupled and happens in the function extract_csv_for_date(), which saves the transformed data into a csv for each year. This function also handles edge cases such as days when the data download to NetCDF files happens more than once. An example extraction and transformation for a single day is shown in the jupyter notebook extract_to_csv.ipynb, and extraction and transformation for multiple days can be run in parallel, as shown in the jupyter notebook test_multiprocessing_csv.ipynb. The extraction, transformation, and saving of the down-sampled data to csv currently takes around 2h 10min on a 28-core, 2.4 GHz system.
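
A minimal sketch of the down-sampling and concatenation for one meter is shown below; the function name, channel dataframes, and the interpolation-based gap filling are illustrative placeholders, not the project's extract_csv_for_date() or its actual imputation method.

import pandas as pd

def transform_meter(channel_dfs, resolution="1min"):
    # Down-sample each single-column channel dataframe to the target resolution
    # and concatenate the channels on the shared time index.
    resampled = [df.resample(resolution).mean() for df in channel_dfs]
    meter_df = pd.concat(resampled, axis=1)
    # Placeholder imputation: fill remaining gaps by time-based interpolation;
    # the project's actual imputation is described in the Data Imputation section.
    return meter_df.interpolate(method="time")

# Example (placeholder names): transform_meter([voltage_df, current_df]).to_csv("MeterA_2019-01-01.csv")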

Note

Since the “Load” step depends on the csv files existing in the path, the “Transform” step should finish before the “Load” step is initiated.

4.3. Load

This step takes the transformed data in the csv files and copies it into the time-series database, TimescaleDB. TimescaleDB installation instructions for Ubuntu are straightforward, and it is even simpler to run with docker, like so:

sudo docker run -d -e POSTGRES_USER=<username> -e POSTGRES_PASSWORD=<password> --name <database_name> -p 5432:5432  --restart=always timescale/timescaledb

Then create a database named demand_acep and enable the TimescaleDB extension as described in the getting started section of the TimescaleDB docs. The database schema for insertion (copy) can be created using the function create_db_schema_from_source_files(), which deletes all the existing data and tables and creates a table for each meter for each year, with channels as columns and time as the primary key.

The “copy” operation is preferred over “insert” since it is much faster and handles even full-resolution data efficiently (read here about the risks and care needed in using copy over insert in a postgresql database). A Go utility, timescaledb-parallel-copy, is used to copy the data into the database in parallel. The function parallel_copy_data_for_date() prepares the command for timescaledb-parallel-copy and copies the data. The command is run with the “skip-header” option to ignore the first line of each day’s csv file, as that date-time is repeated from the previous day. The function parallel_copy_data_for_dates() is a wrapper around parallel_copy_data_for_date() and does the copying for a date range. An example application of the copy operation can be seen in the jupyter notebook timescale_parallel_copy.ipynb. The parallel copy takes 6min 18s on a 28-core, 2.4 GHz system.
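
Below is a minimal sketch of how such a copy command might be assembled and invoked from Python; the connection string, table name, csv path, and worker count are placeholders, and this is not the project's parallel_copy_data_for_date() implementation.

import subprocess

def parallel_copy_day_csv(csv_path, table, workers=8):
    # Build the timescaledb-parallel-copy command; --skip-header drops the
    # first line of the day's csv, whose timestamp repeats the previous day.
    cmd = [
        "timescaledb-parallel-copy",
        "--connection", "host=localhost user=<username> password=<password> sslmode=disable",
        "--db-name", "demand_acep",
        "--table", table,
        "--file", csv_path,
        "--workers", str(workers),
        "--skip-header",
    ]
    subprocess.run(cmd, check=True)

# Example (placeholder names): parallel_copy_day_csv("MeterA_2019-01-01.csv", "MeterA_2019")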