Source code for metacat.webapi.webapi
import requests, json, fnmatch, sys, os, random, time
from metacat.util import to_str, to_bytes, ObjectSpec, chunked
from metacat.common import SignedToken, TokenLib, TokenAuthClientMixin, AuthenticationError
from urllib.parse import quote_plus, unquote_plus
# HTTP status code checked by HTTPClient to detect metadata validation failures;
# responses carrying it are raised as InvalidMetadataError.
INVALID_METADATA_ERROR_CODE = 488
def parse_name(name, default_namespace=None):
    """Split a "namespace:name" string into its two parts.

    Arguments
    ---------
    name : str
        Either "namespace:name" or a bare "name". Only the first colon separates
        the namespace; any further colons stay in the name part.
    default_namespace : str, optional
        Namespace to use when ``name`` contains no colon.

    Returns
    -------
    tuple
        (namespace, name)

    Raises
    ------
    AssertionError
        if ``name`` has no namespace part and ``default_namespace`` is empty or None
    """
    namespace, separator, short_name = name.partition(":")
    if separator:
        return namespace, short_name
    # no explicit namespace - a non-empty default is mandatory
    assert bool(default_namespace), "Null default namespace"
    return default_namespace, name


# alias: "un-DID" - take a DID apart
undid = parse_name
class MCError(Exception):
    """Base class for MetaCat client errors."""


class WebAPIError(MCError):
    """Error reported by, or while talking to, the MetaCat web API.

    Attributes
    ----------
    URL : str
        URL of the failed request
    StatusCode : int or None
        HTTP status code, None if no response object was given
    Body : str or None
        explicit message or the response body text
    Data : object or None
        decoded JSON body if the response was JSON, otherwise None
    Message : str or None
        human readable message: the explicit ``message``, the "message" item of a
        JSON object body, or the raw text of a non-JSON body
    """

    Headline = "MetaCat API error"

    def __init__(self, url, response=None, message=None):
        """
        Arguments
        ---------
        url : str
            URL of the failed request
        response : response object, optional
            object with ``status_code``, ``text`` and ``headers``
        message : str, optional
            explicit error message; when given the response body is not inspected
        """
        self.URL = url
        # response is optional (message-only errors), so do not assume it is there
        self.StatusCode = getattr(response, "status_code", None)
        self.Data = None
        self.Body = None
        # always defined, so that __str__ never hits a missing attribute
        self.Message = None
        if message:
            self.Message = self.Body = message
        elif response is not None:
            self.Body = to_str(response.text)
            # ignore parameters such as "; charset=utf-8" when matching the media type
            content_type = (response.headers.get("content-type") or "").split(";", 1)[0].strip().lower()
            if content_type in ("text/json", "application/json"):
                try:
                    self.Data = json.loads(response.text)
                except ValueError:
                    # body advertised as JSON but not parseable
                    self.Data = None
                if isinstance(self.Data, dict):
                    self.Message = self.Data.get("message", "")
            else:
                self.Message = self.Body

    def __str__(self):
        """Return the message if there is one, otherwise the body, otherwise an empty string."""
        return str(self.Message or self.Body or "")

    def json(self):
        """Return the decoded JSON body, or None if the body was not JSON."""
        return self.Data
class ServerReportedError(WebAPIError):
    """Error described by an "error" item inside an otherwise successfully delivered JSON response."""

    Headline = "Server side application error"

    def __init__(self, url, status_code, type, value):
        """
        Arguments
        ---------
        url : str
            URL of the request
        status_code : int
            HTTP status code of the response which carried the error
        type : str
            error type as reported by the server
        value : str
            error details, may be empty
        """
        message = f"Type:{type} Status code:{status_code}"
        if value:
            message += f": {value}"
        # There is no HTTP response object to hand to WebAPIError.__init__ here,
        # so the attributes it would populate are set directly.
        self.URL = url
        self.StatusCode = status_code
        self.Data = None
        self.Message = self.Body = message
class InvalidArgument(WebAPIError):
    """An argument value was rejected on the client side."""

    Headline = "Invalid argument"
class NotFoundError(WebAPIError):
    """The requested object does not exist (raised for HTTP status 404)."""

    Headline = "Object not found"
class BadRequestError(WebAPIError):
    """The server rejected the request as invalid (raised for HTTP status 400)."""

    Headline = "Invalid request"
class AlreadyExistsError(WebAPIError):
    """The object to be created already exists (raised for HTTP status 409)."""

    Headline = "Object already exists"
class PermissionDeniedError(WebAPIError):
    """The client is not allowed to perform the operation (raised for HTTP status 403)."""

    Headline = "Permission denied"
class InvalidMetadataError(WebAPIError):
    """Metadata validation failure reported by the server.

    The JSON body is expected to be an object with a "metadata_errors" list, one item per
    offending file. Each item may carry "message", "index", "fid" and its own
    "metadata_errors" list of {"name":..., "reason":...} entries.
    """

    Headline = "Invalid metadata"

    def __str__(self):
        """Render the per-file and per-parameter errors as a multi-line report."""
        msg = ["Invalid metadata"]
        data = self.json()
        if not isinstance(data, dict):
            # The body was not a JSON object - show whatever text is available
            # instead of failing while formatting the error itself.
            text = getattr(self, "Message", None) or getattr(self, "Body", None)
            if text:
                msg.append(str(text))
            return "\n".join(msg)
        for item in data.get("metadata_errors", []):
            item_headline = str(item.get("message") or "")
            index = item.get("index")
            fid = item.get("fid")
            item_id = ""
            if fid is not None:
                item_id = f"file fid={fid}" + item_id
            if index is not None:
                item_id = f"file #{index} " + item_id
            item_id = item_id.strip()
            item_id = f"{item_id}: " if item_id else ""
            msg.append(" " + item_id + item_headline)
            for error in item.get("metadata_errors", []):
                msg.append(" %s: %s" % (error.get("name"), error.get("reason")))
        return "\n".join(msg)
