Newer
Older
package org.gradoop.famer.linking;
import org.apache.commons.io.FileUtils;
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMEdgeFactory;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.famer.clustering.parallelClustering.clip.CLIP;
import org.gradoop.famer.clustering.parallelClustering.clip.dataStructures.CLIPConfig;
import org.gradoop.famer.clustering.parallelClustering.common.dataStructures.ClusteringOutputType;
import org.gradoop.famer.linking.blocking.BlockMaker;
import org.gradoop.famer.linking.blocking.keyGeneration.BlockingKeyGenerator;
import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.KeyGeneratorComponent;
import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.PrefixLengthComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponentBaseConfig;
import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingEmptyKeyStrategy;
import org.gradoop.famer.linking.linking.dataStructures.LinkerComponent;
import org.gradoop.famer.linking.linking.functions.LinkMaker;
import org.gradoop.famer.linking.selection.Selector;
import org.gradoop.famer.linking.selection.dataStructures.SelectionComponent;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.Condition;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.ConditionOperator;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRule;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponent;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponentType;
import org.gradoop.famer.linking.similarityMeasuring.SimilarityMeasurerWithDocumentFrequencyBroadcast;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponent;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponentBaseConfig;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityFieldList;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.TFIDFSimilarityComponent;
import org.gradoop.famer.linking.similarityMeasuring.methods.functions.CountDocumentFrequency;
import org.gradoop.famer.linking.similarityMeasuring.methods.functions.ExtractWords;
import org.gradoop.famer.postprocessing.quality.clusteredGraph.ClusteringQualityMeasures;
import org.gradoop.famer.preprocessing.io.benchmarks.amazon.AmazonProductsReader;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment;
import static org.gradoop.famer.linking.linking.Linker.SIM_DEGREE_EDGE_PROPERTY;
/**
 * Evaluation driver for TF-IDF based linking on the Amazon/Google product benchmark.
 *
 * <p>The program does the following:
 * <ul>
 *   <li>Reads the benchmark from {@code data/amazonGoogle}.</li>
 *   <li>Builds candidate pairs with two standard prefix blockings.</li>
 *   <li>Scores the pairs with a TF-IDF similarity on the {@code description} attribute.</li>
 *   <li>For every threshold 0.0, 0.05, ..., 1.0: selects links, clusters them with CLIP and
 *       appends the clustering quality as one row to {@code output/tfidf-<timestamp>.csv}.</li>
 * </ul>
 */
public class Main {

  /** Id shared by the similarity component and the selection conditions that refer to it. */
  private static final String SIMILARITY_COMPONENT_ID = "id";

  /** The threshold range [0, 1] is swept in this many equal steps (step width 0.05). */
  private static final int THRESHOLD_STEPS = 20;

  /** Header row of the result CSV; one row per threshold is appended after it. */
  private static final String CSV_HEADER =
      "threshold,precision,recall,fMeasure,allPositive,truePositive,perfectCompleteClusterNo";

  /**
   * Pattern of the time-stamped result file name.
   *
   * <p>{@code MMM} is the short month name in the default locale. The quoted part is the literal
   * {@code .csv} suffix.
   */
  private static final DateTimeFormatter RESULT_FILE_FORMAT =
      DateTimeFormatter.ofPattern("yyyyMMM-ddHHmm'.csv'");

