Changeset - 32a0e0fcabd0
[Not reviewed]
0 6 0
Laman - 5 years ago 2020-05-03 18:28:16

optimized the reconstruction operation
6 files changed with 55 insertions and 32 deletions:
0 comments (0 inline, 0 general)
Show inline comments
# Shamira #

Implements [Shamir's secret sharing algorithm]('s_Secret_Sharing). Splits a string or a byte sequence byte-per-byte into _n_<255 shares, with any _k_ of them sufficient for reconstruction of the original input.

Outputs the shares as hexadecimal, Base32 or Base64 encoded strings.

## Installation and usage ##

Can be run straight from the cloned repository by executing the package with `python -m shamira` or simply installed with `python build`, `python install`. Then imported in your code with `import shamira` or run from the command line with `shamira`.

## Performance ##

As it is, the code is not very fast. Splitting takes _n_ evaluations of a polynomial of order _k_ over Galois field 256, leading to _O(n\*k)_ finite field multiplications. Reconstruction of the constant parameters during joining similarly takes _O(k\*k)_ multiplications.
As it is, the code is not very fast. Let's assume we have a secret of length _m_. For each byte, the splitting takes _n_ evaluations of a polynomial of order _k_ over Galois field 256, leading to _O(n\*k\*m)_ finite field multiplications. Reconstruction of the constant parameters during joining takes _O(k\*k + k\*m)_ multiplications.

Benchmark results, all values mean _seconds per byte_ of the secret length:
        <th>k / n parameters</th>
        <td>2 / 3 (a Raspberry Pi 3)</td>
        <td>2 / 3 (a laptop)</td>
        <td>254 / 254 (a Raspberry Pi 3)</td>
        <td>254 / 254 (a laptop)</td>

While the speeds are not awful, for longer secrets I recommend encrypting them with a random key of your choice and splitting only the key. Anyway, you can run your own benchmark with `shamira benchmark`
Show inline comments
@@ -6,64 +6,64 @@

"""Arithmetic operations on Galois Field 2**8. See"""


def gfmul(a, b):
	"""Basic multiplication. Russian peasant algorithm."""
	res = 0
	while a and b:
		if b&1: res ^= a
		if a&0x80: a = 0xff&(a<<1)^0x1b
		else: a <<= 1
		b >>= 1
	return res


g = 3  # generator
E = [None]*256  # exponentials
L = [None]*256  # logarithms
acc = 1
for i in range(256):
	E[i] = acc
	L[acc] = i
	acc = gfmul(acc, g)
L[1] = 0
inv = [E[255-L[i]] if i!=0 else None for i in range(256)]  # multiplicative inverse
INV = [E[255-L[i]] if i!=0 else None for i in range(256)]  # multiplicative inverse


