Changeset - 7c94e9b021f6
[Not reviewed]
default
0 3 0
Laman - 4 years ago 2020-04-13 13:50:53

prefixing shares with labels
3 files changed with 23 insertions and 11 deletions:
0 comments (0 inline, 0 general)
src/cli.py
Show inline comments
 
@@ -7,77 +7,80 @@ from shamira import generate, reconstruc
 

	
 

	
 
def run():
 
	parser = ArgumentParser()
 
	subparsers = parser.add_subparsers()
 

	
 
	build_split_parser(subparsers.add_parser("split"))
 
	build_join_parser(subparsers.add_parser("join"))
 

	
 
	parser.set_defaults(func=lambda _: parser.error("missing command"))
 

	
 
	args = parser.parse_args()
 
	args.func(args)
 

	
 

	
 
def build_split_parser(parser):
 
	parser.add_argument("-k", type=int, required=True, help="number of shares necessary for recovering the secret")
 
	parser.add_argument("-n", type=int, required=True, help="number of generated shares")
 

	
 
	encoding = parser.add_mutually_exclusive_group()
 
	encoding.add_argument("--hex", action="store_true", help="encode shares' bytes as a hexadecimal string")
 
	encoding.add_argument("--b32", action="store_true", help="encode shares' bytes as a base32 string")
 
	encoding.add_argument("--b64", action="store_true", help="encode shares' bytes as a base64 string")
 

	
 
	parser.add_argument("--label", help="any label to prefix the shares with")
 
	parser.add_argument("--omit_k_n", action="store_true", help="suppress the default shares prefix")
 

	
 
	parser.add_argument("secret", nargs="?", help="a secret to be split. Can be provided on the command line,"
 
		" redirected through stdin, or will be asked for interactively.")
 
	parser.set_defaults(func=_generate)
 
	
 

	
 
def build_join_parser(parser):
 
	encoding = parser.add_mutually_exclusive_group()
 
	encoding.add_argument("--hex", action="store_true", help="decode shares' bytes from a hexadecimal string")
 
	encoding.add_argument("--b32", action="store_true", help="decode shares' bytes from a base32 string")
 
	encoding.add_argument("--b64", action="store_true", help="decode shares' bytes from a base64 string")
 

	
 
	parser.add_argument("-r", "--raw", action="store_true", help="return the secret as raw bytes")
 
	parser.add_argument("share", nargs="*", help="shares to be joined. Can be provided on the command line,"
 
		" redirected through stdin, or will be asked for interactively.")
 
	parser.set_defaults(func=_reconstruct)
 

	
 

	
 
def _generate(args):
 
	encoding = get_encoding(args) or "b32"
 

	
 
	if args.secret:  # provided as a positional argument
 
		secret = args.secret
 
	elif sys.stdin.isatty():  # input from terminal
 
		secret = input("Enter your secret:\n")
 
	else:  # redirected from other source
 
		secret = sys.stdin.read()
 

	
 
	try:
 
		shares = generate(secret, args.k, args.n, encoding)
 
		shares = generate(secret, args.k, args.n, encoding, label=args.label, omit_k_n=args.omit_k_n)
 
		for s in shares:
 
			print(s)
 
	except SException as e:
 
		print(e, file=sys.stderr)
 

	
 

	
 
def _reconstruct(args):
 
	encoding = get_encoding(args)
 

	
 
	if args.share:  # provided as a positional argument
 
		shares = args.share
 
	elif sys.stdin.isatty():  # input from terminal
 
		print("Enter the shares, each on separate line, end with an empty line:")
 
		shares = []
 
		while not shares or shares[-1]:
 
			shares.append(input())
 
		shares.pop()
 
	else:  # redirected from other source
 
		shares = sys.stdin.read().split()
 

	
 
	try:
 
		print(reconstruct(*shares, encoding=encoding, raw=args.raw))
 
	except SException as e:
 
		print(e, file=sys.stderr)
src/condensed.py
Show inline comments
 
@@ -37,74 +37,74 @@ def get_constant_coef(*points):
 
	k = len(points)
 
	res = 0
 
	for i in range(k):
 
		(x, y) = points[i]
 
		prod = 1
 
		for j in range(k):
 
			if i==j: continue
 
			(xj, yj) = points[j]
 
			prod = gfmul(prod, (gfmul(xj, inv[xj^x])))
 
		res ^= gfmul(y, prod)
 
	return res
 

	
 
