Work with AWS S3 Storage Backend

Data engineers and analysts need efficient ways to handle large datasets in cloud environments. This tutorial combines three tools, Delta Lake, Polars, and AWS S3, and shows how to use them together to build a reliable, scalable data pipeline. Delta Lake, an open-source storage layer, brings ACID transactions and greater reliability to data lakes. Polars, a fast DataFrame library written in Rust, handles the data manipulation and reads and writes Delta tables through the deltalake Python package. Storing those tables on AWS S3 gives a flexible and cost-effective storage backend.

In this Jupyter notebook, we’ll walk through:

  1. Setting up the environment

  2. Connecting to AWS S3

  3. Using Polars to prepare and transform data

  4. Writing data to S3 in Delta Lake format (appending and upserting rows)

  5. Reading and querying Delta Lake tables


Install and Import Python Libraries

[1]:
%pip install -r requirements.txt
Requirement already satisfied: deltalake<1.0.0,>=0.18.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 1)) (0.18.2)
Requirement already satisfied: polars<2.0.0,>=1.2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 2)) (1.4.1)
Requirement already satisfied: boto_session_manager<2.0.0,>=1.7.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 3)) (1.7.2)
Requirement already satisfied: s3pathlib<3.0.0,>=2.1.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 4)) (2.1.2)
Requirement already satisfied: pyarrow>=8 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (17.0.0)
Requirement already satisfied: pyarrow-hotfix in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (0.6)
Requirement already satisfied: boto3 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.34.153)
Requirement already satisfied: iterproxy in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (0.3.1)
Requirement already satisfied: func-args in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (0.1.1)
Requirement already satisfied: pathlib-mate<2.0.0,>=1.0.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (1.3.2)
Requirement already satisfied: smart-open<7.0.0,>=5.1.0 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (6.4.0)
Requirement already satisfied: numpy>=1.16.6 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from pyarrow>=8->deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (2.0.1)
Requirement already satisfied: botocore<1.35.0,>=1.34.153 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.34.153)
Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.0.1)
Requirement already satisfied: s3transfer<0.11.0,>=0.10.0 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (0.10.2)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (2.9.0.post0)
Requirement already satisfied: urllib3!=2.2.0,<3,>=1.25.4 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (2.2.2)
Requirement already satisfied: six>=1.5 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.16.0)

Note: you may need to restart the kernel to use updated packages.
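Since requirements.txt itself is not shown, the pip output above is the only record of what it pins: deltalake (>=0.18.2,<1.0.0), polars (>=1.2.1,<2.0.0), boto_session_manager (>=1.7.2,<2.0.0) and s3pathlib (>=2.1.2,<3.0.0), on lines 1 to 4. The tabulate package, imported in the next cell, is not among them. As a quick sanity check before running the rest of the notebook, the sketch below reports which of these distributions are installed and at what version, using only the standard library's importlib.metadata; the helper name `installed_versions` is our own, not part of any of these packages.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Iterable, Optional

# Distributions pinned in requirements.txt (per the pip output above),
# plus tabulate, which the notebook imports but requirements.txt does not list.
REQUIRED = ["deltalake", "polars", "boto_session_manager", "s3pathlib", "tabulate"]


def installed_versions(names: Iterable[str]) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if missing."""
    result: Dict[str, Optional[str]] = {}
    for name in names:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


for name, ver in installed_versions(REQUIRED).items():
    print(f"{name:<22} {ver or 'NOT INSTALLED'}")
```

Any line showing NOT INSTALLED points to a package that needs a pip install before the import cell will succeed.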
[2]:
import typing as T
from datetime import datetime, timezone

from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context
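import polars as pl  # dataframe manipulation
# tabulate pretty-prints dataframes; it is not in requirements.txt
# (see the pip output above), so run `pip install tabulate` if it is missing.
from tabulate import tabulate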

Define S3 Storage Backend

[3]:
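# Replace with your own AWS CLI profile.
aws_profile = "bmt_app_dev_us_east_1"
bsm = BotoSesManager(profile_name=aws_profile)
# Let s3pathlib use the same boto session for its S3 calls.
context.attach_boto_session(boto_ses=bsm.boto_ses)
# Freeze the credentials so key, secret, and token come from one consistent snapshot.
credential = bsm.boto_ses.get_credentials().get_frozen_credentials()
# deltalake (used by Polars under the hood) reads its S3 settings from this dict.
storage_options = {
    "AWS_REGION": bsm.aws_region,
    "AWS_ACCESS_KEY_ID": credential.access_key,
    "AWS_SECRET_ACCESS_KEY": credential.secret_key,
    # Without a locking provider (e.g. DynamoDB), deltalake refuses to write to S3
    # unless this is set; it is only safe when a single writer uses the table.
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
# Temporary (STS/SSO) credentials also carry a session token.
if credential.token:
    storage_options["AWS_SESSION_TOKEN"] = credential.token

bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_db = S3Path(f"s3://{bucket}/projects/learn_delta_py/mydb/").to_dir()
s3dir_t_account = (s3dir_db / "accounts").to_dir()

def reset_db():
    """
    Reset the database by deleting the entire database S3 folder.

    Warning: this removes every object under s3dir_db, including all Delta tables.
    """
    s3dir_db.delete()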
[4]:
reset_db()
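The cell above builds the storage_options dict by hand from a boto_session_manager session. The sketch below factors that construction into a small function, `build_storage_options` (a hypothetical helper, not part of deltalake or boto_session_manager), so it can be reused with any credential source and checked without AWS access. The commented usage shows one way to feed it from plain boto3, assuming a configured profile; `get_frozen_credentials()` returns the access key, secret key, and token as one consistent snapshot.

```python
from typing import Dict, Optional


def build_storage_options(
    region: str,
    access_key: str,
    secret_key: str,
    session_token: Optional[str] = None,
    allow_unsafe_rename: bool = True,
) -> Dict[str, str]:
    """Build a deltalake storage_options dict (hypothetical helper).

    All values are strings, matching the dict used in the notebook.
    """
    options = {
        "AWS_REGION": region,
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        # "true" opts out of an S3 locking provider; only safe with a single writer.
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true" if allow_unsafe_rename else "false",
    }
    # Temporary (STS / SSO) credentials also carry a session token.
    if session_token:
        options["AWS_SESSION_TOKEN"] = session_token
    return options


# Usage with plain boto3 instead of boto_session_manager
# ("my_profile" is a placeholder for a configured AWS CLI profile):
#
#   import boto3
#   ses = boto3.Session(profile_name="my_profile")
#   creds = ses.get_credentials().get_frozen_credentials()
#   storage_options = build_storage_options(
#       ses.region_name, creds.access_key, creds.secret_key, creds.token
#   )

example = build_storage_options("us-east-1", "AKIAEXAMPLE", "secret-example")
print(sorted(example))
```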

Define Data Schema

[5]:
account_schema = {
    "account_id": pl.Utf8(),
    "create_at": pl.Datetime(),
    "update_at": pl.Datetime(),
    "account_number": pl.Utf8(),
    "account_type": pl.Utf8(),
    "description": pl.Utf8(),
    # partition keys
    "year": pl.Utf8(),
    "month": pl.Utf8(),
    "day": pl.Utf8(),
}

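def add_partition_keys_for_accounts(rows: T.List[T.Dict]) -> T.List[T.Dict]:
    """
    Add year/month/day partition keys based on the create_at time.

    Month and day are zero-padded (e.g. "01") so partition values sort
    chronologically as strings. Rows are modified in place and also returned.
    """
    for row in rows:
        row["year"] = str(row["create_at"].year)
        row["month"] = str(row["create_at"].month).zfill(2)
        row["day"] = str(row["create_at"].day).zfill(2)
    return rows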
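When partition_by=["year", "month", "day"] is passed to write_delta, deltalake lays data files out in Hive-style `column=value/` directories under the table root, so these string keys become part of each object's S3 key. The stdlib-only sketch below, with hypothetical helpers `partition_values` and `partition_prefix`, shows the prefix a row would land under; the Parquet file names inside each directory are generated by deltalake and are not reproduced here, and the bucket name is a placeholder.

```python
from datetime import datetime, timezone
from typing import Dict


def partition_values(create_at: datetime) -> Dict[str, str]:
    """Same derivation as add_partition_keys_for_accounts, for one timestamp."""
    return {
        "year": str(create_at.year),
        "month": str(create_at.month).zfill(2),
        "day": str(create_at.day).zfill(2),
    }


def partition_prefix(table_uri: str, create_at: datetime) -> str:
    """Hive-style directory a row would be written under (hypothetical helper)."""
    parts = [f"{k}={v}" for k, v in partition_values(create_at).items()]
    return table_uri.rstrip("/") + "/" + "/".join(parts) + "/"


ts = datetime(2021, 1, 2, tzinfo=timezone.utc)
print(partition_prefix("s3://my-bucket/projects/learn_delta_py/mydb/accounts/", ts))
# → s3://my-bucket/projects/learn_delta_py/mydb/accounts/year=2021/month=01/day=02/
```

Zero-padding matters here: without it, a prefix listing would place month=10 before month=2.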

Insert Two Rows

[6]:
def w1_create_accounts():
    data = [
        {
            "account_id": "acc-1",
            "create_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
            "account_number": "1111-1111-1111",
            "account_type": "checking",
            "description": "Alice's Main checking account",
        },
        {
            "account_id": "acc-2",
            "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "account_number": "2222-2222-2222",
            "account_type": "checking",
            "description": "Bob's Main checking account",
        },
    ]
    df = pl.DataFrame(
        add_partition_keys_for_accounts(data),
        schema=account_schema,
    )
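    # mode="append" adds rows unconditionally, so running this cell twice
    # duplicates them; call reset_db() first to start from an empty table.
    df.write_delta(
        s3dir_t_account.uri,
        mode="append",
        delta_write_options=dict(
            # Hive-style year=/month=/day= partition directories
            partition_by=["year", "month", "day"],
        ),
        storage_options=storage_options,
    )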

w1_create_accounts()
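Each successful write is recorded as one commit in the table's transaction log: a JSON file in the `_delta_log/` folder under the table root, named after the table version zero-padded to 20 digits. Because reset_db() deleted the folder first, this append should create version 0, and the merge later in the notebook should add version 1. The stdlib-only helper below, `commit_log_uri` (hypothetical, for illustration), computes the key of a given commit file, which can help when inspecting the table in the S3 console; the bucket name is a placeholder. With deltalake installed, `DeltaTable(s3dir_t_account.uri, storage_options=storage_options).version()` should report the latest version directly.

```python
def commit_log_uri(table_uri: str, version: int) -> str:
    """URI of the JSON commit file for a Delta table version (hypothetical helper).

    Delta Lake names commit files by table version, zero-padded to 20 digits.
    """
    if version < 0:
        raise ValueError("Delta table versions start at 0")
    return f"{table_uri.rstrip('/')}/_delta_log/{version:020d}.json"


table_uri = "s3://my-bucket/projects/learn_delta_py/mydb/accounts/"  # placeholder bucket
for v in (0, 1):
    print(commit_log_uri(table_uri, v))
# → s3://my-bucket/projects/learn_delta_py/mydb/accounts/_delta_log/00000000000000000000.json
# → s3://my-bucket/projects/learn_delta_py/mydb/accounts/_delta_log/00000000000000000001.json
```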

Examine the Results

[7]:
def pprint_df(df: pl.DataFrame):
    print(tabulate(df.to_dict(), headers=list(df.schema), tablefmt="grid"))


def query_accounts():
    df = pl.scan_delta(
        s3dir_t_account.uri,
        storage_options=storage_options,
    ).sort(by="create_at").collect()
    pprint_df(df)

query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                   |   year |   month |   day |
+==============+=====================+=====================+==================+================+===============================+========+=========+=======+
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-02 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account   |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
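pl.scan_delta returns a LazyFrame, so nothing is read until .collect(). A filter on a partition column, for example `pl.scan_delta(s3dir_t_account.uri, storage_options=storage_options).filter(pl.col("day") == "02").collect()`, can be pushed down so that only matching partitions are scanned. The sketch below is a stdlib-only illustration of that pruning idea over a hypothetical list of partitioned file paths; it is not how deltalake or Polars implement it, since Delta records each file's partition values in its log and a real reader does not need to parse paths.

```python
from typing import Dict, List


def parse_partition_values(path: str) -> Dict[str, str]:
    """Extract key=value segments from a Hive-style path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


def prune(paths: List[str], **wanted: str) -> List[str]:
    """Keep only files whose partition values match every requested key."""
    kept = []
    for path in paths:
        values = parse_partition_values(path)
        if all(values.get(k) == v for k, v in wanted.items()):
            kept.append(path)
    return kept


# Hypothetical file layout mirroring the two rows written above.
files = [
    "accounts/year=2021/month=01/day=01/part-0.parquet",
    "accounts/year=2021/month=01/day=02/part-0.parquet",
]
print(prune(files, day="02"))
# → ['accounts/year=2021/month=01/day=02/part-0.parquet']
```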

Upsert: Update One Row, Insert One Row

[8]:
def w2_update_accounts():
    data = [
        {
            "account_id": "acc-2",
            "account_number": "2222-2222-2222",
            "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_type": "checking",
            "description": "Bob's Main checking account, updated",
        },
        {
            "account_id": "acc-3",
            "account_number": "3333-3333-3333",
            "create_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_type": "saving",
            "description": "Cathy's Main saving account",
        },
    ]
    df = pl.DataFrame(
        add_partition_keys_for_accounts(data),
        schema=account_schema,
    )
    table_merger = df.write_delta(
        s3dir_t_account.uri,
        mode="merge",
        delta_write_options=dict(
            partition_by=["year", "month", "day"],
        ),
        delta_merge_options=dict(
            predicate="s.account_id = t.account_id",
            source_alias="s",
            target_alias="t",
        ),
        storage_options=storage_options,
    )
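    (
        table_merger
        .when_matched_update_all()  # account_id already in the table: overwrite all columns
        .when_not_matched_insert_all()  # account_id not in the table: insert the row
        .execute()  # nothing is written until execute() runs
    )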

w2_update_accounts()

Examine the Results

[9]:
query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                          |   year |   month |   day |
+==============+=====================+=====================+==================+================+======================================+========+=========+=======+
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account        |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-03 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account, updated |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-3        | 2021-01-03 00:00:00 | 2021-01-03 00:00:00 | 3333-3333-3333   | saving         | Cathy's Main saving account          |   2021 |      01 |    03 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
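With predicate="s.account_id = t.account_id", when_matched_update_all() overwrites every column of a target row whose account_id appears in the source, and when_not_matched_insert_all() adds source rows that have no match; target rows with no source match are left alone. The dictionary-based sketch below reproduces that logic on plain Python rows (only two columns kept, for brevity) and yields the same three accounts, in the same order, as the table above. It ignores what makes the Delta version useful (transactions, partition-aware file rewrites), so treat it as a mental model only; `upsert_by_key` is a hypothetical name.

```python
from typing import Dict, List


def upsert_by_key(target: List[Dict], source: List[Dict], key: str) -> List[Dict]:
    """Merge source rows into target rows on `key` (hypothetical helper).

    Matched target rows are replaced by the source row (like when_matched_update_all);
    unmatched source rows are appended (like when_not_matched_insert_all);
    unmatched target rows are kept unchanged.
    """
    # Copy rows so the caller's lists and dicts are not mutated.
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # Reassigning an existing key keeps its position; a new key goes to the end.
        merged[row[key]] = dict(row)
    return list(merged.values())


target = [
    {"account_id": "acc-1", "description": "Alice's Main checking account"},
    {"account_id": "acc-2", "description": "Bob's Main checking account"},
]
source = [
    {"account_id": "acc-2", "description": "Bob's Main checking account, updated"},
    {"account_id": "acc-3", "description": "Cathy's Main saving account"},
]
for row in upsert_by_key(target, source, key="account_id"):
    print(row["account_id"], "|", row["description"])
# → acc-1 | Alice's Main checking account
# → acc-2 | Bob's Main checking account, updated
# → acc-3 | Cathy's Main saving account
```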