import sys import re import io from itertools import groupby from datetime import datetime import shutil import argparse from zipfile import ZipFile import urllib.request from urllib.error import URLError class Record: def __init__(self, tokens, source): self.source = source self.pin = int(tokens[0]) self.country_code = tokens[2] self.rating_before = int(tokens[8]) self.rating_after = int(tokens[9]) tournament_code = tokens[5] self.date = datetime.strptime(tournament_code[1:7], "%y%m%d") self.rank_change = ("", "") @classmethod def parse(cls, line): tokens = re.split(r" {2,}", line.strip()) if len(tokens) != 10: return None return cls(tokens, line) def __str__(self): s = self.source + " {} -> {}".format(*self.rank_change) return s class RankTracker: def __init__(self, rating): assert rating >= -900 self._rounded_rating = round_rating(rating) self._best = self._rounded_rating @property def rank(self): return rating_to_rank(self._rounded_rating) @property def rounded_rating(self): return self._rounded_rating def update(self, rating): assert rating >= -900 rounded_rating = round_rating(rating) old_rank = self.rank if rounded_rating == self._rounded_rating: pass elif rounded_rating > self._rounded_rating: # promotion # when pushing the best by 2 and more ranks you have to cross the hundred, not just the fifty if rounded_rating >= self._best+200 and rating < rounded_rating: self._rounded_rating = rounded_rating-100 else: self._rounded_rating = rounded_rating else: # demotion # 100 points for 5k and better if self._rounded_rating >= 1600 and self._rounded_rating - rating > 100: self._rounded_rating = rounded_rating # 150 points for the others elif self._rounded_rating - rating > 150: self._rounded_rating = rounded_rating+100 else: pass self._best = max(self._best, self._rounded_rating) new_rank = self.rank return new_rank if new_rank != old_rank else False def parse_record(record): types = [int, str, str, str, str, str, int, int, int, int] columns = [f(token) for (f, token) in zip(types, record)] + [datetime.strptime(record[5][1:7], "%y%m%d")] return tuple(columns) def round_rating(r): return (r+50)//100*100 def rating_to_rank(rating): rank_list = [str(i)+"k" for i in range(30, 0, -1)] + [str(i)+"d" for i in range(1, 8)] key = round_rating(rating)//100 return rank_list[min(key+9, 36)] def main(s, since, to, args): records = [Record.parse(line) for line in s.splitlines()] records = filter(lambda rec: rec is not None, records) national_records = filter(lambda rec: rec.country_code == args.country_code, records) player_records = groupby(national_records, lambda rec: rec.pin) print("Detecting rank changes...") hr = "=" * shutil.get_terminal_size().columns found_anything = False print(hr) for (pin, recs) in player_records: tourneys = list(recs) tracker = RankTracker(tourneys[0].rating_before) steps = [] for r in tourneys: # omit reset ratings if tracker.rounded_rating != round_rating(r.rating_before): tracker = RankTracker(r.rating_before) old_rank = tracker.rank new_rank = tracker.update(r.rating_after) if new_rank is not False: r.rank_change = (old_rank, new_rank) steps.append(r) steps = [r for r in steps if since <= r.date <= to] if steps: print("\n".join(map(str, steps))) found_anything = True if not found_anything: print("Nothing found.") print(hr) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("since", help="a date in YYYYMMDD format") parser.add_argument("to", nargs="?", help="a date in YYYYMMDD format") parser.add_argument("-c", "--country-code", default="CZ", help="a two letter country code, default=CZ") parser.add_argument("-f", "--file", help="a path to the rating history file") args = parser.parse_args() since = datetime.strptime(args.since, "%Y%m%d") to = datetime.strptime(args.to, "%Y%m%d") if args.to else datetime.now() if args.file: print("Reading {} ...".format(args.file)) with ZipFile(args.file) as f: s = f.read("all.hst").decode("utf-8") else: url = "https://europeangodatabase.eu/EGD/EGD_2_0/downloads/hisgor.zip" print("Downloading data from {} ...".format(url)) try: with urllib.request.urlopen(url) as f: compressed_data = f.read() with ZipFile(io.BytesIO(compressed_data)) as g: s = g.read("all.hst").decode("utf-8") except URLError as e: print( "* Failed to connect to the server. " "You can try again later or download {} and run the program with the -f parameter.\n" "The error message:\n{}".format(url, e) ) sys.exit(1) print("Processing...") main(s, since, to, args) print("Done.")