Bug#927856: unblock: python-jwcrypto/0.6.0-1
Package: release.debian.org
Severity: normal
User: release.debian.org@packages.debian.org
Usertags: unblock
Please unblock package python-jwcrypto
The new upstream release is needed to fix:
https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=925457
diff -Nru python-jwcrypto-0.4.2/debian/changelog python-jwcrypto-0.6.0/debian/changelog
--- python-jwcrypto-0.4.2/debian/changelog 2017-12-23 10:00:03.000000000 +0200
+++ python-jwcrypto-0.6.0/debian/changelog 2019-04-02 09:05:15.000000000 +0300
@@ -1,3 +1,11 @@
+python-jwcrypto (0.6.0-1) unstable; urgency=medium
+
+ * New upstream release. (Closes: #925457)
+ * control: Update vcs urls.
+ * control: Drop X-Python-Version*.
+
+ -- Timo Aaltonen <tjaalton@debian.org> Tue, 02 Apr 2019 09:05:15 +0300
+
python-jwcrypto (0.4.2-1) unstable; urgency=medium
* New upstream release.
diff -Nru python-jwcrypto-0.4.2/debian/control python-jwcrypto-0.6.0/debian/control
--- python-jwcrypto-0.4.2/debian/control 2017-12-23 09:52:28.000000000 +0200
+++ python-jwcrypto-0.6.0/debian/control 2019-04-02 09:04:58.000000000 +0300
@@ -14,12 +14,10 @@
python3-cryptography,
python3-nose,
python3-setuptools,
-X-Python-Version: >= 2.7
-X-Python3-Version: >= 3.3
Standards-Version: 4.1.2
Homepage: https://github.com/latchset/jwcrypto
-Vcs-Git: https://anonscm.debian.org/git/pkg-freeipa/python-jwcrypto.git
-Vcs-Browser: https://anonscm.debian.org/cgit/pkg-freeipa/python-jwcrypto.git
+Vcs-Git: https://salsa.debian.org/freeipa-team/python-jwcrypto.git
+Vcs-Browser: https://salsa.debian.org/freeipa-team/python-jwcrypto
Package: python-jwcrypto
Architecture: all
diff -Nru python-jwcrypto-0.4.2/docs/source/conf.py python-jwcrypto-0.6.0/docs/source/conf.py
--- python-jwcrypto-0.4.2/docs/source/conf.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/docs/source/conf.py 2018-11-05 17:14:47.000000000 +0200
@@ -46,16 +46,16 @@
# General information about the project.
project = u'JWCrypto'
-copyright = u'2016-2017, JWCrypto Contributors'
+copyright = u'2016-2018, JWCrypto Contributors'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
-version = '0.4'
+version = '0.6'
# The full version, including alpha/beta/rc tags.
-release = '0.4.2'
+release = '0.6'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff -Nru python-jwcrypto-0.4.2/docs/source/jwe.rst python-jwcrypto-0.6.0/docs/source/jwe.rst
--- python-jwcrypto-0.4.2/docs/source/jwe.rst 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/docs/source/jwe.rst 2018-11-05 17:14:47.000000000 +0200
@@ -51,6 +51,9 @@
Examples
--------
+Symmetric keys
+~~~~~~~~~~~~~~
+
Encrypt a JWE token::
>>> from jwcrypto import jwk, jwe
>>> from jwcrypto.common import json_encode
@@ -67,3 +70,29 @@
>>> jwetoken.deserialize(enc)
>>> jwetoken.decrypt(key)
>>> payload = jwetoken.payload
+
+Asymmetric keys
+~~~~~~~~~~~~~~~
+
+Encrypt a JWE token::
+ >>> from jwcrypto import jwk, jwe
+ >>> from jwcrypto.common import json_encode, json_decode
+ >>> public_key = jwk.JWK()
+ >>> private_key = jwk.JWK.generate(kty='RSA', size=2048)
+ >>> public_key.import_key(**json_decode(private_key.export_public()))
+ >>> payload = "My Encrypted message"
+ >>> protected_header = {
+ "alg": "RSA-OAEP-256",
+ "enc": "A256CBC-HS512",
+ "typ": "JWE",
+ "kid": public_key.thumbprint(),
+ }
+ >>> jwetoken = jwe.JWE(payload.encode('utf-8'),
+ recipient=public_key,
+ protected=protected_header)
+ >>> enc = jwetoken.serialize()
+
+Decrypt a JWE token::
+ >>> jwetoken = jwe.JWE()
+ >>> jwetoken.deserialize(enc, key=private_key)
+ >>> payload = jwetoken.payload
diff -Nru python-jwcrypto-0.4.2/jwcrypto/common.py python-jwcrypto-0.6.0/jwcrypto/common.py
--- python-jwcrypto-0.4.2/jwcrypto/common.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/common.py 2018-11-05 17:14:47.000000000 +0200
@@ -16,12 +16,12 @@
def base64url_decode(payload):
- l = len(payload) % 4
- if l == 2:
+ size = len(payload) % 4
+ if size == 2:
payload += '=='
- elif l == 3:
+ elif size == 3:
payload += '='
- elif l != 0:
+ elif size != 0:
raise ValueError('Invalid base64 string')
return urlsafe_b64decode(payload.encode('utf-8'))
diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwa.py python-jwcrypto-0.6.0/jwcrypto/jwa.py
--- python-jwcrypto-0.4.2/jwcrypto/jwa.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/jwa.py 2018-11-05 17:14:47.000000000 +0200
@@ -14,6 +14,7 @@
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
+from cryptography.hazmat.primitives.keywrap import aes_key_unwrap, aes_key_wrap
from cryptography.hazmat.primitives.padding import PKCS7
import six
@@ -141,15 +142,15 @@
def sign(self, key, payload):
skey = key.get_op_key('sign', self._curve)
signature = skey.sign(payload, ec.ECDSA(self.hashfn))
- r, s = ec_utils.decode_rfc6979_signature(signature)
- l = key.get_curve(self._curve).key_size
- return _encode_int(r, l) + _encode_int(s, l)
+ r, s = ec_utils.decode_dss_signature(signature)
+ size = key.get_curve(self._curve).key_size
+ return _encode_int(r, size) + _encode_int(s, size)
def verify(self, key, payload, signature):
pkey = key.get_op_key('verify', self._curve)
r = signature[:len(signature) // 2]
s = signature[len(signature) // 2:]
- enc_signature = ec_utils.encode_rfc6979_signature(
+ enc_signature = ec_utils.encode_dss_signature(
int(hexlify(r), 16), int(hexlify(s), 16))
pkey.verify(enc_signature, payload, ec.ECDSA(self.hashfn))
@@ -439,49 +440,14 @@
if not cek:
cek = _randombits(bitsize)
- # Implement RFC 3394 Key Unwrap - 2.2.2
- # TODO: Use cryptography once issue #1733 is resolved
- iv = 'a6a6a6a6a6a6a6a6'
- a = unhexlify(iv)
- r = [cek[i:i + 8] for i in range(0, len(cek), 8)]
- n = len(r)
- for j in range(0, 6):
- for i in range(0, n):
- e = Cipher(algorithms.AES(rk), modes.ECB(),
- backend=self.backend).encryptor()
- b = e.update(a + r[i]) + e.finalize()
- a = _encode_int(_decode_int(b[:8]) ^ ((n * j) + i + 1), 64)
- r[i] = b[-8:]
- ek = a
- for i in range(0, n):
- ek += r[i]
+ ek = aes_key_wrap(rk, cek, default_backend())
+
return {'cek': cek, 'ek': ek}
def unwrap(self, key, bitsize, ek, headers):
rk = self._get_key(key, 'decrypt')
- # Implement RFC 3394 Key Unwrap - 2.2.3
- # TODO: Use cryptography once issue #1733 is resolved
- iv = 'a6a6a6a6a6a6a6a6'
- aiv = unhexlify(iv)
-
- r = [ek[i:i + 8] for i in range(0, len(ek), 8)]
- a = r.pop(0)
- n = len(r)
- for j in range(5, -1, -1):
- for i in range(n - 1, -1, -1):
- da = _decode_int(a)
- atr = _encode_int((da ^ ((n * j) + i + 1)), 64) + r[i]
- d = Cipher(algorithms.AES(rk), modes.ECB(),
- backend=self.backend).decryptor()
- b = d.update(atr) + d.finalize()
- a = b[:8]
- r[i] = b[-8:]
-
- if a != aiv:
- raise RuntimeError('Decryption Failed')
-
- cek = b''.join(r)
+ cek = aes_key_unwrap(rk, ek, default_backend())
if _bitsize(cek) != bitsize:
raise InvalidJWEKeyLength(bitsize, _bitsize(cek))
return cek
@@ -761,23 +727,24 @@
def wrap(self, key, bitsize, cek, headers):
self._check_key(key)
+ dk_size = self.keysize
if self.keysize is None:
if cek is not None:
raise InvalidJWEOperation('ECDH-ES cannot use an existing CEK')
alg = headers['enc']
+ dk_size = bitsize
else:
- bitsize = self.keysize
alg = headers['alg']
epk = JWK.generate(kty=key.key_type, crv=key.key_curve)
dk = self._derive(epk.get_op_key('unwrapKey'),
key.get_op_key('wrapKey'),
- alg, bitsize, headers)
+ alg, dk_size, headers)
if self.keysize is None:
ret = {'cek': dk}
else:
- aeskw = self.aeskwmap[bitsize]()
+ aeskw = self.aeskwmap[self.keysize]()
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
ret = aeskw.wrap(kek, bitsize, cek, headers)
@@ -788,20 +755,21 @@
if 'epk' not in headers:
raise ValueError('Invalid Header, missing "epk" parameter')
self._check_key(key)
+ dk_size = self.keysize
if self.keysize is None:
alg = headers['enc']
+ dk_size = bitsize
else:
- bitsize = self.keysize
alg = headers['alg']
epk = JWK(**headers['epk'])
dk = self._derive(key.get_op_key('unwrapKey'),
epk.get_op_key('wrapKey'),
- alg, bitsize, headers)
+ alg, dk_size, headers)
if self.keysize is None:
return dk
else:
- aeskw = self.aeskwmap[bitsize]()
+ aeskw = self.aeskwmap[self.keysize]()
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
cek = aeskw.unwrap(kek, bitsize, ek, headers)
return cek
@@ -828,7 +796,7 @@
class _EcdhEsAes256Kw(_EcdhEs):
name = 'ECDH-ES+A256KW'
- description = 'ECDH-ES using Concat KDF and "A128KW" wrapping'
+ description = 'ECDH-ES using Concat KDF and "A256KW" wrapping'
keysize = 256
algorithm_usage_location = 'alg'
algorithm_use = 'kex'
diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwe.py python-jwcrypto-0.6.0/jwcrypto/jwe.py
--- python-jwcrypto-0.4.2/jwcrypto/jwe.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/jwe.py 2018-11-05 17:14:47.000000000 +0200
@@ -3,6 +3,7 @@
import zlib
from jwcrypto import common
+from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA
@@ -40,7 +41,7 @@
"""Default allowed algorithms"""
-class InvalidJWEData(Exception):
+class InvalidJWEData(JWException):
"""Invalid JWE Object.
This exception is raised when the JWE Object is invalid and/or
@@ -58,7 +59,7 @@
super(InvalidJWEData, self).__init__(msg)
-# These have been moved to jwcrypto.common, maintain here for bacwards compat
+# These have been moved to jwcrypto.common, maintain here for backwards compat
InvalidCEKeyLength = common.InvalidCEKeyLength
InvalidJWEKeyLength = common.InvalidJWEKeyLength
InvalidJWEKeyType = common.InvalidJWEKeyType
@@ -108,7 +109,7 @@
json_decode(unprotected) # check header encoding
self.objects['unprotected'] = unprotected
if algs:
- self.allowed_algs = algs
+ self._allowed_algs = algs
if recipient:
self.add_recipient(recipient, header=header)
@@ -269,7 +270,19 @@
if compact:
for invalid in 'aad', 'unprotected':
if invalid in self.objects:
- raise InvalidJWEOperation("Can't use compact encoding")
+ raise InvalidJWEOperation(
+ "Can't use compact encoding when the '%s' parameter"
+ "is set" % invalid)
+ if 'protected' not in self.objects:
+ raise InvalidJWEOperation(
+ "Can't use compat encoding without protected headers")
+ else:
+ ph = json_decode(self.objects['protected'])
+ for required in 'alg', 'enc':
+ if required not in ph:
+ raise InvalidJWEOperation(
+ "Can't use compat encoding, '%s' must be in the "
+ "protected header" % required)
if 'recipients' in self.objects:
if len(self.objects['recipients']) != 1:
raise InvalidJWEOperation("Invalid number of recipients")
diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwk.py python-jwcrypto-0.6.0/jwcrypto/jwk.py
--- python-jwcrypto-0.4.2/jwcrypto/jwk.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/jwk.py 2018-11-05 17:14:47.000000000 +0200
@@ -1,8 +1,9 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
import os
-
from binascii import hexlify, unhexlify
+from collections import namedtuple
+from enum import Enum
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@@ -12,6 +13,7 @@
from six import iteritems
+from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
@@ -22,36 +24,59 @@
'oct': 'Octet sequence'}
"""Registry of valid Key Types"""
+
# RFC 7518 - 7.5
# It is part of the JWK Parameters Registry, but we want a more
# specific map for internal usage
-JWKValuesRegistry = {'EC': {'crv': ('Curve', 'Public', 'Required'),
- 'x': ('X Coordinate', 'Public', 'Required'),
- 'y': ('Y Coordinate', 'Public', 'Required'),
- 'd': ('ECC Private Key', 'Private', None)},
- 'RSA': {'n': ('Modulus', 'Public', 'Required'),
- 'e': ('Exponent', 'Public', 'Required'),
- 'd': ('Private Exponent', 'Private', None),
- 'p': ('First Prime Factor', 'Private', None),
- 'q': ('Second Prime Factor', 'Private', None),
- 'dp': ('First Factor CRT Exponent', 'Private',
- None),
- 'dq': ('Second Factor CRT Exponent', 'Private',
- None),
- 'qi': ('First CRT Coefficient', 'Private', None)},
- 'oct': {'k': ('Key Value', 'Private', 'Required')}}
+class ParmType(Enum):
+ name = 'A string with a name'
+ b64 = 'Base64url Encoded'
+ b64U = 'Base64urlUint Encoded'
+ unsupported = 'Unsupported Parameter'
+
+
+JWKParameter = namedtuple('Parameter', 'description public required type')
+JWKValuesRegistry = {
+ 'EC': {
+ 'crv': JWKParameter('Curve', True, True, ParmType.name),
+ 'x': JWKParameter('X Coordinate', True, True, ParmType.b64),
+ 'y': JWKParameter('Y Coordinate', True, True, ParmType.b64),
+ 'd': JWKParameter('ECC Private Key', False, False, ParmType.b64),
+ },
+ 'RSA': {
+ 'n': JWKParameter('Modulus', True, True, ParmType.b64),
+ 'e': JWKParameter('Exponent', True, True, ParmType.b64U),
+ 'd': JWKParameter('Private Exponent', False, False, ParmType.b64U),
+ 'p': JWKParameter('First Prime Factor', False, False, ParmType.b64U),
+ 'q': JWKParameter('Second Prime Factor', False, False, ParmType.b64U),
+ 'dp': JWKParameter('First Factor CRT Exponent',
+ False, False, ParmType.b64U),
+ 'dq': JWKParameter('Second Factor CRT Exponent',
+ False, False, ParmType.b64U),
+ 'qi': JWKParameter('First CRT Coefficient',
+ False, False, ParmType.b64U),
+ 'oth': JWKParameter('Other Primes Info',
+ False, False, ParmType.unsupported),
+ },
+ 'oct': {
+ 'k': JWKParameter('Key Value', False, True, ParmType.b64),
+ }
+}
"""Registry of valid key values"""
-JWKParamsRegistry = {'kty': ('Key Type', 'Public', ),
- 'use': ('Public Key Use', 'Public'),
- 'key_ops': ('Key Operations', 'Public'),
- 'alg': ('Algorithm', 'Public'),
- 'kid': ('Key ID', 'Public'),
- 'x5u': ('X.509 URL', 'Public'),
- 'x5c': ('X.509 Certificate Chain', 'Public'),
- 'x5t': ('X.509 Certificate SHA-1 Thumbprint', 'Public'),
- 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint',
- 'Public')}
+JWKParamsRegistry = {
+ 'kty': JWKParameter('Key Type', True, None, None),
+ 'use': JWKParameter('Public Key Use', True, None, None),
+ 'key_ops': JWKParameter('Key Operations', True, None, None),
+ 'alg': JWKParameter('Algorithm', True, None, None),
+ 'kid': JWKParameter('Key ID', True, None, None),
+ 'x5u': JWKParameter('X.509 URL', True, None, None),
+ 'x5c': JWKParameter('X.509 Certificate Chain', True, None, None),
+ 'x5t': JWKParameter('X.509 Certificate SHA-1 Thumbprint',
+ True, None, None),
+ 'x5t#S256': JWKParameter('X.509 Certificate SHA-256 Thumbprint',
+ True, None, None)
+}
"""Regstry of valid key parameters"""
# RFC 7518 - 7.6
@@ -83,7 +108,7 @@
'secp521r1': 'P-521'}
-class InvalidJWKType(Exception):
+class InvalidJWKType(JWException):
"""Invalid JWK Type Exception.
This exception is raised when an invalid parameter type is used.
@@ -98,7 +123,7 @@
self.value, list(JWKTypesRegistry.keys()))
-class InvalidJWKUsage(Exception):
+class InvalidJWKUsage(JWException):
"""Invalid JWK usage Exception.
This exception is raised when an invalid key usage is requested,
@@ -123,7 +148,7 @@
valid)
-class InvalidJWKOperation(Exception):
+class InvalidJWKOperation(JWException):
"""Invalid JWK Operation Exception.
This exception is raised when an invalid key operation is requested,
@@ -150,7 +175,7 @@
valid)
-class InvalidJWKValue(Exception):
+class InvalidJWKValue(JWException):
"""Invalid JWK Value Exception.
This exception is raised when an invalid/unknown value is used in the
@@ -210,6 +235,7 @@
@classmethod
def generate(cls, **kwargs):
obj = cls()
+ kty = None
try:
kty = kwargs['kty']
gen = getattr(obj, '_generate_%s' % kty)
@@ -219,6 +245,7 @@
return obj
def generate_key(self, **params):
+ kty = None
try:
kty = params.pop('generate')
gen = getattr(self, '_generate_%s' % kty)
@@ -346,8 +373,26 @@
names.remove(name)
for name, val in iteritems(JWKValuesRegistry[kty]):
- if val[2] == 'Required' and name not in self._key:
+ if val.required and name not in self._key:
raise InvalidJWKValue('Missing required value %s' % name)
+ if val.type == ParmType.unsupported and name in self._key:
+ raise InvalidJWKValue('Unsupported parameter %s' % name)
+ if val.type == ParmType.b64 and name in self._key:
+ # Check that the value is base64url encoded
+ try:
+ base64url_decode(self._key[name])
+ except Exception: # pylint: disable=broad-except
+ raise InvalidJWKValue(
+ '"%s" is not base64url encoded' % name
+ )
+ if val[3] == ParmType.b64U and name in self._key:
+ # Check that the value is Base64urlUInt encoded
+ try:
+ self._decode_int(self._key[name])
+ except Exception: # pylint: disable=broad-except
+ raise InvalidJWKValue(
+ '"%s" is not Base64urlUInt encoded' % name
+ )
# Unknown key parameters are allowed
# Let's just store them out of the way
@@ -385,6 +430,20 @@
' "key_ops" values specified at'
' the same time')
+ @classmethod
+ def from_json(cls, key):
+ """Creates a RFC 7517 JWK from the standard JSON format.
+
+ :param key: The RFC 7517 representation of a JWK.
+ """
+ obj = cls()
+ try:
+ jkey = json_decode(key)
+ except Exception as e: # pylint: disable=broad-except
+ raise InvalidJWKValue(e)
+ obj.import_key(**jkey)
+ return obj
+
def export(self, private_key=True):
"""Exports the key in the standard JSON format.
Exports the key regardless of type, if private_key is False
@@ -405,19 +464,23 @@
It fails if one is not available like when this function
is called on a symmetric key.
"""
+ pub = self._public_params()
+ return json_encode(pub)
+
+ def _public_params(self):
if not self.has_public:
raise InvalidJWKType("No public key available")
pub = {}
preg = JWKParamsRegistry
for name in preg:
- if preg[name][1] == 'Public':
+ if preg[name].public:
if name in self._params:
pub[name] = self._params[name]
reg = JWKValuesRegistry[self._params['kty']]
for param in reg:
- if reg[param][1] == 'Public':
+ if reg[param].public:
pub[param] = self._key[param]
- return json_encode(pub)
+ return pub
def _export_all(self):
d = dict()
@@ -439,6 +502,10 @@
return self._export_all()
raise InvalidJWKType("Not a symmetric key")
+ def public(self):
+ pub = self._public_params()
+ return JWK(**pub)
+
@property
def has_public(self):
"""Whether this JWK has an asymmetric Public key."""
@@ -446,7 +513,7 @@
return False
reg = JWKValuesRegistry[self._params['kty']]
for value in reg:
- if reg[value][1] == 'Public' and value in self._key:
+ if reg[value].public and value in self._key:
return True
@property
@@ -456,7 +523,7 @@
return False
reg = JWKValuesRegistry[self._params['kty']]
for value in reg:
- if reg[value][1] == 'Private' and value in self._key:
+ if not reg[value].public and value in self._key:
return True
return False
@@ -700,7 +767,7 @@
t = {'kty': self._params['kty']}
for name, val in iteritems(JWKValuesRegistry[t['kty']]):
- if val[2] == 'Required':
+ if val.required:
t[name] = self._key[name]
digest = hashes.Hash(hashalg, backend=default_backend())
digest.update(bytes(json_encode(t).encode('utf8')))
@@ -733,6 +800,12 @@
super(JWKSet, self).__setitem__('keys', _JWKkeys())
self.update(*args, **kwargs)
+ def __iter__(self):
+ return self['keys'].__iter__()
+
+ def __contains__(self, key):
+ return self['keys'].__contains__(key)
+
def __setitem__(self, key, val):
if key == 'keys':
self['keys'].add(val)
@@ -769,7 +842,7 @@
"""
try:
jwkset = json_decode(keyset)
- except:
+ except Exception: # pylint: disable=broad-except
raise InvalidJWKValue()
if 'keys' not in jwkset:
@@ -782,8 +855,6 @@
else:
self[k] = v
- return self
-
@classmethod
def from_json(cls, keyset):
"""Creates a RFC 7517 keyset from the standard JSON format.
@@ -791,7 +862,8 @@
:param keyset: The RFC 7517 representation of a JOSE Keyset.
"""
obj = cls()
- return obj.import_keyset(keyset)
+ obj.import_keyset(keyset)
+ return obj
def get_key(self, kid):
"""Gets a key from the set.
diff -Nru python-jwcrypto-0.4.2/jwcrypto/jws.py python-jwcrypto-0.6.0/jwcrypto/jws.py
--- python-jwcrypto-0.4.2/jwcrypto/jws.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/jws.py 2018-11-05 17:14:47.000000000 +0200
@@ -1,5 +1,8 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
+from collections import namedtuple
+
+from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA
@@ -8,18 +11,24 @@
# RFC 7515 - 9.1
# name: (description, supported?)
-JWSHeaderRegistry = {'alg': ('Algorithm', True),
- 'jku': ('JWK Set URL', False),
- 'jwk': ('JSON Web Key', False),
- 'kid': ('Key ID', True),
- 'x5u': ('X.509 URL', False),
- 'x5c': ('X.509 Certificate Chain', False),
- 'x5t': ('X.509 Certificate SHA-1 Thumbprint', False),
- 'x5t#S256': ('X.509 Certificate SHA-256 Thumbprint',
- False),
- 'typ': ('Type', True),
- 'cty': ('Content Type', True),
- 'crit': ('Critical', True)}
+JWSHeaderParameter = namedtuple('Parameter',
+ 'description mustprotect supported')
+JWSHeaderRegistry = {
+ 'alg': JWSHeaderParameter('Algorithm', False, True),
+ 'jku': JWSHeaderParameter('JWK Set URL', False, False),
+ 'jwk': JWSHeaderParameter('JSON Web Key', False, False),
+ 'kid': JWSHeaderParameter('Key ID', False, True),
+ 'x5u': JWSHeaderParameter('X.509 URL', False, False),
+ 'x5c': JWSHeaderParameter('X.509 Certificate Chain', False, False),
+ 'x5t': JWSHeaderParameter(
+ 'X.509 Certificate SHA-1 Thumbprint', False, False),
+ 'x5t#S256': JWSHeaderParameter(
+ 'X.509 Certificate SHA-256 Thumbprint', False, False),
+ 'typ': JWSHeaderParameter('Type', False, True),
+ 'cty': JWSHeaderParameter('Content Type', False, True),
+ 'crit': JWSHeaderParameter('Critical', True, True),
+ 'b64': JWSHeaderParameter('Base64url-Encode Payload', True, True)
+}
"""Registry of valid header parameters"""
default_allowed_algs = [
@@ -30,7 +39,7 @@
"""Default allowed algorithms"""
-class InvalidJWSSignature(Exception):
+class InvalidJWSSignature(JWException):
"""Invalid JWS Signature.
This exception is raised when a signature cannot be validated.
@@ -47,7 +56,7 @@
super(InvalidJWSSignature, self).__init__(msg)
-class InvalidJWSObject(Exception):
+class InvalidJWSObject(JWException):
"""Invalid JWS Object.
This exception is raised when the JWS Object is invalid and/or
@@ -63,7 +72,7 @@
super(InvalidJWSObject, self).__init__(msg)
-class InvalidJWSOperation(Exception):
+class InvalidJWSOperation(JWException):
"""Invalid JWS Object.
This exception is raised when a requested operation cannot
@@ -113,11 +122,16 @@
if header is not None:
if isinstance(header, dict):
+ self.header = header
header = json_encode(header)
+ else:
+ self.header = json_decode(header)
+
self.protected = base64url_encode(header.encode('utf-8'))
else:
+ self.header = dict()
self.protected = ''
- self.payload = base64url_encode(payload)
+ self.payload = payload
def _jwa(self, name, allowed):
if allowed is None:
@@ -126,12 +140,22 @@
raise InvalidJWSOperation('Algorithm not allowed')
return JWA.signing_alg(name)
+ def _payload(self):
+ if self.header.get('b64', True):
+ return base64url_encode(self.payload).encode('utf-8')
+ else:
+ if isinstance(self.payload, bytes):
+ return self.payload
+ else:
+ return self.payload.encode('utf-8')
+
def sign(self):
"""Generates a signature"""
- sigin = ('.'.join([self.protected, self.payload])).encode('utf-8')
+ payload = self._payload()
+ sigin = b'.'.join([self.protected.encode('utf-8'), payload])
signature = self.engine.sign(self.key, sigin)
return {'protected': self.protected,
- 'payload': self.payload,
+ 'payload': payload,
'signature': base64url_encode(signature)}
def verify(self, signature):
@@ -140,7 +164,8 @@
:raises InvalidJWSSignature: if the verification fails.
"""
try:
- sigin = ('.'.join([self.protected, self.payload])).encode('utf-8')
+ payload = self._payload()
+ sigin = b'.'.join([self.protected.encode('utf-8'), payload])
self.engine.verify(self.key, sigin, signature)
except Exception as e: # pylint: disable=broad-except
raise InvalidJWSSignature('Verification failed', repr(e))
@@ -164,16 +189,6 @@
self.verifylog = None
self._allowed_algs = None
- def _check_crit(self, crit):
- for k in crit:
- if k not in JWSHeaderRegistry:
- raise InvalidJWSSignature('Unknown critical header: '
- '"%s"' % k)
- else:
- if not JWSHeaderRegistry[k][1]:
- raise InvalidJWSSignature('Unsupported critical '
- 'header: "%s"' % k)
-
@property
def allowed_algs(self):
"""Allowed algorithms.
@@ -197,31 +212,61 @@
def is_valid(self):
return self.objects.get('valid', False)
- def _merge_headers(self, h1, h2):
- for k in list(h1.keys()):
- if k in h2:
- raise InvalidJWSObject('Duplicate header: "%s"' % k)
- h1.update(h2)
- return h1
+ # TODO: allow caller to specify list of headers it understands
+ def _merge_check_headers(self, protected, *headers):
+ header = None
+ crit = []
+ if protected is not None:
+ if 'crit' in protected:
+ crit = protected['crit']
+ # Check immediately if we support these critical headers
+ for k in crit:
+ if k not in JWSHeaderRegistry:
+ raise InvalidJWSObject(
+ 'Unknown critical header: "%s"' % k)
+ else:
+ if not JWSHeaderRegistry[k][1]:
+ raise InvalidJWSObject(
+ 'Unsupported critical header: "%s"' % k)
+ header = protected
+ if 'b64' in header:
+ if not isinstance(header['b64'], bool):
+ raise InvalidJWSObject('b64 header must be a boolean')
+
+ for hn in headers:
+ if hn is None:
+ continue
+ if header is None:
+ header = dict()
+ for h in list(hn.keys()):
+ if h in JWSHeaderRegistry:
+ if JWSHeaderRegistry[h].mustprotect:
+ raise InvalidJWSObject('"%s" must be protected' % h)
+ if h in header:
+ raise InvalidJWSObject('Duplicate header: "%s"' % h)
+ header.update(hn)
+
+ for k in crit:
+ if k not in header:
+ raise InvalidJWSObject('Missing critical header "%s"' % k)
+
+ return header
# TODO: support selecting key with 'kid' and passing in multiple keys
def _verify(self, alg, key, payload, signature, protected, header=None):
- # verify it is a valid JSON object and keep a decode copy
+ p = dict()
+ # verify it is a valid JSON object and decode
if protected is not None:
p = json_decode(protected)
- else:
- p = dict()
- if not isinstance(p, dict):
- raise InvalidJWSSignature('Invalid Protected header')
+ if not isinstance(p, dict):
+ raise InvalidJWSSignature('Invalid Protected header')
# merge heders, and verify there are no duplicates
if header:
if not isinstance(header, dict):
raise InvalidJWSSignature('Invalid Unprotected header')
- p = self._merge_headers(p, header)
- # verify critical headers
- # TODO: allow caller to specify list of headers it understands
- if 'crit' in p:
- self._check_crit(p['crit'])
+
+ # Merge and check (critical) headers
+ self._merge_check_headers(p, header)
# check 'alg' is present
if alg is None and 'alg' not in p:
raise InvalidJWSSignature('No "alg" in headers')
@@ -282,6 +327,33 @@
raise InvalidJWSSignature('Verification failed for all '
'signatures' + repr(self.verifylog))
+ def _deserialize_signature(self, s):
+ o = dict()
+ o['signature'] = base64url_decode(str(s['signature']))
+ if 'protected' in s:
+ p = base64url_decode(str(s['protected']))
+ o['protected'] = p.decode('utf-8')
+ if 'header' in s:
+ o['header'] = s['header']
+ return o
+
+ def _deserialize_b64(self, o, protected):
+ if protected is None:
+ b64n = None
+ else:
+ p = json_decode(protected)
+ b64n = p.get('b64')
+ if b64n is not None:
+ if not isinstance(b64n, bool):
+ raise InvalidJWSObject('b64 header must be boolean')
+ b64 = o.get('b64')
+ if b64 == b64n:
+ return
+ elif b64 is None:
+ o['b64'] = b64n
+ else:
+ raise InvalidJWSObject('conflicting b64 values')
+
def deserialize(self, raw_jws, key=None, alg=None):
"""Deserialize a JWS token.
@@ -304,25 +376,21 @@
try:
try:
djws = json_decode(raw_jws)
- o['payload'] = base64url_decode(str(djws['payload']))
if 'signatures' in djws:
o['signatures'] = list()
for s in djws['signatures']:
- os = dict()
- os['signature'] = base64url_decode(str(s['signature']))
- if 'protected' in s:
- p = base64url_decode(str(s['protected']))
- os['protected'] = p.decode('utf-8')
- if 'header' in s:
- os['header'] = s['header']
+ os = self._deserialize_signature(s)
o['signatures'].append(os)
+ self._deserialize_b64(o, os.get('protected'))
else:
- o['signature'] = base64url_decode(str(djws['signature']))
- if 'protected' in djws:
- p = base64url_decode(str(djws['protected']))
- o['protected'] = p.decode('utf-8')
- if 'header' in djws:
- o['header'] = djws['header']
+ o = self._deserialize_signature(djws)
+ self._deserialize_b64(o, o.get('protected'))
+
+ if 'payload' in djws:
+ if o.get('b64', True):
+ o['payload'] = base64url_decode(str(djws['payload']))
+ else:
+ o['payload'] = djws['payload']
except ValueError:
c = raw_jws.split('.')
@@ -331,6 +399,7 @@
p = base64url_decode(str(c[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8')
+ self._deserialize_b64(o, o['protected'])
o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2]))
@@ -353,7 +422,8 @@
:param potected: The Protected Header (optional)
:param header: The Unprotected Header (optional)
- :raises InvalidJWSObject: if no payload has been set on the object.
+ :raises InvalidJWSObject: if no payload has been set on the object,
+ or invalid headers are provided.
:raises ValueError: if the key is not a :class:`JWK` object.
:raises ValueError: if the algorithm is missing or is not provided
by one of the headers.
@@ -364,20 +434,36 @@
if not self.objects.get('payload', None):
raise InvalidJWSObject('Missing Payload')
+ b64 = True
+
p = dict()
if protected:
if isinstance(protected, dict):
- protected = json_encode(protected)
- p = json_decode(protected)
- # TODO: allow caller to specify list of headers it understands
- if 'crit' in p:
- self._check_crit(p['crit'])
+ p = protected
+ protected = json_encode(p)
+ else:
+ p = json_decode(protected)
+ # If b64 is present we must enforce criticality
+ if 'b64' in list(p.keys()):
+ crit = p.get('crit', [])
+ if 'b64' not in crit:
+ raise InvalidJWSObject('b64 header must always be critical')
+ b64 = p['b64']
+
+ if 'b64' in self.objects:
+ if b64 != self.objects['b64']:
+ raise InvalidJWSObject('Mixed b64 headers on signatures')
+
+ h = None
if header:
if isinstance(header, dict):
+ h = header
header = json_encode(header)
- h = json_decode(header)
- p = self._merge_headers(p, h)
+ else:
+ h = json_decode(header)
+
+ p = self._merge_check_headers(p, h)
if 'alg' in p:
if alg is None:
@@ -416,6 +502,7 @@
self.objects['signatures'].append(o)
else:
self.objects.update(o)
+ self.objects['b64'] = b64
def serialize(self, compact=False):
"""Serializes the object into a JWS token.
@@ -428,7 +515,6 @@
:raises InvalidJWSSignature: if no signature has been added
to the object, or no valid signature can be found.
"""
-
if compact:
if 'signatures' in self.objects:
raise InvalidJWSOperation("Can't use compact encoding with "
@@ -441,23 +527,40 @@
protected = base64url_encode(self.objects['protected'])
else:
protected = ''
- return '.'.join([protected,
- base64url_encode(self.objects['payload']),
+ if self.objects.get('payload', False):
+ if self.objects.get('b64', True):
+ payload = base64url_encode(self.objects['payload'])
+ else:
+ if isinstance(self.objects['payload'], bytes):
+ payload = self.objects['payload'].decode('utf-8')
+ else:
+ payload = self.objects['payload']
+ if '.' in payload:
+ raise InvalidJWSOperation(
+ "Can't use compact encoding with unencoded "
+ "payload that uses the . character")
+ else:
+ payload = ''
+ return '.'.join([protected, payload,
base64url_encode(self.objects['signature'])])
else:
obj = self.objects
+ sig = dict()
+ if self.objects.get('payload', False):
+ if self.objects.get('b64', True):
+ sig['payload'] = base64url_encode(self.objects['payload'])
+ else:
+ sig['payload'] = self.objects['payload']
if 'signature' in obj:
if not obj.get('valid', False):
raise InvalidJWSSignature("No valid signature found")
- sig = {'payload': base64url_encode(obj['payload']),
- 'signature': base64url_encode(obj['signature'])}
+ sig['signature'] = base64url_encode(obj['signature'])
if 'protected' in obj:
sig['protected'] = base64url_encode(obj['protected'])
if 'header' in obj:
sig['header'] = obj['header']
elif 'signatures' in obj:
- sig = {'payload': base64url_encode(obj['payload']),
- 'signatures': list()}
+ sig['signatures'] = list()
for o in obj['signatures']:
if not o.get('valid', False):
continue
@@ -481,24 +584,27 @@
raise InvalidJWSOperation("Payload not verified")
return self.objects['payload']
+ def detach_payload(self):
+ self.objects.pop('payload', None)
+
@property
def jose_header(self):
obj = self.objects
if 'signature' in obj:
- jh = dict()
if 'protected' in obj:
p = json_decode(obj['protected'])
- jh = self._merge_headers(jh, p)
- jh = self._merge_headers(jh, obj.get('header', dict()))
- return jh
+ else:
+ p = None
+ return self._merge_check_headers(p, obj.get('header', dict()))
elif 'signatures' in self.objects:
jhl = list()
for o in obj['signatures']:
jh = dict()
- if 'protected' in obj:
+ if 'protected' in o:
p = json_decode(o['protected'])
- jh = self._merge_headers(jh, p)
- jh = self._merge_headers(jh, o.get('header', dict()))
+ else:
+ p = None
+ jh = self._merge_check_headers(p, o.get('header', dict()))
jhl.append(jh)
return jhl
else:
diff -Nru python-jwcrypto-0.4.2/jwcrypto/jwt.py python-jwcrypto-0.6.0/jwcrypto/jwt.py
--- python-jwcrypto-0.4.2/jwcrypto/jwt.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/jwt.py 2018-11-05 17:14:47.000000000 +0200
@@ -5,7 +5,7 @@
from six import string_types
-from jwcrypto.common import json_decode, json_encode
+from jwcrypto.common import JWException, json_decode, json_encode
from jwcrypto.jwe import JWE
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jws import JWS
@@ -22,7 +22,7 @@
'jti': 'JWT ID'}
-class JWTExpired(Exception):
+class JWTExpired(JWException):
"""Json Web Token is expired.
This exception is raised when a token is expired accoring to its claims.
@@ -39,7 +39,7 @@
super(JWTExpired, self).__init__(msg)
-class JWTNotYetValid(Exception):
+class JWTNotYetValid(JWException):
"""Json Web Token is not yet valid.
This exception is raised when a token is not valid yet according to its
@@ -57,7 +57,7 @@
super(JWTNotYetValid, self).__init__(msg)
-class JWTMissingClaim(Exception):
+class JWTMissingClaim(JWException):
"""Json Web Token claim is invalid.
This exception is raised when a claim does not match the expected value.
@@ -74,7 +74,7 @@
super(JWTMissingClaim, self).__init__(msg)
-class JWTInvalidClaimValue(Exception):
+class JWTInvalidClaimValue(JWException):
"""Json Web Token claim is invalid.
This exception is raised when a claim does not match the expected value.
@@ -91,7 +91,7 @@
super(JWTInvalidClaimValue, self).__init__(msg)
-class JWTInvalidClaimFormat(Exception):
+class JWTInvalidClaimFormat(JWException):
"""Json Web Token claim format is invalid.
This exception is raised when a claim is not in a valid format.
@@ -108,7 +108,7 @@
super(JWTInvalidClaimFormat, self).__init__(msg)
-class JWTMissingKeyID(Exception):
+class JWTMissingKeyID(JWException):
"""Json Web Token is missing key id.
This exception is raised when trying to decode a JWT with a key set
@@ -126,7 +126,7 @@
super(JWTMissingKeyID, self).__init__(msg)
-class JWTMissingKey(Exception):
+class JWTMissingKey(JWException):
"""Json Web Token is using a key not in the key set.
This exception is raised if the key that was used is not available
@@ -155,15 +155,15 @@
"""Creates a JWT object.
:param header: A dict or a JSON string with the JWT Header data.
- :param claims: A dict or a string withthe JWT Claims data.
+ :param claims: A dict or a string with the JWT Claims data.
:param jwt: a 'raw' JWT token
:param key: A (:class:`jwcrypto.jwk.JWK`) key to deserialize
- the token. A (:class:`jwcrypt.jwk.JWKSet`) can also be used.
+ the token. A (:class:`jwcrypto.jwk.JWKSet`) can also be used.
:param algs: An optional list of allowed algorithms
:param default_claims: An optional dict with default values for
registred claims. A None value for NumericDate type claims
will cause generation according to system time. Only the values
- fro RFC 7519 - 4.1 are evaluated.
+ from RFC 7519 - 4.1 are evaluated.
:param check_claims: An optional dict of claims that must be
present in the token, if the value is not None the claim must
match exactly.
@@ -212,9 +212,15 @@
@header.setter
def header(self, h):
if isinstance(h, dict):
- self._header = json_encode(h)
+ eh = json_encode(h)
else:
- self._header = h
+ eh = h
+ h = json_decode(eh)
+
+ if h.get('b64') is False:
+ raise ValueError("b64 header is invalid."
+ "JWTs cannot use unencoded payloads")
+ self._header = eh
@property
def claims(self):
@@ -224,6 +230,10 @@
@claims.setter
def claims(self, c):
+ if self._reg_claims and not isinstance(c, dict):
+ # decode c so we can set default claims
+ c = json_decode(c)
+
if isinstance(c, dict):
self._add_default_claims(c)
self._claims = json_encode(c)
@@ -276,7 +286,7 @@
def _add_jti_claim(self, claims):
if 'jti' in claims or 'jti' not in self._reg_claims:
return
- claims['jti'] = uuid.uuid4()
+ claims['jti'] = str(uuid.uuid4())
def _add_default_claims(self, claims):
if self._reg_claims is None:
@@ -380,8 +390,8 @@
if value in claims[name]:
continue
raise JWTInvalidClaimValue(
- "Invalid '%s' value. Expected '%s' in '%s'" % (
- name, value, claims[name]))
+ "Invalid '%s' value. Expected '%s' to be in '%s'" % (
+ name, claims[name], value))
elif name == 'exp':
if value is not None:
@@ -398,7 +408,7 @@
else:
if value is not None and value != claims[name]:
raise JWTInvalidClaimValue(
- "Invalid '%s' value. Expected '%d' got '%d'" % (
+ "Invalid '%s' value. Expected '%s' got '%s'" % (
name, value, claims[name]))
def make_signed_token(self, key):
@@ -437,7 +447,7 @@
:param jwt: a 'raw' JWT token.
:param key: A (:class:`jwcrypto.jwk.JWK`) verification or
- decryption key, or a (:class:`jwcrypt.jwk.JWKSet`) that
+ decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
contains a key indexed by the 'kid' header.
"""
c = jwt.count('.')
diff -Nru python-jwcrypto-0.4.2/jwcrypto/tests.py python-jwcrypto-0.6.0/jwcrypto/tests.py
--- python-jwcrypto-0.4.2/jwcrypto/tests.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/jwcrypto/tests.py 2018-11-05 17:14:47.000000000 +0200
@@ -3,7 +3,6 @@
from __future__ import unicode_literals
import copy
-
import unittest
from cryptography.hazmat.backends import default_backend
@@ -312,11 +311,17 @@
self.assertRaises(jwk.InvalidJWKValue,
jwk.JWK.from_pyca, dict())
+ def test_jwk_from_json(self):
+ k = jwk.JWK.generate(kty='oct', size=256)
+ y = jwk.JWK.from_json(k.export())
+ self.assertEqual(k.export(), y.export())
+
def test_jwkset(self):
k = jwk.JWK(**RSAPrivateKey)
ks = jwk.JWKSet()
ks.add(k)
- ks2 = jwk.JWKSet().import_keyset(ks.export())
+ ks2 = jwk.JWKSet()
+ ks2.import_keyset(ks.export())
self.assertEqual(len(ks), len(ks2))
self.assertEqual(len(ks), 1)
k1 = ks.get_key(RSAPrivateKey['kid'])
@@ -329,6 +334,15 @@
ks3 = jwk.JWKSet.from_json(ks.export())
self.assertEqual(len(ks), len(ks3))
+ # Test Keyset with mutiple keys
+ ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys))
+ num = 0
+ for item in ksm:
+ self.assertTrue(isinstance(item, jwk.JWK))
+ self.assertTrue(item in ksm)
+ num += 1
+ self.assertEqual(num, len(PrivateKeys['keys']))
+
def test_thumbprint(self):
for i in range(0, len(PublicKeys['keys'])):
k = jwk.JWK(**PublicKeys['keys'][i])
@@ -378,6 +392,25 @@
self.assertFalse(pubkey.has_private)
self.assertEqual(prikey.key_id, pubkey.key_id)
+ def test_public(self):
+ key = jwk.JWK.from_pem(RSAPrivatePEM, password=RSAPrivatePassword)
+ self.assertTrue(key.has_public)
+ self.assertTrue(key.has_private)
+ pubkey = key.public()
+ self.assertTrue(pubkey.has_public)
+ self.assertFalse(pubkey.has_private)
+ # finally check public works
+ e = jwe.JWE('plaintext', '{"alg":"RSA-OAEP","enc":"A256GCM"}')
+ e.add_recipient(pubkey)
+ enc = e.serialize()
+ d = jwe.JWE()
+ d.deserialize(enc, key)
+ self.assertEqual(d.payload, b'plaintext')
+
+ def test_invalid_value(self):
+ with self.assertRaises(jwk.InvalidJWKValue):
+ jwk.JWK(kty='oct', k=b'\x01')
+
# RFC 7515 - A.1
A1_protected = \
@@ -556,7 +589,11 @@
'key2': jwk.JWK(**A3_key),
'protected2': bytes(bytearray(A3_protected)).decode('utf-8'),
'header2': json_encode({"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d"}),
- 'serialized': A6_serialized}
+ 'serialized': A6_serialized,
+ 'jose_header': [{"kid": "2010-12-29",
+ "alg": "RS256"},
+ {"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d",
+ "alg": "ES256"}]}
A7_example = \
'{' + \
@@ -630,6 +667,7 @@
sig = s.serialize()
s.deserialize(sig, A6_example['key1'])
s.deserialize(A6_serialized, A6_example['key2'])
+ self.assertEqual(A6_example['jose_header'], s.jose_header)
def test_A7(self):
s = jws.JWS(A6_example['payload'])
@@ -801,6 +839,29 @@
'"ciphertext":"KDlTtXchhZTGufMYmOYGS4HffxPSUrfmqCHXaI9wOGY",' \
'"tag":"Mz-VPPyU4RlcuYv1IwIvzw"}'
+Issue_136_Protected_Header_no_epk = {
+ "alg": "ECDH-ES+A256KW",
+ "enc": "A256CBC-HS512"}
+
+Issue_136_Contributed_JWE = \
+ "eyJhbGciOiJFQ0RILUVTK0ExMjhLVyIsImVuYyI6IkEyNTZDQkMtSFM1MTIiLCJr" \
+ "aWQiOiJrZXkxIiwiZXBrIjp7Imt0eSI6IkVDIiwiY3J2IjoiUC0yNTYiLCJ4Ijoi" \
+ "cDNpU241cEFSNUpYUE5aVF9SSEw2MTJMUGliWEI2WDhvTE9EOXFrN2NhTSIsInki" \
+ "OiI1Y04yQ2FqeXM3SVlDSXFEby1QUHF2bVQ1RzFvMEEtU0JicEQ5NFBOb3NNIn19" \
+ ".wG51hYE_Vma8tvFKVyeZs4lsHhXiarEw3-59eWHPmhRflDAKrMvnBw1urezo_Bz" \
+ "ZyPJ76m42ORQPbhEu5NvbJk3vgdgcp03j" \
+ ".lRttW8r6P6zM0uYDQt0EjQ.qnOnz7biCbqdLEdUH3acMamFm-cBRCSTFb83tNPrgDU" \
+ ".vZnwYpYjzrTaYritwMzaguaAMsq9rQOWe8NUHICv2hg"
+
+Issue_136_Contributed_Key = {
+ "alg": "ECDH-ES+A128KW",
+ "crv": "P-256",
+ "d": "F2PnliYin65AoIUxL1CwwzBPNeL2TyZPAKtkXOP50l8",
+ "kid": "key1",
+ "kty": "EC",
+ "x": "FPrb_xwxe8SBP3kO-e-WsofFp7n5-yc_tGgfAvqAP8g",
+ "y": "lM3HuyKMYUVsYdGqiWlkwTZbGO3Fh-hyadq8lfkTgBc"}
+
class TestJWE(unittest.TestCase):
def check_enc(self, plaintext, protected, key, vector):
@@ -843,6 +904,40 @@
e = jwe.JWE(algs=['A256KW'])
e.deserialize(E_A5_ex, E_A4_ex['key2'])
+ def test_compact_protected_header(self):
+ """Compact representation requires a protected header"""
+ e = jwe.JWE(E_A1_ex['plaintext'])
+ e.add_recipient(E_A1_ex['key'], E_A1_ex['protected'])
+
+ with self.assertRaises(jwe.InvalidJWEOperation):
+ e.serialize(compact=True)
+
+ def test_compact_invalid_header(self):
+ with self.assertRaises(jwe.InvalidJWEOperation):
+ e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'],
+ aad='XYZ', recipient=E_A1_ex['key'])
+ e.serialize(compact=True)
+
+ with self.assertRaises(jwe.InvalidJWEOperation):
+ e = jwe.JWE(E_A1_ex['plaintext'], E_A1_ex['protected'],
+ unprotected='{"jku":"https://example.com/keys.jwks"}',
+ recipient=E_A1_ex['key'])
+ e.serialize(compact=True)
+
+ def test_JWE_Issue_136(self):
+ plaintext = "plain"
+ protected = json_encode(Issue_136_Protected_Header_no_epk)
+ key = jwk.JWK.generate(kty='EC', crv='P-521')
+ e = jwe.JWE(plaintext, protected)
+ e.add_recipient(key)
+ enc = e.serialize()
+ e.deserialize(enc, key)
+ self.assertEqual(e.payload, plaintext.encode('utf-8'))
+
+ e = jwe.JWE()
+ e.deserialize(Issue_136_Contributed_JWE,
+ jwk.JWK(**Issue_136_Contributed_Key))
+
MMA_vector_key = jwk.JWK(**E_A2_key)
MMA_vector_ok_cek = \
@@ -1018,6 +1113,39 @@
keyset.add(key)
jwt.JWT(jwt=token, key=keyset, check_claims={'exp': 1300819380})
+ def test_invalid_claim_type(self):
+ key = jwk.JWK(**E_A2_key)
+ claims = {"testclaim": "test"}
+ claims.update(A1_claims)
+ t = jwt.JWT(A1_header, claims)
+ t.make_encrypted_token(key)
+ token = t.serialize()
+
+ # Wrong string
+ self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token,
+ key=key, check_claims={"testclaim": "ijgi"})
+
+ # Wrong type
+ self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, jwt=token,
+ key=key, check_claims={"testclaim": 123})
+
+ # Correct
+ jwt.JWT(jwt=token, key=key, check_claims={"testclaim": "test"})
+
+ def test_claim_params(self):
+ key = jwk.JWK(**E_A2_key)
+ default_claims = {"iss": "test", "exp": None}
+ string_claims = '{"string_claim":"test"}'
+ string_header = '{"alg":"RSA1_5","enc":"A128CBC-HS256"}'
+ t = jwt.JWT(string_header, string_claims,
+ default_claims=default_claims)
+ t.make_encrypted_token(key)
+ token = t.serialize()
+
+ # Check default_claims
+ jwt.JWT(jwt=token, key=key, check_claims={"iss": "test", "exp": None,
+ "string_claim": "test"})
+
class ConformanceTests(unittest.TestCase):
@@ -1148,3 +1276,57 @@
self.assertEqual(inst.name, name)
else:
self.fail((name, cls))
+
+
+# RFC 7797
+
+rfc7797_e_header = '{"alg":"HS256"}'
+rfc7797_u_header = '{"alg":"HS256","b64":false,"crit":["b64"]}'
+rfc7797_payload = "$.02"
+
+
+class TestUnencodedPayload(unittest.TestCase):
+
+ def test_regular(self):
+ result = \
+ 'eyJhbGciOiJIUzI1NiJ9.JC4wMg.' + \
+ '5mvfOroL-g7HyqJoozehmsaqmvTYGEq5jTI1gVvoEoQ'
+
+ s = jws.JWS(rfc7797_payload)
+ s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
+ protected=rfc7797_e_header)
+ sig = s.serialize(compact=True)
+ self.assertEqual(sig, result)
+
+ def test_compat_unencoded(self):
+ result = \
+ 'eyJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2UsImNyaXQiOlsiYjY0Il19..' + \
+ 'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY'
+
+ s = jws.JWS(rfc7797_payload)
+ s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
+ protected=rfc7797_u_header)
+ # check unencoded payload is in serialized form
+ sig = s.serialize()
+ self.assertEqual(json_decode(sig)['payload'], rfc7797_payload)
+ # check error raises if we try to get compact serialization
+ with self.assertRaises(jws.InvalidJWSOperation):
+ sig = s.serialize(compact=True)
+ # check compact serialization is allowed with detached payload
+ s.detach_payload()
+ sig = s.serialize(compact=True)
+ self.assertEqual(sig, result)
+
+ def test_misses_crit(self):
+ s = jws.JWS(rfc7797_payload)
+ with self.assertRaises(jws.InvalidJWSObject):
+ s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
+ protected={"alg": "HS256", "b64": False})
+
+ def test_mismatching_encoding(self):
+ s = jws.JWS(rfc7797_payload)
+ s.add_signature(jwk.JWK(**SymmetricKeys['keys'][0]),
+ protected=rfc7797_e_header)
+ with self.assertRaises(jws.InvalidJWSObject):
+ s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
+ protected=rfc7797_u_header)
diff -Nru python-jwcrypto-0.4.2/Makefile python-jwcrypto-0.6.0/Makefile
--- python-jwcrypto-0.4.2/Makefile 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/Makefile 2018-11-05 17:14:47.000000000 +0200
@@ -21,13 +21,15 @@
testlong: export TOX_TESTENV_PASSENV=JWCRYPTO_TESTS_ENABLE_MMA
testlong:
rm -f .coverage
- tox -e py35
+ tox -e py36
test:
rm -f .coverage
tox -e py27
tox -e py34 --skip-missing-interpreter
tox -e py35 --skip-missing-interpreter
+ tox -e py36 --skip-missing-interpreter
+ tox -e py37 --skip-missing-interpreter
DOCS_DIR = docs
.PHONY: docs
diff -Nru python-jwcrypto-0.4.2/README.md python-jwcrypto-0.6.0/README.md
--- python-jwcrypto-0.4.2/README.md 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/README.md 2018-11-05 17:14:47.000000000 +0200
@@ -1,3 +1,5 @@
+[](https://travis-ci.org/latchset/jwcrypto)
+
JWCrypto
========
diff -Nru python-jwcrypto-0.4.2/setup.py python-jwcrypto-0.6.0/setup.py
--- python-jwcrypto-0.4.2/setup.py 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/setup.py 2018-11-05 17:14:47.000000000 +0200
@@ -6,7 +6,7 @@
setup(
name = 'jwcrypto',
- version = '0.4.2',
+ version = '0.6.0',
license = 'LGPLv3+',
maintainer = 'JWCrypto Project Contributors',
maintainer_email = 'simo@redhat.com',
@@ -18,6 +18,7 @@
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
+ 'Programming Language :: Python :: 3.7',
'Intended Audience :: Developers',
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules'
diff -Nru python-jwcrypto-0.4.2/tox.ini python-jwcrypto-0.6.0/tox.ini
--- python-jwcrypto-0.4.2/tox.ini 2017-08-01 18:56:23.000000000 +0300
+++ python-jwcrypto-0.6.0/tox.ini 2018-11-05 17:14:47.000000000 +0200
@@ -1,5 +1,5 @@
[tox]
-envlist = lint,py27,py34,py35,py36,pep8py2,pep8py3,doc,sphinx
+envlist = lint,py27,py34,py35,py36,py37,pep8py2,pep8py3,doc,sphinx
skip_missing_interpreters = true
[testenv]
unblock python-jwcrypto/0.6.0-1
Reply to: