Files @ c234eb08d816
Branch filter:

Location: Rank-Progress/rank_progress.py

Laman
fixed wrong rating reset
import sys
import re
import io
from itertools import groupby
from datetime import datetime
import shutil
import argparse
import zlib
from zipfile import ZipFile
import urllib.request
from urllib.error import URLError
from base64 import b64decode


class Record:
	"""One line of the rating history, i.e. one player's result in one tournament.

	Attributes:
		source: the text line the record was built from, kept verbatim
		pin: integer taken from the first column
		country_code: text of the third column
		rating_before: integer taken from the ninth column
		rating_after: integer taken from the tenth column
		date: datetime decoded from the tournament code (sixth column)
		rank_change: pair (old rank, new rank); two empty strings until set by the caller
	"""

	def __init__(self, tokens, source):
		"""Fill the record from the already split columns `tokens` of the line `source`."""
		self.source = source

		self.pin = int(tokens[0])
		self.country_code = tokens[2]
		self.rating_before, self.rating_after = int(tokens[8]), int(tokens[9])

		# characters 1-6 of the tournament code are read as a yymmdd date
		date_digits = tokens[5][1:7]
		self.date = datetime.strptime(date_digits, "%y%m%d")

		self.rank_change = ("", "")

	@classmethod
	def parse(cls, line):
		"""Build a Record from a text line whose columns are separated by two or more spaces.

		Returns None when the line does not consist of exactly ten columns.
		"""
		fields = re.split(r" {2,}", line.strip())
		return cls(fields, line) if len(fields) == 10 else None

	def __str__(self):
		"""Return the original line followed by the rank change as 'old -> new'."""
		change = "  {0} -> {1}".format(*self.rank_change)
		return self.source + change


class RankTracker:
	"""Follows a player's rank as new ratings arrive.

	The tracker keeps the last precise rating, a rating rounded to hundreds from which
	the rank is derived, and the best rounded rating reached so far.
	"""

	def __init__(self, rating):
		"""Start tracking from `rating`; the initial rounded rating also becomes the best one."""
		assert rating >= -900
		self._precise_rating = rating
		self._rounded_rating = round_rating(rating)
		self._best = self._rounded_rating

	@property
	def rank(self):
		"""Rank corresponding to the currently held rounded rating."""
		return rating_to_rank(self._rounded_rating)

	@property
	def rounded_rating(self):
		"""The rounded rating the rank is currently derived from."""
		return self._rounded_rating

	@property
	def precise_rating(self):
		"""The rating passed to the constructor or to the most recent update()."""
		return self._precise_rating

	def update(self, rating):
		"""Register a new rating.

		Returns the new rank when the held rounded rating rose above the best value
		reached so far and the rank differs from the one held before the call;
		returns False in every other case.
		"""
		assert rating >= -900
		target = round_rating(rating)
		previous_rank = self.rank
		held = self._rounded_rating

		self._precise_rating = rating

		if target > held:
			# promotion
			# when pushing the best by 2 and more ranks you have to cross the hundred, not just the fifty
			short_of_hundred = target >= self._best + 200 and rating < target
			self._rounded_rating = target - 100 if short_of_hundred else target
		elif target < held:
			# demotion
			drop = held - rating
			if held >= 1600 and drop > 100:
				# 100 points for 5k and better
				self._rounded_rating = target
			elif drop > 150:
				# 150 points for the others
				self._rounded_rating = target + 100
			# a smaller drop leaves the held rating untouched

		if self._rounded_rating <= self._best or self.rank == previous_rank:
			return False

		self._best = self._rounded_rating
		return self.rank


def parse_record(record):
	"""Convert the text columns of one history line into a typed tuple.

	The first column and columns seven to ten are converted to int, columns two to six
	stay strings. A datetime decoded from characters 1-6 (yymmdd) of the sixth column
	is appended as the last element.
	"""
	converters = (int,) + (str,) * 5 + (int,) * 4
	typed = tuple(convert(token) for convert, token in zip(converters, record))
	played_on = datetime.strptime(record[5][1:7], "%y%m%d")
	return typed + (played_on,)


def round_rating(r):
	"""Round the rating `r` to the nearest hundred; values ending in 50 go up."""
	hundreds = (r + 50) // 100
	return hundreds * 100


# Rank names ordered from the weakest to the strongest: 30k ... 1k followed by 1d ... 7d.
# Built once instead of on every call.
_RANK_NAMES = tuple(
	[str(i) + "k" for i in range(30, 0, -1)] + [str(i) + "d" for i in range(1, 8)]
)


