diff --git a/api/apps/user_app.py b/api/apps/user_app.py index 3668efe75..95a6e2dc4 100644 --- a/api/apps/user_app.py +++ b/api/apps/user_app.py @@ -34,7 +34,6 @@ from api.db.services.user_service import TenantService, UserService, UserTenantS from api.utils import ( current_timestamp, datetime_format, - decrypt, download_img, get_format_time, get_uuid, @@ -46,6 +45,7 @@ from api.utils.api_utils import ( server_error_response, validate_request, ) +from api.utils.crypt import decrypt @manager.route("/login", methods=["POST", "GET"]) # noqa: F821 diff --git a/api/utils/__init__.py b/api/utils/__init__.py index 461340b63..22161b52f 100644 --- a/api/utils/__init__.py +++ b/api/utils/__init__.py @@ -28,8 +28,6 @@ import logging import copy from enum import Enum, IntEnum import importlib -from Cryptodome.PublicKey import RSA -from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 from filelock import FileLock from api.constants import SERVICE_CONF @@ -363,37 +361,6 @@ def elapsed2time(elapsed): return '%02d:%02d:%02d' % (hour, minuter, second) -def decrypt(line): - file_path = os.path.join( - file_utils.get_project_base_directory(), - "conf", - "private.pem") - rsa_key = RSA.importKey(open(file_path).read(), "Welcome") - cipher = Cipher_pkcs1_v1_5.new(rsa_key) - return cipher.decrypt(base64.b64decode( - line), "Fail to decrypt password!").decode('utf-8') - - -def decrypt2(crypt_text): - from base64 import b64decode, b16decode - from Crypto.Cipher import PKCS1_v1_5 as Cipher_PKCS1_v1_5 - from Crypto.PublicKey import RSA - decode_data = b64decode(crypt_text) - if len(decode_data) == 127: - hex_fixed = '00' + decode_data.hex() - decode_data = b16decode(hex_fixed.upper()) - - file_path = os.path.join( - file_utils.get_project_base_directory(), - "conf", - "private.pem") - pem = open(file_path).read() - rsa_key = RSA.importKey(pem, "Welcome") - cipher = Cipher_PKCS1_v1_5.new(rsa_key) - decrypt_text = cipher.decrypt(decode_data, None) - return (b64decode(decrypt_text)).decode() - - def download_img(url): if not url: return "" diff --git a/api/utils/crypt.py b/api/utils/crypt.py new file mode 100644 index 000000000..218231db0 --- /dev/null +++ b/api/utils/crypt.py @@ -0,0 +1,61 @@ +# +# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import base64 +import os +import sys +from Cryptodome.PublicKey import RSA +from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 +from api.utils import file_utils + + +def crypt(line): + file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "public.pem") + rsa_key = RSA.importKey(open(file_path).read(), "Welcome") + cipher = Cipher_pkcs1_v1_5.new(rsa_key) + password_base64 = base64.b64encode(line.encode('utf-8')).decode("utf-8") + encrypted_password = cipher.encrypt(password_base64.encode()) + return base64.b64encode(encrypted_password).decode('utf-8') + + +def decrypt(line): + file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "private.pem") + rsa_key = RSA.importKey(open(file_path).read(), "Welcome") + cipher = Cipher_pkcs1_v1_5.new(rsa_key) + return cipher.decrypt(base64.b64decode(line), "Fail to decrypt password!").decode('utf-8') + + +def decrypt2(crypt_text): + from base64 import b64decode, b16decode + from Crypto.Cipher import PKCS1_v1_5 as Cipher_PKCS1_v1_5 + from Crypto.PublicKey import RSA + decode_data = b64decode(crypt_text) + if len(decode_data) == 127: + hex_fixed = '00' + decode_data.hex() + decode_data = b16decode(hex_fixed.upper()) + + file_path = os.path.join(file_utils.get_project_base_directory(), "conf", "private.pem") + pem = open(file_path).read() + rsa_key = RSA.importKey(pem, "Welcome") + cipher = Cipher_PKCS1_v1_5.new(rsa_key) + decrypt_text = cipher.decrypt(decode_data, None) + return (b64decode(decrypt_text)).decode() + + +if __name__ == "__main__": + passwd = crypt(sys.argv[1]) + print(passwd) + print(decrypt(passwd)) diff --git a/api/utils/t_crypt.py b/api/utils/t_crypt.py deleted file mode 100644 index d0763c19f..000000000 --- a/api/utils/t_crypt.py +++ /dev/null @@ -1,40 +0,0 @@ -# -# Copyright 2025 The InfiniFlow Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import base64 -import os -import sys -from Cryptodome.PublicKey import RSA -from Cryptodome.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5 -from api.utils import decrypt, file_utils - - -def crypt(line): - file_path = os.path.join( - file_utils.get_project_base_directory(), - "conf", - "public.pem") - rsa_key = RSA.importKey(open(file_path).read(),"Welcome") - cipher = Cipher_pkcs1_v1_5.new(rsa_key) - password_base64 = base64.b64encode(line.encode('utf-8')).decode("utf-8") - encrypted_password = cipher.encrypt(password_base64.encode()) - return base64.b64encode(encrypted_password).decode('utf-8') - - -if __name__ == "__main__": - passwd = crypt(sys.argv[1]) - print(passwd) - print(decrypt(passwd))