  /**
   * Runs the blocking, similarity, selection, clustering and evaluation pipeline described on the
   * class.
   *
   * @param args ignored
   * @throws Exception if reading the benchmark, executing a Flink job or writing the CSV fails
   */
  public static void main(String[] args) throws Exception {
    final String sourceGraphLabel = "Amazon";
    final String sourceLabel = "Amazon";
    final String sourceAttribute = "description";
    final String targetGraphLabel = "Google";
    final String targetLabel = "Google";
    final String targetAttribute = "description";
    final String tokenizer = "\\W+";
    final String data = "data/amazonGoogle";

    // Note: a stand-alone Flink Configuration has no effect. To enable the local web UI it would
    // have to be handed to the execution environment the reader uses.
    final GraphCollection benchmarkDataCollection =
        new AmazonProductsReader().getBenchmarkDataAsGraphCollection(data);
    final DataSet<EPGMVertex> vertices = benchmarkDataCollection.getVertices();

    final File resultFile =
        new File("output/tfidf-" + LocalDateTime.now().format(RESULT_FILE_FORMAT));
    // Use the same charset as the rows appended in the loop below.
    FileUtils.writeStringToFile(resultFile, CSV_HEADER, StandardCharsets.UTF_8);

    /* Blocking */
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> blockedVertices =
        blockVertices(vertices, sourceGraphLabel, targetGraphLabel);

    /* Document frequencies for TF-IDF */
    final Map<String, Integer> wordsInDoc =
        countDocumentFrequencies(vertices, sourceAttribute, tokenizer);

    /* Similarity measuring */
    final SimilarityComponentBaseConfig baseConfig = new SimilarityComponentBaseConfig(
        SIMILARITY_COMPONENT_ID, sourceGraphLabel, sourceLabel, sourceAttribute, targetGraphLabel,
        targetLabel, targetAttribute, 1d);
    final SimilarityComponent tfidfSimilarityComponent =
        new TFIDFSimilarityComponent(baseConfig, tokenizer);
    final List<SimilarityComponent> similarityComponents =
        Collections.singletonList(tfidfSimilarityComponent);
    // Create the broadcast set in the environment the vertices belong to. A fresh
    // getExecutionEnvironment() call may return a different environment.
    final DataSet<Tuple3<EPGMVertex, EPGMVertex, SimilarityFieldList>> similarityFields =
        blockedVertices
            .flatMap(new SimilarityMeasurerWithDocumentFrequencyBroadcast(similarityComponents))
            .withBroadcastSet(vertices.getExecutionEnvironment().fromElements(wordsInDoc),
                TFIDFSimilarityComponent.DOCUMENT_FREQUENCY_BROADCAST);

    /* Threshold sweep: selection, clustering, evaluation */
    for (int step = 0; step <= THRESHOLD_STEPS; step++) {
      // Derive each threshold by division instead of repeatedly adding 0.05. The running sum
      // drifts (0.1 + 0.05 == 0.15000000000000002) and can overshoot 1.0, dropping the last
      // threshold.
      final double threshold = (double) step / THRESHOLD_STEPS;
      final String row =
          evaluateThreshold(benchmarkDataCollection, vertices, similarityFields, data, threshold);
      FileUtils.writeStringToFile(resultFile, "\n" + row, StandardCharsets.UTF_8, true);
    }
  }

  /**
   * Builds the candidate pairs as the union of two standard blockings, each on a prefix key of
   * length 1:
   * <ul>
   *   <li>one on {@code title}, with empty-key strategy {@code ADD_TO_ALL};</li>
   *   <li>one on {@code manufacturer}, with empty-key strategy {@code REMOVE}.</li>
   * </ul>
   *
   * @param vertices         all benchmark vertices
   * @param sourceGraphLabel graph label of the source side
   * @param targetGraphLabel graph label of the target side
   * @return the union of both blockings' vertex pairs
   * @throws Exception propagated from {@code BlockMaker#execute}, if it declares one
   */
  private static DataSet<Tuple2<EPGMVertex, EPGMVertex>> blockVertices(DataSet<EPGMVertex> vertices,
      String sourceGraphLabel, String targetGraphLabel) throws Exception {
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> byTitle =
        getStandardPrefixBlockingComponent(sourceGraphLabel, targetGraphLabel, "title",
            StandardBlockingEmptyKeyStrategy.ADD_TO_ALL).execute(vertices);
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> byManufacturer =
        getStandardPrefixBlockingComponent(sourceGraphLabel, targetGraphLabel, "manufacturer",
            StandardBlockingEmptyKeyStrategy.REMOVE).execute(vertices);
    // NOTE(review): union() does not deduplicate, so a pair produced by both blockings is scored
    // and linked twice. Confirm whether selection/clustering downstream tolerates that.
    return byTitle.union(byManufacturer);
  }

