从零实现Apriori算法深入理解关联规则挖掘的核心逻辑在数据挖掘领域我们经常需要从海量数据中发现有价值的模式。想象一下超市的购物篮数据——当顾客把啤酒和尿布放在同一个购物篮中时这背后隐藏着什么商业价值这就是关联规则挖掘要解决的问题。本文将带你用Python从零实现经典的Apriori算法不仅理解其工作原理还能亲手构建这个强大的数据挖掘工具。1. 关联规则挖掘基础概念关联规则挖掘的核心目标是发现数据集中项与项之间的有趣关系。以超市购物为例我们可能发现购买啤酒的顾客有65%也会购买薯片这样的规律。要量化这些关系我们需要几个关键指标支持度(Support): 项集在所有事务中出现的频率。计算式为support(X) (包含X的事务数) / (总事务数)置信度(Confidence): 在包含X的事务中同时包含Y的条件概率。计算式为confidence(X→Y) support(X∪Y) / support(X)提升度(Lift): X的出现对Y出现概率的影响程度。计算式为lift(X→Y) confidence(X→Y) / support(Y)举个例子假设我们有以下简单的交易数据集交易ID商品1牛奶, 面包, 鸡蛋2牛奶, 尿布, 啤酒3面包, 尿布, 可乐4牛奶, 面包, 尿布, 啤酒5面包, 可乐计算牛奶→面包的规则指标支持度同时包含牛奶和面包的交易有ID 1和4共2个总交易数5support 2/5 0.4置信度包含牛奶的交易有ID 1,2,4共3个其中同时包含面包的有2个confidence 2/3 ≈ 0.67提升度面包的单独支持度是4/50.8lift 0.67/0.8 ≈ 0.842. Apriori算法原理剖析Apriori算法是关联规则挖掘中最经典的算法之一其核心思想基于一个简单但强大的先验性质如果一个项集是频繁的那么它的所有子集也一定是频繁的反之如果一个项集是非频繁的那么它的所有超集也一定是非频繁的。算法主要分为两个步骤连接步(Join Step): 通过将频繁(k-1)-项集自连接产生候选k-项集剪枝步(Prune Step): 删除那些包含非频繁子集的候选让我们用一个具体例子说明这个过程。假设有以下事务数据库T1: {1,2,3} T2: {2,3,4} T3: {1,3,4} T4: {2,3} T5: {1,2,4}设置最小支持度为0.4即至少出现2次第一次迭代(1-项集):扫描所有事务统计每个项的出现次数候选{1}:3, {2}:4, {3}:4, {4}:3全部满足支持度要求第二次迭代(2-项集):连接将频繁1-项集两两组合候选{1,2}, {1,3}, {1,4}, {2,3}, {2,4}, {3,4}扫描数据库计算支持度{1,2}:2, {1,3}:2, {1,4}:2, {2,3}:3, {2,4}:2, {3,4}:2全部满足支持度要求第三次迭代(3-项集):连接将频繁2-项集组合要求前k-2项相同{1,2} {1,3} → {1,2,3}{1,2} {1,4} → {1,2,4}{1,3} {1,4} → {1,3,4}{2,3} {2,4} → {2,3,4}剪枝检查所有子集是否频繁{1,2,3}的子集{1,2}, {1,3}, {2,3}都是频繁的{1,2,4}的子集{1,2}, {1,4}, {2,4}都是频繁的同理检查其他候选扫描数据库计算支持度{1,2,3}:1, {1,2,4}:1, {1,3,4}:1, {2,3,4}:1都不满足最小支持度算法终止最终得到的频繁项集是所有2-项集。3. Python实现Apriori算法现在让我们用Python从零实现这个算法。我们将分几个关键步骤构建完整的解决方案。3.1 数据准备与辅助函数首先我们需要准备数据和一些辅助函数def load_dataset(): 创建示例数据集 return [ [牛奶, 面包, 鸡蛋], [牛奶, 尿布, 啤酒], [面包, 尿布, 可乐], [牛奶, 面包, 尿布, 啤酒], [面包, 可乐] ] def create_c1(dataset): 创建初始候选1-项集 c1 [] for transaction in dataset: for item in transaction: if [item] not in c1: c1.append([item]) c1.sort() return list(map(frozenset, c1)) # 使用frozenset作为可哈希类型 def scan_dataset(dataset, candidates, min_support): 扫描数据集计算候选项集的支持度 item_count {} for transaction in dataset: for candidate in candidates: if candidate.issubset(transaction): item_count[candidate] item_count.get(candidate, 0) 1 num_transactions float(len(dataset)) frequent_items [] support_data {} for item in item_count: support item_count[item] / num_transactions if support min_support: frequent_items.append(item) support_data[item] support return frequent_items, support_data3.2 核心算法实现接下来是实现Apriori算法的核心部分def apriori_gen(frequent_items_k, k): 根据频繁(k-1)-项集生成候选k-项集 candidates [] len_frequent len(frequent_items_k) for i in range(len_frequent): for j in range(i1, len_frequent): # 前k-2项相同才能连接 itemset1 list(frequent_items_k[i]) itemset2 list(frequent_items_k[j]) itemset1.sort() itemset2.sort() if itemset1[:k-2] itemset2[:k-2]: new_candidate frequent_items_k[i] | frequent_items_k[j] candidates.append(new_candidate) return candidates def apriori(dataset, min_support0.5): 完整的Apriori算法实现 c1 create_c1(dataset) dataset list(map(set, dataset)) frequent_items1, support_data scan_dataset(dataset, c1, min_support) frequent_items [frequent_items1] k 2 while True: candidates_k apriori_gen(frequent_items[k-2], k) frequent_items_k, support_data_k scan_dataset(dataset, candidates_k, min_support) if not frequent_items_k: break support_data.update(support_data_k) frequent_items.append(frequent_items_k) k 1 return frequent_items, support_data3.3 规则生成实现找到频繁项集后我们需要从中生成关联规则def generate_rules(frequent_items, support_data, min_confidence0.7): 从频繁项集中生成关联规则 rules [] for i in range(1, len(frequent_items)): # 只从2-项集开始 for itemset in frequent_items[i]: subsets [frozenset([item]) for item in itemset] if i 1: calculate_confidence(itemset, subsets, support_data, rules, min_confidence) else: rules_from_consequent(itemset, subsets, support_data, rules, min_confidence) return rules def calculate_confidence(itemset, subsets, support_data, rules, min_confidence): 计算规则的置信度 for consequent in subsets: antecedent itemset - consequent conf support_data[itemset] / support_data[antecedent] if conf min_confidence: rules.append((antecedent, consequent, conf)) def rules_from_consequent(itemset, subsets, support_data, rules, min_confidence): 从后件生成更多规则 m len(subsets[0]) if len(itemset) (m 1): new_subsets apriori_gen(subsets, m1) new_subsets [subset for subset in new_subsets if subset.issubset(itemset)] for consequent in new_subsets: antecedent itemset - consequent conf support_data[itemset] / support_data[antecedent] if conf min_confidence: rules.append((antecedent, consequent, conf)) rules_from_consequent(itemset, new_subsets, support_data, rules, min_confidence)3.4 完整示例运行现在我们可以运行完整的示例dataset load_dataset() frequent_items, support_data apriori(dataset, min_support0.4) rules generate_rules(frequent_items, support_data, min_confidence0.7) print(频繁项集) for itemset in frequent_items: print(itemset) print(\n关联规则) for rule in rules: antecedent, consequent, confidence rule print(f{set(antecedent)} {set(consequent)} (置信度: {confidence:.2f}))输出结果将显示所有满足最小支持度和置信度要求的频繁项集和关联规则。4. 算法优化与性能考量基本的Apriori实现虽然直观但在处理大规模数据时可能会遇到性能问题。以下是几种常见的优化策略4.1 基于哈希的优化使用哈希树来存储候选项集可以显著减少子集检查的次数class HashNode: def __init__(self, depth0, max_depth3): self.depth depth self.max_depth max_depth self.children {} self.itemsets [] def insert(self, itemset): if self.depth self.max_depth or len(itemset) self.depth: self.itemsets.append(itemset) else: hash_key self.hash_function(itemset) if hash_key not in self.children: self.children[hash_key] HashNode(self.depth1, self.max_depth) self.children[hash_key].insert(itemset) def hash_function(self, itemset): return hash(list(itemset)[self.depth]) % 10 def build_hash_tree(itemsets, max_depth3): root HashNode(max_depthmax_depth) for itemset in itemsets: root.insert(itemset) return root4.2 事务压缩减少需要扫描的事务数量不包含任何频繁k-项集的事务可以移除对事务进行排序和压缩存储4.3 分区技术将数据库分成多个分区只在分区内计算支持度将数据库分成不相交的分区在每个分区内找出局部频繁项集合并结果得到全局候选最后扫描整个数据库计算准确支持度4.4 采样方法对数据集进行采样在小样本上运行算法import random def sampled_apriori(dataset, sample_size, min_support, min_confidence): sample random.sample(dataset, min(sample_size, len(dataset))) frequent_items, support_data apriori(sample, min_support) rules generate_rules(frequent_items, support_data, min_confidence) return frequent_items, rules4.5 并行化实现利用多核处理器并行计算from multiprocessing import Pool def parallel_apriori(dataset, min_support, num_processes4): # 划分数据集 chunk_size len(dataset) // num_processes chunks [dataset[i:ichunk_size] for i in range(0, len(dataset), chunk_size)] with Pool(num_processes) as pool: results pool.starmap(partial_apriori, [(chunk, min_support) for chunk in chunks]) # 合并结果 # ...5. 实际应用案例让我们看一个更实际的例子分析一个在线零售商的交易数据。假设我们有以下预处理过的数据retail_data [ [鸡蛋, 牛奶, 面包, 奶酪], [鸡蛋, 牛奶, 面包, 苹果], [鸡蛋, 牛奶], [鸡蛋, 面包, 饼干], [牛奶, 面包, 饼干], [牛奶, 面包], [鸡蛋, 面包], [鸡蛋], [牛奶], [面包] ]我们将应用我们的Apriori实现来分析这些数据# 设置较低的支持度阈值以获取更多模式 frequent_items, support_data apriori(retail_data, min_support0.3) rules generate_rules(frequent_items, support_data, min_confidence0.7) print(零售数据关联分析结果) for i, itemset in enumerate(frequent_items): print(f{i1}-项集) for items in itemset: print(f {set(items)} (支持度: {support_data[items]:.2f})) print(\n重要关联规则) for rule in sorted(rules, keylambda x: x[2], reverseTrue): antecedent, consequent, confidence rule print(f {set(antecedent)} {set(consequent)} (置信度: {confidence:.2f}))典型输出可能显示1-项集 {鸡蛋} (支持度: 0.60) {牛奶} (支持度: 0.70) {面包} (支持度: 0.70) 2-项集 {鸡蛋, 牛奶} (支持度: 0.40) {鸡蛋, 面包} (支持度: 0.40) {牛奶, 面包} (支持度: 0.50) 3-项集 {鸡蛋, 牛奶, 面包} (支持度: 0.30) 重要关联规则 {牛奶} {面包} (置信度: 0.71) {面包} {牛奶} (置信度: 0.71) {鸡蛋, 牛奶} {面包} (置信度: 0.75) {鸡蛋, 面包} {牛奶} (置信度: 0.75)从结果中我们可以得出一些有趣的商业见解牛奶和面包经常被一起购买支持度0.5置信度0.71当顾客同时购买鸡蛋和牛奶时有75%的概率也会购买面包鸡蛋、牛奶和面包的组合虽然支持度不高但置信度很强这些洞察可以帮助零售商优化商品摆放、设计促销组合或制定交叉销售策略。
别光调包了!手把手带你用Python从零实现Apriori算法,搞懂关联规则挖掘
从零实现Apriori算法深入理解关联规则挖掘的核心逻辑在数据挖掘领域我们经常需要从海量数据中发现有价值的模式。想象一下超市的购物篮数据——当顾客把啤酒和尿布放在同一个购物篮中时这背后隐藏着什么商业价值这就是关联规则挖掘要解决的问题。本文将带你用Python从零实现经典的Apriori算法不仅理解其工作原理还能亲手构建这个强大的数据挖掘工具。1. 关联规则挖掘基础概念关联规则挖掘的核心目标是发现数据集中项与项之间的有趣关系。以超市购物为例我们可能发现购买啤酒的顾客有65%也会购买薯片这样的规律。要量化这些关系我们需要几个关键指标支持度(Support): 项集在所有事务中出现的频率。计算式为support(X) (包含X的事务数) / (总事务数)置信度(Confidence): 在包含X的事务中同时包含Y的条件概率。计算式为confidence(X→Y) support(X∪Y) / support(X)提升度(Lift): X的出现对Y出现概率的影响程度。计算式为lift(X→Y) confidence(X→Y) / support(Y)举个例子假设我们有以下简单的交易数据集交易ID商品1牛奶, 面包, 鸡蛋2牛奶, 尿布, 啤酒3面包, 尿布, 可乐4牛奶, 面包, 尿布, 啤酒5面包, 可乐计算牛奶→面包的规则指标支持度同时包含牛奶和面包的交易有ID 1和4共2个总交易数5support 2/5 0.4置信度包含牛奶的交易有ID 1,2,4共3个其中同时包含面包的有2个confidence 2/3 ≈ 0.67提升度面包的单独支持度是4/50.8lift 0.67/0.8 ≈ 0.842. Apriori算法原理剖析Apriori算法是关联规则挖掘中最经典的算法之一其核心思想基于一个简单但强大的先验性质如果一个项集是频繁的那么它的所有子集也一定是频繁的反之如果一个项集是非频繁的那么它的所有超集也一定是非频繁的。算法主要分为两个步骤连接步(Join Step): 通过将频繁(k-1)-项集自连接产生候选k-项集剪枝步(Prune Step): 删除那些包含非频繁子集的候选让我们用一个具体例子说明这个过程。假设有以下事务数据库T1: {1,2,3} T2: {2,3,4} T3: {1,3,4} T4: {2,3} T5: {1,2,4}设置最小支持度为0.4即至少出现2次第一次迭代(1-项集):扫描所有事务统计每个项的出现次数候选{1}:3, {2}:4, {3}:4, {4}:3全部满足支持度要求第二次迭代(2-项集):连接将频繁1-项集两两组合候选{1,2}, {1,3}, {1,4}, {2,3}, {2,4}, {3,4}扫描数据库计算支持度{1,2}:2, {1,3}:2, {1,4}:2, {2,3}:3, {2,4}:2, {3,4}:2全部满足支持度要求第三次迭代(3-项集):连接将频繁2-项集组合要求前k-2项相同{1,2} {1,3} → {1,2,3}{1,2} {1,4} → {1,2,4}{1,3} {1,4} → {1,3,4}{2,3} {2,4} → {2,3,4}剪枝检查所有子集是否频繁{1,2,3}的子集{1,2}, {1,3}, {2,3}都是频繁的{1,2,4}的子集{1,2}, {1,4}, {2,4}都是频繁的同理检查其他候选扫描数据库计算支持度{1,2,3}:1, {1,2,4}:1, {1,3,4}:1, {2,3,4}:1都不满足最小支持度算法终止最终得到的频繁项集是所有2-项集。3. Python实现Apriori算法现在让我们用Python从零实现这个算法。我们将分几个关键步骤构建完整的解决方案。3.1 数据准备与辅助函数首先我们需要准备数据和一些辅助函数def load_dataset(): 创建示例数据集 return [ [牛奶, 面包, 鸡蛋], [牛奶, 尿布, 啤酒], [面包, 尿布, 可乐], [牛奶, 面包, 尿布, 啤酒], [面包, 可乐] ] def create_c1(dataset): 创建初始候选1-项集 c1 [] for transaction in dataset: for item in transaction: if [item] not in c1: c1.append([item]) c1.sort() return list(map(frozenset, c1)) # 使用frozenset作为可哈希类型 def scan_dataset(dataset, candidates, min_support): 扫描数据集计算候选项集的支持度 item_count {} for transaction in dataset: for candidate in candidates: if candidate.issubset(transaction): item_count[candidate] item_count.get(candidate, 0) 1 num_transactions float(len(dataset)) frequent_items [] support_data {} for item in item_count: support item_count[item] / num_transactions if support min_support: frequent_items.append(item) support_data[item] support return frequent_items, support_data3.2 核心算法实现接下来是实现Apriori算法的核心部分def apriori_gen(frequent_items_k, k): 根据频繁(k-1)-项集生成候选k-项集 candidates [] len_frequent len(frequent_items_k) for i in range(len_frequent): for j in range(i1, len_frequent): # 前k-2项相同才能连接 itemset1 list(frequent_items_k[i]) itemset2 list(frequent_items_k[j]) itemset1.sort() itemset2.sort() if itemset1[:k-2] itemset2[:k-2]: new_candidate frequent_items_k[i] | frequent_items_k[j] candidates.append(new_candidate) return candidates def apriori(dataset, min_support0.5): 完整的Apriori算法实现 c1 create_c1(dataset) dataset list(map(set, dataset)) frequent_items1, support_data scan_dataset(dataset, c1, min_support) frequent_items [frequent_items1] k 2 while True: candidates_k apriori_gen(frequent_items[k-2], k) frequent_items_k, support_data_k scan_dataset(dataset, candidates_k, min_support) if not frequent_items_k: break support_data.update(support_data_k) frequent_items.append(frequent_items_k) k 1 return frequent_items, support_data3.3 规则生成实现找到频繁项集后我们需要从中生成关联规则def generate_rules(frequent_items, support_data, min_confidence0.7): 从频繁项集中生成关联规则 rules [] for i in range(1, len(frequent_items)): # 只从2-项集开始 for itemset in frequent_items[i]: subsets [frozenset([item]) for item in itemset] if i 1: calculate_confidence(itemset, subsets, support_data, rules, min_confidence) else: rules_from_consequent(itemset, subsets, support_data, rules, min_confidence) return rules def calculate_confidence(itemset, subsets, support_data, rules, min_confidence): 计算规则的置信度 for consequent in subsets: antecedent itemset - consequent conf support_data[itemset] / support_data[antecedent] if conf min_confidence: rules.append((antecedent, consequent, conf)) def rules_from_consequent(itemset, subsets, support_data, rules, min_confidence): 从后件生成更多规则 m len(subsets[0]) if len(itemset) (m 1): new_subsets apriori_gen(subsets, m1) new_subsets [subset for subset in new_subsets if subset.issubset(itemset)] for consequent in new_subsets: antecedent itemset - consequent conf support_data[itemset] / support_data[antecedent] if conf min_confidence: rules.append((antecedent, consequent, conf)) rules_from_consequent(itemset, new_subsets, support_data, rules, min_confidence)3.4 完整示例运行现在我们可以运行完整的示例dataset load_dataset() frequent_items, support_data apriori(dataset, min_support0.4) rules generate_rules(frequent_items, support_data, min_confidence0.7) print(频繁项集) for itemset in frequent_items: print(itemset) print(\n关联规则) for rule in rules: antecedent, consequent, confidence rule print(f{set(antecedent)} {set(consequent)} (置信度: {confidence:.2f}))输出结果将显示所有满足最小支持度和置信度要求的频繁项集和关联规则。4. 算法优化与性能考量基本的Apriori实现虽然直观但在处理大规模数据时可能会遇到性能问题。以下是几种常见的优化策略4.1 基于哈希的优化使用哈希树来存储候选项集可以显著减少子集检查的次数class HashNode: def __init__(self, depth0, max_depth3): self.depth depth self.max_depth max_depth self.children {} self.itemsets [] def insert(self, itemset): if self.depth self.max_depth or len(itemset) self.depth: self.itemsets.append(itemset) else: hash_key self.hash_function(itemset) if hash_key not in self.children: self.children[hash_key] HashNode(self.depth1, self.max_depth) self.children[hash_key].insert(itemset) def hash_function(self, itemset): return hash(list(itemset)[self.depth]) % 10 def build_hash_tree(itemsets, max_depth3): root HashNode(max_depthmax_depth) for itemset in itemsets: root.insert(itemset) return root4.2 事务压缩减少需要扫描的事务数量不包含任何频繁k-项集的事务可以移除对事务进行排序和压缩存储4.3 分区技术将数据库分成多个分区只在分区内计算支持度将数据库分成不相交的分区在每个分区内找出局部频繁项集合并结果得到全局候选最后扫描整个数据库计算准确支持度4.4 采样方法对数据集进行采样在小样本上运行算法import random def sampled_apriori(dataset, sample_size, min_support, min_confidence): sample random.sample(dataset, min(sample_size, len(dataset))) frequent_items, support_data apriori(sample, min_support) rules generate_rules(frequent_items, support_data, min_confidence) return frequent_items, rules4.5 并行化实现利用多核处理器并行计算from multiprocessing import Pool def parallel_apriori(dataset, min_support, num_processes4): # 划分数据集 chunk_size len(dataset) // num_processes chunks [dataset[i:ichunk_size] for i in range(0, len(dataset), chunk_size)] with Pool(num_processes) as pool: results pool.starmap(partial_apriori, [(chunk, min_support) for chunk in chunks]) # 合并结果 # ...5. 实际应用案例让我们看一个更实际的例子分析一个在线零售商的交易数据。假设我们有以下预处理过的数据retail_data [ [鸡蛋, 牛奶, 面包, 奶酪], [鸡蛋, 牛奶, 面包, 苹果], [鸡蛋, 牛奶], [鸡蛋, 面包, 饼干], [牛奶, 面包, 饼干], [牛奶, 面包], [鸡蛋, 面包], [鸡蛋], [牛奶], [面包] ]我们将应用我们的Apriori实现来分析这些数据# 设置较低的支持度阈值以获取更多模式 frequent_items, support_data apriori(retail_data, min_support0.3) rules generate_rules(frequent_items, support_data, min_confidence0.7) print(零售数据关联分析结果) for i, itemset in enumerate(frequent_items): print(f{i1}-项集) for items in itemset: print(f {set(items)} (支持度: {support_data[items]:.2f})) print(\n重要关联规则) for rule in sorted(rules, keylambda x: x[2], reverseTrue): antecedent, consequent, confidence rule print(f {set(antecedent)} {set(consequent)} (置信度: {confidence:.2f}))典型输出可能显示1-项集 {鸡蛋} (支持度: 0.60) {牛奶} (支持度: 0.70) {面包} (支持度: 0.70) 2-项集 {鸡蛋, 牛奶} (支持度: 0.40) {鸡蛋, 面包} (支持度: 0.40) {牛奶, 面包} (支持度: 0.50) 3-项集 {鸡蛋, 牛奶, 面包} (支持度: 0.30) 重要关联规则 {牛奶} {面包} (置信度: 0.71) {面包} {牛奶} (置信度: 0.71) {鸡蛋, 牛奶} {面包} (置信度: 0.75) {鸡蛋, 面包} {牛奶} (置信度: 0.75)从结果中我们可以得出一些有趣的商业见解牛奶和面包经常被一起购买支持度0.5置信度0.71当顾客同时购买鸡蛋和牛奶时有75%的概率也会购买面包鸡蛋、牛奶和面包的组合虽然支持度不高但置信度很强这些洞察可以帮助零售商优化商品摆放、设计促销组合或制定交叉销售策略。