Append UDF with RDD or DF. (#381)

- Addresses #223
ruebot authored and ianmilligan1 committed Nov 19, 2019
1 parent a081d7b commit b98ba4bae35de85672917b356845e2b1c5eb0179
Showing with 110 additions and 110 deletions.
  1. +4 −4 src/main/scala/io/archivesunleashed/ArchiveRecord.scala
  2. +2 −2 src/main/scala/io/archivesunleashed/app/DomainFrequencyExtractor.scala
  3. +6 −6 src/main/scala/io/archivesunleashed/app/DomainGraphExtractor.scala
  4. +2 −2 src/main/scala/io/archivesunleashed/app/ExtractEntities.scala
  5. +2 −2 src/main/scala/io/archivesunleashed/app/ExtractPopularImages.scala
  6. +4 −4 src/main/scala/io/archivesunleashed/app/PlainTextExtractor.scala
  7. +8 −8 src/main/scala/io/archivesunleashed/app/WriteGEXF.scala
  8. +4 −4 src/main/scala/io/archivesunleashed/app/WriteGraph.scala
  9. +5 −5 src/main/scala/io/archivesunleashed/app/WriteGraphML.scala
  10. +12 −12 src/main/scala/io/archivesunleashed/df/package.scala
  11. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{ComputeMD5.scala → ComputeMD5RDD.scala}
  12. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{ComputeSHA1.scala → ComputeSHA1RDD.scala}
  13. +1 −1 src/main/scala/io/archivesunleashed/matchbox/ExtractBoilerpipeText.scala
  14. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{ExtractDomain.scala → ExtractDomainRDD.scala}
  15. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{ExtractImageLinks.scala → ExtractImageLinksRDD.scala}
  16. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{ExtractLinks.scala → ExtractLinksRDD.scala}
  17. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{GetExtensionMime.scala → GetExtensionMimeRDD.scala}
  18. +2 −2 src/main/scala/io/archivesunleashed/matchbox/{RemoveHTML.scala → RemoveHTMLRDD.scala}
  19. +1 −1 src/main/scala/io/archivesunleashed/matchbox/{RemoveHTTPHeader.scala → RemoveHTTPHeaderRDD.scala}
  20. +13 −13 src/main/scala/io/archivesunleashed/package.scala
  21. +3 −3 src/test/scala/io/archivesunleashed/ArcTest.scala
  22. +1 −1 src/test/scala/io/archivesunleashed/CountableRDDTest.scala
  23. +4 −4 src/test/scala/io/archivesunleashed/app/ExtractGraphXTest.scala
  24. +2 −2 src/test/scala/io/archivesunleashed/df/ExtarctHyperlinksTest.scala
  25. +2 −2 src/test/scala/io/archivesunleashed/df/SaveMediaBytesTest.scala
  26. +1 −1 src/test/scala/io/archivesunleashed/df/SimpleDfTest.scala
  27. +6 −6 src/test/scala/io/archivesunleashed/matchbox/ExtractDomainTest.scala
  28. +5 −5 src/test/scala/io/archivesunleashed/matchbox/ExtractImageLinksTest.scala
  29. +5 −5 src/test/scala/io/archivesunleashed/matchbox/ExtractLinksTest.scala
  30. +3 −3 src/test/scala/io/archivesunleashed/matchbox/RemoveHTMLTest.scala
  31. +4 −4 src/test/scala/io/archivesunleashed/matchbox/RemoveHTTPHeaderTest.scala
  32. +2 −2 src/test/scala/io/archivesunleashed/matchbox/StringUtilsTest.scala
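The rename follows one convention: matchbox helpers applied to RDDs gain an RDD suffix (ComputeMD5 → ComputeMD5RDD), while the Spark SQL UDFs in the df package gain a DF suffix (ExtractDomain → ExtractDomainDF), so the two APIs no longer collide when both are imported. A minimal sketch of the RDD side after this change, mirroring DomainFrequencyExtractor below (the archive path is a placeholder and sc is the usual spark-shell SparkContext):

import io.archivesunleashed._
import io.archivesunleashed.matchbox._

// The helper is now ExtractDomainRDD rather than ExtractDomain.
val domainCounts = RecordLoader.loadArchives("/path/to/warcs", sc)
  .keepValidPages()
  .map(r => ExtractDomainRDD(r.getUrl))
  .countItems()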
@@ -21,7 +21,7 @@ import java.io.ByteArrayInputStream
import java.security.MessageDigest

import io.archivesunleashed.data.{ArcRecordUtils, WarcRecordUtils, ArchiveRecordWritable}
-import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDate, ExtractDomain, RemoveHTTPHeader}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD, ExtractDate, ExtractDomainRDD, RemoveHTTPHeaderRDD}
import org.apache.spark.SerializableWritable
import org.archive.io.arc.ARCRecord
import org.archive.io.warc.WARCRecord
@@ -149,14 +149,14 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
}

val getDomain: String = {
-ExtractDomain(getUrl)
+ExtractDomainRDD(getUrl)
}

val getBinaryBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/")) {
getContentBytes.slice(
-getContentString.indexOf(RemoveHTTPHeader.headerEnd)
-+ RemoveHTTPHeader.headerEnd.length, getContentBytes.length)
+getContentString.indexOf(RemoveHTTPHeaderRDD.headerEnd)
++ RemoveHTTPHeaderRDD.headerEnd.length, getContentBytes.length)
} else {
getContentBytes
}
@@ -32,7 +32,7 @@ object DomainFrequencyExtractor {
def apply(records: RDD[ArchiveRecord]): RDD[(String, Int)] = {
records
.keepValidPages()
-.map(r => matchbox.ExtractDomain(r.getUrl))
+.map(r => matchbox.ExtractDomainRDD(r.getUrl))
.countItems()
}

@@ -47,7 +47,7 @@ object DomainFrequencyExtractor {
import spark.implicits._
// scalastyle:on

-d.select(df.ExtractDomain($"url").as("domain"))
+d.select(df.ExtractDomainDF($"url").as("domain"))
.groupBy("domain").count().orderBy(desc("count"))
}
}
@@ -17,7 +17,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.{ArchiveRecord, DataFrameLoader, CountableRDD}
-import io.archivesunleashed.matchbox.{ExtractDomain, ExtractLinks}
+import io.archivesunleashed.matchbox.{ExtractDomainRDD, ExtractLinksRDD}
import io.archivesunleashed.df
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.desc
@@ -33,11 +33,11 @@ object DomainGraphExtractor {
def apply(records: RDD[ArchiveRecord]): RDD[((String, String, String), Int)] = {
records
.keepValidPages()
-.map(r => (r.getCrawlDate, ExtractLinks(r.getUrl, r.getContentString)))
+.map(r => (r.getCrawlDate, ExtractLinksRDD(r.getUrl, r.getContentString)))
.flatMap(r => r._2.map(f =>
(r._1,
-ExtractDomain(f._1).replaceAll("^\\s*www\\.", ""),
-ExtractDomain(f._2).replaceAll("^\\s*www\\.", ""))
+ExtractDomainRDD(f._1).replaceAll("^\\s*www\\.", ""),
+ExtractDomainRDD(f._2).replaceAll("^\\s*www\\.", ""))
))
.filter(r => r._2 != "" && r._3 != "")
.countItems()
@@ -55,8 +55,8 @@ object DomainGraphExtractor {
import spark.implicits._
// scalastyle:on
d.select($"crawl_date",
-df.RemovePrefixWWW(df.ExtractDomain($"src")).as("src_domain"),
-df.RemovePrefixWWW(df.ExtractDomain($"dest")).as("dest_domain"))
+df.RemovePrefixWWWDF(df.ExtractDomainDF($"src")).as("src_domain"),
+df.RemovePrefixWWWDF(df.ExtractDomainDF($"dest")).as("dest_domain"))
.filter("src_domain != ''").filter("dest_domain != ''")
.groupBy($"crawl_date", $"src_domain", $"dest_domain").count().orderBy(desc("count"))
}
@@ -16,7 +16,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.RecordLoader
-import io.archivesunleashed.matchbox.{ComputeMD5, NERClassifier, RemoveHTML}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD, NERClassifier, RemoveHTMLRDD}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

@@ -43,7 +43,7 @@ object ExtractEntities {
.keepValidPages()
.map(r => (("\"timestamp\":\"" + r.getCrawlDate + "\""),
("\"url\":\"" + r.getUrl + "\""),
-(RemoveHTML(r.getContentString)),
+(RemoveHTMLRDD(r.getContentString)),
("\"digest\":\"" + r.getPayloadDigest + "\"")))
extractAndOutput(iNerClassifierFile, rdd, outputFile)
}
@@ -16,7 +16,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.ArchiveRecord
-import io.archivesunleashed.matchbox.{ComputeImageSize, ComputeMD5}
+import io.archivesunleashed.matchbox.{ComputeImageSize, ComputeMD5RDD}
import org.apache.spark.rdd.RDD
import org.apache.spark.{RangePartitioner, SparkContext}

@@ -39,7 +39,7 @@ object ExtractPopularImages {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getBinaryBytes), 1))
-.map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
+.map(img => (ComputeMD5RDD(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
.filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
.reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
.map(x=> (x._2._3, x._2._2))
@@ -17,7 +17,7 @@
package io.archivesunleashed.app

import io.archivesunleashed.{ArchiveRecord, df}
-import io.archivesunleashed.matchbox.RemoveHTML
+import io.archivesunleashed.matchbox.RemoveHTMLRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

@@ -31,7 +31,7 @@ object PlainTextExtractor {
def apply(records: RDD[ArchiveRecord]): RDD[(String, String, String, String)] = {
records
.keepValidPages()
-.map(r => (r.getCrawlDate, r.getDomain, r.getUrl, RemoveHTML(r.getContentString)))
+.map(r => (r.getCrawlDate, r.getDomain, r.getUrl, RemoveHTMLRDD(r.getContentString)))
}

/** Extract plain text from web archive using Data Frame and Spark SQL.
@@ -44,7 +44,7 @@ object PlainTextExtractor {
// scalastyle:off
import spark.implicits._
// scalastyle:on
d.select($"crawl_date", df.ExtractDomain($"url").as("domain"),
$"url", df.RemoveHTML($"content").as("Text"))
d.select($"crawl_date", df.ExtractDomainDF($"url").as("domain"),
$"url", df.RemoveHTMLDF($"content").as("Text"))
}
}
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
-import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -59,18 +59,18 @@ object WriteGEXF {
val endAttribute = "\" />\n"
val nodeStart = "<node id=\""
val labelStart = "\" label=\""
-val edges = rdd.map(r => "<edge source=\"" + ComputeMD5(r._1._2.getBytes) + "\" target=\"" +
-ComputeMD5(r._1._3.getBytes) + "\" weight=\"" + r._2 +
+val edges = rdd.map(r => "<edge source=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\" target=\"" +
+ComputeMD5RDD(r._1._3.getBytes) + "\" weight=\"" + r._2 +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + r._1._1 + endAttribute +
"</attvalues>\n" +
"</edge>\n").collect
val nodes = rdd.flatMap(r => List(nodeStart +
-ComputeMD5(r._1._2.getBytes) + labelStart +
+ComputeMD5RDD(r._1._2.getBytes) + labelStart +
r._1._2.escapeInvalidXML() + endAttribute,
nodeStart +
-ComputeMD5(r._1._3.getBytes) + labelStart +
+ComputeMD5RDD(r._1._3.getBytes) + labelStart +
r._1._3.escapeInvalidXML() + endAttribute)).distinct.collect
outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<gexf xmlns=\"http://www.gexf.net/1.3draft\"\n" +
@@ -119,13 +119,13 @@ object WriteGEXF {
"<nodes>\n")
vertices foreach { v =>
outFile.write("<node id=\"" +
-ComputeMD5(v.getBytes) + "\" label=\"" +
+ComputeMD5RDD(v.getBytes) + "\" label=\"" +
v.escapeInvalidXML() + endAttribute)
}
outFile.write("</nodes>\n<edges>\n")
data foreach { e =>
outFile.write("<edge source=\"" + ComputeMD5(e.get(1).asInstanceOf[String].getBytes) + "\" target=\"" +
ComputeMD5(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
outFile.write("<edge source=\"" + ComputeMD5RDD(e.get(1).asInstanceOf[String].getBytes) + "\" target=\"" +
ComputeMD5RDD(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + e.get(0) + endAttribute +
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
-import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -173,13 +173,13 @@ object WriteGraph {
"<nodes>\n")
vertices.foreach { v =>
outFile.write(nodeStart +
-ComputeMD5(v.getBytes) + "\" label=\"" +
+ComputeMD5RDD(v.getBytes) + "\" label=\"" +
v.escapeInvalidXML() + endAttribute)
}
outFile.write("</nodes>\n<edges>\n")
data.foreach { e =>
-outFile.write(edgeStart + ComputeMD5(e.get(1).asInstanceOf[String].getBytes) + targetChunk +
-ComputeMD5(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
+outFile.write(edgeStart + ComputeMD5RDD(e.get(1).asInstanceOf[String].getBytes) + targetChunk +
+ComputeMD5RDD(e.get(2).asInstanceOf[String].getBytes) + "\" weight=\"" + e.get(3) +
"\" type=\"directed\">\n" +
"<attvalues>\n" +
"<attvalue for=\"0\" value=\"" + e.get(0) + endAttribute +
@@ -14,7 +14,7 @@
* limitations under the License.
*/
package io.archivesunleashed.app
-import io.archivesunleashed.matchbox.{ComputeMD5, WWWLink}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD, WWWLink}
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import org.apache.spark.rdd.RDD
@@ -45,14 +45,14 @@ object WriteGraphML {
*/
def makeFile (rdd: RDD[((String, String, String), Int)], graphmlPath: String): Boolean = {
val outFile = Files.newBufferedWriter(Paths.get(graphmlPath), StandardCharsets.UTF_8)
-val edges = rdd.map(r => "<edge source=\"" + ComputeMD5(r._1._2.getBytes) + "\" target=\"" +
-ComputeMD5(r._1._3.getBytes) + "\" type=\"directed\">\n" +
+val edges = rdd.map(r => "<edge source=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\" target=\"" +
+ComputeMD5RDD(r._1._3.getBytes) + "\" type=\"directed\">\n" +
"<data key=\"weight\">" + r._2 + "</data>\n" +
"<data key=\"crawlDate\">" + r._1._1 + "</data>\n" +
"</edge>\n").collect
-val nodes = rdd.flatMap(r => List("<node id=\"" + ComputeMD5(r._1._2.getBytes) + "\">\n" +
+val nodes = rdd.flatMap(r => List("<node id=\"" + ComputeMD5RDD(r._1._2.getBytes) + "\">\n" +
"<data key=\"label\">" + r._1._2.escapeInvalidXML() + "</data>\n</node>\n",
"<node id=\"" + ComputeMD5(r._1._3.getBytes) + "\">\n" +
"<node id=\"" + ComputeMD5RDD(r._1._3.getBytes) + "\">\n" +
"<data key=\"label\">" + r._1._3.escapeInvalidXML() + "</data>\n</node>\n")).distinct.collect
outFile.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
"<graphml xmlns=\"http://graphml.graphdrawing.org/xmlns\"\n" +
@@ -16,7 +16,7 @@
package io.archivesunleashed

import org.apache.commons.io.IOUtils
-import io.archivesunleashed.matchbox.{ComputeMD5}
+import io.archivesunleashed.matchbox.{ComputeMD5RDD}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import java.io.ByteArrayInputStream
@@ -31,23 +31,23 @@ package object df {
// by wrapping matchbox UDFs or by reimplementing them. The following examples illustrate. Obviously, we'll
// need to populate more UDFs over time, but this is a start.

-val ExtractDomain = udf(io.archivesunleashed.matchbox.ExtractDomain.apply(_: String, ""))
+val ExtractDomainDF = udf(io.archivesunleashed.matchbox.ExtractDomainRDD.apply(_: String, ""))

-val RemoveHTTPHeader = udf(io.archivesunleashed.matchbox.RemoveHTTPHeader.apply(_: String))
+val RemoveHTTPHeaderDF = udf(io.archivesunleashed.matchbox.RemoveHTTPHeaderRDD.apply(_: String))

-val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))
+val RemovePrefixWWWDF = udf[String, String](_.replaceAll("^\\s*www\\.", ""))

-var RemoveHTML = udf(io.archivesunleashed.matchbox.RemoveHTML.apply(_: String))
+var RemoveHTMLDF = udf(io.archivesunleashed.matchbox.RemoveHTMLRDD.apply(_: String))

-val ExtractLinks = udf(io.archivesunleashed.matchbox.ExtractLinks.apply(_: String, _: String))
+val ExtractLinksDF = udf(io.archivesunleashed.matchbox.ExtractLinksRDD.apply(_: String, _: String))

-val GetExtensionMime = udf(io.archivesunleashed.matchbox.GetExtensionMime.apply(_: String, _: String))
+val GetExtensionMimeDF = udf(io.archivesunleashed.matchbox.GetExtensionMimeRDD.apply(_: String, _: String))

-val ExtractImageLinks = udf(io.archivesunleashed.matchbox.ExtractImageLinks.apply(_: String, _: String))
+val ExtractImageLinksDF = udf(io.archivesunleashed.matchbox.ExtractImageLinksRDD.apply(_: String, _: String))

-val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5.apply(content.getBytes()))
-val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1.apply(content.getBytes()))
+val ComputeMD5DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeMD5RDD.apply(content.getBytes()))
+
+val ComputeSHA1DF = udf((content: String) => io.archivesunleashed.matchbox.ComputeSHA1RDD.apply(content.getBytes()))

/**
* Given a dataframe, serializes binary object and saves to disk
@@ -70,7 +70,7 @@ package object df {
val in = new ByteArrayInputStream(bytes);

val extension: String = row.getAs(extensionColumnName);
-val suffix = ComputeMD5(bytes)
+val suffix = ComputeMD5RDD(bytes)
val file = new FileOutputStream(fileName + "-" + suffix + "." + extension.toLowerCase)
IOUtils.copy(in, file)
file.close()
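On the DataFrame side the renamed UDFs are called exactly as before, just with the DF suffix. A sketch, assuming d is a DataFrame with crawl_date, url, and content columns (loading is elided) and a spark-shell session where spark.implicits._ is in scope:

import io.archivesunleashed.df
import spark.implicits._

// Same select as PlainTextExtractor above, using the DF-suffixed UDFs.
d.select($"crawl_date", df.ExtractDomainDF($"url").as("domain"),
  $"url", df.RemoveHTMLDF($"content").as("text"))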
@@ -19,7 +19,7 @@ import java.security.MessageDigest

/** Compute MD5 checksum. */
// scalastyle:off object.name
-object ComputeMD5 {
+object ComputeMD5RDD {
// scalastyle:on object.name
/** Computes the MD5 checksum of a byte array (eg. an image).
*
@@ -19,7 +19,7 @@ import java.security.MessageDigest

/** Compute SHA1 checksum. */
// scalastyle:off object.name
-object ComputeSHA1 {
+object ComputeSHA1RDD {
// scalastyle:on object.name
/** Computes the SHA1 checksum of a byte array (eg. an image).
*
@@ -29,7 +29,7 @@ object ExtractBoilerpipeText {
*/

def apply(input: String): String = {
-removeBoilerplate(RemoveHTTPHeader(input))
+removeBoilerplate(RemoveHTTPHeaderRDD(input))
}

private def removeBoilerplate(input: String): String = {
@@ -18,7 +18,7 @@ package io.archivesunleashed.matchbox
import java.net.URL

/** Extracts the host domain name from a full url string. */
-object ExtractDomain {
+object ExtractDomainRDD {
/** Extract source domains from a full url string.
*
* @param url a url as a string
@@ -21,7 +21,7 @@ import org.jsoup.select.Elements
import scala.collection.mutable

/** Extracts image links from a webpage given the HTML content (using Jsoup). */
-object ExtractImageLinks {
+object ExtractImageLinksRDD {

/** Extracts image links.
*
@@ -22,7 +22,7 @@ import scala.collection.mutable
import scala.Option

/** Extracts links from a webpage given the HTML content (using Jsoup). */
-object ExtractLinks {
+object ExtractLinksRDD {

/** Extract links.
*
@@ -19,7 +19,7 @@ import org.apache.commons.io.FilenameUtils

/** Get file extension using MIME type, then URL extension. */
// scalastyle:off object.name
-object GetExtensionMime {
+object GetExtensionMimeRDD {
// scalastyle:on object.name

/** Returns the extension of a file specified by URL
@@ -19,7 +19,7 @@ import java.io.IOException
import org.jsoup.Jsoup

/** Removes HTML markup with JSoup. */
-object RemoveHTML {
+object RemoveHTMLRDD {

/** Removes HTML markup.
*
@@ -28,7 +28,7 @@ object RemoveHTML {
*/
def apply(content: String): String = {
// First remove the HTTP header.
-val maybeContent: Option[String] = Option(RemoveHTTPHeader(content))
+val maybeContent: Option[String] = Option(RemoveHTTPHeaderRDD(content))
maybeContent match {
case Some(content) =>
Jsoup.parse(content).text().replaceAll("[\\r\\n]+", " ")
@@ -16,7 +16,7 @@
package io.archivesunleashed.matchbox

/** Remove HTTP headers. */
-object RemoveHTTPHeader {
+object RemoveHTTPHeaderRDD {
val headerEnd = "\r\n\r\n"

/** Remove HTTP headers.

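The checksum pair shows the split most clearly: ComputeMD5RDD and ComputeSHA1RDD hash raw bytes on the RDD side, while ComputeMD5DF and ComputeSHA1DF wrap the same helpers as UDFs over string columns (see df/package.scala above). A sketch under the same assumptions as the earlier examples (records is an RDD[ArchiveRecord], d a DataFrame with a content column):

import io.archivesunleashed.matchbox.ComputeMD5RDD
import io.archivesunleashed.df.ComputeMD5DF
import org.apache.spark.sql.functions.col

// RDD side: hash each record's binary payload directly.
val md5s = records.map(r => ComputeMD5RDD(r.getBinaryBytes))

// DataFrame side: the UDF hashes the bytes of the content string.
val hashed = d.select(ComputeMD5DF(col("content")).as("md5"))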