Add PDF binary extraction. (#340)

Introduces the new extractPDFDetailsDF() method, makes our use of Tika's MIME type detection more efficient, and updates the POM to use a shaded build of tika-parsers, eliminating a dependency version conflict that has long been troublesome. A usage sketch of the new API follows the change summary below.

- Renames getImageBytes to getBinaryBytes
- Refactors the SaveImage class into the more general SaveBytes, and renames saveToDisk to saveImageToDisk
- Only instantiate Tika when the DetectMimeTypeTika singleton object is first referenced. See https://git.io/fj7g0.
- Use TikaInputStream to enable container-aware detection. Until now we were only using the default Mime Magic detection. See https://tika.apache.org/1.22/detection.html#Container_Aware_Detection.
- Added a generic saveToDisk method to save a bytes column of a DataFrame to files
- Updates tests
- Resolves #302
- Further addresses #308
- Includes work by @ruebot, see #340 for all commits before squash
jrwiebe authored and ruebot committed Aug 12, 2019
1 parent b2d7394 commit 73981a79bb842aacc7e77621fe2099edf550db52
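
As a rough usage sketch (not part of this commit's diff), assuming a spark-shell session where sc is the SparkContext, aut is on the classpath, and the archive path is a hypothetical placeholder, the new method pairs with the generic saveToDisk like so:

import io.archivesunleashed._
import io.archivesunleashed.df._

// Hypothetical input; point this at a real ARC/WARC file.
val pdfs = RecordLoader.loadArchives("/path/to/example.warc.gz", sc)
  .extractPDFDetailsDF()

// Decodes the base64-encoded bytes column and writes one file per row,
// named /tmp/pdf-[MD5 hash].pdf.
pdfs.saveToDisk("bytes", "/tmp/pdf", "pdf")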
pom.xml (14 changes)
@@ -118,7 +118,7 @@
<excludes>
<exclude>org.apache.hadoop:hadoop-core</exclude>
<exclude>org.apache.hadoop:hadoop-common</exclude>
<exclude>org.apache.hadoop:hadoop-mapreduce-client-core</exclude>
<exclude>org.apache.spark:*</exclude>
</excludes>
</artifactSet>
@@ -504,6 +504,10 @@
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -539,6 +543,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
@@ -644,9 +652,13 @@
<version>${tika.version}</version>
</dependency>
<dependency>
<!-- see issue #302
<groupId>org.apache.tika</groupId>
-->
<groupId>com.github.archivesunleashed.tika</groupId>
<artifactId>tika-parsers</artifactId>
<version>${tika.version}</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -56,7 +56,7 @@ trait ArchiveRecord extends Serializable {
def getDomain: String

/** Returns a raw array of bytes for an image. */
def getImageBytes: Array[Byte]
def getBinaryBytes: Array[Byte]

/** Returns the http status of the crawl. */
def getHttpStatus: String
@@ -150,7 +150,7 @@ class ArchiveRecordImpl(r: SerializableWritable[ArchiveRecordWritable]) extends
ExtractDomain(getUrl)
}

val getImageBytes: Array[Byte] = {
val getBinaryBytes: Array[Byte] = {
if (getContentString.startsWith("HTTP/")) {
getContentBytes.slice(
getContentString.indexOf(RemoveHttpHeader.headerEnd)
@@ -39,7 +39,7 @@ object ExtractPopularImages {
def apply(records: RDD[ArchiveRecord], limit: Int, sc:SparkContext, minWidth: Int = MIN_WIDTH, minHeight: Int = MIN_HEIGHT): RDD[String] = {
val res = records
.keepImages()
.map(r => ((r.getUrl, r.getImageBytes), 1))
.map(r => ((r.getUrl, r.getBinaryBytes), 1))
.map(img => (ComputeMD5(img._1._2), (ComputeImageSize(img._1._2), img._1._1, img._2)))
.filter(img => img._2._1._1 >= minWidth && img._2._1._2 >= minHeight)
.reduceByKey((image1, image2) => (image1._1, image1._2, image1._3 + image2._3))
@@ -17,11 +17,13 @@

package io.archivesunleashed

import org.apache.commons.io.IOUtils
import io.archivesunleashed.matchbox.{ComputeMD5, ExtractDomain, RemoveHTML}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.DataFrame
import java.io.ByteArrayInputStream
import java.io.File
import java.io.FileOutputStream
import javax.imageio.{ImageIO, ImageReader}
import java.util.Base64

@@ -38,17 +40,19 @@ package object df {
val RemovePrefixWWW = udf[String, String](_.replaceAll("^\\s*www\\.", ""))

var RemoveHTML = udf(io.archivesunleashed.matchbox.RemoveHTML.apply(_:String))

/**
* Given a dataframe, serializes the images and saves to disk
* Given a dataframe, serializes binary object and saves to disk
* @param df the input dataframe
*/
implicit class SaveImage(df: DataFrame) {
*/
implicit class SaveBytes(df: DataFrame) {

/**
* @param bytesColumnName the name of the column containing the image bytes
* @param fileName the base name of the file to save the images to (without extension)
* e.g. fileName = "foo" => images are saved as foo-[md5 hash].jpg
*/
def saveToDisk(bytesColumnName: String, fileName: String): Unit = {
* @param fileName the name of the file to save the images to (without extension)
* e.g. fileName = "foo" => images are saved as "foo-[MD5 hash].jpg"
*/
def saveImageToDisk(bytesColumnName: String, fileName: String): Unit = {
df.select(bytesColumnName).foreach(row => {
try {
// assumes the bytes are base64 encoded already as returned by ExtractImageDetails
@@ -76,5 +80,29 @@ package object df {
}
})
}

/**
* @param bytesColumnName the name of the column containing the bytes
* @param fileName the name of the file to save the binary file to (without extension)
* @param extension the extension of saved files
* e.g. fileName = "foo", extension = "pdf" => files are saved as "foo-[MD5 hash].pdf"
*/
def saveToDisk(bytesColumnName: String, fileName: String, extension: String): Unit = {
df.select(bytesColumnName).foreach(row => {
try {
// assumes the bytes are base64 encoded
val encodedBytes: String = row.getAs(bytesColumnName);
val bytes = Base64.getDecoder.decode(encodedBytes);
val in = new ByteArrayInputStream(bytes);

val suffix = ComputeMD5(bytes)
val file = new FileOutputStream(fileName + "-" + suffix + "." + extension)
IOUtils.copy(in, file)
} catch {
case e: Throwable => {
}
}
})
}
}
}
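
For the new generic saveToDisk, the named column is expected to hold base64-encoded strings (as extractPDFDetailsDF() produces); raw bytes would fail the decode step. A minimal sketch, assuming a spark-shell session (spark is the SparkSession) and dummy content rather than a real PDF:

import java.util.Base64
import io.archivesunleashed.df._
import spark.implicits._

// Dummy payload, base64-encoded the same way extractPDFDetailsDF() encodes real bytes.
val encoded = Base64.getEncoder.encodeToString("not a real PDF".getBytes("UTF-8"))
val df = Seq(("http://example.com/a.pdf", encoded)).toDF("url", "bytes")

// Writes /tmp/sample-[MD5 of the decoded bytes].pdf
df.saveToDisk("bytes", "/tmp/sample", "pdf")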
@@ -19,10 +19,14 @@ package io.archivesunleashed.matchbox
import java.io.ByteArrayInputStream
import org.apache.tika.Tika
import org.apache.tika.detect.DefaultDetector
import org.apache.tika.io.TikaInputStream
import org.apache.tika.parser.AutoDetectParser

/** Detect MIME type using Apache Tika. */
object DetectMimeTypeTika {
val detector = new DefaultDetector()
val parser = new AutoDetectParser(detector)
val tika = new Tika(detector, parser)

/** Detect MIME type from an input string.
*
@@ -34,9 +38,8 @@ object DetectMimeTypeTika {
"N/A"
} else {
val is = new ByteArrayInputStream(content.getBytes)
val detector = new DefaultDetector()
val parser = new AutoDetectParser(detector)
val mimetype = new Tika(detector, parser).detect(is)
val tis = TikaInputStream.get(is)
val mimetype = tika.detect(tis)
mimetype
}
}
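
Since the detector, parser, and Tika instance are now vals on the singleton, they are built once when DetectMimeTypeTika is first referenced and reused by every call. A rough call sketch with made-up content strings:

import io.archivesunleashed.matchbox.DetectMimeTypeTika

// Detection runs over a TikaInputStream wrapping the string's bytes;
// a "%PDF-" prefix is enough for Tika's magic to report application/pdf.
val mime = DetectMimeTypeTika("%PDF-1.4 made-up sample content")
// mime == "application/pdf"; arbitrary prose would instead yield "text/plain".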
@@ -17,14 +17,17 @@

package io

import java.security.MessageDigest
import java.util.Base64

import io.archivesunleashed.data.{ArchiveRecordInputFormat, ArchiveRecordWritable}
import ArchiveRecordWritable.ArchiveFormat
import io.archivesunleashed.matchbox.{ComputeMD5, DetectLanguage, ExtractDate, ExtractDomain, ExtractImageDetails, ExtractImageLinks, ExtractLinks, RemoveHTML}
import io.archivesunleashed.matchbox.ImageDetails
import io.archivesunleashed.matchbox.{ComputeMD5, DetectLanguage, DetectMimeTypeTika, ExtractDate, ExtractDomain, ExtractImageDetails, ExtractImageLinks, ExtractLinks, ImageDetails, RemoveHTML}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import java.net.URI
import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent
import java.net.URI
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.hadoop.io.LongWritable
@@ -83,7 +86,7 @@ package object archivesunleashed {
* To load such an RDD, please see [[RecordLoader]].
*/
implicit class WARecordRDD(rdd: RDD[ArchiveRecord]) extends java.io.Serializable {
/** Removes all non-html-based data (images, executables etc.) from html text. */
/** Removes all non-html-based data (images, executables, etc.) from html text. */
def keepValidPages(): RDD[ArchiveRecord] = {
rdd.filter(r =>
r.getCrawlDate != null
@@ -124,7 +127,7 @@ package object archivesunleashed {
sqlContext.getOrCreate().createDataFrame(records, schema)
}

/* Extracts all the images from a source page */
/* Extracts all the images links from a source page. */
def extractImageLinksDF(): DataFrame = {
val records = rdd
.keepValidPages()
@@ -143,12 +146,12 @@ package object archivesunleashed {
sqlContext.getOrCreate().createDataFrame(records, schema)
}

/* Extract image bytes and metadata */
/* Extract image bytes and image metadata. */
def extractImageDetailsDF(): DataFrame = {
val records = rdd
.keepImages()
.map(r => {
val image = ExtractImageDetails(r.getUrl, r.getMimeType, r.getImageBytes)
val image = ExtractImageDetails(r.getUrl, r.getMimeType, r.getBinaryBytes)
(r.getUrl, r.getMimeType, image.width, image.height, image.hash, image.body)
})
.map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6))
@@ -165,6 +168,28 @@ package object archivesunleashed {
sqlContext.getOrCreate().createDataFrame(records, schema)
}

/* Extract PDF bytes and PDF metadata. */
def extractPDFDetailsDF(): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika(r.getContentString) == "application/pdf"))
.map(r => {
val bytes = r.getBinaryBytes
val hash = new String(Hex.encodeHex(MessageDigest.getInstance("MD5").digest(bytes)))
val encodedBytes = Base64.getEncoder.encodeToString(bytes)
(r.getUrl, r.getMimeType, hash, encodedBytes)
})
.map(t => Row(t._1, t._2, t._3, t._4))

val schema = new StructType()
.add(StructField("url", StringType, true))
.add(StructField("mime_type", StringType, true))
.add(StructField("md5", StringType, true))
.add(StructField("bytes", StringType, true))

val sqlContext = SparkSession.builder();
sqlContext.getOrCreate().createDataFrame(records, schema)
}

/** Removes all data except images. */
def keepImages(): RDD[ArchiveRecord] = {
rdd.filter(r =>
@@ -194,7 +219,7 @@ package object archivesunleashed {
rdd.filter(r => dates.contains(ExtractDate(r.getCrawlDate, component)))
}

/** Removes all data but selected exact URLs
/** Removes all data but selected exact URLs.
*
* @param urls a Set of URLs to keep
*/
@@ -38,7 +38,7 @@ case class TestImageDetails(url: String, mime_type: String, width: String,
height: String, md5: String, bytes: String)

@RunWith(classOf[JUnitRunner])
class SaveImageTest extends FunSuite with BeforeAndAfter {
class SaveBytesTest extends FunSuite with BeforeAndAfter {
private val arcPath = Resources.getResource("arc/example.arc.gz").getPath
private val master = "local[4]"
private val appName = "example-df"
@@ -58,7 +58,7 @@ class SaveImageTest extends FunSuite with BeforeAndAfter {

val extracted = df.select(testString)
.orderBy(desc(testString)).limit(1)
extracted.saveToDisk(testString, "/tmp/foo")
extracted.saveImageToDisk(testString, "/tmp/foo")

val encodedBytes: String = extracted.take(1)(0).getAs(testString)
val bytes = Base64.getDecoder.decode(encodedBytes);
@@ -97,7 +97,7 @@ class SaveImageTest extends FunSuite with BeforeAndAfter {
// scalastyle:on
val df = Seq(dummyImg).toDF

df.saveToDisk(testString, "/tmp/bar")
df.saveImageToDisk(testString, "/tmp/bar")

// Check that no file was written.
assert(new File("/tmp").listFiles.filter(_.isFile).toList
