Quick Start#
In this notebook, we’ll explore how to combine the power of Delta Lake, an open-source storage layer that brings ACID transactions to big data workloads, with Polars, a lightning-fast DataFrames library written in Rust.
We’ll guide you through:
Setting up your environment
Ingesting data into Delta Lake using Polars
Reading and querying data from Delta Lake with Polars
Reference:
Import Python Libraries#
[1]:
%pip install -r requirements.txt
Requirement already satisfied: deltalake<1.0.0,>=0.18.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 1)) (0.18.2)
Requirement already satisfied: polars<2.0.0,>=1.2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 2)) (1.4.1)
Requirement already satisfied: pyarrow>=8 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (17.0.0)
Requirement already satisfied: pyarrow-hotfix in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (0.6)
Requirement already satisfied: numpy>=1.16.6 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from pyarrow>=8->deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (2.0.1)
[notice] A new release of pip is available: 24.1.2 -> 24.2
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[2]:
import typing as T
import shutil
from pathlib import Path
from datetime import datetime, timezone
import polars as pl # dataframe manipulation
from tabulate import tabulate # pretty print dataframe
Define Local Path Storage Backend#
[3]:
# Local storage layout: the database folder sits in the current working
# directory, and each table gets its own sub-folder inside it.
dir_here = Path.cwd()
dir_db = dir_here.joinpath("mydb")
dir_t_account = dir_db.joinpath("accounts")
def reset_db():
    """
    Wipe the database folder and recreate the empty directory layout.

    The whole ``dir_db`` tree is removed (errors during removal are ignored),
    then ``dir_db`` and ``dir_t_account`` are created again as empty folders.
    """
    shutil.rmtree(dir_db, ignore_errors=True)
    # Parent first, then the table folder nested inside it.
    for folder in (dir_db, dir_t_account):
        folder.mkdir(exist_ok=True)
[4]:
# Start from an empty database folder before any rows are written.
reset_db()
Define Data Schema#
[5]:
# Column name -> Polars dtype for the accounts table, in column order.
account_schema = dict(
    account_id=pl.Utf8(),
    create_at=pl.Datetime(),
    update_at=pl.Datetime(),
    account_number=pl.Utf8(),
    account_type=pl.Utf8(),
    description=pl.Utf8(),
    # partition keys (string columns filled in by add_partition_keys_for_accounts)
    year=pl.Utf8(),
    month=pl.Utf8(),
    day=pl.Utf8(),
)
def add_partition_keys_for_accounts(rows: T.List[T.Dict]):
    """
    Fill in the ``year`` / ``month`` / ``day`` partition keys of each row.

    The values are taken from the row's ``create_at`` attribute and stored as
    strings; month and day are left-padded with zeros to two characters.

    :param rows: list of row dictionaries, each holding a ``create_at`` value
        that exposes ``.year``, ``.month`` and ``.day``.
    :return: the same list object; the row dictionaries are updated in place.
    """
    for record in rows:
        created = record["create_at"]
        record["year"] = f"{created.year}"
        record["month"] = f"{created.month:02d}"
        record["day"] = f"{created.day:02d}"
    return rows
Insert Two Rows#
[6]:
def w1_create_accounts():
    """
    First write: append two account rows (acc-1 and acc-2) to the accounts table.

    The rows get their partition keys added, are loaded into a DataFrame using
    ``account_schema``, and are written in append mode partitioned by
    year / month / day.
    """
    jan_1 = datetime(2021, 1, 1, tzinfo=timezone.utc)
    jan_2 = datetime(2021, 1, 2, tzinfo=timezone.utc)
    records = [
        {
            "account_id": "acc-1",
            "create_at": jan_1,
            "update_at": jan_1,
            "account_number": "1111-1111-1111",
            "account_type": "checking",
            "description": "Alice's Main checking account",
        },
        {
            "account_id": "acc-2",
            "create_at": jan_2,
            "update_at": jan_2,
            "account_number": "2222-2222-2222",
            "account_type": "checking",
            "description": "Bob's Main checking account",
        },
    ]
    accounts_df = pl.DataFrame(
        add_partition_keys_for_accounts(records),
        schema=account_schema,
    )
    accounts_df.write_delta(
        str(dir_t_account),
        mode="append",
        delta_write_options={"partition_by": ["year", "month", "day"]},
    )


