Apriori 算法 Python 3.12 实战:从购物篮数据挖掘 5 条强关联规则

Apriori 算法 Python 3.12 实战:从购物篮数据挖掘 5 条强关联规则 Apriori 算法 Python 3.12 实战从购物篮数据挖掘 5 条强关联规则1. 关联规则挖掘的核心概念关联规则挖掘是数据挖掘领域的重要技术它通过分析事务数据中项目之间的共现关系发现潜在的关联模式。最经典的案例莫过于啤酒与尿布的故事——超市通过分析购物篮数据发现年轻父亲们经常同时购买这两种商品。在技术实现上我们需要理解三个核心指标支持度Support项集在所有事务中出现的频率。例如1000笔交易中有50笔同时包含A和B则{A,B}的支持度为5%。置信度Confidence规则A→B的置信度表示在包含A的事务中同时包含B的概率。计算方式为支持度(A∪B)/支持度(A)提升度Lift衡量规则中前后项的相关性计算公式为置信度(A→B)/支持度(B)。提升度1表示正相关。以下是一个简单的支持度计算示例# 示例数据集 dataset [ [牛奶, 面包, 尿布], [可乐, 面包, 尿布, 啤酒], [牛奶, 尿布, 啤酒], [面包, 牛奶, 尿布], [面包, 牛奶, 啤酒] ] # 计算{牛奶, 尿布}的支持度 count sum(1 for transaction in dataset if {牛奶, 尿布}.issubset(transaction)) support count / len(dataset) print(f支持度: {support:.2f}) # 输出: 0.602. Apriori 算法原理与实现Apriori算法基于先验原理如果一个项集是频繁的那么它的所有子集也一定是频繁的。这种向下闭包性质大大减少了需要考察的项集数量。2.1 算法实现步骤我们将其封装为一个完整的Python类from itertools import combinations from collections import defaultdict class Apriori: def __init__(self, min_support0.5, min_confidence0.7): self.min_support min_support self.min_confidence min_confidence self.frequent_itemsets [] self.rules [] def _get_frequent_1_itemsets(self, transactions): 生成频繁1-项集 item_counts defaultdict(int) for transaction in transactions: for item in transaction: item_counts[frozenset([item])] 1 return {item: count for item, count in item_counts.items() if count/len(transactions) self.min_support} def _generate_candidates(self, itemsets, k): 生成候选项集 return set([i.union(j) for i in itemsets for j in itemsets if len(i.union(j)) k]) def _prune_itemsets(self, candidates, prev_itemsets, k): 剪枝步骤 pruned set() for candidate in candidates: subsets combinations(candidate, k-1) if all(frozenset(subset) in prev_itemsets for subset in subsets): pruned.add(candidate) return pruned def fit(self, transactions): 发现所有频繁项集 # 第一轮扫描生成频繁1-项集 freq_1 self._get_frequent_1_itemsets(transactions) self.frequent_itemsets.append(freq_1) k 2 while True: # 生成候选k-项集 candidates self._generate_candidates( self.frequent_itemsets[k-2].keys(), k) # 剪枝 candidates self._prune_itemsets( candidates, self.frequent_itemsets[k-2].keys(), k) if not candidates: break # 计算支持度 item_counts defaultdict(int) for transaction in transactions: for candidate in candidates: if candidate.issubset(transaction): item_counts[candidate] 1 freq_k {item: count for item, count in item_counts.items() if count/len(transactions) self.min_support} if not freq_k: break self.frequent_itemsets.append(freq_k) k 1 def generate_rules(self): 生成关联规则 for i, level in enumerate(self.frequent_itemsets[1:], start1): for itemset in level.keys(): subsets [frozenset([item]) for item in itemset] if i 1: self._rules_from_conseq(itemset, subsets) else: self._calc_confidence(itemset, subsets) def _calc_confidence(self, itemset, subsets): 计算规则置信度 for consequent in subsets: antecedent itemset - consequent support_itemset self._get_support(itemset) support_antecedent self._get_support(antecedent) confidence support_itemset / support_antecedent if confidence self.min_confidence: self.rules.append((antecedent, consequent, support_itemset, confidence)) def _rules_from_conseq(self, itemset, subsets): 从频繁项集生成规则 m len(subsets[0]) if len(itemset) m 1: new_subsets self._generate_candidates(subsets, m 1) new_subsets self._prune_itemsets( new_subsets, subsets, m 1) self._calc_confidence(itemset, new_subsets) self._rules_from_conseq(itemset, new_subsets) def _get_support(self, itemset): 获取项集支持度 for level in self.frequent_itemsets: if itemset in level: return level[itemset] / sum(level.values()) return 0.02.2 算法优化技巧原始Apriori算法存在多次扫描数据库的问题我们可以通过以下方式优化事务压缩减少不包含任何频繁k-项集的事务基于划分的方法将数据库分成多个分区分别挖掘采样方法对数据子集进行挖掘然后验证以下是一个优化后的候选生成函数def _generate_candidates_optimized(self, prev_itemsets, k): 优化后的候选项集生成 candidates set() items list(prev_itemsets.keys()) for i in range(len(items)): for j in range(i1, len(items)): # 只有前k-2项相同才合并 union_set items[i].union(items[j]) if len(union_set) k: candidates.add(union_set) return candidates3. 实战超市购物篮分析3.1 数据准备我们使用一个模拟的超市购物数据集# 模拟购物篮数据 transactions [ [牛奶, 面包, 尿布], [可乐, 面包, 尿布, 啤酒], [牛奶, 尿布, 啤酒, 饼干], [面包, 牛奶, 尿布, 啤酒], [面包, 牛奶, 啤酒, 薯片] ] # 初始化Apriori算法 apriori Apriori(min_support0.4, min_confidence0.6) apriori.fit(transactions) apriori.generate_rules()3.2 结果分析与可视化我们可以将挖掘结果以表格形式展示前项后项支持度置信度提升度{牛奶}{尿布}0.61.01.25{面包}{尿布}0.60.750.94{尿布}{牛奶}0.60.751.25{啤酒}{尿布}0.60.750.94{牛奶}{啤酒}0.40.671.11使用Python可视化库展示频繁项集import matplotlib.pyplot as plt import pandas as pd # 准备数据 data [] for level in apriori.frequent_itemsets: for itemset, count in level.items(): data.append({ itemset: , .join(itemset), support: count/len(transactions), size: len(itemset) }) df pd.DataFrame(data) # 绘制支持度分布 plt.figure(figsize(10, 6)) plt.barh(df[itemset], df[support]) plt.xlabel(Support) plt.title(Frequent Itemsets Support Distribution) plt.show()4. 性能优化与工程实践4.1 大数据集处理策略当处理大规模数据集时可以考虑以下方法分块处理将数据分成多个块分别处理后再合并结果并行计算使用多进程或分布式计算框架内存优化使用更高效的数据结构存储候选项集以下是一个使用生成器处理大型数据集的示例def chunked_dataset(dataset, chunk_size1000): 分块加载大型数据集 for i in range(0, len(dataset), chunk_size): yield dataset[i:i chunk_size] # 使用示例 for chunk in chunked_dataset(large_transactions): apriori.process_chunk(chunk) # 需要实现process_chunk方法4.2 实际应用建议在实际项目中应用Apriori算法时数据预处理清洗数据处理缺失值统一商品名称参数调优通过网格搜索找到最佳的最小支持度和置信度结果验证使用业务知识验证发现的规则是否合理以下是一个参数调优的示例from itertools import product def parameter_tuning(dataset): 参数网格搜索 supports [0.1, 0.2, 0.3] confidences [0.5, 0.6, 0.7] best_params None best_rules_count 0 for s, c in product(supports, confidences): apriori Apriori(min_supports, min_confidencec) apriori.fit(dataset) apriori.generate_rules() if len(apriori.rules) best_rules_count: best_rules_count len(apriori.rules) best_params (s, c) return best_params5. 进阶应用与扩展5.1 多维度关联规则传统关联规则只考虑商品共现我们可以扩展考虑更多维度# 多维数据示例 multi_dim_data [ {products: [牛奶, 面包], time: 早晨, customer: 青年}, {products: [啤酒, 尿布], time: 晚上, customer: 中年}, # ...更多数据 ] # 可以挖掘类似规则时间早晨 → 产品牛奶5.2 实时关联规则更新对于流式数据我们可以实现增量式更新算法class StreamingApriori(Apriori): def update(self, new_transactions): 增量更新频繁项集 for transaction in new_transactions: # 更新1-项集计数 for item in transaction: item_set frozenset([item]) if item_set in self.frequent_itemsets[0]: self.frequent_itemsets[0][item_set] 1 # 更新k-项集计数(k1) for k in range(1, len(self.frequent_itemsets)): for itemset in self.frequent_itemsets[k]: if itemset.issubset(transaction): self.frequent_itemsets[k][itemset] 1 # 重新计算支持度并剪枝 self._prune_infrequent_itemsets()5.3 与其他算法结合Apriori算法可以与聚类算法结合发现不同客户群体的购物模式from sklearn.cluster import KMeans def cluster_transactions(transactions, n_clusters3): 对交易进行聚类 # 将交易转换为特征向量 all_items set().union(*transactions) features [ [1 if item in t else 0 for item in all_items] for t in transactions ] # 聚类 kmeans KMeans(n_clustersn_clusters) clusters kmeans.fit_predict(features) return clusters # 对每个聚类分别应用Apriori算法 clusters cluster_transactions(transactions) for cluster_id in set(clusters): cluster_data [t for t, c in zip(transactions, clusters) if c cluster_id] apriori Apriori() apriori.fit(cluster_data) print(fCluster {cluster_id} rules:) apriori.generate_rules() print(apriori.rules[:5])