Major code import.

Branch: main
Author: Cas Rusnov (6 years ago)
Parent: 13fb5dac1c
Commit: 43d40f7fce
40 changed files:
  1. demo/.meta (+7)
  2. demo/atom.xml (+33)
  3. demo/atom.xml.meta (+5)
  4. demo/bar/baz/quux/quuux (+0)
  5. demo/blog_posts/anotherpost.cont (+5)
  6. demo/blog_posts/anotherpost.cont.meta (+4)
  7. demo/blog_posts/test.cont (+1)
  8. demo/blog_posts/test.cont.meta (+4)
  9. demo/foo.cont (+1)
  10. demo/foo.cont.meta (+5)
  11. demo/index.cont (+19)
  12. demo/mapping.json (+0)
  13. demo/passthrough.md (+9)
  14. demo/passthrough.md.meta (+3)
  15. demo/readme.md (+9)
  16. demo/readme.md.meta (+3)
  17. demo/templates/debug.jinja2 (+32)
  18. demo/templates/default-fs.jinja2 (+6)
  19. demo/templates/default.jinja2 (+13)
  20. pixywerk2/__init__.py (+0)
  21. pixywerk2/__main__.py (+119)
  22. pixywerk2/defaults/chains.yaml (+104)
  23. pixywerk2/metadata.py (+145)
  24. pixywerk2/processchain.py (+180)
  25. pixywerk2/processors/__init__.py (+1)
  26. pixywerk2/processors/jinja2.py (+32)
  27. pixywerk2/processors/jinja2_page_embed.py (+76)
  28. pixywerk2/processors/passthrough.py (+68)
  29. pixywerk2/processors/process_less.py (+1)
  30. pixywerk2/processors/process_md.py (+69)
  31. pixywerk2/processors/process_pp.py (+1)
  32. pixywerk2/processors/process_sass.py (+1)
  33. pixywerk2/processors/process_styl.py (+1)
  34. pixywerk2/processors/processors.py (+63)
  35. pixywerk2/template_tools.py (+80)
  36. pixywerk2/tests/unit/__init__.py (+0)
  37. pixywerk2/tests/unit/test_processchain.py (+6)
  38. pixywerk2/utils.py (+42)
  39. setup.py (+58)
  40. tox.ini (+22)

demo/.meta
@@ -0,0 +1,7 @@
{
    "site_root": "https://example.com",
    "title": "Test Metadata",
    "author": "Test User",
    "author_email": "test_user@example.com",
    "uuid_oid_root": "pixywerk-demo"
}

demo/atom.xml
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="utf-8"?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>{{ metadata.title }}</title>
  <subtitle>{{ metadata.subtitle }}</subtitle>
  <link href="{{ metadata.site_root }}/{{ metadata.file_name }}" rel="self" />
  <link href="{{ metadata.site_root }}" />
  <id>urn:uuid:{{ metadata.uuid }}</id>
  <updated>{{ get_time_iso8601(metadata['build-time']) }}</updated>
  {% set posts = get_file_list('blog_posts/*.cont') %}
  {% for post in posts %}
  {% set post_meta = get_file_metadata(post['file_path']) %}
  <entry>
    <title>{{ post_meta.title }}</title>
    <link href="{{ metadata.site_root }}/{{ post_meta.file_path }}" />
    <id>urn:uuid:{{ post_meta.uuid }}</id>
    <updated>{{ get_time_iso8601(post_meta.stat.mtime) }}</updated>
    <summary>{{ post_meta.summary }}</summary>
    <!-- this would be the snippet, more than the summary chunk -->
    <!-- <content type="xhtml"> -->
    <!--   <div xmlns="http://www.w3.org/1999/xhtml"> -->
    <!--     <p>{{ post_meta.summary }}</p> -->
    <!--   </div> -->
    <!-- </content> -->
    <author>
      <name>{{ post_meta.author }}</name>
      <email>{{ post_meta.author_email }}</email>
    </author>
  </entry>
  {% endfor %}
</feed>

demo/atom.xml.meta
@@ -0,0 +1,5 @@
{
    "type": "templatable",
    "title": "Test RSS Feed",
    "subtitle": "Some Subtitle"
}

demo/blog_posts/anotherpost.cont
@@ -0,0 +1,5 @@
Some more post
la la la

demo/blog_posts/anotherpost.cont.meta
@@ -0,0 +1,4 @@
{
    "title": "Another Post(tm)",
    "summary": "Yet another post"
}

demo/blog_posts/test.cont
@@ -0,0 +1 @@
Some content.

demo/blog_posts/test.cont.meta
@@ -0,0 +1,4 @@
{
    "title": "Test.cont",
    "summary": "Some empty test content"
}

demo/foo.cont
@@ -0,0 +1 @@
yo fresh

demo/foo.cont.meta
@@ -0,0 +1,5 @@
{
    "foo": "bar",
    "title": "A title",
    "summary": "Just a post."
}

demo/index.cont
@@ -0,0 +1,19 @@
<h1>Index of all content</h1>
{% for f in get_file_list('*', sort_order='file_name') %}
  <a href="{{ get_file_name(f['file_name']) }}">{{ get_file_name(f['file_name']) }}</a>
{% endfor %}
<p>Including foo.cont.meta:
  <pre>
{{ get_file_content('foo.cont.meta') }}
  </pre>
</p>
<h1>Metadata</h1>
<table class="metadata">
  <tr><th>key</th><th>value</th></tr>
  {% set metadata = get_file_metadata('foo.cont') %}
  {% for k in metadata.keys() %}
  <tr><td>{{ k }}</td><td>{{ metadata[k] }}</td></tr>
  {% endfor %}
</table>

demo/passthrough.md
@@ -0,0 +1,9 @@
# README #
This is a test of the emergency compiled HTML system. This is only a *test*.
[Foo!](foo.html)
{% for i in range(100) %}
* {{ i }}
{% endfor %}

demo/passthrough.md.meta
@@ -0,0 +1,3 @@
{
    "pragma": ["no-proc"]
}

demo/readme.md
@@ -0,0 +1,9 @@
# README #
This is a test of the emergency compiled HTML system. This is only a *test*.
[Foo!](foo.html)
{% for i in range(100) %}
* {{ i }}
{% endfor %}

demo/readme.md.meta
@@ -0,0 +1,3 @@
{
    "title": "Yo, markdown"
}

demo/templates/debug.jinja2
@@ -0,0 +1,32 @@
<!DOCTYPE html>
<head>
  <title>Debug for {{ path }}</title>
  <style type="text/css">
    table { border: 1px solid black; }
    div { border: 1px solid black; }
    td { border: 1px solid black; }
  </style>
</head>
<body>
  <p>{{ path }}</p>
  <h1>Content</h1>
  <div class="content">
    {{ content }}
  </div>
  <h1>Environment</h1>
  <table class="environment">
    <tr><th>key</th><th>value</th></tr>
    {% for k in environ.keys() %}
    <tr><td>{{ k }}</td><td>{{ environ[k] }}</td></tr>
    {% endfor %}
  </table>
  <h1>Metadata</h1>
  <table class="metadata">
    <tr><th>key</th><th>value</th></tr>
    {% for k in metadata.keys() %}
    <tr><td>{{ k }}</td><td>{{ metadata[k] }}</td></tr>
    {% endfor %}
  </table>
</body>

demo/templates/default-fs.jinja2
@@ -0,0 +1,6 @@
<table class="werk-file-list">
  <tr class="werk-file-list-head"><th>file</th><th>type</th><th>size</th><th>last change</th></tr>
  {% for f in files.keys() %}
  <tr class="werk-file-list-item"><td><a href="/{{ files[f].relpath }}">{{ f }}</a></td><td>{{ files[f].type }}</td><td>{{ files[f].size }}</td><td>{{ files[f].ctime | date }}</td></tr>
  {% endfor %}
</table>

demo/templates/default.jinja2
@@ -0,0 +1,13 @@
<!DOCTYPE html>
<head>
  <title>{{ metadata.title }}</title>
  <style type="text/css">
    table { border: 1px solid black; }
    div { border: 1px solid black; }
    td { border: 1px solid black; }
  </style>
</head>
<body>
  {{ content }}
</body>
</html>

pixywerk2/__main__.py
@@ -0,0 +1,119 @@
# iterate source tree
# create directories in target tree
# for each item:
#     run processor(s) on item; each processor could be in a chain or a branch
#     processors also provide filename munging
#     output target based on processor output

import argparse
import logging
import os
import sys
import time

from typing import Dict, List, cast

from .processchain import ProcessorChains
from .metadata import MetaTree
from .template_tools import file_list, file_name, file_content, file_metadata, time_iso8601

logger = logging.getLogger()


def setup_logging(verbose: bool = False) -> None:
    pass


def get_args(args: List[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Compile a Pixywerk directory into an output directory.")

    parser.add_argument("root", help="The root of the pixywerk directory to process.")
    parser.add_argument("output", help="The output directory to export post-compiled files to.")
    parser.add_argument(
        "-c", "--clean", help="Remove the target tree before proceeding (by renaming to .bak).", action="store_true"
    )
    parser.add_argument("-s", "--safe", help="Abort if the target directory already exists.", action="store_true")
    parser.add_argument("-t", "--template", help="The template directory (default: root/templates)", default=None)
    parser.add_argument("-d", "--dry-run", help="Perform a dry-run.", action="store_true")
    parser.add_argument("-v", "--verbose", help="Output verbosely.", action="store_true")
    parser.add_argument("--processors", help="Specify a path to a processor configuration file.", default=None)

    result = parser.parse_args(args)

    # validate arguments
    if not os.path.isdir(result.root):
        raise FileNotFoundError("can't find root folder {}".format(result.root))

    if not result.template:
        result.template = os.path.join(result.root, "templates")
    result.excludes = [result.template]

    return result


def main() -> int:
    try:
        args = get_args(sys.argv[1:])
    except FileNotFoundError as ex:
        print("error finding arguments: {}".format(ex))
        return 1

    setup_logging(args.verbose)

    if os.path.exists(args.output) and args.clean:
        bak = "{}.bak-{}".format(args.output, int(time.time()))
        print("cleaning target {} -> {}".format(args.output, bak))
        os.rename(args.output, bak)

    process_chains = ProcessorChains(args.processors)
    default_metadata = {
        "templates": args.template,
        "template": "default.jinja2",
        "dir-template": "default-dir.jinja2",
        "filters": {},
        "build-time": time.time(),
        "build-datetime": time.ctime(),
        "uuid-oid-root": "pixywerk",
    }
    meta_tree = MetaTree(args.root, default_metadata)
    file_list_cache = cast(Dict, {})
    file_cont_cache = cast(Dict, {})
    file_name_cache = cast(Dict, {})
    default_metadata["globals"] = {
        "get_file_list": file_list(args.root, file_list_cache),
        "get_file_name": file_name(args.root, meta_tree, process_chains, file_name_cache),
        "get_file_content": file_content(args.root, meta_tree, process_chains, file_cont_cache),
        "get_file_metadata": file_metadata(meta_tree),
        "get_time_iso8601": time_iso8601("UTC"),
    }

    for root, _, files in os.walk(args.root):
        workroot = os.path.relpath(root, args.root)
        if workroot == ".":
            workroot = ""
        target_dir = os.path.join(args.output, workroot)
        print("mkdir -> {}".format(target_dir))
        if not args.dry_run:
            try:
                os.mkdir(target_dir)
            except FileExistsError:
                if args.safe:
                    print("error, target directory exists, aborting")
                    return 1
        for f in files:
            # FIXME: global generic filters
            if f.endswith(".meta") or f.endswith("~"):
                continue
            metadata = meta_tree.get_metadata(os.path.join(workroot, f))
            chain = process_chains.get_chain_for_filename(os.path.join(root, f), ctx=metadata)
            print("process {} -> {}".format(os.path.join(root, f), os.path.join(target_dir, chain.output_filename)))
            if not args.dry_run:
                with open(os.path.join(target_dir, chain.output_filename), "w") as outfile:
                    for line in chain.output:
                        outfile.write(line)
    return 0


if __name__ == "__main__":
    sys.exit(main())
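# Usage sketch (paths illustrative): from a checkout with the dependencies installed,
# build the bundled demo site into ./out, renaming any previous output to a .bak first:
#
#     python -m pixywerk2 --clean demo out
#
# With --dry-run the directory walk and chain resolution still run, but nothing is written.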

pixywerk2/defaults/chains.yaml
@@ -0,0 +1,104 @@
# Default: output == input
default:
  extension: default
  chain:
    - passthrough

# Any object that needs jinja scripts but no other explicit processing
templatable:
  extension: null
  chain:
    - jinja2

# Markdown, BBCode and RST are first run through the templater, then
# processed into HTML, and finally embedded in a page template.
markdown:
  extension:
    - md
  chain:
    - jinja2
    - process_md
    - jinja2_page_embed

bbcode:
  extension:
    - bb
    - pp
  chain:
    - jinja2
    - process_pp
    - jinja2_page_embed

# FIXME implement RST processor
# restructured:
#   extension:
#     - rst
#   chain:
#     - jinja2
#     - process_rst
#     - jinja2_page_embed

# JSON and YAML are split, passed through a pretty printer, and then output
# FIXME implement split chain processor, implement processor arguments
# json:
#   extension:
#     - json
#   chain:
#     - split (passthrough)
#     - pp_json
# yaml:
#   extension:
#     - yml
#     - yaml
#   chain:
#     - split (passthrough)
#     - pp_yaml

# Template-html is first passed through the templater, and then embedded
# in a page template
template-html:
  extension:
    - thtml
    - cont
  chain:
    - jinja2
    - jinja2_page_embed

# Smart CSS dialects are simply converted to CSS.
sass:
  extension:
    - sass
    - scss
  chain:
    - process_sass

less:
  extension:
    - less
  chain:
    - process_less

stylus:
  extension:
    - styl
  chain:
    - process_styl

# Images are processed into thumbnails and resized versions in addition to being retained as the original
# FIXME implement split chain processor, implement processor arguments
# image:
#   extension:
#     - jpg
#     - jpeg
#     - png
#   chain:
#     - split (image_bigthumb)
#     - split (image_smallthumb)
#     - passthrough
# image_bigthumb:
#   extension:
#   chain:
#     - smart_resize (big)
# image_smallthumb:
#   extension:
#   chain:
#     - smart_resize (small)
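# How these entries are used (informational comment, not extra configuration):
# a file's extension selects one of the named chains above, a file's .meta can
# also force a chain by name via its "type" key (demo/atom.xml.meta does this),
# and every item under "chain" must be the name of a module in
# pixywerk2/processors/ that exposes a module-level "processor" attribute.
# A hypothetical chain for plain-text notes would look like:
#
# notes:
#   extension:
#     - txt
#   chain:
#     - jinja2
#     - jinja2_page_embed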

pixywerk2/metadata.py
@@ -0,0 +1,145 @@
"""Constructs a tree-like object containing the metadata for a given path, and caches said metadata."""

import logging
import mimetypes
import os
import uuid

from typing import Dict, Optional, Union, List, Tuple, Any, cast

import jstyleson

from .utils import guess_mime

# set up mimetypes with some extra ones (extensions must include the leading dot)
mimetypes.init()
mimetypes.add_type("text/html", ".thtml")
mimetypes.add_type("text/html", ".cont")

logger = logging.getLogger(__name__)


class MetaCacheMiss(Exception):
    """Raised on cache miss."""


class MetaCache:
    """This class provides an in-memory cache for the metadata tree."""

    def __init__(self, max_age: float = 200.0):
        """Initialize the cache.

        Arguments:
            max_age (float): the number of seconds after which cache items age out
        """
        self._max_age = max_age
        self._cache: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str, new_time_stamp: float) -> Any:
        """Get an item from the cache.

        Arguments:
            key (str): the cache key to retrieve
            new_time_stamp (float): the time to compare the stored time stamp against

        Returns:
            :obj:misc: The previously stored value.

        Raises:
            MetaCacheMiss: on missing key, or on an aged-out item
        """
        if key not in self._cache:
            raise MetaCacheMiss("no item for key {}".format(key))
        if self._cache[key][0] + self._max_age <= new_time_stamp:
            # the stored entry is older than max_age relative to the new time stamp: stale
            raise MetaCacheMiss("cache expired for key {}".format(key))
        return self._cache[key][1]

    def put(self, key: str, value: Union[Dict, List, int, str, object], time_stamp: float) -> None:
        """Put an item into the cache.

        Arguments:
            key (str): the key to store the cache item under
            value (:obj:misc): the value to store
            time_stamp (float): the time stamp to store the item under
        """
        self._cache[key] = (time_stamp, value)


class MetaTree:
    """This provides an interface to loading and caching tree metadata for a given directory tree."""

    def __init__(self, root: str, default_metadata: Optional[Dict] = None):
        """Initialize the metadata tree object.

        Arguments:
            root (str): The path to the root of the file tree to operate on.
            default_metadata (dict, optional): The default metadata to apply to the tree
        """
        self._cache = MetaCache()
        if default_metadata is None:
            default_metadata = {}
        self._default_metadata = default_metadata
        if root[-1] != "/":
            root += "/"
        self._root = root

    def get_metadata(self, rel_path: str) -> Dict:
        """Retrieve the metadata for a given path.

        The general procedure is to iterate the tree: at each level
        load the .meta (JSON formatted dictionary) for that level, then
        finally load path.meta, and merge these dictionaries in
        descendant order.

        Arguments:
            rel_path (str): The path to retrieve the metadata for (relative to root)

        Returns:
            dict: A dictionary of metadata for that path tree.
        """
        metablob = dict(self._default_metadata)
        # iterate path components from root to target path
        comps = [self._root] + rel_path.split("/")
        fullpath = ""
        for pth in comps:
            fullpath = os.path.join(fullpath, pth)
            st = os.stat(fullpath)
            cachekey = fullpath + ".meta"
            meta = cast(Dict, {})
            try:
                st_meta = os.stat(cachekey)
                meta = self._cache.get(cachekey, st_meta.st_mtime)
            except FileNotFoundError:
                st_meta = None  # type: ignore
            except MetaCacheMiss:
                meta = {}
            if not meta and st_meta:
                meta = jstyleson.load(open(cachekey, "r"))
                self._cache.put(cachekey, meta, st_meta.st_mtime)
            metablob.update(meta)

        # assemble the final dict
        metablob["dir"], metablob["file_name"] = os.path.split(rel_path)
        metablob["file_path"] = rel_path
        metablob["uuid"] = uuid.uuid3(
            uuid.NAMESPACE_OID, metablob["uuid-oid-root"] + os.path.join(self._root, rel_path)
        )
        metablob["os-path"], _ = os.path.split(fullpath)
        metablob["guessed-type"] = guess_mime(os.path.join(self._root, rel_path))
        if "mime-type" not in metablob:
            metablob["mime-type"] = metablob["guessed-type"]
        metablob["stat"] = {}
        for stk in ("st_mtime", "st_ctime", "st_atime", "st_mode", "st_size", "st_ino"):
            metablob["stat"][stk.replace("st_", "")] = getattr(st, stk)
        return metablob
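# Usage sketch against the bundled demo tree (paths and defaults illustrative);
# the fallback order in the comments mirrors get_metadata() above:
#
#     tree = MetaTree("demo", {"title": "A default", "uuid-oid-root": "pixywerk-demo"})
#     meta = tree.get_metadata("blog_posts/anotherpost.cont")
#     # meta["title"] == "Another Post(tm)" from anotherpost.cont.meta; keys that
#     # file does not set fall back to demo/.meta and then to the defaults dict.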

pixywerk2/processchain.py
@@ -0,0 +1,180 @@
"""Interface for chains of processors."""

import os
import os.path
import random

from typing import List, Iterable, Optional, Any, Dict, Type, cast

import yaml

from .processors.processors import Processor


class ProcessorChain:
    """This implements a wrapper for an arbitrary set of processors and an associated file stream."""

    def __init__(
        self,
        processors: List[Processor],
        file_name: str,
        file_data: Iterable[str],
        file_type: str,
        ctx: Optional[Dict] = None,
    ):
        """Initialize the processing stream.

        Arguments:
            processors (list): A list of processor objects.
            file_name (str): The name of the input file.
            file_data (Iterable): An iterable from which to retrieve the input.
            file_type (str): The specified file type, for consumer information.
            ctx (dict, optional): A context object passed to each processor.
        """
        self._processors = processors
        self._file_data = file_data
        self._file_type = file_type
        self._file_name = file_name
        self._ctx: Dict = {}
        if ctx is not None:
            self._ctx = cast(Dict, ctx)

    @property
    def output(self) -> Iterable:
        """Return an iterable for the output of the process chain.

        Returns:
            :obj:`iterable`: the iterable
        """
        prev = self._file_data
        for processor in self._processors:
            if processor:
                prev = processor.process(prev, self._ctx)
        return prev

    @property
    def output_mime(self) -> str:
        """Return the post-processed MIME type from the processing chain.

        Returns:
            str: the MIME type
        """
        fname = self._file_name
        for processor in self._processors:
            fname = processor.mime_type(fname, self._ctx)
        return fname

    @property
    def output_ext(self) -> str:
        """Return the post-processed extension from the processing chain.

        Returns:
            str: the extension
        """
        fname = self._file_name
        for processor in self._processors:
            fname = processor.extension(fname, self._ctx)
        return fname

    @property
    def output_filename(self) -> str:
        """Return the post-processed filename from the processing chain.

        Returns:
            str: the new filename
        """
        fname = os.path.basename(self._file_name)
        for processor in self._processors:
            fname = processor.filename(fname, self._ctx)
        return fname


class ProcessorChains:
    """Load a configuration for processor chains, and provide the ability to process the chains given a
    particular input file.
    """

    def __init__(self, config: Optional[str] = None):
        """Initialize, with a specified configuration file.

        Arguments:
            config (str, optional): The path to a YAML formatted configuration file.
        """
        if config is None:  # pragma: no coverage
            config = os.path.join(os.path.dirname(__file__), "defaults", "chains.yaml")
        self.chainconfig = yaml.safe_load(open(config, "r"))
        self.extensionmap: Dict[str, Any] = {}
        self.processors: Dict[str, Type[Processor]] = {}
        for ch, conf in self.chainconfig.items():
            if conf["extension"] == "default":
                self.default = ch
            else:
                if conf["extension"]:
                    for ex in conf["extension"]:
                        if ex in self.extensionmap or ex is None:
                            # log an error or except or something; we'll just override for now.
                            pass
                        self.extensionmap[ex] = ch
            for pr in conf["chain"]:
                if pr in self.processors:
                    continue
                processor_module = __import__("processors", globals(), locals(), [pr], 1)
                self.processors[pr] = processor_module.__dict__[pr].processor

    def get_chain_for_filename(self, filename: str, ctx: Optional[Dict] = None) -> ProcessorChain:
        """Get the ProcessorChain, as configured for a given file by extension.

        Arguments:
            filename (str): The name of the file to get a chain for.
            ctx (dict, optional): A context object passed to each processor.

        Returns:
            ProcessorChain: the constructed processor chain.
        """
        r = filename.rsplit(".", 1)
        ftype = "default"
        if r:
            ftype = r[-1]
        if ctx and "pragma" in ctx:
            if "no-proc" in ctx["pragma"]:
                ftype = "default"
        if ctx and "type" in ctx:
            ftype = ctx["type"]
        return self.get_chain_for_file(open(filename, "r"), ftype, filename, ctx)

    def get_chain_for_file(
        self, file_obj: Iterable, file_ext: str, file_name: Optional[str] = None, ctx: Optional[Dict] = None
    ) -> ProcessorChain:
        """Get the ProcessorChain for a given iterable object based on the specified file type.

        Arguments:
            file_obj (:obj:`iterable`): The input file stream
            file_ext (str): The type (extension) of the input stream
            file_name (str, optional): The name of the input file
            ctx (dict, optional): A context object passed to each processor

        Returns:
            ProcessorChain: the constructed processor chain.
        """
        if file_ext not in self.extensionmap or not self.extensionmap[file_ext]:
            if file_ext in self.chainconfig:
                file_type = file_ext
            else:
                file_type = "default"
        else:
            file_type = self.extensionmap[file_ext]
        if not file_name:
            file_name = hex(random.randint(0, 65536))
        return ProcessorChain(
            [self.processors[x]() for x in self.chainconfig[file_type]["chain"]],
            cast(str, file_name),
            file_obj,
            file_type,
            ctx,
        )
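# Usage sketch: resolving and running a chain by hand.  The ctx values are
# illustrative; the jinja2 processors expect at least "templates", "globals",
# "filters" and (for page embedding) "template", as __main__.py provides.
#
#     chains = ProcessorChains()  # loads defaults/chains.yaml
#     ctx = {"templates": "demo/templates", "globals": {}, "filters": {}, "template": "default.jinja2"}
#     chain = chains.get_chain_for_filename("demo/foo.cont", ctx=ctx)
#     chain.output_filename         # "foo.html" (the template-html chain ends in jinja2_page_embed)
#     html = "".join(chain.output)  # the demo content rendered through default.jinja2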

pixywerk2/processors/__init__.py
@@ -0,0 +1 @@
# processors metadata here

pixywerk2/processors/jinja2.py
@@ -0,0 +1,32 @@
"""Define a Jinja2 Processor which applies programmable templating to the input stream."""

from typing import Iterable, Optional, Dict, cast

from jinja2 import Environment, FileSystemLoader

from .passthrough import PassThrough


class Jinja2(PassThrough):
    """Pass the input stream through Jinja2 for scriptable templating."""

    def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
        """Return an iterable object of the post-processed file.

        Arguments:
            input_file (iterable): An input stream
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            iterable: The post-processed output stream
        """
        ctx = cast(Dict, ctx)
        template_env = Environment(loader=FileSystemLoader(ctx["templates"]))
        template_env.globals.update(ctx["globals"])
        template_env.filters.update(ctx["filters"])
        tmpl = template_env.from_string("".join([x for x in input_file]))
        return tmpl.render(metadata=ctx)


processor = Jinja2

pixywerk2/processors/jinja2_page_embed.py
@@ -0,0 +1,76 @@
"""Define a Jinja2 processor which embeds the (presumably HTML) input stream into a page template
as defined in the ctx metadata (the ``content`` variable is assigned to the input stream and
the target template is rendered)."""

import os

from typing import Iterable, Optional, Dict, cast

from jinja2 import Environment, FileSystemLoader

from .processors import Processor


class Jinja2PageEmbed(Processor):
    """Embed the input stream as ``content`` in the page template named by the context key ``template``."""

    def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the filename of the post-processed file.

        Arguments:
            oldname (str): the previous name for the file.
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new name for the file
        """
        return os.path.splitext(oldname)[0] + ".html"

    def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the MIME type of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new MIME type of the file after processing
        """
        return "text/html"

    def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
        """Return an iterable object of the post-processed file.

        Arguments:
            input_file (iterable): An input stream
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            iterable: The post-processed output stream
        """
        ctx = cast(Dict, ctx)
        template_env = Environment(loader=FileSystemLoader(ctx["templates"]))
        template_env.globals.update(ctx["globals"])
        template_env.filters.update(ctx["filters"])
        tmpl = template_env.get_template(ctx["template"])
        content = "".join([x for x in input_file])
        return tmpl.render(content=content, metadata=ctx)

    def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the extension of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new extension of the file after processing
        """
        return "html"


processor = Jinja2PageEmbed

pixywerk2/processors/passthrough.py
@@ -0,0 +1,68 @@
"""Passthrough processor which takes input and returns it unchanged."""

import os

from typing import Iterable, Optional, Dict, cast

from .processors import Processor
from ..utils import guess_mime


class PassThrough(Processor):
    """A simple passthrough processor that takes input and sends it to output."""

    def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the filename of the post-processed file.

        Arguments:
            oldname (str): the previous name for the file.
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new name for the file
        """
        return oldname

    def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the MIME type of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new MIME type of the file after processing
        """
        result = cast(str, guess_mime(oldname))
        if result == "directory":
            result = "DIR"
        return result

    def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
        """Return an iterable object of the post-processed file.

        Arguments:
            input_file (iterable): An input stream
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            iterable: The post-processed output stream
        """
        return input_file

    def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the extension of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new extension of the file after processing
        """
        return os.path.splitext(oldname)[-1]


processor = PassThrough
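# Adding a processor (sketch): a module in pixywerk2/processors/ defines a Processor
# subclass (PassThrough is a convenient base, since it supplies filename/MIME/extension
# defaults) and exposes it as a module-level ``processor`` attribute; a chain in
# chains.yaml then references the module by name.  The module name and behaviour below
# are hypothetical, not part of this commit:
#
#     # pixywerk2/processors/process_upper.py
#     from typing import Iterable, Optional, Dict
#
#     from .passthrough import PassThrough
#
#     class UpperCase(PassThrough):
#         """Upper-case every line of the input stream."""
#
#         def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
#             return (line.upper() for line in input_file)
#
#     processor = UpperCase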

pixywerk2/processors/process_md.py
@@ -0,0 +1,69 @@
"""Convert a Markdown stream into an HTML stream."""

import io
import os

from typing import Iterable, Optional, Dict

import markdown

from .processors import Processor


class MarkdownProcessor(Processor):
    """Convert a Markdown stream into an HTML stream."""

    def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the filename of the post-processed file.

        Arguments:
            oldname (str): the previous name for the file.
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new name for the file
        """
        return os.path.splitext(oldname)[0] + ".html"

    def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the MIME type of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new MIME type of the file after processing
        """
        return "text/html"

    def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the extension of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new extension of the file after processing
        """
        return "html"

    def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
        """Return an iterable object of the post-processed file.

        Arguments:
            input_file (iterable): An input stream
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            iterable: The post-processed output stream
        """
        md = u"".join([x for x in input_file])
        return io.StringIO(markdown.markdown(md, extensions=["extra", "admonition", "wikilinks"]))


processor = MarkdownProcessor  # pylint: disable=invalid-name

pixywerk2/processors/processors.py
@@ -0,0 +1,63 @@
import abc

from typing import Iterable, Optional, Dict


class ProcessorException(Exception):  # pragma: no cover
    """A base exception class to be used by processor objects."""


class Processor(abc.ABC):  # pragma: no cover
    """Abstract base class for the processors that make up a processing chain."""

    def __init__(self, *args, **kwargs):
        """Initialize the class."""

    @abc.abstractmethod
    def filename(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the filename of the post-processed file.

        Arguments:
            oldname (str): the previous name for the file.
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new name for the file
        """

    @abc.abstractmethod
    def mime_type(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the MIME type of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new MIME type of the file after processing
        """

    @abc.abstractmethod
    def extension(self, oldname: str, ctx: Optional[Dict] = None) -> str:
        """Return the extension of the post-processed file.

        Arguments:
            oldname (str): the input filename
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            str: the new extension of the file after processing
        """

    @abc.abstractmethod
    def process(self, input_file: Iterable, ctx: Optional[Dict] = None) -> Iterable:
        """Return an iterable object of the post-processed file.

        Arguments:
            input_file (iterable): An input stream
            ctx (dict, optional): A context object generated from the processor configuration

        Returns:
            iterable: The post-processed output stream
        """

pixywerk2/template_tools.py
@@ -0,0 +1,80 @@
import datetime
import glob
import itertools
import os

from typing import Callable, Dict, List, Iterable, Union, cast

import pytz

from .metadata import MetaTree
from .processchain import ProcessorChains


def file_list(root: str, listcache: Dict) -> Callable:
    def get_file_list(path_glob: str, *, sort_order: str = "ctime", reverse: bool = False, limit: int = 0) -> Iterable:
        stattable = cast(List, [])
        if path_glob in listcache:
            stattable = listcache[path_glob]
        else:
            for fil in glob.glob(os.path.join(root, path_glob)):
                if os.path.isdir(fil):
                    continue
                if fil.endswith(".meta") or fil.endswith("~"):
                    continue
                st = os.stat(fil)
                stattable.append(
                    {
                        "file_path": os.path.relpath(fil, root),
                        "file_name": os.path.split(fil)[-1],
                        "mtime": st.st_mtime,
                        "ctime": st.st_ctime,
                        "size": st.st_size,
                        "ext": os.path.splitext(fil)[1],
                    }
                )
            listcache[path_glob] = stattable
        ret = sorted(stattable, key=lambda x: x[sort_order], reverse=reverse)
        if limit > 0:
            return itertools.islice(ret, limit)
        return ret

    return get_file_list


def file_name(root: str, metatree: MetaTree, processor_chains: ProcessorChains, namecache: Dict) -> Callable:
    def get_file_name(file_name: str) -> str:
        if file_name in namecache:
            return namecache[file_name]
        metadata = metatree.get_metadata(file_name)
        chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
        namecache[file_name] = chain.output_filename
        return namecache[file_name]

    return get_file_name


def file_content(root: str, metatree: MetaTree, processor_chains: ProcessorChains, contcache: Dict) -> Callable:
    def get_file_content(file_name: str) -> Iterable:
        if file_name in contcache:
            return contcache[file_name]
        metadata = metatree.get_metadata(file_name)
        chain = processor_chains.get_chain_for_filename(os.path.join(root, file_name), ctx=metadata)
        contcache[file_name] = chain.output
        return chain.output

    return get_file_content


def file_metadata(metatree: MetaTree) -> Callable:
    def get_file_metadata(file_name: str) -> Dict:
        return metatree.get_metadata(file_name)

    return get_file_metadata


def time_iso8601(timezone: str) -> Callable:
    tz = pytz.timezone(timezone)

    def get_time_iso8601(time_t: Union[int, float]) -> str:
        return datetime.datetime.fromtimestamp(time_t, tz).isoformat("T")

    return get_time_iso8601
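# Template usage sketch: __main__.py registers these closures as Jinja2 globals, so a
# page can, for example, list the five most recently modified posts (glob and limit
# are illustrative):
#
#     {% for post in get_file_list('blog_posts/*.cont', sort_order='mtime', reverse=True, limit=5) %}
#       <a href="{{ get_file_name(post['file_path']) }}">{{ get_file_metadata(post['file_path']).title }}</a>
#     {% endfor %}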

pixywerk2/tests/unit/test_processchain.py
@@ -0,0 +1,6 @@
class TestProcessChain:
    def test_process_chain(self):
        pass

    def test_processor_chain(self):
        pass

pixywerk2/utils.py
@@ -0,0 +1,42 @@
import mimetypes
import os

from typing import Dict, Optional


def merge_dicts(dict_a: Dict, dict_b: Dict) -> Dict:
    """Merge two dictionaries.

    Arguments:
        dict_a (dict): The dictionary to use as the base.
        dict_b (dict): The dictionary to update the values with.

    Returns:
        dict: A new merged dictionary.
    """
    dict_z = dict_a.copy()
    dict_z.update(dict_b)
    return dict_z


def guess_mime(path: str) -> Optional[str]:
    """Guess the MIME type for a given path.

    Arguments:
        path (str): the path of the file to inspect

    Returns:
        str: the guessed MIME type
    """
    mtypes = mimetypes.guess_type(path)
    ftype = None
    if os.path.isdir(path):
        ftype = "directory"
    elif os.access(path, os.F_OK) and mtypes[0]:
        ftype = mtypes[0]
    else:
        ftype = "application/octet-stream"
    return ftype

setup.py
@@ -0,0 +1,58 @@
"""Package configuration."""
from setuptools import find_packages, setup

LONG_DESCRIPTION = """Pixywerk 2 is a DWIM filesystem-based static site generator."""

INSTALL_REQUIRES = ["pyyaml", "markdown", "jstyleson", "jinja2"]

# Extra dependencies
EXTRAS_REQUIRE = {
    # Test dependencies
    "tests": [
        "black",
        "bandit>=1.1.0",
        "flake8>=3.2.1",
        "mypy>=0.470",
        "prospector[with_everything]>=0.12.4",
        "pytest-cov>=1.8.0",
        "pytest-xdist>=1.15.0",
        "pytest>=3.0.3",
        "sphinx_rtd_theme>=0.1.6",
        "sphinx-argparse>=0.1.15",
        "Sphinx>=1.4.9",
    ]
}

SETUP_REQUIRES = ["pytest-runner>=2.7.1", "setuptools_scm>=1.15.0"]

setup(
    author="Cassowary Rusnov",
    author_email="rusnovn@gmail.com",
    classifiers=[
        "Development Status :: 2 - Pre-Alpha",
        "Environment :: Console",
        "License :: OSI Approved :: MIT License",
        "Operating System :: POSIX :: Linux",
        "Programming Language :: Python :: 3.6",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3 :: Only",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
    description="A filesystem-based DWIM website generator / CMS",
    # entry_points={
    #     'console_scripts': [
    #         'cookbook = spicerack.cookbook:main',
    #     ],
    # },
    extras_require=EXTRAS_REQUIRE,
    install_requires=INSTALL_REQUIRES,
    keywords=["wmf", "automation", "orchestration"],
    license="MIT",
    long_description=LONG_DESCRIPTION,
    name="pixywerk2",  # Must be the same used for __version__ in __init__.py
    packages=find_packages(exclude=["*.tests", "*.tests.*"]),
    platforms=["GNU/Linux"],
    setup_requires=SETUP_REQUIRES,
    use_scm_version=True,
    url="https://git.antpanethon.com/cas/pixywerk2",
    zip_safe=False,
)

tox.ini
@@ -0,0 +1,22 @@
[tox]
envlist = py{36,37}-{code-quality,unit} #, py37-sphinx
skipsdist = true

[testenv]
setenv =
    LANG = en_US.UTF-8
deps = .[tests]
commands =
    unit: py.test --strict --cov-report=term-missing --cov=pixywerk2 pixywerk2/tests/unit {posargs}
    code-quality: flake8 pixywerk2
    code-quality: black -l 120 --check pixywerk2
    code-quality: - prospector -A
    code-quality: - mypy --ignore-missing-imports pixywerk2
    # sphinx: python setup.py build_sphinx -b html
    # sphinx: python setup.py build_sphinx -b man
basepython =
    py36: python3.6
    py37: python3.7

[flake8]
max-line-length = 120
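# The envlist factors above expand to environments such as py37-unit or
# py36-code-quality; a single one can be run with, for example:
#   tox -e py37-unit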