Skip to content
Snippets Groups Projects
Main.java 10.2 KiB
Newer Older
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMEdgeFactory;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.famer.clustering.parallelClustering.clip.CLIP;
import org.gradoop.famer.clustering.parallelClustering.clip.dataStructures.CLIPConfig;
import org.gradoop.famer.clustering.parallelClustering.common.dataStructures.ClusteringOutputType;
import org.gradoop.famer.linking.blocking.BlockMaker;
import org.gradoop.famer.linking.blocking.keyGeneration.BlockingKeyGenerator;
import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.KeyGeneratorComponent;
import org.gradoop.famer.linking.blocking.keyGeneration.dataStructures.PrefixLengthComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.BlockingComponentBaseConfig;
import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingComponent;
import org.gradoop.famer.linking.blocking.methods.dataStructures.StandardBlockingEmptyKeyStrategy;
import org.gradoop.famer.linking.linking.dataStructures.LinkerComponent;
import org.gradoop.famer.linking.linking.functions.LinkMaker;
import org.gradoop.famer.linking.selection.Selector;
import org.gradoop.famer.linking.selection.dataStructures.SelectionComponent;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.Condition;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.ConditionOperator;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRule;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponent;
import org.gradoop.famer.linking.selection.dataStructures.selectionRule.SelectionRuleComponentType;
import org.gradoop.famer.linking.similarityMeasuring.SimilarityMeasurerWithDocumentFrequencyBroadcast;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponent;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityComponentBaseConfig;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.SimilarityFieldList;
import org.gradoop.famer.linking.similarityMeasuring.dataStructures.TFIDFSimilarityComponent;
import org.gradoop.famer.linking.similarityMeasuring.methods.functions.CountDocumentFrequency;
import org.gradoop.famer.linking.similarityMeasuring.methods.functions.ExtractWords;
import org.gradoop.famer.postprocessing.quality.clusteredGraph.ClusteringQualityMeasures;
import org.gradoop.famer.preprocessing.io.benchmarks.amazon.AmazonProductsReader;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment;
import static org.gradoop.famer.linking.linking.Linker.SIM_DEGREE_EDGE_PROPERTY;

/**
 * TF-IDF linking and CLIP clustering experiment on the Amazon-Google products benchmark.
 *
 * <p>{@link #main} reads the benchmark from {@code data/amazonGoogle} and builds candidate pairs
 * by prefix blocking on {@code title} and {@code manufacturer}. It computes a TF-IDF similarity
 * on {@code description}. Then, for every threshold {@code 0, 0.05, ..., 1}, it selects the pairs
 * reaching the threshold and clusters the linked graph with CLIP. Finally it appends the
 * clustering quality measures as one row to a timestamped CSV file under {@code output/}.
 */
public class Main {

  /** Id of the similarity component; the selection conditions refer to its score by this id. */
  private static final String COMPONENT_ID = "id";

  /** Number of equal steps the threshold range [0, 1] is divided into (20 gives a step of 0.05). */
  private static final int THRESHOLD_STEPS = 20;

  /** Header row of the result CSV; {@link #evaluateThreshold} produces rows with these columns. */
  private static final String CSV_HEADER =
    "threshold,precision,recall,fMeasure,allPositive,truePositive,perfectCompleteClusterNo";

  /**
   * Timestamp pattern for the result file name, e.g. {@code 2024Jan-311530.csv}. The month
   * abbreviation follows the default locale.
   */
  private static final DateTimeFormatter RESULT_FILE_TIMESTAMP =
    DateTimeFormatter.ofPattern("yyyyMMM-ddHHmm'.csv'");