###
 

	
 
import base64
 
import binascii
 

	
 

	
 
class SException(Exception): pass
 

	
 

	
 
def reconstruct_raw(*shares):
 
	"""Tries to recover the secret from its shares.
 

	
 
	:param shares: ((i, (bytes) share), ...)
 
	:param shares: (((int) i, (bytes) share), ...)
 
	:return: (bytes) reconstructed secret. Too few shares returns garbage."""
 
	secret_len = len(shares[0][1])
 
	res = [None]*secret_len
 
	for i in range(secret_len):
 
		points = [(x, s[i]) for (x, s) in shares]
 
		res[i] = (get_constant_coef(*points))
 
	return bytes(res)
 

	
 

	
 
def reconstruct(*shares):
 
	"""Wraps reconstruct_raw.
 

	
 
	:param shares: ((str) share, ...)
 
	:return: (str) reconstructed secret. Too few shares returns garbage."""
 

	
 
	bs = reconstruct_raw(*(decode(s) for s in shares))
 
	try:
 
		return bs.decode(encoding="utf-8")
 
	except UnicodeDecodeError:
 
		raise SException('Failed to decode bytes to utf-8. Either you supplied invalid shares, or you missed the "raw" flag. Offending value: "{0}"'.format(bs))
 

	
 

	
 
def decode(share):
 
	try:
 
		(i, _, share_str) = share.partition(".")
 
		(*_, i, share_str) = share.split(".")
 
		i = int(i)
 
		if not 1<=i<=255:
 
			raise SException("Malformed share: Failed 1<=k<=255, k={0}".format(i))
 

	
 
		share_bytes = base64.b32decode(share_str)
 
		return (i, share_bytes)
 
	except (ValueError, binascii.Error):
 
		raise SException('Malformed share: share="{0}"'.format(share))
 

	
 
###
 

	
 
from argparse import ArgumentParser
 

	
 

	
 
def run():
 
	parser = ArgumentParser()
 
	subparsers = parser.add_subparsers()
 

	
 
	joiner = subparsers.add_parser("join")
 
	joiner.add_argument("share", nargs="+", help="shares to be joined")
 
	joiner.set_defaults(func=_reconstruct)
 

	
 
	parser.set_defaults(func=lambda: parser.error("missing command"))
 

	
src/shamira.py
Show inline comments
 
@@ -17,103 +17,112 @@ class MalformedShare(SException): pass
 

	
 
def _share_byte(secret_b, k, n):
 
	if not k<=n<255:
 
		raise InvalidParams("Failed k<=n<255, k={0}, n={1}".format(k, n))
 
	# we might be concerned with zero coefficients degenerating our polynomial, but there's no reason - we still need k shares to determine it is the case
 
	coefs = [int(secret_b)]+[int(b) for b in os.urandom(k-1)]
 
	points = [gf256.evaluate(coefs, i) for i in range(1, n+1)]
 
	return points
 

	
 

	
 
def generate_raw(secret, k, n):
 
	"""Splits secret into shares.
 

	
 
	:param secret: (bytes)
 
	:param k: number of shares necessary for secret recovery. 1 <= k <= n
 
	:param n: (int) number of shares generated. 1 <= n < 255
 
	:return: [(i, (bytes) share), ...]"""
 
	shares = [_share_byte(b, k, n) for b in secret]
 
	return [(i+1, bytes([s[i] for s in shares])) for i in range(n)]
 

	
 

	
 