  /**
   * Builds the word-to-frequency map that is broadcast to the TF-IDF similarity as
   * {@code DOCUMENT_FREQUENCY_BROADCAST}. The map is collected to the client.
   *
   * <p>Assumes {@code CountDocumentFrequency} emits one (word, count) tuple per word, so the keys
   * are unique (TODO confirm). {@code toMap} throws {@code IllegalStateException} on duplicate
   * keys.
   *
   * @param vertices  all benchmark vertices
   * @param attribute vertex property the words are extracted from
   * @param tokenizer regex used to split the property value into words
   * @return the collected map from word to count
   * @throws Exception if the Flink job triggered by {@code collect()} fails
   */
  private static Map<String, Integer> countDocumentFrequencies(DataSet<EPGMVertex> vertices,
      String attribute, String tokenizer) throws Exception {
    return vertices.flatMap(new ExtractWords(attribute, tokenizer)).groupBy(1)
        .reduceGroup(new CountDocumentFrequency()).collect().stream()
        .collect(Collectors.toMap(t -> t.f0, t -> t.f1));
  }

  /**
   * Evaluates one similarity threshold and returns the result as a CSV row.
   *
   * <p>The steps are:
   * <ol>
   *   <li>Select the scored pairs accepted by
   *       {@code createSelectorComponent(1, threshold)}.</li>
   *   <li>Turn the selected pairs into edges of one logical graph over {@code vertices}.</li>
   *   <li>Cluster that graph with CLIP.</li>
   *   <li>Measure the clustering quality against {@code <data>/PerfectMapping.csv}.</li>
   * </ol>
   *
   * @param benchmarkDataCollection the benchmark input, used for its logical graph factory
   * @param vertices                all benchmark vertices
   * @param similarityFields        scored candidate pairs
   * @param data                    benchmark directory containing {@code PerfectMapping.csv}
   * @param threshold               lower bound passed to the selection rule
   * @return one CSV row with: threshold, precision, recall, f-measure, all positives,
   *         true positives and cluster number
   * @throws Exception propagated from the clustering / quality-measure computations
   */
  private static String evaluateThreshold(GraphCollection benchmarkDataCollection,
      DataSet<EPGMVertex> vertices,
      DataSet<Tuple3<EPGMVertex, EPGMVertex, SimilarityFieldList>> similarityFields, String data,
      double threshold) throws Exception {
    /* Selection */
    final LinkerComponent linkerComponent = new LinkerComponent();
    linkerComponent.setSelectionComponent(createSelectorComponent(1, threshold));
    final DataSet<Tuple3<EPGMVertex, EPGMVertex, Double>> selectedPairs =
        similarityFields.flatMap(new Selector(linkerComponent));

    /* Post-processing: selected pairs become edges of one logical graph */
    final DataSet<EPGMEdge> edges =
        selectedPairs.map(new LinkMaker(new EPGMEdgeFactory(), SIM_DEGREE_EDGE_PROPERTY));
    final LogicalGraph linkingResult = benchmarkDataCollection.getConfig()
        .getLogicalGraphFactory().fromDataSets(vertices, edges);

    /* Clustering */
    // NOTE(review): meaning of the CLIPConfig arguments is not visible here; confirm against CLIPConfig.
    final CLIPConfig clipConfig = new CLIPConfig(0, 2, true, 0.5, 0.2, 0.3);
    final LogicalGraph clusteredGraph =
        new CLIP(clipConfig, ClusteringOutputType.GRAPH_COLLECTION, Integer.MAX_VALUE)
            .execute(linkingResult);

    /* Performance measurement */
    // NOTE(review): meaning of the trailing flags (true, true, false, 2) is not visible here;
    // confirm against ClusteringQualityMeasures.
    final ClusteringQualityMeasures qualityMeasures = new ClusteringQualityMeasures(clusteredGraph,
        data + "/PerfectMapping.csv", ",", "id", true, true, false, 2);
    return String.format("%s,%s,%s,%s,%s,%s,%s", threshold, qualityMeasures.computePrecision(),
        qualityMeasures.computeRecall(), qualityMeasures.computeFMeasure(),
        qualityMeasures.getAllPositives(), qualityMeasures.getTruePositives(),
        qualityMeasures.getClusterNo());
  }

