Files
@ 5c80ca07f00c
Branch filter:
Location: Morevna/src/util.py
5c80ca07f00c
2.4 KiB
text/x-python
reformatted whitespace with more respect for PEP-8
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 | import math
import os
import sys
import collections
from datetime import datetime
def spawnDaemon(fun):
    """Run ``fun`` in a detached grandchild process via a UNIX double fork.

    The calling process returns immediately after the first fork and keeps
    running.  The first child starts a new session, forks again, prints the
    grandchild's pid and exits.  The grandchild calls ``fun()`` and then
    terminates with ``os._exit(os.EX_OK)``.

    If either fork raises ``OSError`` the error is written to stderr and
    the current process exits with status 1.

    See Stevens' "Advanced Programming in the UNIX Environment"
    (ISBN 0201563177) for background on the double-fork technique.
    """
    # --- first fork -------------------------------------------------------
    try:
        first_pid = os.fork()
    except OSError as err:
        message = "fork #1 failed: {0} ({1})".format(err.errno, err.strerror)
        print(message, file=sys.stderr)
        sys.exit(1)
    if first_pid > 0:
        # original caller: hand control back and carry on
        return

    # first child: become leader of a fresh session
    os.setsid()

    # --- second fork ------------------------------------------------------
    try:
        second_pid = os.fork()
        if second_pid > 0:
            # intermediate process: announce the daemon's pid and leave
            print("[{0}] server running".format(second_pid))
            sys.exit(0)
    except OSError as err:
        message = "fork #2 failed: {0} ({1})".format(err.errno, err.strerror)
        print(message, file=sys.stderr)
        sys.exit(1)

    # grandchild: do the actual work, then terminate right here
    fun()
    os._exit(os.EX_OK)
def splitHost(host, defaultPort=0):
    """Split ``host`` at its first ":" into an ``(address, port)`` tuple.

    The port is returned exactly as it appears in ``host`` (a string).
    When ``host`` has no ":" or nothing follows it, ``defaultPort`` is
    returned unchanged in its place.
    """
    pieces = host.partition(":")
    address = pieces[0]
    # an empty port string is falsy, so the default takes over
    port = pieces[2] or defaultPort
    return (address, port)
class Progress:
    """Single-line console progress display with a rolling ETA.

    The counter runs from ``i0`` towards ``n``; both ascending
    (``i0 < n``) and descending (``n < i0``) ranges are accepted.
    Timestamped samples of the counter are kept in a short window and the
    oldest one still in the window is used to estimate the remaining time.
    """

    def __init__(self, n, i0=0):
        """Create a progress display for a counter going from ``i0`` to ``n``."""
        self._n = n
        self._i0 = i0
        self._i = i0
        # text of the most recently printed percentage
        self._last = ""
        # (timestamp, counter) samples, oldest first
        self._past = collections.deque()

    def p(self, i):
        """Report that the counter has reached ``i`` and redraw if needed.

        ``i`` must satisfy ``i0 <= i < n`` (ascending) or ``n < i <= i0``
        (descending); this is checked with an ``assert``.
        """
        i0 = self._i0
        n = self._n
        now = datetime.now()
        assert i0 <= i < n or n < i <= i0, (i0, i, n)
        percentage = Progress._p(i, n, i0)
        res = "{0}%".format(percentage)
        # record a new sample when there is none yet, or when the newest
        # one is more than a second old
        if not self._past or (now - self._past[-1][0]).total_seconds() > 1:
            self._past.append((now, i))
        # redraw when the percentage text changed, or when the oldest
        # sample is more than five seconds old
        if res != self._last or (now - self._past[0][0]).total_seconds() > 5:
            eta = formatSeconds(self.eta(i))
            self._print("{0} (ETA {1})".format(res, eta))
            self._last = res
        # drop samples older than five seconds; the newest sample is at
        # most about a second old, so the deque never runs empty here
        while (now - self._past[0][0]).total_seconds() > 5:
            self._past.popleft()

    def done(self):
        """Print the final ``100%`` and terminate the progress line."""
        self._print("100%", end="\n")

    def eta(self, i2):
        """Estimate the seconds remaining when the counter is at ``i2``.

        The rate is derived from the oldest recorded sample.  Returns NaN
        when no estimate is possible: no sample has been recorded yet, or
        the counter has not moved since the oldest sample.
        """
        if not self._past:
            # nothing recorded yet (p() was never called): unknown
            return float("nan")
        t2 = datetime.now()
        (t1, i1) = self._past[0]
        if i2 == i1:
            return float("nan")
        return (self._n - i2) / (i2 - i1) * (t2 - t1).total_seconds()

    @staticmethod
    def _p(i, n, i0):
        """Return the integer percentage for counter ``i`` between ``i0`` and ``n``.

        Position ``i`` counts as already completed, hence the one-step
        offset in the direction of travel.
        """
        _1 = 1 if n >= i0 else -1
        return 100 * (i + _1 - i0) // (n - i0)

    def _print(self, s, end=""):
        """Overwrite the current console line with ``s``.

        The line is first blanked with 80 spaces.  The output is flushed
        explicitly so the update also shows up when stdout is not
        line-buffered (for example when it is a pipe).
        """
        print("\r" + " " * 80, end="")
        print("\r" + s, end=end, flush=True)
def formatSeconds(secs):
    """Format a duration in seconds as a short human-readable string.

    The value is rounded to whole seconds and rendered as ``"Ns"`` below
    one minute, ``"MM:SS"`` below one hour and ``"HH:MM:SS"`` otherwise.
    NaN and infinite values yield ``"?"``.  Negative durations are
    rendered as a leading ``"-"`` followed by the absolute duration.
    """
    if not math.isfinite(secs):
        # NaN or +/-inf: no meaningful duration (round() would raise on inf)
        return "?"
    total = round(secs)
    # split off the sign so floor division and modulo work on a magnitude
    sign = "-" if total < 0 else ""
    total = abs(total)
    if total < 60:
        return "{0}{1}s".format(sign, total)
    mins, rest = divmod(total, 60)
    if mins < 60:
        return "{0}{1:02}:{2:02}".format(sign, mins, rest)
    hours, mins = divmod(mins, 60)
    return "{0}{1:02}:{2:02}:{3:02}".format(sign, hours, mins, rest)
if __name__ == "__main__":
    # Manual demo: walk a 100-step Progress with a random pause of up to
    # two seconds after every step, then print the final 100%.
    import random
    import time

    demo = Progress(100)
    for step in range(1, 100):
        demo.p(step)
        pause = random.random() * 2
        time.sleep(pause)
    demo.done()
|