Changeset - dbaf68186bdf
[Not reviewed]
default
0 3 0
Laman - 2 years ago 2022-10-19 22:19:14

switched from frequencies to basic counts
3 files changed with 30 insertions and 31 deletions:
0 comments (0 inline, 0 general)
src/languedoc/predict.py
Show inline comments
 
import os
 
import re
 
import itertools
 
import json
 
import gzip
 

	
 
TOP_NGRAM_COUNT = 3000
 
MODEL_PATH = os.path.join(os.path.dirname(__file__), "models.json.gz")
 

	
 

	
 
def preprocess(text: str) -> str:
	"""Preprocess text by stripping non-letter characters, collapsing whitespace and converting to lowercase.

	:param text: raw input text
	:return: lowercased text with every run of non-letters replaced by a single space, padded with spaces"""
	# pad with spaces so that word-boundary ngrams are captured at the text edges
	text = re.sub(r"[\W\d_]+", " ", " "+text+" ")
	return text.lower()
 

	
 

	
 
def extract_kgram_counts(text: str, k: int) -> dict:
	"""Count occurrences of all k-character substrings (k-grams) in the text.

	Substrings consisting purely of whitespace are skipped.

	:param text: the (preprocessed) text to scan
	:param k: the substring length
	:return: a dict mapping each found k-gram to its occurrence count"""
	n = len(text)
	counts = dict()

	for i in range(0, n-k+1):
		key = text[i:i+k]
		# ignore k-grams made up entirely of whitespace
		if key.isspace():
			continue

		counts[key] = counts.get(key, 0) + 1

	return counts
 

	
 

	
 
def extract_ngram_counts(text: str) -> dict:
	"""Count all 1-, 2- and 3-grams appearing in the text.

	:param text: the (preprocessed) text to scan
	:return: a dict mapping each ngram to its occurrence count"""
	counts = dict()

	# ngram lengths 1..3; keys of different lengths never collide, so one dict holds them all
	for k in range(1, 4):
		counts.update(extract_kgram_counts(text, k))

	return counts
 

	
 

	
 
def rank_ngram_counts(counts: dict) -> dict:
	"""Rank ngrams by their counts, most frequent first.

	Ties are broken by ngram length, then lexicographically. Only the
	TOP_NGRAM_COUNT best ngrams are kept.

	:param counts: a dict mapping ngrams to their occurrence counts
	:return: a dict mapping each kept ngram to its rank (0 = most frequent)"""
	ordered_ngrams = sorted(counts.items(), key=lambda kv: (-kv[1], len(kv[0]), kv[0]))[:TOP_NGRAM_COUNT]
	return dict(zip([key for (key, freq) in ordered_ngrams], itertools.count(0)))
 

	
 

	
 
def extract_ranked_ngrams(text: str) -> dict:
	"""Extract all ngrams from the text and rank them by frequency.

	:param text: the (preprocessed) text
	:return: a dict mapping ngrams to their ranks (0 = most frequent)"""
	counts = extract_ngram_counts(text)
	return rank_ngram_counts(counts)
 

	
 

	
 
class Sample:
 
	def __init__(self, language: str, ranked_ngrams: dict[str, float]):
 
		"""Create a new Sample from language and ngrams.
 

	
 
		This is usually impractical and Sample.extract or Sample.load are preferred."""
 
		self.language = language
 
		self.ranked_ngrams = ranked_ngrams
 

	
 
	@classmethod
	def extract(cls, text: str, language="??") -> "Sample":
		"""Create a new Sample by extracting it from text.

		:param text: a string, from which to extract the ngrams into a Sample
		:param language: a two letter language code if it is known (cs|de|en|...)"""
		return cls(language, extract_ranked_ngrams(preprocess(text)))
 

	
 
	@classmethod
 
	def load(cls, exported: dict) -> "Sample":
 
		"""Load a previously exported dict and create a new Sample.
 

	
 
		:param exported: {"language": str, "ngrams": [str, ...]}"""
 
		ranked_ngrams = {key: order for (order, key) in enumerate(exported["ngrams"])}
 
		return cls(exported["language"], ranked_ngrams)
 

	
 
	def export(self) -> dict:
 
		"""Export to a dict. Complement to Sample.load()
 

	
 
		:return: {"language": str, "ngrams": [str, ...]}"""
 
		return {
 
			"language": self.language,
 
			"ngrams": [key for (key, order) in sorted(self.ranked_ngrams.items(), key=lambda key_order: key_order[1])]
 
		}
 

	
 
	def compare(self, other: "Sample") -> int:
 
		"""Compute a similarity score between self and other.
 

	
 
		The method is asymmetric. You are supposed to use sample.compare(model), not model.compare(sample).
 

	
 
		:param other: a reference model in known language"""
 
		m = len(other.ranked_ngrams)
 

	
 
		res = sum(
 
			(abs(v - other.ranked_ngrams[k]) if k in other.ranked_ngrams else m)
 
			for (k, v) in self.ranked_ngrams.items()
 
		)
 

	
