package org.gradoop.famer.linking; import org.apache.commons.io.FileUtils; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.gradoop.common.model.impl.pojo.EPGMEdge; import org.gradoop.common.model.impl.pojo.EPGMEdgeFactory; import org.gradoop.common.model.impl.pojo.EPGMVertex; import org.gradoop.famer.clustering.parallelClustering.clip.CLIP; import org.gradoop.famer.clustering.parallelClustering.clip.dataStructures.CLIPConfig; import org.gradoop.famer.clustering.parallelClustering.common.dataStructures.ClusteringOutputType; import org.gradoop.famer.linking.blocking.BlockMaker; import org.gradoop.famer.linking.blocking.keyGeneration.BlockingKeyGenerator; import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.KeyGeneratorComponent; import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.PrefixLengthComponent; import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponent; import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponentBaseConfig; import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingComponent; import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingEmptyKeyStrategy; import org.gradoop.famer.linking.linking.dataStructures.LinkerComponent; import org.gradoop.famer.linking.linking.functions.LinkMaker; import org.gradoop.famer.linking.selection.Selector; import org.gradoop.famer.linking.selection.dataStructures.SelectionComponent; import org.gradoop.famer.linking.selection.dataStructures.selectionRule.Condition; import org.gradoop.famer.linking.selection.dataStructures.selectionRule.ConditionOperator; import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRule; import 
org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponent; import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponentType; import org.gradoop.famer.linking.similarityMeasuring.SimilarityMeasurerWithDocumentFrequencyBroadcast; import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponent; import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponentBaseConfig; import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityFieldList; import org.gradoop.famer.linking.similarityMeasuring.dataStructures.TFIDFSimilarityComponent; import org.gradoop.famer.linking.similarityMeasuring.methods.functions.CountDocumentFrequency; import org.gradoop.famer.linking.similarityMeasuring.methods.functions.ExtractWords; import org.gradoop.famer.postprocessing.quality.clusteredGraph.ClusteringQualityMeasures; import org.gradoop.famer.preprocessing.io.benchmarks.amazon.AmazonProductsReader; import org.gradoop.flink.model.impl.epgm.GraphCollection; import org.gradoop.flink.model.impl.epgm.LogicalGraph; import java.io.File; import java.nio.charset.StandardCharsets; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment; import static org.gradoop.famer.linking.linking.Linker.SIM_DEGREE_EDGE_PROPERTY; /** * Hello BA! 
*/
/**
 * Benchmark driver for TF-IDF based linking of the Amazon/Google product data set.
 *
 * <p>The program reads the benchmark data, builds candidate pairs with two standard-blocking passes
 * (on {@code title} and on {@code manufacturer}), computes a TF-IDF similarity on the
 * {@code description} attribute and then, for a sweep of selection thresholds, links, clusters (CLIP)
 * and evaluates the result. One CSV row per threshold is appended to a time-stamped file below
 * {@code output/}.
 */
public class Main {

  /**
   * Id of the single similarity component. The selection conditions reference the similarity value by
   * this id, so both places must use the same value.
   */
  private static final String COMPONENT_ID = "id";

  /** Number of equal steps the threshold range [0, 1] is divided into (20 steps = increments of 0.05). */
  private static final int THRESHOLD_STEPS = 20;

