将关联关系的数据放在同一个集合中

功能实现

1
2
3
(A,B)
(C,D)
(E,A)

将有关联关系的数据放在同一个集合中

1
2
(A,B,E)
(C,D)

方法一

通过每次添加一个数据,判断是否在集合中,并且将关联集合合并

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;


public class ClosureSet {


private Set<Set<String>> set;


public ClosureSet() {
set = new HashSet<>();
}

public ClosureSet(int size) {
set = new HashSet<>(size);
}

public Set<Set<String>> getSet() {
return set;
}

public void addItem(Tuple2<String,String> item) {
Set<String> l1 = null;
Set<String> l2 = null;
for (Set<String> ele : set) {
if (ele.contains(item.f0)) {
l1 = ele;
}
if (ele.contains(item.f1)) {
l2 = ele;
}
}
if (l1 == null && l2 == null) {
Set<String> ele = new HashSet<>();
ele.add(item.f0);
ele.add(item.f1);
set.add(ele);
} else if (l1 == null && l2 != null) {
l2.add(item.f0);
} else if (l1 != null && l2 == null) {
l1.add(item.f1);
} else if (l1 != l2) {
l1.addAll(l2);
l2.clear();
}
}

主要原理是每次添加元素,都从集合中遍历,如果出现两个set就合并set,如果只存在单个set就加入,最大的性能消耗在遍历上,set的size越大,耗时就越大。

由于业务特性,只要相关的数据在同一个set即可,5组数据在3个集合还是5个集合就不影响最后的计算结果,因此可以限定一个size大小,超过阈值进行merge操作。

方法二

优化的方式是限制Set大小,如果超过一个阈值,进行resize操作。
resize思想是,通过random的方式,判断是否需要合并数据。
通过for循环遍历所有的数据,每个数据都判断是否需要合并,如果需要,合并对应的length-i(对称)的数据,然后清空被合并的数据,可以将数据的大小压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import java.util.*;



public class ClosureSetV2 {

private int maxSize = 5000;
private Random random = new Random();
private List<Set<String>> list;


public ClosureSetV2() {
list = new ArrayList<>(maxSize);
}


public ClosureSetV2(int size) {
list = new ArrayList<>(size);
}

public List<Set<String>> getList() {
return list;
}

public void resize() {

if (list.size() >= maxSize) {
List<Set<String>> anotherList = new ArrayList<>(maxSize);
for (int i = 0; i < maxSize ; i++) {
Set<String> oneSet = list.get(i);
if (oneSet.size() == 0) {
continue;
}
boolean isMerge = random.nextBoolean();
if (isMerge) {
int index = list.size() - 1 - i;
if (i == index) {
continue;
}
Set<String> strings = list.get(index);
oneSet.addAll(strings);
strings.clear();
}
anotherList.add(oneSet);
}
this.list = anotherList;
}
}

public void addItem(Tuple2<String,String> item) {
resize();
Set<String> l1 = null;
Set<String> l2 = null;
for (Set<String> ele : list) {
if (ele.contains(item.f0)) {
l1 = ele;
}
if (ele.contains(item.f1)) {
l2 = ele;
}
}
if (l1 == null && l2 == null) {
Set<String> ele = new HashSet<>();
ele.add(item.f0);
ele.add(item.f1);
list.add(ele);
} else if (l1 == null && l2 != null) {
l2.add(item.f0);
} else if (l1 != null && l2 == null) {
l1.add(item.f1);
} else if (l1 != l2) {
l1.addAll(l2);
l2.clear();
}

}

}

可以将耗时从半小时算不完降低到高峰期可以在1分钟多算完(50w条数据)

方法三

通过倒排索引的思想,现将每个tuple进行编号,获取一份该数据的倒排索引和正排索引。

正排索引如下

1
2
3
4
1. (A,B)
2. (C,D)
3. (E,A)
4. (E,F)

对应的倒排索引如下

1
2
3
4
5
6
A ==> 1,3
B ==> 1
C ==> 2
D ==> 2
E ==> 3,4
F ==> 4

需要一个队列queue,放入需要被继续深究的关系。
一个集合set,放置已经被搜过的索引。
和一个存放关系的list。

在计算的时候,遍历正排索引

