Changeset - b9be6aa9ece0
[Not reviewed]
default
0 1 0
Laman - 3 years ago 2021-12-16 21:20:07

added a copyright notice to the help
1 file changed with 5 insertions and 1 deletions:
0 comments (0 inline, 0 general)
rank_progress.py
Show inline comments
 
import sys
 
import re
 
import io
 
from itertools import groupby
 
from datetime import datetime
 
import shutil
 
import argparse
 
import zlib
 
from zipfile import ZipFile
 
import urllib.request
 
from urllib.error import URLError
 
from base64 import b64decode
 

	
 

	
 
class Record:
 
	def __init__(self, tokens, source):
 
		self.source = source
 

	
 
		self.pin = int(tokens[0])
 
		self.country_code = tokens[2]
 
		self.rating_before = int(tokens[8])
 
		self.rating_after = int(tokens[9])
 

	
 
		tournament_code = tokens[5]
 
		self.date = datetime.strptime(tournament_code[1:7], "%y%m%d")
 

	
 
		self.rank_change = ("", "")
 

	
 
	@classmethod
 
	def parse(cls, line):
 
		tokens = re.split(r" {2,}", line.strip())
 

	
 
		if len(tokens) != 10:
 
			return None
 

	
 
		return cls(tokens, line)
 

	
 
	def __str__(self):
 
		s = self.source + "  {} -> {}".format(*self.rank_change)
 
		return s
 

	
 

	
 
class RankTracker:
 
	def __init__(self, rating):
 
		assert rating >= -900
 
		self._rounded_rating = round_rating(rating)
 
		self._best = self._rounded_rating
 

	
 
	@property
 
	def rank(self):
 
		return rating_to_rank(self._rounded_rating)
 

	
 
	@property
 
	def rounded_rating(self):
 
		return self._rounded_rating
 

	
 
	def update(self, rating):
 
		assert rating >= -900
 
		rounded_rating = round_rating(rating)
 
		old_rank = self.rank
 

	
 
		if rounded_rating == self._rounded_rating:
 
			pass
 
		elif rounded_rating > self._rounded_rating:  # promotion
 
			# when pushing the best by 2 and more ranks you have to cross the hundred, not just the fifty
 
			if rounded_rating >= self._best+200 and rating < rounded_rating:
 
				self._rounded_rating = rounded_rating-100
 
			else:
 
				self._rounded_rating = rounded_rating
 
		else:  # demotion
 
			# 100 points for 5k and better
 
			if self._rounded_rating >= 1600 and self._rounded_rating - rating > 100:
 
				self._rounded_rating = rounded_rating
 
			# 150 points for the others
 
			elif self._rounded_rating - rating > 150:
 
				self._rounded_rating = rounded_rating+100
 
			else:
 
				pass
 

	
 
		self._best = max(self._best, self._rounded_rating)
 
		new_rank = self.rank
 
		return new_rank if new_rank != old_rank else False
 

	
 

	
 
def parse_record(record):
 
	types = [int, str, str, str, str, str, int, int, int, int]
 
	columns = [f(token) for (f, token) in zip(types, record)] + [datetime.strptime(record[5][1:7], "%y%m%d")]
 
	return tuple(columns)
 

	
 

	
 
def round_rating(r):
 
	return (r+50)//100*100
 

	
 

	
 
def rating_to_rank(rating):
 
	rank_list = [str(i)+"k" for i in range(30, 0, -1)] + [str(i)+"d" for i in range(1, 8)]
 
	key = round_rating(rating)//100
 
	return rank_list[min(key+9, 36)]
 

	
 

	
 
def main(s, since, to, args):
 
	records = [Record.parse(line) for line in s.splitlines()]
 
	records = filter(lambda rec: rec is not None, records)
 
	national_records = filter(lambda rec: rec.country_code == args.country_code, records)
 
	player_records = groupby(national_records, lambda rec: rec.pin)
 
	print("Detecting rank changes...")
 
	hr = "=" * shutil.get_terminal_size().columns
 
	found_anything = False
 
	print(hr)
 

	
 
	for (pin, recs) in player_records:
 
		tourneys = list(recs)
 
		tracker = RankTracker(tourneys[0].rating_before)
 
		steps = []
 

	
 
		for r in tourneys:
 
			# omit reset ratings
 
			if tracker.rounded_rating != round_rating(r.rating_before):
 
				tracker = RankTracker(r.rating_before)
 

	
 
			old_rank = tracker.rank
 
			new_rank = tracker.update(r.rating_after)
 
			if new_rank is not False:
 
				r.rank_change = (old_rank, new_rank)
 
				steps.append(r)
 

	
 
		steps = [r for r in steps if since <= r.date <= to]
 
		if steps:
 
			print("\n".join(map(str, steps)))
 
			found_anything = True
 

	
 
	if not found_anything:
 
		print("Nothing found.")
 
	print(hr)
 

	
 

	
 
if __name__ == "__main__":
 
	parser = argparse.ArgumentParser()
 
	copyright_notice = str(zlib.decompress(b64decode(b'eJw1zk0OgjAUBOCrzAUsIa5wZdiwMYYYiOvSPoEALXl9+HcbzmK8lzXGzazmy8yZexFyaB6otKXX+l6REw+TdtDOYuwNuUAWi7PEKI41ivKA6xae4ehGrFB1hOAXNgTjbYxoG8LFRwMt6ETmsEsSptmrNLunmTLP5KTdsCnZt0whQCFf2vCn3yZLXBUPUc3vz54m3Y+RfgDMjz+e')), encoding="utf-8")
 

	
 
	parser = argparse.ArgumentParser(epilog=copyright_notice)
 
	parser.add_argument("since", help="a date in YYYYMMDD format")
 
	parser.add_argument("to", nargs="?", help="a date in YYYYMMDD format")
 
	parser.add_argument("-c", "--country-code", default="CZ", help="a two letter country code, default=CZ")
 
	parser.add_argument("-f", "--file", help="a path to the rating history file")
 

	
 
	args = parser.parse_args()
 
	
 
	since = datetime.strptime(args.since, "%Y%m%d")
 
	to = datetime.strptime(args.to, "%Y%m%d") if args.to else datetime.now()
 

	
 
	if args.file:
 
		print("Reading {} ...".format(args.file))
 
		with ZipFile(args.file) as f:
 
			s = f.read("all.hst").decode("utf-8")
 
	else:
 
		url = "https://europeangodatabase.eu/EGD/EGD_2_0/downloads/hisgor.zip"
 
		print("Downloading data from {} ...".format(url))
 
		try:
 
			with urllib.request.urlopen(url) as f:
 
				compressed_data = f.read()
 
				with ZipFile(io.BytesIO(compressed_data)) as g:
 
					s = g.read("all.hst").decode("utf-8")
 
		except URLError as e:
 
			print(
 
				"* Failed to connect to the server. "
 
				"You can try again later or download {} and run the program with the -f parameter.\n"
 
				"The error message:\n{}".format(url, e)
 
			)
 
			sys.exit(1)
 

	
 
	print("Processing...")
 
	main(s, since, to, args)
 

	
 
	print("Done.")
0 comments (0 inline, 0 general)