Major update to codebase.

* Added LICENSE file
* Bumped version to 0.1.1
* Automated reformatting of codebase.
* Build chainer joiners into utils
* Rejigger __main__ so the code makes a bit more sense.
* Add type annotations to most things.
* Rejigger distribution files and add pyproject.toml.
* Add some typing stubs for external deps.
Branch: main
Cassowary, 2 years ago
parent e473f20e29
commit 239d09d628
Changed files (lines changed):
  1. LICENSE (11)
  2. carkov/__init__.py (2)
  3. carkov/__main__.py (181)
  4. carkov/abstracts.py (26)
  5. carkov/analyze/abstract.py (10)
  6. carkov/analyze/english.py (6)
  7. carkov/chain.py (15)
  8. carkov/filters.py (23)
  9. carkov/pydumper.py (37)
  10. carkov/serializer.py (31)
  11. carkov/utils.py (34)
  12. pyproject.toml (3)
  13. setup.cfg (40)
  14. setup.py (2)
  15. tox.ini (17)
  16. typehints/msgpack.pyi (4)
  17. typehints/nltk.pyi (4)

@@ -0,0 +1,11 @@
Copyright 2021 Aldercone Studio
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

@@ -4,4 +4,4 @@
# This is free software, see the included LICENSE for terms and conditions.
#
version = '0.0.1'
version = '0.1.1'

@@ -9,38 +9,36 @@
import argparse
import enum
import pathlib
import random
import sys
import traceback
from random import Random
from typing import cast
from .analyze.abstract import AbstractAnalyzer
from .analyze.english import English
from .analyze.words import Words
from .chain import Chain, from_analyzer
from .serializer import dump_chainer, load_chainer
from .utils import make_sent, make_word
ERROR_NO_FILE = 1
ERROR_EXISTING_OUTPUT = 2
ERROR_WRONG_ORDER = 3
ERROR_WRONG_CLASS = 4
ERROR_NO_FILE=1
ERROR_EXISTING_OUTPUT=2
ERROR_WRONG_ORDER=3
ERROR_WRONG_CLASS=4
class AnalyzeMode(enum.Enum):
english = "english"
word = "word"
def __str__(self):
def __str__(self) -> str:
return self.value
def word_joiner(ar):
return "".join(ar)
def english_joiner(ar):
return " ".join(ar)
JOINERS = {"Words": make_word, "English": make_sent}
JOINERS={"Words":word_joiner, "English":english_joiner}
def parse_arguments():
def parse_arguments() -> argparse.Namespace:
parser = argparse.ArgumentParser(prog="python -mcarkov",
description=("Process a text corpus in a markov chain fashion and/or output from"
"an analysis."))
@@ -49,9 +47,9 @@ def parse_arguments():
analyze_sub.add_argument('output', help="Output chain to specified destination", type=pathlib.Path)
analyze_sub.add_argument('input', help="The corpus to analyze", type=pathlib.Path, nargs='+')
overappend = analyze_sub.add_mutually_exclusive_group()
overappend.add_argument('-o','--overwrite', help='Overwrite output file.', action='store_true')
overappend.add_argument('-o', '--overwrite', help='Overwrite output file.', action='store_true')
overappend.add_argument('-a', '--append', help='Append output file.', action='store_true')
analyze_sub.add_argument('-w','--window', help='Select length of analysis window', type=int, default=2)
analyze_sub.add_argument('-w', '--window', help='Select length of analysis window', type=int, default=2)
analyze_sub.add_argument('-m',
'--mode',
help='Select analysis mode',
@@ -68,73 +66,98 @@ def parse_arguments():
return parser.parse_args()
def main():
args = parse_arguments()
r = random.Random()
if args.command == 'analyze':
if not any(x.exists() for x in args.input):
print("Must specify an existing file as input for the analyzer.")
return ERROR_NO_FILE
if args.output.exists() and not (args.overwrite or args.append):
print("Output file exists, pass --overwrite to overwrite or --append to add to exsiting analysis.")
return ERROR_EXISTING_OUTPUT
if args.mode == AnalyzeMode.english:
analyzer = English(args.window)
# we just dump a whole file into the english analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
analyzer.analyze(inp.read_text('utf-8'))
else:
analyzer = Words(args.window)
# we do line-by-line single word dumps into the word analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
with inp.open('r') as words:
for line in words:
analyzer.analyze(line.strip().lower())
if args.append:
# in append mode we load an existing chain file, and then run the analyzer and merge the contents.
with open(args.output, 'rb') as old:
chainer = load_chainer(old)
if chainer.order != analyzer.order:
print("Append chainer order isn't the same as the analyzer order.")
return ERROR_WRONG_ORDER
if chainer.analyzer_class != analyzer.__class__.__name__:
print("Append chainer class isn't the same as analyzer class.")
return ERROR_WRONG_CLASS
chainer.integrate(analyzer)
else:
chainer = from_analyzer(analyzer)
with open(args.output, 'wb') as output:
dump_chainer(chainer, output)
print(f"Wrote chainer to {args.output}")
if args.test:
for _ in range(0,5):
print(JOINERS[chainer.analyzer_class](chainer.walk(r, True)))
def print_chainer_output(chainer: Chain, random_state: Random):
if chainer.analyzer_class in JOINERS:
print(JOINERS[cast(str, chainer.analyzer_class)](chainer.walk(random_state, True)))
else:
print(chainer.walk(random_state, True))
def command_analyze(args: argparse.Namespace) -> int:
if not any(x.exists() for x in args.input):
print("Must specify an existing file as input for the analyzer.")
return ERROR_NO_FILE
if args.output.exists() and not (args.overwrite or args.append):
print("Output file exists, pass --overwrite to overwrite or --append to add to exsiting analysis.")
return ERROR_EXISTING_OUTPUT
analyzer: AbstractAnalyzer
if args.mode == AnalyzeMode.english:
analyzer = English(args.window)
# we just dump a whole file into the english analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
analyzer.analyze(inp.read_text('utf-8'))
else:
if not args.input.exists():
print("Must specify a chain file to load.")
return ERROR_NO_FILE
analyzer = Words(args.window)
# we do line-by-line single word dumps into the word analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
with inp.open('r') as words:
for line in words:
analyzer.analyze(line.strip().lower())
if args.append:
# in append mode we load an existing chain file, and then run the analyzer and merge the contents.
with open(args.output, 'rb') as old:
chainer = load_chainer(old)
if chainer.order != analyzer.order:
print("Append chainer order isn't the same as the analyzer order.")
return ERROR_WRONG_ORDER
if chainer.analyzer_class != analyzer.__class__.__name__:
print("Append chainer class isn't the same as analyzer class.")
return ERROR_WRONG_CLASS
chainer.integrate(analyzer.chain_counts)
else:
chainer = from_analyzer(analyzer)
with open(args.output, 'wb') as output:
dump_chainer(chainer, output)
print(f"Wrote chainer to {args.output}")
if args.test:
r = Random()
for _ in range(0, 5):
print_chainer_output(chainer, r)
return 0
with args.input.open('rb') as inp:
chainer = load_chainer(inp)
if args.count < 1:
args.count = 1
def command_chain(args: argparse.Namespace) -> int:
r = Random()
for _ in range(0, args.count):
print(JOINERS[chainer.analyzer_class](chainer.walk(r, True)))
if not args.input.exists():
print("Must specify a chain file to load.")
return ERROR_NO_FILE
with args.input.open('rb') as inp:
chainer = load_chainer(inp)
if args.count < 1:
args.count = 1
for _ in range(0, args.count):
print_chainer_output(chainer, r)
return 0
def main() -> int:
args = parse_arguments()
if args.command == 'analyze':
return command_analyze(args)
elif args.command == 'chain':
return command_chain(args)
else:
print("Expect a command `analyze` or `chain`. See --help for details.")
return 1
return 0
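The refactored __main__ above boils down to: build an analyzer, fold it into a Chain with from_analyzer, then print joined walks. A minimal library-level sketch of that flow, using only the API visible in this diff (the corpus text and seed are made up):

```python
from random import Random

from carkov.analyze.english import English
from carkov.chain import from_analyzer
from carkov.utils import make_sent

analyzer = English(2)                        # window of 2, the CLI's default
analyzer.analyze("The cat sat. The cat ran. The dog sat.")

chain = from_analyzer(analyzer)              # Chain seeded with the analyzer's counts
rng = Random(0)
for _ in range(5):
    # weighted walk from the start token, joined the way the "English" JOINER does
    print(make_sent(chain.walk(rng, True)))
```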

@@ -9,36 +9,46 @@ This module provides a few utility objects, especially the Abstract object which
and other abstract tokens.
"""
class CarkovFilterException(BaseException):
pass
"""
Base exception for filter stages.
"""
class Abort(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort a particular token from being added to the
stream.
"""
...
class AbortSegment(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort an entire segment if a particular token would
be rejected.
"""
...
class Abstract:
"""
This is used as a way to indicate abstract tokens in a stream of tokens.
"""
def __init__(self, name):
def __init__(self, name: str):
self.name = name
def __repl__(self):
return f"<Abstract: {self.name}>"
def __repr__(self) -> str:
if self == NUMBER:
return 'NUMBER'
elif self == TERMINAL:
return 'TERMINAL'
return f"carkov.abstracts.Abstract({self.name})"
"""A universal Number abstract."""
NUMBER = Abstract("NUMBER")
"""A universal Number abstract."""
"""A Universal Terminal abostract."""
TERMINAL = Abstract("TERMINAL")
"""A Universal Terminal abostract."""

@@ -1,3 +1,9 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
from abc import ABC, abstractmethod
from collections import deque, defaultdict
@@ -9,7 +15,7 @@ This module defines the base class for analyzers which do basic statistical anal
"""
class AbstractAnalyzer:
class AbstractAnalyzer(ABC):
def __init__(self, order, filters=None):
"""
Initialize the analyzer.
@@ -25,7 +31,6 @@ class AbstractAnalyzer:
self.tokens = {}
self.chain_counts = {}
def analyze(self, corpus):
"""
Analyze a corpus and integrate the data into the internal state.
@@ -70,7 +75,6 @@ class AbstractAnalyzer:
Returns:
(array of abstract): An array of segments that this class is prepared to process.
"""
...
def analyze_segment(self, segment):
"""

@@ -1,3 +1,9 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
import nltk
from .abstract import AbstractAnalyzer
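The English analyzer imports nltk, and the stubs added later in this commit (typehints/nltk.pyi) cover sent_tokenize and word_tokenize, so presumably the analyzer tokenizes with those. Standard nltk usage, for reference (not quoted from this diff):

```python
import nltk

nltk.download("punkt", quiet=True)   # tokenizer models, needed once
text = "The cat sat. The cat ran."
for sentence in nltk.sent_tokenize(text):
    print(nltk.word_tokenize(sentence))
# roughly: ['The', 'cat', 'sat', '.'] then ['The', 'cat', 'ran', '.']
```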

@@ -11,12 +11,17 @@ and provides convenience functions for walking the chain.
from collections import UserDict, deque
from typing import Any, Dict, Tuple, Optional
from .abstracts import TERMINAL
from .analyze.abstract import AbstractAnalyzer
from .utils import merge_stats, convert_counts, weighted_stat_choice
def from_analyzer(analyzer):
ChainType = Dict[Tuple[Any], Any]
def from_analyzer(analyzer: AbstractAnalyzer):
"""
Static initializer: Return a chainer with parameters and contents based on an analyzer instance.
"""
@@ -26,7 +31,7 @@ def from_analyzer(analyzer):
class Chain(UserDict):
def __init__(self, order, analyzer_class=None):
def __init__(self, order: int, analyzer_class: Optional[str] = None):
"""
Initialize Chain class
@@ -34,11 +39,11 @@ class Chain(UserDict):
order: The window size of this chainer.
"""
self.order = order
self.data = {}
self.data: ChainType = {}
self.start_token = (None, ) * self.order
self.analyzer_class = analyzer_class
def integrate(self, counts):
def integrate(self, counts: ChainType):
"""
Accept a counts dictionary and merge it with local data and recalculate statistical relationships between
outcomes. The counts must be from an analyzer of the same order.
@@ -116,7 +121,7 @@ class Chain(UserDict):
Returns:
A token
"""
if not parent_token in self.data:
if parent_token not in self.data:
return None
if weighted:
return weighted_stat_choice(random_generator, self.data[parent_token])
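Chain.integrate() is what the CLI's --append path uses: load a saved chain, check that order and analyzer class match, then merge in a fresh analyzer's chain_counts. A hedged sketch of that merge done directly against the library (the chain file path is illustrative):

```python
from carkov.analyze.words import Words
from carkov.serializer import dump_chainer, load_chainer

analyzer = Words(2)
analyzer.analyze("cassowary")                  # one word per call, as __main__ does per line

with open("names.chain", "rb") as old:         # illustrative existing chain file
    chain = load_chainer(old)

assert chain.order == analyzer.order           # the same guards __main__ applies
assert chain.analyzer_class == analyzer.__class__.__name__

chain.integrate(analyzer.chain_counts)         # fold the new counts into the saved chain
with open("names.chain", "wb") as out:
    dump_chainer(chain, out)
```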

@@ -8,23 +8,28 @@
Various filter functions that may be useful for processing certain kinds of corpora.
"""
from unidecode import unidecode
from typing import Optional
# from unidecode import unidecode # fixme asciifying filter
# All of these filters operate on string tokens
def str_abstractize_numbers(token):
def str_abstractize_numbers(token: str) -> Optional[str]:
"""Replace all numbers with a Number abstract."""
pass
return None
def str_abstractize_roman(token):
def str_abstractize_roman(token: str) -> Optional[str]:
"""Replace roman numerals with a Number abstract."""
pass
return None
def str_strip_punct(token):
def str_strip_punct(token: str) -> Optional[str]:
"""Remove any punctuation characters."""
pass
return None
def str_asciify(token):
def str_asciify(token: str) -> Optional[str]:
"""Convert all characters to an ascii approximation."""
pass
return None
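All four filters are stubs that currently return None. Purely as an illustration of the intended shape (these are not the project's implementations), two of them might eventually look like:

```python
import re
import string
from typing import Optional

def str_abstractize_numbers(token: str) -> Optional[str]:
    """Replace a purely numeric token with a placeholder (hypothetical sketch)."""
    return "<NUMBER>" if re.fullmatch(r"\d+", token) else token

def str_strip_punct(token: str) -> Optional[str]:
    """Drop punctuation characters from a token (hypothetical sketch)."""
    return token.translate(str.maketrans("", "", string.punctuation))
```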

@@ -0,0 +1,37 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Serialize chain as a python structure.
"""
from io import TextIOBase
from . import version
from .chain import Chain
template = """
# serialized from version {version}
def get_chainer():
from carkov.chain import Chain
from carkov.abstracts import NUMBER, TERMINAL, Abstract
chain = Chain({order}, "{analyzer}")
chain.data = {data}
return chain
"""
def dump_chainer(chain: Chain, outfile: TextIOBase):
"""
Serialize a chainer to an open IO stream
Arguments:
chain: A Chain object
outfile: An open IO stream in text mode that will be written to
"""
outfile.write(template.format(version=version,
order=chain.order,
analyzer=chain.analyzer_class,
data=repr(chain.data).replace(']},', ']},\n')))
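Usage sketch for the new pydumper: it writes a chain out as an importable Python module whose generated get_chainer() rebuilds the object, no msgpack required (the file name and the tiny chain are made up for the example):

```python
from carkov.chain import Chain
from carkov.pydumper import dump_chainer

chain = Chain(2, "Words")                  # stand-in chain for the demonstration

with open("my_chain.py", "w", encoding="utf-8") as fh:
    dump_chainer(chain, fh)                # text-mode stream, per the TextIOBase signature

# later, or from another process:
from my_chain import get_chainer
restored = get_chainer()
```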

@@ -7,39 +7,42 @@
"""
Use msgpack to serialize a chainer to disk and then reread it from a serialized file.
"""
from . import version
from .abstracts import Abstract, TERMINAL, NUMBER
from .chain import Chain
from typing import Any, BinaryIO, Dict, Tuple, cast
import msgpack
from . import version
from .abstracts import NUMBER, TERMINAL, Abstract
from .chain import Chain
def _unserialize_encode_helper(obj):
def _unserialize_encode_helper(obj: Dict) -> Any:
"""
This is a helper function which handles Abstract objects for serialization.
"""
if '$$####$$' in obj:
val: Abstract
if obj['n'] == 'TERMINAL':
obj = TERMINAL
val = TERMINAL
elif obj['n'] == 'NUMBER':
obj = NUMBER
val = NUMBER
else:
obj = Abstract(obj['n'])
val = Abstract(obj['n'])
return val
return obj
def _serialize_encode_helper(obj):
def _serialize_encode_helper(obj: Any) -> Any:
"""
This is a helper function which handles Abstract objects for serialization.
"""
if isinstance(obj, Abstract):
obj = {'$$####$$': True, 'n': obj.name}
obj = {'$$####$$': True, 'n': cast(Abstract, obj).name}
return obj
def load_chainer(infile):
def load_chainer(infile: BinaryIO) -> Chain:
"""
Unserialize a chainer from an open IO stream
@@ -54,11 +57,11 @@ def load_chainer(infile):
import warnings
warnings.warn(f"Version mismatch while loading chain expect: [{version}] got: [{serialdict['version']}]")
chain = Chain(serialdict['order'], serialdict['analyzer_class'])
chain.data = dict([(tuple(x), y) for x, y in serialdict['data']])
chain.data = {cast(Tuple[Any], tuple(x)): y for x, y in serialdict['data']}
return chain
def dump_chainer(chain: Chain, outfile):
def dump_chainer(chain: Chain, outfile: BinaryIO):
"""
Serialize a chainer to an open IO stream
@@ -66,7 +69,7 @@ def dump_chainer(chain: Chain, outfile):
chain: A Chain object
outfile: An open IO stream in binary mode that will be written to
"""
serialdict = {}
serialdict: Dict[str, Any] = {}
serialdict['version'] = version
serialdict['order'] = chain.order
serialdict['analyzer_class'] = chain.analyzer_class
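And a round-trip sketch for the msgpack serializer, matching the BinaryIO signatures above (the path and chain contents are illustrative):

```python
from carkov.chain import Chain
from carkov.serializer import dump_chainer, load_chainer

chain = Chain(2, "English")                # stand-in chain for the example

with open("english.chain", "wb") as out:   # binary mode for msgpack
    dump_chainer(chain, out)

with open("english.chain", "rb") as inp:
    restored = load_chainer(inp)

assert restored.order == chain.order
assert restored.analyzer_class == chain.analyzer_class
```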

@@ -8,10 +8,14 @@
Various utilities the chainers and analyzers use.
"""
import string
from bisect import bisect
from typing import Dict, Callable, List, Sequence
from random import Random
def merge_dict(into, outof, mergefunction = lambda x, y: x + y):
def merge_dict(into: Dict, outof: Dict, mergefunction: Callable = lambda x, y: x + y) -> Dict:
"""
Given two dictionaries of dictionaries, merge them together by applying the mergefunction to the
values of the second level dictionary.
@@ -37,7 +41,7 @@ def merge_dict(into, outof, mergefunction = lambda x, y: x + y):
return into
def convert_counts(ind):
def convert_counts(ind: Dict) -> Dict:
"""
Convert counts produced by analyzers into the statistics counts used by chainers.
@@ -54,7 +58,7 @@ def convert_counts(ind):
return out
def merge_stats(into, outof):
def merge_stats(into: Dict, outof: Dict) -> Dict:
"""
Perform a merge_dict in a way safe for the statistics dictionaries used by chainers.
@@ -74,7 +78,7 @@ def merge_stats(into, outof):
return merge_dict(into, outof, stats_merge_function)
def weighted_choice(random_state, values, weights):
def weighted_choice(random_state: Random, values: Sequence, weights: Sequence):
"""
Choose a random value in a weighted manner.
@@ -86,8 +90,8 @@ def weighted_choice(random_state, values, weights):
Returns:
The selected value
"""
total = 0
cum_weights = []
total: float = 0
cum_weights: List[float] = []
for w in weights:
total += w
cum_weights.append(total)
@@ -96,7 +100,7 @@ def weighted_stat_choice(random_state, stats):
return values[i]
def weighted_stat_choice(random_state, stats):
def weighted_stat_choice(random_state: Random, stats: Dict):
"""
Perform a weighted choice on a stat dictionary as used in chainers.
@@ -107,3 +111,19 @@ def weighted_stat_choice(random_state, stats):
values = tuple(stats.keys())
weights = tuple(stats[x][1] for x in values)
return weighted_choice(random_state, values, weights)
def make_word(seq: Sequence[str]):
return "".join(seq)
def make_sent(seq: Sequence[str]) -> str:
output = ""
for item in seq:
if item in string.punctuation:
output += item
else:
output += (" " + item) if output else (item)
return output
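Quick behavior check for the new joiners and the weighted helper (the seed is chosen arbitrarily):

```python
from random import Random

from carkov.utils import make_sent, make_word, weighted_choice

print(make_word(["c", "a", "t"]))           # -> cat
print(make_sent(["Hello", "world", "!"]))   # -> Hello world!  (no space before punctuation)

rng = Random(0)
# values and weights are parallel sequences; "b" should turn up about twice as often as "a"
print([weighted_choice(rng, ["a", "b"], [1, 2]) for _ in range(10)])
```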

@@ -0,0 +1,3 @@
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

@@ -0,0 +1,40 @@
[metadata]
name = carkov
version = 0.1.1
description = A markov chainer library
author = Aldercone Studio
author_email = alderconestudio@gmail.com
keywords = text, markov, ebooks, chainer, generator, generative
long-description = file: README.md
long_description_content_type = text/markdown
license-file = LICENSE
license = BSD
platform = any
classifiers =
Development Status :: 3 - Alpha
Intended Audience :: Developers
License :: OSI Approved :: BSD License
Operating System :: OS Independent
Programming Language :: Python
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Topic :: Artistic Software
Topic :: Text Processing
Topic :: Text Processing :: Linguistic
[options]
packages =
carkov
carkov.analyze
zip_safe = false
install_requires =
python_version >= "3.6"
unidecode
nltk
[options.entry_points]
console_scripts =
carkov = carkov.__main__:main

@@ -0,0 +1,2 @@
import setuptools
setuptools.setup()

@@ -0,0 +1,17 @@
[tox]
envlist = py36, py37, py38, py39
[testenv]
deps =
flake8
mypy
commands =
flake8
mypy carkov typehints
[flake8]
max-line-length = 120
max-complexity = 15

@@ -0,0 +1,4 @@
from typing import Any, Callable, Dict
def packb(o: Any, use_bin_type: bool, default: Callable) -> bytes: ...
def unpackb(data: bytes, raw: bool, object_hook: Callable) -> Dict: ...
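These stub signatures mirror how the serializer presumably calls msgpack with the Abstract encode/decode helpers; roughly (the exact call sites are an assumption, not quoted from the diff):

```python
import msgpack
from carkov.serializer import _serialize_encode_helper, _unserialize_encode_helper

payload = msgpack.packb({"order": 2}, use_bin_type=True, default=_serialize_encode_helper)
data = msgpack.unpackb(payload, raw=False, object_hook=_unserialize_encode_helper)
```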

@@ -0,0 +1,4 @@
from typing import Iterator
def sent_tokenize(sent: str) -> Iterator[str]: ...
def word_tokenize(word: str) -> Iterator[str]: ...