Source code for demand_acep.timescale_parallel_copy

"""
Functions in this file insert data into the TimescaleDB database using
timescaledb-parallel-copy, the Go utility provided by TimescaleDB.
"""

import os
import pandas as pd
import pdb
import datetime
import io
import glob
import multiprocessing
import subprocess

from demand_acep import extract_data
from demand_acep import extract_ppty

def parallel_copy_data_for_date(config, data_date):
    """
    Parallel copy the CSV data for one day into the appropriate TSDB tables.

    Every ``*.csv`` file found in ``<DATA_ROOT>/<yyyy>/<mm>/<dd>`` is handed
    to the ``timescaledb-parallel-copy`` executable. The target table name is
    the part of the file name before the first ``@`` followed by ``_<yyyy>``.

    Parameters
    ----------
    config :
        `config` contains the configuration for paths etc. needed by files.
        The attributes read here are ``DATA_ROOT``, ``tsdb_pc_path``,
        ``DB_ADDRESS``, ``DB_NAME``, ``DB_USER`` and ``DB_PWD``.
    data_date : string
        `data_date` is used to extract the year and the path to the data.
        It should be in the 'mm/dd/yyyy' format.

    Returns
    -------
    None
        The standard output of each copy run is printed, followed by a
        message when the utility exits with a non-zero status.
    """
    print(data_date)

    # The date string is sliced positionally, hence the fixed 'mm/dd/yyyy'
    # format requirement.
    data_year = data_date[-4:]
    data_month = data_date[0:2]
    data_day = data_date[3:5]
    data_path = os.path.join(config.DATA_ROOT, data_year, data_month, data_day)

    # Path of the tsdb parallel copy go executable
    timescaledb_parallel_copy_path = os.path.join(
        config.tsdb_pc_path, "timescaledb-parallel-copy")

    db_address = config.DB_ADDRESS
    db_name = config.DB_NAME
    db_username = config.DB_USER
    db_pwd = config.DB_PWD

    # Get the CPU count for parallelizing the process
    num_workers = multiprocessing.cpu_count()

    # Generate the connection string for the database
    # NOTE(review): the password becomes part of the command line of the
    # child process - confirm this is acceptable for the deployment.
    connection_string = 'host={0} user={1} password={2} sslmode=disable'.format(
        db_address, db_username, db_pwd)

    # Sorted so the files are always processed in the same order.
    for fname in sorted(glob.glob(os.path.join(data_path, '*.csv'))):
        file_name = os.path.basename(fname)
        table_name = file_name.split('@')[0] + '_' + data_year
        parallel_copy_cmd = [
            timescaledb_parallel_copy_path,
            '--db-name', db_name,
            '--table', table_name,
            '--file', fname,
            '--workers', str(num_workers),
            '--connection', connection_string,
            '--skip-header', 'true',
        ]
        pcopy = subprocess.run(parallel_copy_cmd, encoding='utf-8',
                               stdout=subprocess.PIPE)
        for line in pcopy.stdout.split('\n'):
            print(line)
        # Report a failed copy instead of silently moving on; the remaining
        # files are still attempted.
        if pcopy.returncode != 0:
            print('timescaledb-parallel-copy exited with status {0} for {1}'.format(
                pcopy.returncode, fname))

    return
def parallel_copy_data_for_dates(config, start, end):
    """
    Parallel copy the data for every day of a date range into the database.

    Parameters
    ----------
    config :
        `config` contains the configuration for paths etc. needed by files.
        It is passed unchanged to `parallel_copy_data_for_date`.
    start : str or datetime-like
        Left bound for generating dates.
    end : str or datetime-like
        Right bound for generating dates.

    Returns
    -------
    None
    """
    # Build the daily range first, then render each day as 'mm/dd/yyyy',
    # the format expected by parallel_copy_data_for_date.
    day_index = pd.date_range(start, end)
    formatted_days = day_index.strftime("%m/%d/%Y")
    print(formatted_days)

    # Perform the insert one day at a time
    for day in formatted_days:
        parallel_copy_data_for_date(config, day)