src/languedoc/train.py
Show inline comments
 
import os
 
import random
 
import itertools
 
import json
 
import gzip
 

	
 
from .predict import preprocess, identify, extract_ngram_freqs, rank_ngram_freqs, Sample
 
from languedoc.predict import preprocess, identify, extract_ngram_counts, rank_ngram_counts, Sample
 

	
 
random.seed(19181028)
 

	
 
CROSSVALIDATION_SOURCE_COUNT = 5
 
TEST_LENS = [8, 16, 32, 64]
 

	
 

	
 
def merge_ngram_freqs(counts: list) -> dict:
	"""Merge several ngram count dicts into averaged relative frequencies.

	Each source dict is normalized by its own total count, so every source
	text contributes equally regardless of its length.

	:param counts: a list of dicts mapping ngrams to their counts
	:return: a dict mapping ngrams to their averaged relative frequencies"""
	n = len(counts)
	res = dict()

	for d in counts:
		k = sum(d.values())
		for (key, val) in d.items():
			res.setdefault(key, 0)
			# normalize by this source's total (k) and average over all sources (n)
			res[key] += val/k/n

	return res
 

	
 

	
 
class SampleSet:
	"""A collection of sample texts in one language, for building a model and generating crossvalidation tests."""

	def __init__(self, language):
		# two letter language code
		self.language = language
		# raw sample texts
		self.texts = []
		# ngram counts extracted from each text, parallel to self.texts
		self.counts = []

	def add(self, text):
		"""Add a sample text and extract its ngram counts."""
		self.texts.append(text)
		self.counts.append(extract_ngram_counts(text))

	def create_model(self):
		"""Build a language model Sample from all added texts.

		:return: a Sample with ngrams ranked over the merged frequencies"""
		merged_frequencies = merge_ngram_freqs(self.counts)
		res = Sample(self.language, rank_ngram_counts(merged_frequencies))
		return res

	def generate_tests(self, n):
		"""Yield n (text, model) pairs, where each model is built from all texts except the yielded one.

		:param n: number of tests to generate; texts are cycled if there are fewer than n"""
		for (i, (text, freqs)) in enumerate(itertools.cycle(zip(self.texts, self.counts))):
			if i >= n:
				break

			# leave the tested text out of the model so we never test on training data
			ranked_ngrams = rank_ngram_counts(merge_ngram_freqs([f for f in self.counts if f is not freqs]))
			yield (text, Sample(self.language, ranked_ngrams))
 

	
 

	
 
def cross_validate(sample_sets):
	"""Cross-validate language models built from the sample sets.

	For every language, models are repeatedly rebuilt with one source text
	left out and then tested on random snippets of that text against the
	models of all other languages.

	:param sample_sets: a list of SampleSets, one per language
	:return: (success rate, (score, max_score))"""
	models = [s.create_model() for s in sample_sets]
	score = 0
	max_score = 0

	for s in sample_sets:
		for (test_text, partial_model) in s.generate_tests(CROSSVALIDATION_SOURCE_COUNT):
			real_lang = partial_model.language
			# replace the full model of the tested language with the leave-one-out model
			test_models = [partial_model] + [m for m in models if m.language != real_lang]

			for k in TEST_LENS:
				for i in range(10):
					# pick a random snippet of length k from the test text
					j = random.randrange(0, len(test_text)-k)
					t = test_text[j:j+k]
					predicted_lang = identify(t, test_models)
					if predicted_lang == real_lang:
						score += 1
					else:
						print(real_lang, predicted_lang, t)
					max_score += 1

	return score / max_score, (score, max_score)
 

	
 

	
 
