Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

crypt wsaa #53

Merged
merged 13 commits into from
Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ jobs:
- name: Install package
run: |
python setup.py install
- name: Download certificate and private key
run: |
wget https://www.sistemasagiles.com.ar/soft/pyafipws/reingart2019.zip
unzip reingart2019.zip
- name: Test with pytest
run: |
pytest
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
httplib2==0.9.2; python_version <= '2.7'
httplib2==0.19.0; python_version > '3'
pysimplesoap==1.08.14
#m2crypto>=0.18
cryptography==3.3.2; python_version <= '2.7'
cryptography==3.4.7; python_version > '3'
fpdf>=1.7.2
dbf>=0.88.019
Pillow>=2.0.0
Expand Down
35 changes: 35 additions & 0 deletions tests/test_wsaa_crypto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import base64, subprocess

from past.builtins import basestring

from pyafipws.wsaa import WSAA


def test_wsfev1_create_tra():
wsaa = WSAA()
tra = wsaa.CreateTRA(service="wsfe")
# TODO: return string
tra = tra.decode("utf8")
# sanity checks:
assert isinstance(tra, basestring)
assert tra.startswith(
'<?xml version="1.0" encoding="UTF-8"?>'
'<loginTicketRequest version="1.0">'
)
assert '<uniqueId>' in tra
assert '<expirationTime>' in tra
assert tra.endswith('<service>wsfe</service></loginTicketRequest>')


def test_wsfev1_sign():
wsaa = WSAA()
tra = '<?xml version="1.0" encoding="UTF-8"?><loginTicketRequest version="1.0"/>'
# TODO: use certificate and private key as fixture / PEM text (not files)
cms = wsaa.SignTRA(tra, "reingart.crt", "reingart.key")
# TODO: return string
if not isinstance(cms, str):
cms = cms.decode("utf8")
# sanity checks:
assert isinstance(cms, str)
out = base64.b64decode(cms)
assert tra.encode("utf8") in out
194 changes: 104 additions & 90 deletions wsaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,19 @@
)

try:
from M2Crypto import BIO, Rand, SMIME, SSL
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.bindings.openssl.binding import Binding

except ImportError:
ex = exception_info()
warnings.warn("No es posible importar M2Crypto (OpenSSL)")
warnings.warn(ex["msg"]) # revisar instalación y DLLs de OpenSSL
BIO = Rand = SMIME = SSL = None
warnings.warn("No es posible importar cryptography (OpenSSL)")
warnings.warn(ex['msg']) # revisar instalación y DLLs de OpenSSL
Binding = None
# utilizar alternativa (ejecutar proceso por separado)
from subprocess import Popen, PIPE
from base64 import b64encode
Expand Down Expand Up @@ -85,51 +92,66 @@ def create_tra(service=SERVICE, ttl=2400):
# El source es opcional. Si falta, toma la firma (recomendado).
# tra.header.addChild('source','subject=...')
# tra.header.addChild('destination','cn=wsaahomo,o=afip,c=ar,serialNumber=CUIT 33693450239')
tra.header.add_child("uniqueId", str(date("U")))
tra.header.add_child("generationTime", str(date("c", date("U") - ttl)))
tra.header.add_child("expirationTime", str(date("c", date("U") + ttl)))
tra.add_child("service", service)
tra.header.add_child('uniqueId', str(date('U')))
tra.header.add_child('generationTime', str(date('c', date('U') - ttl)))
tra.header.add_child('expirationTime', str(date('c', date('U') + ttl)))
tra.add_child('service', service)
return tra.as_xml()


def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
"Firmar PKCS#7 el TRA y devolver CMS (recortando los headers SMIME)"

if BIO:
# Firmar el texto (tra) usando m2crypto (openssl bindings para python)
buf = BIO.MemoryBuffer(tra.encode("utf8")) # Crear un buffer desde el texto
# Rand.load_file('randpool.dat', -1) # Alimentar el PRNG
s = SMIME.SMIME() # Instanciar un SMIME
# soporte de contraseña de encriptación (clave privada, opcional)
callback = lambda *args, **kwarg: passphrase
# Cargar clave privada y certificado
if not privatekey.startswith(b"-----BEGIN RSA PRIVATE KEY-----"):
# leer contenido desde archivo (evitar problemas Applink / MSVCRT)
if os.path.exists(privatekey) and os.path.exists(cert):
privatekey = open(privatekey, "rb").read()
cert = open(cert, "rb").read()
else:
raise RuntimeError(
"Archivos no encontrados: %s, %s" % (privatekey, cert)
)
# crear buffers en memoria de la clave privada y certificado:
key_bio = BIO.MemoryBuffer(privatekey)
crt_bio = BIO.MemoryBuffer(cert)
s.load_key_bio(key_bio, crt_bio, callback) # (desde buffer)
p7 = s.sign(buf, 0) # Firmar el buffer
out = BIO.MemoryBuffer() # Crear un buffer para la salida
s.write(out, p7) # Generar p7 en formato mail
# Rand.save_file('randpool.dat') # Guardar el estado del PRNG's

