Python API

The Python API is the recommended way for client-side Python applications to communicate with the Data Dispatcher server. To use the API, install the Data Dispatcher client module:

$ pip install --user datadispatcher

Then import the API module and create a DataDispatcherClient object:

from data_dispatcher.api import DataDispatcherClient
client = DataDispatcherClient("http://server.host.domain:8080/dd/data")

class data_dispatcher.api.DataDispatcherClient(server_url=None, auth_server_url=None, worker_id=None, worker_id_file=None, token=None, token_file=None, token_library=None, cpu_site='DEFAULT', timeout=300)

Initializes the DataDispatcherClient object

Keyword Arguments:
  • server_url (str) – The server endpoint URL. If unspecified, the value of the DATA_DISPATCHER_URL environment variable will be used

  • auth_server_url (str) – The endpoint URL for the Authentication server. If unspecified, the value of the DATA_DISPATCHER_AUTH_URL environment variable will be used

  • worker_id_file (str) – File path to read/store the worker ID. Default: <cwd>/.data_dispatcher_worker_id

  • worker_id (str) – Worker ID to use when reserving next file. If unspecified, will be read from the worker ID file.

  • cpu_site (str) – Name of the CPU site where the client is running, optional. Will be used when reserving project files.

  • timeout (float or int) – Number of seconds to wait for a response.
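As a minimal sketch of the URL fallback described above, the hypothetical helper below (resolve_server_url is not part of the API) checks that a server URL is available before the client is created. It only reads the documented DATA_DISPATCHER_URL environment variable; the client itself is assumed to apply the same fallback on its own.

```python
import os


def resolve_server_url(server_url=None, environ=None):
    """Return the explicit URL, else DATA_DISPATCHER_URL, else raise.

    Hypothetical pre-flight check, not part of the Data Dispatcher API.
    """
    environ = os.environ if environ is None else environ
    url = server_url or environ.get("DATA_DISPATCHER_URL")
    if not url:
        raise ValueError(
            "no server URL: pass server_url or set DATA_DISPATCHER_URL"
        )
    return url


# Intended use (requires the datadispatcher package to be installed):
#   from data_dispatcher.api import DataDispatcherClient
#   client = DataDispatcherClient(server_url=resolve_server_url(), timeout=60)
```

Failing early with a clear message can be easier to diagnose than a connection error raised later by the first request.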

activate_project(project_id)

Resets the state of an abandoned project back to “active”

auth_info()

Returns information about current authentication token.

Returns:

  • str – username of the authenticated user

  • numeric – token expiration timestamp
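The two return values can be used to check how long the current token remains valid. The sketch below assumes that auth_info() returns them as a (username, expiration) pair and that the expiration is a Unix timestamp in seconds; token_seconds_left is a hypothetical helper name, and client is any DataDispatcherClient-like object.

```python
import time


def token_seconds_left(client, now=None):
    """Return (username, seconds until the token expires).

    Assumes client.auth_info() returns a (username, expiration) pair
    with the expiration given as a Unix timestamp in seconds.
    """
    username, expiration = client.auth_info()
    now = time.time() if now is None else now
    return username, expiration - now
```

A negative second element would indicate that the token has already expired and that one of the login methods needs to be called again.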

cancel_project(project_id)

Cancels a project by id

Parameters:

project_id (str) – project id

Returns:

(dict) project information

copy_project(project_id, common_attributes={}, project_attributes={}, worker_timeout=None, idle_timeout=None)

Creates a new project as a copy of an existing project

Parameters:

project_id (int) – id of the project to copy

Keyword Arguments:
  • common_attributes (dict) – file attributes to override

  • project_attributes (dict) – project attributes to override

  • worker_timeout (int or float) – worker timeout to override

Returns:

(dict) new project information

create_project(files, common_attributes={}, project_attributes={}, query=None, worker_timeout=None, idle_timeout=259200, users=[], roles=[])

Creates a new project

Parameters:
  • files (list) – Each item in the list is either a dictionary with keys: “namespace”, “name”, “attributes” (optional) or a string “namespace:name”

  • common_attributes (dict) – attributes to attach to each file, will be overridden by the individual file attribute values with the same key

  • project_attributes (dict) – attributes to attach to the new project

  • query (str) – MQL query to be associated with the project. This attribute is optional and is not used by Data Dispatcher in any way. It is for informational purposes only.

  • worker_timeout (int or float) – If not None, all file handles will be automatically released if held by the same worker for longer than worker_timeout seconds

  • idle_timeout (int or float) – If there is no file reserve/release activity for the specified time interval, the project goes into “abandoned” state. Default is 72 hours (3 days). If set to None, the project remains active until complete.

  • users (list of strings) – List of users who can use the worker interface (next_file, done, failed…), in addition to the project creator.

  • roles (list of strings) – List of roles, members of which are authorized to use the worker interface.

Returns:

new project information

Return type:

dict
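The two accepted forms of the files argument can be mixed in one list. The sketch below builds such a list and passes it to create_project(); build_file_list and create_small_project are hypothetical helpers, and the attribute names and timeout values are made up for illustration.

```python
def build_file_list(dids, per_file_attributes=None):
    """Turn "namespace:name" strings into a create_project() file list.

    Files with an entry in per_file_attributes become dictionaries with
    "namespace", "name" and "attributes"; all others stay plain strings.
    """
    per_file_attributes = per_file_attributes or {}
    files = []
    for did in dids:
        attrs = per_file_attributes.get(did)
        if attrs:
            namespace, name = did.split(":", 1)
            files.append(
                {"namespace": namespace, "name": name, "attributes": attrs}
            )
        else:
            files.append(did)
    return files


def create_small_project(client, dids, per_file_attributes=None):
    """Create a project; client is a DataDispatcherClient-like object."""
    return client.create_project(
        build_file_list(dids, per_file_attributes),
        common_attributes={"campaign": "test"},  # illustrative attribute
        worker_timeout=3600,         # release handles held for over an hour
        idle_timeout=24 * 3600,      # abandon after a day without activity
    )
```

Per-file attribute values take precedence over common_attributes values with the same key, as described above.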

create_rse(name, description, is_enabled=True, is_available=True, is_tape=False, pin_url=None, poll_url=None, remove_prefix=None, add_prefix=None, pin_prefix=None, preference=None, interface=None)

Creates a new RSE

Parameters:
  • name (string)

  • description (string)

  • is_enabled (boolean)

  • is_available (boolean)

  • is_tape (boolean)

  • pin_url (string)

  • poll_url (string)

  • remove_prefix (string)

  • add_prefix (string)

  • pin_prefix (string)

  • preference (integer)

  • interface (string)

Returns:

new RSE information

Return type:

dict

file_done(project_id, did, worker_id=None)

Notifies Data Dispatcher that the file was successfully processed and should be marked as “done”.

Parameters:
  • project_id (int) – project id

  • did (str) – file DID (“<namespace>:<name>”)

file_failed(project_id, did, retry=True, worker_id=None)

Notifies Data Dispatcher that processing of the file failed.

Parameters:
  • project_id (int) – project id

  • did (str) – file DID (“<namespace>:<name>”)

get_file(namespace, name)

Deprecated

get_handle(project_id, namespace, name)

Gets information about a file handle

Parameters:
  • project_id (str) – project id

  • namespace (str) – file namespace

  • name (str) – file name

Returns:

(dict) file handle information or None if not found

get_project(project_id, with_files=True, with_replicas=False)

Gets information about the project

Parameters:

project_id (str) – project id

Keyword Arguments:
  • with_files (boolean) – whether to include information about project files. Default: True

  • with_replicas (boolean) – whether to include information about project file replicas. Default: False

Returns:

(dict) project information or None if project not found.

The dictionary will include the following values:

  • project_id: numeric, project id

  • owner: str, project owner username,

  • state: str, current project state,

  • attributes: dict, project metadata attributes as set by create_project(),

  • created_timestamp: numeric, timestamp for the project creation time,

  • ended_timestamp: numeric or None, project end timestamp,

  • active: boolean, whether the project is active - at least one handle is not done or failed,

  • query: str, MQL query string associated with the project,

  • worker_timeout: numeric or None, worker idle timeout, in seconds

  • idle_timeout: numeric or None, project inactivity timeout in seconds
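As an illustration of reading the dictionary above, the following sketch formats a one-line summary from the documented keys. It assumes the timestamps are Unix timestamps in seconds; describe_project is a hypothetical helper, not part of the API.

```python
from datetime import datetime, timezone


def describe_project(info):
    """Format a short summary of the dictionary returned by get_project().

    Assumes created_timestamp and ended_timestamp are Unix timestamps.
    """
    if info is None:
        return "project not found"
    created = datetime.fromtimestamp(info["created_timestamp"], tz=timezone.utc)
    line = "project {} ({}), owner {}, created {}".format(
        info["project_id"],
        info["state"],
        info["owner"],
        created.strftime("%Y-%m-%d %H:%M UTC"),
    )
    if info.get("ended_timestamp") is not None:
        ended = datetime.fromtimestamp(info["ended_timestamp"], tz=timezone.utc)
        line += ", ended " + ended.strftime("%Y-%m-%d %H:%M UTC")
    return line
```

It could be called as describe_project(client.get_project(project_id, with_files=False)); the None branch covers the case where the project is not found.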

get_rse(name)

Returns information about RSE

Parameters:

name (str) – RSE name

Returns:

dictionary with RSE information or None if not found

list_handles(project_id, state=None, not_state=None, with_replicas=False)

Deprecated

list_projects(owner=None, state='active', not_state='abandoned', attributes=None, with_files=True, with_replicas=False)

Lists existing projects

Keyword Arguments:
  • owner (str) – Include only projects owned by the specified user. Default: all users

  • state (str) – Include only projects in specified state. Default: active only

  • not_state (str) – Exclude projects in the specified state. Default: exclude abandoned

  • attributes (dict) – Include only projects with specified attribute values. Default: do not filter by attributes

  • with_files (boolean) – Include information about files. Default: True

  • with_replicas (boolean) – Include information about file replicas. Default: False

Returns:

list of dictionaries with information about projects selected
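A short sketch of using the filters: the hypothetical helper below counts active projects per owner, skipping file information to keep the response small. It assumes each returned dictionary carries an "owner" key, as the get_project() dictionary does.

```python
from collections import Counter


def active_projects_per_owner(client, attributes=None):
    """Count active projects per owner.

    client is a DataDispatcherClient-like object. Assumes each project
    dictionary has an "owner" key, as in the get_project() result.
    """
    projects = client.list_projects(attributes=attributes, with_files=False)
    return Counter(project["owner"] for project in projects)
```

The default state and not_state values are left untouched here, so only active projects are counted.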

list_rses()

Returns information about all RSEs

Returns:

list of dictionaries with RSE information

login_digest(username, password, save_token=False)

Performs password-based authentication and stores the authentication token locally.

Parameters:
  • username (str)

  • password (str) – Password is not sent over the network. It is hashed and then used for digest authentication (RFC 2617).

Returns:

  • str – username of the authenticated user (same as username argument)

  • numeric – token expiration timestamp

login_ldap(username, password)

Performs password-based authentication using LDAP and stores the authentication token locally.

Parameters:
  • username (str)

  • password (str) – Password

Returns:

  • str – username of the authenticated user (same as username argument)

  • numeric – token expiration timestamp

login_password(username, password)

Combines LDAP and RFC 2617 digest authentication by calling login_ldap first and then, if it fails, login_digest.

Parameters:
  • username (str)

  • password (str) – Password

Returns:

  • str – username of the authenticated user (same as username argument)

  • numeric – token expiration timestamp

login_token(username, encoded_token)

Authenticate using a JWT or a SciToken.

Parameters:
  • username (str)

  • encoded_token (str or bytes)

Returns:

  • str – username of the authenticated user (same as username argument)

  • numeric – authentication expiration timestamp

login_x509(username, cert, key=None)

Performs X.509 authentication and stores the authentication token locally.

Parameters:
  • username (str)

  • cert (str) – Path to the file with the X.509 certificate or the certificate and private key

  • key (str) – Path to the file with the X.509 private key

Returns:

  • str – username of the authenticated user (same as username argument)

  • numeric – token expiration timestamp

new_worker_id(new_id=None, worker_id_file=None)

Sets or generates new worker ID to be used for next file allocation.

Keyword Arguments:
  • new_id (str or None) – New worker id to use. If None, a random worker_id will be generated.

  • worker_id_file (str or None) – Path to store the worker id. Default: <cwd>/.data_dispatcher_worker_id

Returns:

(str) assigned worker id

next_file(project_id, cpu_site=None, worker_id=None, timeout=None, stagger=10)

Reserves next available file from the project

Parameters:
  • project_id (int) – project id to reserve a file from

  • cpu_site (str) – optional, if specified, the file will be reserved according to the CPU/RSE proximity map

  • timeout (int or float) – optional, if specified, time to wait for a file to become available. Otherwise, will wait indefinitely

  • stagger (int or float) – optional, introduce a random delay between 0 and <stagger> seconds before sending the first request. This helps mitigate the effect of a synchronous start of multiple workers. Default: 10

Returns:

Dictionary or boolean. If a dictionary, it contains the reserved file information; its “replicas” field is a dictionary with replica information indexed by RSE name. If True, the request timed out but can be retried. If False, the project has ended.
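The three possible return values map naturally onto a worker loop. The sketch below combines next_file() with file_done() and file_failed(); run_worker and process are hypothetical names, and the "namespace" and "name" keys of the file information dictionary are an assumption based on the file dictionaries accepted by create_project().

```python
def run_worker(client, project_id, process, timeout=60):
    """Reserve and process files until the project ends.

    client is a DataDispatcherClient-like object and process is a
    user-supplied callable taking the file information dictionary.
    Returns a (done, failed) pair of counters.
    """
    done = failed = 0
    while True:
        reply = client.next_file(project_id, timeout=timeout)
        if reply is True:
            continue        # request timed out, try again
        if reply is False:
            break           # project has ended
        # Assumed keys of the file information dictionary
        did = "{}:{}".format(reply["namespace"], reply["name"])
        try:
            process(reply)
        except Exception:
            # Report the failure and let the file be retried later
            client.file_failed(project_id, did, retry=True)
            failed += 1
        else:
            client.file_done(project_id, did)
            done += 1
    return done, failed
```

Catching every exception from process is a deliberate simplification; a production worker would likely distinguish permanent failures and pass retry=False for them.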

static random_worker_id(prefix='')

Static method to generate random worker id

reserved_handles(project_id, worker_id=None)

Returns list of file handles reserved in the project by given worker

Parameters:
  • project_id (int) – Project id

  • worker_id (str or None) – Worker id. If None, client’s worker id will be used

Returns:

list of dictionaries with the file handle information

restart_handles(project_id, done=False, failed=False, reserved=False, all=False, handles=[])

Restart processing of project file handles

Parameters:

project_id (int) – id of the project to restart

Keyword Arguments:
  • done (boolean) – default=False, restart done handles

  • reserved (boolean) – default=False, restart reserved handles

  • failed (boolean) – default=False, restart failed handles

  • all (boolean) – default=False, restart all handles

  • handles (list of DIDs) – default=[], restart specific handles

Returns:

(dict) project information
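One possible use is to bring a stalled project back to life: reactivate it if it has been abandoned, then restart only its failed handles. The sketch below is a hypothetical helper (revive_project) built from get_project(), activate_project() and restart_handles() as documented on this page.

```python
def revive_project(client, project_id):
    """Reactivate an abandoned project and restart its failed handles.

    client is a DataDispatcherClient-like object. Returns the project
    information from restart_handles(), or None if the project is not found.
    """
    info = client.get_project(project_id, with_files=False)
    if info is None:
        return None
    if info["state"] == "abandoned":
        client.activate_project(project_id)
    return client.restart_handles(project_id, failed=True)
```

Specific files could be restarted instead by passing handles=["namespace:name", ...] in place of failed=True.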

retry_request(method, url, timeout=None, **args)

Retries the request on a 503 response with a random, exponentially growing delay. Use timeout=0 to try the request exactly once. Returns the response with status=503 on timeout.
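The general technique can be sketched as follows. This is an illustration of retrying on 503 with a random delay whose upper bound doubles on each attempt, not the library's actual implementation; retry_on_503 and send are hypothetical names, and the response is assumed to expose a status_code attribute.

```python
import random
import time


def retry_on_503(send, timeout=30.0, initial_delay=0.5,
                 sleep=time.sleep, clock=time.monotonic):
    """Call send() until it returns a non-503 response or time runs out.

    send is a zero-argument callable returning an object with a
    status_code attribute. With timeout=0 the request is sent exactly once.
    On timeout the last 503 response is returned.
    """
    deadline = clock() + timeout
    delay = initial_delay
    while True:
        response = send()
        if response.status_code != 503:
            return response
        remaining = deadline - clock()
        if remaining <= 0:
            return response          # timed out: hand back the 503
        # Random pause, capped by the time left before the deadline
        sleep(min(random.uniform(0, delay), remaining))
        delay *= 2                   # upper bound grows exponentially
```

Randomizing the pause keeps many workers that were rejected at the same moment from retrying in lockstep.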

search_projects(search_query, owner=None, state='active', with_files=True, with_replicas=False)

Searches for projects matching a query

Parameters:

search_query (str) – project search query in subset of MQL

Keyword Arguments:
  • owner (str) – Include only projects owned by the specified user. Default: all users

  • with_files (boolean) – Include information about files. Default: True

  • with_replicas (boolean) – Include information about file replicas. Default: False

Returns:

list of dictionaries with information about projects found

set_rse_availability(name, available)

Deprecated. Please use update_rse instead.

update_rse(name, description, is_enabled=None, is_available=None, is_tape=None, pin_url=None, poll_url=None, remove_prefix=None, add_prefix=None, pin_prefix=None, preference=None, interface=None)

Updates RSE settings. User must be an admin.

Parameters:
  • name (string)

  • description (string)

  • is_enabled (boolean)

  • is_available (boolean)

  • is_tape (boolean)

  • pin_url (string)

  • poll_url (string)

  • remove_prefix (string)

  • add_prefix (string)

  • pin_prefix (string)

  • preference (integer)

  • interface (string)

Returns:

updated RSE information

Return type:

dict

version()

Returns the server version as a string