  /**
   * Runs the complete benchmark and writes the quality measures for every threshold to a CSV file.
   *
   * @param args command line arguments (not used)
   * @throws Exception if reading the data, executing the Flink program or writing the result fails
   */
  public static void main(String[] args) throws Exception {
    final String sourceGraphLabel = "Amazon";
    final String sourceLabel = "Amazon";
    final String sourceAttribute = "description";
    final String targetGraphLabel = "Google";
    final String targetLabel = "Google";
    final String targetAttribute = "description";
    final String tokenizer = "\\W+";
    final String data = "data/amazonGoogle";

    // NOTE(review): this Configuration instance is discarded right away and never handed to an
    // execution environment in this file, so the flag looks like it has no effect - confirm and
    // either wire it into the environment or remove it.
    new Configuration().setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

    final AmazonProductsReader amazonProductsReader = new AmazonProductsReader();
    final GraphCollection benchmarkDataCollection =
      amazonProductsReader.getBenchmarkDataAsGraphCollection(data);

    final File file =
      new File("output/tfidf-" + new SimpleDateFormat("yyyyMMM-ddHHmm'.csv'").format(new Date()));
    // Write the header with an explicit charset so it matches the UTF-8 rows appended below
    // (the charset-less overload is deprecated and uses the platform default encoding).
    FileUtils.writeStringToFile(file,
      "threshold,precision,recall,fMeasure,allPositive,truePositive,perfectCompleteClusterNo",
      StandardCharsets.UTF_8);

    /* Blocking */
    final BlockMaker titleBlockMaker = getStandardPrefixBlockingComponent(sourceGraphLabel,
      targetGraphLabel, "title", StandardBlockingEmptyKeyStrategy.ADD_TO_ALL);
    final BlockMaker manufacturerBlockMaker = getStandardPrefixBlockingComponent(sourceGraphLabel,
      targetGraphLabel, "manufacturer", StandardBlockingEmptyKeyStrategy.REMOVE);
    final DataSet<EPGMVertex> vertices = benchmarkDataCollection.getVertices();
    // Candidate pairs are the union of both blocking passes.
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> blockedVertices =
      titleBlockMaker.execute(vertices).union(manufacturerBlockMaker.execute(vertices));

    /* Build wordCountDict */
    // The counts are collected to the client here and shipped to the workers as a broadcast set below.
    final Map<String, Integer> wordsInDoc = benchmarkDataCollection.getVertices()
      .flatMap(new ExtractWords(sourceAttribute, tokenizer))
      .groupBy(1)
      .reduceGroup(new CountDocumentFrequency())
      .collect()
      .stream()
      .collect(Collectors.toMap(wordCount -> wordCount.f0, wordCount -> wordCount.f1));

    /* SIMILARITY MEASURING */
    final SimilarityComponentBaseConfig baseConfig = new SimilarityComponentBaseConfig(COMPONENT_ID,
      sourceGraphLabel, sourceLabel, sourceAttribute, targetGraphLabel, targetLabel, targetAttribute,
      1d);
    final SimilarityComponent tfidfSimilarityComponent =
      new TFIDFSimilarityComponent(baseConfig, tokenizer);
    final List<SimilarityComponent> similarityComponents =
      Collections.singletonList(tfidfSimilarityComponent);

    final DataSet<Tuple3<EPGMVertex, EPGMVertex, SimilarityFieldList>> blockedVerticesSimilarityFields =
      blockedVertices
        .flatMap(new SimilarityMeasurerWithDocumentFrequencyBroadcast(similarityComponents))
        .withBroadcastSet(getExecutionEnvironment().fromElements(wordsInDoc),
          TFIDFSimilarityComponent.DOCUMENT_FREQUENCY_BROADCAST);

    // Iterate over an integer step counter instead of accumulating a double: repeatedly adding 0.05
    // accumulates rounding error (0.1 + 0.05 is already 0.15000000000000002), which ends up in the
    // CSV and can push the running sum past 1.0 so that the final threshold is never evaluated.
    for (int step = 0; step <= THRESHOLD_STEPS; step++) {
      final double threshold = step / (double) THRESHOLD_STEPS;

      /* SELECTION */
      final LinkerComponent linkerComponent = new LinkerComponent();
      linkerComponent.setSelectionComponent(createSelectorComponent(1, threshold));
      final DataSet<Tuple3<EPGMVertex, EPGMVertex, Double>> blockedVerticesSimilarityDegree =
        blockedVerticesSimilarityFields.flatMap(new Selector(linkerComponent));

      /* POST-PROCESSING */
      final DataSet<EPGMEdge> edges = blockedVerticesSimilarityDegree
        .map(new LinkMaker(new EPGMEdgeFactory(), SIM_DEGREE_EDGE_PROPERTY));
      final LogicalGraph linkingResult = benchmarkDataCollection.getConfig()
        .getLogicalGraphFactory()
        .fromDataSets(vertices, edges);

      /* CLUSTERING */
      // NOTE(review): the meaning of the individual CLIP parameters is not visible from this file.
      final CLIPConfig clipConfig = new CLIPConfig(0, 2, true, 0.5, 0.2, 0.3);
      final LogicalGraph clusteredGraph =
        new CLIP(clipConfig, ClusteringOutputType.GRAPH_COLLECTION, Integer.MAX_VALUE)
          .execute(linkingResult);

      /* Performance Measurement */
      final ClusteringQualityMeasures qualityMeasures = new ClusteringQualityMeasures(clusteredGraph,
        data + "/PerfectMapping.csv", ",", "id", true, true, false, 2);
      final String result = String.format("%s,%s,%s,%s,%s,%s,%s",
        threshold,
        qualityMeasures.computePrecision(),
        qualityMeasures.computeRecall(),
        qualityMeasures.computeFMeasure(),
        qualityMeasures.getAllPositives(),
        qualityMeasures.getTruePositives(),
        qualityMeasures.getClusterNo());
      // Append one row per threshold; the header was written before the loop.
      FileUtils.writeStringToFile(file, "\n" + result, StandardCharsets.UTF_8, true);
    }
  }

