def build(
labels: Labels,
features: Sequence[Feature | FeatureSet],
output: str | Path | None = None,
*,
max_lookback: str | timedelta = DEFAULT_MAX_LOOKBACK,
max_staleness: str | timedelta | None = None,
join: str = "strict",
on_missing: str = DEFAULT_ON_MISSING,
splits: dict[str, tuple[str, str]] | None = None,
store: Store | None = None,
flatten_columns: bool = False,
progress: Callable[[str], None] | None = None,
) -> BuildResult:
"""Build a point-in-time correct training set.
Args:
progress: Optional callback invoked with a status message at each step.
Useful for progress bars. Called with messages like "Loading labels",
"Computing feature_name", "Joining feature_name", "Writing output".
"""
    start_time = time.time()

    def _emit(msg: str) -> None:
        if progress is not None:
            progress(msg)

max_lookback_td = parse_duration(max_lookback) or timedelta(
days=DEFAULT_MAX_LOOKBACK_DAYS
)
max_staleness_td = parse_duration(max_staleness)
if join not in ("strict", "inclusive"):
raise TimefenceConfigError(
f"join must be 'strict' or 'inclusive', got '{join}'."
)
if on_missing not in ("null", "skip"):
raise TimefenceConfigError(
f"on_missing must be 'null' or 'skip', got '{on_missing}'."
)
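    # Expand any FeatureSet into its constituent Features so the rest of the build
    # operates on a flat list.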
flat_features = flatten_features(features)
# Validate feature names are unique (both exact and after sanitization)
seen_names: dict[str, int] = {}
seen_safe: dict[str, list[str]] = {} # safe_name -> [original names]
for feat in flat_features:
seen_names[feat.name] = seen_names.get(feat.name, 0) + 1
safe = _safe_name(feat.name)
seen_safe.setdefault(safe, []).append(feat.name)
duplicates = {n: c for n, c in seen_names.items() if c > 1}
if duplicates:
dup_str = ", ".join(f"'{n}' (x{c})" for n, c in duplicates.items())
raise TimefenceConfigError(
f"Duplicate feature names: {dup_str}.\n\n"
" Each feature must have a unique name. Duplicate names would cause\n"
" one feature to silently overwrite another.\n\n"
" Fix: Set an explicit name on each feature:\n"
' timefence.Feature(..., name="unique_name")\n'
)
collisions = {s: names for s, names in seen_safe.items() if len(set(names)) > 1}
if collisions:
pairs = [f"{sorted(set(names))}" for names in collisions.values()]
raise TimefenceConfigError(
f"Feature names collide after sanitization: {', '.join(pairs)}.\n\n"
" These names are distinct but map to the same internal identifier,\n"
" which would cause one feature to silently overwrite another.\n\n"
" Fix: Rename features to avoid ambiguity (e.g., use underscores consistently).\n"
)
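    # Reject impossible duration combinations: an embargo at least as long as
    # max_lookback leaves no eligible window, and a max_staleness no larger than
    # the embargo can never be satisfied.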
for feat in flat_features:
if feat.embargo >= max_lookback_td:
from timefence.errors import config_error_embargo_lookback
raise config_error_embargo_lookback(
format_duration(feat.embargo) or "0d",
format_duration(max_lookback_td) or DEFAULT_MAX_LOOKBACK,
)
if max_staleness_td is not None and max_staleness_td <= feat.embargo:
raise TimefenceConfigError(
f"max_staleness ({format_duration(max_staleness_td)}) must be greater than "
f"embargo ({format_duration(feat.embargo)}) for feature '{feat.name}'."
)
# Check build-level cache
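    # The key covers the labels' content hash, every feature's cache key, and the
    # join parameters, so any change to inputs or parameters misses the cache.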
if store is not None and output is not None:
label_hash = _content_hash_safe(labels.path, store)
feat_cache_keys = []
for feat in flat_features:
src_hash = _content_hash_safe(feat.source.path, store)
fck = store.feature_cache_key(
_definition_hash(feat), src_hash, format_duration(feat.embargo)
)
feat_cache_keys.append(fck)
bck = store.build_cache_key(
label_hash,
feat_cache_keys,
format_duration(max_lookback_td),
format_duration(max_staleness_td),
join,
on_missing,
)
cached_build = store.find_cached_build(bck)
if cached_build is not None:
elapsed = time.time() - start_time
cached_build["duration_seconds"] = elapsed
return BuildResult(
output_path=cached_build.get("output", {}).get("path"),
manifest=cached_build,
stats=BuildStats(
row_count=cached_build.get("output", {}).get("row_count", 0),
column_count=cached_build.get("output", {}).get("column_count", 0),
feature_stats={
k: {
"matched": v.get("matched_rows", 0),
"missing": v.get("missing_rows", 0),
"cached": True,
}
for k, v in cached_build.get("features", {}).items()
},
duration_seconds=elapsed,
),
sql="-- cached build",
)
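    # No cached build to reuse: run the full pipeline against an in-memory DuckDB
    # connection.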
conn = duckdb.connect()
all_sql = []
try:
# Step 1: Register labels
_emit("Loading labels")
if labels.path is not None:
_load_data_as_table(conn, labels.path, "__labels_raw")
elif labels.df is not None:
_load_data_as_table(conn, labels.df, "__labels_raw")
else:
raise TimefenceValidationError("Labels must have either path or df.")
# Validate label schema
label_cols = [
col[0] for col in conn.execute("DESCRIBE __labels_raw").fetchall()
]
for key in labels.keys:
if key not in label_cols:
raise TimefenceSchemaError(
f"Labels missing key column '{key}'.\n Available: {label_cols}"
)
if labels.label_time not in label_cols:
raise TimefenceSchemaError(
f"Labels missing label_time column '{labels.label_time}'.\n Available: {label_cols}"
)
# Add rowid for join tracking
conn.execute(
"CREATE TEMP TABLE __labels AS "
"SELECT ROW_NUMBER() OVER () AS __label_rowid, * FROM __labels_raw"
)
label_count = conn.execute("SELECT COUNT(*) FROM __labels").fetchone()[0]
logger.info(
"Labels: %d rows, keys=%s, label_time=%s",
label_count,
labels.keys,
labels.label_time,
)
# Get label time range for manifest
time_range_row = conn.execute(
f"SELECT MIN({_qi(labels.label_time)}), MAX({_qi(labels.label_time)}) FROM __labels"
).fetchone()
label_time_range = (
[str(time_range_row[0]), str(time_range_row[1])]
if time_range_row and time_range_row[0] is not None
else None
)
# Validate splits if provided
if splits:
_validate_splits(splits, conn, labels.label_time)
# Step 2: Register sources and compute features
registered_sources: dict[str, str] = {}
feature_tables: dict[str, tuple[str, list[str]]] = {}
feature_cache_keys: list[str] = []
feature_cache_status: dict[str, bool] = {} # name -> was_cached
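        # Each feature is materialized into its own temp table; when the store holds
        # a parquet copy under the same definition/source-content/embargo cache key,
        # that copy is loaded instead of recomputing.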
for i, feat in enumerate(flat_features, 1):
_emit(f"Computing {feat.name} ({i}/{len(flat_features)})")
src_name = feat.source.name
if src_name not in registered_sources:
table_name = f"__src_{_safe_name(src_name)}"
_register_source(conn, feat.source, table_name)
registered_sources[src_name] = table_name
src_table = registered_sources[src_name]
_validate_source_schema(conn, src_table, feat, labels.keys)
_check_duplicates(conn, src_table, feat)
feat_table = f"__feat_{_safe_name(feat.name)}"
# Check feature-level cache
cached = False
fck = None
if store is not None:
src_hash = _content_hash_safe(feat.source.path, store)
fck = store.feature_cache_key(
_definition_hash(feat), src_hash, format_duration(feat.embargo)
)
feature_cache_keys.append(fck)
if store.has_feature_cache(feat.name, fck):
cache_path = store.feature_cache_path(feat.name, fck)
conn.execute(
f"CREATE OR REPLACE TEMP TABLE {feat_table} AS "
f"SELECT * FROM read_parquet({_ql(cache_path)})"
)
feat_cols = [
c[0] for c in conn.execute(f"DESCRIBE {feat_table}").fetchall()
]
output_cols = [
c
for c in feat_cols
if c != "feature_time" and c not in feat.source_keys
]
cached = True
feature_cache_status[feat.name] = True
if not cached:
feature_cache_status[feat.name] = False
feat_sqls, output_cols = _compute_feature_table(
conn, feat, src_table, feat_table
)
all_sql.extend(feat_sqls)
for s in feat_sqls:
logger.info("Feature SQL [%s]:\n %s", feat.name, s)
# Save freshly computed feature to cache
if store is not None and fck is not None:
cache_path = store.feature_cache_path(feat.name, fck)
try:
conn.execute(
f"COPY (SELECT * FROM {feat_table}) TO {_ql(cache_path)} (FORMAT PARQUET)"
)
except (duckdb.Error, OSError) as exc:
logger.warning(
"Feature cache write failed for %s: %s", feat.name, exc
)
else:
logger.info("Feature [%s]: loaded from cache", feat.name)
feature_tables[feat.name] = (feat_table, output_cols)
# Timezone validation
if output_cols:
_validate_timezones(conn, labels.label_time, feat, feat_table)
# Step 3: Point-in-time joins
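        # Each feature is joined to the labels independently; results land in
        # per-feature __joined_<name> temp tables keyed by __label_rowid. If an
        # ASOF-strategy join errors, it is rebuilt with the ROW_NUMBER variant.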
for i, feat in enumerate(flat_features, 1):
_emit(f"Joining {feat.name} ({i}/{len(flat_features)})")
feat_table, output_cols = feature_tables[feat.name]
join_sql, strategy = _build_join_sql(
feat,
feat_table,
labels.keys,
labels.label_time,
join,
max_lookback_td,
max_staleness_td,
output_cols,
)
logger.info(
"Join SQL [%s] (strategy=%s):\n %s", feat.name, strategy, join_sql
)
try:
conn.execute(join_sql)
except duckdb.Error as exc:
                # Fallback: if the ASOF JOIN errors, retry with a ROW_NUMBER-based join.
if strategy == "asof":
logger.debug(
"ASOF JOIN failed for %s, falling back to ROW_NUMBER: %s",
feat.name,
exc,
)
join_sql = _build_row_number_join_sql(
feat,
feat_table,
labels.keys,
labels.label_time,
join,
max_lookback_td,
max_staleness_td,
output_cols,
)
conn.execute(join_sql)
strategy = "row_number"
else:
raise
all_sql.append(join_sql)
# Step 4: Combine all joins
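        # Stitch the per-feature join results back onto the labels with LEFT JOINs
        # on __label_rowid; feature columns are emitted as <feature_name>__<column>.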
key_cols = ", ".join(f"l.{_qi(k)}" for k in labels.keys)
target_cols = ", ".join(f"l.{_qi(t)}" for t in labels.target)
join_clauses = []
select_cols = [key_cols, f"l.{_qi(labels.label_time)}", target_cols]
for feat in flat_features:
prefix = feat.name
safe_prefix = _safe_name(prefix)
_, output_cols = feature_tables[feat.name]
for col in output_cols:
select_cols.append(f"j_{safe_prefix}.{_qi(f'{prefix}__{col}')}")
join_clauses.append(
f"LEFT JOIN __joined_{safe_prefix} j_{safe_prefix} "
f"ON l.__label_rowid = j_{safe_prefix}.__label_rowid"
)
        order_cols = f"{key_cols}, l.{_qi(labels.label_time)}"
final_sql = (
f"SELECT {', '.join(select_cols)} "
f"FROM __labels l "
f"{' '.join(join_clauses)} "
f"ORDER BY {order_cols}"
)
# Handle on_missing="skip"
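        # With on_missing="skip", label rows where any feature column is NULL are
        # filtered out; with "null" they are kept and the missing values stay NULL.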
if on_missing == "skip":
not_null_conditions = []
for feat in flat_features:
safe_pref = _safe_name(feat.name)
_, output_cols = feature_tables[feat.name]
for col in output_cols:
not_null_conditions.append(
f"j_{safe_pref}.{_qi(f'{feat.name}__{col}')} IS NOT NULL"
)
if not_null_conditions:
final_sql = (
f"SELECT {', '.join(select_cols)} "
f"FROM __labels l "
f"{' '.join(join_clauses)} "
f"WHERE {' AND '.join(not_null_conditions)} "
f"ORDER BY {order_cols}"
)
all_sql.append(final_sql)
logger.info("Final SQL:\n %s", final_sql)
# Flatten column names if requested
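        # The <feature>__ prefix is dropped only if every resulting short name is
        # unique across the output; on any collision the prefixed names are kept.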
if flatten_columns:
result_rel = conn.execute(final_sql)
col_descriptions = result_rel.description
seen: set[str] = set()
can_flatten = True
for desc in col_descriptions:
name = desc[0]
short = name.split("__", 1)[1] if "__" in name else name
if short in seen:
can_flatten = False
break
seen.add(short)
if can_flatten:
renames = []
for desc in col_descriptions:
name = desc[0]
if "__" in name:
short = name.split("__", 1)[1]
renames.append(f"{_qi(name)} AS {_qi(short)}")
else:
renames.append(_qi(name))
final_sql = f"SELECT {', '.join(renames)} FROM ({final_sql})"
# Step 5: Materialize result, write output, collect stats
_emit("Writing output")
conn.execute(f"CREATE TEMP TABLE __result AS {final_sql}")
result_cols = [c[0] for c in conn.execute("DESCRIBE __result").fetchall()]
result_count = conn.execute("SELECT COUNT(*) FROM __result").fetchone()[0]
if output is not None:
output = str(output)
Path(output).parent.mkdir(parents=True, exist_ok=True)
conn.execute(
f"COPY (SELECT * FROM __result) TO {_ql(output)} (FORMAT PARQUET)"
)
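        # Per-feature match/miss counts, using NULLs in each feature's first output
        # column as a proxy for label rows the join left unmatched.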
feature_stats = {}
for feat in flat_features:
prefix = feat.name
_, output_cols = feature_tables[feat.name]
if output_cols:
first_col = f"{prefix}__{output_cols[0]}"
if flatten_columns and output_cols[0] in result_cols:
first_col = output_cols[0]
try:
null_count = conn.execute(
f"SELECT COUNT(*) FROM __result WHERE {_qi(first_col)} IS NULL"
).fetchone()[0]
except duckdb.Error as exc:
logger.debug(
"Could not compute null count for %s: %s", feat.name, exc
)
null_count = 0
feature_stats[feat.name] = {
"matched": result_count - null_count,
"missing": null_count,
"cached": feature_cache_status.get(feat.name, False),
}
# Step 6: Post-build verification
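        # Re-check every joined feature_time against the temporal invariant
        # (feature_time < label_time - embargo for 'strict', <= for 'inclusive');
        # any violating row marks the audit as failed in the manifest.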
_emit("Verifying temporal correctness")
audit_passed = True
for feat in flat_features:
prefix = feat.name
safe_prefix = _safe_name(prefix)
ft_col = _qi(f"{prefix}__feature_time")
lt = _qi(labels.label_time)
embargo_interval = duration_to_sql_interval(feat.embargo)
if join == "strict":
if feat.embargo.total_seconds() > 0:
check_sql = (
f"SELECT COUNT(*) FROM __joined_{safe_prefix} j "
f"JOIN __labels l ON j.__label_rowid = l.__label_rowid "
f"WHERE j.{ft_col} IS NOT NULL "
f"AND j.{ft_col} >= l.{lt} - {embargo_interval}"
)
else:
check_sql = (
f"SELECT COUNT(*) FROM __joined_{safe_prefix} j "
f"JOIN __labels l ON j.__label_rowid = l.__label_rowid "
f"WHERE j.{ft_col} IS NOT NULL "
f"AND j.{ft_col} >= l.{lt}"
)
else:
if feat.embargo.total_seconds() > 0:
check_sql = (
f"SELECT COUNT(*) FROM __joined_{safe_prefix} j "
f"JOIN __labels l ON j.__label_rowid = l.__label_rowid "
f"WHERE j.{ft_col} IS NOT NULL "
f"AND j.{ft_col} > l.{lt} - {embargo_interval}"
)
else:
check_sql = (
f"SELECT COUNT(*) FROM __joined_{safe_prefix} j "
f"JOIN __labels l ON j.__label_rowid = l.__label_rowid "
f"WHERE j.{ft_col} IS NOT NULL "
f"AND j.{ft_col} > l.{lt}"
)
violations = conn.execute(check_sql).fetchone()[0]
if violations > 0:
audit_passed = False
# Handle splits
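        # Each requested split is written as a sibling file named
        # <stem>_<split><suffix>, filtered to the half-open [start, end) range of
        # the label_time column.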
split_paths = None
if splits and output:
split_paths = {}
output_path = Path(output)
for split_name, (start, end) in splits.items():
split_file = (
output_path.parent
/ f"{output_path.stem}_{split_name}{output_path.suffix}"
)
split_sql = (
f"COPY (SELECT * FROM ({final_sql}) "
f"WHERE {_qi(labels.label_time)} >= {_ql(start)} "
f"AND {_qi(labels.label_time)} < {_ql(end)}) "
f"TO {_ql(split_file)} (FORMAT PARQUET)"
)
conn.execute(split_sql)
split_paths[split_name] = split_file
elapsed = time.time() - start_time
stats = BuildStats(
row_count=result_count,
column_count=len(result_cols),
feature_stats=feature_stats,
duration_seconds=elapsed,
)
# Compute output file size
output_file_size = None
if output is not None:
import contextlib
with contextlib.suppress(OSError):
output_file_size = Path(output).stat().st_size
# Build manifest
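        # The manifest captures input content hashes, build parameters, per-feature
        # stats, the audit verdict, and environment details for later auditing and
        # cache lookups.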
build_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
manifest = {
"timefence_version": __version__,
"build_id": build_id,
"created_at": datetime.now(timezone.utc).isoformat(),
"duration_seconds": elapsed,
"labels": {
"path": str(labels.path) if labels.path else None,
"content_hash": _content_hash_safe(labels.path, store),
"row_count": label_count,
"time_range": label_time_range,
"keys": labels.keys,
"label_time_column": labels.label_time,
"target_columns": labels.target,
},
"features": {},
"parameters": {
"max_lookback": format_duration(max_lookback_td),
"max_staleness": format_duration(max_staleness_td),
"join": join,
"on_missing": on_missing,
},
"output": {
"path": str(output) if output else None,
"content_hash": _content_hash_safe(
Path(output) if output else None, store
),
"row_count": result_count,
"column_count": len(result_cols),
"file_size_bytes": output_file_size,
},
"audit": {
"passed": audit_passed,
"invariant": f"feature_time {'<' if join == 'strict' else '<='} label_time - embargo",
"rows_checked": result_count,
},
"environment": {
"python_version": _python_version(),
"duckdb_version": duckdb.__version__,
"os": _os_identifier(),
},
}
for feat in flat_features:
fstats = feature_stats.get(feat.name, {})
manifest["features"][feat.name] = {
"definition_hash": _definition_hash(feat),
"source_content_hash": _content_hash_safe(feat.source.path, store),
"embargo": format_duration(feat.embargo),
"matched_rows": fstats.get("matched", 0),
"missing_rows": fstats.get("missing", 0),
"output_columns": feature_tables[feat.name][1],
"cached": feature_cache_status.get(feat.name, False),
}
# Store build cache key in manifest for future lookups
if store is not None and feature_cache_keys:
bck = store.build_cache_key(
_content_hash_safe(labels.path, store),
feature_cache_keys,
format_duration(max_lookback_td),
format_duration(max_staleness_td),
join,
on_missing,
)
manifest["build_cache_key"] = bck
manifest_path = store.save_build(manifest)
manifest["manifest_path"] = str(manifest_path)
return BuildResult(
output_path=str(output) if output else None,
manifest=manifest,
stats=stats,
splits=split_paths,
sql="\n\n".join(all_sql),
)
finally:
conn.close()