Location: Morevna/work.py @ 3a359a8a2134
optimizing disk access

import hashlib
import sys
import datetime
import sqlite3
import math
import pathlib
import itertools
BLOCK_SIZE=4096
HASH_LEN=8
DB="morevna.db"
FILES_PER_DIR=100
FILE_COUNT=2**21
def initDB():
	conn=sqlite3.connect(DB)
	c=conn.cursor()
	c.execute("""create table if not exists `hashes` (
		`sector_id` integer primary key,
		`sector_hash` blob not null,
		`dirty` integer check(`dirty` in (0,1))
	)""")
	c.execute("""create index if not exists `dirty_index` on `hashes` (`dirty`)""")
	conn.commit()
	conn.close()
def initHashes(fileName):
	initDB()
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		# every sector starts dirty, so the first transfer copies everything
		handle.executemany(
			"""insert into `hashes` (`sector_id`,`sector_hash`,`dirty`) values (:id,:hash,1)""",
			({"id":i,"hash":dataHash} for (i,dataHash) in enumerate(chunks(fileName)))
		)
		db.commit()
def chunks(fileName):
	# read the file sequentially and yield a short hash of each block
	with open(fileName,mode="rb") as f:
		data=f.read(BLOCK_SIZE)
		while data:
			yield hashlib.sha256(data).digest()[:HASH_LEN]
			data=f.read(BLOCK_SIZE)
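# with the defaults this apparently targets an 8 GiB image (the memory estimate
# in findChanges below uses the same figure): 8*2**30/4096 = 2**21 blocks,
# matching FILE_COUNT, each reduced to an 8-byte digest prefix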
def hashes():
	# yield the stored sector hashes in sector order
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.execute("""select `sector_hash` from `hashes` order by `sector_id` asc""")
		h=handle.fetchone()
		while h is not None:
			yield h[0] # fetchone returns a one-element tuple
			h=handle.fetchone()
def findChanges(fileName):
	changelist=[]
	# build the changelist
	# can use up to (HASH_LEN+size(int))*(filesize/BLOCK_SIZE) bytes of memory plus overhead,
	# that's (8+4)*(8*2**30 / 4096) = 24MB for the defaults
	for (i,(dataHash,savedHash)) in enumerate(itertools.zip_longest(chunks(fileName),hashes())):
		if dataHash is None: break # the file shrank, which shouldn't happen
		if dataHash!=savedHash:
			changelist.append((i,dataHash))
	# write the log
	with open(logFile,mode="w") as f:
		f.write("sector\thash\n")
		for (i,dataHash) in changelist:
			f.write("{0}\t{1}\n".format(i,dataHash.hex()))
	# update the DB
	# note: updates assume the rows already exist, so a grown file would need inserts
	with sqlite3.connect(DB) as db:
		handle=db.cursor()
		handle.executemany(
			"""update `hashes` set `sector_hash`=:hash, `dirty`=1 where `sector_id`=:id""",
			({"id":i,"hash":dataHash} for (i,dataHash) in changelist)
		)
		db.commit()
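# the log is tab-separated text, e.g. (hypothetical values):
#	sector	hash
#	42	9f3c0a1b2d4e5f60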
def transferChanges(targetPath):
	# transfer modified sectors to targetPath and mark them as clean;
	# uses the module-level fileName
	with sqlite3.connect(DB) as db, open(fileName,mode="rb") as sf:
		handle=db.cursor()
		handle.execute("""select `sector_id` from `hashes` where `dirty`=1 order by `sector_id` asc""")
		sectorIds=handle.fetchall()
		# read the source file sequentially instead of seeking to each dirty
		# sector, which is the disk access this version optimizes away
		sector=sf.read(BLOCK_SIZE)
		i=j=0
		while sector and j<len(sectorIds):
			if i==sectorIds[j][0]:
				path=targetPath/getPath(i)
				path.parent.mkdir(parents=True,exist_ok=True)
				with path.open(mode="wb") as df:
					df.write(sector)
				# commit per sector so an interrupted transfer can resume
				handle.execute("""update `hashes` set `dirty`=0 where `sector_id`=?""",(i,))
				db.commit()
				j+=1
			i+=1
			sector=sf.read(BLOCK_SIZE)
def getPath(index):
	# split the sector index into base-FILES_PER_DIR digits, most significant
	# first, so that no directory level holds more than FILES_PER_DIR entries
	width=int(math.log10(FILES_PER_DIR)) # digits per path component
	nodeIds=[]
	k=1
	while k<=FILE_COUNT:
		nodeIds.append("{0:0{1}}".format((index//k)%FILES_PER_DIR,width))
		k*=FILES_PER_DIR
	nodeIds.reverse()
	return pathlib.Path(*nodeIds)
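# sanity check of the layout (values assume the fixed getPath above):
# with FILES_PER_DIR=100 and FILE_COUNT=2**21 there are four two-digit
# components, so no directory holds more than 100 entries, e.g.
#	getPath(0)      -> 00/00/00/00
#	getPath(123456) -> 00/12/34/56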
action=sys.argv[1]
fileName=sys.argv[2]
baseName=".".join(fileName.split(".")[:-1])
hashFile="{0}-hash.dat".format(baseName)
isoDate=datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
logFile="{0}-{1}.log".format(baseName,isoDate)
if action=="init": initHashes(fileName)
elif action=="update": findChanges(fileName)
elif action=="transfer": transferChanges(sys.argv[3])
else: print("bad action")
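
# usage sketch (hypothetical file names; disk.img is the image to back up,
# /mnt/backup a directory on the target drive):
#	python work.py init disk.img
#	python work.py update disk.img
#	python work.py transfer disk.img /mnt/backup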