  /**
   * Creates a standard blocking whose key is a {@link PrefixLengthComponent} of length 1 on
   * {@code attribute}.
   *
   * @param sourceGraphLabel graph label of the source side
   * @param targetGraphLabel graph label of the target side
   * @param attribute        vertex property the blocking key is built from
   * @param emptyKeyStrategy how vertices with an empty key are handled
   * @return the configured block maker
   */
  private static BlockMaker getStandardPrefixBlockingComponent(String sourceGraphLabel,
      String targetGraphLabel, String attribute, StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    final KeyGeneratorComponent keyGeneratorComponent = new PrefixLengthComponent(attribute, 1);
    return getStandardBlockingComponent(sourceGraphLabel, targetGraphLabel, keyGeneratorComponent,
        emptyKeyStrategy);
  }

  /**
   * Creates a standard blocking that only pairs vertices across the two given graphs.
   *
   * <p>The graph pairs are source to target and target to source. The category filter is the
   * wildcard {@code "*"} to {@code "*"}.
   *
   * @param sourceGraphLabel      graph label of the source side
   * @param targetGraphLabel      graph label of the target side
   * @param keyGeneratorComponent how the blocking key is generated
   * @param emptyKeyStrategy      how vertices with an empty key are handled
   * @return the configured block maker
   */
  private static BlockMaker getStandardBlockingComponent(String sourceGraphLabel,
      String targetGraphLabel, KeyGeneratorComponent keyGeneratorComponent,
      StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    final BlockingKeyGenerator blockingKeyGenerator = new BlockingKeyGenerator(keyGeneratorComponent);

    final Map<String, Set<String>> graphPairs = new HashMap<>();
    // source graph is limited to the target graph
    final Set<String> sourcePartners = new HashSet<>();
    sourcePartners.add(targetGraphLabel);
    graphPairs.put(sourceGraphLabel, sourcePartners);
    // target graph is limited to the source graph
    final Set<String> targetPartners = new HashSet<>();
    targetPartners.add(sourceGraphLabel);
    graphPairs.put(targetGraphLabel, targetPartners);

    final Set<String> anyCategory = new HashSet<>();
    anyCategory.add("*");
    final Map<String, Set<String>> categoryPairs = new HashMap<>();
    categoryPairs.put("*", anyCategory);

    final BlockingComponentBaseConfig blockingBaseConfig =
        new BlockingComponentBaseConfig(blockingKeyGenerator, graphPairs, categoryPairs);
    // NOTE(review): the meaning of 12 (presumably a parallelism / partition count) is not visible
    // here; confirm against StandardBlockingComponent.
    final BlockingComponent blockingComponent =
        new StandardBlockingComponent(blockingBaseConfig, 12, emptyKeyStrategy);
    return new BlockMaker(blockingComponent);
  }

  /**
   * Creates the selection rule {@code smallerEqual AND greaterEqual}. Both conditions refer to
   * the similarity component {@link #SIMILARITY_COMPONENT_ID}.
   *
   * <p>Judging by the operators, a pair is kept when
   * {@code great <= similarity <= smaller} (TODO confirm against Condition).
   *
   * @param smaller upper bound (inclusive) of the similarity
   * @param great   lower bound (inclusive) of the similarity
   * @return the selection component wrapping the rule
   */
  private static SelectionComponent createSelectorComponent(double smaller, double great) {
    final Condition less = new Condition("smallerEqual", SIMILARITY_COMPONENT_ID,
        ConditionOperator.SMALLER_EQUAL, smaller);
    final Condition greater = new Condition("greaterEqual", SIMILARITY_COMPONENT_ID,
        ConditionOperator.GREATER_EQUAL, great);
    final SelectionRuleComponent andComp =
        new SelectionRuleComponent(SelectionRuleComponentType.SELECTION_OPERATOR, "AND");
    final SelectionRuleComponent lessComp =
        new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, "smallerEqual");
    final SelectionRuleComponent greatComp =
        new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, "greaterEqual");
    final List<Condition> conditions = Arrays.asList(less, greater);
    // Infix order: <condition> AND <condition>
    final List<SelectionRuleComponent> selectionRuleComponents =
        Arrays.asList(lessComp, andComp, greatComp);
    final SelectionRule selectionRule = new SelectionRule(conditions, selectionRuleComponents);
    return new SelectionComponent(selectionRule);
  }
}