Changeset - b9f1f39cd7af
[Not reviewed]
0 2 0
Laman - 5 years ago 2020-04-13 13:53:07

check for non-unique shares on joining
2 files changed with 6 insertions and 0 deletions:
0 comments (0 inline, 0 general)
Show inline comments
# easier to print, single file, reconstruct-only version of the code
# encoding fixed to Base32
# Python 3.6
# full version available at

"""Arithmetic operations on Galois Field 2**8. See"""


def gfmul(a, b):
	"""Basic multiplication. Russian peasant algorithm."""
	res = 0
	while a and b:
		if b&1: res ^= a
		if a&0x80: a = 0xff&(a<<1)^0x1b
		else: a <<= 1
		b >>= 1
	return res


g = 3  # generator
E = [None]*256  # exponentials
L = [None]*256  # logarithms
acc = 1
for i in range(256):
	E[i] = acc
	L[acc] = i
	acc = gfmul(acc, g)
L[1] = 0
inv = [E[255-L[i]] if i!=0 else None for i in range(256)]  # multiplicative inverse


def get_constant_coef(*points):
	"""Compute constant polynomial coefficient given the points.

	k = len(points)
	res = 0
	for i in range(k):
		(x, y) = points[i]
		prod = 1
		for j in range(k):
			if i==j: continue
			(xj, yj) = points[j]
			prod = gfmul(prod, (gfmul(xj, inv[xj^x])))
		res ^= gfmul(y, prod)
	return res


import base64
import binascii


class SException(Exception): pass


def reconstruct_raw(*shares):
	"""Tries to recover the secret from its shares.

	:param shares: (((int) i, (bytes) share), ...)
	:return: (bytes) reconstructed secret. Too few shares returns garbage."""
	if len({x for (x, _) in shares}) < len(shares):
		raise SException("Found a non-unique share. Please check your inputs.")

	secret_len = len(shares[0][1])
	res = [None]*secret_len
	for i in range(secret_len):
		points = [(x, s[i]) for (x, s) in shares]
		res[i] = (get_constant_coef(*points))
	return bytes(res)


def reconstruct(*shares):
	"""Wraps reconstruct_raw.

	:param shares: ((str) share, ...)
	:return: (str) reconstructed secret. Too few shares returns garbage."""

	bs = reconstruct_raw(*(decode(s) for s in shares))
		return bs.decode(encoding="utf-8")
	except UnicodeDecodeError:
		raise SException('Failed to decode bytes to utf-8. Either you supplied invalid shares, or you missed the "raw" flag. Offending value: "{0}"'.format(bs))


def decode(share):
		(*_, i, share_str) = share.split(".")
		i = int(i)
		if not 1<=i<=255:
			raise SException("Malformed share: Failed 1<=k<=255, k={0}".format(i))

		share_bytes = base64.b32decode(share_str)
		return (i, share_bytes)
	except (ValueError, binascii.Error):
		raise SException('Malformed share: share="{0}"'.format(share))


from argparse import ArgumentParser


def run():
	parser = ArgumentParser()
	subparsers = parser.add_subparsers()

	joiner = subparsers.add_parser("join")
	joiner.add_argument("share", nargs="+", help="shares to be joined")

	parser.set_defaults(func=lambda: parser.error("missing command"))

	args = parser.parse_args()


def _reconstruct(args):
	try: print(reconstruct(*args.share))
	except SException as e: print(e)


if __name__=="__main__":
Show inline comments

import os
import re
import base64
import binascii

import gf256


class SException(Exception): pass
class InvalidParams(SException): pass
class DetectionException(SException): pass
class DecodingException(SException): pass
class MalformedShare(SException): pass


def _share_byte(secret_b, k, n):
	if not k<=n<255:
		raise InvalidParams("Failed k<=n<255, k={0}, n={1}".format(k, n))
	# we might be concerned with zero coefficients degenerating our polynomial, but there's no reason - we still need k shares to determine it is the case
	coefs = [int(secret_b)]+[int(b) for b in os.urandom(k-1)]
	points = [gf256.evaluate(coefs, i) for i in range(1, n+1)]
	return points


def generate_raw(secret, k, n):
	"""Splits secret into shares.

	:param secret: (bytes)
	:param k: number of shares necessary for secret recovery. 1 <= k <= n
	:param n: (int) number of shares generated. 1 <= n < 255
	:return: [(i, (bytes) share), ...]"""
	shares = [_share_byte(b, k, n) for b in secret]
	return [(i+1, bytes([s[i] for s in shares])) for i in range(n)]


def reconstruct_raw(*shares):
	"""Tries to recover the secret from its shares.

	:param shares: (((int) i, (bytes) share), ...)
	:return: (bytes) reconstructed secret. Too few shares return garbage."""
	if len({x for (x, _) in shares}) < len(shares):
		raise MalformedShare("Found a non-unique share. Please check your inputs.")

	secret_len = len(shares[0][1])
	res = [None]*secret_len
	for i in range(secret_len):
		points = [(x, s[i]) for (x, s) in shares]
		res[i] = (gf256.get_constant_coef(*points))
	return bytes(res)


def generate(secret, k, n, encoding="b32", label="", omit_k_n=False):
	"""Wraps generate_raw().

	:param secret: (str or bytes)
	:param k: number of shares necessary for secret recovery
	:param n: number of shares generated
	:param encoding: {hex, b32, b64} desired output encoding. Hexadecimal, Base32 or Base64.
	:param label: (str) any label to prefix the shares with
	:param omit_k_n: (boolean) suppress the default shares prefix
	:return: [(str) share, ...]"""
	if isinstance(secret,str):
		secret = secret.encode("utf-8")
	shares = generate_raw(secret, k, n)

	prefix = ""
	if label:
		prefix = label + "."
	if not omit_k_n:
		prefix += "{0}.{1}.".format(k, n)

	return [prefix + encode(s, encoding) for s in shares]


def reconstruct(*shares, encoding="", raw=False):
	"""Wraps reconstruct_raw.

	:param shares: ((str) share, ...)
	:param encoding: {hex, b32, b64, ""} encoding of share strings. If not provided or empty, the function tries to guess it.
	:param raw: (bool) whether to return bytes (True) or str (False)
	:return: (str or bytes) reconstructed secret. Too few shares returns garbage."""
	if not encoding:
		encoding = detect_encoding(shares)

	bs = reconstruct_raw(*(decode(s, encoding) for s in shares))
		return bs if raw else bs.decode(encoding="utf-8")
	except UnicodeDecodeError:
		raise DecodingException('Failed to decode bytes to utf-8. Either you supplied invalid shares, or you missed the "raw" flag. Offending value: {0}'.format(bs))


def encode(share, encoding="b32"):
	if encoding=="hex": f = base64.b16encode
	elif encoding=="b32": f = base64.b32encode
	else: f = base64.b64encode
	(i, bs) = share
	return "{0}.{1}".format(i, f(bs).decode("utf-8"))


def decode(share, encoding="b32"):
		(*_, i, share_str) = share.split(".")
		i = int(i)
		if not 1<=i<=255:
			raise MalformedShare("Malformed share: Failed 1<=k<=255, k={0}".format(i))
		if encoding=="hex": f = base64.b16decode
		elif encoding=="b32": f = base64.b32decode
		else: f = base64.b64decode
		share_bytes = f(share_str)
		return (i, share_bytes)
	except (ValueError, binascii.Error):
		raise MalformedShare('Malformed share: share="{0}", encoding="{1}"'.format(share, encoding))


def detect_encoding(shares):
	classes = [
		(re.compile(r"(.*\.)?\d+\.([0-9A-F]{2})+"), "hex"),
		(re.compile(r"(.*\.)?\d+\.([A-Z2-7]{8})*([A-Z2-7]{8}|[A-Z2-7]{2}={6}|[A-Z2-7]{4}={4}|[A-Z2-7]{5}={3}|[A-Z2-7]{7}={1})"), "b32"),
		(re.compile(r"(.*\.)?\d+\.([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{4}|[A-Za-z0-9+/]{2}={2}|[A-Za-z0-9+/]{3}={1})"), "b64")
	for (regexp, res) in classes:
		if all(regexp.fullmatch(share) for share in shares):
			return res
	raise DetectionException("No expected encoding detected")


if __name__=="__main__":
	import cli
0 comments (0 inline, 0 general)