Source code for demand_acep.create_db_schema

""" This file contains functions that create the schema in the database """

from sqlalchemy import *
import pandas as pd
import os

[docs]def create_schema_from_source_files(sql_engine, config): """ This function reads the source file and extracts the table names and corresponding column names. Parameters ---------- sql_engine : SQLAlchemy engine 'sql_engine` should support database operation. channel_metadata_file : string `channel_metadata_file` represents the absolute path of the channel metadata file **including** the filename. years_file : string `years_file` represents the absolute path of the file containing the years for which the tables should be created for. This file should be a text file with one year per row and the column header "years". Returns ------- List of table names in the database. """ # meter_channel_dict, data_years = read_source_files(channel_metadata_file, years_file) # Metadata for the SQLAlchemy tables database - will add tables eventually # This should be connected to the correct database. metadata = MetaData(sql_engine) metadata.reflect(sql_engine) delete_ok = input("This will drop all the tables and delete all the data, OK? Enter y or n") # Clean the database, # ***** this will delete all tables and data in it ***** if delete_ok.lower() == 'y': metadata.drop_all(sql_engine) # Clear the metadata metadata.clear() for meter in config.METER_CHANNEL_DICT: # Get the channels for this meter meter_channels = config.METER_CHANNEL_DICT[meter] # Create the following lists for the database insertion # Types of columns - defaulting to float, except for time columns_types = [TIMESTAMP(timezone=True)] + [Float] * (len(meter_channels) - 1) # Primary key flags - defaulting to false, except for time # **** removing the pk_constraint, for speedy inserts **** primary_key_flags = [True] + [False] * (len(meter_channels) - 1) # Nullable flags - defaulting to false, except for time # **** removing the non_null constraint, for speedy inserts **** nullable_flags = [True] + [False] * (len(meter_channels) - 1) # Iterate over the years list for year in config.DATA_YEARS: # Create table name from meter name and year table_name = meter + "_" + str(year) # Dynamically create table names and corresponding column names table_name_i = Table(table_name, metadata, *(Column(column_name, column_type, primary_key=primary_key_flag, nullable=nullable_flag) for column_name, column_type, primary_key_flag, nullable_flag in zip(meter_channels, columns_types, primary_key_flags, nullable_flags))) print(table_name_i) # Create the tables in the metadata - this defaults to checking that # the table names first, so will only create tables if they do not exist # making this function idempotent metadata.create_all() # Create timescaledb hypertables from the tables created ########## # There is currently no SQLAlchemy way to create hypertables, so we execute # an sql statement. This would have been simlar in psycopg2. # Also, this creates hypertables for all tables in the metadata. If this is # desired, then specify the tables names using some other mechanism. ######### # First get the names of the tables table_names = metadata.tables.keys() # Connect to the database engine to execute the query with sql_engine.connect() as con: # Iterate over the table names to create the query statement for table_name in table_names: # Generate the query statement for the current table name sql_statement = ("""SELECT create_hypertable('public.""" + '"' + table_name + '"' + """', 'time' , if_not_exists => TRUE);""") # Execute the query con.execute(sql_statement) return metadata.tables.keys()