class HTTPClient(object):
    """Minimal HTTP client for the MetaCat web API.

    Adds the authentication token header, retries requests which fail with a transient
    gateway status, converts HTTP error statuses into WebAPIError subclasses and decodes
    JSON and JSON-sequence responses.
    """

    InitialRetry = 1.0          # upper bound, seconds, of the first random retry delay
    RetryExponent = 1.5         # the delay bound grows by this factor after every retry
    DefaultTimeout = 1800.0     # default request timeout and default retry window, seconds
    RetryStatusCodes = (502, 503, 504, 521, 524)    # statuses which trigger a retry
    RS = b'\x1E'                # record separator stripped from the start of json-seq lines

    def __init__(self, server_url, token, timeout=None):
        """
        Arguments
        ---------
        server_url : str
            server endpoint URL, request URI suffixes are appended to it after "/"
        token : object or None
            authentication token, must have ``encode()`` method. None - send no token
        timeout : int or float, optional
            per-request timeout in seconds. None - use DefaultTimeout, zero or negative - no timeout
        """
        self.ServerURL = server_url
        self.Token = token
        if timeout is not None and timeout <= 0:
            self.Timeout = None         # no timeout
        else:
            self.Timeout = timeout or self.DefaultTimeout
        self.LastResponse = self.LastURL = self.LastStatusCode = None

    def retry_request(self, method, url, timeout=None, **args):
        """
        Sends the request and retries it with random, exponentially growing delay while
        the server responds with one of RetryStatusCodes.

        Arguments
        ---------
        method : str
            "get" for GET, anything else is sent as POST
        url : str
        timeout : int or float, optional
            how long to keep retrying, seconds. None - DefaultTimeout.
            Use timeout = 0 to try the request exactly once
        args :
            passed to ``requests.get()`` or ``requests.post()``

        Returns
        -------
        response
            the last response received. If the retry window expires, that is the response
            with the retryable status.
        """
        if timeout is None:
            timeout = self.DefaultTimeout
        tend = time.time() + timeout
        retry_interval = self.InitialRetry
        send = requests.get if method == "get" else requests.post
        while True:
            response = send(url, timeout=self.Timeout, **args)
            if response.status_code not in self.RetryStatusCodes:
                break
            # never sleep beyond the end of the retry window
            sleep_time = min(random.random() * retry_interval, tend - time.time())
            retry_interval *= self.RetryExponent
            if sleep_time < 0:
                break       # time out
            time.sleep(sleep_time)
        return response

    def send_request(self, method, uri_suffix, headers=None, timeout=None, **args):
        """
        Sends a request to the server and raises an exception unless the status is 2xx.
        Records the URL, the response and the status code in LastURL, LastResponse and LastStatusCode.

        Arguments
        ---------
        method : str
            "get" or "post"
        uri_suffix : str
            appended to the server URL after "/"
        headers : dict, optional
            additional headers, they override the default ones
        timeout : int or float, optional
            retry window, see ``retry_request``
        args :
            passed to ``requests``

        Returns
        -------
        response

        Raises
        ------
        InvalidMetadataError, NotFoundError, PermissionDeniedError, AlreadyExistsError, BadRequestError
            for the corresponding status codes
        WebAPIError
            for any other non-2xx status
        """
        self.LastURL = url = "%s/%s" % (self.ServerURL, uri_suffix)
        default_headers = {
            "Accept": "text/plain, application/json, text/json, application/json-seq"
        }
        if self.Token is not None:
            default_headers["X-Authentication-Token"] = self.Token.encode()
        if headers:
            default_headers.update(headers)
        headers = default_headers
        self.LastResponse = response = self.retry_request(method, url, timeout=timeout, headers=headers, **args)
        status = self.LastStatusCode = response.status_code
        if status == INVALID_METADATA_ERROR_CODE:
            raise InvalidMetadataError(url, response)
        elif status == 404:
            raise NotFoundError(url, response)
        elif status == 403:
            raise PermissionDeniedError(url, response)
        elif status == 409:
            raise AlreadyExistsError(url, response)
        elif status == 400:
            raise BadRequestError(url, response)
        elif status // 100 != 2:
            # integer division: with "/" every 2xx status other than 200 would be an error
            raise WebAPIError(url, response)
        return response

    def get_text(self, uri_suffix):
        """Sends GET request and returns the response body as text."""
        return self.send_request("get", uri_suffix).text

    def post_text(self, uri_suffix, data):
        """Sends POST request with ``data`` as the body and returns the response body as text."""
        return self.send_request("post", uri_suffix, data=data).text

    def unpack_json(self, json_text):
        """
        Decodes JSON text. If the decoded object is a dictionary with "results" item, returns that item.
        Otherwise, if it is a dictionary with "error" item, raises ServerReportedError.
        Anything else is returned as is.
        """
        results = json.loads(json_text)
        if isinstance(results, dict):
            if "results" in results:
                results = results["results"]
            elif "error" in results:
                raise ServerReportedError(self.LastURL, self.LastStatusCode, results["error"]["type"], results["error"].get("value", ""))
        return results

    def _json_seq_lines(self, response):
        # Yields non-empty lines of the response, stripped of the surrounding white space
        # and of the leading record separator(s)
        for line in response.iter_lines():
            if not line:
                continue
            line = line.strip().lstrip(self.RS)
            if line:
                yield line

    def _iter_json_seq(self, response):
        # Yields plain decoded JSON objects, one per line of the JSON sequence
        for line in self._json_seq_lines(response):
            yield json.loads(line)

    def unpack_json_seq(self, response):
        """Generator yielding objects from a JSON sequence response, each passed through ``unpack_json``."""
        for line in self._json_seq_lines(response):
            yield self.unpack_json(line)

    def unpack_json_data(self, response):
        """
        Decodes the response body depending on its content type.

        Returns
        -------
        generator or object
            generator of objects for "application/json-seq" response, decoded object otherwise
        """
        # the header may be missing altogether
        response_content_type = response.headers.get("content-type") or ""
        if "application/json-seq" in response_content_type:
            return self.unpack_json_seq(response)
        else:
            return self.unpack_json(response.text)

    def get_json(self, uri_suffix):
        """Sends GET request and returns decoded JSON or JSON sequence response, see ``unpack_json_data``."""
        headers = {"Accept": "application/json-seq, application/json, text/json"}
        return self.unpack_json_data(self.send_request("get", uri_suffix, headers=headers, stream=True))

    def post_json(self, uri_suffix, data):
        """
        Sends POST request and returns decoded JSON or JSON sequence response, see ``unpack_json_data``.
        ``data`` is sent as is if it is str or bytes, otherwise it is JSON-encoded first.
        """
        if not isinstance(data, (str, bytes)):
            data = json.dumps(data)
        headers = {
            "Accept": "application/json-seq, application/json, text/json",
            "Content-Type": "text/json"
        }
        response = self.send_request("post", uri_suffix, data=data, headers=headers, stream=True)
        return self.unpack_json_data(response)

    def get_json_stream(self, uri_suffix):
        """
        Sends GET request and returns generator of objects decoded from the JSON sequence response.

        The request is sent and the status is checked when this method is called, not when the
        iteration begins, so that the caller can catch errors around the call.

        Raises
        ------
        InvalidMetadataError, NotFoundError
            for the corresponding status codes
        WebAPIError
            if the status is not 200 or the response is not "application/json-seq"
        """
        url = "%s/%s" % (self.ServerURL, uri_suffix)
        headers = {"Accept": "application/json-seq"}
        if self.Token is not None:
            headers["X-Authentication-Token"] = self.Token.encode()
        response = self.retry_request("get", url, headers=headers, stream=True)
        if response.status_code == INVALID_METADATA_ERROR_CODE:
            raise InvalidMetadataError(url, response)
        if response.status_code == 404:
            raise NotFoundError(url, response)
        elif response.status_code != 200:
            raise WebAPIError(url, response)
        # allow parameters after the media type, same as unpack_json_data does
        if "application/json-seq" not in (response.headers.get("Content-Type") or ""):
            raise WebAPIError(url, response)
        return self._iter_json_seq(response)
