Work with AWS S3 Storage Backend#
This tutorial shows how to combine Delta Lake, Polars, and AWS S3 into a small data pipeline for working with datasets in the cloud. Delta Lake is an open-source storage layer that adds ACID transactions to data lakes. Polars is a DataFrame library written in Rust that handles the in-memory data manipulation. AWS S3 provides the underlying object storage, which keeps the setup flexible and inexpensive to run.
In this Jupyter notebook, we’ll walk through:
Setting up the environment
Connecting to AWS S3
Using Polars to prepare and transform data
Writing data to Delta Lake format on S3
Reading and querying Delta Lake tables
Import Python Libraries#
[1]:
%pip install -r requirements.txt
Requirement already satisfied: deltalake<1.0.0,>=0.18.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 1)) (0.18.2)
Requirement already satisfied: polars<2.0.0,>=1.2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 2)) (1.4.1)
Requirement already satisfied: boto_session_manager<2.0.0,>=1.7.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 3)) (1.7.2)
Requirement already satisfied: s3pathlib<3.0.0,>=2.1.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 4)) (2.1.2)
Requirement already satisfied: pyarrow>=8 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (17.0.0)
Requirement already satisfied: pyarrow-hotfix in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (0.6)
Requirement already satisfied: boto3 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.34.153)
Requirement already satisfied: iterproxy in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (0.3.1)
Requirement already satisfied: func-args in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (0.1.1)
Requirement already satisfied: pathlib-mate<2.0.0,>=1.0.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (1.3.2)
Requirement already satisfied: smart-open<7.0.0,>=5.1.0 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from s3pathlib<3.0.0,>=2.1.2->-r requirements.txt (line 4)) (6.4.0)
Requirement already satisfied: numpy>=1.16.6 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from pyarrow>=8->deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (2.0.1)
Requirement already satisfied: botocore<1.35.0,>=1.34.153 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.34.153)
Requirement already satisfied: jmespath<2.0.0,>=0.7.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.0.1)
Requirement already satisfied: s3transfer<0.11.0,>=0.10.0 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (0.10.2)
Requirement already satisfied: python-dateutil<3.0.0,>=2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (2.9.0.post0)
Requirement already satisfied: urllib3!=2.2.0,<3,>=1.25.4 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (2.2.2)
Requirement already satisfied: six>=1.5 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from python-dateutil<3.0.0,>=2.1->botocore<1.35.0,>=1.34.153->boto3->boto_session_manager<2.0.0,>=1.7.2->-r requirements.txt (line 3)) (1.16.0)
[notice] A new release of pip is available: 24.1.2 -> 24.2
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[2]:
import typing as T
from datetime import datetime, timezone
from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context
import polars as pl # dataframe manipulation
from tabulate import tabulate # pretty print dataframe
Define S3 Storage Backend#
[3]:
aws_profile = "bmt_app_dev_us_east_1"
bsm = BotoSesManager(profile_name=aws_profile)
context.attach_boto_session(boto_ses=bsm.boto_ses)
credential = bsm.boto_ses.get_credentials()
storage_options = {
    "AWS_REGION": bsm.aws_region,
    "AWS_ACCESS_KEY_ID": credential.access_key,
    "AWS_SECRET_ACCESS_KEY": credential.secret_key,
    # S3 has no atomic rename; this flag lets delta-rs commit without a locking
    # provider, which is only safe when a single writer touches the table.
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}
# temporary credentials (for example, from an assumed role) also carry a session token
if credential.token:
    storage_options["AWS_SESSION_TOKEN"] = credential.token
bucket = f"{bsm.aws_account_alias}-{bsm.aws_region}-data"
s3dir_db = S3Path(f"s3://{bucket}/projects/learn_delta_py/mydb/").to_dir()
s3dir_t_account = (s3dir_db / "accounts").to_dir()


def reset_db():
    """
    Reset the database by deleting the entire database S3 folder.
    """
    s3dir_db.delete()
[4]:
reset_db()
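The session-token branch in the cell above matters because temporary credentials are rejected by AWS when the token is missing, while long-lived access keys have no token at all. As a minimal sketch, the same logic can be wrapped in a small function. `build_storage_options` is a hypothetical helper name (it is not part of `deltalake` or `boto_session_manager`), and the credential values are placeholders.

```python
import typing as T


def build_storage_options(
    region: str,
    access_key: str,
    secret_key: str,
    token: T.Optional[str] = None,
) -> T.Dict[str, str]:
    """Hypothetical helper mirroring the storage_options dict built above."""
    options = {
        "AWS_REGION": region,
        "AWS_ACCESS_KEY_ID": access_key,
        "AWS_SECRET_ACCESS_KEY": secret_key,
        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
    }
    # Only temporary credentials carry a session token; omit the key otherwise.
    if token:
        options["AWS_SESSION_TOKEN"] = token
    return options


# Placeholder values, not real credentials.
long_lived = build_storage_options("us-east-1", "AKIA_EXAMPLE", "secret")
temporary = build_storage_options("us-east-1", "ASIA_EXAMPLE", "secret", token="tok")
print(sorted(temporary))
```

All values are plain strings, matching the way the notebook passes `"true"` rather than a boolean.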
Define Data Schema#
[5]:
account_schema = {
    "account_id": pl.Utf8(),
    "create_at": pl.Datetime(),
    "update_at": pl.Datetime(),
    "account_number": pl.Utf8(),
    "account_type": pl.Utf8(),
    "description": pl.Utf8(),
    # partition keys
    "year": pl.Utf8(),
    "month": pl.Utf8(),
    "day": pl.Utf8(),
}


def add_partition_keys_for_accounts(rows: T.List[T.Dict]):
    """
    Add partition keys based on create_at time.
    """
    for row in rows:
        row["year"] = str(row["create_at"].year)
        row["month"] = str(row["create_at"].month).zfill(2)
        row["day"] = str(row["create_at"].day).zfill(2)
    return rows
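The table is partitioned by `year`, `month`, and `day`, so the zero-padded strings produced here end up in the directory names of the data files. Delta Lake lays out partitions in Hive style (`key=value` folders). The sketch below derives that relative prefix for a single timestamp. `partition_prefix` is a hypothetical helper for illustration only, and the file names under the prefix are chosen by the writer.

```python
from datetime import datetime, timezone


def partition_prefix(create_at: datetime) -> str:
    """Hypothetical helper: Hive-style prefix for the year/month/day partition keys."""
    year = str(create_at.year)
    month = str(create_at.month).zfill(2)
    day = str(create_at.day).zfill(2)
    return f"year={year}/month={month}/day={day}/"


prefix = partition_prefix(datetime(2021, 1, 2, tzinfo=timezone.utc))
print(prefix)  # year=2021/month=01/day=02/
```

Zero-padding keeps the lexicographic order of the folder names aligned with chronological order.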
Insert Two Rows#
[6]:
def w1_create_accounts():
    data = [
        {
            "account_id": "acc-1",
            "create_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 1, tzinfo=timezone.utc),
            "account_number": "1111-1111-1111",
            "account_type": "checking",
            "description": "Alice's Main checking account",
        },
        {
            "account_id": "acc-2",
            "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "account_number": "2222-2222-2222",
            "account_type": "checking",
            "description": "Bob's Main checking account",
        },
    ]
    df = pl.DataFrame(
        add_partition_keys_for_accounts(data),
        schema=account_schema,
    )
    df.write_delta(
        s3dir_t_account.uri,
        mode="append",
        delta_write_options=dict(
            partition_by=["year", "month", "day"],
        ),
        storage_options=storage_options,
    )


w1_create_accounts()
Examine the Results
[7]:
def pprint_df(df: pl.DataFrame):
    print(tabulate(df.to_dict(), headers=list(df.schema), tablefmt="grid"))


def query_accounts():
    df = pl.scan_delta(
        s3dir_t_account.uri,
        storage_options=storage_options,
    ).sort(by="create_at").collect()
    pprint_df(df)


query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| account_id | create_at | update_at | account_number | account_type | description | year | month | day |
+==============+=====================+=====================+==================+================+===============================+========+=========+=======+
| acc-1 | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111 | checking | Alice's Main checking account | 2021 | 01 | 01 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| acc-2 | 2021-01-02 00:00:00 | 2021-01-02 00:00:00 | 2222-2222-2222 | checking | Bob's Main checking account | 2021 | 01 | 02 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
Upsert: Update One Row, Insert One Row#
[8]:
def w2_update_accounts():
    data = [
        {
            "account_id": "acc-2",
            "account_number": "2222-2222-2222",
            "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_type": "checking",
            "description": "Bob's Main checking account, updated",
        },
        {
            "account_id": "acc-3",
            "account_number": "3333-3333-3333",
            "create_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_type": "saving",
            "description": "Cathy's Main saving account",
        },
    ]
    df = pl.DataFrame(
        add_partition_keys_for_accounts(data),
        schema=account_schema,
    )
    table_merger = df.write_delta(
        s3dir_t_account.uri,
        mode="merge",
        delta_write_options=dict(
            partition_by=["year", "month", "day"],
        ),
        delta_merge_options=dict(
            predicate="s.account_id = t.account_id",
            source_alias="s",
            target_alias="t",
        ),
        storage_options=storage_options,
    )
    (
        table_merger
        .when_matched_update_all()  # will do update
        .when_not_matched_insert_all()  # will do insert
        .execute()
    )


w2_update_accounts()
Examine the Results
[9]:
query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| account_id | create_at | update_at | account_number | account_type | description | year | month | day |
+==============+=====================+=====================+==================+================+======================================+========+=========+=======+
| acc-1 | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111 | checking | Alice's Main checking account | 2021 | 01 | 01 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-2 | 2021-01-02 00:00:00 | 2021-01-03 00:00:00 | 2222-2222-2222 | checking | Bob's Main checking account, updated | 2021 | 01 | 02 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-3 | 2021-01-03 00:00:00 | 2021-01-03 00:00:00 | 3333-3333-3333 | saving | Cathy's Main saving account | 2021 | 01 | 03 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
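The result above follows from the two merge clauses: a source row whose `account_id` matches an existing row overwrites it, and any other source row is appended. The plain-Python sketch below models only that row-level outcome with a dictionary keyed by `account_id`. It is not how Delta Lake executes a merge, which rewrites data files and commits to the transaction log, and `upsert_all` is a hypothetical name used for illustration.

```python
import typing as T


def upsert_all(
    target: T.List[T.Dict[str, str]],
    source: T.List[T.Dict[str, str]],
    key: str = "account_id",
) -> T.List[T.Dict[str, str]]:
    """Model "when matched update all, when not matched insert all" on row lists."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        # matched key: replace every column; unmatched key: insert the row
        merged[row[key]] = dict(row)
    return sorted(merged.values(), key=lambda r: r[key])


target = [
    {"account_id": "acc-1", "description": "Alice's Main checking account"},
    {"account_id": "acc-2", "description": "Bob's Main checking account"},
]
source = [
    {"account_id": "acc-2", "description": "Bob's Main checking account, updated"},
    {"account_id": "acc-3", "description": "Cathy's Main saving account"},
]
result = upsert_all(target, source)
for row in result:
    print(row["account_id"], "|", row["description"])
```

As in the table, `acc-1` is untouched, `acc-2` carries the updated description, and `acc-3` is new.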