Code and Data as Demo

Code and data to for matching share intents (Generated Events) with post actions (Generated Tweets).
erik563 committed Jul 24, 2019
1 parent 70ad034 commit 1fbcc5eb2b49c94ec824100e8f4083be7697afb7
Showing with 48,636 additions and 0 deletions.
  1. +11,683 −0 generated_events_x1.csv
  2. +11,683 −0 generated_events_x4.csv
  3. +12,398 −0 generated_tweets_x1.csv
  4. +12,399 −0 generated_tweets_x4.csv
  5. +209 −0 public_classification_synthetic_data.scala
  6. +264 −0 public_generate_data.scala

@@ -0,0 +1,209 @@
// Databricks notebook source
import spark.implicits._
import sqlContext.implicits._

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types._

import collection.JavaConversions._
import java.time.{ZonedDateTime, ZoneId, LocalDate, LocalTime}

import com.github.mrpowers.spark.stringmetric.SimilarityFunctions._ //Jaro-Winkler comparison

//Data-cleaning, pre-processing and comparison functions.

//Clean URL: remove stuff like: ?session=x1%20id=123
val clean_url = udf {(url: String) =>
if (url.contains("?")) url.slice( 0, url.indexOf("?") )
else url

//Mathmatical functions
val time_diff = udf {(event1: Long, event2: Long) =>
event2 - event1
val multiply = udf { ( value: Double, multiplier: Double ) =>
value * multiplier

val temporal_score_function = udf { (
distance: Double,
scale: Double,
offset: Double,
origin: Double,
min_thresh: Double,
max_thresh: Double
) =>
if (distance>min_thresh && distance<max_thresh) { //Check for thresholds
if (d_norm <= offset) 1 //If distance is lower than offset, award full score
else BigDecimal(Math.pow(2.0, - (d_norm-offset) / scale)).setScale(2, BigDecimal.RoundingMode.HALF_EVEN).toDouble //Else award partial score
} else { 0 }
val similarity_function = udf { ( string1: String, string2: String ) =>
if (string2.contains( string1 )) 1
else 0
val device_score_function = udf { ( event_device_cat: Int, post_device_cat: Int ) =>
if (event_device_cat == post_device_cat) 1
else 0

//Sum the scores
val sum_identifier_scores = udf { ( score_intent_time: Double, score_location: Int, score_device: Int ) =>
score_intent_time + score_location + score_device

//Probabilistic score weights (see notes in the bottom for explanation of the weights)
val prob_weigh_time = udf { (intent_time: Long) =>
if (intent_time>0 && intent_time < 120) 6.2
else -1.8
val prob_weigh_device = udf { (device_score: Int) =>
if (device_score == 1) 2.3
else -5.6
val prob_weigh_location = udf { (location_score: Int) =>
if (location_score == 1) 6.2
else -1.8
val prob_sum_scores = udf { ( score_intent_time: Double, score_location: Double, score_device: Double ) =>
score_intent_time + score_location + score_device

val dfEvents ="csv")
.option("header", "true")

val dfTweets ="csv")
.option("header", "true")

//Blocking phase
val dfRecordPairs = dfEvents
.join( dfTweets, dfEvents.col("synth_page_url") === dfTweets.col("t_synth_page_url") ) //Inner join on URL (drops all events with no corresponding Tweets)
.withColumn( "t_intent_time", time_diff($"unix_tstamp", $"t_unix_tstamp") ) //Create the intent time col

//Comparison and classification phase
val dfMatchScores = dfRecordPairs
.withColumn( "sim_intent_time", temporal_score_function($"t_intent_time", lit(110.0), lit(5.0), lit(5.0), lit(0.0), lit(300.0) ) ) //Calculate the temporal distance
.withColumn( "sim_intent_time_tripled", multiply($"sim_intent_time", lit(3.0) ) ) //Triple the weight
similarity_function(lower($"synth_geo_city"), lower($"t_synth_location")) //If not Null, calculate identifier score
).otherwise( lit(0) ) //Otherwise score is zero.
.withColumn( "score_device", device_score_function($"synth_browser_category", $"t_synth_browser_category") )
//classification cost-based
.withColumn( "match_score", sum_identifier_scores($"sim_intent_time_tripled", $"score_location", $"score_device") )
//classification probabilistic
.withColumn( "prob_score_device", prob_weigh_device($"score_device") )
.withColumn( "prob_score_location", prob_weigh_location($"score_location") )
.withColumn( "prob_score_time", prob_weigh_time($"t_intent_time") )
.withColumn( "prob_score_summed", prob_sum_scores($"prob_score_time", $"prob_score_location", $"prob_score_device") )

//Cost-based validation
val considered_pairs = dfMatchScores.count();
val threshold = 3.00
val true_matches = dfMatchScores.filter($"true_match" === true && $"r_intent_time" === $"t_intent_time" ).count();
val true_matches_found = dfMatchScores.filter($"true_match" === true && $"r_intent_time" === $"t_intent_time" && $"match_score" >= threshold ).count();
val true_matches_not_found = true_matches - true_matches_found;

val false_matches_pt1 = dfMatchScores.filter($"true_match" === true && $"r_intent_time" =!= $"t_intent_time" && $"match_score" >= threshold ).count();
val false_matches_pt2 = dfMatchScores.filter($"true_match" === false && $"match_score" >= threshold ).count();
val false_matches = false_matches_pt1 + false_matches_pt2;

val true_nonmatches_pt1 = dfMatchScores.filter($"true_match" === true && $"r_intent_time" =!= $"t_intent_time" && $"match_score" < threshold ).count();
val true_nonmatches_pt2 = dfMatchScores.filter($"true_match" === false && $"match_score" < threshold ).count();
val true_nonmatches = true_nonmatches_pt1 + true_nonmatches_pt2;

val recall: Float = true_matches_found.toFloat/true_matches;
val precision: Float = true_matches_found.toFloat/(true_matches_found+false_matches);
val f_score = 2 * (recall * precision)/(recall + precision);

println("recall: " + recall)
println("precision: " + precision)
println("f1-score: " + f_score)

//Probablistic validation
val considered_pairs = dfMatchScores.count();
val threshold = 9.03 //See notes in the bottom for explanation of this number

val true_matches = dfMatchScores.filter($"true_match" === true && $"r_intent_time" === $"t_intent_time" ).count();
val true_matches_found = dfMatchScores.filter($"true_match" === true && $"r_intent_time" === $"t_intent_time" && $"prob_score_summed" >= threshold ).count();
val true_matches_not_found = true_matches - true_matches_found;

val false_matches_pt1 = dfMatchScores.filter($"true_match" === true && $"r_intent_time" =!= $"t_intent_time" && $"prob_score_summed" >= threshold ).count();
val false_matches_pt2 = dfMatchScores.filter($"true_match" === false && $"prob_score_summed" >= threshold ).count();
val false_matches = false_matches_pt1 + false_matches_pt2;

val true_nonmatches_pt1 = dfMatchScores.filter($"true_match" === true && $"r_intent_time" =!= $"t_intent_time" && $"prob_score_summed" < threshold ).count();
val true_nonmatches_pt2 = dfMatchScores.filter($"true_match" === false && $"prob_score_summed" < threshold ).count();
val true_nonmatches = true_nonmatches_pt1 + true_nonmatches_pt2;

val recall: Float = true_matches_found.toFloat/true_matches;
val precision: Float = true_matches_found.toFloat/(true_matches_found+false_matches);
val f_score = 2 * (recall * precision)/(recall + precision);

println("recall: " + recall)
println("precision: " + precision)
println("f1-score: " + f_score)

// End of Code

/* Probabilistic weight notes. See section 2: Related work of Erik's Thesis for an explanation of the formulas.
true matches = 3505
(a*b) = 211050 considered record pairs after blocking
starting weight = log2(true matches/(a*b)-true matches)
starting weight = log2(3505/(211050-3505))
starting weight = -5.88
P = 0.9
threshold weight = log2(P/1-P)
threshold wieght = log2(0.9/1-0.9)
threshold wieght = 3.15
threshold = starting weight - threshold weight
threshold = -5.88 - 3.15
threshold = 9.03
Weights per identifier:
i3 temporal
M = 0.721
U = 0.010
Wm = 6.17
Wu = -1.78
i4 location
M = 0.2
U = 0.0175
Wm = 3.55
Wu = -0.30
i5 device type
M = 0.981
U = 0.203
Wm = 2.27
Wu = -5.64

