Source code for M2Crypto.SMIME

"""M2Crypto wrapper for OpenSSL S/MIME API.

Copyright (c) 1999-2003 Ng Pheng Siong. All rights reserved."""

from M2Crypto import BIO, EVP, Err, X509, m2, util, types as C
from typing import Callable, Optional, Tuple, Union  # noqa

PKCS7_TEXT: int = m2.PKCS7_TEXT
PKCS7_NOCERTS: int = m2.PKCS7_NOCERTS
PKCS7_NOSIGS: int = m2.PKCS7_NOSIGS
PKCS7_NOCHAIN: int = m2.PKCS7_NOCHAIN
PKCS7_NOINTERN: int = m2.PKCS7_NOINTERN
PKCS7_NOVERIFY: int = m2.PKCS7_NOVERIFY
PKCS7_DETACHED: int = m2.PKCS7_DETACHED
PKCS7_BINARY: int = m2.PKCS7_BINARY
PKCS7_NOATTR: int = m2.PKCS7_NOATTR
PKCS7_SIGNED: int = m2.PKCS7_SIGNED
PKCS7_ENVELOPED: int = m2.PKCS7_ENVELOPED
PKCS7_SIGNED_ENVELOPED: int = m2.PKCS7_SIGNED_ENVELOPED  # Deprecated
PKCS7_DATA: int = m2.PKCS7_DATA