[docs]
class MetaCatClient(HTTPClient, TokenAuthClientMixin):
Version = "1.0"
def __init__(self, server_url=None, auth_server_url=None, max_concurrent_queries = 5,
            token = None, token_file = None, token_library = None, timeout = None):
    """Initializes the MetaCatClient object

    Arguments
    ---------
    server_url : str
        The server endpoint URL, default = from METACAT_SERVER_URL environment variable
    auth_server_url : str
        The endpoint URL for the Authentication server, default = from METACAT_AUTH_SERVER_URL
        environment variable
    max_concurrent_queries : int, optional
        Controls the concurrency when asynchronous queries are used
    token_file : str
        File path to read the authentication token from
    token : bytes or str or SignedToken
        Use this token for authentication, optional
    token_library :
        Passed to TokenAuthClientMixin, optional
    timeout : int or float
        Request timeout in seconds. Default: None - use 600 seconds

    Raises
    ------
    RuntimeError
        if the server URL is neither given nor found in the environment
    """
    if not server_url:
        server_url = os.environ.get("METACAT_SERVER_URL")
    if not server_url:
        raise RuntimeError("MetaCat server URL unspecified")
    if not auth_server_url:
        auth_server_url = os.environ.get("METACAT_AUTH_SERVER_URL")

    # using a longer timeout on the server side in case of large
    # queries, client should too
    effective_timeout = 600 if timeout is None else timeout

    TokenAuthClientMixin.__init__(self, server_url, auth_server_url, token=token, token_file=token_file, token_library=token_library)
    HTTPClient.__init__(self, server_url, token=self.token(), timeout=effective_timeout)
    self.MaxConcurrent = max_concurrent_queries
    self.AsyncQueue = None
def sanitize(self, *words, **kw):
    """Checks that none of the values contains a single quote.

    Arguments
    ---------
    words : str
        values to check
    kw : str
        named values to check, both the names and the values are checked

    Raises
    ------
    InvalidArgument
        if a single quote is found in any of the values or names
    """
    for w in words:
        # check each word, not the tuple of words
        if "'" in w:
            raise InvalidArgument("", message="Invalid value: %s" % (w,))
    for name, value in kw.items():
        if "'" in value:
            raise InvalidArgument("", message="Invalid value for %s: %s" % (name, value))
        if "'" in name:
            raise InvalidArgument("", message="Invalid name for: %s" % (name,))
@property
def async_queue(self):
    """Task queue used to run asynchronous queries, created on first use with MaxConcurrent
    as the TaskQueue argument.

    Raises
    ------
    ModuleNotFoundError
        if the pythreader module is not installed
    """
    if self.AsyncQueue is not None:
        return self.AsyncQueue
    try:
        from pythreader import TaskQueue
    except ModuleNotFoundError:
        raise ModuleNotFoundError("pythreader module required for asynchronous queries. Use: pip install 'pythreader>=2.7.0'")
    self.AsyncQueue = TaskQueue(self.MaxConcurrent)
    return self.AsyncQueue
def resfresh_token(self):
    """Re-reads the token from the token file, if the client has one.

    Returns
    -------
    the current token: freshly loaded if TokenFile is set, unchanged otherwise
    """
    if self.TokenFile:
        # context manager makes sure the file is closed
        with open(self.TokenFile, "rb") as token_file:
            token = token_file.read()
        self.Token = SignedToken.decode(token)
    return self.Token
def simulate_503(self):
    """Requests "data/simulate_503" from the server and returns the response text."""
    uri = "data/simulate_503"
    return self.get_text(uri)
[docs]
def get_version(self):
    """Returns server version as text

    Returns
    -------
    str
        text of the "data/version" response
    """
    uri = "data/version"
    return self.get_text(uri)
[docs]
def list_datasets(self, namespace_pattern=None, name_pattern=None, with_counts=False):
    """Gets the list of datasets with namespace/name matching the templates. The templates are
    Python ``fnmatch`` module style templates where ``'*'`` matches any substring and ``'?'`` matches a single character.

    Arguments
    ---------
    namespace_pattern : str
    name_pattern : str
    with_counts : boolean
        controls whether the results should include counts or dataset names only

    Yields
    ------
    generator
        yields dictionaries like {"namespace":..., "name":..., "file_count":...}
    """
    url = "data/datasets?with_counts=no"
    promises = []       # promises of the counts, each carries its dataset dictionary as promise data
    for item in self.get_json(url):
        namespace, name = item["namespace"], item["name"]
        if namespace_pattern is not None and not fnmatch.fnmatch(namespace, namespace_pattern):
            continue
        if name_pattern is not None and not fnmatch.fnmatch(name, name_pattern):
            continue
        if not with_counts:
            yield item
        else:
            # fetch counts asynchronously
            did = namespace + ":" + name
            promises.append(self.async_queue.add(self.get_dataset_counts, did, promise_data = item).promise)
    for promise in promises:
        counts = promise.wait()
        item = promise.Data
        # get_dataset_counts() returns None if the dataset is not found,
        # e.g. removed after the list was received
        if counts:
            item.update(counts)
        yield item
[docs]
def get_dataset_counts(self, did=None, namespace=None, name=None):
    """Gets single dataset files, subsets, supersets, etc. counts

    Arguments
    ---------
    did : str - "namespace:name"
    namespace : str
    name : str

    Returns
    -------
    dict
        dataset counts or None if the dataset was not found
    """
    dataset_did = ObjectSpec(did, namespace=namespace, name=name).did()
    try:
        return self.get_json(f"data/dataset_counts?dataset={dataset_did}")
    except NotFoundError:
        # unknown dataset is reported as None rather than as an error
        return None
[docs]
def get_dataset(self, did=None, namespace=None, name=None, exact_file_count=False):
    """Gets single dataset

    Arguments
    ---------
    did : str - "namespace:name"
    namespace : str
    name : str
    exact_file_count : boolean
        if true, "exact_file_count=yes" is added to the request

    Returns
    -------
    dict
        dataset attributes or None if the dataset was not found
    """
    spec = ObjectSpec(did, namespace=namespace, name=name).did()
    suffix = "&exact_file_count=yes" if exact_file_count else ""
    try:
        return self.get_json(f"data/dataset?dataset={spec}" + suffix)
    except NotFoundError:
        return None
[docs]
def get_dataset_files(self, did, namespace=None, name=None, with_metadata=False, include_retired_files=False):
    """Gets files of a single dataset

    Arguments
    ---------
    did : str - "namespace:name"
    namespace : str
        if given, ``namespace`` and ``name`` are used instead of ``did``
    name : str
    with_metadata : boolean
        whether to request file metadata
    include_retired_files : boolean
        whether to request retired files too

    Returns
    -------
    generator
        generates sequence of dictionaries, one dictionary per file
    """
    if namespace is not None:
        did = namespace + ':' + name
    try:
        meta_flag = "yes" if with_metadata else "no"
        retired_flag = "yes" if include_retired_files else "no"
        query = "&".join([
            f"dataset={did}",
            f"with_metadata={meta_flag}",
            f"include_retired_files={retired_flag}"
        ])
        # NOTE(review): NotFoundError is caught here only if get_json_stream() raises it
        # at the time of the call rather than when the result is iterated - confirm
        return self.get_json_stream("data/dataset_files?" + query)
    except NotFoundError:
        return None
def create_dataset(self, did, frozen=False, monotonic=False, metadata=None, metadata_requirements=None,
            files_query=None, subsets_query=None,
            description="", batchsize=0):
    """Creates new dataset, optionally populating it with files in batches. Requires client authentication.

    Arguments
    ---------
    did : str
        "namespace:name"
    frozen : bool
    monotonic : bool
    metadata : dict
        Dataset metadata
    metadata_requirements : dict
        Metadata requirements for files in the dataset
    files_query : str
        Run MQL file query and add resulting files to the new dataset
    subsets_query : str
        Run MQL dataset query and add resulting datasets to the new dataset as subsets
    description : str
    batchsize : int
        If not zero and ``files_query`` is given, the files are added ``batchsize`` at a time:
        the dataset is created with the first batch, the rest is added with ``add_files()``, and
        the ``frozen`` flag, if requested, is set at the very end.

    Returns
    -------
    dict
        created dataset attributes. In batch mode, "file_count" is increased by the number of files
        added by the subsequent batches.
    """
    batched = bool(batchsize and files_query)
    base_query = files_query
    freeze_at_end = False
    if batched:
        # if batching, limit initial query and don't freeze yet...
        files_query = f"({base_query}) ordered limit {batchsize}"
        freeze_at_end = frozen
        frozen = False

    out = self._create_dataset(did, frozen=frozen, monotonic=monotonic, metadata=metadata,
            metadata_requirements=metadata_requirements, files_query=files_query,
            subsets_query=subsets_query, description=description)

    if batched:
        # now add remaining files in batches, until a batch comes back empty
        batch = 0
        while True:
            batch += 1
            batch_query = f"({base_query}) ordered skip {batch*batchsize} limit {batchsize}"
            added = self.add_files(did, query=batch_query)
            out["file_count"] = (out.get("file_count") or 0) + added
            if added <= 0:
                break
        # finally set frozen flag if requested
        if freeze_at_end:
            self.update_dataset(did, frozen=freeze_at_end)
    return out
def _create_dataset(self, did, frozen=False, monotonic=False, metadata=None, metadata_requirements=None,
            files_query=None, subsets_query=None,
            description=""):
    """Creates new dataset. Requires client authentication.

    Arguments
    ---------
    did : str
        "namespace:name"
    frozen : bool
    monotonic : bool
    metadata : dict
        Dataset metadata
    metadata_requirements : dict
        Metadata requirements for files in the dataset
    files_query : str
        Run MQL file query and add resulting files to the new dataset
    subsets_query : str
        Run MQL dataset query and add resulting datasets to the new dataset as subsets
    description : str

    Returns
    -------
    dict
        created dataset attributes
    """
    namespace, name = did.split(":", 1)
    # empty values are normalized: {} for metadata, "" for description, None for the rest
    params = dict(
        namespace=namespace,
        name=name,
        frozen=frozen,
        monotonic=monotonic,
        metadata=metadata or {},
        metadata_requirements=metadata_requirements or None,
        description=description or "",
        files_query=files_query or None,
        subsets_query=subsets_query or None,
    )
    return self.post_json("data/create_dataset", params)
[docs]
def add_child_dataset(self, parent_spec, child_spec):
    """Adds a child dataset to a dataset.

    Arguments
    ---------
    parent_spec : str
        Parent namespace, name ("namespace:name")
    child_spec : str
        Child namespace, name ("namespace:name")

    Returns
    -------
    str
        text of the server response
    """
    uri = "data/add_child_dataset?parent=%s&child=%s" % (parent_spec, child_spec)
    return self.get_text(uri)
[docs]
def add_files(self, dataset, file_list=None, namespace=None, query=None):
    """Add existing files to an existing dataset. Requires client authentication.

    Arguments
    ---------
    dataset : str
        "namespace:name" or "name", if namespace argument is given
    query : str
        MQL query to run and add files matching the query
    file_list : list
        List of dictionaries, one dictionary per file. Each dictionary must contain either a file id

        .. code-block:: python

            { "fid": "abcd12345" }

        or namespace/name:

        .. code-block:: python

            { "name": "filename.data", "namespace": "my_namespace" }

        or DID:

        .. code-block:: python

            { "did": "my_namespace:filename.data" }

    namespace : str, optional
        Default namespace. If a ``file_list`` item is specified with a name without a namespace, the ``default namespace``
        will be used.

    Returns
    -------
    int
        number of files added to the dataset

    Notes
    -----
    Either ``file_list`` or ``query`` must be specified, but not both
    """
    either_or_message = "Either file_list or query must be specified, but not both"

    if ':' not in dataset:
        # bare dataset name - complete it with the default namespace
        if namespace is None:
            raise ValueError("Namespace not specified for the target dataset")
        dataset = f"{namespace}:{dataset}"

    have_list = file_list is not None
    have_query = query is not None
    if have_list == have_query:
        raise ValueError(either_or_message)

    payload = {"namespace": namespace}
    if have_list:
        specs = []
        for entry in file_list:
            spec = ObjectSpec.from_dict(entry, namespace)
            spec.validate()
            specs.append(spec.as_dict())
        payload["file_list"] = specs
    elif not query:
        # query was given, but it is empty
        raise ValueError(either_or_message)
    else:
        payload["query"] = query

    response = self.post_json(f"data/add_files?dataset={dataset}", payload)
    return response["files_added"]
[docs]
def remove_files(self, dataset, file_list=None, namespace=None, query=None):
    """Remove files from a dataset. Requires client authentication.

    Arguments
    ---------
    dataset : str
        "namespace:name" or "name", if namespace argument is given
    query : str
        MQL query to run and remove files matching the query
    file_list : list
        List of dictionaries, one dictionary per file. Each dictionary must contain either a file id

        .. code-block:: python

            { "fid": "abcd12345" }

        or namespace/name:

        .. code-block:: python

            { "name": "filename.data", "namespace": "my_namespace" }

        or DID:

        .. code-block:: python

            { "did": "my_namespace:filename.data" }

    namespace : str, optional
        Default namespace. If a ``file_list`` item is specified with a name without a namespace, the ``default namespace``
        will be used.

    Returns
    -------
    int
        actual number of files removed from the dataset

    Notes
    -----
    Either ``file_list`` or ``query`` must be specified, but not both
    """
    either_or_message = "Either file_list or query must be specified, but not both"

    dataset_spec = ObjectSpec(dataset, namespace=namespace)
    have_list = file_list is not None
    have_query = query is not None
    if have_list == have_query:
        raise ValueError(either_or_message)

    payload = dict(
        dataset_namespace=dataset_spec.Namespace,
        dataset_name=dataset_spec.Name,
        namespace=namespace,
    )
    if have_list:
        specs = []
        for entry in file_list:
            spec = ObjectSpec.from_dict(entry, namespace)
            spec.validate()
            specs.append(spec.as_dict())
        payload["file_list"] = specs
    elif not query:
        # query was given, but it is empty
        raise ValueError(either_or_message)
    else:
        payload["query"] = query

    response = self.post_json("data/remove_files", payload)
    return response["files_removed"]
[docs]
def remove_dataset(self, dataset):
    """Remove a dataset. Requires client authentication.

    Arguments
    ---------
    dataset : str
        "namespace:name"

    Returns
    -------
    str
        text of the server response
    """
    uri = "data/remove_dataset/" + f"{dataset}"
    return self.get_text(uri)
[docs]
def declare_file(self, did=None, namespace=None, name=None, auto_name=None,
            dataset_did=None, dataset_namespace=None,
            dataset_name=None, size=0, metadata={}, fid=None, parents=[], checksums={},
            dry_run=False):
    """Declare new file and add it to the dataset. Requires client authentication.

    Arguments
    ---------
    did : str
        file "namespace:name"
    namespace : str
        file namespace
    name : str
        file name
    auto_name : str
        pattern to use for file name auto generation, default None - do not auto-generate file name
    dataset_did : str
        dataset "namespace:name"
    dataset_namespace : str
        dataset namespace
    dataset_name : str
        dataset name
    size : int
        file size in bytes, default 0
    metadata : dict
        file metadata, default empty dictionary
    fid : str
        file id, default None - to be auto-generated
    checksums : dict
        dictionary with checksum values by the checksum type: {"type":"value", ...}
    parents : list of dicts
        each dict represents one parent file. The dict must contain one of the following

            - "fid" - parent file id
            - "namespace" and "name" - parent file namespace and name
            - "did" - parent file DID ("<namespace>:<name>")

    dry_run : boolean
        If true, run all the necessary checks but stop short of actual file declaration or adding to a dataset.
        If not all checks are successful, generate either InvalidMetadataError or WebApiError.
        Default: False = do declare

    Returns
    -------
    dict
        dictionary with file name, namespace and file id. Names and file ids will be auto-generated as necessary.

    Notes
    -----
    At least one of the following must be specified for the file:

        - did
        - namespace and either name or auto_name

    At least one of the following must be specified for the dataset:

        - dataset_did
        - dataset_namespace and dataset_name

    Auto-name pattern can be any string with the following substrings, which will be replaced with appropriate values to generate the file name:

        - $clock - current integer timestamp in milliseconds
        - $clock3 - last 3 digits of $clock - milliseconds only
        - $clock6 - last 6 digits of $clock
        - $clock9 - last 9 digits of $clock
        - $uuid - random UUID in hexadecimal representation, 32 hex digits
        - $uuid16 - 16 hex digits from random UUID hexadecimal representation
        - $uuid8 - 8 hex digits from random UUID hexadecimal representation
        - $fid - file id
    """
    # The mutable default values in the signature are kept for compatibility,
    # they are only read here, never modified.
    if did:
        namespace, name = undid(did)
    else:
        if not namespace:
            raise ValueError("Unspecified file namespace")
        if not name and not auto_name:
            raise ValueError("Unspecified file name")

    if not (dataset_namespace and dataset_name) and not dataset_did:
        raise ValueError("Either dataset_did or dataset_namespace and dataset_name must be provided")

    if not dataset_did:
        # covers the empty string as well as None
        dataset_did = f"{dataset_namespace}:{dataset_name}"

    info = dict(
        namespace = namespace,
        name = name,
        size = size,
        checksums = checksums,
        fid = fid,
        parents = parents,
        metadata = metadata
    )

    if not name and auto_name:
        info["auto_name"] = auto_name

    # dry_run has to be passed on, otherwise the file would be declared for real
    return self.declare_files(dataset_did, [info], dry_run=dry_run)[0]
[docs]
def declare_files(self, dataset, files, namespace=None, dry_run=False, as_required=None):
    """Declare new files and add them to an existing dataset. Requires client authentication.

    Arguments
    ---------
    dataset : str
        "namespace:name"
    files : list or dict
        List of dictionaries, one dictionary per a file to be declared. See Notes below for the expected contents of each
        dictionary.
        For convenience, if declaring single file, the argument can be the single file dictionary instead of a list.
    namespace: str, optional
        Default namespace for files to be declared
    dry_run : boolean
        If true, run all the necessary checks but stop short of actual file declaration or adding to a dataset.
        If not all checks are successful, generate either InvalidMetadataError or WebApiError.
        Default: False = do declare
    as_required : str, optional
        Controls what happens if the server reports that some of the files already exist. If empty or None (default),
        the AlreadyExistsError is raised. Otherwise, the files listed in the error message are updated
        with ``update_file()`` and the remaining files are declared. If the value is ``"unretire"``, the existing files
        are un-retired before the update.

    Returns
    -------
    list
        list of dictionaries, one dictionary per file with file ids: { "fid": "..." }

    Notes
    -----
    Each file to be declared must be represented with a dictionary. The dictionary must contain one of:

        "did" - string in the format "<namespace>:<name>"
        "name" - file name and optionally "namespace". If namespace is not present, the ``namespace`` argument will be used
                 as the default namespace
        "auto_name" - pattern to auto-generate file name

    .. code-block:: python

        {
            "namespace": "namespace",       # optional, namespace can be specified for each file explicitly or implicitly using the namespace=... argument
            "name": "filename",             # optional,
            "did": "namespace:filename",    # optional, convenience for Rucio users
                                            # either "did" or "name", "namespace" must be present
            "size": ...,                    # required, integer number of bytes
            "metadata": {...},              # optional, file metadata, a dictionary with arbitrary JSON'able contents
            "fid": "...",                   # optional, file id. Will be auto-generated if unspecified.
                                            # if specified, must be unique
            "parents": [...],               # optional, list of dicts, one dict per parent. See below.
            "checksums": {                  # optional, checksums dictionary
                "method": "value",...
            },
            "auto_name": "..."              # optional, pattern to auto-generate file name if name is not specified or null
        },...

    Parents are specified with dictionaries, one dictionary per file. Each dictionary specifies the parent file in one of three ways:

        - "did": "<namespace>:<name>"
        - "namespace":"...", "name":"..."
        - "fid": "<file id>"

    DEPRECATED: if the parent is specified with a string instead of a dictionary, it is interpreted as the parent file id.

    Raises
    ------
    ValueError
        if a file dictionary has both "did" and "name"/"namespace", invalid or missing size, or
        a metadata key without a dot
    """
    default_namespace = namespace
    if isinstance(files, dict):
        files = [files]         # convenience

    lst = []
    for i, item in enumerate(files):
        f = item.copy()         # do not modify caller's dictionaries
        if "did" in f:
            if "name" in f or "namespace" in f:
                raise ValueError(f"Both DID and namespace/name specified for {f['did']}")
            file_namespace, file_name = parse_name(f.pop("did"), default_namespace)
            f["name"] = file_name
            f["namespace"] = file_namespace
        elif f.get("namespace") is None and default_namespace is not None:
            # apply the default namespace, as promised in the Notes
            f["namespace"] = default_namespace
        size = f.get("size")
        if not isinstance(size, int) or size < 0:
            raise ValueError(f"File size is unspecified or invalid for file #{i} in the list")
        meta = item.get("metadata") or {}
        for k in meta.keys():
            if '.' not in k:
                raise ValueError(f'Invalid metadata key "{k}" for file #{i} in the list: metadata key must contain dot (.)')
        f["metadata"] = meta
        lst.append(f)

    url = f"data/declare_files?dataset={dataset}"
    if dry_run:
        url += "&dry_run=yes"
    try:
        out = self.post_json(url, lst)
    except AlreadyExistsError as e:
        if not as_required:
            raise
        # handle as_required options
        # the error message lists DIDs of the existing files, one per line, after the first line
        message = getattr(e, "Message", None) or ""
        existing_files = {s.strip() for s in message.split("\n")[1:]}
        to_declare = []
        out = []
        for l in lst:
            if f"{l.get('namespace')}:{l.get('name')}" in existing_files:
                if as_required == "unretire":
                    self.retire_file(name=l['name'], namespace=l['namespace'], retire=False)
                out.append(self.update_file(**l))
            else:
                to_declare.append(l)
        # any that weren't in the error, still need to be declared
        if to_declare:
            out = out + self.post_json(url, to_declare)
    return out
[docs]
def move_files(self, namespace, file_list=None, query=None):
    """Move files to another namespace.

    Arguments
    ---------
    namespace : str
        namespace to move files to
    query : str
        MQL query to run and move files matching the query. Used only if ``file_list`` is None
    file_list : list
        List of dictionaries, one dictionary per file. Each dictionary must contain either a file id

        .. code-block:: python

            { "fid": "abcd12345" }

        or namespace/name:

        .. code-block:: python

            { "name": "filename.data", "namespace": "my_namespace" }

        or DID:

        .. code-block:: python

            { "did": "my_namespace:filename.data" }

    Returns
    -------
    tuple
        number of files moved, list of errors, if any, number of errors
    """
    payload = {"namespace": namespace}
    if file_list is not None:
        specs = []
        for entry in file_list:
            spec = ObjectSpec.from_dict(entry)
            spec.validate()
            specs.append(spec.as_dict())
        payload["files"] = specs
    elif not query:
        raise ValueError("Either file_list or query must be specified, but not both")
    else:
        payload["query"] = query

    response = self.post_json("data/move_files", payload)
    errors = response.get("errors", [])
    # the error count defaults to the length of the list of errors
    nerrors = response.get("nerrors", len(errors))
    return response["files_moved"], errors, nerrors
[docs]
def update_file(self, did=None, namespace=None, name=None, fid=None, replace=False,
            size=None, checksums=None, parents=None, children=None, metadata=None
        ):
    """Update attributes of an existing file.

    Arguments
    ---------
    did : str
        file "namespace:name"
    fid : str
        file id
    namespace : str
        file namespace
    name : str
        file name
    replace : bool
        If True, the specified attribute values will be replaced with new values.
        Otherwise added (for parents and children) and updated (for checksums and metadata)
    size : int >= 0
        file size, optional
    checksums : dict
        checksum values, optional
    parents : list
        list of parent file ids, optional
    children : list
        list of child file ids, optional
    metadata : dict
        dictionary with metadata to update or replace, optional

    Returns
    -------
    dict
        Dictionary with updated file information
    """
    payload = {"mode": "replace" if replace else "add-update"}

    # the file is identified by its id, if given, otherwise by namespace and name
    if fid:
        payload["fid"] = fid
    else:
        if did:
            namespace, name = did.split(':', 1)
        assert namespace and name
        payload.update(namespace=namespace, name=name)

    # only the attributes explicitly specified are sent
    if size is not None:
        assert isinstance(size, int) and size >= 0
        payload["size"] = size
    if checksums is not None:
        assert isinstance(checksums, dict)
        payload["checksums"] = checksums
    if parents is not None:
        assert isinstance(parents, list)
        payload["parents"] = [ObjectSpec(parent).as_dict() for parent in parents]
    if children is not None:
        assert isinstance(children, list)
        payload["children"] = [ObjectSpec(child).as_dict() for child in children]
    if metadata is not None:
        assert isinstance(metadata, dict)
        payload["metadata"] = metadata

    return self.post_json("data/update_file", payload)
[docs]
def update_file_meta(self, metadata, files=None, names=None, fids=None, namespace=None, dids=None, mode="update"):
    """Updates metadata for existing files. Requires client authentication.

    **DEPRECATED** *update_file() should be used instead*

    Arguments
    ---------
    metadata : dict
        see Notes
    files : list of dicts
        Each dict specifies a file. See Notes
    names : list of strings
        List of file names. Requires namespace to be specified
    dids : list of strings
        List of DIDs ("namespace:name") strings
    fids : list of strings
        List of file ids
    namespace : string
        Default namespace
    mode : str
        Either ``"update"`` (default) or ``"replace"``. If mode is ``"update"``, existing metadata will be updated with
        values in ``metadata``. If ``"replace"``, then new values will replace existing metadata. Also, see notes below.

    Returns
    -------
    list
        list of dictionaries, one dictionary per file with file ids: { "fid": "..." }

    Notes
    -----
    This method can be used to apply common metadata changes to a list of files. This method **can not** be used to update
    file provenance information.

    The ``metadata`` argument is used to specify the common changes to the metadata to apply to multiple files.
    The ``metadata`` dictionary will be used to either update existing metadata of listed files (if ``mode="update"``) or
    replace it (if ``mode="replace"``).

    Files to update have to be specified in one of the following ways:

        - files = [list of dicts] - each dict must be in one of the following formats:

            - {"fid":"<file id>"}
            - {"namespace":"<file namespace>", "name":"<file name>"} - namespace is optional. Default: the value of the "namespace" method argument
            - {"did":"<file namespace>:<file name>"}

        - dids = [list of file DIDs]
        - names = [list of file names] - "namespace" argument method must be used to specify the common namespace
        - fids = [list of file ids]

    The files specified in all of these ways are combined and sent to the server in chunks of 1000.
    """
    if names and not namespace:
        raise ValueError("Namespace must be specified with names argument")

    def file_specs():
        # all the ways to specify the files chained into single stream of dictionaries
        for file_name in (names or []):
            yield ObjectSpec(namespace, file_name).as_dict()
        for file_did in (dids or []):
            did_spec = ObjectSpec(file_did)
            did_spec.validate()         # will raise ValueError
            yield did_spec.as_dict()
        for file_id in (fids or []):
            yield ObjectSpec(fid=file_id).as_dict()
        for file_dict in (files or []):
            dict_spec = ObjectSpec.from_dict(file_dict)
            dict_spec.validate()        # will raise ValueError
            yield dict_spec.as_dict()

    results = []
    for chunk in chunked(file_specs(), 1000):
        request = {
            "metadata": metadata,
            "files": chunk,
            "mode": mode
        }
        results.extend(self.post_json("data/update_file_meta", request))
    return results
[docs]
def delete_file(self, did=None, namespace=None, name=None, fid=None):
    """Delete an existing file. The file will be removed from all datasets and the database and its name and file id can be reused.

    Arguments
    ---------
    did : str
        file "namespace:name"
    fid : str
        file id
    namespace : str
        file namespace
    name : str
        file name

    Returns
    -------
    decoded server response
    """
    if fid:
        payload = {"fid": fid}
    else:
        if did:
            namespace, name = did.split(':', 1)
        assert namespace and name
        payload = {"namespace": namespace, "name": name}
    return self.post_json("data/delete_file", payload)
[docs]
def retire_file(self, did=None, namespace=None, name=None, fid=None, retire=True):
    """Modify retired status of the file. Retired file remains in the database, "occupies" the name in the namespace, but
    is not visible to normal queries. Retired file can be brought back to normal using this method too.
    If you need to completely remove the file, use `delete_file` method.

    Arguments
    ---------
    did : str
        file "namespace:name"
    fid : str
        file id
    namespace : str
        file namespace
    name : str
        file name
    retire : bool
        whether the file should be retired

    Returns
    -------
    dict
        Dictionary with updated file information
    """
    payload = {"retire": retire}
    if fid:
        payload["fid"] = fid
    else:
        if did:
            namespace, name = did.split(':', 1)
        assert namespace and name
        payload.update(namespace=namespace, name=name)
    return self.post_json("data/retire_file", payload)
[docs]
def update_dataset(self, dataset, metadata=None, mode="update", frozen=None, monotonic=None, description=None):
    """Update dataset. Requires client authentication.

    Arguments
    ---------
    dataset : str
        "namespace:name"
    metadata : dict or None
        New metadata values, or, if None, leave the metadata unchanged
    mode: str
        Either ``"update"`` or ``"replace"``. If ``"update"``, metadata will be updated with new values. If ``"replace"``,
        metadata will be replaced with new values.
        If ``metadata`` is None, ``mode`` is ignored
    frozen: boolean or None
        if boolean, new value for the flag. If None, leave it unchanged
    monotonic: boolean or None
        if boolean, new value for the flag. If None, leave it unchanged
    description: str or None
        if str, new dataset description. If None, leave the description unchanged

    Returns
    -------
    dict
        dictionary with new dataset information
    """
    request_data = {}
    if metadata is not None:
        # mode is only meaningful together with new metadata
        request_data["mode"] = mode
        request_data["metadata"] = metadata
    optional_fields = (
        ("frozen", frozen),
        ("monotonic", monotonic),
        ("description", description),
    )
    for field, value in optional_fields:
        # None means "leave unchanged", so the field is not sent at all
        if value is not None:
            request_data[field] = value
    # escape the dataset DID so that characters like '&', '#', '+' or spaces survive the query string
    url = "data/update_dataset?dataset=" + quote_plus(dataset)
    return self.post_json(url, request_data)