w1_create_accounts()
Examine Results
[7]:
def pprint_df(df: pl.DataFrame):
    """
    Print a DataFrame to stdout as a grid-style text table.

    :param df: the DataFrame to render; its schema keys become the header row.
    """
    columns = df.to_dict()
    column_names = list(df.schema)
    print(tabulate(columns, headers=column_names, tablefmt="grid"))
def query_accounts():
    """
    Read the accounts table, order it by ``create_at``, and print it.
    """
    lazy_accounts = pl.scan_delta(str(dir_t_account))
    accounts = lazy_accounts.sort(by="create_at").collect()
    pprint_df(accounts)


query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| account_id | create_at | update_at | account_number | account_type | description | year | month | day |
+==============+=====================+=====================+==================+================+===============================+========+=========+=======+
| acc-1 | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111 | checking | Alice's Main checking account | 2021 | 01 | 01 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| acc-2 | 2021-01-02 00:00:00 | 2021-01-02 00:00:00 | 2222-2222-2222 | checking | Bob's Main checking account | 2021 | 01 | 02 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
Do Upsert, Update 1 row, Insert 1 row#
[8]:
def w2_update_accounts():
    """
    Second write: upsert two rows into the accounts table.

    ``acc-2`` carries a new ``update_at`` and description, and ``acc-3`` is a
    new account id. Source rows are matched to target rows on ``account_id``;
    matched rows are updated and unmatched rows are inserted.
    """
    jan_2 = datetime(2021, 1, 2, tzinfo=timezone.utc)
    jan_3 = datetime(2021, 1, 3, tzinfo=timezone.utc)
    records = [
        {
            "account_id": "acc-2",
            "account_number": "2222-2222-2222",
            "create_at": jan_2,
            "update_at": jan_3,
            "account_type": "checking",
            "description": "Bob's Main checking account, updated",
        },
        {
            "account_id": "acc-3",
            "account_number": "3333-3333-3333",
            "create_at": jan_3,
            "update_at": jan_3,
            "account_type": "saving",
            "description": "Cathy's Main saving account",
        },
    ]
    changes_df = pl.DataFrame(
        add_partition_keys_for_accounts(records),
        schema=account_schema,
    )
    (
        changes_df.write_delta(
            str(dir_t_account),
            mode="merge",
            # NOTE(review): Polars appears to ignore delta_write_options when
            # mode="merge" -- confirm before relying on partition_by here.
            delta_write_options={"partition_by": ["year", "month", "day"]},
            delta_merge_options={
                "predicate": "s.account_id = t.account_id",
                "source_alias": "s",
                "target_alias": "t",
            },
        )
        .when_matched_update_all()  # will do update
        .when_not_matched_insert_all()  # will do insert
        .execute()
    )


w2_update_accounts()
Examine the Result
[9]:
# Print the table again to see the effect of the merge.
query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| account_id | create_at | update_at | account_number | account_type | description | year | month | day |
+==============+=====================+=====================+==================+================+======================================+========+=========+=======+
| acc-1 | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111 | checking | Alice's Main checking account | 2021 | 01 | 01 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-2 | 2021-01-02 00:00:00 | 2021-01-03 00:00:00 | 2222-2222-2222 | checking | Bob's Main checking account, updated | 2021 | 01 | 02 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-3 | 2021-01-03 00:00:00 | 2021-01-03 00:00:00 | 3333-3333-3333 | saving | Cathy's Main saving account | 2021 | 01 | 03 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
[ ]: