Download Resources
Resources represent steps in a dataset preparation pipeline. They form a directed acyclic graph (DAG) where each resource can depend on other resources.
Key concepts:
Two-path system: resources write to transient_path during download, then the framework moves data to path and marks the resource as COMPLETE.
Three states: NONE, PARTIAL, COMPLETE (persisted in .state.json)
Transient resources: intermediate resources that can be deleted after all dependents are COMPLETE (eager cleanup)
Resource Hierarchy
Resource (ABC)
├── FileResource — produces a single file
├── FolderResource — produces a directory
│ └── FilesCopy — copies files from another resource
├── ValueResource — produces an in-memory value (no files)
├── reference — references another dataset
└── Download — (deprecated alias for Resource)
Resource Base Class
- class datamaestro.download.Resource(varname: str | None = None, *, transient: bool = False)
Base class for all dataset resources.
A resource represents a single step in a dataset preparation pipeline. Resources form a DAG: each resource declares its dependencies, and the orchestrator ensures they are processed in topological order.
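To illustrate what processing in topological order means, here is a minimal sketch built on Python's standard graphlib module. It is not datamaestro's orchestrator, whose internals are not shown here; the resource names and the processing_order helper are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical resource names, each mapped to the names it depends on.
dependencies = {
    "RAW": set(),
    "EXTRACTED": {"RAW"},
    "PROCESSED": {"EXTRACTED"},
}


def processing_order(deps):
    """Return names so that every dependency comes before its dependents."""
    return list(TopologicalSorter(deps).static_order())


order = processing_order(dependencies)
print(order)
```

A cycle in the mapping would make static_order() raise graphlib.CycleError, which is why the dependency graph has to stay acyclic.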
Usage modes:
Class attribute (preferred):
@dataset(url="...")
class MyDataset(Base):
    DATA = filedownloader("data.csv", "http://...", transient=True)
    PROCESSED = SomeProcessor.from_file(DATA)
Decorator on function (deprecated, backward compat):
@filedownloader("data.csv", "http://...")
@dataset(Base)
def my_dataset(data): ...
Two-path system:
transient_path: where download/processing writes data
path: final location after successful completion
The framework moves data from transient_path → path and then marks the resource as COMPLETE. Subclass download() implementations should always write to transient_path.
State is persisted in a metadata file at:
<dataset.datapath>/.downloads/.state.json
- classmethod apply(*args, **kwargs) Resource
Factory classmethod for creating resource instances.
Allows defining shorthand factory functions:
filedownloader = FileDownloader.apply
Default implementation:
return cls(*args, **kwargs)
Subclasses may override for custom argument handling.
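The factory pattern described for apply can be sketched with stand-in classes. The Resource and FileDownloader classes below are simplified, hypothetical models written for this example; they are not the datamaestro implementations and only mirror the constructor arguments listed in this page.

```python
class Resource:
    """Simplified stand-in for the real base class (assumption)."""

    def __init__(self, varname=None, *, transient=False):
        self.varname = varname
        self.transient = transient

    @classmethod
    def apply(cls, *args, **kwargs):
        # Default behaviour described above: forward everything to the constructor.
        return cls(*args, **kwargs)


class FileDownloader(Resource):
    """Simplified stand-in that only stores its arguments."""

    def __init__(self, filename, url, **kwargs):
        super().__init__(**kwargs)
        self.filename = filename
        self.url = url


# Shorthand factory function, as in the text above.
filedownloader = FileDownloader.apply
data = filedownloader("data.csv", "http://example.com/data.csv", transient=True)
```

Because apply is a classmethod, the alias builds an instance of whichever subclass it was taken from.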
- bind(name: str, dataset: AbstractDataset) None
Bind this resource to a dataset.
Called by the dataset class machinery during initialization. Sets self.name (if not explicitly set via varname) and self.dataset. Registers the resource in dataset.resources and dataset.ordered_resources.
For class-based datasets: called by @dataset when it processes class attributes. For decorator-based datasets: called by annotate() (existing protocol).
- property can_recover: bool
Whether partial downloads can be resumed.
When True and state is PARTIAL, existing data at transient_path is preserved on error, allowing the next download() call to resume from where it left off.
When False and state is PARTIAL, data at transient_path is deleted and state is reset to NONE.
Default: False. Subclasses override to enable recovery.
- cleanup() None
Remove this resource’s data from disk.
Called automatically for transient resources after all dependents reach COMPLETE (eager cleanup).
Default implementation:
Deletes self.path (file or directory)
Deletes self.transient_path if it exists
Sets self.state = NONE
Subclasses may override for custom cleanup.
- property dependencies: list[Resource]
Resources that must be COMPLETE before this one can process.
Populated from constructor arguments. Subclasses with factory methods should pass dependency resources to __init__ and store them in _dependencies.
- property dependents: list[Resource]
Resources that depend on this one (inverse of dependencies).
Computed by the dataset after all resources are bound. Used for eager transient cleanup decisions.
- abstractmethod download(force: bool = False) None
Execute this resource’s download/processing step.
Contract:
Called only when all dependencies are COMPLETE.
Must write output to self.transient_path.
The framework handles moving transient_path → path and setting state to COMPLETE after this returns.
If force=True, re-execute even if already COMPLETE.
Note: State management (COMPLETE/PARTIAL/NONE transitions, moving transient_path → path) is handled by the framework, NOT by the download() implementation.
- Raises:
Exception – On download/processing failure. The framework will handle PARTIAL state based on can_recover.
- has_files() bool
Whether this resource produces files on disk.
Returns False for reference-only resources (e.g., links to other datasets, in-memory values). Default: True.
- property path: Path
Final storage path for this resource’s data.
This is where data lives after successful completion. Default: dataset.datapath / self.name
Subclasses may override to customize (e.g., add file extension).
- abstractmethod prepare()
Return the value for dataset construction.
Called after download() has completed (state is COMPLETE). Return type depends on the resource subclass:
FileResource → Path
FolderResource → Path
ValueResource → resource-specific
For backward compat with function-based datasets, this value is passed as a keyword argument to the dataset function.
- property state: ResourceState
Current state, read from the metadata file.
If no metadata entry exists, returns NONE.
- property transient_path: Path
Temporary path where download/processing writes data.
During download(), subclasses write to this path. After successful download, the framework moves the data from transient_path to path, then marks state as COMPLETE.
Default:
dataset.datapath / ".downloads" / self.name
ResourceState
FileResource
- class datamaestro.download.FileResource(filename: str, *, varname: str | None = None, transient: bool = False)
A resource that produces a single file on disk.
Subclasses implement _download() to produce the file at the given destination (which is self.transient_path).
- download(force: bool = False) None
Downloads the file.
Delegates to _download(self.transient_path).
- stream() IO[bytes] | None
Return a readable byte stream of the file content.
Returns None if streaming is not supported for this resource. Default: returns None. Subclasses may override.
This allows downstream resources to consume data without needing the file to be fully materialized on disk first.
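A downstream consumer might handle the optional stream as sketched below. StreamingFile, OnDiskFile, and read_all are hypothetical names written for this example; only the stream() contract (a byte stream, or None when unsupported) comes from the description above.

```python
import io
import tempfile
from pathlib import Path
from typing import IO, Optional


class StreamingFile:
    """Hypothetical stand-in for a resource that supports streaming."""

    def __init__(self, payload: bytes):
        self._payload = payload

    def stream(self) -> Optional[IO[bytes]]:
        return io.BytesIO(self._payload)


class OnDiskFile:
    """Hypothetical stand-in keeping the default: stream() returns None."""

    def __init__(self, path: Path):
        self.path = path

    def stream(self) -> Optional[IO[bytes]]:
        return None


def read_all(resource) -> bytes:
    """Prefer the stream; fall back to the materialized file."""
    stream = resource.stream()
    if stream is None:
        return resource.path.read_bytes()
    with stream:
        return stream.read()


with tempfile.TemporaryDirectory() as tmp:
    on_disk = Path(tmp) / "data.bin"
    on_disk.write_bytes(b"from disk")
    from_file = read_all(OnDiskFile(on_disk))
from_stream = read_all(StreamingFile(b"from stream"))
```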
FolderResource
ValueResource
- class datamaestro.download.ValueResource(varname: str | None = None, *, transient: bool = False)
A resource that produces an in-memory value (no files on disk).
Used for resources like HuggingFace dataset handles that don’t produce local files. The transient_path/path two-path system is not used; state tracking is still via metadata file.
- has_files() bool
Whether this resource produces files on disk.
Returns False for reference-only resources (e.g., links to other datasets, in-memory values). Default: True.
- abstractmethod prepare()
Return the in-memory value.
Defining Resources (Modern API)
Resources are defined as class attributes on dataset classes. The framework automatically detects them and builds the dependency graph.
Single Files
Package: datamaestro.download.single
Use FileDownloader for single file downloads:
from datamaestro.download.single import FileDownloader
from datamaestro.definitions import AbstractDataset, dataset
@dataset(url="http://example.com")
class MyDataset(CSVData):
    DATA = FileDownloader("data.csv", "http://example.com/data.csv")
    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.DATA.path)
- class datamaestro.download.single.FileDownloader(filename: str, url: str, size: int | None = None, transforms: Transform | None = None, checker=None, *, varname: str | None = None, transient: bool = False)
Downloads a single file from a URL.
Supports optional transforms (e.g., gzip decompression) and integrity checking.
Usage as class attribute (preferred):
@dataset(url="...")
class MyDataset(Base):
    DATA = FileDownloader.apply(
        "data.csv", "http://example.com/data.csv.gz"
    )
Usage as decorator (deprecated):
@filedownloader("data.csv", "http://example.com/data.csv.gz")
@dataset(Base)
def my_dataset(data): ...
Automatic Decompression:
Files with .gz or .bz2 extensions are automatically decompressed:
# Downloads and decompresses to data.txt
DATA = FileDownloader(
    "data.txt", "http://example.com/data.txt.gz"
)
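The effect of extension-based decompression can be sketched with the standard gzip and bz2 modules. This is an illustration of the idea only, not FileDownloader's code; store_decompressed and the OPENERS table are hypothetical names.

```python
import bz2
import gzip
import shutil
import tempfile
from pathlib import Path

# Map a compressed-file suffix to the function that opens it for reading.
OPENERS = {".gz": gzip.open, ".bz2": bz2.open}


def store_decompressed(downloaded: Path, destination: Path) -> None:
    """Write the decompressed content of `downloaded` to `destination`.

    Files with an unknown suffix are copied unchanged.
    """
    opener = OPENERS.get(downloaded.suffix)
    if opener is None:
        shutil.copyfile(downloaded, destination)
        return
    with opener(downloaded, "rb") as src, open(destination, "wb") as dst:
        shutil.copyfileobj(src, dst)


with tempfile.TemporaryDirectory() as tmp:
    downloaded = Path(tmp) / "data.txt.gz"
    with gzip.open(downloaded, "wb") as fh:
        fh.write(b"hello\n")
    destination = Path(tmp) / "data.txt"
    store_decompressed(downloaded, destination)
    content = destination.read_bytes()
```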
ConcatDownloader
Downloads an archive and concatenates the files it contains into a single file:
- class datamaestro.download.single.ConcatDownloader(filename: str, url: str, transforms=None, *, varname: str | None = None, transient: bool = False)
Concatenate all files from an archive into a single file.
Usage as class attribute (preferred):
@dataset(url="...")
class MyDataset(Base):
    DATA = ConcatDownloader.apply(
        "data.txt", "http://example.com/data.tar.gz"
    )
# The constructor takes a single archive URL, as in the signature above
COMBINED = ConcatDownloader(
    "combined.txt",
    "http://example.com/parts.tar.gz",
)
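What concatenating the files of an archive amounts to can be sketched with the standard tarfile module. This is a minimal illustration under the assumption that members are appended in archive order; concat_members is a hypothetical helper, not part of datamaestro.

```python
import io
import shutil
import tarfile
import tempfile
from pathlib import Path


def concat_members(archive: Path, destination: Path) -> None:
    """Append every regular file in the archive to `destination`, in archive order."""
    with tarfile.open(archive, "r:*") as tar, open(destination, "wb") as out:
        for member in tar:
            if not member.isfile():
                continue  # skip directories, links, and other special entries
            extracted = tar.extractfile(member)
            if extracted is not None:
                shutil.copyfileobj(extracted, out)


with tempfile.TemporaryDirectory() as tmp:
    # Build a small archive to run the helper on.
    archive = Path(tmp) / "data.tar.gz"
    with tarfile.open(archive, "w:gz") as tar:
        for name, payload in [("a.txt", b"first\n"), ("b.txt", b"second\n")]:
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    destination = Path(tmp) / "data.txt"
    concat_members(archive, destination)
    combined = destination.read_bytes()
```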
Archives
Package: datamaestro.download.archive
Archive resources extract archives and produce a directory.
ZipDownloader
Downloads and extracts ZIP archives:
- class datamaestro.download.archive.ZipDownloader(varname: str, url: str, subpath: str | None = None, checker: FileChecker | None = None, files: Set[str] | None = None, *, transient: bool = False)
ZIP Archive handler.
from datamaestro.download.archive import ZipDownloader
@dataset(url="http://example.com")
class MyDataset(MyData):
    DATA = ZipDownloader("data", "http://example.com/archive.zip")
    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.DATA.path / "file.csv")
Parameters:
varname: Resource name
url: URL to the ZIP file
files: Optional list of files to extract (default: all)
subpath: Extract only a subdirectory
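The sketch below shows one way the files and subpath filters could behave, using the standard zipfile module. It is an assumption-laden illustration, not ZipDownloader's implementation: extract_selected is a hypothetical helper, it assumes file names are given relative to subpath, and it does not guard against unsafe member names.

```python
import shutil
import tempfile
import zipfile
from pathlib import Path, PurePosixPath
from typing import Optional, Set


def extract_selected(
    archive: Path,
    destination: Path,
    subpath: Optional[str] = None,
    files: Optional[Set[str]] = None,
) -> None:
    """Extract members under `subpath`, optionally restricted to `files`."""
    with zipfile.ZipFile(archive) as zf:
        for info in zf.infolist():
            if info.is_dir():
                continue
            name = PurePosixPath(info.filename)
            if subpath is not None:
                try:
                    name = name.relative_to(subpath)
                except ValueError:
                    continue  # member lives outside the requested subdirectory
            if files is not None and name.as_posix() not in files:
                continue
            target = destination.joinpath(*name.parts)
            target.parent.mkdir(parents=True, exist_ok=True)
            with zf.open(info) as src, open(target, "wb") as dst:
                shutil.copyfileobj(src, dst)


with tempfile.TemporaryDirectory() as tmp:
    archive = Path(tmp) / "archive.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("corpus/train.csv", "t")
        zf.writestr("corpus/test.csv", "e")
        zf.writestr("README.txt", "r")
    out = Path(tmp) / "data"
    extract_selected(archive, out, subpath="corpus", files={"train.csv"})
    extracted = sorted(
        p.relative_to(out).as_posix() for p in out.rglob("*") if p.is_file()
    )
```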
TarDownloader
Downloads and extracts TAR archives (including .tar.gz, .tar.bz2):
- class datamaestro.download.archive.TarDownloader(varname: str, url: str, subpath: str | None = None, checker: FileChecker | None = None, files: Set[str] | None = None, *, transient: bool = False)
TAR archive handler.
from datamaestro.download.archive import TarDownloader
@dataset(url="http://example.com")
class MyDataset(MyData):
    DATA = TarDownloader(
        "data", "http://example.com/archive.tar.gz"
    )
HuggingFace Integration
Package: datamaestro.download.huggingface
For datasets hosted on HuggingFace Hub:
- class datamaestro.download.huggingface.HFDownloader(varname: str, repo_id: str, *, name: str | None = None, data_files: str | None = None, split: str | None = None, streaming: bool = False, local_path: Path | str | None = None, transient: bool = False)
Load a dataset from the HuggingFace Hub.
Usage as class attribute (preferred):
@dataset(url="...")
class MyDataset(Base):
    DATA = HFDownloader.apply(
        "hf_data", repo_id="user/dataset"
    )
Usage as decorator (deprecated):
@hf_download("hf_data", repo_id="user/dataset")
@dataset(Base)
def my_dataset(hf_data): ...
from datamaestro.download.huggingface import HFDownloader
@dataset(url="https://huggingface.co/datasets/squad")
class Squad(QADataset):
    HF_DATA = HFDownloader("squad_data", "squad")
Links
Package: datamaestro.download.links
Links reference other datasets or external data folders.
from datamaestro.download.links import links
@dataset(url="http://example.com")
class ExtendedDataset(ExtendedData):
    BASE = links("base", "com.example.base_dataset")
    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(base=cls.BASE.prepare())
linkfolder
Link to a configured data folder:
from datamaestro.download.links import linkfolder
@dataset(url="http://example.com")
class ExternalDataset(MyData):
    DATA = linkfolder("data", "mydata")
linkfile
Link to a specific file in a data folder:
from datamaestro.download.links import linkfile
@dataset(url="http://example.com")
class SpecificFile(MyData):
    CSV = linkfile("csvfile", "mydata", "subdir/data.csv")
Internet Archive (Wayback Machine)
Package: datamaestro.download.wayback
For datasets that are no longer available at their original URLs:
from datamaestro.download.wayback import wayback_documents
@dataset(url="http://example.com")
class ArchivedDataset(MyData):
    DATA = wayback_documents(
        "data", "http://defunct-website.com/data.csv",
        timestamp="20200101"
    )
Custom Downloads
Package: datamaestro.download.custom
For complex download logic that doesn’t fit standard patterns:
- class datamaestro.download.custom.Downloader(*args, **kwargs)
- datamaestro.download.custom.custom_download(varname: str, downloader: Downloader, *, transient: bool = False)
A resource that delegates to a user-defined download function.
Usage as class attribute (preferred):
@dataset(url="...")
class MyDataset(Base):
    DATA = custom_download(
        "data", downloader=my_download_fn
    )
Usage as decorator (deprecated):
@custom_download("data", downloader=my_download_fn)
@dataset(Base)
def my_dataset(data): ...
reference
References another dataset instead of downloading:
- class datamaestro.download.reference(reference=None, *, varname=None)
References another dataset instead of downloading.
Usage:
# Positional form (preferred):
DOCS = reference(Documents)
# Keyword form:
DOCS = reference(reference=Documents)
# With explicit varname (rarely needed; auto-set from attribute name):
DOCS = reference(Documents, varname="docs")
In the config() method, call .config() (or .prepare()) to obtain the referenced dataset's prepared configuration:
def config(self) -> Adhoc:
    return Adhoc.C(documents=self.DOCS.config())
FilesCopy
Package: datamaestro.download
Copies specific files from a source resource (typically a transient archive) into a persistent folder. This is useful when you only need a few files from a large archive and want the archive to be cleaned up after extraction.
- class datamaestro.download.FilesCopy(source: Resource, files: dict[str, str])
Copies files from a source resource into a persistent folder.
Used to preserve specific files from a transient resource (e.g. an archive) before it gets cleaned up.
Usage:
@dataset()
class MyDataset(Dataset):
    ARCHIVE = ZipDownloader("data", url, transient=True)
    files = FilesCopy(ARCHIVE, {
        "queries.jsonl": "queries.jsonl",
        "train.tsv": "qrels/train.tsv",
        "test.tsv": "qrels/test.tsv",
    })
    def config(self):
        return MyType.C(
            queries=self.files.path / "queries.jsonl",
            qrels=self.files.path / "train.tsv",
        )
from datamaestro.download.archive import ZipDownloader
from datamaestro.download import FilesCopy
from datamaestro.definitions import Dataset, dataset
@dataset(url="http://example.com")
class MyDataset(Dataset):
    # Archive is transient: deleted after FilesCopy completes
    ARCHIVE = ZipDownloader("data", "http://example.com/data.zip", transient=True)
    # Copy only the files we need
    FILES = FilesCopy(ARCHIVE, {
        "queries.jsonl": "queries.jsonl",
        "train.tsv": "qrels/train.tsv",
        "test.tsv": "qrels/test.tsv",
    })
    def config(self):
        return MyType.C(
            queries=self.FILES.path / "queries.jsonl",
            train_qrels=self.FILES.path / "train.tsv",
        )
Parameters:
source: The source resource whose path contains the files to copy.
files: A mapping of {dest_filename: relative_src_path}. Each entry copies source.path / relative_src_path to self.path / dest_filename.
FilesCopy automatically declares a dependency on source, so the source
is always downloaded and completed before the copy runs.
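The direction of the files mapping (destination name as key, source-relative path as value) is easy to get backwards, so here is a minimal sketch of the copy it describes. copy_files is a hypothetical helper built on shutil, not FilesCopy itself, and the directory names are made up for the example.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Dict


def copy_files(source_path: Path, target_path: Path, files: Dict[str, str]) -> None:
    """Copy source_path / relative_src_path to target_path / dest_filename."""
    target_path.mkdir(parents=True, exist_ok=True)
    for dest_filename, relative_src_path in files.items():
        shutil.copyfile(source_path / relative_src_path, target_path / dest_filename)


with tempfile.TemporaryDirectory() as tmp:
    # Stand-in for the extracted archive (source.path).
    source = Path(tmp) / "archive"
    (source / "qrels").mkdir(parents=True)
    (source / "queries.jsonl").write_text("{}\n")
    (source / "qrels" / "train.tsv").write_text("q1\td1\n")

    # Stand-in for the persistent folder (self.path).
    target = Path(tmp) / "files"
    copy_files(
        source,
        target,
        {"queries.jsonl": "queries.jsonl", "train.tsv": "qrels/train.tsv"},
    )
    copied = sorted(p.name for p in target.iterdir())
    train = (target / "train.tsv").read_text()
```

Note that the nested source path qrels/train.tsv ends up flat as train.tsv in the target folder.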
Transient Resources & Pipelines
Resources can be marked as transient, meaning their data can be deleted
after all downstream dependents reach COMPLETE state. This is useful for
intermediate files in processing pipelines.
from datamaestro.download.single import FileDownloader
from datamaestro.definitions import AbstractDataset, dataset
@dataset(url="http://example.com")
class ProcessedDataset(MyData):
    # Raw download: deleted after processing completes
    RAW = FileDownloader(
        "raw.gz", "http://example.com/data.gz",
        transient=True,
    )
    # Processed output: kept permanently
    # (MyProcessor and its from_source factory are defined in the next section)
    PROCESSED = MyProcessor.from_source(RAW)
    @classmethod
    def __create_dataset__(cls, dataset: AbstractDataset):
        return cls.C(path=cls.PROCESSED.path)
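The eager cleanup decision can be modelled as a small predicate over the dependency graph. The Node class and ready_for_cleanup function below are hypothetical, written for this example only. The sketch also assumes that a transient resource with no dependents is left alone, which the text above does not specify.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Hypothetical, minimal model of a resource for this example."""

    name: str
    transient: bool = False
    state: str = "none"
    dependents: List["Node"] = field(default_factory=list)


def ready_for_cleanup(resources: List[Node]) -> List[str]:
    """Names of transient resources whose dependents are all complete."""
    return [
        r.name
        for r in resources
        if r.transient
        and r.dependents
        and all(d.state == "complete" for d in r.dependents)
    ]


processed = Node("PROCESSED")
raw = Node("RAW", transient=True, state="complete", dependents=[processed])

before = ready_for_cleanup([raw, processed])  # PROCESSED not complete yet
processed.state = "complete"
after = ready_for_cleanup([raw, processed])
```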
Creating Custom Resource Handlers
Extend the download system by subclassing FileResource, FolderResource,
or ValueResource:
from datamaestro.download import FileResource
class MyProcessor(FileResource):
    """Process a source file into a numpy array."""
    @property
    def can_recover(self) -> bool:
        return False
    def __init__(self, filename, source, **kw):
        super().__init__(filename, **kw)
        self._dependencies = [source]
    def _download(self, destination):
        source_path = self.dependencies[0].path
        data = load(source_path)
        save(process(data), destination)
    @classmethod
    def from_source(cls, source):
        return cls("processed.npy", source)
# Factory alias
my_processor = MyProcessor.from_source
The _download(destination) method receives self.transient_path as
destination. After it returns, the framework moves data from
transient_path to path and marks the resource as COMPLETE.
File Validation
Package: datamaestro.utils
Validate downloaded files with checksums:
- class datamaestro.utils.FileChecker
Checks a file
from datamaestro.download.single import FileDownloader
from datamaestro.utils import HashCheck
DATA = FileDownloader(
    "data.csv",
    "http://example.com/data.csv",
    checker=HashCheck("sha256", "abc123def456...")
)
Supported hash algorithms: md5, sha1, sha256, sha512
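Conceptually, a hash check compares the digest of the downloaded file with an expected hexadecimal value. The sketch below shows that comparison with the standard hashlib module; file_digest and matches are hypothetical helpers and do not describe how HashCheck is implemented.

```python
import hashlib
import tempfile
from pathlib import Path


def file_digest(path: Path, algorithm: str = "sha256", chunk_size: int = 1 << 20) -> str:
    """Hex digest of a file, read in chunks so large files fit in memory."""
    digest = hashlib.new(algorithm)
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches(path: Path, algorithm: str, expected_hex: str) -> bool:
    """True when the file's digest equals the expected value (case-insensitive)."""
    return file_digest(path, algorithm) == expected_hex.lower()


with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "data.csv"
    sample.write_bytes(b"a,b\n1,2\n")
    expected = hashlib.sha256(b"a,b\n1,2\n").hexdigest()
    ok = matches(sample, "sha256", expected)
    bad = matches(sample, "sha256", "0" * 64)
```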
Two-Path Download Flow
The framework orchestrates the download process for each resource:
COMPLETE and not force: skip (no-op)
PARTIAL and not can_recover: delete transient_path, set NONE
PARTIAL and can_recover: leave transient_path in place for resumption
Call resource.download(force): the resource writes to transient_path
On success: move transient_path → path, set COMPLETE
On failure: if can_recover, set PARTIAL; otherwise delete and set NONE
Eager cleanup: for each transient dependency where all dependents are COMPLETE, call cleanup()
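The flow above, without the eager cleanup step, can be approximated by the following toy orchestrator. It is a sketch of the described contract rather than the framework's code: State, run_resource, remove, and DemoResource are hypothetical names, and state is kept in memory instead of the metadata file.

```python
import shutil
import tempfile
from enum import Enum
from pathlib import Path


class State(Enum):
    NONE = "none"
    PARTIAL = "partial"
    COMPLETE = "complete"


def remove(path: Path) -> None:
    """Delete a file or directory if it exists."""
    if path.is_dir():
        shutil.rmtree(path)
    elif path.exists():
        path.unlink()


def run_resource(resource, force: bool = False) -> None:
    if resource.state is State.COMPLETE and not force:
        return  # nothing to do
    if resource.state is State.PARTIAL and not resource.can_recover:
        remove(resource.transient_path)
        resource.state = State.NONE
    try:
        resource.download(force)  # writes to transient_path only
    except Exception:
        if resource.can_recover:
            resource.state = State.PARTIAL  # keep partial data for resumption
        else:
            remove(resource.transient_path)
            resource.state = State.NONE
        raise
    remove(resource.path)  # drop stale output when re-running with force
    resource.path.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(resource.transient_path), str(resource.path))
    resource.state = State.COMPLETE


class DemoResource:
    """Hypothetical resource that writes a small text file."""

    can_recover = False

    def __init__(self, root: Path):
        self.path = root / "data.txt"
        self.transient_path = root / ".downloads" / "data.txt"
        self.state = State.NONE
        self.calls = 0

    def download(self, force: bool = False) -> None:
        self.calls += 1
        self.transient_path.parent.mkdir(parents=True, exist_ok=True)
        self.transient_path.write_text("payload")


with tempfile.TemporaryDirectory() as tmp:
    demo = DemoResource(Path(tmp))
    run_resource(demo)
    run_resource(demo)  # already COMPLETE, so download() is not called again
    final_state = demo.state
    stored = demo.path.read_text()
    transient_left = demo.transient_path.exists()
```

Keeping all state transitions in run_resource is what lets download() implementations stay free of bookkeeping, as the contract above requires.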
State Metadata File
Resource states are persisted in <dataset.datapath>/.downloads/.state.json:
{
"version": 1,
"resources": {
"TRAIN_IMAGES": {"state": "complete"},
"TRAIN_LABELS": {"state": "partial"}
}
}
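Reading a state out of this file could look like the sketch below. It parses the example document shown above with the standard json module; read_state is a hypothetical helper, and the "none" spelling for the NONE state is an assumption, since the example only shows "complete" and "partial".

```python
import json
from enum import Enum


class ResourceState(Enum):
    NONE = "none"  # assumed spelling, not shown in the example file
    PARTIAL = "partial"
    COMPLETE = "complete"


def read_state(state_json: str, name: str) -> ResourceState:
    """Return the recorded state for `name`, or NONE when there is no entry."""
    entry = json.loads(state_json).get("resources", {}).get(name)
    if entry is None:
        return ResourceState.NONE
    return ResourceState(entry["state"])


STATE_JSON = """
{
  "version": 1,
  "resources": {
    "TRAIN_IMAGES": {"state": "complete"},
    "TRAIN_LABELS": {"state": "partial"}
  }
}
"""

images = read_state(STATE_JSON, "TRAIN_IMAGES")
labels = read_state(STATE_JSON, "TRAIN_LABELS")
missing = read_state(STATE_JSON, "TEST_IMAGES")
```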
Deprecated: Download Decorators
Deprecated: The decorator-based API still works but emits deprecation warnings. Migrate to the class-attribute approach described above.
Download decorators are applied above the @dataset decorator and pass
downloaded file paths as arguments to the dataset function.
from datamaestro.download.single import filedownloader
from datamaestro.definitions import dataset
# DEPRECATED — use class-attribute approach instead
@filedownloader("data", "http://example.com/data.csv")
@dataset(MyData)
def my_dataset(data):  # 'data' receives the downloaded Path
    return MyData(path=data)
filedownloader (decorator)
# DEPRECATED
@filedownloader("data.csv", "http://example.com/data.csv")
@dataset(MyData)
def compressed_dataset(data):
    return MyData(path=data)
concatdownload (decorator)
# DEPRECATED
@concatdownload(
    "combined",
    "http://example.com/part1.txt",
    "http://example.com/part2.txt",
)
@dataset(MyData)
def concatenated_dataset(combined):
    return MyData(path=combined)
zipdownloader / tardownloader (decorator)
from datamaestro.download.archive import zipdownloader, tardownloader
# DEPRECATED
@zipdownloader("data", "http://example.com/archive.zip")
@dataset(MyData)
def zipped_dataset(data):
    return MyData(path=data / "file.csv")
# DEPRECATED
@tardownloader("data", "http://example.com/archive.tar.gz")
@dataset(MyData)
def tar_dataset(data):
    return MyData(path=data / "file.csv")
Multiple decorators (deprecated)
# DEPRECATED
@filedownloader("train", "http://example.com/train.csv")
@filedownloader("test", "http://example.com/test.csv")
@dataset(MyData)
def multi_resource_dataset(train, test):
    return MyData(train_path=train, test_path=test)
Custom handler (deprecated)
from datamaestro.download import Download
# DEPRECATED — use FileResource / FolderResource instead
class MyDownload(Download):
    def __init__(self, varname, custom_param):
        super().__init__(varname)
        self.custom_param = custom_param
    def prepare(self):
        return self._download_and_process()
    def download(self, force=False):
        if force or not self._is_cached():
            self._do_download()
    def hasfiles(self) -> bool:
        return True
Deprecated Names
| Deprecated | Replacement |
|---|---|