[docs] class PKCS7_Error(Exception): pass
m2.pkcs7_init(PKCS7_Error)
[docs] class PKCS7(object): def __init__(self, pkcs7: Optional[C.PKCS7] = None, _pyfree: int = 0) -> None: """PKCS7 object. :param pkcs7: binary representation of the OpenSSL type PKCS7 """ if pkcs7 is not None: self.pkcs7 = pkcs7 self._pyfree = _pyfree else: self.pkcs7 = m2.pkcs7_new() self._pyfree = 1 @staticmethod def _m2_pkcs7_free(p7: C.PKCS7) -> None: m2.pkcs7_free(p7) def __del__(self) -> None: if getattr(self, "_pyfree", 0): self._m2_pkcs7_free(self.pkcs7) def _ptr(self): return self.pkcs7
[docs] def type(self, text_name: int = 0) -> Union[int, str]: if text_name: return m2.pkcs7_type_sn(self.pkcs7) else: return m2.pkcs7_type_nid(self.pkcs7)
[docs] def write(self, bio: BIO.BIO) -> int: return m2.pkcs7_write_bio(self.pkcs7, bio._ptr())
[docs] def write_der(self, bio: BIO.BIO) -> int: return m2.pkcs7_write_bio_der(self.pkcs7, bio._ptr())
[docs] def get0_signers(self, certs: X509.X509_Stack, flags: int = 0) -> X509.X509_Stack: return X509.X509_Stack(m2.pkcs7_get0_signers(self.pkcs7, certs.stack, flags), 1)
[docs] def load_pkcs7(p7file: Union[str, bytes]) -> PKCS7: with BIO.openfile(p7file, "r") as bio: p7_ptr = m2.pkcs7_read_bio(bio.bio) return PKCS7(p7_ptr, 1)
[docs] def load_pkcs7_der(p7file: Union[str, bytes]) -> PKCS7: with BIO.openfile(p7file, "rb") as bio: p7_ptr = m2.pkcs7_read_bio_der(bio.bio) return PKCS7(p7_ptr, 1)
[docs] def load_pkcs7_bio(p7_bio: BIO.BIO) -> PKCS7: p7_ptr = m2.pkcs7_read_bio(p7_bio._ptr()) return PKCS7(p7_ptr, 1)
[docs] def load_pkcs7_bio_der(p7_bio: BIO.BIO) -> PKCS7: p7_ptr = m2.pkcs7_read_bio_der(p7_bio._ptr()) return PKCS7(p7_ptr, 1)
[docs] def smime_load_pkcs7(p7file: Union[str, bytes]) -> Tuple[PKCS7, Optional[BIO.BIO]]: filename = p7file.decode() if isinstance(p7file, bytes) else p7file bio = m2.bio_new_file(filename, "r") try: p7_ptr, bio_ptr = m2.smime_read_pkcs7(bio) finally: m2.bio_free(bio) if bio_ptr is None: return PKCS7(p7_ptr, 1), None else: return PKCS7(p7_ptr, 1), BIO.BIO(bio_ptr, 1)
[docs] def smime_load_pkcs7_bio(p7_bio: BIO.BIO) -> Tuple[PKCS7, Optional[BIO.BIO]]: p7_ptr, bio_ptr = m2.smime_read_pkcs7(p7_bio._ptr()) if p7_ptr is None: raise SMIME_Error(Err.get_error()) if bio_ptr is None: return PKCS7(p7_ptr, 1), None else: return PKCS7(p7_ptr, 1), BIO.BIO(bio_ptr, 1)
[docs] class Cipher(object): """Object interface to EVP_CIPHER without all the frills of M2Crypto.EVP.Cipher. """ def __init__(self, algo: str) -> None: cipher = getattr(m2, algo, None) if cipher is None: raise ValueError("unknown cipher", algo) self.cipher = cipher() def _ptr(self): return self.cipher
[docs] class SMIME_Error(Exception): pass
m2.smime_init(SMIME_Error) # FIXME class has no __init__ method
[docs] class SMIME(object):
[docs] def load_key( self, keyfile: Union[str, bytes], certfile: Union[str, bytes, None] = None, callback: Callable = util.passphrase_callback, ) -> None: if certfile is None: certfile = keyfile self.pkey = EVP.load_key(keyfile, callback) self.x509 = X509.load_cert(certfile)
[docs] def load_key_bio( self, keybio: BIO.BIO, certbio: Optional[BIO.BIO] = None, callback: Callable = util.passphrase_callback, ) -> None: if certbio is None: certbio = keybio self.pkey = EVP.load_key_bio(keybio, callback) self.x509 = X509.load_cert_bio(certbio)
[docs] def set_x509_stack(self, stack: X509.X509_Stack) -> None: assert isinstance(stack, X509.X509_Stack) self.x509_stack = stack
[docs] def set_x509_store(self, store: X509.X509_Store) -> None: assert isinstance(store, X509.X509_Store) self.x509_store = store
[docs] def set_cipher(self, cipher: Cipher) -> None: assert isinstance(cipher, Cipher) self.cipher = cipher
[docs] def unset_key(self) -> None: del self.pkey del self.x509
[docs] def unset_x509_stack(self) -> None: del self.x509_stack
[docs] def unset_x509_store(self) -> None: del self.x509_store
[docs] def unset_cipher(self) -> None: del self.cipher
[docs] def encrypt(self, data_bio: BIO.BIO, flags: int = 0) -> PKCS7: if not hasattr(self, "cipher"): raise SMIME_Error("no cipher: use set_cipher()") if not hasattr(self, "x509_stack"): raise SMIME_Error("no recipient certs: use set_x509_stack()") pkcs7 = m2.pkcs7_encrypt( self.x509_stack._ptr(), data_bio._ptr(), self.cipher._ptr(), flags, ) return PKCS7(pkcs7, 1)
[docs] def decrypt(self, pkcs7: PKCS7, flags: int = 0) -> Optional[bytes]: if not hasattr(self, "pkey"): raise SMIME_Error("no private key: use load_key()") if not hasattr(self, "x509"): raise SMIME_Error("no certificate: load_key() used incorrectly?") blob = m2.pkcs7_decrypt(pkcs7._ptr(), self.pkey._ptr(), self.x509._ptr(), flags) return blob
[docs] def sign( self, data_bio: BIO.BIO, flags: int = 0, algo: str = "sha256", ) -> PKCS7: if not hasattr(self, "pkey"): raise SMIME_Error("no private key: use load_key()") hash = getattr(m2, algo, None) if hash is None: raise SMIME_Error("no such hash algorithm %s" % algo) if hasattr(self, "x509_stack"): pkcs7 = m2.pkcs7_sign1( self.x509._ptr(), self.pkey._ptr(), self.x509_stack._ptr(), data_bio._ptr(), hash(), flags, ) return PKCS7(pkcs7, 1) else: pkcs7 = m2.pkcs7_sign0( self.x509._ptr(), self.pkey._ptr(), data_bio._ptr(), hash(), flags, ) return PKCS7(pkcs7, 1)
[docs] def verify( self, pkcs7: PKCS7, data_bio: Optional[BIO.BIO] = None, flags: int = 0, ) -> Optional[bytes]: if not hasattr(self, "x509_stack"): raise SMIME_Error("no signer certs: use set_x509_stack()") if not hasattr(self, "x509_store"): raise SMIME_Error("no x509 cert store: use set_x509_store()") assert isinstance(pkcs7, PKCS7), "pkcs7 not an instance of PKCS7" p7 = pkcs7._ptr() if data_bio is None: blob = m2.pkcs7_verify0( p7, self.x509_stack._ptr(), self.x509_store._ptr(), flags, ) else: blob = m2.pkcs7_verify1( p7, self.x509_stack._ptr(), self.x509_store._ptr(), data_bio._ptr(), flags, ) return blob
[docs] def write( self, out_bio: BIO.BIO, pkcs7: PKCS7, data_bio: Optional[BIO.BIO] = None, flags: int = 0, ) -> int: if data_bio is None: return m2.smime_write_pkcs7(out_bio._ptr(), pkcs7._ptr(), flags) else: return m2.smime_write_pkcs7_multi( out_bio._ptr(), pkcs7._ptr(), data_bio._ptr(), flags )
[docs] def text_crlf(text: bytes) -> bytes: bio_in = BIO.MemoryBuffer(text) bio_out = BIO.MemoryBuffer() if m2.smime_crlf_copy(bio_in._ptr(), bio_out._ptr()): return bio_out.read() else: raise SMIME_Error(Err.get_error())
[docs] def text_crlf_bio(bio_in: BIO.BIO) -> BIO.BIO: bio_out = BIO.MemoryBuffer() if m2.smime_crlf_copy(bio_in._ptr(), bio_out._ptr()): return bio_out else: raise SMIME_Error(Err.get_error())