Quick Start

In this notebook, we’ll explore how to combine the power of Delta Lake, an open-source storage layer that brings ACID transactions to big data workloads, with Polars, a lightning-fast DataFrame library written in Rust.

We’ll guide you through:

  1. Setting up your environment

  2. Ingesting data into Delta Lake using Polars

  3. Reading and querying data from Delta Lake with Polars

  4. Upserting (updating and inserting) rows with a Delta Lake merge

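References: the Polars `DataFrame.write_delta` / `scan_delta` API documentation and the delta-rs Python (`deltalake`) documentation.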

Import Python Libraries

[1]:
# Install the notebook's dependencies from requirements.txt (deltalake and polars, with version ranges).
# NOTE(review): `tabulate` is imported in the next cell but does not appear in the install output — confirm it is listed in requirements.txt.
%pip install -r requirements.txt
Requirement already satisfied: deltalake<1.0.0,>=0.18.2 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 1)) (0.18.2)
Requirement already satisfied: polars<2.0.0,>=1.2.1 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from -r requirements.txt (line 2)) (1.4.1)
Requirement already satisfied: pyarrow>=8 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (17.0.0)
Requirement already satisfied: pyarrow-hotfix in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (0.6)
Requirement already satisfied: numpy>=1.16.6 in /Users/sanhehu/Documents/GitHub/learn_delta_py-project/.venv/lib/python3.10/site-packages (from pyarrow>=8->deltalake<1.0.0,>=0.18.2->-r requirements.txt (line 1)) (2.0.1)

[notice] A new release of pip is available: 24.1.2 -> 24.2
[notice] To update, run: pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[2]:
import typing as T
import shutil
from pathlib import Path
from datetime import datetime, timezone

import polars as pl # dataframe manipulation
from tabulate import tabulate # pretty print dataframe

Define the Local Path Storage Backend

[3]:
dir_here = Path.cwd()  # the notebook's working directory
dir_db = dir_here.joinpath("mydb")  # root folder of the local "database"
dir_t_account = dir_db.joinpath("accounts")  # folder used as the accounts Delta table location

def reset_db():
    """
    Recreate the local database folder layout from scratch.

    Deletes ``dir_db`` and everything beneath it (a missing folder is not an
    error), then recreates both ``dir_db`` and ``dir_t_account``.
    """
    shutil.rmtree(dir_db, ignore_errors=True)
    for folder in (dir_db, dir_t_account):
        folder.mkdir(exist_ok=True)
[4]:
# Start from a freshly recreated database folder so a full re-run does not
# append onto data left over from a previous run.
reset_db()

Define the Data Schema

[5]:
# Column name -> Polars dtype for the accounts table (insertion order = column order).
# NOTE(review): the Datetime columns are naive, while the rows built below carry
# tzinfo=timezone.utc — confirm a naive (UTC-valued) timestamp is intended.
account_schema = dict(
    account_id=pl.Utf8(),
    create_at=pl.Datetime(),
    update_at=pl.Datetime(),
    account_number=pl.Utf8(),
    account_type=pl.Utf8(),
    description=pl.Utf8(),
    # partition keys: derived from create_at as strings
    year=pl.Utf8(),
    month=pl.Utf8(),
    day=pl.Utf8(),
)

def add_partition_keys_for_accounts(rows: T.List[T.Dict]):
    """
    Attach year/month/day partition columns derived from each row's ``create_at``.

    Every dict in ``rows`` is modified in place: ``year`` is ``str(year)``,
    while ``month`` and ``day`` are zero-padded to two characters. The same
    list object is returned so the call can be used inline.
    """
    for record in rows:
        created = record["create_at"]
        record.update(
            year=str(created.year),
            month=str(created.month).zfill(2),
            day=str(created.day).zfill(2),
        )
    return rows

Insert Two Rows

[6]:
def w1_create_accounts():
    """
    First write: append two new checking accounts (acc-1 and acc-2).

    The rows receive year/month/day partition keys from ``create_at`` and are
    appended to the Delta table at ``dir_t_account``, partitioned by
    year/month/day.

    NOTE(review): ``mode="append"`` means re-running this cell without
    ``reset_db()`` first writes acc-1/acc-2 a second time.
    """
    jan_1 = datetime(2021, 1, 1, tzinfo=timezone.utc)
    jan_2 = datetime(2021, 1, 2, tzinfo=timezone.utc)
    rows = [
        dict(
            account_id="acc-1",
            create_at=jan_1,
            update_at=jan_1,
            account_number="1111-1111-1111",
            account_type="checking",
            description="Alice's Main checking account",
        ),
        dict(
            account_id="acc-2",
            create_at=jan_2,
            update_at=jan_2,
            account_number="2222-2222-2222",
            account_type="checking",
            description="Bob's Main checking account",
        ),
    ]
    accounts_df = pl.DataFrame(
        add_partition_keys_for_accounts(rows),
        schema=account_schema,
    )
    accounts_df.write_delta(
        str(dir_t_account),
        mode="append",
        delta_write_options={"partition_by": ["year", "month", "day"]},
    )

w1_create_accounts()

Examine the Results

[7]:
def pprint_df(df: pl.DataFrame):
    """Print ``df`` as a grid-style text table, using its column names as headers."""
    columns = df.to_dict()
    column_names = list(df.schema)
    print(tabulate(columns, headers=column_names, tablefmt="grid"))


def query_accounts():
    """Scan the accounts Delta table, sort it by ``create_at``, and pretty-print it."""
    lazy_accounts = pl.scan_delta(str(dir_t_account))
    accounts_df = lazy_accounts.sort(by="create_at").collect()
    pprint_df(accounts_df)

query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                   |   year |   month |   day |
+==============+=====================+=====================+==================+================+===============================+========+=========+=======+
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-02 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account   |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+-------------------------------+--------+---------+-------+

Upsert: Update 1 Row and Insert 1 Row

[8]:
def w2_update_accounts():
    """
    Upsert two accounts into the existing accounts Delta table with a MERGE.

    - acc-2 already exists, so its row is overwritten (new ``update_at`` and
      description).
    - acc-3 does not exist yet, so it is inserted.

    Rows are matched on ``account_id``. In the merge predicate, the source frame
    is aliased ``s`` and the target table is aliased ``t``.
    """
    data = [
        {
            "account_id": "acc-2",
            "create_at": datetime(2021, 1, 2, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_number": "2222-2222-2222",
            "account_type": "checking",
            "description": "Bob's Main checking account, updated",
        },
        {
            "account_id": "acc-3",
            "create_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "update_at": datetime(2021, 1, 3, tzinfo=timezone.utc),
            "account_number": "3333-3333-3333",
            "account_type": "saving",
            "description": "Cathy's Main saving account",
        },
    ]
    df = pl.DataFrame(
        add_partition_keys_for_accounts(data),
        schema=account_schema,
    )
    # With mode="merge", Polars hands off to DeltaTable.merge(**delta_merge_options)
    # and ignores delta_write_options. Partitioning (year/month/day) comes from
    # the table created by the first write, so it is not repeated here.
    table_merger = df.write_delta(
        f"{dir_t_account}",
        mode="merge",
        delta_merge_options=dict(
            predicate="s.account_id = t.account_id",
            source_alias="s",
            target_alias="t",
        ),
    )
    (
        table_merger
        .when_matched_update_all()  # existing account_id -> overwrite all columns
        .when_not_matched_insert_all()  # new account_id -> insert the row
        .execute()
    )

w2_update_accounts()

Examine the Result

[9]:
query_accounts()
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| account_id   | create_at           | update_at           | account_number   | account_type   | description                          |   year |   month |   day |
+==============+=====================+=====================+==================+================+======================================+========+=========+=======+
| acc-1        | 2021-01-01 00:00:00 | 2021-01-01 00:00:00 | 1111-1111-1111   | checking       | Alice's Main checking account        |   2021 |      01 |    01 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-2        | 2021-01-02 00:00:00 | 2021-01-03 00:00:00 | 2222-2222-2222   | checking       | Bob's Main checking account, updated |   2021 |      01 |    02 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
| acc-3        | 2021-01-03 00:00:00 | 2021-01-03 00:00:00 | 3333-3333-3333   | saving         | Cathy's Main saving account          |   2021 |      01 |    03 |
+--------------+---------------------+---------------------+------------------+----------------+--------------------------------------+--------+---------+-------+
[ ]: