You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
153 lines
5.1 KiB
153 lines
5.1 KiB
"""Constructs a tree-like object containing the metadata for a given path, and caches said metadata."""
|
|
|
|
import fnmatch
|
|
import logging
|
|
import mimetypes
|
|
import os
|
|
import uuid
|
|
from typing import Any, Dict, List, Optional, Tuple, Union, cast
|
|
|
|
import jstyleson
|
|
|
|
from .utils import guess_mime
|
|
|
|
# Set up mimetypes with some extra template extensions served as HTML.
# Extensions must carry their leading dot: mimetypes.guess_type() looks up
# the dotted suffix of a file name, so an undotted registration such as
# "thtml" is never matched.
mimetypes.init()
mimetypes.add_type("text/html", ".thtml")
mimetypes.add_type("text/html", ".cont")

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MetaCacheMiss(Exception):
    """Signals that a cache lookup did not yield a usable item."""
|
|
|
|
|
|
class MetaCache:
    """This class provides an in-memory cache for metadata tree.

    Every item is stored together with the time stamp passed to put().
    get() hands the item back only while the time stamp supplied by the
    caller is less than ``max_age`` seconds past that stored stamp.
    """

    def __init__(self, max_age: float = 200.0):
        """Initialize the cache.

        Arguments:
            max_age (float): the number of seconds to age-out cache items

        """
        self._max_age = max_age
        # key -> (time stamp given to put(), stored value)
        self._cache: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str, new_time_stamp: float) -> Any:
        """Get an item from the cache.

        Arguments:
            key (str): the cache key to retrieve
            new_time_stamp (float): The time to use to compare the stored time with

        Returns:
            :obj:misc: The previously stored value.

        Raises:
            MetaCacheMiss: on missing key, or on aged out

        """
        if key not in self._cache:
            raise MetaCacheMiss("no item for key {}".format(key))

        stored_at, value = self._cache[key]

        # The item is fresh while fewer than max_age seconds separate the
        # stored stamp from the new one. The previous comparison (<=) was
        # inverted: it returned items only after they had aged out and
        # reported fresh items as expired.
        if stored_at + self._max_age > new_time_stamp:
            return value

        raise MetaCacheMiss("cache expired for key {}".format(key))

    def put(self, key: str, value: Union[Dict, List, int, str, object], time_stamp: float) -> None:
        """Put an item into the cache.

        Any item already stored under the key is replaced.

        Arguments:
            key (str): the key to store the cache item under
            value (:obj:misc): the value to store
            time_stamp (float): the time stamp to store the item under

        """
        self._cache[key] = (time_stamp, value)
|
|
|
|
|
|
class MetaTree:
    """This provides an interface to loading and caching tree metadata for a given directory tree."""

    def __init__(self, root: str, default_metadata: Optional[Dict] = None):
        """Initialize the metadata tree object.

        Arguments:
            root (str): The path to the root of the file tree to operate on.
                A trailing "/" is appended when missing.
            default_metadata (dict, optional): The default metadata to apply to the tree

        """
        self._cache = MetaCache()
        if default_metadata is None:
            default_metadata = {}
        self._default_metadata = default_metadata
        if root[-1] != "/":
            root += "/"
        self._root = root

    def get_metadata(self, rel_path: str) -> Dict:
        """Retrieve the metadata for a given path

        The general procedure is to iterate the tree, at each level
        load .meta (JSON formatted dictionary) for that level, and
        then finally load the path.meta, and merge these dictionaries
        in descendant order.

        On top of the merged .meta content the result carries the keys
        "dir", "file_name", "file_path", "relpath", "uuid", "os-path",
        "guessed-type", "mime-type" (only filled in when no level set it)
        and "stat" (mtime, ctime, atime, mode, size and ino of the target).

        Arguments:
            rel_path (str): The path to retrieve the metadata for (relative to root)

        Returns:
            dict: A dictionary of metadata for that path tree.

        Raises:
            OSError: propagated from os.stat() when a component of the path
                cannot be stat'ed (e.g. FileNotFoundError for a missing path).
            KeyError: when the merged metadata has no "uuid-oid-root" entry.

        """
        # Shallow copy so the merge below never mutates the shared defaults.
        metablob = dict(self._default_metadata)
        # iterate path components from root to target path
        comps = [self._root] + rel_path.split("/")
        fullpath = ""
        ospath = os.path.join(self._root, rel_path)
        for pth in comps:
            fullpath = os.path.join(fullpath, pth)
            st = os.stat(fullpath)

            # Directories keep their metadata in "<dir>/.meta", files in a
            # sibling "<file>.meta".
            if os.path.isdir(fullpath):
                cachekey = os.path.join(fullpath, ".meta")
            else:
                cachekey = fullpath + ".meta"
            meta = cast(Dict, {})
            try:
                st_meta = os.stat(cachekey)
                # The .meta file's mtime is the time stamp handed to the cache.
                meta = self._cache.get(cachekey, st_meta.st_mtime)
            except FileNotFoundError:
                # No .meta file at this level: nothing to load.
                st_meta = None  # type: ignore
            except MetaCacheMiss:
                meta = {}

            if not meta and st_meta:
                # Close the file deterministically instead of leaking the
                # handle returned by a bare open().
                with open(cachekey, "r") as meta_file:
                    meta = jstyleson.load(meta_file)
                self._cache.put(cachekey, meta, st_meta.st_mtime)

            # Wildcard entries are (pattern, metadata) pairs matched against
            # the final path component only. They are applied before this
            # level's own .meta so that the explicit .meta wins.
            if fullpath == ospath and "wildcard_metadata" in metablob:
                for wild in metablob["wildcard_metadata"]:
                    if fnmatch.fnmatch(pth, wild[0]):
                        metablob.update(wild[1])

            metablob.update(meta)

        # return final dict
        metablob["dir"], metablob["file_name"] = os.path.split(rel_path)
        metablob["file_path"] = rel_path
        # Relative path leading from the file's directory back up to the root.
        metablob["relpath"] = os.path.relpath("/", "/" + metablob["dir"])
        # Name-based UUID derived from the configured OID root and the OS path.
        metablob["uuid"] = uuid.uuid3(uuid.NAMESPACE_OID, metablob["uuid-oid-root"] + ospath)
        metablob["os-path"], _ = os.path.split(fullpath)
        metablob["guessed-type"] = guess_mime(ospath)
        if "mime-type" not in metablob:
            metablob["mime-type"] = metablob["guessed-type"]
        # "st" still holds the stat result of the last component, i.e. the target.
        metablob["stat"] = {}
        for stk in ("st_mtime", "st_ctime", "st_atime", "st_mode", "st_size", "st_ino"):
            metablob["stat"][stk.replace("st_", "")] = getattr(st, stk)

        return metablob
|
|
|