  /**
   * Runs the experiment described on the class.
   *
   * @param args ignored
   * @throws Exception if reading the benchmark, executing a Flink job or writing the CSV fails
   */
  public static void main(String[] args) throws Exception {
    final String sourceGraphLabel = "Amazon";
    final String sourceLabel = "Amazon";
    final String sourceAttribute = "description";
    final String targetGraphLabel = "Google";
    final String targetLabel = "Google";
    final String targetAttribute = "description";
    final String tokenizer = "\\W+";
    final String data = "data/amazonGoogle";

    final GraphCollection benchmarkDataCollection =
      new AmazonProductsReader().getBenchmarkDataAsGraphCollection(data);
    final DataSet<EPGMVertex> vertices = benchmarkDataCollection.getVertices();

    final File resultFile =
      new File("output/tfidf-" + LocalDateTime.now().format(RESULT_FILE_TIMESTAMP));
    // Write the header with the same charset the rows are appended with below.
    FileUtils.writeStringToFile(resultFile, CSV_HEADER, StandardCharsets.UTF_8);

    /* BLOCKING */
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> blockedVertices =
      blockVertices(vertices, sourceGraphLabel, targetGraphLabel);

    /* DOCUMENT FREQUENCIES (collected to the client, then broadcast to the similarity measurer) */
    final Map<String, Integer> documentFrequencies =
      countDocumentFrequencies(vertices, sourceAttribute, tokenizer);

    /* SIMILARITY MEASURING */
    final SimilarityComponentBaseConfig baseConfig =
      new SimilarityComponentBaseConfig(COMPONENT_ID, sourceGraphLabel, sourceLabel, sourceAttribute,
        targetGraphLabel, targetLabel, targetAttribute, 1d);
    final SimilarityComponent tfidfSimilarityComponent =
      new TFIDFSimilarityComponent(baseConfig, tokenizer);
    final List<SimilarityComponent> similarityComponents =
      Collections.singletonList(tfidfSimilarityComponent);
    // Create the broadcast data set on the environment that owns `vertices`, so the whole plan
    // lives in a single ExecutionEnvironment.
    final DataSet<Tuple3<EPGMVertex, EPGMVertex, SimilarityFieldList>> blockedVerticesSimilarityFields =
      blockedVertices.flatMap(new SimilarityMeasurerWithDocumentFrequencyBroadcast(similarityComponents))
        .withBroadcastSet(vertices.getExecutionEnvironment().fromElements(documentFrequencies),
          TFIDFSimilarityComponent.DOCUMENT_FREQUENCY_BROADCAST);

    // Derive each threshold from an integer counter. Accumulating `threshold += 0.05` drifts
    // (e.g. 0.15000000000000002) and can overshoot 1, silently dropping the last threshold.
    for (int step = 0; step <= THRESHOLD_STEPS; step++) {
      final double threshold = step / (double) THRESHOLD_STEPS;
      final String row = evaluateThreshold(benchmarkDataCollection, vertices,
        blockedVerticesSimilarityFields, threshold, data);
      FileUtils.writeStringToFile(resultFile, "\n" + row, StandardCharsets.UTF_8, true);
    }
  }

  /**
   * Builds the candidate pairs as the union of two one-character prefix blockings. The first is
   * on {@code title} with {@link StandardBlockingEmptyKeyStrategy#ADD_TO_ALL}. The second is on
   * {@code manufacturer} with {@link StandardBlockingEmptyKeyStrategy#REMOVE}.
   *
   * <p>NOTE(review): a pair produced by both blockings appears twice in the union. Confirm
   * whether the downstream linking/clustering tolerates duplicate pairs.
   *
   * @param vertices all benchmark vertices
   * @param sourceGraphLabel label of the first graph
   * @param targetGraphLabel label of the second graph
   * @return the candidate vertex pairs
   */
  private static DataSet<Tuple2<EPGMVertex, EPGMVertex>> blockVertices(DataSet<EPGMVertex> vertices,
    String sourceGraphLabel, String targetGraphLabel) {
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> byTitle =
      getStandardPrefixBlockingComponent(sourceGraphLabel, targetGraphLabel, "title",
        StandardBlockingEmptyKeyStrategy.ADD_TO_ALL).execute(vertices);
    final DataSet<Tuple2<EPGMVertex, EPGMVertex>> byManufacturer =
      getStandardPrefixBlockingComponent(sourceGraphLabel, targetGraphLabel, "manufacturer",
        StandardBlockingEmptyKeyStrategy.REMOVE).execute(vertices);
    return byTitle.union(byManufacturer);
  }

