关联关系是一种非常有用的数据挖掘算法,它可以分析出数据内在的关联关系。其中比较著名的是啤酒和尿不湿的案例
交易号 | 清单 |
---|---|
0 | 豆奶,莴苣 |
1 | 莴苣,尿布,啤酒,甜菜 |
2 | 豆奶,尿布,啤酒,橙汁 |
3 | 莴苣,豆奶,尿布,啤酒 |
4 | 莴苣,豆奶,尿布,橙汁 |
当超市在分析顾客的购物清单时发现一个比较奇怪的问题,为什么大部顾客在购买啤酒的时候还会买啤酒呢?后来经过超市的调查发现,顾客的妻子提醒丈夫买尿不湿时丈父会把自己的啤酒也一起带上。这是超市调查发现的尿不湿和啤酒的关系,如果数据量小我们还是可以处理的,但是涉及到大数据时,其复杂度就非常高,那我们有没有其它方式去寻找这种关系呢?其实我们可以使用关联算法去挖掘我们商品之间的关联关系,这些关系可以有两种形式:频繁项集或者关联规则。频繁项集(frequent item sets)是经常出现在一块的物品的集合(即经常被一起购买的商品),关联规则(association rules)暗示两种物品之间可能存在很强的关系。
为了有效定义频繁和关联,我们引入两个概念,支持度和置信度。
支持度(support),即事件发生的频率,记作 ,例如一共有5条记录,啤酒和尿布出现的次数是3次,这样啤酒的支持度就是 3 / 5 = 0.6,支持度越大表格商品出现的次数就越多。
置信度(confidence),置信度揭事了如果事件A发生了,则事件B发生的的概率,记作 ,
例如啤酒和尿布被购买时,橙汁也一起被购买的概率记作 ,置信度的值表示事件A和B同时发生后的概率占了A事件出现的概率的百份比,值越大表明买了啤酒和尿刷的顾客购买橙汁的概率比较大。
由上面的介绍可以知道,我们需要计算所有组合的支持度和置信度,从而计算出所有可能被购买的频繁项集这样我们就可以对商品进行合理的布局。
下面为了说明问题我们先假设我们有n种商品,那我们所有的商品排列组合为
其中k表示频繁集合中元素的个数,也就是说我们要运算出所有的频繁集合的话,时间复杂度为 ,如下图(来自网络)例示我们有四种商品0, 1, 2, 3,那我们就有 种频繁集合,如果我们有1000种频繁集合的话,可想而知这个数据量是非常的庞大的。
置信度的计算方式是我们遍历所有的频繁项集,然后找出每一个项集的所有的子集再使用上面的公式来计算出所有子集的置信度,这些子集的意思就是那些商品最可能被一起购买的商品组合,举个例子如果我们有频繁项集{0, 2, 3}那它可能出现的所有的子集是{0, 2}, {0, 3}, {2, 3}, {0, 2, 3}。使用排列组合的知识我们同理可以得出:
个排列组合,根据(1), (2)两个公式我们可以得到计算所有子集的计算复杂度为 , 从公式可以看出我们如果要计算的时间复杂度非常高,我们需要把计算置信度方式进行降维。
定理1: 如果一个项集是频繁的,那么其所有的子集(subsets)也一定是频繁的。 这个比较容易证明,因为某项集的子集的支持度一定不小于该项集。
定理2: 如果一个项集是非频繁的,那么其所有的超集(supersets)也一定是非频繁的。 定理2是上一条定理的逆反定理。根据定理2,可以对项集树进行如下剪枝(下图来自网络):
因为数据量大的话,生成频集的计算量也是非常大的,而Apriori给出的两个生成频繁集项的方法:
即第k-1项的所有频繁集项和第一项的频繁集项进行组合生成第k项候选频繁集量,但是并不是所有的结果都是满足需求的,我们要设定最小频繁项集阀值,只有大于阀值才会转正成为频繁项集。
,选择前 项均相同的 进行合并,生成 频繁集,这里有个要求是 必须是有序的,否则生成出来的项集并并不会符合要求。
# -*- coding:gbk -*-
'''
Created on 2018年2月12日
@author: Belle
'''
from sklearn.feature_extraction import DictVectorizer
from dask.array.chunk import arange
import time; # 引入time模块
from apriori2 import apriori
SUPPORT_DIVIDER = ","
CONFIDENCE_DIVIDER = "=>"
'''
构建模型
'''
class Apriori():
def __init__(self, dataSet, minSupport, minConfidence):
self.vec = DictVectorizer()
'''最小支持度'''
self.minSupport = minSupport
'''最小置信度'''
self.minConfidence = minConfidence
'''整个列表,数组的行表示单个特证向量,里面的特证不重复,而且每一行的长度有可能不一样'''
self.dataSet = dataSet
self.numOfTypes = len(dataSet)
'''构建所有种类出现的次数'''
self.dataTypeMap = {}
'''初始化一项式'''
self.dataTypeMap[1] = createTrainSet(self.dataSet)
'''
构学无监督学习的数据
'''
def createTrainSet(dataTypeMap):
dataTypeMapResult = {}
for row in range(len(dataTypeMap)):
rowValues = dataTypeMap[row]
rowValues.sort()
for column in range(len(rowValues)):
value = str(rowValues[column])
if value in dataTypeMapResult:
'''更新当前键出现的次数'''
dataTypeMapResult[value] = dataTypeMapResult[value] + 1
else:
'''第一次出现的数据值为1'''
dataTypeMapResult[value] = 1
return dataTypeMapResult
'''
analize_x 为n*k列距阵
'''
def analize(dataSet, minSupport = 0.15, minConfidence = 0.7):
row = 2
apriori = Apriori(dataSet, minSupport, minConfidence)
'''从C(2, n), C(3, n)....到C(n, n)'''
while True:
if innerLoop(apriori, row) == 0:
break
row = row + 1
'''生成关规则'''
generateRule(apriori)
return apriori
'''
计算通过k-1项,计算k项的数据
'''
def innerLoop(apriori, kSet):
'''候选 k项式修选集'''
kSetItems = {}
beforeLenght = len(kSetItems)
'''选择k项式的值'''
print("选择{0}项式的值开始...".format(kSet))
startTime = time.time()
scanKMinusItems(kSetItems, apriori, kSet)
print("获取候选{0}项式时间:".format(kSet) + str(time.time() - startTime))
'''对候选集进行剪枝'''
print("剪枝开始,剪枝数量{0}...".format(len(kSetItems)))
startTime = time.time()
sliceBranch(kSetItems, apriori)
print("剪枝花费时间:" + str(time.time() - startTime))
'''存在下一个key_set,则放在结果中'''
afterLength = len(kSetItems)
if afterLength != beforeLenght:
apriori.dataTypeMap[kSet] = kSetItems
return 1
else:
return 0
'''
通过1项式和k-1项式生成k项式
'''
def scanKMinusItems(kSetItems, apriori, kSet):
'''频集1项式和k-1项式,组成新的k项式,然后把不满足的项式去掉'''
'''频集1项式'''
keys = list(apriori.dataTypeMap[1].keys())
'''k-1项式,,1项式和k-1项式组成k项式'''
kMinusOneKeys = list(apriori.dataTypeMap[kSet - 1].keys())
'''生成候选集'''
for row in range(len(keys)):
for column in range(len(kMinusOneKeys)):
'''2项式时,由于1项式和1项式进行组合,需要去除相同的组合数'''
if kSet == 2 and row == column:
continue
calc(keys[row], kMinusOneKeys[column], kSetItems)
'''
生成候选频繁集
@param oneDataSetKey: 1项式的key值
@param dataSet: 训练集1项式
@param kMinusOneItemKey: k - 1项式的key值
@param kDataSetItems: k项式map数据
'''
def calc(oneDataSetKey, kMinusOneItemKey, kDataSetItems):
if kMinusOneItemKey.find(oneDataSetKey) != -1:
return
kDataSetItemKeys = kMinusOneItemKey + SUPPORT_DIVIDER + str(oneDataSetKey)
'''分割成数组,再进行排序'''
kItemKeys = kDataSetItemKeys.split(SUPPORT_DIVIDER)
kItemKeys.sort()
'''list合成字段串'''
kDataSetItemKeys = SUPPORT_DIVIDER.join(kItemKeys)
'''kDataSetItemKeys初始值为0'''
if kDataSetItemKeys in kDataSetItems.keys():
kDataSetItems[kDataSetItemKeys] += 1
else:
kDataSetItems[kDataSetItemKeys] = 0
'''
对后选频烦集进行剪枝
@param kDataSetItems
'''
def sliceBranch(kDataSetItems, apriori):
kItemKeyArrays = list(kDataSetItems.keys())
'''计算kItemKeys数组里面的所有元素同时在dataSet中出现的次数'''
dataSets = {}
for kItemKeys in kItemKeyArrays:
kItemKeyArray = kItemKeys.split(SUPPORT_DIVIDER)
kDataSetItemCount = 0
setData = set(kItemKeyArray)
for rowIndex in range(len(apriori.dataSet)):
if rowIndex in dataSets:
rowArray = dataSets[rowIndex]
else:
rowArray = set(apriori.dataSet[rowIndex])
dataSets[rowIndex] = rowArray
'''长度大于数据长度'''
if len(rowArray) < len(kItemKeyArray):
continue
'''判断所有元素是否都在列表中同时存在'''
if setData.issubset(set(rowArray)):
kDataSetItemCount += 1
'''小于最小支持度,则不添加到列表中'''
if apriori.minSupport > kDataSetItemCount / apriori.numOfTypes:
del kDataSetItems[kItemKeys]
else:
kDataSetItems[kItemKeys] = kDataSetItemCount
'''
计算置信度
@param kDataSetItems: 频集数据集{1:{'1, 2, 3':次数}}
@param apriori: 关联数据类
'''
def generateRule(apriori):
cacheKeySet = {}
resultConfidence = {}
'''key是频集集合,value代表是K项式的k值'''
for key in apriori.dataTypeMap:
if key == 1:
continue
innerMap = apriori.dataTypeMap[key]
for innerKey in innerMap:
keyArray = innerKey.split(SUPPORT_DIVIDER)
generateRule2(apriori, keyArray, innerMap[innerKey], resultConfidence, len(keyArray) - 1)
'''
目标繁集项和源繁集项两两结合在一起
@param kDataSetItems: 二项式繁集项
@param targetItems: 某个目标繁集
@param sourceItems: 源繁集项
'''
def generateRule2(apriori, targetItems, supportTargetItems, resultConfidence, childIndex):
if childIndex <= 0:
return
dataMap = apriori.dataTypeMap
'''数据长度'''
dataLength = len(targetItems)
totalSets = set(targetItems)
'''构造targetItems非空真子集,并计算至信度'''
for index in range(dataLength):
'''超过了数组长度'''
if index + childIndex > dataLength:
break
'''从index开始取childIndex个数据表示是leftDataSet'''
leftDataArray = targetItems[index:childIndex + index]
leftDataArray.sort()
'''取总列表与左边的列表相减,就是右列key'''
rightDataArray = list(totalSets ^ set(leftDataArray))
rightDataArray.sort()
leftDataKeyString = SUPPORT_DIVIDER.join(leftDataArray)
rightDataKeyString = SUPPORT_DIVIDER.join(rightDataArray)
'''不存在数量为数组长度的频集'''
if (len(leftDataArray) not in dataMap) or (len(rightDataArray) not in dataMap):
continue
'''非频集'''
if (leftDataKeyString not in dataMap[len(leftDataArray)]) or \
(rightDataKeyString not in dataMap[len(rightDataArray)]):
continue
'''leftDataKey出现的时,rightDataKeyString出现的概率,即频集列表中两个出现的数量'''
confidence = supportTargetItems / \
dataMap[len(leftDataArray)][leftDataKeyString]
if confidence > apriori.minConfidence:
'''置信度大于阀值'''
print("{0}===>>{1}: confidence = {2}".format(leftDataKeyString, rightDataKeyString, str(confidence)))
resultConfidence[leftDataKeyString + CONFIDENCE_DIVIDER + rightDataKeyString] = confidence
else:
'''置信度小于阀值,放在ignore例表中,用于下次判'''
'''递规的方式云偏历'''
generateRule2(apriori, targetItems, supportTargetItems, resultConfidence, childIndex - 1)
测试代码
# -*- coding:gbk -*-
'''
Created on 2018年2月12日
@author: Belle
'''
import Apriori
analize_x = [["豆奶", "莴苣"],\
["莴苣", "尿布", "啤酒", "甜菜"],\
["豆奶", "尿布", "啤酒", "橙汁"],\
["莴苣", "豆奶", "尿布", "啤酒"],\
["莴苣", "豆奶", "尿布", "橙汁"]]
apriori = Apriori.analize(analize_x)
print(apriori.dataTypeMap)
本文来自网易实践者社区,经作者陈杜龙授权发布。