Skip to content

Feature

A named signal derived from a Source. Exactly one of columns, sql, or transform must be provided.

Feature(source, *, columns=None, sql=None, transform=None, name=None, embargo=None, key_mapping=None, on_duplicate='error')

A named, versioned column derived from a source.

Exactly one of columns, sql, or transform must be provided.

Parameters:

Name Type Description Default
source SourceType

The data source for this feature.

required
columns str | list[str] | dict[str, str] | None

Column name(s) to select (Mode 1).

None
sql str | Path | None

SQL query string or path to .sql file (Mode 2).

None
transform Callable | None

Python callable (Mode 3).

None
name str | None

Feature name (auto-derived if possible).

None
embargo str | timedelta | None

Computation lag buffer (e.g., "1d").

None
key_mapping dict[str, str] | None

Map label key names to source key names.

None
on_duplicate str

"error" (default) or "keep_any".

'error'
Source code in src/timefence/core.py
def __init__(
    self,
    source: SourceType,
    *,
    columns: str | list[str] | dict[str, str] | None = None,
    sql: str | Path | None = None,
    transform: Callable | None = None,
    name: str | None = None,
    embargo: str | timedelta | None = None,
    key_mapping: dict[str, str] | None = None,
    on_duplicate: str = "error",
):
    """Configure a feature from exactly one of ``columns``, ``sql`` or ``transform``.

    Args:
        source: The data source for this feature.
        columns: Column name(s) to select (Mode 1). A string or list maps
            each column to itself; a dict is copied as given.
        sql: SQL query string, or a ``Path`` to a file whose text is read
            immediately at construction time (Mode 2).
        transform: Python callable (Mode 3).
        name: Feature name. When omitted it is derived from the column
            values joined with ``"_"``, the SQL file stem, or the
            transform's ``__name__``. It cannot be derived for inline SQL.
        embargo: Passed to ``parse_duration``; a falsy result becomes
            ``timedelta(0)``.
        key_mapping: Map of label key names to source key names; defaults
            to an empty dict.
        on_duplicate: ``"error"`` (default) or ``"keep_any"``.

    Raises:
        TimefenceConfigError: If zero or several modes are given, if
            ``on_duplicate`` is not a supported value, if ``columns`` is
            empty, or if no name is given and none can be derived.
    """
    # Validate exactly one mode
    modes = sum(x is not None for x in [columns, sql, transform])
    if modes != 1:
        raise TimefenceConfigError(
            "Feature requires exactly one of 'columns', 'sql', or 'transform'. "
            f"Got {modes} of them."
        )

    # Fail fast on a bad option, before any file is read or state is set.
    if on_duplicate not in ("error", "keep_any"):
        raise TimefenceConfigError(
            f"on_duplicate must be 'error' or 'keep_any', got '{on_duplicate}'."
        )

    self.source = source

    # Determine mode and normalize; attributes of the unused modes are
    # always set (to None / {}) so every instance has the same shape.
    if columns is not None:
        self.mode = "columns"
        if isinstance(columns, str):
            self._columns = {columns: columns}
        elif isinstance(columns, list):
            self._columns = {c: c for c in columns}
        else:
            self._columns = dict(columns)
        if not self._columns:
            raise TimefenceConfigError(
                "Feature 'columns' cannot be empty. "
                "Provide at least one column name."
            )
        self._sql_text = None
        self._sql_path = None
        self._transform = None
    elif sql is not None:
        self.mode = "sql"
        if isinstance(sql, Path):
            self._sql_path = sql
            self._sql_text = sql.read_text()
        else:
            self._sql_path = None
            self._sql_text = sql
        self._columns = {}
        self._transform = None
    else:
        self.mode = "transform"
        self._transform = transform
        self._columns = {}
        self._sql_text = None
        self._sql_path = None

    # Derive name
    if name is not None:
        self.name = name
    elif self.mode == "columns":
        self.name = "_".join(self._columns.values())
    elif self.mode == "sql":
        if self._sql_path is None:
            raise TimefenceConfigError(
                "Feature 'name' is required when using inline SQL. "
                "Timefence cannot auto-derive a name from a SQL string."
            )
        self.name = self._sql_path.stem
    else:
        # Not every callable has __name__ (functools.partial objects and
        # instances defining __call__ do not); report that as a config
        # error rather than leaking an AttributeError.
        derived_name = getattr(transform, "__name__", None)
        if derived_name is None:
            raise TimefenceConfigError(
                "Feature 'name' is required when 'transform' has no "
                "'__name__' (e.g. a functools.partial or a callable object)."
            )
        self.name = derived_name

    self.embargo = parse_duration(embargo) or timedelta(0)
    self.key_mapping = key_mapping or {}
    self.on_duplicate = on_duplicate

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| source | Source \| SQLSource | The data source object. |
| columns | str \| list \| dict \| None | Mode 1: Select columns directly. Pass a dict to rename: {"source_col": "feature_col"}. |
| sql | str \| Path \| None | Mode 2: SQL query or path to .sql file. Use {source} placeholder. |
| transform | Callable \| None | Mode 3: Python function (conn, source_table) -> DuckDBPyRelation. Use conn.sql(...) to return a relation. |
| name | str \| None | Feature name. Auto-derived when possible. Required for inline SQL strings. |
| embargo | str \| timedelta \| None | Computation lag buffer. See Embargo. |
| key_mapping | dict[str, str] \| None | Map label key names to source key names: {"user_id": "customer_id"}. |
| on_duplicate | str | "error" (default) or "keep_any" when duplicate (key, feature_time) pairs exist. |

key_mapping

When label keys don't match source keys, use key_mapping to bridge them:

# Labels use "user_id", but the source uses "customer_id"
spend = timefence.Feature(
    source=transactions,  # has column "customer_id"
    columns=["amount"],
    key_mapping={"user_id": "customer_id"},  # {label_key: source_key}
)

# Multi-key mapping: each label key is mapped to its own source key
orders_feature = timefence.Feature(
    source=orders,  # has "cust_id" and "prod_id"
    columns=["quantity"],
    key_mapping={"user_id": "cust_id", "product_id": "prod_id"},
)

The mapping format is {label_key: source_key}. During the join, Timefence automatically rewrites the join condition to use the mapped column, e.g. ON labels.user_id = source.customer_id.

Feature modes

# Mode 1 (columns): select source columns directly.
country = timefence.Feature(source=users, columns=["country"])

# Mode 2 (inline SQL): {source} is the placeholder for the source table.
# `name` is required here because it cannot be derived from a SQL string.
spend = timefence.Feature(
    source=transactions,
    sql="""
        SELECT user_id, created_at AS feature_time,
        SUM(amount) OVER (
            PARTITION BY user_id ORDER BY created_at
            RANGE INTERVAL 30 DAYS PRECEDING
        ) AS spend_30d
        FROM {source}
    """,
    name="rolling_spend_30d",
    embargo="1d",
)

# Mode 2 (SQL file): with no `name`, the feature is named after the file
# stem ("rolling_spend").
spend = timefence.Feature(
    source=transactions,
    sql=Path("features/rolling_spend.sql"),
)

# Mode 3 (transform): a Python function taking (conn, source_table) that
# returns a relation built with conn.sql(...).
def compute_score(conn, source_table):
    return conn.sql(f"""
        SELECT user_id, created_at AS feature_time,
               raw_score * 2.5 AS adjusted_score
        FROM {source_table}
    """)

# With no `name`, the feature is named after the function ("compute_score").
score = timefence.Feature(source=transactions, transform=compute_score)