Initial checkin.

- Base code works.
- Copyright notices and mostly complete documentation.
Branch: main · Cassowary committed 2 years ago · commit e473f20e29
 .gitignore                 |  61
 README.md                  |   5
 carkov/__init__.py         |   7
 carkov/__main__.py         | 147
 carkov/abstracts.py        |  44
 carkov/analyze/__init__.py |   5
 carkov/analyze/abstract.py | 116
 carkov/analyze/english.py  |  20
 carkov/analyze/words.py    |  14
 carkov/chain.py            | 161
 carkov/filters.py          |  30
 carkov/serializer.py       |  74
 carkov/utils.py            | 109
 13 files changed, 793 insertions(+)

.gitignore (vendored)
@@ -0,0 +1,61 @@
*.py[cod]
# C extensions
*.so
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
bin
var
sdist
develop-eggs
.installed.cfg
lib
lib64
__pycache__
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Emacs git ignore
# -*- mode: gitignore; -*-
*~
\#*\#
/.emacs.desktop
/.emacs.desktop.lock
*.elc
auto-save-list
tramp
.\#*
# Org-mode
.org-id-locations
*_archive
# flymake-mode
*_flymake.*
# eshell files
/eshell/history
/eshell/lastdir
# elpa packages
/elpa/

README.md
@@ -0,0 +1,5 @@
# carkov #
This is a markov chainer library for implementing things like word generators and ebook bots. It is not a very
statistically rigorous tool - it aims for functionality, not science.
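
A minimal usage sketch based on the modules added in this commit (the training words and the choice of the `Words` analyzer are only illustrative):

```python
import random

from carkov.analyze.words import Words
from carkov.chain import from_analyzer

# Analyze a few words character-by-character with a window of two.
analyzer = Words(2)
for word in ["banana", "bandana", "cabana"]:
    analyzer.analyze(word)

# Build a chain from the analyzer and walk it to generate a new "word".
chain = from_analyzer(analyzer)
print("".join(chain.walk(random.Random(), weighted=True)))
```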

carkov/__init__.py
@@ -0,0 +1,7 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
version = '0.0.1'

carkov/__main__.py
@@ -0,0 +1,147 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
# This module provides a command line interface to doing some common operations.
#
import argparse
import enum
import pathlib
import random
import sys
import traceback
from .analyze.english import English
from .analyze.words import Words
from .chain import Chain, from_analyzer
from .serializer import dump_chainer, load_chainer
ERROR_NO_FILE = 1
ERROR_EXISTING_OUTPUT = 2
ERROR_WRONG_ORDER = 3
ERROR_WRONG_CLASS = 4
class AnalyzeMode(enum.Enum):
english = "english"
word = "word"
def __str__(self):
return self.value
def word_joiner(ar):
return "".join(ar)
def english_joiner(ar):
return " ".join(ar)
JOINERS = {"Words": word_joiner, "English": english_joiner}
def parse_arguments():
parser = argparse.ArgumentParser(prog="python -mcarkov",
description=("Process a text corpus in a markov chain fashion and/or output from "
"an analysis."))
subparsers = parser.add_subparsers(dest='command')
analyze_sub = subparsers.add_parser('analyze', help="Analyze a corpus")
analyze_sub.add_argument('output', help="Output chain to specified destination", type=pathlib.Path)
analyze_sub.add_argument('input', help="The corpus to analyze", type=pathlib.Path, nargs='+')
overappend = analyze_sub.add_mutually_exclusive_group()
overappend.add_argument('-o', '--overwrite', help='Overwrite output file.', action='store_true')
overappend.add_argument('-a', '--append', help='Append to output file.', action='store_true')
analyze_sub.add_argument('-w', '--window', help='Select length of analysis window', type=int, default=2)
analyze_sub.add_argument('-m',
'--mode',
help='Select analysis mode',
type=AnalyzeMode,
choices=list(AnalyzeMode),
default=AnalyzeMode.english)
analyze_sub.add_argument('-t', '--test', help="Output a sample from the chainer generated", action='store_true')
chain_sub = subparsers.add_parser('chain', help="Output from a chainer")
chain_sub.add_argument('input', help="The chain file to load", type=pathlib.Path)
chain_sub.add_argument('-c', '--count', help="Number of chain outputs to output", type=int, default=1)
return parser.parse_args()
def main():
args = parse_arguments()
r = random.Random()
if args.command == 'analyze':
if not any(x.exists() for x in args.input):
print("Must specify an existing file as input for the analyzer.")
return ERROR_NO_FILE
if args.output.exists() and not (args.overwrite or args.append):
print("Output file exists, pass --overwrite to overwrite or --append to add to exsiting analysis.")
return ERROR_EXISTING_OUTPUT
if args.mode == AnalyzeMode.english:
analyzer = English(args.window)
# we just dump a whole file into the english analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
analyzer.analyze(inp.read_text('utf-8'))
else:
analyzer = Words(args.window)
# we do line-by-line single word dumps into the word analyzer
for inp in args.input:
if not inp.exists():
print(f"warning {inp} does not exist")
continue
print(f"analyze: {inp}")
with inp.open('r') as words:
for line in words:
analyzer.analyze(line.strip().lower())
if args.append:
# in append mode we load an existing chain file, and then run the analyzer and merge the contents.
with open(args.output, 'rb') as old:
chainer = load_chainer(old)
if chainer.order != analyzer.order:
print("Append chainer order isn't the same as the analyzer order.")
return ERROR_WRONG_ORDER
if chainer.analyzer_class != analyzer.__class__.__name__:
print("Append chainer class isn't the same as analyzer class.")
return ERROR_WRONG_CLASS
chainer.integrate(analyzer.chain_counts)
else:
chainer = from_analyzer(analyzer)
with open(args.output, 'wb') as output:
dump_chainer(chainer, output)
print(f"Wrote chainer to {args.output}")
if args.test:
for _ in range(0,5):
print(JOINERS[chainer.analyzer_class](chainer.walk(r, True)))
else:
if not args.input.exists():
print("Must specify a chain file to load.")
return ERROR_NO_FILE
with args.input.open('rb') as inp:
chainer = load_chainer(inp)
if args.count < 1:
args.count = 1
for _ in range(0, args.count):
print(JOINERS[chainer.analyzer_class](chainer.walk(r, True)))
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception:
print("Unexpected exception!")
traceback.print_exc()
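
For reference, the parser defined above corresponds to invocations along these lines (the file names are hypothetical):

    python -mcarkov analyze wordlist.chain wordlist.txt --mode word --window 2 --test
    python -mcarkov chain wordlist.chain --count 5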

carkov/abstracts.py
@@ -0,0 +1,44 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
This module provides a few utility objects, especially the Abstract object which is used for terminals
and other abstract tokens.
"""
class CarkovFilterException(Exception):
pass
class Abort(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort a particular token from being added to the
stream.
"""
...
class AbortSegment(CarkovFilterException):
"""
This exception is intended for a protocol by which filters can abort an entire segment if a particular token would
be rejected.
"""
...
class Abstract:
"""
This is used as a way to indicate abstract tokens in a stream of tokens.
"""
def __init__(self, name):
self.name = name
def __repr__(self):
return f"<Abstract: {self.name}>"
"""A universal Number abstract."""
NUMBER = Abstract("NUMBER")
"""A Universal Terminal abostract."""
TERMINAL = Abstract("TERMINAL")

carkov/analyze/__init__.py
@@ -0,0 +1,5 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#

carkov/analyze/abstract.py
@@ -0,0 +1,116 @@
"""
This module defines the base class for analyzers which do basic statistical analysis on a corpus.
"""
from abc import ABC, abstractmethod
from collections import deque, defaultdict
from ..abstracts import TERMINAL
from ..utils import merge_dict
class AbstractAnalyzer(ABC):
def __init__(self, order, filters=None):
"""
Initialize the analyzer.
Arguments:
order (int): Defines the window size this analyzer uses.
filters: A list of callables to apply to each token before processing.
"""
if filters is None:
filters = []
self.order = order
self.filters = filters
self.tokens = {}
self.chain_counts = {}
def analyze(self, corpus):
"""
Analyze a corpus and integrate the data into the internal state.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
self.chain_counts after processing.
"""
counts = self.analyze_corpus(corpus)
merge_dict(self.chain_counts, counts)
return self.chain_counts
def analyze_corpus(self, corpus):
"""
Do the actual analysis of corpus, and return a count dictionary.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
(dict) a count dictionary of just this corpus
"""
segments = self.segmentize_corpus(corpus)
counts = {}
for segment in segments:
merge_dict(counts, self.analyze_segment(segment))
return counts
@abstractmethod
def segmentize_corpus(self, corpus):
"""
Convert a corpus into a series of segments.
This must be overloaded by child class.
Arguments:
corpus (abstract): This could be any type that the class is prepared to process.
Returns:
(array of abstract): An array of segments that this class is prepared to process.
"""
...
def analyze_segment(self, segment):
"""
Analyze a single segment and produce windowed token-transition counts.
Arguments:
segment (abstract): This could be of any type that this class is prepared to process.
Returns:
(counts dictionary): A dictionary keyed by windowed token keys with counts of each following token
"""
tokens = self.tokenize_segment(segment) + [TERMINAL]
token = deque([None] * self.order, self.order)
counts = defaultdict(lambda: defaultdict(int))
for raw_token in tokens:
raw_token = self.process_token(raw_token)
tkey = tuple(token)
counts[tkey][raw_token] += 1
token.append(raw_token)
return counts
@abstractmethod
def tokenize_segment(self, segment):
"""
Convert a segment into a series of tokens.
This must be overloaded by child class.
Arguments:
segment (abstract): This could be of any type that this class is prepared to process.
Returns:
(array of tokens): The format and type of tokens is defined by the child class.
"""
...
def process_token(self, raw_token):
"""Apply each configured filter, in order, to a raw token and return the result."""
for filter_function in self.filters:
raw_token = filter_function(raw_token)
return raw_token
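
As a concrete illustration of the windowed counting above, here is roughly what the `Words` analyzer (defined below in this commit) produces for a single three-letter segment; dict versus defaultdict aside, the keys are order-length windows and the values are successor counts:

```python
from carkov.analyze.words import Words

analyzer = Words(2)
analyzer.analyze("cat")
# analyzer.chain_counts is roughly:
#   {(None, None): {'c': 1},
#    (None, 'c'):  {'a': 1},
#    ('c', 'a'):   {'t': 1},
#    ('a', 't'):   {TERMINAL: 1}}
```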

carkov/analyze/english.py
@@ -0,0 +1,20 @@
import nltk
from .abstract import AbstractAnalyzer
class English(AbstractAnalyzer):
def __init__(self, order, filters=None):
if filters is None:
filters = []
super().__init__(order, filters)
def segmentize_corpus(self, corpus):
chunks = corpus.split('\n\n')
ret = []
for chunk in chunks:
ret = ret + nltk.sent_tokenize(chunk)
return ret
def tokenize_segment(self, segment):
return list(nltk.word_tokenize(segment))
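
One operational note: `nltk.sent_tokenize` and `nltk.word_tokenize` rely on NLTK's `punkt` tokenizer data, so a one-time download is typically needed before this analyzer will run:

```python
import nltk

# One-time setup; fetches the sentence tokenizer models used above.
nltk.download('punkt')
```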

carkov/analyze/words.py
@@ -0,0 +1,14 @@
from .abstract import AbstractAnalyzer
class Words(AbstractAnalyzer):
def __init__(self, order, filters=None):
if filters is None:
filters = []
super().__init__(order, filters)
def segmentize_corpus(self, corpus):
return corpus.split(' ')
def tokenize_segment(self, segment):
return list(segment)

carkov/chain.py
@@ -0,0 +1,161 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
This module defines a chainer class which can process a count dictionary from an analyzer
and provides convenience functions for walking the chain.
"""
from collections import UserDict, deque
from .abstracts import TERMINAL
from .utils import merge_stats, convert_counts, weighted_stat_choice
def from_analyzer(analyzer):
"""
Factory function: return a chainer with parameters and contents based on an analyzer instance.
"""
chainer = Chain(analyzer.order, analyzer.__class__.__name__)
chainer.integrate(analyzer.chain_counts)
return chainer
class Chain(UserDict):
def __init__(self, order, analyzer_class=None):
"""
Initialize Chain class
Arguments:
order: The window size of this chainer.
analyzer_class: The name of the analyzer class whose counts this chain will hold.
"""
self.order = order
self.data = {}
self.start_token = (None, ) * self.order
self.analyzer_class = analyzer_class
def integrate(self, counts):
"""
Accept a counts dictionary and merge it with local data and recalculate statistical relationships between
outcomes. The counts must be from an analyzer of the same order.
Arguments:
counts: A counts dictionary as contained in the analyzer's chain_counts
"""
for key, count in counts.items():
stat = convert_counts(count)
if key in self.data:
merge_stats(self.data[key], stat)
else:
self.data[key] = stat
self.update_stats()
def merge(self, other):
"""
Merge a separate chainer's data into this chainer. They must be of the same order.
Arguments:
other (Chain): Another chain of the same order.
"""
for key, stat in other.items():
if key in self.data:
merge_stats(self.data[key], stat)
else:
self.data[key] = stat
self.update_stats()
def update_stats(self):
"""
Update all of the statistical ratios in the chain.
"""
for token in self.data:
self.update_stat(token)
def update_stat(self, parent_token):
"""
Update one specific set of statistical ratios in the chain.
Arguments:
parent_token: A windowed token tuple which points at the part of the chain to update
"""
stat = self.data[parent_token]
total = sum([s[0] for s in stat.values()])
for it in stat.values():
it[1] = int((it[0] / total) * 100)
def add(self, parent_token, token):
"""
Add a new count to the chain.
Arguments:
parent_token: A windowed token tuple which points to the location to add the new token.
token: The token to add.
"""
if parent_token not in self.data:
self.data[parent_token] = {}
if token in self.data[parent_token]:
self.data[parent_token][token][0] += 1
else:
self.data[parent_token][token] = [1, 0]
self.update_stat(parent_token)
def select(self, parent_token, random_generator, weighted=False):
"""
Select a token from a given parent token.
Arguments:
parent_token: A windowed token tuple
random_generator: A random.Random instance
weighted (bool, default=False): Whether to do a weighted select or a random select.
Returns:
A token
"""
if parent_token not in self.data:
return None
if weighted:
return weighted_stat_choice(random_generator, self.data[parent_token])
else:
return random_generator.choice(list(self.data[parent_token].keys()))
def walk(self, random_generator, weighted=False, maximum=1000):
"""
Return a list of tokens by walking the chain.
Arguments:
random_generator: A random.Random instance
weighted: Whether to do a weighted select at each step.
maximum: The maximum number of steps to take.
Returns:
A list of tokens
"""
token = self.start_token
output = []
while len(output) < maximum:
item = self.select(token, random_generator, weighted)
if item is None or item == TERMINAL:
break
output.append(item)
token = self.next_token(token, item)
return output
def next_token(self, parent_token, token):
"""
Given a windowed token tuple and a token, return the next windowed token tuple.
Arguments:
parent_token: A windowed token tuple
token: A token
Returns:
A windowed token tuple which would be the next step in the chain after the token.
"""
q = deque(parent_token, self.order)
q.append(token)
return tuple(q)
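
A small sketch of the lower-level `add`/`select` API, independent of any analyzer (the tokens here are made up):

```python
import random

from carkov.chain import Chain

chain = Chain(1)                      # order-1 chain built by hand
chain.add((None,), "hello")
chain.add(("hello",), "world")
rng = random.Random()
print(chain.select(("hello",), rng))  # -> "world"
```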

carkov/filters.py
@@ -0,0 +1,30 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Various filter functions that may be useful for processing certain kinds of corpora.
"""
import re
from unidecode import unidecode
from .abstracts import NUMBER
# All of these filters operate on string tokens; non-string tokens (such as the
# TERMINAL abstract, which process_token also feeds through the filter chain)
# are returned unchanged. The bodies are minimal implementations of the
# documented behaviour, and the patterns are deliberately loose approximations.
NUMBER_RE = re.compile(r'^[0-9]+([.,][0-9]+)*$')
ROMAN_RE = re.compile(r'^[ivxlcdm]+$', re.IGNORECASE)
PUNCT_RE = re.compile(r'[^\w\s]')
def str_abstractize_numbers(token):
"""Replace all numbers with a Number abstract."""
return NUMBER if isinstance(token, str) and NUMBER_RE.match(token) else token
def str_abstractize_roman(token):
"""Replace roman numerals with a Number abstract."""
return NUMBER if isinstance(token, str) and ROMAN_RE.match(token) else token
def str_strip_punct(token):
"""Remove any punctuation characters."""
return PUNCT_RE.sub('', token) if isinstance(token, str) else token
def str_asciify(token):
"""Convert all characters to an ascii approximation."""
return unidecode(token) if isinstance(token, str) else token
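
A sketch of how these filters are intended to be wired in: the list is handed to an analyzer, which applies each filter per token via `process_token` (the corpus text here is made up):

```python
from carkov.analyze.words import Words
from carkov.filters import str_abstractize_numbers, str_asciify

# Filters run in order over every token produced during analysis.
analyzer = Words(2, filters=[str_asciify, str_abstractize_numbers])
analyzer.analyze("naïve 42 café")
```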

carkov/serializer.py
@@ -0,0 +1,74 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Use msgpack to serialize a chainer to disk and then reread it from a serialized file.
"""
import msgpack
from . import version
from .abstracts import Abstract, TERMINAL, NUMBER
from .chain import Chain
def _unserialize_encode_helper(obj):
"""
This is a helper function which restores Abstract objects when deserializing.
"""
if '$$####$$' in obj:
if obj['n'] == 'TERMINAL':
obj = TERMINAL
elif obj['n'] == 'NUMBER':
obj = NUMBER
else:
obj = Abstract(obj['n'])
return obj
def _serialize_encode_helper(obj):
"""
This is a helper function which handles Abstract objects for serialization.
"""
if isinstance(obj, Abstract):
obj = {'$$####$$': True, 'n': obj.name}
return obj
def load_chainer(infile):
"""
Unserialize a chainer from an open IO stream
Arguments:
infile: An open IO stream in binary mode pointing at a messagepack stream
Returns:
a new Chain object initialized with the contents of the stream.
"""
# strict_map_key=False is needed because token maps can use Abstract objects (e.g. TERMINAL) as keys.
serialdict = msgpack.unpackb(infile.read(), object_hook=_unserialize_encode_helper, raw=False, strict_map_key=False)
if serialdict['version'] != version:
import warnings
warnings.warn(f"Version mismatch while loading chain expect: [{version}] got: [{serialdict['version']}]")
chain = Chain(serialdict['order'], serialdict['analyzer_class'])
chain.data = dict([(tuple(x), y) for x, y in serialdict['data']])
return chain
def dump_chainer(chain: Chain, outfile):
"""
Serialize a chainer to an open IO stream
Arguments:
chain: A Chain object
outfile: An open IO stream in binary mode that will be written to
"""
serialdict = {}
serialdict['version'] = version
serialdict['order'] = chain.order
serialdict['analyzer_class'] = chain.analyzer_class
serialdict['data'] = [(k, v) for k, v in chain.items()]
outfile.write(msgpack.packb(serialdict, use_bin_type=True, default=_serialize_encode_helper))
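
A round-trip sketch using an in-memory buffer in place of a file on disk (the corpus is made up):

```python
import io

from carkov.analyze.words import Words
from carkov.chain import from_analyzer
from carkov.serializer import dump_chainer, load_chainer

analyzer = Words(2)
analyzer.analyze("example")
chain = from_analyzer(analyzer)

buffer = io.BytesIO()                 # stands in for a file on disk
dump_chainer(chain, buffer)
buffer.seek(0)
restored = load_chainer(buffer)
assert restored.order == chain.order and restored.data == chain.data
```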

carkov/utils.py
@@ -0,0 +1,109 @@
#
# carkov markov chain library
# © Copyright 2021 by Aldercone Studio <aldercone@gmail.com>
# This is free software, see the included LICENSE for terms and conditions.
#
"""
Various utilities the chainers and analyzers use.
"""
from bisect import bisect
def merge_dict(into, outof, mergefunction=lambda x, y: x + y):
"""
Given two dictionaries of dictionaries, merge them together by applying the mergefunction to the
values of the second level dictionary.
Arguments:
into: The dictionary that is being operated on, which gets modified.
outof: The dictionary to merge into `into`.
mergefunction: A function applied to every value in the second level dictionaries; defaults to
adding the two values together.
Returns:
into dictionary after modification.
"""
for key in outof.keys():
if key in into:
for innerkey in outof[key].keys():
if innerkey in into[key]:
into[key][innerkey] = mergefunction(into[key][innerkey], outof[key][innerkey])
else:
into[key][innerkey] = outof[key][innerkey]
else:
into[key] = outof[key]
return into
def convert_counts(ind):
"""
Convert counts produced by analyzers into the statistics counts used by chainers.
Arguments:
ind (dict): The second level dictionary of a counts dictionary
Returns:
dict: A copy of ind with the values updated for chainer use.
"""
out = {}
for k in ind:
out[k] = [ind[k], 0]
return out
def merge_stats(into, outof):
"""
Perform a merge_dict in a way safe for the statistics dictionaries used by chainers.
Arguments:
into: The dictionary to modify
outof: The dictionary to merge into into.
Returns:
into (after modification)
"""
def stats_merge_function(i, o):
out = [0, 0]
out[0] = i[0] + o[0]
out[1] = 0
return out
return merge_dict(into, outof, stats_merge_function)
def weighted_choice(random_state, values, weights):
"""
Choose a random value in a weighted manner.
Arguments:
random_state: A random.Random instance
values: A list of values to choose from
weights: The weights that correspond to each value
Returns:
The selected value
"""
total = 0
cum_weights = []
for w in weights:
total += w
cum_weights.append(total)
x = random_state.random() * total
i = bisect(cum_weights, x)
return values[i]
def weighted_stat_choice(random_state, stats):
"""
Perform a weighted choice on a stat dictionary as used in chainers.
Arguments:
random_state: A random.Random instance
stats: A stats dictionary from a chainer
Returns:
The selected token
"""
values = tuple(stats.keys())
weights = tuple(stats[x][1] for x in values)
return weighted_choice(random_state, values, weights)
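
A worked example of `merge_dict`, since it is the core of how analyzer counts accumulate (the dictionaries are made up):

```python
from carkov.utils import merge_dict

into = {"x": {"p": 1}}
outof = {"x": {"p": 2, "q": 5}, "y": {"r": 1}}
merge_dict(into, outof)
# into is now {"x": {"p": 3, "q": 5}, "y": {"r": 1}}
```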