"""Report rank promotions found in the European Go Database rating history.

The script reads the ``all.hst`` rating history (from a local zip archive or
downloaded from the EGD website), keeps the records of one country, replays
every player's rating history through a :class:`RankTracker` and prints the
tournaments, within the requested date range, at which the player reached a
new best rank.
"""

import sys
import re
import io
from itertools import groupby
from datetime import datetime
import shutil
import argparse
import zlib
from zipfile import ZipFile
import urllib.request
from urllib.error import URLError
from base64 import b64decode


class Record:
    """One line of the rating history file: a player's result in one tournament.

    Attributes set by the constructor:
        source: the original, unmodified line.
        pin: the player's numeric identifier (column 0).
        country_code: column 2.
        rating_before / rating_after: columns 8 and 9, as integers.
        date: ``datetime`` parsed from characters 1-6 (``yymmdd``) of the
            tournament code in column 5.
        rank_change: ``(old_rank, new_rank)``; a pair of empty strings until
            a caller fills it in.
    """

    def __init__(self, tokens, source):
        """Build a record from the already split columns of ``source``.

        Args:
            tokens: sequence of at least ten column strings.
            source: the raw line the tokens were split from.

        Raises:
            ValueError: if a numeric column or the tournament date is malformed.
        """
        self.source = source
        self.pin = int(tokens[0])
        self.country_code = tokens[2]
        self.rating_before = int(tokens[8])
        self.rating_after = int(tokens[9])
        tournament_code = tokens[5]
        self.date = datetime.strptime(tournament_code[1:7], "%y%m%d")
        self.rank_change = ("", "")

    @classmethod
    def parse(cls, line):
        """Parse one line of the history file.

        Columns are separated by runs of two or more spaces.

        Returns:
            A :class:`Record`, or ``None`` when the line does not consist of
            exactly ten columns or when its numeric/date columns cannot be
            converted (so that non-record lines are skipped instead of
            aborting the whole run).
        """
        tokens = re.split(r" {2,}", line.strip())
        if len(tokens) != 10:
            return None
        try:
            return cls(tokens, line)
        except ValueError:
            # Ten columns, but not a well-formed record.
            return None

    def __str__(self):
        """Return the source line followed by the recorded rank change."""
        return self.source + " {} -> {}".format(*self.rank_change)


class RankTracker:
    """Follow a player's rank as the rating changes, with hysteresis.

    The tracker stores the last exact rating, the rating rounded to the
    hundred that currently determines the rank, and the highest such rounded
    rating reached so far (the "best").
    """

    def __init__(self, rating):
        """Start tracking from ``rating`` (must be at least -900)."""
        assert rating >= -900
        self._precise_rating = rating
        self._rounded_rating = round_rating(rating)
        self._best = self._rounded_rating

    @property
    def rank(self):
        """The current rank as a string such as ``"5k"`` or ``"1d"``."""
        return rating_to_rank(self._rounded_rating)

    @property
    def rounded_rating(self):
        """The hundred-rounded rating the current rank is derived from."""
        return self._rounded_rating

    @property
    def precise_rating(self):
        """The exact rating passed to the constructor or the last update."""
        return self._precise_rating

    def update(self, rating):
        """Feed the next rating and report a newly reached best rank.

        Args:
            rating: the new exact rating (must be at least -900).

        Returns:
            The new rank string when this update lifted the player to a rank
            above the previous best, otherwise ``False``.  Demotions and
            promotions back to an already achieved rank return ``False``.
        """
        assert rating >= -900
        rounded = round_rating(rating)
        old_rank = self.rank
        self._precise_rating = rating

        if rounded > self._rounded_rating:
            # Promotion.  When pushing the best by two or more ranks the
            # player has to cross the hundred, not just the fifty; otherwise
            # only the rank one step lower is granted.
            if rounded >= self._best + 200 and rating < rounded:
                self._rounded_rating = rounded - 100
            else:
                self._rounded_rating = rounded
        elif rounded < self._rounded_rating:
            # Demotion happens only after falling far enough below the
            # current rank.
            drop = self._rounded_rating - rating
            if self._rounded_rating >= 1600 and drop > 100:
                # 100 points for 5k and better.
                self._rounded_rating = rounded
            elif drop > 150:
                # 150 points for the others.
                self._rounded_rating = rounded + 100

        if self._rounded_rating > self._best and self.rank != old_rank:
            self._best = self._rounded_rating
            return self.rank
        return False


def parse_record(record):
    """Convert the ten string columns of a record into a typed tuple.

    Columns 0 and 6-9 become ``int``; the rest stay ``str``.  The tournament
    date, parsed from characters 1-6 of column 5 (``yymmdd``), is appended as
    an eleventh element.
    """
    types = [int, str, str, str, str, str, int, int, int, int]
    columns = [convert(token) for convert, token in zip(types, record)]
    columns.append(datetime.strptime(record[5][1:7], "%y%m%d"))
    return tuple(columns)