  • 遍历索引1,获取A,B是相关关系,在list中加入A和B。在queue加入A,B,在被搜过的索引集合set中加入1,此时set中是1,queue中是A和B,list中是A和B
  • 从队列中取出需要遍历的第一个数据A,获取对应的倒排索引是1和3,1已经在遍历过的索引set中,跳过,3没有,获取3中的数据E和A,E加入关系List中,E加入待深搜的队列queue中,已经被搜索的set中加入3. 此时 待深搜的queue为B和E, 已经被搜过的索引set是1和3,此时关系list是A、B和E。
  • 继续弹出队列获取B,按照上一步的步骤1已经搜过,跳过。 继续弹出E,查出对应索引为3和4,3被搜过跳过,搜索4,获取F,此时可以更新关系, list中为A、B、E、F,已经搜过的索引set为1、3、4. 待深搜的队列queue为F。 弹出F,无发现。至此,第一组关系诞生,其他数据均无联系。
  • 深搜2,按照上面的步骤,可以得出C和D是关系,无其他数据。
  • 深搜3,发现3已经被搜过
  • 深搜4同上。
  • 完结。

上代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class ClosureSet {

private List<Set<String>> resultRelationlist = new LinkedList<>();

private Long index = 0L;

/**
* id与流的关系, 正排索引
*/
private Map<Long, Tuple2<String, String>> indexOfStreamMap;

/**
* 流的倒排索引
*/
private Map<String, Set<Long>> docValueOfSream;


/**
* 计算专用队列
*/
private Queue<String> queue = new LinkedBlockingDeque<>();

public ClosureSet() {
indexOfStreamMap = new HashMap<>();
docValueOfSream = new HashMap<>();
}


public List<Set<String>> getRelation() {
return resultRelationlist;
}

/**
* 添加数据的时候,构建正排和倒排索引
*/
public void addItem(Tuple2<String,String> item) {
indexOfStreamMap.put(index, Tuple2.of(item.f0, item.f1);
Set<Long> docvalueSet = Optional.ofNullable(docValueOfSream.get(item.f0))
.map(set -> {
set.add(index);
return set;
}).orElse(Sets.newHashSet(index));
docValueOfSream.put(item.f0, docvalueSet);

docvalueSet = Optional.ofNullable(docValueOfSream.get(item.f1))
.map(set -> {
set.add(index);
return set;
}).orElse(Sets.newHashSet(index));
docValueOfSream.put(item.f1, docvalueSet);
index++;
}


/**
* 计算关系
*/
public void cal() {
//已被搜索过的index,可跳过
Set<Long> searchedIndexSet = new HashSet<>();
for (Map.Entry<Long, Tuple2<String, String>> indexToStreamGroupEntry : indexOfStreamMap.entrySet()) {
Set<String> relations = new HashSet<>();
Long index = indexToStreamGroupEntry.getKey();
if (!searchedIndexSet.contains(index)) {
queue.offer(indexToStreamGroupEntry.getValue().f0);
queue.offer(indexToStreamGroupEntry.getValue().f1);
}
String stream = queue.poll();
while (null != stream) {
//将自己加入关系中
relations.add(stream);
//获取该stream的倒排索引
Set<Long> indexs = docValueOfSream.get(stream);
//遍历倒排索引,获取所有关联的stream,加入队列和关系中
for (Long relationIndex : indexs) {
if (searchedIndexSet.contains(relationIndex)) {
continue;
}
Tuple2<String, String> tuple2 = indexOfStreamMap.get(relationIndex);
queue.offer(tuple2.f0);
queue.offer(tuple2.f1);
relations.add(tuple2.f0);
relations.add(tuple2.f1);
searchedIndexSet.add(relationIndex);
}
stream = queue.poll();
}
if (relations.size() != 0) {
resultRelationlist.add(relations);
}
}
}

public static void main(String[] args) {
StreamClosureSet set = new StreamClosureSet();
set.addItem(new Tuple2("1", "2"));
set.addItem(new Tuple2("3", "4"));
set.addItem(new Tuple2("10", "11"));
set.addItem(new Tuple2("3", "2"));
set.addItem(new Tuple2("10", "2"));
set.addItem(new Tuple2("12", "12"));
set.addItem(new Tuple2("3", "12"));
set.addItem(new Tuple2("23", "312"));
set.cal();
System.out.println(set.getRelation().size());
}
}

跟方案二比,50w数据从1min30s,只用1.5s就可以跑完。从不完全聚合变成完全聚合。
灵感是从es来的。