Files
@ 004a7cde6de0
Branch filter:
Location: Rank-Progress/rank_progress.py
004a7cde6de0
4.4 KiB
text/x-python
a readme
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 | import re
import io
from itertools import groupby
from datetime import datetime
import shutil
import argparse
import urllib.request
from zipfile import ZipFile
class Record:
def __init__(self, tokens, source):
self.source = source
self.pin = int(tokens[0])
self.country_code = tokens[2]
self.rating_before = int(tokens[8])
self.rating_after = int(tokens[9])
tournament_code = tokens[5]
self.date = datetime.strptime(tournament_code[1:7], "%y%m%d")
self.rank_change = ("", "")
@classmethod
def parse(cls, line):
tokens = re.split(r" {2,}", line.strip())
if len(tokens) != 10:
return None
return cls(tokens, line)
def __str__(self):
s = self.source + " {} -> {}".format(*self.rank_change)
return s
class RankTracker:
def __init__(self, rating):
assert rating >= -900
self._rounded_rating = round_rating(rating)
self._best = self._rounded_rating
@property
def rank(self):
return rating_to_rank(self._rounded_rating)
@property
def rounded_rating(self):
return self._rounded_rating
def update(self, rating):
assert rating >= -900
rounded_rating = round_rating(rating)
old_rank = self.rank
if rounded_rating == self._rounded_rating:
pass
elif rounded_rating > self._rounded_rating: # promotion
# when pushing the best by 2 and more ranks you have to cross the hundred, not just the fifty
if rounded_rating >= self._best+200 and rating < rounded_rating:
self._rounded_rating = rounded_rating-100
else:
self._rounded_rating = rounded_rating
else: # demotion
# 100 points for 5k and better
if self._rounded_rating >= 1600 and self._rounded_rating - rating > 100:
self._rounded_rating = rounded_rating
# 150 points for the others
elif self._rounded_rating - rating > 150:
self._rounded_rating = rounded_rating+100
else:
pass
self._best = max(self._best, self._rounded_rating)
new_rank = self.rank
return new_rank if new_rank != old_rank else False
def parse_record(record):
types = [int, str, str, str, str, str, int, int, int, int]
columns = [f(token) for (f, token) in zip(types, record)] + [datetime.strptime(record[5][1:7], "%y%m%d")]
return tuple(columns)
def round_rating(r):
return (r+50)//100*100
def rating_to_rank(rating):
rank_list = [str(i)+"k" for i in range(30, 0, -1)] + [str(i)+"d" for i in range(1, 8)]
key = round_rating(rating)//100
return rank_list[min(key+9, 36)]
def main(s, since, to, args):
records = [Record.parse(line) for line in s.splitlines()]
records = filter(lambda rec: rec is not None, records)
national_records = filter(lambda rec: rec.country_code == args.country_code, records)
player_records = groupby(national_records, lambda rec: rec.pin)
print("Detecting rank changes...")
hr = "=" * shutil.get_terminal_size().columns
found_anything = False
print(hr)
for (pin, recs) in player_records:
tourneys = list(recs)
tracker = RankTracker(tourneys[0].rating_before)
steps = []
for r in tourneys:
# omit reset ratings
if tracker.rounded_rating != round_rating(r.rating_before):
tracker = RankTracker(r.rating_before)
old_rank = tracker.rank
new_rank = tracker.update(r.rating_after)
if new_rank is not False:
r.rank_change = (old_rank, new_rank)
steps.append(r)
steps = [r for r in steps if since <= r.date <= to]
if steps:
print("\n".join(map(str, steps)))
found_anything = True
if not found_anything:
print("Nothing found.")
print(hr)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("since", help="a date in YYYYMMDD format")
parser.add_argument("to", nargs="?", help="a date in YYYYMMDD format")
parser.add_argument("-c", "--country-code", default="CZ", help="a two letter country code, default=CZ")
parser.add_argument("-f", "--file", help="a path to the rating history file")
args = parser.parse_args()
since = datetime.strptime(args.since, "%Y%m%d")
to = datetime.strptime(args.to, "%Y%m%d") if args.to else datetime.now()
if args.file:
print("Reading {} ...".format(args.file))
with ZipFile(args.file) as f:
s = f.read("all.hst").decode("utf-8")
else:
url = "https://europeangodatabase.eu/EGD/EGD_2_0/downloads/hisgor.zip"
print("Downloading data from {} ...".format(url))
with urllib.request.urlopen(url) as f:
compressed_data = f.read()
with ZipFile(io.BytesIO(compressed_data)) as g:
s = g.read("all.hst").decode("utf-8")
print("Processing...")
main(s, since, to, args)
print("Done.")
|