def get_constant_coef(*points):
	"""Compute constant polynomial coefficient given the points.

	k = len(points)
	res = 0
	for i in range(k):
		(x, y) = points[i]
		prod = 1
		for j in range(k):
			if i==j: continue
			(xj, yj) = points[j]
			prod = gfmul(prod, (gfmul(xj, inv[xj^x])))
			prod = gfmul(prod, (gfmul(xj, INV[xj^x])))
		res ^= gfmul(y, prod)
	return res


import base64
import binascii


class SException(Exception): pass


def reconstruct_raw(*shares):
	"""Tries to recover the secret from its shares.

	:param shares: (((int) i, (bytes) share), ...)
	:return: (bytes) reconstructed secret. Too few shares returns garbage."""
	if len({x for (x, _) in shares}) < len(shares):
		raise SException("Found a non-unique share. Please check your inputs.")

	secret_len = len(shares[0][1])
	res = [None]*secret_len
	for i in range(secret_len):
		points = [(x, s[i]) for (x, s) in shares]
Show inline comments
@@ -22,53 +22,55 @@ def _share_byte(secret_b, k, n):
	coefs = [int(secret_b)]+[int(b) for b in os.urandom(k-1)]
	points = [gf256.evaluate(coefs, i) for i in range(1, n+1)]
	return points


def generate_raw(secret, k, n):
	"""Splits secret into shares.

	:param secret: (bytes)
	:param k: number of shares necessary for secret recovery. 1 <= k <= n
	:param n: (int) number of shares generated. 1 <= n < 255
	:return: [(i, (bytes) share), ...]"""
	shares = [_share_byte(b, k, n) for b in secret]
	return [(i+1, bytes([s[i] for s in shares])) for i in range(n)]


def reconstruct_raw(*shares):
	"""Tries to recover the secret from its shares.

	:param shares: (((int) i, (bytes) share), ...)
	:return: (bytes) reconstructed secret. Too few shares return garbage."""
	if len({x for (x, _) in shares}) < len(shares):
		raise MalformedShare("Found a non-unique share. Please check your inputs.")

	secret_len = len(shares[0][1])
	(xs, payloads) = zip(*shares)
	secret_len = len(payloads[0])
	res = [None]*secret_len
	weights = gf256.compute_weights(xs)
	for i in range(secret_len):
		points = [(x, s[i]) for (x, s) in shares]
		res[i] = (gf256.get_constant_coef(*points))
		ys = [s[i] for s in payloads]
		res[i] = (gf256.get_constant_coef(weights, ys))
	return bytes(res)


def generate(secret, k, n, encoding="b32", label="", omit_k_n=False):
	"""Wraps generate_raw().

	:param secret: (str or bytes)
	:param k: number of shares necessary for secret recovery
	:param n: number of shares generated
	:param encoding: {hex, b32, b64} desired output encoding. Hexadecimal, Base32 or Base64.
	:param label: (str) any label to prefix the shares with
	:param omit_k_n: (boolean) suppress the default shares prefix
	:return: [(str) share, ...]"""
	if isinstance(secret,str):
		secret = secret.encode("utf-8")
	shares = generate_raw(secret, k, n)

	prefix = ""
	if label:
		prefix = label + "."
	if not omit_k_n:
		prefix += "{0}.{1}.".format(k, n)

	return [prefix + encode(s, encoding) for s in shares]
Show inline comments

"""Arithmetic operations on Galois Field 2**8. See"""

from functools import reduce
import operator


def _gfmul(a, b):
	"""Basic multiplication. Russian peasant algorithm."""
	res = 0
	while a and b:
		if b&1: res ^= a
		if a&0x80: a = 0xff&(a<<1)^0x1b
		else: a <<= 1
		b >>= 1
	return res


g = 3  # generator
E = [None]*256  # exponentials
L = [None]*256  # logarithms
acc = 1
for i in range(256):
	E[i] = acc
	L[acc] = i
	acc = _gfmul(acc, g)
L[1] = 0
INV = [E[255-L[i]] if i!=0 else None for i in range(256)]  # multiplicative inverse


def gfmul(a, b):
	"""Fast multiplication. Basic multiplication is expensive. a*b==g**(log(a)+log(b))"""
	assert 0<=a<=255, 0<=b<=255
	if a==0 or b==0: return 0
	t = L[a]+L[b]
	if t>255: t -= 255
	return E[t]


def evaluate(coefs, x):
	"""Evaluate polynomial's value at x.

	:param coefs: [a0, a1, ...]."""
	res = 0
	xk = 1
	for a in coefs:
		res ^= gfmul(a, xk)
		xk = gfmul(xk, x)
	return res


def get_constant_coef(*points):
def get_constant_coef(weights, y_coords):
	"""Compute constant polynomial coefficient given the points.

	k = len(points)
	res = 0
	for i in range(k):
		(x, y) = points[i]
		prod = 1
		for j in range(k):
			if i==j: continue
			(xj, yj) = points[j]
			prod = gfmul(prod, (gfmul(xj, INV[xj^x])))
		res ^= gfmul(y, prod)
	return reduce(
		map(lambda ab: gfmul(*ab), zip(weights, y_coords))


def compute_weights(x_coords):
	assert x_coords

	res = [
				(gfmul(xj, INV[xj^xi]) for xj in x_coords if xi!=xj),
			) for xi in x_coords

	return res
Show inline comments
@@ -9,54 +9,59 @@ from ..gf256 import *


class TestGF256(TestCase):
	def test__gfmul(self):
		self.assertEqual(_gfmul(0, 0), 0)
		self.assertEqual(_gfmul(1, 1), 1)
		self.assertEqual(_gfmul(2, 2), 4)
		self.assertEqual(_gfmul(0, 21), 0)
		self.assertEqual(_gfmul(0x53, 0xca), 0x01)
		self.assertEqual(_gfmul(0xff, 0xff), 0x13)

	def test_gfmul(self):
		for a in range(256):
			for b in range(256):
				self.assertEqual(_gfmul(a, b), gfmul(a, b))

	def test_evaluate(self):
		for x in range(256):
			(a0, a1, a2, a3) = (x, x>>1, x>>2, x>>3)
			self.assertEqual(evaluate([17], x), 17)  # constant polynomial
			self.assertEqual(evaluate([a0, a1, a2, a3], 0), x)  # any polynomial at 0
			self.assertEqual(evaluate([a0, a1, a2, a3], 1), a0^a1^a2^a3)  # polynomial at 1 == sum of coefficients

	def test_get_constant_coef(self):
		self.assertEqual(get_constant_coef((1, 1), (2, 2), (3, 3)), 0)
		weights = compute_weights((1, 2, 3))
		ys = (1, 2, 3)
		self.assertEqual(get_constant_coef(weights, ys), 0)

		random_matches = 0
		for i in range(10):
			k = random.randint(2, 255)

			# exact
			res = self.check_coefs_match(k, k)
			self.assertEqual(res[0], res[1])

			# overdetermined
			res = self.check_coefs_match(k, 256)
			self.assertEqual(res[0], res[1])

			# underdetermined => random
			res = self.check_coefs_match(k, k-1)
			if res[0]==res[1]:
				random_matches += 1
		self.assertLess(random_matches, 2)  # with a chance (255/256)**10=0.96 there should be no match

	def check_coefs_match(self, k, m):
		coefs = [random.randint(0, 255) for i in range(k)]
		points = [(j, evaluate(coefs, j)) for j in range(1, 256)]
		return (get_constant_coef(*points[:m]), coefs[0])

		(xs, ys) = zip(*points[:m])
		weights = compute_weights(xs)
		return (get_constant_coef(weights, ys), coefs[0])


if __name__=='__main__':
Show inline comments
@@ -7,53 +7,59 @@ from .. import *
from .. import gf256
from ..core import encode, decode,detect_encoding, _share_byte


class TestShamira(TestCase):
	_urandom = os.urandom

	def setUpClass(cls):
		os.urandom = lambda n: bytes(random.randint(0, 255) for i in range(n))

	def tearDownClass(cls):
		os.urandom = cls._urandom

	def test_share_byte(self):
		with self.assertRaises(InvalidParams):  # too few shares
			_share_byte(b"a", 5, 4)
		with self.assertRaises(InvalidParams):  # too many shares
			_share_byte(b"a", 5, 255)
		with self.assertRaises(ValueError):  # not castable to int
			_share_byte("x", 2, 3)

		vals = _share_byte(ord(b"a"), 2, 3)
		points = list(zip(range(1, 256), vals))
		self.assertEqual(gf256.get_constant_coef(*points), ord(b"a"))
		self.assertEqual(gf256.get_constant_coef(*points[:2]), ord(b"a"))
		self.assertNotEqual(gf256.get_constant_coef(*points[:1]), ord(b"a"))  # underdetermined => random
		ys = _share_byte(ord(b"a"), 2, 3)
		xs = list(range(1, 4))

		weights = gf256.compute_weights(xs)
		self.assertEqual(gf256.get_constant_coef(weights, ys), ord(b"a"))

		weights = gf256.compute_weights(xs[:2])
		self.assertEqual(gf256.get_constant_coef(weights, ys[:2]), ord(b"a"))

		weights = gf256.compute_weights(xs[:1])
		self.assertNotEqual(gf256.get_constant_coef(weights, ys[:1]), ord(b"a"))  # underdetermined => random

	def test_generate_reconstruct_raw(self):
		for (k, n) in [(2, 3), (254, 254)]:
			shares = generate_raw(b"abcd", k, n)
			self.assertEqual(reconstruct_raw(*shares[:k]), b"abcd")
			self.assertNotEqual(reconstruct_raw(*shares[:k-1]), b"abcd")

	def test_generate_reconstruct(self):
		for encoding in ["hex", "b32", "b64"]:
			for secret in [b"abcd", "abcde", "ěščřžý"]:
				for (k, n) in [(2, 3), (254, 254)]:
					raw = isinstance(secret, bytes)
					with self.subTest(enc=encoding, r=raw, sec=secret, k=k, n=n):
						shares = generate(secret, k, n, encoding)
						self.assertEqual(reconstruct(*shares[:k], encoding=encoding, raw=raw), secret)
						self.assertEqual(reconstruct(*shares[:k], raw=raw), secret)
						s = secret if raw else secret.encode("utf-8")
						self.assertNotEqual(reconstruct(*shares[:k-1], encoding=encoding, raw=True), s)
		shares = generate(b"\xfeaa", 2, 3)
		with self.assertRaises(DecodingException):

0 comments (0 inline, 0 general)