msg_out = out.read()
if isinstance(msg_out, bytes):
msg_out = msg_out.decode("utf8")
# extraer el cuerpo del mensaje (parte firmada)
msg = email.message_from_string(msg_out)
if isinstance(tra, str):
tra = tra.encode("utf8")

if Binding:
_lib = Binding.lib
_ffi = Binding.ffi
# Crear un buffer desde el texto
bio_in = _lib.BIO_new_mem_buf(tra, len(tra))

# Leer privatekey y cert
with open(privatekey, 'rb') as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(), None, default_backend())

with open(cert, 'rb') as cert_file:
cert = x509.load_pem_x509_certificate(
cert_file.read(), default_backend())

try:
# Firmar el texto (tra) usando cryptography (openssl bindings para python)
p7 = _lib.PKCS7_sign(cert._x509, private_key._evp_pkey, _ffi.NULL, bio_in, 0)
finally:
# Liberar memoria asignada
_lib.BIO_free(bio_in)
# Se crea un buffer nuevo porque la firma lo consume
bio_in = _lib.BIO_new_mem_buf(tra, len(tra))
try:
# Crear buffer de salida
bio_out = _lib.BIO_new(_lib.BIO_s_mem())
try:
# Instanciar un SMIME
_lib.SMIME_write_PKCS7(bio_out, p7, bio_in, 0)

# Tomar datos para la salida
result_buffer = _ffi.new('char**')
buffer_length = _lib.BIO_get_mem_data(bio_out, result_buffer)
output = _ffi.buffer(result_buffer[0], buffer_length)[:]
finally:
_lib.BIO_free(bio_out)
finally:
_lib.BIO_free(bio_in)

# Generar p7 en formato mail y recortar headers
msg = email.message_from_string(output.decode('utf8'))
for part in msg.walk():
filename = part.get_filename()
if filename == "smime.p7m": # es la parte firmada?
return part.get_payload(decode=False) # devolver CMS
if filename == "smime.p7m":
# Es la parte firmada?
# Devolver CMS
return part.get_payload(decode=False)
else:
# Firmar el texto (tra) usando OPENSSL directamente
try:
Expand All @@ -149,7 +171,7 @@ def sign_tra(tra, cert=CERT, privatekey=PRIVATEKEY, passphrase=""):
stdin=PIPE,
stdout=PIPE,
stderr=PIPE,
).communicate(tra.encode("utf8"))[0]
).communicate(tra)[0]
return b64encode(out)
except OSError as e:
if e.errno == 2:
Expand Down Expand Up @@ -245,21 +267,20 @@ def CreateTRA(self, service="wsfe", ttl=2400):
@inicializar_y_capturar_excepciones
def AnalizarCertificado(self, crt, binary=False):
"Carga un certificado digital y extrae los campos más importantes"
from M2Crypto import BIO, EVP, RSA, X509

if binary:
bio = BIO.MemoryBuffer(cert)
x509 = X509.load_cert_bio(bio, X509.FORMAT_DER)
cert = x509.load_pem_x509_certificate(crt, default_backend())
else:
if not crt.startswith("-----BEGIN CERTIFICATE-----"):
crt = open(crt).read()
bio = BIO.MemoryBuffer(crt)
x509 = X509.load_cert_bio(bio, X509.FORMAT_PEM)
if x509:
self.Identidad = x509.get_subject().as_text()
self.Caducidad = x509.get_not_after().get_datetime()
self.Emisor = x509.get_issuer().as_text()
self.CertX509 = x509.as_text()
if isinstance(crt, str):
crt = crt.encode('utf-8')
cert = x509.load_pem_x509_certificate(crt, default_backend())
if cert:
self.Identidad = cert.subject
self.Caducidad = cert.not_valid_after
self.Emisor = cert.issuer
self.CertX509 = cert
return True

@inicializar_y_capturar_excepciones
Expand All @@ -271,32 +292,33 @@ def CrearClavePrivada(
passphrase="",
):
"Crea una clave privada (private key)"
from M2Crypto import RSA, EVP

# only protect if passphrase was given (it will fail otherwise)
callback = lambda *args, **kwarg: passphrase
chiper = None if not passphrase else "aes_128_cbc"
# create the RSA key pair (and save the result to a file):
rsa_key_pair = RSA.gen_key(key_length, pub_exponent, callback)
bio = BIO.MemoryBuffer()
rsa_key_pair.save_key_bio(bio, chiper, callback)
f = open(filename, "w")
f.write(bio.read())
f.close()
# create a public key to sign the certificate request:
self.pkey = EVP.PKey(md="sha256")
self.pkey.assign_rsa(rsa_key_pair)
rsa_key = rsa.generate_private_key(pub_exponent, key_length, backend=default_backend())

if passphrase:
passp = passphrase.encode('utf-8')
# encryption AES-256-CBC
cypher = serialization.BestAvailableEncryption(passp)
else:
cypher = serialization.NoEncryption()

with open(filename, "wb") as f:
f.write(rsa_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=cypher,
))

self.rsa_key = rsa_key
return True

@inicializar_y_capturar_excepciones
def CrearPedidoCertificado(
self, cuit="", empresa="", nombre="pyafipws", filename="empresa.csr"
):
"Crear un certificate signing request (X509 CSR)"
from M2Crypto import RSA, EVP, X509

# create the certificate signing request (CSR):
self.x509_req = X509.Request()
self.x509_req = x509.CertificateSigningRequestBuilder()

# normalizar encoding (reemplazar acentos, eñe, etc.)
if isinstance(empresa, str):
Expand All @@ -305,24 +327,18 @@ def CrearPedidoCertificado(
nombre = unicodedata.normalize("NFKD", nombre).encode("ASCII", "ignore")

# subjet: C=AR/O=[empresa]/CN=[nombre]/serialNumber=CUIT [nro_cuit]
x509name = X509.X509_Name()
# default OpenSSL parameters:
kwargs = {"type": 0x1000 | 1, "len": -1, "loc": -1, "set": 0}
x509name.add_entry_by_txt(field="C", entry="AR", **kwargs)
x509name.add_entry_by_txt(field="O", entry=empresa, **kwargs)
x509name.add_entry_by_txt(field="CN", entry=nombre, **kwargs)
x509name.add_entry_by_txt(
field="serialNumber", entry="CUIT %s" % str(cuit), **kwargs
)
self.x509_req.set_subject_name(x509name)

# sign the request with the previously created key (CrearClavePrivada)
self.x509_req.set_pubkey(pkey=self.pkey)
self.x509_req.sign(pkey=self.pkey, md="sha256")
csrs = self.x509_req.subject_name(x509.Name([
x509.NameAttribute(NameOID.COUNTRY_NAME, 'AR'),
x509.NameAttribute(NameOID.ORGANIZATION_NAME, '{}'.format(empresa)),
x509.NameAttribute(NameOID.COMMON_NAME, '{}'.format(nombre)),
x509.NameAttribute(NameOID.SERIAL_NUMBER, 'CUIT {}'.format(cuit)),
])).sign(self.rsa_key, hashes.SHA256(), default_backend())

# save the CSR result to a file:
f = open(filename, "w")
f.write(self.x509_req.as_pem())
f.close()
with open(filename, "wb") as f:
f.write(csrs.public_bytes(serialization.Encoding.PEM))

return True

@inicializar_y_capturar_excepciones
Expand Down Expand Up @@ -409,7 +425,7 @@ def Autenticar(
tra = self.CreateTRA(service=service, ttl=DEFAULT_TTL)
# firmarlo criptográficamente
if DEBUG:
print("Frimando TRA...")
print("Firmando TRA...")
cms = self.SignTRA(tra, crt, key)
# concectar con el servicio web:
if DEBUG:
Expand Down Expand Up @@ -528,9 +544,9 @@ def Autenticar(
print(pedido_cert)
# convertir a terminación de linea windows y abrir con bloc de notas
if sys.platform == "win32":
txt = open(pedido_cert + ".txt", "wb")
txt = open(pedido_cert + ".txt", "w")
for linea in open(pedido_cert, "r"):
txt.write("%s\r\n" % linea)
txt.write("{}".format(linea))
txt.close()
os.startfile(pedido_cert + ".txt")
else:
Expand Down Expand Up @@ -590,5 +606,3 @@ def Autenticar(
print("Generation Time:", wsaa.ObtenerTagXml("generationTime"))
print("Expiration Time:", wsaa.ObtenerTagXml("expirationTime"))
print("Expiro?", wsaa.Expirado())
##import time; time.sleep(10)
##print "Expiro?", wsaa.Expirado()