基于PyMySQL实现应用层字段加密:保护敏感数据的Python实战方案

基于PyMySQL实现应用层字段加密:保护敏感数据的Python实战方案 1. 项目概述为什么我们需要在应用层做字段加密最近在做一个涉及用户敏感信息的项目比如身份证号、手机号、家庭住址这些数据最终要存到MySQL里。甲方爸爸和合规部门的要求很明确这些敏感字段在数据库里不能是明文。一开始我理所当然地想到了MySQL自带的加密函数比如AES_ENCRYPT直接在SQL里搞定感觉挺省事。但实际踩坑后发现这条路问题不少。首先密钥管理麻烦得在数据库配置或SQL里写死权限控制不好做其次一旦数据导出来或者有数据库备份密文和密钥可能一起泄露风险并没降低最重要的是如果运维同学或者有数据库直接访问权限的人想查他们还是能看到明文这达不到“库中无明文”的真正要求。所以我们把目光投向了应用层加密也就是在数据进入数据库之前在Python代码里就完成加密。PyMySQL作为Python连接MySQL的主流驱动之一自然成了我们的操作入口。这个方案的核心思想是“端到端加密”数据在离开应用的那一刻就已经是密文数据库只是作为一个“盲存储”的容器即使数据库被拖库或者DBA直接查询没有应用层的密钥也无法解密。这才能真正满足对敏感数据的保护需求。今天要聊的就是基于PyMySQL如何实现一套稳健、易用的字段级加密方案而不是简单调用几个加密函数。2. 整体设计与核心思路拆解2.1 方案选型应用层加密 vs 数据库层加密为什么坚定地选择应用层加密这背后是一系列权衡。数据库层加密如TDE、列加密对应用透明但通常保护的是“静止数据”即存储在磁盘上的数据文件。对于拥有数据库权限的用户数据在内存中或查询结果里依然是明文。而字段级加密尤其是在应用层实现可以做到“使用中数据”也是密文。我们的目标是让敏感数据在数据库的整个生命周期存储、传输、备份中都以密文形式存在只有经过授权的应用服务在内存中处理时才会短暂解密。基于PyMySQL我们有几种实现路径SQL拼接法在构造INSERT或UPDATE语句时手动调用Python的加密库如cryptography对值进行加密然后将密文的字节串或Base64编码后的字符串拼接到SQL中。这种方法最直接但代码侵入性强容易遗漏且加解密逻辑散落在各处难以维护。ORM拦截法如果使用SQLAlchemy等ORM框架可以利用其事件监听机制如before_flush,before_update在数据提交前自动加密查询后自动解密。这是比较优雅的方式但对项目技术栈有要求。驱动层包装法在PyMySQL的游标Cursor层面进行拦截。通过继承或包装PyMySQL的Cursor类覆写execute和fetch*系列方法在SQL执行前对参数中的特定字段进行加密在获取结果后对特定列进行解密。这种方法对业务代码几乎零侵入只需要更换一下连接或游标的使用方式是本次重点讨论的方案。我们选择了驱动层包装法。它的优势在于将加解密逻辑集中在一个地方与业务逻辑解耦无论是使用原生SQL还是简单ORM如自己封装的DB类都能无缝接入。关键在于如何精准地识别哪些字段需要加解密。2.2 核心组件与加密策略定义一个完整的字段级加密方案需要明确以下几个核心部分加密算法与模式选择算法AES高级加密标准是目前对称加密的主流选择兼顾安全与性能。密钥长度使用AES-256-GCM或AES-256-CBC。AES-256提供更强的安全性。GCM模式是认证加密模式能同时提供保密性和完整性校验且支持关联数据AAD更为现代和安全。CBC模式则需要单独处理填充和初始化向量IV并需结合HMAC来保证完整性稍显繁琐但兼容性极广。初始化向量IV为了确保同样的明文每次加密后产生不同的密文必须使用随机且唯一的IV。对于CBC模式IV需要随密文一起存储对于GCM模式除了IV在GCM中常称为nonce还有一个认证标签Tag也需要存储。密钥管理这是安全的核心。绝对禁止将密钥硬编码在代码或配置文件中。推荐使用环境变量、或专业的密钥管理服务KMS来获取加密主密钥。应用启动时从安全的位置读取。在实际操作中我们可以使用一个固定的主密钥或者通过KMS生成一个数据加密密钥DEK再用主密钥加密DEK形成加密的DEK即EDEK存储。每次加密时使用DEK。这里为了简化我们先演示使用一个从环境变量获取的固定密钥。字段识别规则我们需要一种方式来告诉包装器哪张表的哪个字段需要加密。可以通过配置字典来实现例如{‘users’: [‘id_card’, ‘mobile’, ‘address’], ‘bank_cards’: [‘card_number’, ‘cvv’]}。在游标中解析SQL虽然复杂但可以做到精准匹配。一个更实用的简化方案是我们约定所有需要加密的字段在传入参数时其值是一个特殊的标记对象如一个字典{‘__encrypted__’: True, ‘value’: raw_value}或者在参数名上做约定如以_enc后缀结尾。在包装器里我们检测到这个标记就对raw_value进行加密并用密文替换原值。查询解密时则根据结果集的元数据列名判断是否需要解密。为了平衡安全性与实现的复杂度我们下面的实操将采用AES-256-GCM算法密钥从环境变量获取并通过一个字段映射配置来识别需要加解密的表字段。同时我们会将IV和认证标签与密文一起组合成一个字符串存储到数据库的TEXT或BLOB字段中。3. 核心工具准备与加密模块实现3.1 环境与依赖安装首先确保你的Python环境建议3.7并安装必要的库。除了PyMySQL我们主要依赖cryptography这个强大的密码学库。pip install pymysql cryptographycryptography库提供了工业级的、安全的密码学原语实现比Python自带的hashlib和早期的Crypto库更受推荐。3.2 实现安全的加解密工具类我们先创建一个独立的模块如crypto_util.py实现核心的加密和解密功能。这里采用AES-256-GCM模式。# crypto_util.py import os import base64 from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.primitives import padding from cryptography.hazmat.backends import default_backend from typing import Union class FieldCrypto: 字段级加密工具类使用 AES-256-GCM 模式。 密文格式base64(iv ciphertext tag)。IV长度为12字节Tag长度为16字节。 def __init__(self, key: Union[str, bytes]): 初始化加密器。 :param key: 加密密钥。如果是字符串将使用UTF-8编码并确保长度为32字节256位。 强烈建议从环境变量等安全位置获取。 if isinstance(key, str): key key.encode(utf-8) # 确保密钥长度为32字节 (AES-256) if len(key) ! 32: # 如果长度不对可以使用HKDF或SHA256派生但这里简单抛错提示 raise ValueError(f密钥必须为32字节长度当前为{len(key)}字节。请检查密钥来源。) self.key key self.iv_length 12 # GCM推荐的非重复随机数长度 self.tag_length 16 # GCM认证标签长度 def encrypt(self, plaintext: str) - str: 加密明文字符串返回Base64编码的完整密文包含IV和Tag。 if plaintext is None: return None # 生成随机IV iv os.urandom(self.iv_length) # 构建加密器 encryptor Cipher( algorithms.AES(self.key), modes.GCM(iv), backenddefault_backend() ).encryptor() # 加密并生成Tag plaintext_bytes plaintext.encode(utf-8) ciphertext encryptor.update(plaintext_bytes) encryptor.finalize() tag encryptor.tag # 组合 IV Ciphertext Tag combined iv ciphertext tag # 返回Base64字符串便于存储到文本字段 return base64.b64encode(combined).decode(utf-8) def decrypt(self, encrypted_b64: str) - str: 解密Base64编码的密文包返回明文字符串。 if encrypted_b64 is None: return None try: combined base64.b64decode(encrypted_b64) except Exception: # 如果无法解码可能不是加密字段直接返回原值或根据策略处理 # 这里为了安全我们选择抛出一个明确的异常 raise ValueError(提供的字符串不是有效的Base64编码密文格式) # 拆分 IV, Ciphertext, Tag iv combined[:self.iv_length] tag combined[-self.tag_length:] ciphertext combined[self.iv_length:-self.tag_length] # 构建解密器 decryptor Cipher( algorithms.AES(self.key), modes.GCM(iv, tag), backenddefault_backend() ).decryptor() # 解密 plaintext_bytes decryptor.update(ciphertext) decryptor.finalize() return plaintext_bytes.decode(utf-8) # 示例从环境变量获取密钥生产环境务必如此 import os SECRET_KEY os.environ.get(DB_FIELD_ENCRYPTION_KEY) if not SECRET_KEY: # 仅为演示生产环境必须设置环境变量 SECRET_KEY this-is-a-32-byte-long-secret-key-here! # 32字节 crypto_util FieldCrypto(SECRET_KEY) # 简单测试 if __name__ __main__: test_text 130123456781234567 encrypted crypto_util.encrypt(test_text) print(f密文: {encrypted}) decrypted crypto_util.decrypt(encrypted) print(f解密后: {decrypted}) assert test_text decrypted注意上面的SECRET_KEY在示例中是硬编码的这只是为了演示。在实际生产环境中你必须通过环境变量、或从安全的密钥管理服务中动态获取密钥。密钥的泄露意味着所有加密数据都可能被解密。3.3 定义字段映射配置我们需要一个配置来告知系统哪些字段需要被处理。创建一个配置模块如config.py或直接在主逻辑中定义。# encryption_config.py ENCRYPTION_CONFIG { # ‘表名’: [‘字段名1‘ ’字段名2‘, ...] users: [id_card, mobile, email, real_name], orders: [recipient_phone, recipient_address], medical_records: [patient_id_number, diagnosis_details] }这个配置将用于指导我们的PyMySQL游标包装器在执行SQL时识别和转换目标字段。4. 实现PyMySQL游标包装器这是整个方案的核心。我们将创建一个自定义的游标类继承自pymysql.cursors.Cursor并覆写关键方法。4.1 包装器类结构设计我们的EncryptedCursor需要做两件事在执行execute前分析传入的参数可能是字典或元组根据ENCRYPTION_CONFIG和当前执行的SQL通过解析表名对参数中对应字段的值进行加密。在fetchone或fetchall后根据返回结果集的列名判断哪些列是加密字段并对这些列的值进行解密。难点在于准确地将参数与SQL中的占位符匹配并识别出表名。一个相对稳健的简化策略是我们假设使用命名占位符%(name)s并且参数是字典形式。这样我们可以直接通过字典键名即字段名来判断是否需要加密。# encrypted_cursor.py import pymysql.cursors import re from crypto_util import crypto_util # 导入之前实现的加密工具 from encryption_config import ENCRYPTION_CONFIG # 导入加密配置 class EncryptedCursor(pymysql.cursors.Cursor): 支持字段级加密的PyMySQL游标包装器。 假设SQL使用命名占位符%(name)s且参数为字典。 def _get_table_name_from_sql(self, sql: str) - str: 粗糙地从SQL语句中提取表名。 仅适用于简单的 INSERT INTO table_name, UPDATE table_name, SELECT ... FROM table_name 语句。 生产环境可能需要更复杂的SQL解析器。 sql_upper sql.strip().upper() # 移除注释等简单处理 sql_upper re.sub(r--.*$, , sql_upper, flagsre.MULTILINE) sql_upper re.sub(r/\*.*?\*/, , sql_upper, flagsre.DOTALL) table_name None # 匹配 INSERT INTO table / INSERT INTO table insert_match re.search(rINSERT\sINTO\s[]?(\w)[]?, sql_upper, re.IGNORECASE) if insert_match: table_name insert_match.group(1).lower() # 匹配 UPDATE table / UPDATE table update_match re.search(rUPDATE\s[]?(\w)[]?, sql_upper, re.IGNORECASE) if update_match: table_name update_match.group(1).lower() # 匹配 SELECT ... FROM table / FROM table # 这里非常简陋仅处理单表简单查询 from_match re.search(rFROM\s[]?(\w)[]?, sql_upper, re.IGNORECASE) if from_match and not table_name: table_name from_match.group(1).lower() return table_name def _encrypt_parameters(self, sql: str, params: dict) - dict: 根据SQL和配置加密参数字典中需要加密的字段值。 if not params or not isinstance(params, dict): return params table_name self._get_table_name_from_sql(sql) if not table_name or table_name not in ENCRYPTION_CONFIG: # 如果没提取到表名或该表不在加密配置中直接返回原参数 return params encrypted_fields ENCRYPTION_CONFIG[table_name] encrypted_params params.copy() for field in encrypted_fields: if field in encrypted_params and encrypted_params[field] is not None: raw_value encrypted_params[field] # 只加密字符串类型如果是其他类型需要先转字符串根据业务决定 if isinstance(raw_value, str): encrypted_params[field] crypto_util.encrypt(raw_value) # 也可以处理数字类型但需统一转换为字符串格式 # elif isinstance(raw_value, (int, float)): # encrypted_params[field] crypto_util.encrypt(str(raw_value)) else: # 非字符串类型可以选择不加密、记录日志或抛异常 # 这里我们选择原样保留但记录警告实际项目应用日志库 print(fWarning: Field {field} in table {table_name} has non-string value, skipped encryption.) return encrypted_params def _decrypt_result_row(self, row: tuple, description: tuple) - tuple: 解密查询结果的一行数据。 :param row: 原始行数据元组 :param description: cursor.description包含列信息 :return: 解密后的行数据元组 if not row or not description: return row # 获取当前查询涉及的表名这里需要额外处理因为游标可能不知道表名 # 一个变通方法在_execute方法中将表名暂存到游标属性中但多表查询会复杂化。 # 更实用的方法根据列名判断。我们约定加密字段的列名在配置中唯一或者携带特殊前缀/后缀。 # 这里采用一个简化方案假设配置中的字段名在所有表中是唯一的或者我们通过上下文知道表名。 # 由于这是一个复杂点我们在下一个版本优化。此处先实现一个基础版如果列名在全局加密字段集合中则尝试解密。 all_encrypted_fields set() for fields in ENCRYPTION_CONFIG.values(): all_encrypted_fields.update(fields) decrypted_values list(row) for idx, col_desc in enumerate(description): col_name col_desc[0] # 列名 if col_name in all_encrypted_fields and decrypted_values[idx] is not None: # 尝试解密 try: decrypted_values[idx] crypto_util.decrypt(decrypted_values[idx]) except (ValueError, Exception) as e: # 解密失败可能该列不是加密数据或数据已损坏。保留原值并记录错误。 print(fWarning: Failed to decrypt column {col_name}: {e}) # 生产环境应使用日志记录器 return tuple(decrypted_values) def execute(self, query, argsNone): 重写execute方法在执行前加密参数。 encrypted_args args if isinstance(args, dict): # 只有参数是字典时才进行加密处理 encrypted_args self._encrypt_parameters(query, args) # 调用父类方法执行SQL return super().execute(query, encrypted_args) def executemany(self, query, args): 重写executemany方法用于批量插入/更新。 if args and isinstance(args[0], dict): encrypted_args [] for arg_set in args: encrypted_args.append(self._encrypt_parameters(query, arg_set)) args encrypted_args return super().executemany(query, args) def fetchone(self): 重写fetchone获取数据后解密。 row super().fetchone() if row: return self._decrypt_result_row(row, self.description) return row def fetchall(self): 重写fetchall获取所有数据后逐行解密。 rows super().fetchall() if rows: return [self._decrypt_result_row(row, self.description) for row in rows] return rows def fetchmany(self, sizeNone): 重写fetchmany。 rows super().fetchmany(size) if rows: return [self._decrypt_result_row(row, self.description) for row in rows] return rows4.2 创建加密连接类为了方便使用我们还可以创建一个返回加密游标的连接类。# encrypted_connection.py import pymysql from encrypted_cursor import EncryptedCursor class EncryptedConnection(pymysql.Connection): 使用EncryptedCursor作为默认游标的连接类。 def cursor(self, cursorNone): 重写cursor方法默认返回我们的EncryptedCursor。 if cursor: return super().cursor(cursor) return super().cursor(EncryptedCursor) # 使用示例 def get_encrypted_connection(): conn EncryptedConnection( hostlocalhost, useryour_username, passwordyour_password, databaseyour_database, charsetutf8mb4, cursorclassEncryptedCursor # 这里指定游标类或在EncryptedConnection中已重写 ) return conn5. 完整实操流程与测试现在我们将上述模块组合起来进行一个从建表到增删改查的全流程测试。5.1 数据库表结构准备假设我们有一张users表其中包含需要加密的字段。CREATE TABLE users ( id INT PRIMARY KEY AUTO_INCREMENT, username VARCHAR(50) NOT NULL UNIQUE, id_card TEXT COMMENT 加密存储的身份证号, mobile TEXT COMMENT 加密存储的手机号, email TEXT COMMENT 加密存储的邮箱, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) ENGINEInnoDB DEFAULT CHARSETutf8mb4;注意加密后的数据是二进制经过Base64编码的字符串长度会显著增加大约为原明文长度的4/3倍再加上IV和Tag的固定开销因此字段类型需要设置为TEXT或BLOBVARCHAR可能不够用。5.2 应用层代码集成与测试编写一个测试脚本演示如何使用我们的加密连接和游标。# test_encrypted_db.py import sys import os # 将项目目录加入路径确保能导入自定义模块 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from encrypted_connection import get_encrypted_connection from crypto_util import crypto_util def test_encryption_flow(): # 1. 获取加密连接 conn get_encrypted_connection() try: with conn.cursor() as cursor: # 2. 插入加密数据 insert_sql INSERT INTO users (username, id_card, mobile, email) VALUES (%(username)s, %(id_card)s, %(mobile)s, %(email)s) user_data { username: zhangsan, id_card: 110101199001011234, # 明文 mobile: 13800138000, # 明文 email: zhangsanexample.com # 明文 } # 注意cursor.execute内部会自动加密id_card, mobile, email字段 cursor.execute(insert_sql, user_data) user_id conn.insert_id() # 获取自增ID print(f插入用户成功ID: {user_id}) conn.commit() # 3. 查询数据应自动解密 select_sql SELECT id, username, id_card, mobile, email FROM users WHERE id %s cursor.execute(select_sql, (user_id,)) result cursor.fetchone() print(查询结果已自动解密:) print(f ID: {result[0]}) print(f Username: {result[1]}) print(f ID Card: {result[2]}) # 这里应该是明文 print(f Mobile: {result[3]}) # 这里应该是明文 print(f Email: {result[4]}) # 这里应该是明文 # 4. 验证数据库中的存储确实是密文使用普通游标查询 with conn.cursor(pymysql.cursors.Cursor) as raw_cursor: raw_cursor.execute(SELECT id_card, mobile FROM users WHERE id %s, (user_id,)) raw_result raw_cursor.fetchone() print(\n数据库原始存储内容密文:) print(f ID Card (密文): {raw_result[0]}) print(f Mobile (密文): {raw_result[1]}) # 尝试用工具解密验证一致性 decrypted_id_card crypto_util.decrypt(raw_result[0]) print(f 工具解密ID Card: {decrypted_id_card}) assert decrypted_id_card user_data[id_card] # 5. 更新加密字段 update_sql UPDATE users SET mobile %(mobile)s WHERE id %(id)s update_data {mobile: 13900139000, id: user_id} cursor.execute(update_sql, update_data) conn.commit() print(f\n已更新用户 {user_id} 的手机号。) # 再次查询确认更新和解密 cursor.execute(select_sql, (user_id,)) updated_result cursor.fetchone() print(f更新后手机号: {updated_result[3]}) finally: conn.close() if __name__ __main__: # 确保环境变量已设置或修改crypto_util中的密钥 # os.environ[DB_FIELD_ENCRYPTION_KEY] your-32-byte-secret-key-here!!! test_encryption_flow()运行这个脚本你会看到数据插入时id_card,mobile,email在代码中是明文但通过EncryptedCursor执行后存入数据库的是Base64编码的密文。通过同一个EncryptedCursor查询时取出的数据自动被解密为明文。用普通游标查询可以看到数据库中存储的确实是无法直接识别的密文。更新操作同样会自动加密新值。6. 高级议题、常见问题与排查技巧6.1 模糊查询与索引失效问题这是字段级加密最大的挑战之一。一旦数据被加密数据库的LIKE查询、范围查询BETWEEN,,以及基于该字段的索引都将完全失效因为数据库操作的是无意义的密文。解决方案取舍与业务设计首先评估该字段是否真的需要模糊查询。例如手机号、身份证号通常用于精确匹配加密不影响查询。如果需要模糊搜索考虑业务上是否可以用其他非敏感字段替代如用户昵称。保留明文哈希对于需要模糊查询的字段如姓名可以额外存储一个单向哈希值如SHA256或盲索引。查询时对搜索词同样计算哈希然后在哈希列上进行精确匹配。但这只能用于等值查询不能用于LIKE ‘%张%’这种部分匹配。确定性加密使用确定性加密算法如AES-SIV或在固定IV下的ECB模式相同的明文总是产生相同的密文。这样可以在密文上做等值查询和建立索引。但这种方法会泄露明文模式安全性降低需谨慎评估。应用层过滤如果数据量不大可以将所有数据取回应用层解密后在内存中过滤。这显然不适用于大数据集。实操心得在项目初期就必须和产品、合规部门明确哪些字段需要加密以及这些字段的查询需求。通常身份证、银行卡号等用于核验的字段只做精确匹配加密不影响。而像“诊断详情”这类长文本本身就不适合数据库索引和模糊查询加密存储是合理的。6.2 密钥轮换与数据重加密密钥不能永远不变。出于安全最佳实践需要定期轮换加密密钥。这意味着需要用新密钥重新加密数据库中所有已有的密文数据。操作步骤生成新密钥安全地生成一个新的加密密钥KEY_NEW。创建新加密工具实例使用KEY_NEW创建一个新的FieldCrypto实例。分批读取-解密-再加密使用旧密钥KEY_OLD的解密功能读取数据。使用KEY_NEW的加密功能重新加密数据。更新数据库记录。更新应用配置在所有应用实例中将使用的密钥从KEY_OLD切换到KEY_NEW。安全废弃旧密钥确认所有数据重加密完成且应用运行稳定后安全地销毁KEY_OLD。这个过程需要在维护窗口进行并确保数据一致性。务必先备份数据6.3 性能考量与优化应用层加密解密会带来额外的CPU开销。影响主要影响批量插入/更新和数据量大的查询。单条操作的开销通常可忽略。优化对于批量操作确保使用executemany我们的包装器已支持。考虑使用更快的加密库实现如pyca/cryptography本身已高度优化。对于绝对性能敏感且数据不敏感的场景可以评估是否真的需要加密。6.4 常见问题排查表问题现象可能原因排查步骤与解决方案插入数据失败报错“Data too long for column”加密后数据长度超过字段定义如VARCHAR(255)1. 检查数据库表结构将加密字段改为TEXT或更大的VARCHAR。2. 计算加密后长度Base64(明文 IV Tag)。查询时解密失败抛出ValueError或解密结果乱码1. 存储的密文被意外修改或截断。2. 加解密使用的密钥不一致。3. 密文格式不符例如存储时未包含IV和Tag。1. 检查数据库该字段的完整值确认是完整的Base64字符串。2.核对环境变量确保所有应用实例使用的密钥完全相同。3. 确认加密和解密函数使用的是同一种数据组合格式我们用的是IVCiphertextTag。加密字段的WHERE条件查询不返回结果1. 在WHERE子句中使用了明文进行条件过滤。2. 使用了LIKE等模糊查询操作符。1. 确保在WHERE条件中使用的值也是通过加密工具加密后的值。例如WHERE id_card %(encrypted_id_card)s 其中encrypted_id_card是代码中加密后的值。2. 避免对加密字段使用LIKE如需查询参考6.1节的解决方案。部分字段没有自动加密1. 字段名拼写与配置不一致大小写、下划线。2. SQL表名提取失败导致未匹配到加密配置。3. 参数不是字典类型。1. 检查ENCRYPTION_CONFIG中的表名和字段名是否与数据库和SQL语句中完全一致。2. 在_get_table_name_from_sql方法中添加调试日志打印提取到的表名。3. 确认execute调用时传入的args是字典使用命名占位符%(name)s。使用fetch*方法后数据仍是密文1. 列名未匹配到加密配置。2. 使用了非EncryptedCursor游标如DictCursor。1. 检查cursor.description中的列名确认是否在all_encrypted_fields集合中。2. 确保连接使用的是EncryptedConnection或创建游标时显式指定cursorclassEncryptedCursor。如果使用DictCursor需要额外创建一个EncryptedDictCursor继承并重写相应方法。6.5 实现一个EncryptedDictCursor很多项目喜欢使用DictCursor来获取字典形式的结果。我们可以类似地实现一个加密版本。# encrypted_dict_cursor.py import pymysql.cursors from encrypted_cursor import EncryptedCursor # 继承我们已有的EncryptedCursor class EncryptedDictCursor(EncryptedCursor, pymysql.cursors.DictCursorMixin): 结合字段加密和字典返回格式的游标。 继承顺序很重要确保方法解析顺序正确。 def __init__(self, connection): super().__init__(connection) # DictCursorMixin需要的初始化 self._query None self._rows None self._lastrowid None # fetch* 方法在EncryptedCursor中已重写并解密DictCursorMixin会处理字典化。 # 我们只需要确保解密发生在字典化之前。由于Python的多继承方法解析顺序MRO # 调用super().fetchone()会先走到EncryptedCursor的fetchone完成解密。 # 因此这里不需要再重写fetch方法。然后在EncryptedConnection中也支持返回这个游标。这套方案实施下来业务代码几乎无需改动只需要换一个连接类并在配置文件中声明哪些字段需要加密就能实现数据库字段级的透明加密与解密。它把安全性集中在应用层密钥管理更灵活也真正做到了“数据库看不到明文”是满足许多数据安全合规要求的有效实践。当然每个项目都需要根据自身的具体需求在安全性、性能和查询灵活性之间找到合适的平衡点。