[docs]
def get_files(self, lookup_list, with_metadata = True, with_provenance=True):
    """Fetch records for many files with a single request.

    Arguments
    ---------
    lookup_list : list
        List of dictionaries, one dictionary per file. Each dictionary must have either
            "did":"namespace:name", or
            "namespace":"..." and "name":"..." or
            "fid":"file id"
    with_metadata : boolean
        whether to include file metadata
    with_provenance:
        whether to include parents and children list

    Returns
    -------
    List of file records, each record is the same as returned by get_file()

    Raises
    ------
    ValueError
        if an item has none of the accepted key combinations or its "did" has no ":" separator
    """
    meta_flag = "yes" if with_metadata else "no"
    provenance_flag = "yes" if with_provenance else "no"

    specs = []
    for entry in lookup_list:
        if "fid" in entry or ("namespace" in entry and "name" in entry):
            # already in the form that is sent to the server - passed through untouched
            specs.append(entry)
            continue
        if "did" not in entry:
            raise ValueError("Invalid file specifification: " + str(entry))
        did = entry["did"]
        namespace, separator, name = did.partition(':')
        if not separator:
            raise ValueError("Invalid DID format: " + did)
        # a DID entry is replaced with a namespace/name pair, other keys are dropped
        specs.append({"namespace": namespace, "name": name})

    url = "data/files?with_metadata=%s&with_provenance=%s" % (meta_flag, provenance_flag)
    return self.post_json(url, specs)
[docs]
def get_file(self, name=None, namespace=None, fid=None, did=None, with_metadata = True, with_provenance=True, with_datasets=False):
    """Get one file record

    The file is looked up by ``did`` if given, otherwise by ``namespace`` and ``name``,
    otherwise by ``fid``.

    Arguments
    ---------
    fid : str, optional
        File id
    name : str, optional
    namespace : str, optional
        name and namespace must be specified together
    did : str, optional
        "namespace:name"
    with_metadata : boolean
        whether to include file metadata
    with_provenance : boolean
        whether to include parents and children list
    with_datasets : boolean
        whether to include the list of datasets the file is in

    Returns
    -------
    dict
        dictionary with file information or None if the file was not found

        .. code-block:: python

            {
                "name": "namespace:filename",       # file name, namespace
                "fid":  "...",                      # files id
                "creator":  "...",                  # username of the file creator
                "created_timestamp":   ...,         # numeric UNIX timestamp
                "size": ...,                        # file size in bytes
                "checksums": { ... },               # file checksums

                # included if with_provenance=True
                "parents":  ["fid",...],            # list of ids for the file parent files
                "children": ["fid",...],            # list of ids for the file child files

                # included if with_metadata=True
                "metadata": { ... },                # file metadata

                # included if with_datasets=True
                "datasets": [
                    {"namespace":"...", "name":"..."}, ...
                ]
            }

    Notes
    -----
    Retrieving file provenance and metadata takes slightly longer time
    """
    assert (fid is not None) or (did is not None) or (name is not None and namespace is not None), \
        "Either DID or file id or namespace and name must be specified"
    with_meta = "yes" if with_metadata else "no"
    with_rels = "yes" if with_provenance else "no"
    datasets_flag = "yes" if with_datasets else "no"
    url = f"data/file?with_metadata={with_meta}&with_provenance={with_rels}&with_datasets={datasets_flag}"
    if did:
        namespace, name = parse_name(did, None)
        assert namespace is not None, f"Invalid DID format: {did}"
    if name and namespace is not None:
        # file names may contain '+', '#', '&' or spaces, which must be escaped in the query string
        url += f"&name={quote_plus(str(name))}&namespace={quote_plus(str(namespace))}"
    else:
        # a name without a namespace is not usable, so fall back to the file id
        url += f"&fid={quote_plus(str(fid))}"
    try:
        return self.get_json(url)
    except NotFoundError:
        return None
[docs]
def query(self, query, namespace=None, with_metadata=False, with_provenance=False, save_as=None, add_to=None,
            include_retired_files=False, summary=None, batch_size=0):
    """Run file query. Requires client authentication if save_as or add_to are used.

    This method is a generator: the request is sent when the result is first iterated.

    Arguments
    ---------
    query : str
        Query in MQL
    namespace : str
        default namespace for the query
    include_retired_files:
        boolean, whether to include retired files into the query results, default=False
    with_metadata : boolean
        whether to return file metadata
    with_provenance : boolean
        whether to return parents and children list
    save_as : str
        namespace:name for a new dataset to create and add found files to
    add_to : str
        namespace:name for an existing dataset to add found files to
    summary : str or None
        "count" - return [{"count": n, "total_size": nbytes }]
        "keys" - return list of list of all top level metadata keys for the selected files
        ``summary`` can not be used together with ``save_as`` or ``add_to``
    batch_size : int
        if greater than 0, the results are fetched in batches of this size by wrapping the query into
        ``(query) ordered skip <offset> limit <batch_size>`` until a short batch is received.
        Ignored if ``summary`` is used.

    Yields
    ------
    dict
        dictionary with file information, one per file found.
        If ``summary`` is used, the whole server response is yielded as a single item instead.

    Notes
    -----
    Retrieving file provenance and metadata takes slightly longer time
    """
    assert not (summary is not None and (add_to or save_as)), "Summary can not be used together with add_to or save_as"
    assert summary in ("count", "keys", None)

    if summary:
        params = [f"summary={summary}"]
    else:
        params = [
            "with_meta=" + ("yes" if with_metadata else "no"),
            "with_provenance=" + ("yes" if with_provenance else "no"),
        ]
    # user supplied values are escaped so that '&', '#', '+' or spaces do not corrupt the query string
    if namespace:
        params.append("namespace=" + quote_plus(namespace))
    if not summary:
        if save_as:
            params.append("save_as=" + quote_plus(save_as))
        if add_to:
            params.append("add_to=" + quote_plus(add_to))
    if include_retired_files:
        params.append("include_retired_files=yes")
    # beginning of the query text is passed in the URL too; the full query goes in the request body
    params.append("trimquery=" + quote_plus(query[:255]))
    url = "data/query?" + "&".join(params)

    if summary:
        yield self.post_json(url, query)
        return

    if batch_size and batch_size > 0:
        offset = 0
        count = batch_size
        # a batch shorter than batch_size means there is nothing left to fetch
        while count == batch_size:
            batch_query = f"({query}) ordered skip {offset} limit {batch_size}"
            count = 0
            for item in self.post_json(url, batch_query):
                count += 1
                yield item
            offset += batch_size
    else:
        yield from self.post_json(url, query)
[docs]
def async_query(self, query, data=None, **args):
    """Submit the query for asynchronous execution. Requires client authentication if save_as or add_to are used.

    The call is handed over to ``self.async_queue`` which runs ``self.query`` with the given arguments.

    Arguments
    ---------
    query : str
        Query in MQL
    data : anything
        Arbitrary data associated with this query
    args :
        Same keyword arguments as for the query() method

    Returns
    -------
    Promise
        ``pythreader`` Promise object associated with this query. The promise object will have Data attribute containing the object passed as the ``data``
        argument to the ``async_query`` call.
    """
    submitted = self.async_queue.add(self.query, query, promise_data=data, **args)
    return submitted.promise