  /**
   * Creates a block maker for standard blocking whose key is generated by a
   * {@link PrefixLengthComponent} on the given attribute.
   *
   * @param sourceGraphLabel label of the first graph
   * @param targetGraphLabel label of the second graph
   * @param attribute        vertex attribute the blocking key is generated from
   * @param emptyKeyStrategy strategy passed to the blocking component for empty keys
   * @return the configured block maker
   */
  private static BlockMaker getStandardPrefixBlockingComponent(String sourceGraphLabel,
    String targetGraphLabel, String attribute, StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    // presumably the second argument is the prefix length - confirm against PrefixLengthComponent
    final KeyGeneratorComponent keyGeneratorComponent = new PrefixLengthComponent(attribute, 1);
    return getStandardBlockingComponent(sourceGraphLabel, targetGraphLabel, keyGeneratorComponent,
      emptyKeyStrategy);
  }

  /**
   * Creates a block maker for standard blocking with the given key generator. The graph pair map
   * registers each of the two graph labels with the other one as its only partner; the category map
   * maps {@code "*"} to {@code "*"}.
   *
   * @param sourceGraphLabel      label of the first graph
   * @param targetGraphLabel      label of the second graph
   * @param keyGeneratorComponent component that generates the blocking key
   * @param emptyKeyStrategy      strategy passed to the blocking component for empty keys
   * @return the configured block maker
   */
  private static BlockMaker getStandardBlockingComponent(String sourceGraphLabel,
    String targetGraphLabel, KeyGeneratorComponent keyGeneratorComponent,
    StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    final BlockingKeyGenerator blockingKeyGenerator = new BlockingKeyGenerator(keyGeneratorComponent);

    final Map<String, Set<String>> graphPairs = new HashMap<>();
    // g1 is limited to g2
    final Set<String> sourcePartners = new HashSet<>();
    sourcePartners.add(targetGraphLabel);
    graphPairs.put(sourceGraphLabel, sourcePartners);
    // g2 is limited to g1
    final Set<String> targetPartners = new HashSet<>();
    targetPartners.add(sourceGraphLabel);
    graphPairs.put(targetGraphLabel, targetPartners);

    final Set<String> anyCategory = new HashSet<>();
    anyCategory.add("*");
    final Map<String, Set<String>> categoryPairs = new HashMap<>();
    categoryPairs.put("*", anyCategory);

    final BlockingComponentBaseConfig blockingBaseConfig =
      new BlockingComponentBaseConfig(blockingKeyGenerator, graphPairs, categoryPairs);
    // NOTE(review): the meaning of the constant 12 is not visible from this file - confirm.
    final BlockingComponent blockingComponent =
      new StandardBlockingComponent(blockingBaseConfig, 12, emptyKeyStrategy);
    return new BlockMaker(blockingComponent);
  }

  /**
   * Creates a selection component with the rule {@code smallerEqual AND greaterEqual}, where both
   * conditions refer to the similarity component {@link #COMPONENT_ID}: the first uses
   * {@link ConditionOperator#SMALLER_EQUAL} with {@code upperBound}, the second
   * {@link ConditionOperator#GREATER_EQUAL} with {@code lowerBound}.
   *
   * @param upperBound value for the SMALLER_EQUAL condition
   * @param lowerBound value for the GREATER_EQUAL condition
   * @return the selection component wrapping the rule
   */
  private static SelectionComponent createSelectorComponent(double upperBound, double lowerBound) {
    final Condition atMost =
      new Condition("smallerEqual", COMPONENT_ID, ConditionOperator.SMALLER_EQUAL, upperBound);
    final Condition atLeast =
      new Condition("greaterEqual", COMPONENT_ID, ConditionOperator.GREATER_EQUAL, lowerBound);

    // The rule components reference the conditions above by their ids.
    final SelectionRuleComponent atMostComponent =
      new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, "smallerEqual");
    final SelectionRuleComponent andComponent =
      new SelectionRuleComponent(SelectionRuleComponentType.SELECTION_OPERATOR, "AND");
    final SelectionRuleComponent atLeastComponent =
      new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, "greaterEqual");

    final List<Condition> conditions = Arrays.asList(atMost, atLeast);
    final List<SelectionRuleComponent> selectionRuleComponents =
      Arrays.asList(atMostComponent, andComponent, atLeastComponent);

    return new SelectionComponent(new SelectionRule(conditions, selectionRuleComponents));
  }
}