
Exporting Database Tables to Parquet Files Using Python and Pandas

Managing MySQL databases can often be costly and time-consuming. If you’re working with databases containing static data, an effective alternative is to convert your database tables into individual Parquet files. By storing these files and leveraging Python for direct querying, you’ll maintain your existing querying capabilities and benefit from improved query performance, cost reduction, and serverless infrastructure. In this blog post, we’ll guide you through the process of crafting a Python function that allows you to effortlessly export specific tables from your database into Parquet files.
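To give a sense of what "direct querying" looks like once the export is done, here is a minimal sketch of filtering an exported table with Pandas, roughly the equivalent of a SQL WHERE clause. The orders table, status column, and folder path here are placeholders, not part of the export code that follows.

# Hypothetical example: SELECT * FROM orders WHERE status = 'shipped'
import pandas as pd

orders_df = pd.read_parquet(
    "path_to_folder/orders.parquet",        # directory of Parquet files produced by the export
    engine="pyarrow",
    filters=[("status", "==", "shipped")],  # filter pushed down to the Parquet reader
)
print(len(orders_df))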

Step 1: Import necessary libraries

To get started, we’ll import the necessary libraries for our function.

# Standard library imports
import json
import os
import subprocess
import sys
import time
import warnings

# Threading and concurrency
import concurrent.futures

# Date and time
import datetime as dt
import pytz

# Data processing and database
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
import sqlalchemy
from sqlalchemy import create_engine

# Timezone setup
central = pytz.timezone('US/Central')

# Warnings configuration
warnings.filterwarnings("ignore", category=FutureWarning, module="pyarrow")

Step 2: Create a Credentials File and a MySQL Connection

The first code snippet shows the format for a credentials file. I recommend saving it as a JSON file separate from your code so that your credentials are never exposed in the codebase.

The second snippet stores the path to that file in an environment variable and then defines a function that creates a SQLAlchemy connection engine. This engine will be used to read data from the MySQL database into a Pandas DataFrame.

The base_folder variable is where the Parquet files will be saved.

# Example format for storing credentials in a separate JSON file.
# Note: JSON does not allow comments, so only add the optional
# "ssl_ca": "path_to_ca_file" key if your server requires SSL.
{
    "host": "127.0.0.1",
    "username": "user",
    "password": "password",
    "port": 3306,
    "database": "database"
}
# Store the path to the credentials file in an environment variable
os.environ["CREDS_PATH"] = "/path_to_file/creds.json"
base_folder = "path_to_folder"  # Path where the Parquet files will be saved

def create_sqlalchemy_engine():

    creds_path = os.environ.get("CREDS_PATH")
    with open(creds_path, 'r') as file:
        creds = json.load(file)

    # Construct the connection string (URI) for SQLAlchemy
    connection_string = (
        f"mysql+pymysql://{creds['username']}:{creds['password']}@"
        f"{creds['host']}:{creds['port']}/{creds['database']}"
    )

    if 'ssl_ca' in creds:
        ssl_args = {'ssl_ca': creds['ssl_ca']}
        engine = create_engine(connection_string, connect_args=ssl_args)
    else:
        engine = create_engine(connection_string)

    return engine
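Once the engine function is defined, a quick sanity check (not part of the export itself) is to run a trivial query and confirm the credentials work:

# Confirm the connection works; should print a single-row DataFrame
engine = create_sqlalchemy_engine()
print(pd.read_sql("SELECT 1", engine))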

Step 3: Define Functions to Read the Database

The first function lets you pass a SQL query as a string and get the results back as a Pandas DataFrame. The advantage of using Pandas' read_sql function is that it preserves the table's schema, keeping column names and, where possible, data types intact.

def run_sql_query(query, mysql_connection, params=None):
    """
    Execute an SQL query and return the results as a pandas DataFrame.

    :param query: SQL query string
    :param mysql_connection: SQLAlchemy engine (or connection) for the MySQL database
    :param params: Optional parameters for the query
    :return: Resulting data as a pandas DataFrame
    """
    try:
        # Use pandas.read_sql() to fetch data directly into a DataFrame
        df = pd.read_sql(query, mysql_connection, params=params)
    except Exception as e:
        print(f"Error executing query: {e}")
        return None

    return df
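As a usage example, assuming the database has a table called customers (a purely illustrative name), you could pull a small sample like this:

# Fetch a small sample; LIMIT keeps the result comfortably in memory
engine = create_sqlalchemy_engine()
sample_df = run_sql_query("SELECT * FROM customers LIMIT 10", engine)
print(sample_df.head())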

The next function is a small helper that builds a list of every table in the database so we can iterate through them later.

def retrieve_all_tables(mysql_connection):
    # Use pandas.read_sql() to directly fetch data into a DataFrame
    query = "SHOW TABLES"
    df = pd.read_sql(query, mysql_connection)
    
    # Assuming the DataFrame has a single column with table names
    tables_lst = df.iloc[:, 0].tolist()
    
    return tables_lst

Finally, the main function reads the table data incrementally and writes it out in Parquet format. A few things to point out:

  1. The data is streamed to Parquet using LIMIT and OFFSET, so large tables will not cause an out-of-memory error in Pandas. Lowering the chunk_size parameter further reduces the amount of data held in memory at once.
  2. Each table is written to a directory named after the table with a .parquet suffix, and the individual chunk files live inside that directory. Pandas can read the whole directory at once, for example pd.read_parquet("table.parquet") (a short read-back sketch follows the function below).
  3. The set_dtype_for_null_columns function explicitly sets the types of columns that are entirely null. Pandas infers all-null columns as floats, which causes problems if later chunks of the table contain a different data type.
  4. The loop ends automatically once all the data in the table has been read.

def set_dtype_for_null_columns(mysql_connection, df, table_name):
    """
    This function explicitly sets the data type when the column is all null.
    """
    type_map = {
        'bigint': 'Int64',
        'datetime': 'datetime64[ns]',
        'tinyint': 'Int8',
        'varchar': 'str',
        'smallint': 'Int16',
        'int': 'Int32',
        'float': 'float32',
        'text': 'str',
        'enum': 'str',
        'char': 'str',
        'longtext': 'str',
        'mediumtext': 'str',
        'timestamp': 'datetime64[ns]',
        'double': 'float64',
        'mediumint': 'Int32',
        'bit':'object'
    }
    df_sample = df.head(1000)  # Pandas infers the schema from a sample, so a mostly-null column can still be typed incorrectly
    all_null_columns = df_sample.columns[df_sample.isnull().all()].tolist()
    
    if not all_null_columns:
        return df
    
    # Fetching column types for all null columns in one query
    col_placeholders = ', '.join(['%s'] * len(all_null_columns))
    query = f"""
        SELECT COLUMN_NAME, DATA_TYPE 
        FROM INFORMATION_SCHEMA.COLUMNS 
        WHERE TABLE_SCHEMA = DATABASE()
        AND TABLE_NAME = %s 
        AND COLUMN_NAME IN ({col_placeholders})
    """
    params = tuple([table_name] + all_null_columns)
    information_schema_df = run_sql_query(query, mysql_connection, params)
    
    # Mapping column names to their corresponding data types
    column_to_dtype_map = information_schema_df.set_index('COLUMN_NAME')['DATA_TYPE'].map(type_map).to_dict()
    
    for col, dtype in column_to_dtype_map.items():
        df[col] = df[col].astype(dtype)

    return df


def mysql_to_parquet(table_name, mysql_connection, base_folder, chunk_size=500_000, offset_start=0, max_iterations=None):
    # Create the directory for the table if it doesn't exist
    table_folder = os.path.join(base_folder, f"{table_name}.parquet")
    if not os.path.exists(table_folder):
        os.makedirs(table_folder)

    # Fetch the table schema once
    schema_query = f'DESCRIBE {table_name}'
    schema_df = pd.read_sql(schema_query, mysql_connection)

    offset = offset_start
    print(f"Starting to process {table_name} at {dt.datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')}.")

    # Row count to monitor progress
    row_count_query = f"SELECT COUNT(*) FROM {table_name}"
    row_count_df = pd.read_sql(row_count_query, mysql_connection)
    row_count = row_count_df.iat[0, 0]

    iteration_count = 0

    while True:
        # If we've reached max_iterations, break out of the loop
        if max_iterations is not None and iteration_count >= max_iterations:
            break

        start_time = time.time()

        # Construct the SQL query with LIMIT and OFFSET
        query = f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}"

        # Read data into a Pandas DataFrame
        df_chunk = pd.read_sql(query, mysql_connection)

        # If no data is returned, we're done
        if df_chunk.empty:
            break

        # Explicitly type all-null columns using the helper defined above
        df_chunk = set_dtype_for_null_columns(mysql_connection, df_chunk, table_name)

        # Convert bit(1) columns to boolean
        bit_cols = schema_df[schema_df['Type'] == 'bit(1)']['Field'].tolist()
        for col in bit_cols:
            if col in df_chunk.columns:
                df_chunk[col] = df_chunk[col].apply(lambda x: x == b'\x01')

        # Save the chunk to a separate Parquet file
        parquet_chunk_path = os.path.join(table_folder, f"{table_name}_{offset}_to_{offset + chunk_size}.parquet")
        df_chunk.to_parquet(parquet_chunk_path, index=False)

        end_time = time.time()
        elapsed_time = end_time - start_time
        progress = min((offset + chunk_size) / row_count * 100, 100)
        print(f"Finished processing {table_name} from {offset:,} to {offset + chunk_size:,} at {dt.datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')}. The progress is {progress:.2f}% complete. Took {elapsed_time:.2f} seconds to process.")

        # Increment the offset
        offset += chunk_size

        # Increment the iteration count
        iteration_count += 1
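As mentioned in point 2 above, each table ends up as a directory of chunked Parquet files, and Pandas can read that directory back as a single DataFrame. A minimal read-back sketch, using a placeholder table name:

# Read every chunk in the table's directory back into one DataFrame
table_name = "orders"  # placeholder table name
table_df = pd.read_parquet(os.path.join(base_folder, f"{table_name}.parquet"))
print(table_df.shape)
print(table_df.dtypes)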

Step 4: Run the Function

Now you can start exporting data! First, build a list of every table in the database, then loop through each table and export its data. Progress is printed as each chunk finishes.

# The code above can be saved in a separate Python file in the same directory and imported
from mysql_to_parquet import *

mysql_connection = create_sqlalchemy_engine()  # Create the SQLAlchemy engine
# mysql_connection.dispose()  # Dispose of the engine's connection pool when finished

# List of MySQL tables
tables_lst = retrieve_all_tables(mysql_connection)
print(len(tables_lst))

for table in tables_lst:
    mysql_to_parquet(table, mysql_connection, base_folder)

The above loop processes the tables one at a time. To speed things up, you can export several tables concurrently. Since mysql_to_parquet also needs the engine and output folder, bind them first with functools.partial before handing the function to the executor:

import concurrent.futures
from functools import partial

# Bind the shared engine and output folder, then map over the table list
export_table = partial(mysql_to_parquet, mysql_connection=mysql_connection, base_folder=base_folder)

# Using ThreadPoolExecutor to export several tables at once
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(export_table, tables_lst))

The SQLAlchemy engine manages a connection pool, so it is safe to share across threads; each thread checks out its own connection.

Step 5: Check the Files

Lastly, a simple validation step is to compare each database table's row count with the total row count of its exported Parquet files to confirm data consistency.

If the database is static, the counts will match exactly. If the database is active, you will see some differences, but the comparison still gives you a programmatic way to confirm that nothing went wrong while iterating through the tables.

def get_table_count(table):
    temp_df = run_sql_query(f'SELECT "{table}" as table_name, COUNT(*) as row_count FROM {table}', mysql_connection)
    return temp_df

# Using ThreadPoolExecutor to run queries in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:  # You can adjust max_workers based on your needs
    results = list(executor.map(get_table_count, tables_lst))

# Concatenate the results into a single DataFrame
mysql_row_counts_df = pd.concat(results, ignore_index=True)

#Get the row counts for the parquet files
parquet_row_counts = {}

for table_name in tables_lst:
    directory = os.path.join(base_folder, f"{table_name}.parquet")
    files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.parquet')]
    parquet_row_count = sum(pq.read_metadata(f).num_rows for f in files)
    parquet_row_counts[table_name] = parquet_row_count

parquet_row_counts_df = pd.DataFrame.from_dict(parquet_row_counts, orient='index').reset_index().rename(columns={'index':'table_name',0:'row_count'})

#Merge the DataFrames
merged_df = pd.merge(mysql_row_counts_df, parquet_row_counts_df, on='table_name', how='outer', suffixes=('_mysql', '_parquet'))
merged_df

#Find the difference in row counts
merged_df['row_count_diff'] = merged_df['row_count_mysql'] - merged_df['row_count_parquet']

#View the results
pd.set_option('display.max_rows', None)
merged_df.sort_values('row_count_diff')
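Since a static source should match exactly, you can also turn the comparison into a hard check; for an active database this assertion will naturally fail on busy tables:

# For a static source, every table should line up exactly
mismatched = merged_df[merged_df['row_count_diff'] != 0]
assert mismatched.empty, f"Row count mismatches found:\n{mismatched}"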

Google Colab Notebook

A Google Colab Notebook with the code can be found here.

Final Thoughts

Check out more Python tricks in this Colab Notebook or in my recent Python Posts.

Thanks for reading!

