Datasource

class dagshub.data_engine.model.datasource.Datasource(datasource: DatasourceState, query: DatasourceQuery | None = None, from_dataset: DatasetState | None = None)
clear_query(reset_to_dataset=True)

Clear the attached query.

Parameters:

reset_to_dataset – If True and this Datasource was saved as a dataset, reset to the query in the dataset, instead of clearing the query completely.

property annotation_fields: List[str]

Return all fields that have the annotation meta tag set

head(size=100, load_documents=True, load_annotations=True) QueryResult

Executes the query and returns a QueryResult object containing the first size datapoints

Parameters:
  • size – how many datapoints to get. Default is 100

  • load_documents – Automatically download all document blob fields

  • load_annotations – Automatically download all annotation blob fields

all(load_documents=True, load_annotations=True) QueryResult

Executes the query and returns a QueryResult object containing all datapoints

If there’s an active MLflow run, logs an artifact with information about the query to the run.

Parameters:
  • load_documents – Automatically download all document blob fields

  • load_annotations – Automatically download all annotation blob fields
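
Example (a minimal sketch, assuming ds is an existing Datasource object):

preview = ds.head(size=50, load_documents=False)  # first 50 datapoints, without downloading document blobs
result = ds.all()                                 # all datapoints matching the current query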

select(*selected: str | Field) Datasource

Select which fields should appear in the query result.

If you want to query older versions of metadata, use Field objects with as_of set to your desired time.

By default, only the defined fields are returned. If you want to return all existing fields plus whatever additional fields you define, add "*" into the arguments.

Parameters:

selected

Fields you want to select. Can be either of:

  • Name of the field to select: "field".

  • "*" to select all the fields in the datasource.

  • Field object.

Example:

t = datetime.now() - timedelta(hours=24)
q1 = ds.select("*", Field("size", as_of=t, alias="size_asof_24h_ago"))
q1.all()
as_of(time: float | datetime) Datasource

Get a snapshot of the datasource’s state as of time.

Parameters:

time – The point in time to get the data from. Either a UTC timestamp or a datetime object.

In the following example, you will get back datapoints that were created no later than yesterday AND had their size at this point bigger than 5 bytes:

t = datetime.now() - timedelta(hours=24)
q1 = (ds["size"] > 5).as_of(t)
q1.all()

Note

If used with select(), the as_of set on the fields takes precedence over the global query as_of set here.

with_time_zone(tz_val: str) Datasource

Set the time zone to use when querying datetime metadata fields.

Parameters:

tz_val – A time zone offset string in the form of “+HH:mm” or “-HH:mm”.

Metadata of type datetime is always stored in the DB as UTC. When a query is done on such a field, there are 3 options:

  • Metadata was saved with a timezone, in which case it will be used.

  • Metadata was saved without a timezone, in which case UTC will be used.

  • A time zone was specified via with_time_zone, in which case it overrides whatever is in the database.
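
Example (a sketch; "created_at" is a hypothetical datetime field):

tz_ds = ds.with_time_zone("+02:00")
q = tz_ds[tz_ds["created_at"].date_field_in_timeofday("09:00-17:00")]
q.all()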

order_by(*args: str | Tuple[str, bool | str]) Datasource

Sort the query result by the specified fields. Any previously set order will be overwritten.

Parameters:

args

Fields to sort by. Each argument can be either of:

  • Name of the field to sort by: "field".

  • A tuple of (field_name, ascending): ("field", True).

  • A tuple of (field_name, "asc"|"desc"): ("field", "asc").

Examples:

ds.order_by("size").all()                   # Order by ascending size
ds.order_by(("date", "desc"), "size).all()  # Order by descending date, then ascending size
metadata_field(field_name: str) MetadataFieldBuilder

Returns a builder for a metadata field. The builder can be used to change properties of a field or create a new field altogether. Note that fields get automatically created when you upload new metadata to the Data Engine, so it’s not necessary to create fields with this function.

Example of creating a new annotation field:

ds.metadata_field("annotation").set_type(dtypes.LabelStudioAnnotation).apply()

Note

New fields have to have their type defined using .set_type() before doing anything else

Example of marking an existing field as an annotation field:

ds.metadata_field("existing-field").set_annotation().apply()
Parameters:

field_name – Name of the field that you want to create/change

apply_field_changes(field_builders: List[MetadataFieldBuilder])

Applies one or multiple metadata field builders that can be constructed using the metadata_field() function.
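
Example (a sketch applying changes to two illustrative fields at once):

builders = [
    ds.metadata_field("quality_score").set_type(float),
    ds.metadata_field("reviewed").set_type(bool),
]
ds.apply_field_changes(builders)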

upload_metadata_of_implicit_context()

Commits metadata changes made in the dictionary assignment context.

metadata_context() ContextManager[MetadataContextManager]

Returns a metadata context through which you can upload metadata using its update_metadata() function. Once the context is exited, all metadata is uploaded in one batch:

with ds.metadata_context() as ctx:
    ctx.update_metadata("file1", {"key1": True, "key2": "value"})
upload_metadata_from_file(file_path, path_column: int | str | None = None, ingest_on_server: bool = False)

Upload metadata from a file.

Parameters:
  • file_path – Path to the file with metadata. Allowed formats are CSV, Parquet, ZIP, GZ.

  • path_column – Column with the datapoints’ paths. Can either be the name of the column, or its index. If not specified, the first column is used.

  • ingest_on_server – Set to True to process the metadata asynchronously. The file will be sent to our server and ingested into the datasource there. Default is False.
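
Example (a sketch, assuming metadata.csv has a "path" column with the datapoints’ paths):

ds.upload_metadata_from_file("metadata.csv", path_column="path")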

upload_metadata_from_dataframe(df: DataFrame, path_column: int | str | None = None, ingest_on_server: bool = False)

Upload metadata from a pandas dataframe.

All columns are uploaded as metadata, and the path of every datapoint is taken from path_column.

Parameters:
  • df (pandas.DataFrame) – DataFrame with metadata

  • path_column – Column with the datapoints’ paths. Can either be the name of the column, or its index. If not specified, the first column is used.

  • ingest_on_server – Set to True to process the metadata asynchronously. The file will be sent to our server and ingested into the datasource there. Default is False.
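
Example (a sketch with illustrative column names):

import pandas as pd

df = pd.DataFrame({
    "path": ["images/001.jpg", "images/002.jpg"],  # datapoint paths
    "label": ["cat", "dog"],                       # metadata column
})
ds.upload_metadata_from_dataframe(df, path_column="path")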

delete_source(force: bool = False)

Delete the record of this datasource along with all datapoints.

Warning

This is a destructive operation! If you delete the datasource, all the datapoints and metadata will be removed.

Parameters:

force – Skip the confirmation prompt

scan_source(options: List[ScanOption] | None = None)

This function fires a call to the backend to rescan the datapoints. Call this function whenever you have uploaded new files and want them to appear when querying the datasource, or when you have changed existing file contents and want their metadata to be updated.

DagsHub periodically rescans all datasources; this function is a way to make a scan happen as soon as possible.

Notes about automatically scanned metadata:
  1. Only new datapoints (files) will be added. If files were removed from the source, their metadata will still remain, and they will still be returned from queries on the datasource. An API to actively remove metadata will be available soon.

  2. Some metadata fields will be automatically scanned and updated by DagsHub based on this scan - the list of automatic metadata fields is growing frequently!

Parameters:

options – List of scanning options. If not sure, leave empty.
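
Example (after uploading new files to the underlying storage):

ds.scan_source()   # request a rescan so the new files show up in queries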

delete_metadata_from_datapoints(datapoints: List[Datapoint], fields: List[str])

Delete metadata from datapoints. The deleted values can still be accessed using a versioned query with its time set before the deletion.

Parameters:
  • datapoints – datapoints to delete metadata from

  • fields – fields to delete
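
Example (a sketch; "label" is an illustrative field, and it is assumed a QueryResult can be converted into a list of Datapoint objects):

big_files = (ds["size"] > 5).all()
ds.delete_metadata_from_datapoints(list(big_files), fields=["label"])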

delete_datapoints(datapoints: List[Datapoint], force: bool = False)

Delete datapoints.

  • These datapoints will no longer show up in queries.

  • Does not delete the datapoints’ files, only removes them from the datasource.

  • You can still query these datapoints and associated metadata with versioned queries whose time is before deletion time.

  • You can re-add these datapoints to the datasource by uploading new metadata to it with, for example, Datasource.metadata_context. This will create a new datapoint with new id and new metadata records.

  • Datasource scanning will not add these datapoints back.

Parameters:
  • datapoints – list of datapoints objects to delete

  • force – Skip the confirmation prompt
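
Example (a sketch, under the same assumption about converting a QueryResult to a list of Datapoint objects):

to_remove = (ds["size"] > 5).all()
ds.delete_datapoints(list(to_remove), force=True)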

save_dataset(name: str) Datasource

Save the dataset, which is a combination of datasource + query, on the backend. That way you can persist and share your queries. You can get the dataset back later by calling datasets.get_dataset()

Parameters:

name – Name of the dataset

Returns:

A datasource object with the dataset assigned to it
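
Example (the dataset name is illustrative):

q = ds["size"] > 5
big_files = q.save_dataset("size-bigger-than-5")
# The saved dataset can later be retrieved with datasets.get_dataset()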

log_to_mlflow(artifact_name='datasource.dagshub.json', run: mlflow.entities.Run | None = None, as_of: datetime | None = None) mlflow.entities.Run

Logs the current datasource state to MLflow as an artifact.

Parameters:
  • artifact_name – Name of the artifact that will be stored in the MLflow run.

  • run – MLflow run to save to. If None, uses the active MLflow run or creates a new run.

  • as_of – The querying time for which to save the artifact. Any time the datasource is recreated from the artifact, it will be queried as of this timestamp. If None, the current machine time will be used. If the artifact is autologged to MLflow (will happen if you have an active MLflow run), then the timestamp of the query will be used.

Returns:

Run to which the artifact was logged.
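
Example (a sketch; the artifact name is arbitrary):

import mlflow

with mlflow.start_run():
    ds.log_to_mlflow(artifact_name="train_datasource.dagshub.json")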

save_to_file(path: str | PathLike = '.') Path

Saves a JSON file representing the current state of datasource or dataset. Useful for connecting code versions to the datasource used for training.

Note

Does not save the actual contents of the datasource/dataset, only the query.

Parameters:

path – Where to save the file. If path is an existing folder, saves to <path>/<ds_name>.json.

Returns:

The path to the saved file
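
Example of a save/load round trip (a sketch; it assumes the saved JSON file can be read back with json.load and passed to load_from_serialized_state()):

import json
from dagshub.data_engine.model.datasource import Datasource

saved_path = ds.save_to_file()
with open(saved_path) as f:
    restored = Datasource.load_from_serialized_state(json.load(f))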

property is_query_different_from_dataset: bool | None

Whether the current query of the object differs from the one in the assigned dataset.

If no dataset is assigned, returns None.

static load_from_serialized_state(state_dict: Dict) Datasource

Load a Datasource that was saved with save_to_file()

Parameters:

state_dict – Serialized JSON object

to_voxel51_dataset(**kwargs) fo.Dataset

Refer to QueryResult.to_voxel51_dataset() for documentation.

property default_dataset_location: Path

Default location where datapoint files are stored.

On UNIX-likes the path is ~/dagshub/datasets/<repo_name>/<datasource_id>

On Windows the path is C:\Users\<user>\dagshub\datasets\<repo_name>\<datasource_id>

visualize(visualizer: Literal['dagshub', 'fiftyone'] = 'dagshub', **kwargs) str | fo.Session

Visualize the whole datasource using QueryResult.visualize().

Read the function docs for kwarg documentation.

async add_annotation_model_from_config(config, project_name, ngrok_authtoken, port=9090)

Initialize a Label Studio (LS) backend for ML annotation using a preset configuration.

Parameters:
  • config – dictionary containing information about the mlflow model, hooks and the LS label config. Recommended to use with get_config() from preconfigured_models in the orchestrator.

  • project_name – automatically adds the backend to this Label Studio project

  • ngrok_authtoken – if provided, ngrok is used to forward the local connection

  • port – (optional, default: 9090) port on which the orchestrator is hosted

async add_annotation_model(repo: str, name: str, version: str = 'latest', post_hook: ~typing.Callable[[~typing.Any], ~typing.Any] = <function Datasource.<lambda>>, pre_hook: ~typing.Callable[[~typing.Any], ~typing.Any] = <function Datasource.<lambda>>, port: int = 9090, project_name: str | None = None, ngrok_authtoken: str | None = None) None

Initialize a Label Studio (LS) backend for ML annotation.

Parameters:
  • repo – repository to extract the model from

  • name – name of the model in the mlflow registry

  • version – (optional, default: ‘latest’) version of the model in the mlflow registry

  • pre_hook – (optional, default: identity function) function that runs before datapoint is sent to the model

  • post_hook – (optional, default: identity function) function that converts the mlflow model output to the desired format

  • port – (optional, default: 9090) port on which orchestrator is hosted

  • project_name – (optional, default: None) automatically adds backend to project

  • ngrok_authtoken – (optional, default: None) uses ngrok to forward local connection

annotate(fields_to_embed=None, fields_to_exclude=None) str | None

Sends all datapoints in the datasource for annotation in Label Studio.

Parameters:
  • fields_to_embed – list of metadata columns that will show up in the Label Studio UI. If not specified, all will be displayed.

  • fields_to_exclude – list of metadata columns that will not show up in the Label Studio UI

Note

This will send ALL datapoints in the datasource for annotation. It’s recommended not to send a huge number of datapoints at once, to avoid overloading the Label Studio workspace. Use QueryResult.annotate() to annotate the result of a query with fewer datapoints. Alternatively, use the lower-level send_datapoints_to_annotation() function.

Returns:

Link to open Label Studio in the browser

send_datapoints_to_annotation(datapoints: List[Datapoint] | QueryResult | List[Dict], open_project=True, ignore_warning=False, fields_to_exclude=None, fields_to_embed=None) str | None

Sends datapoints for annotation in Label Studio.

Parameters:
  • datapoints

    Either of:

    • A QueryResult

    • List of Datapoint objects

    • List of dictionaries. Each dictionary should have fields id and download_url.

      id is the ID of the datapoint in the datasource.

  • open_project – Automatically open the created Label Studio project in the browser.

  • ignore_warning – Suppress the prompt-warning if you try to annotate too many datapoints at once.

  • fields_to_embed – list of metadata columns that will show up in the Label Studio UI. If not specified, all will be displayed.

  • fields_to_exclude – list of metadata columns that will not show up in the Label Studio UI

Returns:

Link to open Label Studio in the browser
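
Example (a sketch sending a small query result for annotation):

result = ds.head(size=20)
link = ds.send_datapoints_to_annotation(result, open_project=False)
print(link)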

wait_until_ready(max_wait_time=300, fail_on_timeout=True)

Blocks until the datasource preprocessing is complete.

Useful when you have just created a datasource and the initial scanning hasn’t finished yet.

Parameters:
  • max_wait_time – Maximum time to wait in seconds

  • fail_on_timeout – Whether to raise a RuntimeError or log a warning if the scan does not complete on time
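
Example (a sketch, right after creating a datasource):

ds.wait_until_ready(max_wait_time=600, fail_on_timeout=False)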

has_field(field_name: str) bool

Checks if a metadata field field_name exists in the datasource.
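
Example (a sketch; "label" is an illustrative field name):

if not ds.has_field("label"):
    ds.metadata_field("label").set_type(str).apply()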

date_field_in_years(*item: int)

Checks whether a datetime metadata field’s value falls in one of the given years.

Parameters:

item – List of years.

Examples:

datasource[datasource["y"].date_field_in_years(1979, 2003)]
date_field_in_months(*item: int)

Checks whether a datetime metadata field’s value falls in one of the given months.

Parameters:

item – List of months.

Examples:

datasource[datasource["y"].date_field_in_months(12, 2)]
date_field_in_days(*item: int)

Checks whether a datetime metadata field’s value falls on one of the given days.

Parameters:

item – List of days.

Examples:

datasource[datasource["y"].date_field_in_days(25, 2)]
date_field_in_timeofday(item: str)

Checks whether a datetime metadata field’s value falls within the given time-of-day range (on any day). The range is in the format “HH:mm-HH:mm” (or “HH:mm:ss-HH:mm:ss”), with the start time on the left. A range that starts on one day and ends on the next should be expressed as an OR of two range filters.

Parameters:

item – Time range string.

Examples:

datasource[datasource["y"].date_field_in_timeofday("11:30-12:30")]
import_annotations_from_files(annotation_type: Literal['yolo', 'cvat'], path: str | Path, field: str = 'imported_annotation', load_from: Literal['repo', 'disk'] | None = None, remapping_function: Callable[[str], str] | None = None, **kwargs)

Imports annotations into the datasource from files

The annotations will be downloaded and converted into Label Studio tasks, which are then uploaded into the specified field.

If the annotations are stored in a repo and not locally, they are downloaded to a temporary directory.

Caveats:
  • YOLO:
    • Images need to also be downloaded to get their dimensions.

    • The .yaml file needs to have the path argument set to the relative path to the data; it is used to download the files.

    • You have to specify the yolo_type kwarg with the type of annotation to import

Parameters:
  • annotation_type – Type of annotations to import. Possible values are yolo and cvat

  • path – If YOLO - path to the .yaml file, if CVAT - path to the .zip file. Can be either on disk or in repository

  • field – Which field to upload the annotations into. If it’s an existing field, it has to be a blob field, and it will have the annotations flag set afterwards.

  • load_from – Force specify where to get the files from. By default, we try to load files from disk first, then from the repository. If this is specified, that check is skipped and the files are loaded from the specified location.

  • remapping_function – Function that maps from the path of an annotation to the path of the datapoint. If None, we try to make a best guess based on the first imported annotation. This might fail if there is no matching datapoint in the datasource for some annotations, or if the paths are wildly different.

Keyword Arguments:

yolo_type – Type of YOLO annotations to import. Either bbox, segmentation or pose.

Example to import segmentation annotations into an imported_annotations field, using YOLO information from an annotations.yaml file (can be local, or in the repo):

ds.import_annotations_from_files(
    annotation_type="yolo",
    path="annotations.yaml",
    field="imported_annotations",
    yolo_type="segmentation"
)
class dagshub.data_engine.model.datasource.Field(field_name: str, as_of: float | datetime | None = None, alias: str | None = None)

Class used to define custom fields for use in Datasource.select() or in filtering.

Example of filtering on old data from a field:

t = datetime.now() - timedelta(days=2)
q = ds[Field("size", as_of=t)] > 500
q.all()
field_name: str

The database field where the values are stored. In other words, where to get the values from.

as_of: float | datetime | None = None

If defined, the data in this field will be shown as of this moment in time.

Accepts either a datetime object, or a UTC timestamp.

alias: str | None = None

How the returned custom data field should be named.

Useful when you’re comparing the same field at multiple points in time:

yesterday = datetime.now() - timedelta(days=1)

ds.select(
    Field("value", alias="value_today"),
    Field("value", as_of=yesterday, alias="value_yesterday")
).all()
class dagshub.data_engine.model.datasource.MetadataContextManager(datasource: Datasource)

Context manager for updating the metadata on a datasource. Batches the metadata changes so they are sent all at once.

update_metadata(datapoints: List[str] | str, metadata: Dict[str, Any])

Update metadata for the specified datapoints.

Note

If datapoints is a list, the same metadata is assigned to all the datapoints in the list. Call update_metadata() separately for each datapoint if you need to assign different metadata.

Parameters:
  • datapoints (Union[List[str], str]) – A list of datapoints or a single datapoint path to update metadata for.

  • metadata (Dict[str, Any]) – A dictionary containing metadata key-value pairs to update.

Example:

with ds.metadata_context() as ctx:
    metadata = {
        "episode": 5,
        "has_baby_yoda": True,
    }

    # Attach metadata to a single specific file in the datasource.
    # The first argument is the filepath to attach metadata to, **relative to the root of the datasource**.
    ctx.update_metadata("images/005.jpg", metadata)

    # Attach metadata to several files at once:
    ctx.update_metadata(["images/006.jpg","images/007.jpg"], metadata)
class dagshub.data_engine.model.metadata_field_builder.MetadataFieldBuilder(datasource: Datasource, field_name: str)

Builder class for changing properties of a metadata field in a datasource. It is also possible to create a new empty field with a predefined schema with this builder. All functions return the builder object to facilitate a builder pattern, for example:

builder.set_type(bytes).set_annotation().apply()
set_type(t: Type | DagshubDataType) MetadataFieldBuilder

Set the type of the field. The type can be either a Python primitive supported by the Data Engine (str, bool, int, float, bytes) or a DagshubDataType inheritor. The DataType inheritors can define additional tags on top of the basic backing type.

set_annotation(is_annotation: bool = True) MetadataFieldBuilder

Mark or unmark the field as an annotation field

set_thumbnail(thumbnail_type: Literal['video', 'audio', 'image', 'pdf', 'text'] | None = None, is_thumbnail: bool = True) MetadataFieldBuilder

Mark or unmark the field as a thumbnail field, with the specified thumbnail type
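
Example (a sketch; "preview" is an illustrative blob field name):

ds.metadata_field("preview").set_type(bytes).set_thumbnail("image").apply()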

apply()

Apply the outgoing changes to this metadata field.

If you need to apply changes to multiple fields at once, use Datasource.apply_field_changes instead.

class dagshub.data_engine.client.models.ScanOption

Enum of options that can be applied during the scanning process with scan_source()

FORCE_REGENERATE_AUTO_SCAN_VALUES = 'FORCE_REGENERATE_AUTO_SCAN_VALUES'

Regenerate all the autogenerated metadata values for the whole datasource
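
Example (a sketch combining it with scan_source()):

from dagshub.data_engine.client.models import ScanOption

ds.scan_source(options=[ScanOption.FORCE_REGENERATE_AUTO_SCAN_VALUES])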