[docs]
def wait_queries(self):
    """
    Block until all asynchronous queries issued so far have completed,
    i.e. until ``self.async_queue`` reports that it is empty
    """
    pending = self.async_queue
    pending.waitUntilEmpty()
[docs]
def search_named_queries(self, query):
    """
    Search for named queries using an MQL query

    Arguments
    ---------
    query : str
        Query in MQL

    Returns
    -------
    list of dicts
        The list contains one dictionary per matching named query with the query information.
    """
    return self.post_json("data/search_queries", query)
[docs]
def create_namespace(self, name, owner_role=None, description=None):
    """Creates new namespace. Requires client authentication.

    Arguments
    ---------
    name : str
        Namespace name
    owner_role : str
        Owner role for the new namespace. The user must be a member of the role.
        Optional. If unspecified, the new namespace will be owned by the user.
    description : str
        New namespace description

    Returns
    -------
    dict
        New namespace information
    """
    # all values are escaped the same way the description always was
    params = ["name=" + quote_plus(name)]
    if owner_role:
        params.append("owner_role=" + quote_plus(owner_role))
    if description:
        params.append("description=" + quote_plus(description))
    return self.get_json("data/create_namespace?" + "&".join(params))
[docs]
def get_namespace(self, name):
    """Get information about a namespace

    Arguments
    ---------
    name : str
        Namespace name

    Returns
    -------
    dict
        Namespace information or None if the namespace was not found
    """
    # escape the name so that special characters do not corrupt the query string
    url = "data/namespace?name=" + quote_plus(name)
    try:
        return self.get_json(url)
    except NotFoundError:
        return None
[docs]
def get_namespaces(self, names):
    """Look up several namespaces with a single request

    Arguments
    ---------
    names : list of str
        Namespace names

    Returns
    -------
    list
        Namespace information
    """
    endpoint = "data/namespaces"
    return self.post_json(endpoint, names)
[docs]
def list_namespaces(self, pattern=None, owner_user=None, owner_role=None, directly=False):
    """List namespaces

    This method is a generator: the request is sent when the result is first iterated.

    Arguments
    ---------
    pattern : str
        Optional fnmatch style pattern to filter namespaces by name
    owner_user : str
        Optional, return only namespaces owned by the specified user
    directly : boolean
        If False and owner_user is specified, return also namespaces owned by all roles the user is in
        Ignored if owner_user is not specified
    owner_role : str
        Optional, return only namespaces owned by the specified role.
        Ignored if owner_user is also specified

    Yields
    ------
    dict
        Dictionary with namespace information, in the order received from the server
    """
    params = []
    if owner_user:
        params.append("owner_user=" + quote_plus(owner_user))
        if directly:
            params.append("directly=yes")
    if owner_role:
        # low level API on the server side will ignore owner_role if owner_user is present, but pass both anyway
        params.append("owner_role=" + quote_plus(owner_role))
    url = "data/namespaces"
    if params:
        url += "?" + "&".join(params)
    for item in self.get_json(url):
        # the name pattern is applied on the client side
        if pattern is None or fnmatch.fnmatch(item["name"], pattern):
            yield item
#
# Categories
#
[docs]
def list_categories(self, root=None):
    """List categories

    Arguments
    ---------
    root : str
        Optional, if present, list only categories under the root

    Returns
    -------
    list
        List of dictionaries with category information sorted by category path
    """
    categories = self.get_json("data/categories")
    if root:
        # compare against "root." so that only categories below the root are selected
        prefix = root if root.endswith('.') else root + '.'
        categories = (entry for entry in categories if entry["path"].startswith(prefix))
    return sorted(categories, key=lambda entry: entry["path"])
[docs]
def get_category(self, path):
    """Get category information

    Arguments
    ---------
    path : str
        Category path

    Returns
    -------
    dict
        A dictionary with category information or None if not found
    """
    try:
        return self.get_json(f"data/category/{path}")
    except NotFoundError:
        # return None for a missing category, as documented and as the other get_* methods do
        return None
#
# Named queries
#
[docs]
def get_named_query(self, namespace, name):
    """Get named query

    Arguments
    ---------
    namespace : str
    name : str

    Returns
    -------
    dict or None
        A dictionary with information about the named query or None if the named query does not exist.
    """
    # escape both values so that special characters do not corrupt the query string
    url = f"data/named_query?namespace={quote_plus(namespace)}&name={quote_plus(name)}"
    try:
        return self.get_json(url)
    except NotFoundError:
        return None
[docs]
def list_named_queries(self, namespace=None):
    """Get multiple named queries

    Arguments
    ---------
    namespace : str
        optional, if specified the list will include all named queries in the namespace. Otherwise all named queries will be returned

    Returns
    -------
    list
        List of dictionaries with information about the named queries.
    """
    url = "data/named_queries"
    if namespace is not None:
        # escape the namespace so that special characters do not corrupt the query string
        url += "?namespace=" + quote_plus(namespace)
    return self.get_json(url)
[docs]
def create_named_query(self, namespace, name, source, parameters=[], update=False):
    """
    Store a query under a namespace and name

    Arguments
    ---------
    namespace: str
    name: str
        Namespace and name to store query under
    source:
        Query text
    parameters:
        Values to substitute into query
    update: bool
        Update the existing named query

    Returns
    -------
    Result of the request as returned by ``self.post_json``
    (presumably the stored named query information - confirm against the server API)
    """
    payload = {
        "namespace": namespace,
        "name": name,
        "source": source,
        "parameters": parameters,
    }
    endpoint = "data/create_named_query" + ("?update=yes" if update else "")
    return self.post_json(endpoint, payload)
[docs]
def report_metadata_keys(self):
    """
    Request the metadata keys report from the server

    Arguments
    ---------
    None

    Returns
    -------
    JSON List of all metadata dictionary keys in database
    """
    return self.get_json("data/report_metadata_keys")
[docs]
def report_metadata_counts_ranges(self, keylist):
    """
    Request counts and value ranges for the given metadata keys

    Arguments
    ---------
    keylist: str or list of str
        either a comma separated list of metadata keys as a single string,
        or a list (or another iterable) of metadata keys

    Returns
    -------
    JSON dictionary 3 values for each category and key in the input.

    .. code-block:: python

        { "cat.key.min": value, "cat.key.max": value, "cat.key.count": n, ... }
    """
    if not isinstance(keylist, str):
        keylist = ",".join(keylist)
    # a string is taken as the already comma separated list; joining it would split it into characters.
    # commas are kept unescaped as key separators, everything else is escaped
    url = "data/report_metadata_counts_ranges?keylist=%s" % quote_plus(keylist, safe=",")
    return self.get_json(url)
[docs]
def report_metadata_values(self, key, *args, **kwargs):
    """
    Request the list of distinct values of a metadata key

    Arguments
    ---------
    key: str
        Metadata key
    args, kwargs:
        accepted for compatibility and ignored

    Returns
    -------
    JSON list of all distinct values for key in the various metadata dictionaries.
    """
    # escape the key so that special characters do not corrupt the query string
    url = "data/report_metadata_values?key=" + quote_plus(key)
    return self.get_json(url)