def rating_to_rank(rating):
	"""Translate a rating into a rank name such as "5k" or "2d".

	The rating is rounded to the nearest hundred (values ending in 50 go up), each
	hundred being one rank: -900 maps to "30k", 2000 to "1k", 2100 to "1d".
	Anything at or above 2700 is reported as "7d" and anything below the 30k
	band is reported as "30k".
	"""
	# the same rounding rule as round_rating(), expressed directly in hundreds
	hundreds = (rating + 50) // 100
	# -900 corresponds to index 0; clamp both ends so that a rating under the 30k band
	# cannot turn into a negative index and silently wrap around to the dan ranks
	index = max(0, min(hundreds + 9, len(_RANK_NAMES) - 1))
	return _RANK_NAMES[index]


def main(s, since, to, args):
	"""Print the rank changes found in the rating history text `s`.

	Only records whose country code equals `args.country_code` are considered, and
	only changes dated between `since` and `to` (both inclusive) are printed, one
	record per line. "Nothing found." is printed when there is no such change.

	NOTE(review): groupby() only joins neighbouring records, so this presumably relies
	on each player's records being stored next to each other in the input - confirm.
	"""
	parsed = [Record.parse(text_line) for text_line in s.splitlines()]
	valid = (rec for rec in parsed if rec is not None)
	national = (rec for rec in valid if rec.country_code == args.country_code)
	by_player = groupby(national, key=lambda rec: rec.pin)

	print("Detecting rank changes...")
	separator = "=" * shutil.get_terminal_size().columns
	anything_reported = False
	print(separator)

	for _pin, group in by_player:
		history = list(group)
		tracker = RankTracker(history[0].rating_before)
		changes = []

		for entry in history:
			# omit reset ratings: start over whenever the record does not continue
			# from the rating the tracker saw last
			if tracker.precise_rating != entry.rating_before:
				tracker = RankTracker(entry.rating_before)

			rank_before = tracker.rank
			rank_after = tracker.update(entry.rating_after)
			if rank_after is False:
				continue
			entry.rank_change = (rank_before, rank_after)
			changes.append(entry)

		# the whole history is replayed, but only changes inside the requested period are shown
		in_period = [entry for entry in changes if since <= entry.date <= to]
		if in_period:
			print("\n".join(str(entry) for entry in in_period))
			anything_reported = True

	if not anything_reported:
		print("Nothing found.")
	print(separator)


if __name__ == "__main__":
	# The text shown at the end of --help is stored base64-encoded and zlib-compressed; it decodes to UTF-8.
	copyright_notice = str(zlib.decompress(b64decode(b'eJw1zk0OgjAUBOCrzAUsIa5wZdiwMYYYiOvSPoEALXl9+HcbzmK8lzXGzazmy8yZexFyaB6otKXX+l6REw+TdtDOYuwNuUAWi7PEKI41ivKA6xae4ehGrFB1hOAXNgTjbYxoG8LFRwMt6ETmsEsSptmrNLunmTLP5KTdsCnZt0whQCFf2vCn3yZLXBUPUc3vz54m3Y+RfgDMjz+e')), encoding="utf-8")

	parser = argparse.ArgumentParser(epilog=copyright_notice)
	parser.add_argument("since", help="a date in YYYYMMDD format")
	parser.add_argument("to", nargs="?", help="a date in YYYYMMDD format")
	parser.add_argument("-c", "--country-code", default="CZ", help="a two letter country code, default=CZ")
	parser.add_argument("-f", "--file", help="a path to the rating history file")

	args = parser.parse_args()

	# A malformed date is a usage mistake: report it through argparse (usage message,
	# exit status 2) instead of letting the ValueError escape as a traceback.
	try:
		since = datetime.strptime(args.since, "%Y%m%d")
		# without an explicit end date the period runs up to the present moment
		to = datetime.strptime(args.to, "%Y%m%d") if args.to else datetime.now()
	except ValueError as e:
		parser.error("dates have to be in the YYYYMMDD format ({})".format(e))

	if args.file:
		# a local copy of the zipped history was given
		print("Reading {} ...".format(args.file))
		with ZipFile(args.file) as f:
			s = f.read("all.hst").decode("iso8859")
	else:
		url = "https://europeangodatabase.eu/EGD/EGD_2_0/downloads/hisgor.zip"
		print("Downloading data from {} ...".format(url))
		# only the network transfer is guarded; the connection is closed before unpacking
		try:
			with urllib.request.urlopen(url) as f:
				compressed_data = f.read()
		except URLError as e:
			print(
				"* Failed to connect to the server. "
				"You can try again later or download {} and run the program with the -f parameter.\n"
				"The error message:\n{}".format(url, e)
			)
			sys.exit(1)

		# the archive is unpacked in memory, nothing is written to disk
		with ZipFile(io.BytesIO(compressed_data)) as g:
			s = g.read("all.hst").decode("iso8859")

	print("Processing...")
	main(s, since, to, args)

	print("Done.")