
Source

Defines a historical data source (Parquet, CSV, or DataFrame).

Source(path=None, *, keys, timestamp, name=None, format=None, delimiter=',', timestamp_format=None, df=None)

A table of historical data with timestamps.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| Path \| None` | Path to the data file (Parquet or CSV). | `None` |
| `keys` | `str \| list[str]` | Column name(s) used as entity keys. | *required* |
| `timestamp` | `str` | Column name containing the temporal key. | *required* |
| `name` | `str \| None` | Human-readable name (defaults to filename stem). | `None` |
| `format` | `str \| None` | File format (`"parquet"` or `"csv"`). Auto-detected from extension. | `None` |
| `delimiter` | `str` | CSV delimiter (only for CSV files). | `','` |
| `timestamp_format` | `str \| None` | strftime format for parsing timestamp strings. | `None` |
| `df` | `Any` | In-memory DataFrame to use instead of a file path. | `None` |
Source code in src/timefence/core.py
def __init__(
    self,
    path: str | Path | None = None,
    *,
    keys: str | list[str],
    timestamp: str,
    name: str | None = None,
    format: str | None = None,
    delimiter: str = ",",
    timestamp_format: str | None = None,
    df: Any = None,
):
    if path is None and df is None:
        raise TimefenceValidationError(
            "Source requires either 'path' or 'df' parameter."
        )
    if path is not None and df is not None:
        raise TimefenceValidationError(
            "Source accepts either 'path' or 'df', not both."
        )

    self.path = Path(path) if path is not None else None
    self.df = df
    self.keys = _as_list(keys)
    if not self.keys:
        raise TimefenceValidationError(
            "Source 'keys' cannot be empty. Provide at least one entity key column."
        )
    self.timestamp = timestamp
    self.name = name or (self.path.stem if self.path else "dataframe")
    self.delimiter = delimiter
    self.timestamp_format = timestamp_format

    if format is not None:
        self.format = format
    elif self.path is not None:
        ext = self.path.suffix.lower()
        if ext in (".parquet", ".pq"):
            self.format = "parquet"
        elif ext == ".csv":
            self.format = "csv"
        else:
            raise TimefenceValidationError(
                f"Cannot auto-detect format for '{self.path}'. "
                "Specify format='parquet' or format='csv'."
            )
    else:
        self.format = "arrow"

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `path` | `str \| Path \| None` | Path to the data file. Mutually exclusive with `df`. |
| `keys` | `str \| list[str]` | Column name(s) representing the entity (e.g., `"user_id"`). |
| `timestamp` | `str` | Column name containing the valid-at timestamp. |
| `name` | `str \| None` | Human-readable name. Defaults to the filename stem. |
| `format` | `str \| None` | `"parquet"` or `"csv"`. Auto-detected from the file extension. |
| `delimiter` | `str` | CSV delimiter. Default: `","`. |
| `timestamp_format` | `str \| None` | Optional strftime format for parsing timestamps (CSV only). |
| `df` | `Any \| None` | A DataFrame, DuckDB relation, or any object with a compatible interface, instead of a file path. |
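The timestamp_format value follows Python's standard strftime codes. As a standalone illustration (plain datetime, no timefence required), this is the kind of pattern you would pass for a day-first CSV timestamp that auto-detection could misread:

```python
from datetime import datetime

# "%d/%m/%Y %H:%M" parses day-first timestamps such as "03/07/2024 14:30",
# which would otherwise be ambiguous (July 3rd vs March 7th).
pattern = "%d/%m/%Y %H:%M"
parsed = datetime.strptime("03/07/2024 14:30", pattern)
print(parsed.isoformat())  # 2024-07-03T14:30:00
```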

Mutual exclusivity

Provide exactly one of path or df; passing both, or neither, raises TimefenceValidationError.
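The check itself is a simple exclusive-or over the two arguments. A minimal standalone sketch, using a stand-in exception class rather than timefence's own:

```python
# Stand-in for timefence's validation error, so the sketch runs on its own.
class TimefenceValidationError(ValueError):
    pass

def validate_source_args(path=None, df=None):
    # Exactly one of path / df must be supplied.
    if path is None and df is None:
        raise TimefenceValidationError(
            "Source requires either 'path' or 'df' parameter."
        )
    if path is not None and df is not None:
        raise TimefenceValidationError(
            "Source accepts either 'path' or 'df', not both."
        )

validate_source_args(path="data/tx.parquet")  # passes silently
```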

Examples

# Parquet source
transactions = timefence.Source(
    path="data/transactions.parquet",
    keys=["user_id"],
    timestamp="created_at",
)

# CSV source
events = timefence.Source(
    path="data/events.csv",
    keys=["user_id"],
    timestamp="event_time",
    format="csv",
    delimiter=",",
)

# DataFrame source
df_source = timefence.Source(
    df=my_dataframe,
    keys=["user_id"],
    timestamp="created_at",
)

# Multi-key source
orders = timefence.Source(
    path="data/orders.parquet",
    keys=["user_id", "product_id"],
    timestamp="order_time",
)

Convenience aliases

transactions = timefence.ParquetSource("data/tx.parquet", keys="user_id", timestamp="ts")
events = timefence.CSVSource("data/events.csv", keys="user_id", timestamp="ts")

These are thin wrappers that set format automatically.
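Assuming the aliases simply forward to Source with format pre-set, they can be sketched like this (the stub Source below stands in for timefence's class so the snippet is self-contained):

```python
# Stub standing in for timefence.Source, returning a dict for illustration.
def Source(path=None, *, keys, timestamp, format=None, **kwargs):
    return {"path": path, "keys": keys, "timestamp": timestamp, "format": format}

def ParquetSource(path, **kwargs):
    # Thin wrapper: identical to Source, with format fixed to "parquet".
    return Source(path, format="parquet", **kwargs)

def CSVSource(path, **kwargs):
    # Thin wrapper: identical to Source, with format fixed to "csv".
    return Source(path, format="csv", **kwargs)

src = ParquetSource("data/tx.parquet", keys="user_id", timestamp="ts")
print(src["format"])  # parquet
```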

SQLSource

Define a source via a SQL query against DuckDB. Use this when your data requires pre-processing (joins, filters, aggregations) before it can serve as a feature source.

SQLSource(query, *, keys, timestamp, name, connection=None)

A source defined by a SQL query against DuckDB.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | SQL query string. Can use `read_parquet`/`read_csv` directly. | *required* |
| `keys` | `str \| list[str]` | Column name(s) used as entity keys. | *required* |
| `timestamp` | `str` | Column name containing the temporal key. | *required* |
| `name` | `str` | Human-readable name. | *required* |
| `connection` | `str \| None` | Path to a DuckDB database file (optional; in-memory by default). | `None` |
Source code in src/timefence/core.py
def __init__(
    self,
    query: str,
    *,
    keys: str | list[str],
    timestamp: str,
    name: str,
    connection: str | None = None,
):
    self.query = query
    self.keys = _as_list(keys)
    self.timestamp = timestamp
    self.name = name
    self.connection = connection
    self.path = None
    self.df = None
    self.format = "sql"

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `query` | `str` | SQL query string. Can use `read_parquet()` / `read_csv()` directly. |
| `keys` | `str \| list[str]` | Column name(s) used as entity keys. |
| `timestamp` | `str` | Column name containing the temporal key. |
| `name` | `str` | Human-readable name (required; it cannot be auto-derived from a query). |
| `connection` | `str \| None` | Path to a DuckDB database file. Default: `None` (in-memory). |

Examples

import timefence

# Query across multiple parquet files
combined = timefence.SQLSource(
    query="""
        SELECT user_id, event_time, amount
        FROM read_parquet('data/transactions_*.parquet')
        WHERE amount > 0
    """,
    keys=["user_id"],
    timestamp="event_time",
    name="positive_transactions",
)

# Pre-aggregate before using as a feature source
daily_spend = timefence.SQLSource(
    query="""
        SELECT user_id, DATE_TRUNC('day', created_at) AS day,
               SUM(amount) AS daily_total
        FROM read_parquet('data/transactions.parquet')
        GROUP BY user_id, DATE_TRUNC('day', created_at)
    """,
    keys=["user_id"],
    timestamp="day",
    name="daily_spend",
)

# Join two files before use
enriched = timefence.SQLSource(
    query="""
        SELECT t.user_id, t.created_at, t.amount, u.country
        FROM read_parquet('data/transactions.parquet') t
        JOIN read_parquet('data/users.parquet') u
          ON t.user_id = u.user_id
    """,
    keys=["user_id"],
    timestamp="created_at",
    name="enriched_transactions",
)

# Use with an existing DuckDB database
warehouse = timefence.SQLSource(
    query="SELECT user_id, updated_at, score FROM user_scores",
    keys=["user_id"],
    timestamp="updated_at",
    name="user_scores",
    connection="analytics.duckdb",
)

When to use SQLSource

| Scenario | Use |
| --- | --- |
| Single Parquet or CSV file | `Source` (simpler) |
| Glob patterns (`*.parquet`) | `SQLSource` |
| Pre-filtering rows | `SQLSource` |
| Joining multiple tables | `SQLSource` |
| Existing DuckDB database | `SQLSource` with `connection` |
| DataFrame in memory | `Source` with `df` |
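The glob pattern in read_parquet('data/transactions_*.parquet') uses shell-style wildcards, which DuckDB expands against the filesystem. Python's fnmatch follows the same wildcard rules and shows which file names such a pattern would pick up:

```python
from fnmatch import fnmatch

files = [
    "data/transactions_2023.parquet",
    "data/transactions_2024.parquet",
    "data/users.parquet",
]
# Shell-style matching: * matches any run of characters.
matched = [f for f in files if fnmatch(f, "data/transactions_*.parquet")]
print(matched)  # ['data/transactions_2023.parquet', 'data/transactions_2024.parquet']
```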