DATA_DIR = os.path.join(os.path.dirname(__file__), "../../data")
 
LANG_DIRS = sorted([x.path for x in os.scandir(DATA_DIR)])
 
MODEL_PATH = os.path.join(os.path.dirname(__file__), "models.json.gz")
 

	
 
if __name__ == "__main__":
 
	samples = []
 

	
 
	for d in LANG_DIRS:
 
		lang = os.path.basename(d)
 
		lang_samples = SampleSet(lang)
 
		samples.append(lang_samples)
 

	
 
		for file in sorted(os.scandir(d), key=lambda f: f.name):
 
			with open(file) as f:
 
				text = f.read()
 
				text = preprocess(text)
 
				print(f"{lang}: {file.name} ({len(text)})")
 

	
 
				lang_samples.add(text)
 

	
 
	with gzip.open(MODEL_PATH, mode="wt", encoding="utf-8") as f:
tests/test_predict.py
Show inline comments
 
from unittest import TestCase
 

	
 
from languedoc.predict import preprocess, rank_ngram_freqs, Sample, identify
 
from languedoc.predict import preprocess, rank_ngram_counts, Sample, identify
 

	
 

	
 
class TestPredict(TestCase):
	def test_preprocess(self):
		self.assertEqual(preprocess("abc"), " abc ")
		self.assertEqual(preprocess("A  b.c"), " a b c ")
		self.assertEqual(preprocess("1% "), " ")
		self.assertEqual(preprocess("Глава ĚŠČŘŽ"), " глава ěščřž ")

	def test_rank_ngram_counts(self):
		# ties in count are broken by ngram length, then lexicographically
		freqs = {"a": 3, "aa": 1, "b": 4, "bb": 1, "c": 1}
		expected = {"b": 0, "a": 1, "c": 2, "aa": 3, "bb": 4}
		self.assertEqual(rank_ngram_counts(freqs), expected)
 

	
 

	
 
class TestSample(TestCase):
	def test_extract(self):
		a = Sample.extract("aaaaaa", "a")
		self.assertEqual(a.language, "a")
		self.assertEqual(a.ranked_ngrams, {'a': 0, 'aa': 1, 'aaa': 2, ' a': 3, 'a ': 4, ' aa': 5, 'aa ': 6})

		b = Sample.extract("aa aa aa", "b")
		self.assertEqual(b.ranked_ngrams, {'a': 0, ' a': 1, 'a ': 2, 'aa': 3, ' aa': 4, 'aa ': 5, 'a a': 6})

		# language defaults to the unknown marker "??"
		c = Sample.extract("aa")
		self.assertEqual(c.language, "??")
		self.assertEqual(c.ranked_ngrams, {'a': 0, ' a': 1, 'a ': 2, 'aa': 3, ' aa': 4, 'aa ': 5})
 

	
 

	
 
class TestIdentify(TestCase):
	def test_identify(self):
		# one short reference sentence per supported language
		samples = [
			("cs", "Severní ledový oceán je nejmenší světový oceán."),
			("de", "Der Arktische Ozean ist der kleinste Ozean der Erde."),
			("en", "The Arctic Ocean is the smallest of the world's oceans."),
			("es", "Océano Ártico más pequeña y más septentrional del planeta"),
			("fr", "L'océan Arctique ce qui en fait le plus petit des océans."),
			("it", "Il Mar Glaciale Artico è una massa d'acqua..."),
			("ru", "Се́верный Ледови́тый океа́н — наименьший по площади океан Земли")
		]

		for (lang, sample) in samples:
			self.assertEqual(lang, identify(sample))
0 comments (0 inline, 0 general)