def reconstruct_raw(*shares):
 
	"""Tries to recover the secret from its shares.
 

	
 
	:param shares: ((i, (bytes) share), ...)
 
	:return: (bytes) reconstructed secret. Too few shares returns garbage."""
 
	:param shares: (((int) i, (bytes) share), ...)
 
	:return: (bytes) reconstructed secret. Too few shares return garbage."""
 
	secret_len = len(shares[0][1])
 
	res = [None]*secret_len
 
	for i in range(secret_len):
 
		points = [(x, s[i]) for (x, s) in shares]
 
		res[i] = (gf256.get_constant_coef(*points))
 
	return bytes(res)
 

	
 

	
 
def generate(secret, k, n, encoding="b32"):
 
def generate(secret, k, n, encoding="b32", label="", omit_k_n=False):
 
	"""Wraps generate_raw().
 

	
 
	:param secret: (str or bytes)
 
	:param k: number of shares necessary for secret recovery
 
	:param n: number of shares generated
 
	:param encoding: {hex, b32, b64} desired output encoding. Hexadecimal, Base32 or Base64.
 
	:param label: (str) any label to prefix the shares with
 
	:param omit_k_n: (boolean) suppress the default shares prefix
 
	:return: [(str) share, ...]"""
 
	if isinstance(secret,str):
 
		secret = secret.encode("utf-8")
 
	shares = generate_raw(secret, k, n)
 
	return [encode(s, encoding) for s in shares]
 

	
 
	prefix = ""
 
	if label:
 
		prefix = label + "."
 
	if not omit_k_n:
 
		prefix += "{0}.{1}.".format(k, n)
 

	
 
	return [prefix + encode(s, encoding) for s in shares]
 

	
 

	
 
def reconstruct(*shares, encoding="", raw=False):
 
	"""Wraps reconstruct_raw.
 

	
 
	:param shares: ((str) share, ...)
 
	:param encoding: {hex, b32, b64, ""} encoding of share strings. If not provided or empty, the function tries to guess it.
 
	:param raw: (bool) whether to return bytes (True) or str (False)
 
	:return: (str or bytes) reconstructed secret. Too few shares returns garbage."""
 
	if not encoding:
 
		encoding = detect_encoding(shares)
 

	
 
	bs = reconstruct_raw(*(decode(s, encoding) for s in shares))
 
	try:
 
		return bs if raw else bs.decode(encoding="utf-8")
 
	except UnicodeDecodeError:
 
		raise DecodingException('Failed to decode bytes to utf-8. Either you supplied invalid shares, or you missed the "raw" flag. Offending value: {0}'.format(bs))
 

	
 

	
 
def encode(share, encoding="b32"):
 
	if encoding=="hex": f = base64.b16encode
 
	elif encoding=="b32": f = base64.b32encode
 
	else: f = base64.b64encode
 
	(i, bs) = share
 
	return "{0}.{1}".format(i, f(bs).decode("utf-8"))
 

	
 

	
 
def decode(share, encoding="b32"):
 
	try:
 
		(i, _, share_str) = share.partition(".")
 
		(*_, i, share_str) = share.split(".")
 
		i = int(i)
 
		if not 1<=i<=255:
 
			raise MalformedShare("Malformed share: Failed 1<=k<=255, k={0}".format(i))
 
		if encoding=="hex": f = base64.b16decode
 
		elif encoding=="b32": f = base64.b32decode
 
		else: f = base64.b64decode
 
		share_bytes = f(share_str)
 
		return (i, share_bytes)
 
	except (ValueError, binascii.Error):
 
		raise MalformedShare('Malformed share: share="{0}", encoding="{1}"'.format(share, encoding))
 

	
 

	
 
def detect_encoding(shares):
 
	classes = [
 
		(re.compile(r"\d+\.([0-9A-F]{2})+"), "hex"),
 
		(re.compile(r"\d+\.([A-Z2-7]{8})*([A-Z2-7]{8}|[A-Z2-7]{2}={6}|[A-Z2-7]{4}={4}|[A-Z2-7]{5}={3}|[A-Z2-7]{7}={1})"), "b32"),
 
		(re.compile(r"\d+\.([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{2}={2}|[A-Za-z0-9+/]{3}={1})"), "b64")
 
		(re.compile(r"(.*\.)?\d+\.([0-9A-F]{2})+"), "hex"),
 
		(re.compile(r"(.*\.)?\d+\.([A-Z2-7]{8})*([A-Z2-7]{8}|[A-Z2-7]{2}={6}|[A-Z2-7]{4}={4}|[A-Z2-7]{5}={3}|[A-Z2-7]{7}={1})"), "b32"),
 
		(re.compile(r"(.*\.)?\d+\.([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{2}={2}|[A-Za-z0-9+/]{3}={1})"), "b64")
 
	]
 
	for (regexp, res) in classes:
 
		if all(regexp.fullmatch(share) for share in shares):
 
			return res
 
	raise DetectionException("No expected encoding detected")
 

	
 

	
 
if __name__=="__main__":
 
	import cli
 
	cli.run()
0 comments (0 inline, 0 general)