def round_rating(r):
    """Round ``r`` to the nearest hundred, with halves rounded up."""
    return (r + 50) // 100 * 100


# Rank names from weakest to strongest: 30k ... 1k, 1d ... 7d.
_RANKS = tuple(
    [str(i) + "k" for i in range(30, 0, -1)] + [str(i) + "d" for i in range(1, 8)]
)


def rating_to_rank(rating):
    """Map a rating to its rank name.

    A rounded rating of -900 corresponds to ``"30k"`` and every further
    hundred to the next rank, up to ``"7d"``.  Ratings outside that span are
    clamped to the nearest end of the scale; in particular a rating below
    -900 yields ``"30k"`` rather than wrapping around to a dan rank through
    a negative list index.
    """
    index = round_rating(rating) // 100 + 9
    return _RANKS[max(0, min(index, len(_RANKS) - 1))]


def _new_best_steps(tourneys):
    """Return the records of one player at which a new best rank was reached.

    Each returned record gets its ``rank_change`` set to
    ``(rank_before, rank_after)``.  ``tourneys`` must be non-empty.
    """
    tracker = RankTracker(tourneys[0].rating_before)
    steps = []
    for rec in tourneys:
        # Omit reset ratings: when the rating before this tournament is not
        # the rating left by the previous one, start from a fresh tracker.
        if tracker.precise_rating != rec.rating_before:
            tracker = RankTracker(rec.rating_before)
        old_rank = tracker.rank
        new_rank = tracker.update(rec.rating_after)
        if new_rank is not False:
            rec.rank_change = (old_rank, new_rank)
            steps.append(rec)
    return steps


def main(s, since, to, args):
    """Print every new best rank reached between ``since`` and ``to``.

    Args:
        s: full text of the rating history file.
        since: first date (``datetime``) to report, inclusive.
        to: last date (``datetime``) to report, inclusive.
        args: object with a ``country_code`` attribute; only records of that
            country are considered.
    """
    parsed = (Record.parse(line) for line in s.splitlines())
    national_records = (
        rec
        for rec in parsed
        if rec is not None and rec.country_code == args.country_code
    )
    # NOTE: groupby only merges adjacent items, so this relies on the history
    # listing all records of one player contiguously.
    player_records = groupby(national_records, lambda rec: rec.pin)

    print("Detecting rank changes...")
    hr = "=" * shutil.get_terminal_size().columns
    found_anything = False
    print(hr)
    for _pin, recs in player_records:
        # The whole history is replayed so the tracker state is right; the
        # date filter is applied to the reported steps only.
        steps = [rec for rec in _new_best_steps(list(recs)) if since <= rec.date <= to]
        if steps:
            print("\n".join(map(str, steps)))
            found_anything = True
    if not found_anything:
        print("Nothing found.")
    print(hr)


if __name__ == "__main__":
    # The epilog text is stored zlib-compressed and base64-encoded.
    copyright_notice = str(zlib.decompress(b64decode(b'eJw1zk0OgjAUBOCrzAUsIa5wZdiwMYYYiOvSPoEALXl9+HcbzmK8lzXGzazmy8yZexFyaB6otKXX+l6REw+TdtDOYuwNuUAWi7PEKI41ivKA6xae4ehGrFB1hOAXNgTjbYxoG8LFRwMt6ETmsEsSptmrNLunmTLP5KTdsCnZt0whQCFf2vCn3yZLXBUPUc3vz54m3Y+RfgDMjz+e')), encoding="utf-8")

    parser = argparse.ArgumentParser(epilog=copyright_notice)
    parser.add_argument("since", help="a date in YYYYMMDD format")
    parser.add_argument("to", nargs="?", help="a date in YYYYMMDD format")
    parser.add_argument("-c", "--country-code", default="CZ", help="a two letter country code, default=CZ")
    parser.add_argument("-f", "--file", help="a path to the rating history file")
    args = parser.parse_args()

    since = datetime.strptime(args.since, "%Y%m%d")
    # Without an explicit end date, report everything up to now.
    to = datetime.strptime(args.to, "%Y%m%d") if args.to else datetime.now()

    if args.file:
        print("Reading {} ...".format(args.file))
        with ZipFile(args.file) as f:
            s = f.read("all.hst").decode("iso8859")
    else:
        url = "https://europeangodatabase.eu/EGD/EGD_2_0/downloads/hisgor.zip"
        print("Downloading data from {} ...".format(url))
        try:
            with urllib.request.urlopen(url) as f:
                compressed_data = f.read()
            # The archive is unpacked in memory; nothing is written to disk.
            with ZipFile(io.BytesIO(compressed_data)) as g:
                s = g.read("all.hst").decode("iso8859")
        except URLError as e:
            print(
                "* Failed to connect to the server. "
                "You can try again later or download {} and run the program with the -f parameter.\n"
                "The error message:\n{}".format(url, e)
            )
            sys.exit(1)

    print("Processing...")
    main(s, since, to, args)
    print("Done.")