Domain-agnostic sample factory for building repeatable data pipelines in Elixir.
Forge helps you generate samples, apply staged transformations, compute measurements, and persist results for dataset creation, evaluation harnesses, enrichment jobs, and analytics workflows.
- Pipeline DSL that wires together sources, stages, measurements, and storage backends
- Two execution modes: a GenServer runner for simple batch runs and a streaming runner with backpressure and async stages
- Resilient stage execution with retry policies, error classification, and DLQ marking
- Measurement orchestration with caching, versioning, dependency resolution, and batch/async compute
- Pluggable storage (ETS, Postgres) plus content-addressed artifact storage (local filesystem, S3 stub)
- Built-in telemetry events, metric helpers, and optional OpenTelemetry tracing
- Human-in-the-loop bridge for publishing samples to Anvil with mock/direct/http adapters
- Deterministic manifests for reproducibility and drift detection
- Samples: `Forge.Sample` carries `id`, `pipeline`, `data`, `measurements`, status (`:pending`, `:measured`, `:ready`, `:labeled`, `:skipped`, `:dlq`), and timestamps.
- Pipelines: `use Forge.Pipeline` to declare `pipeline/2` blocks with `source`, `stage`, `measurement`, and `storage` entries. Introspection helpers: `__pipeline__/1`, `__pipelines__/0`.
- Sources: Behaviour-driven inputs; built-ins include `Forge.Source.Static` (fixed list) and `Forge.Source.Generator` (function-based). Implement `init/1`, `fetch/1`, `cleanup/1` for custom data feeds (see the sketch after this list).
- Stages: `Forge.Stage` behaviour for per-sample transforms. Optional `async?/0`, `concurrency/0`, and `timeout/0` guide execution. `Forge.Stage.Executor` applies stages with `Forge.RetryPolicy` and `Forge.ErrorClassifier` to decide retries vs. DLQ.
- Measurements: `Forge.Measurement` behaviour with `key/0`, `version/0`, and `compute/1`, plus optional async, batch, dependencies, and timeouts. `Forge.Measurement.Orchestrator` provides cached, versioned measurement storage with dependency ordering and batch execution.
- Manifests: `Forge.Manifest` and `Forge.Manifest.Hash` capture deterministic hashes of pipeline configuration, git SHA, and secret usage for reproducibility.
- Human-in-the-loop: `Forge.AnvilBridge` adapters (Mock, Direct stub, HTTP stub) publish samples to Anvil and sync labels; convert samples with `sample_to_dto/2`.
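For a custom data feed, implement the source callbacks named above in your own module. A minimal sketch, assuming the behaviour module is `Forge.Source` and that `fetch/1` returns `{:ok, samples, state}`; only the callback names come from the list above, the return shapes are assumptions:

```elixir
defmodule MyApp.Source.LinesFile do
  # Hypothetical custom source. The behaviour name (Forge.Source) and the
  # return shapes below are assumptions; check the behaviour's typespecs.
  @behaviour Forge.Source

  def init(opts), do: {:ok, %{path: Keyword.fetch!(opts, :path)}}

  def fetch(%{path: path} = state) do
    # Turn each line of the file into a sample data map.
    samples =
      path
      |> File.stream!()
      |> Enum.map(&%{text: String.trim(&1)})

    {:ok, samples, state}
  end

  def cleanup(_state), do: :ok
end
```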
- GenServer runner (`Forge.Runner`): pulls all samples from the source, applies stages with retry policies, computes measurements (sync and async), optionally persists via storage, and emits telemetry.
- Streaming runner (`Forge.Runner.Streaming`): lazily processes samples with optional async stages and bounded concurrency, suitable for large datasets and backpressure-aware consumers.
Define a pipeline:
```elixir
defmodule MyApp.Pipelines do
  use Forge.Pipeline

  pipeline :narratives do
    source Forge.Source.Generator,
      count: 3,
      generator: fn idx -> %{id: idx, text: "narrative-#{idx}"} end

    stage MyApp.NormalizeStage
    measurement MyApp.Measurements.Length
    storage Forge.Storage.ETS, table: :narrative_samples
  end
end
```
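The pipeline above references `MyApp.NormalizeStage`. A minimal sketch of such a stage, assuming a `process/1` transform callback that returns `{:ok, sample}`; the callback name and return shape are hypothetical, so consult the `Forge.Stage` behaviour for the real contract:

```elixir
defmodule MyApp.NormalizeStage do
  # Hypothetical stage module: the transform callback name and return
  # shape are assumptions, not the documented Forge.Stage contract.
  @behaviour Forge.Stage

  def process(%Forge.Sample{data: data} = sample) do
    # Lowercase the :text field as a simple per-sample transform.
    {:ok, %{sample | data: Map.update(data, :text, "", &String.downcase/1)}}
  end

  # Optional tuning callbacks described in the concepts section.
  def async?, do: false
  def timeout, do: 5_000
end
```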
Run it with the GenServer runner:

```elixir
{:ok, runner} =
  Forge.Runner.start_link(pipeline_module: MyApp.Pipelines, pipeline_name: :narratives)

samples = Forge.Runner.run(runner)
Forge.Runner.stop(runner)
```

Stream it with backpressure instead:
```elixir
stream =
  MyApp.Pipelines.__pipeline__(:narratives)
  |> Forge.Runner.Streaming.run(concurrency: 8)

stream |> Enum.take(10)
```

- Implement measurement modules with unique `key/0` and `version/0`. Opt into batching via `batch_capable?/0` and `compute_batch/1`, or mark `async?/0` for fire-and-forget (see the module sketch below). `Forge.Measurement.Orchestrator` caches results in `forge_measurements` (Ecto) and supports dependency graphs:
```elixir
{:ok, :computed, value} =
  Forge.Measurement.Orchestrator.measure_sample(sample_id, MyApp.Measurements.Length, [])
```
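A measurement module backing the snippet above could look like the following sketch. The `key/0`, `version/0`, `compute/1`, `batch_capable?/0`, and `compute_batch/1` callbacks come from the behaviour description above; the argument and return shapes are assumptions:

```elixir
defmodule MyApp.Measurements.Length do
  @behaviour Forge.Measurement

  # Unique key/version drive caching and invalidation in the orchestrator.
  def key, do: :text_length
  def version, do: 1

  # Assumed input/return shape: a sample in, {:ok, value} out.
  def compute(%Forge.Sample{data: %{text: text}}), do: {:ok, String.length(text)}

  # Optional batch support mentioned above (shapes are assumptions).
  def batch_capable?, do: true
  def compute_batch(samples), do: Enum.map(samples, &compute/1)
end
```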
- Sample storage: `Forge.Storage.ETS` (fast, in-memory) and `Forge.Storage.Postgres` (durable, with lineage via `forge_samples`, stage executions, and measurements). Database migrations live in `priv/repo/migrations`; run `mix forge.setup` to create and migrate.
- Artifacts: `Forge.ArtifactStorage.Local` stores blobs content-addressed on disk and emits telemetry; `Forge.ArtifactStorage.S3` provides the interface for a future ExAws-backed adapter.
- Telemetry events cover pipelines, stages, measurements, storage, and DLQ moves (`Forge.Telemetry`).
- Prebuilt metric specs are available in `Forge.Telemetry.Metrics`.
- Optional OpenTelemetry tracing via `Forge.Telemetry.OTel` when configured.
- See `docs/telemetry.md` for event and metric details.
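To consume these events, attach a standard `:telemetry` handler. The event name below is illustrative; the real names are documented in `docs/telemetry.md`:

```elixir
# Illustrative handler; the actual event names are listed in docs/telemetry.md.
:telemetry.attach(
  "forge-stage-logger",
  [:forge, :stage, :stop],
  fn _event, measurements, metadata, _config ->
    IO.inspect({measurements, metadata}, label: "forge stage")
  end,
  nil
)
```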
- Configure the Anvil bridge adapter via `config :forge, :anvil_bridge_adapter, Forge.AnvilBridge.Mock` (default), `Direct`, or `HTTP`.
- Publish samples or batches with `Forge.AnvilBridge.publish_sample/2` and `publish_batch/2`; fetch or sync labels with `get_labels/1` and `sync_labels/2`.
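A usage sketch combining the configuration and calls above; the argument and return shapes are assumptions:

```elixir
# config/config.exs — pick the adapter (Mock is the default).
config :forge, :anvil_bridge_adapter, Forge.AnvilBridge.Mock

# Publishing and label sync (argument and return shapes are assumptions):
{:ok, _receipt} = Forge.AnvilBridge.publish_sample(sample, pipeline: :narratives)
{:ok, labels} = Forge.AnvilBridge.get_labels(sample.id)
Forge.AnvilBridge.sync_labels(sample.id, labels)
```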
Add the dependency to `mix.exs`:

```elixir
def deps do
  [
    {:forge_ex, "~> 0.1.1"}
  ]
end
```

For Postgres-backed features (storage, measurement orchestrator), configure `Forge.Repo` and run the provided migrations. Defaults use `postgres`/`postgres` on localhost; override via environment or config.
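A minimal Repo configuration sketch reflecting those defaults; the OTP app key and database name are assumptions:

```elixir
# config/dev.exs — illustrative values matching the documented defaults;
# the OTP app (:forge) and database name are assumptions.
config :forge, Forge.Repo,
  username: "postgres",
  password: "postgres",
  hostname: "localhost",
  database: "forge_dev",
  pool_size: 10
```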
```bash
mix deps.get     # Install dependencies
mix forge.setup  # Create & migrate Postgres schemas (if using Repo-backed features)
mix test         # Run the test suite (sets up DB via alias)
mix docs         # Generate ExDoc documentation
```

MIT License © 2024-2025 North-Shore-AI