  /**
   * Extracts words from {@code attribute} with {@link ExtractWords} and groups them on field 1.
   * Each group is reduced with {@link CountDocumentFrequency}. The resulting tuples are collected
   * (this executes a Flink job) into a map from field {@code f0} to field {@code f1}.
   *
   * <p>Presumably maps each word to the number of vertices containing it; confirm against
   * {@code CountDocumentFrequency}.
   *
   * @param vertices vertices whose attribute values are tokenized
   * @param attribute name of the vertex property to tokenize
   * @param tokenizer regular expression used to split the attribute value
   * @return the collected document-frequency map
   * @throws Exception if the Flink job fails
   */
  private static Map<String, Integer> countDocumentFrequencies(DataSet<EPGMVertex> vertices,
    String attribute, String tokenizer) throws Exception {
    return vertices.flatMap(new ExtractWords(attribute, tokenizer)).groupBy(1)
      .reduceGroup(new CountDocumentFrequency()).collect().stream()
      .collect(Collectors.toMap(t1 -> t1.f0, t2 -> t2.f1));
  }

  /**
   * Selects, links, clusters and evaluates the candidate pairs for one similarity threshold.
   *
   * @param benchmarkDataCollection collection providing the Gradoop configuration
   * @param vertices all benchmark vertices
   * @param similarityFields candidate pairs with their computed similarity fields
   * @param threshold lower bound the similarity score must reach for a pair to be linked
   * @param data benchmark directory containing {@code PerfectMapping.csv}
   * @return one CSV row with the columns of {@link #CSV_HEADER}
   * @throws Exception if computing a quality measure fails
   */
  private static String evaluateThreshold(GraphCollection benchmarkDataCollection,
    DataSet<EPGMVertex> vertices,
    DataSet<Tuple3<EPGMVertex, EPGMVertex, SimilarityFieldList>> similarityFields, double threshold,
    String data) throws Exception {
    /* SELECTION: similarity <= 1 AND similarity >= threshold */
    final LinkerComponent linkerComponent = new LinkerComponent();
    linkerComponent.setSelectionComponent(createSelectorComponent(1, threshold));
    final DataSet<Tuple3<EPGMVertex, EPGMVertex, Double>> selectedPairs =
      similarityFields.flatMap(new Selector(linkerComponent));

    /* POST-PROCESSING */
    final DataSet<EPGMEdge> edges =
      selectedPairs.map(new LinkMaker(new EPGMEdgeFactory(), SIM_DEGREE_EDGE_PROPERTY));
    final LogicalGraph linkingResult =
      benchmarkDataCollection.getConfig().getLogicalGraphFactory().fromDataSets(vertices, edges);

    /* CLUSTERING */
    final CLIPConfig clipConfig = new CLIPConfig(0, 2, true, 0.5, 0.2, 0.3);
    final LogicalGraph clusteredGraph =
      new CLIP(clipConfig, ClusteringOutputType.GRAPH_COLLECTION, Integer.MAX_VALUE).execute(linkingResult);

    /* PERFORMANCE MEASUREMENT */
    final ClusteringQualityMeasures qualityMeasures =
      new ClusteringQualityMeasures(clusteredGraph, data + "/PerfectMapping.csv", ",", "id", true, true,
        false, 2);
    return String.format("%s,%s,%s,%s,%s,%s,%s", threshold, qualityMeasures.computePrecision(),
      qualityMeasures.computeRecall(), qualityMeasures.computeFMeasure(),
      qualityMeasures.getAllPositives(), qualityMeasures.getTruePositives(),
      qualityMeasures.getClusterNo());
  }

