Changeset 8b18810a3c7c (branch: default)
Laman - 2 years ago (2023-05-01 17:57:53)

extended documentation
2 files changed with 43 insertions and 15 deletions:
src/languedoc/predict.py

import os
import re
import itertools
import json
import gzip
from typing import Union

TOP_NGRAM_COUNT = 3000
MODEL_PATH = os.path.join(os.path.dirname(__file__), "models.json.gz")


def preprocess(text: str) -> str:
	"""Preprocess text by stripping non-letter characters, collapsing whitespace and converting to lowercase."""
	text = re.sub(r"[\W\d_]+", " ", " "+text+" ")
	return text.lower()
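
# Example: the sub() pattern collapses every run of non-letter characters
# (punctuation, digits, underscores, whitespace) into a single space, and the
# padding guarantees leading/trailing spaces around the words:
#   >>> preprocess("Hello, World! 123")
#   ' hello world '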


def extract_kgram_counts(text: str, k: int) -> dict[str, int]:
	"""Extract k-gram counts from the text for a provided k.

	:param text: the source text
	:param k: length of the kgrams to extract. 1 for letters, 2 for bigrams, ...
	:return: a dict mapping kgrams to their counts in the text"""
	n = len(text)
	counts = dict()

	for i in range(0, n-k+1):
		key = text[i:i+k]
		if key.isspace():
			continue

		counts[key] = counts.get(key, 0) + 1

	return counts
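
# Example: the window slides over word boundaries as well, so kgrams containing
# spaces are counted too; only all-whitespace kgrams are skipped:
#   >>> extract_kgram_counts(" ab ab ", 2)
#   {' a': 2, 'ab': 2, 'b ': 2}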


def extract_ngram_counts(text: str) -> dict[str, int]:
	"""Extract counts of 1- to 3-grams from the text.

	:param text: the source text
	:return: a dict mapping ngrams to their counts in the text"""
	counts = dict()

	for k in range(1, 4):
		counts.update(extract_kgram_counts(text, k))

	return counts
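
# Note: the update() calls cannot clash, because kgrams of different lengths
# never share a key, so the result is a plain union of the three count dicts:
#   >>> extract_ngram_counts(" ab ")
#   {'a': 1, 'b': 1, ' a': 1, 'ab': 1, 'b ': 1, ' ab': 1, 'ab ': 1}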


-def rank_ngram_counts(counts: dict[str, int]) -> dict[str, int]:
+def rank_ngram_counts(counts: dict[str, Union[int, float]]) -> dict[str, int]:
	"""Order supplied ngrams by their counts (then length, then alphabetically) and return their ranking.

	:param counts: a dict mapping ngrams to their counts
	:return: a dict mapping ngrams to their rank (the most frequent: 0, the second: 1, ...)"""
	ordered_ngrams = sorted(counts.items(), key=lambda kv: (-kv[1], len(kv[0]), kv[0]))[:TOP_NGRAM_COUNT]
	return dict(zip([key for (key, count) in ordered_ngrams], itertools.count(0)))
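
# Example: ties in count are broken by ngram length and then alphabetically,
# and only the top TOP_NGRAM_COUNT ngrams receive a rank at all:
#   >>> rank_ngram_counts({"a": 3, "b": 5, "ab": 3})
#   {'b': 0, 'a': 1, 'ab': 2}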


def extract_ranked_ngrams(text: str) -> dict[str, int]:
	"""Extract ngrams from the text and rank them from the most common.

	:param text: the source text
	:return: a dict mapping ngrams to their ranks {most_common_ngram: 0, second: 1, ...}"""
	counts = extract_ngram_counts(text)
	return rank_ngram_counts(counts)


class Sample:
	def __init__(self, language: str, ranked_ngrams: dict[str, float]):
		"""Create a new Sample from language and ngrams.

		This is usually impractical and Sample.extract or Sample.load are preferred."""
		self.language = language
		self.ranked_ngrams = ranked_ngrams

	@classmethod
	def extract(cls, text: str, language="??") -> "Sample":
		"""Create a new Sample by extracting it from text.

		:param text: a string, from which to extract the ngrams into a Sample
		:param language: a two letter language code if it is known (cs|de|en|...)"""
		return cls(language, extract_ranked_ngrams(preprocess(text)))

	@classmethod
	def load(cls, exported: dict) -> "Sample":
		"""Load a previously exported dict and create a new Sample.

		:param exported: {"language": str, "ngrams": [str, ...]}"""
		ranked_ngrams = {key: order for (order, key) in enumerate(exported["ngrams"])}
		return cls(exported["language"], ranked_ngrams)
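
	# Example: load() is the inverse of export() below; a round-trip preserves
	# the ranking exactly, since export() lists the ngrams in rank order:
	#   >>> s = Sample.extract("Hello world", language="en")
	#   >>> Sample.load(s.export()).ranked_ngrams == s.ranked_ngrams
	#   True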

	def export(self) -> dict:
		"""Export to a dict. Complement to Sample.load().

		:return: {"language": str, "ngrams": [str, ...]}"""
		return {
			"language": self.language,
			"ngrams": [key for (key, order) in sorted(self.ranked_ngrams.items(), key=lambda key_order: key_order[1])]
		}

src/languedoc/train.py

import os
import random
import itertools
import json
import gzip
from typing import Iterable

from languedoc.predict import preprocess, identify, extract_ngram_counts, rank_ngram_counts, Sample

random.seed(19181028)

CROSSVALIDATION_SOURCE_COUNT = 5
TEST_LENS = [8, 16, 32, 64]


-def merge_ngram_freqs(counts):
+def merge_ngram_freqs(counts: list[dict[str, int]]) -> dict[str, float]:
	"""Merge together ngram frequencies from multiple source texts."""
	n = len(counts)
	res = dict()

	for d in counts:
		k = sum(d.values())
		for (key, val) in d.items():
			res.setdefault(key, 0)
			res[key] += val/k/n

	return res
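
# Example: counts are first normalized inside each text (val/k) and then
# averaged across the n texts, so every source text carries equal weight
# regardless of its length:
#   >>> merge_ngram_freqs([{"a": 2, "b": 2}, {"a": 1}])
#   {'a': 0.75, 'b': 0.25}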


class SampleSet:
	def __init__(self, language):
		self.language = language
		self.texts = []
		self.counts = []

-	def add(self, text):
+	def add(self, text: str):
		"""Add another source text and its ngram counts."""
		self.texts.append(text)
		self.counts.append(extract_ngram_counts(text))

-	def create_model(self):
+	def create_model(self) -> Sample:
		"""Create a language model based on SampleSet data."""
		merged_frequencies = merge_ngram_freqs(self.counts)
		res = Sample(self.language, rank_ngram_counts(merged_frequencies))
		return res
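
	# Example: a SampleSet collects source texts of one language and distills
	# them into a single ranked-ngram model:
	#   >>> ss = SampleSet("en")
	#   >>> ss.add(preprocess("Hello world."))
	#   >>> ss.create_model().language
	#   'en'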

-	def generate_tests(self, n):
+	def generate_tests(self, n: int) -> Iterable[tuple[str, Sample]]:
		"""Generate tests for crossvalidation.

		Yield source texts and the corresponding models built from the other texts, cycling as necessary.
		Therefore, one can test the models with the texts.

		:param n: how many tests to generate
		:return: pairs of texts and models"""
		for (i, (text, freqs)) in enumerate(itertools.cycle(zip(self.texts, self.counts))):
			if i >= n:
				break

			ranked_ngrams = rank_ngram_counts(merge_ngram_freqs([f for f in self.counts if f is not freqs]))
			yield (text, Sample(self.language, ranked_ngrams))
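
	# Example: with three texts and n=4 the generator cycles t0, t1, t2, t0;
	# the model paired with a text is always built from the other texts only:
	#   >>> ss = SampleSet("xx")
	#   >>> for t in ["aa bb", "bb cc", "cc aa"]: ss.add(t)
	#   >>> [text for (text, model) in ss.generate_tests(4)]
	#   ['aa bb', 'bb cc', 'cc aa', 'aa bb']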


-def cross_validate(sample_sets):
+def cross_validate(sample_sets: list[SampleSet]) -> tuple[float, int, int]:
	"""Run leave-one-out crossvalidation on the samples.

	Iterate through the languages, for each generate `CROSSVALIDATION_SOURCE_COUNT` tests
	with one source text left out, then identify ten random excerpts for each length from `TEST_LENS`.

	:param sample_sets: sample sets of all target languages
	:return: ratio of correctly predicted samples, its absolute number and the theoretical maximum"""
	models = [s.create_model() for s in sample_sets]
	score = 0
	max_score = 0

	for s in sample_sets:
		for (test_text, partial_model) in s.generate_tests(CROSSVALIDATION_SOURCE_COUNT):
			real_lang = partial_model.language
			test_models = [partial_model] + [m for m in models if m.language != real_lang]

			for k in TEST_LENS:
				for i in range(10):
					j = random.randrange(0, len(test_text)-k)
					t = test_text[j:j+k]
					predicted_lang = identify(t, test_models)
					if predicted_lang == real_lang:
						score += 1
					else:
						print(real_lang, predicted_lang, t)
					max_score += 1

-	return score / max_score, (score, max_score)
+	return score/max_score, score, max_score
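
# Example: each language contributes CROSSVALIDATION_SOURCE_COUNT texts times
# len(TEST_LENS) lengths times 10 excerpts = 5*4*10 = 200 predictions, so with
# e.g. five languages max_score ends up at 1000 and the returned triple could
# look like (0.95, 950, 1000).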


-DATA_DIR = os.path.join(os.path.dirname(__file__), "../../data")
-LANG_DIRS = sorted([x.path for x in os.scandir(DATA_DIR)])
-MODEL_PATH = os.path.join(os.path.dirname(__file__), "models.json.gz")
+def train(data_dir: str, model_path: str):
+	"""Run the training and create a prediction model.
+
+	:param data_dir: path to the data directory, with one subdirectory for each language
+		containing several text files as separate sources.
+	:param model_path: where to save the result language model as a .json.gz"""
+	samples = []
+	lang_dirs = sorted([x.path for x in os.scandir(data_dir)])

-if __name__ == "__main__":
-	samples = []

-	for d in LANG_DIRS:
+	for d in lang_dirs:
		lang = os.path.basename(d)
		lang_samples = SampleSet(lang)
		samples.append(lang_samples)

		for file in sorted(os.scandir(d), key=lambda f: f.name):
			with open(file) as f:
				text = f.read()
				text = preprocess(text)
				print(f"{lang}: {file.name} ({len(text)})")

				lang_samples.add(text)

-	with gzip.open(MODEL_PATH, mode="wt", encoding="utf-8") as f:
+	with gzip.open(model_path, mode="wt", encoding="utf-8") as f:
		json.dump([sample_set.create_model().export() for sample_set in samples], f, ensure_ascii=False)

	print(cross_validate(samples))


+DATA_DIR = os.path.join(os.path.dirname(__file__), "../../data")
+MODEL_PATH = os.path.join(os.path.dirname(__file__), "models.json.gz")
+
+if __name__ == "__main__":
+	train(DATA_DIR, MODEL_PATH)
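
# Example layout (illustrative file names): one subdirectory per language under
# data/, each holding several plain-text sources:
#   data/cs/source1.txt, data/cs/source2.txt, ...
#   data/en/source1.txt, ...
# Running the module (e.g. `python -m languedoc.train`) then reads the sources,
# writes the gzipped JSON model and prints the crossvalidation results.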