Download Resources

Resources represent steps in a dataset preparation pipeline. They form a directed acyclic graph (DAG) where each resource can depend on other resources.

Key concepts:

  • Two-path system: resources write to transient_path during download, then the framework moves data to path and marks the resource as COMPLETE.

  • Three states: NONE, PARTIAL, COMPLETE (persisted in .state.json)

  • Transient resources: intermediate resources that can be deleted after all dependents are COMPLETE (eager cleanup)

Resource Hierarchy

Resource (ABC)
├── FileResource      — produces a single file
├── FolderResource    — produces a directory
│   └── FilesCopy     — copies files from another resource
├── ValueResource     — produces an in-memory value (no files)
├── reference         — references another dataset
└── Download          — (deprecated alias for Resource)

Resource Base Class

class datamaestro.download.Resource(varname: str | None = None, *, transient: bool = False)

Base class for all dataset resources.

A resource represents a single step in a dataset preparation pipeline. Resources form a DAG: each resource declares its dependencies, and the orchestrator ensures they are processed in topological order.
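To make the ordering guarantee concrete, here is a minimal sketch using Python's standard `graphlib` module. It is not datamaestro's orchestrator; the resource names and the `processing_order` helper are hypothetical and only illustrate what "topological order" means for a dependency graph.

```python
from graphlib import TopologicalSorter

# Hypothetical resource names mapped to the names they depend on.
dependencies = {
    "ARCHIVE": set(),
    "FILES": {"ARCHIVE"},
    "PROCESSED": {"FILES"},
    "LABELS": set(),
}


def processing_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every resource follows its dependencies."""
    return list(TopologicalSorter(deps).static_order())


order = processing_order(dependencies)
print(order)
```

Independent resources such as `LABELS` may appear anywhere in the order; only the relative position of a resource and its dependencies is constrained.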

Usage modes:

  1. Class attribute (preferred):

    @dataset(url="...")
    class MyDataset(Base):
        DATA = filedownloader("data.csv", "http://...", transient=True)
        PROCESSED = SomeProcessor.from_file(DATA)
    
  2. Decorator on function (deprecated, kept for backward compatibility):

    @filedownloader("data.csv", "http://...")
    @dataset(Base)
    def my_dataset(data): ...
    

Two-path system:

  • transient_path: where download/processing writes data

  • path: final location after successful completion

The framework moves data from transient_path → path and then marks the resource as COMPLETE. Subclass download() implementations should always write to transient_path.

State is persisted in a metadata file at:

<dataset.datapath>/.downloads/.state.json

classmethod apply(*args, **kwargs) → Resource

Factory classmethod for creating resource instances.

Allows defining shorthand factory functions:

filedownloader = FileDownloader.apply

Default implementation: return cls(*args, **kwargs). Subclasses may override for custom argument handling.
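The factory pattern can be sketched with stand-in classes. The `Resource` class below is a simplified illustration, not the real base class, and `UrlResource` with its whitespace-stripping `apply()` is a hypothetical example of custom argument handling.

```python
class Resource:
    """Minimal stand-in for the real base class (illustration only)."""

    def __init__(self, varname=None, *, transient=False):
        self.name = varname
        self.transient = transient

    @classmethod
    def apply(cls, *args, **kwargs):
        # Default behaviour described above: forward everything to the constructor.
        return cls(*args, **kwargs)


class UrlResource(Resource):
    """Hypothetical subclass that normalizes an argument in apply()."""

    def __init__(self, varname, url, **kwargs):
        super().__init__(varname, **kwargs)
        self.url = url

    @classmethod
    def apply(cls, varname, url, **kwargs):
        # Custom argument handling: strip stray whitespace from the URL.
        return cls(varname, url.strip(), **kwargs)


url_resource = UrlResource.apply  # shorthand factory function
res = url_resource("data", " http://example.com/data.csv ", transient=True)
```

Because `apply` is a classmethod, the alias stays bound to the subclass, so `url_resource(...)` always builds a `UrlResource`.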

bind(name: str, dataset: AbstractDataset) → None

Bind this resource to a dataset.

Called by the dataset class machinery during initialization. Sets self.name (if not explicitly set via varname) and self.dataset. Registers the resource in dataset.resources and dataset.ordered_resources.

For class-based datasets, bind() is called by @dataset when it processes class attributes. For decorator-based datasets, it is called by annotate() (the existing protocol).

property can_recover: bool

Whether partial downloads can be resumed.

When True and state is PARTIAL, existing data at transient_path is preserved on error, allowing the next download() call to resume from where it left off.

When False and state is PARTIAL, data at transient_path is deleted and state is reset to NONE.

Default: False. Subclasses override to enable recovery.

cleanup() → None

Remove this resource’s data from disk.

Called automatically for transient resources after all dependents reach COMPLETE (eager cleanup).

Default implementation:

  • Deletes self.path (file or directory)

  • Deletes self.transient_path if it exists

  • Sets self.state = NONE

Subclasses may override for custom cleanup.
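The three default steps can be approximated with the standard library. This is a sketch under the assumption that both paths are plain files or directories; the `remove_path` and `cleanup` helpers and the directory layout are hypothetical, and the returned string merely stands in for setting the state to NONE.

```python
import shutil
import tempfile
from pathlib import Path


def remove_path(path: Path) -> None:
    """Delete a file or a directory tree; do nothing if it is absent."""
    if path.is_dir():
        shutil.rmtree(path)
    elif path.exists():
        path.unlink()


def cleanup(path: Path, transient_path: Path) -> str:
    """Mirror the three default steps and return the new state name."""
    remove_path(path)
    remove_path(transient_path)
    return "none"


# Demonstration in a throwaway directory (all names are hypothetical).
root = Path(tempfile.mkdtemp())
final = root / "data"
final.mkdir()
(final / "a.txt").write_text("x")
partial = root / ".downloads" / "data"
partial.mkdir(parents=True)
new_state = cleanup(final, partial)
```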

property dependencies: list[Resource]

Resources that must be COMPLETE before this one can process.

Populated from constructor arguments. Subclasses with factory methods should pass dependency resources to __init__ and store them in _dependencies.

property dependents: list[Resource]

Resources that depend on this one (inverse of dependencies).

Computed by the dataset after all resources are bound. Used for eager transient cleanup decisions.
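Inverting the dependency relation is a small graph operation. The sketch below uses plain resource names instead of Resource objects, and the `invert` helper is hypothetical; it only shows how dependents could be derived once every dependency list is known.

```python
# Hypothetical dependency lists, keyed by resource name.
dependencies = {
    "RAW": [],
    "TOKENS": ["RAW"],
    "INDEX": ["RAW", "TOKENS"],
}


def invert(deps: dict[str, list[str]]) -> dict[str, list[str]]:
    """Map each resource to the resources that depend on it."""
    dependents: dict[str, list[str]] = {}
    for name, requires in deps.items():
        dependents.setdefault(name, [])  # every resource gets an entry
        for required in requires:
            dependents.setdefault(required, []).append(name)
    return dependents


dependents = invert(dependencies)
```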

abstractmethod download(force: bool = False) → None

Execute this resource’s download/processing step.

Contract:

  • Called only when all dependencies are COMPLETE.

  • Must write output to self.transient_path.

  • The framework handles moving transient_path → path and setting state to COMPLETE after this returns.

  • If force=True, re-execute even if already COMPLETE.

Note: State management (COMPLETE/PARTIAL/NONE transitions, moving transient_path → path) is handled by the framework, NOT by the download() implementation.

Raises:

Exception – On download/processing failure. The framework will handle PARTIAL state based on can_recover.

has_files() → bool

Whether this resource produces files on disk.

Returns False for reference-only resources (e.g., links to other datasets, in-memory values). Default: True.

property path: Path

Final storage path for this resource’s data.

This is where data lives after successful completion. Default: dataset.datapath / self.name

Subclasses may override to customize (e.g., add file extension).

abstractmethod prepare()

Return the value for dataset construction.

Called after download() has completed (state is COMPLETE). Return type depends on the resource subclass:

  • FileResource → Path

  • FolderResource → Path

  • ValueResource → resource-specific

For backward compatibility with function-based datasets, this value is passed as a keyword argument to the dataset function.

property state: ResourceState

Current state, read from the metadata file.

If no metadata entry exists, returns NONE.

property transient_path: Path

Temporary path where download/processing writes data.

During download(), subclasses write to this path. After successful download, the framework moves the data from transient_path to path, then marks state as COMPLETE.

Default: dataset.datapath / ".downloads" / self.name

ResourceState

class datamaestro.download.ResourceState(value)

State of a resource in the preparation pipeline.

COMPLETE = 'complete'

Fully available.

NONE = 'none'

Not started / no data on disk.

PARTIAL = 'partial'

Started but incomplete (error during download).

FileResource

class datamaestro.download.FileResource(filename: str, *, varname: str | None = None, transient: bool = False)

A resource that produces a single file on disk.

Subclasses implement _download() to produce the file at the given destination (which is self.transient_path).

download(force: bool = False) → None

Downloads the file.

Delegates to _download(self.transient_path).

property path: Path

Final path to the produced file.

dataset.datapath / self.filename

prepare() → Path

Returns self.path.

stream() → IO[bytes] | None

Return a readable byte stream of the file content.

Returns None if streaming is not supported for this resource. Default: returns None. Subclasses may override.

This allows downstream resources to consume data without needing the file to be fully materialized on disk first.
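A consumer has to handle the None case. The following sketch shows one way to do that; `FileLike` is a hypothetical stand-in exposing only `path` and `stream()`, and `read_all` is an illustrative helper rather than part of the library.

```python
import tempfile
from pathlib import Path
from typing import IO, Optional


class FileLike:
    """Hypothetical stand-in exposing the path/stream() pair described above."""

    def __init__(self, path: Path, streamable: bool):
        self.path = path
        self._streamable = streamable

    def stream(self) -> Optional[IO[bytes]]:
        # Return None when streaming is unsupported, as the default does.
        return self.path.open("rb") if self._streamable else None


def read_all(resource: FileLike) -> bytes:
    """Prefer the stream; fall back to reading the file at resource.path."""
    handle = resource.stream()
    if handle is None:
        handle = resource.path.open("rb")
    with handle:
        return handle.read()


tmp = Path(tempfile.mkdtemp()) / "data.bin"
tmp.write_bytes(b"payload")
from_stream = read_all(FileLike(tmp, streamable=True))
from_file = read_all(FileLike(tmp, streamable=False))
```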

property transient_path: Path

Temporary path for writing during download.

dataset.datapath / ".downloads" / self.filename

FolderResource

class datamaestro.download.FolderResource(varname: str | None = None, *, transient: bool = False)

A resource that produces a directory on disk.

Subclasses implement _download() to populate the directory at the given destination (which is self.transient_path).

download(force: bool = False) → None

Downloads/extracts the directory content to transient_path.

property path: Path

Final path to the produced directory.

dataset.datapath / self.name

prepare() → Path

Returns self.path.

property transient_path: Path

Temporary path for writing during download.

dataset.datapath / ".downloads" / self.name

ValueResource

class datamaestro.download.ValueResource(varname: str | None = None, *, transient: bool = False)

A resource that produces an in-memory value (no files on disk).

Used for resources like HuggingFace dataset handles that don’t produce local files. The transient_path/path two-path system is not used; state tracking is still via metadata file.

has_files() → bool

Whether this resource produces files on disk.

Returns False for ValueResource, since it produces an in-memory value rather than files on disk.

abstractmethod prepare()

Return the in-memory value.

Defining Resources (Modern API)

Resources are defined as class attributes on dataset classes. The framework automatically detects them and builds the dependency graph.

Single Files

Package: datamaestro.download.single

Use FileDownloader for single file downloads:

from datamaestro.download.single import FileDownloader
from datamaestro.definitions import AbstractDataset, dataset

@dataset(url="http://example.com")
class MyDataset(CSVData):
    DATA = FileDownloader("data.csv", "http://example.com/data.csv")

    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.DATA.path)

class datamaestro.download.single.FileDownloader(filename: str, url: str, size: int | None = None, transforms: Transform | None = None, checker=None, *, varname: str | None = None, transient: bool = False)

Downloads a single file from a URL.

Supports optional transforms (e.g., gzip decompression) and integrity checking.

Usage as class attribute (preferred):

@dataset(url="...")
class MyDataset(Base):
    DATA = FileDownloader.apply(
        "data.csv", "http://example.com/data.csv.gz"
    )

Usage as decorator (deprecated):

@filedownloader("data.csv", "http://example.com/data.csv.gz")
@dataset(Base)
def my_dataset(data): ...

Automatic Decompression:

Files with .gz or .bz2 extensions are automatically decompressed:

# Downloads and decompresses to data.txt
DATA = FileDownloader(
    "data.txt", "http://example.com/data.txt.gz"
)
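Suffix-based decompression can be sketched with the standard library alone. This is not FileDownloader's implementation; the `OPENERS` table and the `store` helper are hypothetical and assume the compression format is chosen purely from the file suffix.

```python
import bz2
import gzip
import shutil
import tempfile
from pathlib import Path

# Map a compressed-file suffix to the stdlib function that opens it.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open}


def store(downloaded: Path, destination: Path) -> None:
    """Copy downloaded to destination, decompressing by suffix if needed."""
    opener = OPENERS.get(downloaded.suffix)
    if opener is None:
        shutil.copyfile(downloaded, destination)
        return
    with opener(downloaded, "rb") as src, destination.open("wb") as dst:
        shutil.copyfileobj(src, dst)


workdir = Path(tempfile.mkdtemp())
compressed = workdir / "data.txt.gz"
with gzip.open(compressed, "wb") as fh:
    fh.write(b"hello\n")
store(compressed, workdir / "data.txt")
content = (workdir / "data.txt").read_bytes()
```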

ConcatDownloader

Downloads an archive and concatenates the files it contains:

class datamaestro.download.single.ConcatDownloader(filename: str, url: str, transforms=None, *, varname: str | None = None, transient: bool = False)

Concatenate all files from an archive into a single file.

Usage as class attribute (preferred):

@dataset(url="...")
class MyDataset(Base):
    DATA = ConcatDownloader.apply(
        "data.txt", "http://example.com/data.tar.gz"
    )

# A single archive URL; the files it contains are concatenated into combined.txt
COMBINED = ConcatDownloader(
    "combined.txt",
    "http://example.com/parts.tar.gz",
)
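Concatenating the members of an archive into one file can be illustrated with `tarfile`. This is a sketch, not ConcatDownloader's code: the `concat_archive` helper and the member names are hypothetical, and it assumes members are appended in archive order.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def concat_archive(archive: Path, destination: Path) -> None:
    """Append every regular file in the tar archive to destination."""
    with tarfile.open(archive) as tar, destination.open("wb") as out:
        for member in tar:
            if not member.isfile():
                continue  # skip directories, links and other special entries
            source = tar.extractfile(member)
            out.write(source.read())


# Build a small archive to demonstrate (file names are hypothetical).
workdir = Path(tempfile.mkdtemp())
archive = workdir / "data.tar.gz"
with tarfile.open(archive, "w:gz") as tar:
    for name, payload in [("part1.txt", b"one\n"), ("part2.txt", b"two\n")]:
        info = tarfile.TarInfo(name)
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))

concat_archive(archive, workdir / "data.txt")
combined = (workdir / "data.txt").read_bytes()
```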

Archives

Package: datamaestro.download.archive

Archive resources extract archives and produce a directory.

ZipDownloader

Downloads and extracts ZIP archives:

class datamaestro.download.archive.ZipDownloader(varname: str, url: str, subpath: str | None = None, checker: FileChecker | None = None, files: Set[str] | None = None, *, transient: bool = False)

ZIP Archive handler.

from datamaestro.download.archive import ZipDownloader

@dataset(url="http://example.com")
class MyDataset(MyData):
    DATA = ZipDownloader("data", "http://example.com/archive.zip")

    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.DATA.path / "file.csv")

Parameters:

  • varname: Resource name

  • url: URL to the ZIP file

  • files: Optional set of file names to extract (default: all)

  • subpath: Extract only a subdirectory
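How `subpath` and `files` could narrow an extraction is sketched below with `zipfile`. The `extract` helper is hypothetical, and it assumes that names in `files` are interpreted relative to `subpath`; the library's actual matching rules may differ.

```python
import tempfile
import zipfile
from pathlib import Path, PurePosixPath
from typing import Optional, Set


def extract(archive: Path, destination: Path,
            subpath: Optional[str] = None,
            files: Optional[Set[str]] = None) -> None:
    """Extract members under subpath (if given) that are listed in files (if given)."""
    with zipfile.ZipFile(archive) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            name = PurePosixPath(info.filename)
            if subpath is not None:
                try:
                    name = name.relative_to(subpath)
                except ValueError:
                    continue  # member lies outside the requested subdirectory
            if files is not None and str(name) not in files:
                continue
            # Note: this sketch does not guard against unsafe member names.
            target = destination / name
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_bytes(zf.read(info))


workdir = Path(tempfile.mkdtemp())
archive = workdir / "archive.zip"
with zipfile.ZipFile(archive, "w") as zf:
    zf.writestr("corpus/train.csv", "a,b\n")
    zf.writestr("corpus/test.csv", "c,d\n")
    zf.writestr("README", "ignored\n")

out = workdir / "data"
extract(archive, out, subpath="corpus", files={"train.csv"})
extracted = sorted(p.name for p in out.iterdir())
```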

TarDownloader

Downloads and extracts TAR archives (including .tar.gz, .tar.bz2):

class datamaestro.download.archive.TarDownloader(varname: str, url: str, subpath: str | None = None, checker: FileChecker | None = None, files: Set[str] | None = None, *, transient: bool = False)

TAR archive handler.

from datamaestro.download.archive import TarDownloader

@dataset(url="http://example.com")
class MyDataset(MyData):
    DATA = TarDownloader(
        "data", "http://example.com/archive.tar.gz"
    )

HuggingFace Integration

Package: datamaestro.download.huggingface

For datasets hosted on HuggingFace Hub:

class datamaestro.download.huggingface.HFDownloader(varname: str, repo_id: str, *, name: str | None = None, data_files: str | None = None, split: str | None = None, streaming: bool = False, local_path: Path | str | None = None, transient: bool = False)

Load a dataset from the HuggingFace Hub.

Usage as class attribute (preferred):

@dataset(url="...")
class MyDataset(Base):
    DATA = HFDownloader.apply(
        "hf_data", repo_id="user/dataset"
    )

Usage as decorator (deprecated):

@hf_download("hf_data", repo_id="user/dataset")
@dataset(Base)
def my_dataset(hf_data): ...

from datamaestro.download.huggingface import HFDownloader

@dataset(url="https://huggingface.co/datasets/squad")
class Squad(QADataset):
    HF_DATA = HFDownloader("squad_data", "squad")

Internet Archive (Wayback Machine)

Package: datamaestro.download.wayback

For datasets that are no longer available at their original URLs:

from datamaestro.download.wayback import wayback_documents

@dataset(url="http://example.com")
class ArchivedDataset(MyData):
    DATA = wayback_documents(
        "data", "http://defunct-website.com/data.csv",
        timestamp="20200101"
    )

Custom Downloads

Package: datamaestro.download.custom

For complex download logic that doesn’t fit standard patterns:

class datamaestro.download.custom.Downloader(*args, **kwargs)

datamaestro.download.custom.custom_download(varname: str, downloader: Downloader, *, transient: bool = False)

A resource that delegates to a user-defined download function.

Usage as class attribute (preferred):

@dataset(url="...")
class MyDataset(Base):
    DATA = custom_download(
        "data", downloader=my_download_fn
    )

Usage as decorator (deprecated):

@custom_download("data", downloader=my_download_fn)
@dataset(Base)
def my_dataset(data): ...

reference

References another dataset instead of downloading:

class datamaestro.download.reference(reference=None, *, varname=None)

References another dataset instead of downloading.

Usage:

# Positional form (preferred):
DOCS = reference(Documents)

# Keyword form:
DOCS = reference(reference=Documents)

# With explicit varname (rarely needed — auto-set from attribute name):
DOCS = reference(Documents, varname="docs")

In the config() method, call .config() (or .prepare()) to obtain the referenced dataset’s prepared configuration:

def config(self) -> Adhoc:
    return Adhoc.C(documents=self.DOCS.config())

from datamaestro.download import reference

@dataset(url="http://example.com")
class DerivedDataset(MyData):
    BASE = reference(other_dataset)

FilesCopy

Package: datamaestro.download

Copies specific files from a source resource (typically a transient archive) into a persistent folder. This is useful when you only need a few files from a large archive and want the archive to be cleaned up after extraction.

class datamaestro.download.FilesCopy(source: Resource, files: dict[str, str])

Copies files from a source resource into a persistent folder.

Used to preserve specific files from a transient resource (e.g. an archive) before it gets cleaned up.

Usage:

@dataset()
class MyDataset(Dataset):
    ARCHIVE = ZipDownloader("data", url, transient=True)
    files = FilesCopy(ARCHIVE, {
        "queries.jsonl": "queries.jsonl",
        "train.tsv": "qrels/train.tsv",
        "test.tsv": "qrels/test.tsv",
    })

    def config(self):
        return MyType.C(
            queries=self.files.path / "queries.jsonl",
            qrels=self.files.path / "train.tsv",
        )

from datamaestro.download.archive import ZipDownloader
from datamaestro.download import FilesCopy
from datamaestro.definitions import Dataset, dataset

@dataset(url="http://example.com")
class MyDataset(Dataset):
    # Archive is transient — deleted after FilesCopy completes
    ARCHIVE = ZipDownloader("data", "http://example.com/data.zip", transient=True)

    # Copy only the files we need
    FILES = FilesCopy(ARCHIVE, {
        "queries.jsonl": "queries.jsonl",
        "train.tsv": "qrels/train.tsv",
        "test.tsv": "qrels/test.tsv",
    })

    def config(self):
        return MyType.C(
            queries=self.FILES.path / "queries.jsonl",
            train_qrels=self.FILES.path / "train.tsv",
        )

Parameters:

  • source: The source resource whose path contains the files to copy.

  • files: A mapping of {dest_filename: relative_src_path}. Each entry copies source.path / relative_src_path to self.path / dest_filename.

FilesCopy automatically declares a dependency on source, so the source is always downloaded and completed before the copy runs.
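The {dest_filename: relative_src_path} mapping can be exercised in isolation. The sketch below is not FilesCopy itself; `copy_files` is a hypothetical helper that applies the same mapping to two ordinary directories.

```python
import shutil
import tempfile
from pathlib import Path


def copy_files(source_path: Path, dest_path: Path, files: dict[str, str]) -> None:
    """Copy source_path / src to dest_path / dest for each {dest: src} entry."""
    dest_path.mkdir(parents=True, exist_ok=True)
    for dest_name, relative_src in files.items():
        shutil.copyfile(source_path / relative_src, dest_path / dest_name)


# Hypothetical layout of an extracted archive.
root = Path(tempfile.mkdtemp())
source = root / "archive"
(source / "qrels").mkdir(parents=True)
(source / "qrels" / "train.tsv").write_text("q1\td1\t1\n")
(source / "queries.jsonl").write_text("{}\n")

target = root / "files"
copy_files(source, target, {
    "queries.jsonl": "queries.jsonl",
    "train.tsv": "qrels/train.tsv",
})
copied = sorted(p.name for p in target.iterdir())
```

Note that the destination is flat in this example: `qrels/train.tsv` in the source becomes `train.tsv` in the target because the key, not the source path, names the copied file.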

Transient Resources & Pipelines

Resources can be marked as transient, meaning their data can be deleted after all downstream dependents reach COMPLETE state. This is useful for intermediate files in processing pipelines.

from datamaestro.download.single import FileDownloader

@dataset(url="http://example.com")
class ProcessedDataset(MyData):
    # Raw download — deleted after processing completes
    RAW = FileDownloader(
        "raw.gz", "http://example.com/data.gz",
        transient=True,
    )
    # Processed output — kept permanently
    PROCESSED = MyProcessor.from_source(RAW)

    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.PROCESSED.path)
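The eager cleanup decision can be sketched as a simple predicate over the graph. The `Node` dataclass and `ready_for_cleanup` function are hypothetical, and the sketch assumes a transient resource with no dependents is left alone; the framework may treat that case differently.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    """Hypothetical, minimal view of a resource for this sketch."""
    name: str
    transient: bool = False
    state: str = "none"
    dependents: list = field(default_factory=list)


def ready_for_cleanup(nodes):
    """Transient, COMPLETE nodes whose dependents have all reached COMPLETE."""
    return [
        n.name for n in nodes
        if n.transient and n.state == "complete" and n.dependents
        and all(d.state == "complete" for d in n.dependents)
    ]


processed = Node("PROCESSED", state="none")
raw = Node("RAW", transient=True, state="complete", dependents=[processed])
before = ready_for_cleanup([raw, processed])   # PROCESSED not done yet
processed.state = "complete"
after = ready_for_cleanup([raw, processed])    # RAW can now be deleted
```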

Creating Custom Resource Handlers

Extend the download system by subclassing FileResource, FolderResource, or ValueResource:

from datamaestro.download import FileResource

class MyProcessor(FileResource):
    """Process a source file into a numpy array."""

    @property
    def can_recover(self) -> bool:
        return False

    def __init__(self, filename, source, **kw):
        super().__init__(filename, **kw)
        self._dependencies = [source]

    def _download(self, destination):
        # load, process and save stand in for user-provided functions
        source_path = self.dependencies[0].path
        data = load(source_path)
        save(process(data), destination)

    @classmethod
    def from_source(cls, source):
        return cls("processed.npy", source)

# Factory alias
my_processor = MyProcessor.from_source

The _download(destination) method receives self.transient_path as destination. After it returns, the framework moves data from transient_path to path and marks the resource as COMPLETE.

File Validation

Package: datamaestro.utils

Validate downloaded files with checksums:

class datamaestro.utils.FileChecker

Checks a file

class datamaestro.utils.HashCheck(hashstr: str, hasherfn=<built-in function openssl_md5>)
__init__(hashstr: str, hasherfn=<built-in function openssl_md5>)

Check a file against a hash

Parameters:
  • hashstr – The HASH value

  • hasherfn – The hash computer, defaults to hashlib.md5

import hashlib

from datamaestro.download.single import FileDownloader
from datamaestro.utils import HashCheck

DATA = FileDownloader(
    "data.csv",
    "http://example.com/data.csv",
    # hashstr comes first; hasherfn selects the algorithm (default: MD5)
    checker=HashCheck("abc123def456...", hasherfn=hashlib.sha256)
)

Supported hash algorithms: md5, sha1, sha256, sha512 (pass the matching hashlib constructor as hasherfn)
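What a hash check amounts to can be shown with `hashlib` directly. This is a sketch, not HashCheck's implementation; `file_digest` and `matches` are hypothetical helpers that take a hashlib constructor in the same spirit as the hasherfn parameter.

```python
import hashlib
import tempfile
from pathlib import Path


def file_digest(path: Path, hasherfn=hashlib.md5, chunk_size: int = 1 << 20) -> str:
    """Return the hex digest of a file, reading it in chunks."""
    hasher = hasherfn()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            hasher.update(chunk)
    return hasher.hexdigest()


def matches(path: Path, expected: str, hasherfn=hashlib.md5) -> bool:
    """Compare the file's digest with the expected hex string."""
    return file_digest(path, hasherfn) == expected.lower()


sample = Path(tempfile.mkdtemp()) / "data.csv"
sample.write_bytes(b"a,b\n1,2\n")
expected = hashlib.sha256(b"a,b\n1,2\n").hexdigest()
ok = matches(sample, expected, hashlib.sha256)
bad = matches(sample, "0" * 64, hashlib.sha256)
```

Reading in chunks keeps memory use constant for large downloads.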

Two-Path Download Flow

The framework orchestrates the download process for each resource:

  1. COMPLETE and not force — skip (no-op)

  2. PARTIAL and not can_recover — delete transient_path, set NONE

  3. PARTIAL and can_recover — leave transient_path in place for resumption

  4. Call resource.download(force) — resource writes to transient_path

  5. On success — move transient_path → path, set COMPLETE

  6. On failure — if can_recover, set PARTIAL; otherwise delete and set NONE

  7. Eager cleanup — for each transient dependency where all dependents are COMPLETE, call cleanup()
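Steps 1 to 6 can be condensed into a single function. This is a simplified sketch rather than the framework's orchestrator: states are plain strings, `run` and `remove` are hypothetical helpers, the failure is swallowed instead of re-raised, and eager cleanup (step 7) is left out.

```python
import shutil
import tempfile
from pathlib import Path


def remove(path: Path) -> None:
    """Delete a file or directory tree if it exists."""
    if path.is_dir():
        shutil.rmtree(path)
    elif path.exists():
        path.unlink()


def run(state: str, path: Path, transient: Path, download,
        can_recover: bool = False, force: bool = False) -> str:
    """Return the new state after attempting one resource (steps 1 to 6)."""
    if state == "complete" and not force:
        return state                        # step 1: nothing to do
    if state == "partial" and not can_recover:
        remove(transient)                   # step 2: discard leftovers (state NONE)
    # step 3: with can_recover, leftovers stay in place for resumption
    transient.parent.mkdir(parents=True, exist_ok=True)
    try:
        download(transient)                 # step 4: resource writes to transient
    except Exception:
        if can_recover:
            return "partial"                # step 6: keep data for the next attempt
        remove(transient)
        return "none"
    remove(path)                            # drop stale output when forcing
    shutil.move(str(transient), str(path))  # step 5: publish the result
    return "complete"


root = Path(tempfile.mkdtemp())
final, tmp = root / "data.csv", root / ".downloads" / "data.csv"


def good(dest: Path) -> None:
    dest.write_text("ok")


def failing(dest: Path) -> None:
    dest.write_text("half")
    raise IOError("connection lost")


after_failure = run("none", final, tmp, failing, can_recover=True)
after_retry = run(after_failure, final, tmp, good, can_recover=True)
```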

State Metadata File

Resource states are persisted in <dataset.datapath>/.downloads/.state.json:

{
  "version": 1,
  "resources": {
    "TRAIN_IMAGES": {"state": "complete"},
    "TRAIN_LABELS": {"state": "partial"}
  }
}
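Reading a state from a file of this shape takes only a few lines. The sketch below redefines a stand-in `ResourceState` enum so that it is self-contained, and `read_state` is a hypothetical helper; it assumes, as described for the state property, that a missing file or entry means NONE.

```python
import json
import tempfile
from enum import Enum
from pathlib import Path


class ResourceState(str, Enum):
    """Stand-in for datamaestro.download.ResourceState (same values)."""
    NONE = "none"
    PARTIAL = "partial"
    COMPLETE = "complete"


def read_state(state_file: Path, name: str) -> ResourceState:
    """Look up a resource's state; a missing file or entry means NONE."""
    if not state_file.exists():
        return ResourceState.NONE
    data = json.loads(state_file.read_text())
    entry = data.get("resources", {}).get(name)
    return ResourceState(entry["state"]) if entry else ResourceState.NONE


state_file = Path(tempfile.mkdtemp()) / ".state.json"
state_file.write_text(json.dumps({
    "version": 1,
    "resources": {
        "TRAIN_IMAGES": {"state": "complete"},
        "TRAIN_LABELS": {"state": "partial"},
    },
}))
images = read_state(state_file, "TRAIN_IMAGES")
missing = read_state(state_file, "TEST_IMAGES")
```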

Deprecated: Download Decorators

Deprecated: The decorator-based API still works but emits deprecation warnings. Migrate to the class-attribute approach described above.

Download decorators are applied above the @dataset decorator and pass downloaded file paths as arguments to the dataset function.

from datamaestro.download.single import filedownloader
from datamaestro.definitions import dataset

# DEPRECATED — use class-attribute approach instead
@filedownloader("data", "http://example.com/data.csv")
@dataset(MyData)
def my_dataset(data):  # 'data' receives the downloaded Path
    return MyData(path=data)

filedownloader (decorator)

# DEPRECATED
@filedownloader("data.csv", "http://example.com/data.csv")
@dataset(MyData)
def compressed_dataset(data):
    return MyData(path=data)

concatdownload (decorator)

# DEPRECATED
@concatdownload(
    "combined",
    "http://example.com/parts.tar.gz",
)
@dataset(MyData)
def concatenated_dataset(combined):
    return MyData(path=combined)

zipdownloader / tardownloader (decorator)

from datamaestro.download.archive import zipdownloader, tardownloader

# DEPRECATED
@zipdownloader("data", "http://example.com/archive.zip")
@dataset(MyData)
def zipped_dataset(data):
    return MyData(path=data / "file.csv")

# DEPRECATED
@tardownloader("data", "http://example.com/archive.tar.gz")
@dataset(MyData)
def tar_dataset(data):
    return MyData(path=data / "file.csv")

Multiple decorators (deprecated)

# DEPRECATED
@filedownloader("train", "http://example.com/train.csv")
@filedownloader("test", "http://example.com/test.csv")
@dataset(MyData)
def multi_resource_dataset(train, test):
    return MyData(train_path=train, test_path=test)

Custom handler (deprecated)

from datamaestro.download import Download

# DEPRECATED — use FileResource / FolderResource instead
class MyDownload(Download):
    def __init__(self, varname, custom_param):
        super().__init__(varname)
        self.custom_param = custom_param

    def prepare(self):
        return self._download_and_process()

    def download(self, force=False):
        if force or not self._is_cached():
            self._do_download()

    def hasfiles(self) -> bool:
        return True

Deprecated Names

Deprecated                          Replacement
--------------------------------    -------------------------------------
Download (base class)               Resource
hasfiles()                          has_files()
Resource.definition                 Resource.dataset
Resource.varname                    Resource.name
@filedownloader(...) (decorator)    FileDownloader(...) (class attribute)
SingleDownload                      FileDownloader