  /**
   * Creates a standard blocking whose key is the first character of {@code attribute}.
   *
   * @param sourceGraphLabel label of the first graph
   * @param targetGraphLabel label of the second graph
   * @param attribute vertex property the blocking key is generated from
   * @param emptyKeyStrategy how vertices with an empty key are handled
   * @return the configured block maker
   */
  private static BlockMaker getStandardPrefixBlockingComponent(String sourceGraphLabel,
    String targetGraphLabel, String attribute, StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    final KeyGeneratorComponent keyGeneratorComponent = new PrefixLengthComponent(attribute, 1);
    return getStandardBlockingComponent(sourceGraphLabel, targetGraphLabel, keyGeneratorComponent,
      emptyKeyStrategy);
  }

  /**
   * Creates a standard blocking that pairs vertices of the source graph only with vertices of
   * the target graph, and vice versa.
   *
   * @param sourceGraphLabel label of the first graph
   * @param targetGraphLabel label of the second graph
   * @param keyGeneratorComponent generator of the blocking key
   * @param emptyKeyStrategy how vertices with an empty key are handled
   * @return the configured block maker
   */
  private static BlockMaker getStandardBlockingComponent(String sourceGraphLabel, String targetGraphLabel,
    KeyGeneratorComponent keyGeneratorComponent, StandardBlockingEmptyKeyStrategy emptyKeyStrategy) {
    final BlockingKeyGenerator blockingKeyGenerator = new BlockingKeyGenerator(keyGeneratorComponent);

    // Source graph is limited to the target graph, and the target graph to the source graph.
    final Map<String, Set<String>> graphPairs = new HashMap<>();
    graphPairs.put(sourceGraphLabel, new HashSet<>(Collections.singletonList(targetGraphLabel)));
    graphPairs.put(targetGraphLabel, new HashSet<>(Collections.singletonList(sourceGraphLabel)));

    // "*" -> {"*"}: presumably no restriction on vertex categories; confirm against FAMER docs.
    final Map<String, Set<String>> categoryPairs = new HashMap<>();
    categoryPairs.put("*", new HashSet<>(Collections.singletonList("*")));

    final BlockingComponentBaseConfig blockingBaseConfig =
      new BlockingComponentBaseConfig(blockingKeyGenerator, graphPairs, categoryPairs);
    // NOTE(review): the meaning of the constant 12 is not visible here (possibly a parallelism
    // degree); confirm.
    final BlockingComponent blockingComponent =
      new StandardBlockingComponent(blockingBaseConfig, 12, emptyKeyStrategy);
    return new BlockMaker(blockingComponent);
  }

  /**
   * Creates a selection rule on the score of component {@link #COMPONENT_ID}. The rule is
   * {@code score <= upperBound AND score >= lowerBound}.
   *
   * @param upperBound inclusive upper bound of the score
   * @param lowerBound inclusive lower bound of the score
   * @return the selection component wrapping the rule
   */
  private static SelectionComponent createSelectorComponent(double upperBound, double lowerBound) {
    // Each condition id must match the id the rule expression refers to.
    final String smallerEqualId = "smallerEqual";
    final String greaterEqualId = "greaterEqual";
    final Condition smallerEqual =
      new Condition(smallerEqualId, COMPONENT_ID, ConditionOperator.SMALLER_EQUAL, upperBound);
    final Condition greaterEqual =
      new Condition(greaterEqualId, COMPONENT_ID, ConditionOperator.GREATER_EQUAL, lowerBound);

    // Rule expression, in the order given: smallerEqual AND greaterEqual.
    final List<SelectionRuleComponent> selectionRuleComponents = Arrays.asList(
      new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, smallerEqualId),
      new SelectionRuleComponent(SelectionRuleComponentType.SELECTION_OPERATOR, "AND"),
      new SelectionRuleComponent(SelectionRuleComponentType.CONDITION, greaterEqualId));

    final List<Condition> conditions = Arrays.asList(smallerEqual, greaterEqual);
    return new SelectionComponent(new SelectionRule(conditions, selectionRuleComponents));
  }
}