I am using RSA algorithm to encrypt data. The frontend uses the window.crypto and the backend uses the pycryptodome.
However, I discovered that after the frontend encrypts a payload, the backend can’t decrypt it. The frontend refers MDN‘s example code: https://github.com/mdn/dom-examples/blob/main/web-crypto/import-key/spki.js, and my code is not shown on GitHub now. The backend is copied from the example which is written by myself: https://github.com/zvms/rsa-bcrypt-jwt-login-eg/blob/main/cert.py. The full code is showed here:
Frontend
crpyto.ts
// Function to convert PEM formatted public key to a CryptoKey object
async function importPublicKey(pemKey: string) {
const withoutNewlines = pemKey
.replace('-----BEGIN PUBLIC KEY-----', '')
.replace('-----END PUBLIC KEY-----', '')
.split('n')
.filter((line) => line.trim() !== '')
.join('')
console.log(withoutNewlines)
// Base64 decode the string to get the binary data
const binaryDerString = window.atob(withoutNewlines)
// Convert from a binary string to an ArrayBuffer
const binaryDer = str2ab(binaryDerString)
return window.crypto.subtle.importKey(
'spki',
binaryDer,
{
name: 'RSA-OAEP',
hash: 'SHA-256' // Specify the hash algorithm
},
true,
['encrypt']
)
}
// Utility function to convert a binary string to an ArrayBuffer
function str2ab(str: string) {
const buf = new ArrayBuffer(str.length)
const bufView = new Uint8Array(buf)
for (let i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i)
}
return buf
}
// Function to encrypt data using RSA-OAEP
async function encryptData(publicKey: CryptoKey, data: string) {
const encoder = new TextEncoder()
const encodedData = encoder.encode(data)
const encryptedData = await window.crypto.subtle.encrypt(
{
name: 'RSA-OAEP'
},
publicKey,
encodedData
)
return encryptedData
}
export { importPublicKey, encryptData }
auth.ts
async function UserLogin(user: string, password: string, term: 'long' | 'short' = 'long') {
const payload = JSON.stringify({
password: password,
time: Date.now()
})
const publicKey = await importPublicKey(await getRSAPublicCert())
const credential = await encryptData(publicKey, payload)
// console.log(credential)
const hex = byteArrayToHex(new Uint8Array(credential))
console.log(`User ${user} login with ${term} term, with the credential ${hex}`)
const result = (await axios('/user/auth', {
method: 'POST',
data: {
id: user.toString(),
credential: hex,
mode: term
}
})) as Response<LoginResult>
}
Backend
import bcrypt
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from fastapi import HTTPException
import jwt
import json
import datetime
from database import db
from bson import ObjectId
from bcrypt import checkpw
class Auth:
password: str
time: int
def hash_password(password):
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt())
def check_password(password, hashed):
return bcrypt.checkpw(password.encode("utf-8"), hashed)
public_key = RSA.import_key(open("rsa_public_key.pem", "rb").read())
private_key = RSA.import_key(open("rsa_private_key.pem", "rb").read())
jwt_private_key = open("aes_key.txt", "r").read()
def rsa_encrypt(plaintext):
cipher = PKCS1_OAEP.new(public_key)
encrypt_text = cipher.encrypt(bytes(plaintext.encode("utf8")))
return encrypt_text.hex()
def rsa_decrypt(ciphertext):
cipher = PKCS1_OAEP.new(private_key)
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
return decrypt_text.decode("utf8")
def jwt_encode(id: str):
payload = {
"exp": datetime.datetime.utcnow() + datetime.timedelta(seconds=60),
"iat": datetime.datetime.utcnow(),
"sub": id,
"scope": "access_token",
"type": "long-term",
}
return jwt.encode(payload, jwt_private_key, algorithm="HS256")
def jwt_decode(token):
return jwt.decode(token, jwt_private_key, algorithms=["HS256"], verify=True)
def validate_by_cert(id: str, cert: str):
auth_field = json.loads(rsa_decrypt(cert))
time = auth_field["time"]
# in a minute
if time > datetime.datetime.now().timestamp() + 60:
raise HTTPException(status_code=401, detail="Token expired")
if checkpwd(id, auth_field["password"]):
raise HTTPException(status_code=401, detail="Password incorrect")
return jwt_encode(id)
def checkpwd(id: str, pwd: str):
user = db.zvms.users.find_one({"_id": ObjectId(id)})
if checkpw(bytes(pwd, 'utf-8'), user.get('password')):
return True
return False
and the api handler directly calls validate_by_cert method.
Key Generation
Directly use openssl.
openssl genrsa -out rsa_private_key.pem 1024
openssl rsa -in rsa_private_key.pem -pubout -out rsa_public_key.pem
openssl rand -hex 32 > aes_key.txt
Behavior
It returns error when calling this api:
File "/Users/*/**/zvms/routers/users_router.py", line 43, in auth_user
result = validate_by_cert(id, credential)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 59, in validate_by_cert
auth_field = json.loads(rsa_decrypt(cert))
^^^^^^^^^^^^^^^^^
File "/Users/*/**/zvms/util/cert.py", line 39, in rsa_decrypt
decrypt_text = cipher.decrypt(bytes.fromhex(ciphertext))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/*/anaconda3/envs/zvms/lib/python3.12/site-packages/Crypto/Cipher/PKCS1_OAEP.py", line 191, in decrypt
raise ValueError("Incorrect decryption.")
However, when I call the native rsa_encrypt method and use it to decrypt, it works well.
The frontend and backend can transfer encrypted data successfully.
