You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
145 lines
4.7 KiB
145 lines
4.7 KiB
import copy
|
|
import datetime
|
|
import glob
|
|
import itertools
|
|
import os
|
|
from typing import Callable, Dict, Iterable, List, Union, cast, Tuple
|
|
|
|
import jstyleson
|
|
|
|
import pytz
|
|
|
|
from .metadata import MetaTree
|
|
from .processchain import ProcessorChains
|
|
from .utils import deep_merge_dicts
|
|
|
|
|
|
def file_list(root: str, listcache: Dict) -> Callable:
|
|
def get_file_list(
|
|
path_glob: Union[str, List[str], Tuple[str]],
|
|
*,
|
|
sort_order: str = "ctime",
|
|
reverse: bool = False,
|
|
limit: int = 0) -> Iterable:
|
|
stattable = cast(List, [])
|
|
if isinstance(path_glob, str):
|
|
path_glob = [path_glob]
|
|
for pglob in path_glob:
|
|
if pglob in listcache:
|
|
stattable.extend(listcache[pglob])
|
|
else:
|
|
for fil in glob.glob(os.path.join(root, pglob)):
|
|
if os.path.isdir(fil):
|
|
continue
|
|
if fil.endswith(".meta") or fil.endswith("~"):
|
|
continue
|
|
st = os.stat(fil)
|
|
stattable.append(
|
|
{
|
|
"file_path": os.path.relpath(fil, root),
|
|
"file_name": os.path.split(fil)[-1],
|
|
"mtime": st.st_mtime,
|
|
"ctime": st.st_ctime,
|
|
"size": st.st_size,
|
|
"ext": os.path.splitext(fil)[1],
|
|
}
|
|
)
|
|
listcache[pglob] = stattable
|
|
ret = sorted(stattable, key=lambda x: x[sort_order], reverse=reverse)
|
|
if limit > 0:
|
|
return itertools.islice(ret, limit)
|
|
return ret
|
|
|
|
return get_file_list
|
|
|
|
|
|
def file_list_hier(root: str, flist: Callable) -> Callable:
|
|
"""Return a callable which, given a directory, will walk the directory and return the files within
|
|
it that match the glob passed."""
|
|
|
|
def get_file_list_hier(path: str, glob: str, *, sort_order: str = "ctime", reverse: bool = False) -> Iterable:
|
|
output = []
|
|
|
|
for pth in os.walk(os.path.join(root, path)):
|
|
output.extend(
|
|
flist(
|
|
os.path.join(os.path.relpath(os.path.realpath(pth[0]), root), glob),
|
|
sort_order=sort_order,
|
|
reverse=reverse,
|
|
)
|
|
)
|
|
|
|
return output
|
|
|
|
return get_file_list_hier
|
|
|
|
|
|
def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains, namecache: Dict) -> Callable:
|
|
def get_file_name(file_name: str) -> Dict:
|
|
if file_name in namecache:
|
|
return namecache[file_name]
|
|
metadata = metatree.get_metadata(file_name)
|
|
chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
|
|
namecache[file_name] = chain.output_filename
|
|
return namecache[file_name]
|
|
|
|
return get_file_name
|
|
|
|
|
|
def file_raw(root: str, contcache: Dict) -> Callable:
|
|
def get_raw(file_name: str) -> str:
|
|
if file_name in contcache:
|
|
return contcache[file_name]
|
|
with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
|
|
return f.read()
|
|
|
|
return get_raw
|
|
|
|
|
|
def file_json(root: str) -> Callable:
|
|
def get_json(file_name: str, parent: Dict = None) -> Dict:
|
|
outd = {}
|
|
if parent is not None:
|
|
outd = copy.deepcopy(parent)
|
|
|
|
with open(os.path.join(root, file_name), "r", encoding="utf-8") as f:
|
|
return deep_merge_dicts(outd, jstyleson.load(f))
|
|
|
|
return get_json
|
|
|
|
|
|
def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable:
|
|
def get_file_content(file_name: str) -> Iterable:
|
|
if file_name in contcache:
|
|
return contcache[file_name]
|
|
metadata = metatree.get_metadata(file_name)
|
|
chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
|
|
contcache[file_name] = chain.output
|
|
return str(chain.output)
|
|
|
|
return get_file_content
|
|
|
|
|
|
def file_metadata(metatree: MetaTree) -> Callable:
|
|
def get_file_metadata(file_name: str) -> Dict:
|
|
return metatree.get_metadata(file_name)
|
|
|
|
return get_file_metadata
|
|
|
|
|
|
def time_iso8601(timezone: str) -> Callable:
|
|
tz = pytz.timezone(timezone)
|
|
|
|
def get_time_iso8601(time_t: Union[int, float]) -> str:
|
|
return datetime.datetime.fromtimestamp(time_t, tz).isoformat("T")
|
|
|
|
return get_time_iso8601
|
|
|
|
|
|
def date_iso8601(timezone: str) -> Callable:
|
|
tz = pytz.timezone(timezone)
|
|
|
|
def get_date_iso8601(time_t: Union[int, float]) -> str:
|
|
return datetime.datetime.fromtimestamp(time_t, tz).strftime("%Y-%m-%d")
|
|
|
|
return get_date_iso8601
|
|
|