Add audio & video binary extraction (#341)

- Add audio & video binary extraction (see the usage sketch below).
- Add filename and extension columns to the audio, PDF, and video DFs.
- Pass binary bytes instead of a string to DetectMimeTypeTika in the DF code (s/getContentString/getBinaryBytes).
- Update saveToDisk to use the file extension from a DF column.
- Add tests for audio, PDF, and video DF extraction.
- Add test fixtures for audio, PDF, and video DF extraction.
- Rename SaveBytesTest to SaveImageBytesTest.
- Eliminate the bytes->string->bytes conversion that was causing data loss in DetectMimeTypeTika.
- Update the tika-parsers dependency from JitPack.
- Remove tweet cruft.
- Resolves #306
- Resolves #307
- Includes work by @jrwiebe; see #341 for all commits before the squash.
ruebot authored and ianmilligan1 committed Aug 13, 2019
1 parent 73981a7 commit 54c0c3ee17af23b1083d71fab5a18e279c0d7bb9
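
For context, a minimal spark-shell sketch of how the new DataFrame extraction is meant to be used (the archive path is illustrative; assumes the implicits from io.archivesunleashed.df._ and the sc provided by spark-shell):

import io.archivesunleashed._
import io.archivesunleashed.df._

// Illustrative path; point this at a real W/ARC file.
val df = RecordLoader.loadArchives("/path/to/example.media.warc.gz", sc)
  .extractAudioDetailsDF()

// Inspect the new filename and extension columns alongside the existing ones.
df.select("url", "filename", "extension", "mime_type", "md5").show()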
@@ -658,7 +658,7 @@
<groupId>com.github.archivesunleashed.tika</groupId>
<artifactId>tika-parsers</artifactId>
<version>${tika.version}</version>
<classifier>shaded</classifier>
<classifier>shady</classifier>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
@@ -55,7 +55,7 @@ package object df {
def saveImageToDisk(bytesColumnName: String, fileName: String): Unit = {
df.select(bytesColumnName).foreach(row => {
try {
// assumes the bytes are base64 encoded already as returned by ExtractImageDetails
// Assumes the bytes are base64 encoded already as returned by ExtractImageDetails.
val encodedBytes: String = row.getAs(bytesColumnName);
val bytes = Base64.getDecoder.decode(encodedBytes);
val in = new ByteArrayInputStream(bytes);
@@ -84,19 +84,20 @@ package object df {
/**
* @param bytesColumnName the name of the column containing the bytes
* @param fileName the name of the file to save the binary file to (without extension)
* @param extension the extension of saved files
* e.g. fileName = "foo", extension = "pdf" => files are saved as "foo-[MD5 hash].pdf"
* @param extensionColumnName the name of the column containing the extension
* e.g. fileName = "foo" => files are saved as "foo-[MD5 hash].pdf"
*/
def saveToDisk(bytesColumnName: String, fileName: String, extension: String): Unit = {
df.select(bytesColumnName).foreach(row => {
def saveToDisk(bytesColumnName: String, fileName: String, extensionColumnName: String): Unit = {
df.select(bytesColumnName, extensionColumnName).foreach(row => {
try {
// assumes the bytes are base64 encoded
// Assumes the bytes are base64 encoded.
val encodedBytes: String = row.getAs(bytesColumnName);
val bytes = Base64.getDecoder.decode(encodedBytes);
val in = new ByteArrayInputStream(bytes);

val extension: String = row.getAs(extensionColumnName);
val suffix = ComputeMD5(bytes)
val file = new FileOutputStream(fileName + "-" + suffix + "." + extension)
val file = new FileOutputStream(fileName + "-" + suffix + "." + extension.toLowerCase)
IOUtils.copy(in, file)
} catch {
case e: Throwable => {
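
Call sites drop the hard-coded extension and pass the column name instead; a hedged before/after sketch (the "aut-pdf" output prefix is illustrative):

// Before: one extension applied to every row.
// df.saveToDisk("bytes", "aut-pdf", "pdf")

// After: each row's own extension, lower-cased, is appended to the file name.
df.saveToDisk("bytes", "aut-pdf", "extension")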
@@ -16,7 +16,6 @@
*/
package io.archivesunleashed.matchbox

import java.io.ByteArrayInputStream
import org.apache.tika.Tika
import org.apache.tika.detect.DefaultDetector
import org.apache.tika.io.TikaInputStream
@@ -30,15 +29,14 @@ object DetectMimeTypeTika {

/** Detect MIME type from an input string.
*
* @param content a string of content for which to detect the MimeType
* @param content a byte array of content for which to detect the MimeType
* @return MIME type (e.g. "text/html" or "application/xml") or "N/A".
*/
def apply(content: String): String = {
if (content.isEmpty) {
def apply(content: Array[Byte]): String = {
if (content.size == 0) {
"N/A"
} else {
val is = new ByteArrayInputStream(content.getBytes)
val tis = TikaInputStream.get(is)
val tis = TikaInputStream.get(content)
val mimetype = tika.detect(tis)
mimetype
}
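
Call sites change accordingly; a small sketch of the migration (r stands in for an ArchiveRecord):

// Before: binary content was mangled by the String round trip.
// DetectMimeTypeTika(r.getContentString)

// After: pass the raw bytes so audio, video, and PDF payloads are detected intact.
DetectMimeTypeTika(r.getBinaryBytes)

// Plain text can still be checked by encoding it explicitly.
DetectMimeTypeTika("<html><body>example</body></html>".getBytes)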
@@ -25,9 +25,11 @@ import ArchiveRecordWritable.ArchiveFormat
import io.archivesunleashed.matchbox.{ComputeMD5, DetectLanguage, DetectMimeTypeTika, ExtractDate, ExtractDomain, ExtractImageDetails, ExtractImageLinks, ExtractLinks, ImageDetails, RemoveHTML}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent
import org.apache.commons.codec.binary.Hex
import org.apache.commons.io.FilenameUtils
import org.apache.hadoop.fs.{FileSystem, Path}
import io.archivesunleashed.matchbox.ExtractDate.DateComponent.DateComponent
import java.net.URI
import java.net.URL
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.hadoop.io.LongWritable
@@ -171,17 +173,100 @@ package object archivesunleashed {
/* Extract PDF bytes and PDF metadata. */
def extractPDFDetailsDF(): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika(r.getContentString) == "application/pdf"))
.filter(r => (DetectMimeTypeTika(r.getBinaryBytes) == "application/pdf"))
.map(r => {
val bytes = r.getBinaryBytes
val hash = new String(Hex.encodeHex(MessageDigest.getInstance("MD5").digest(bytes)))
val encodedBytes = Base64.getEncoder.encodeToString(bytes)
(r.getUrl, r.getMimeType, hash, encodedBytes)
val url = new URL(r.getUrl)
val filename = FilenameUtils.getName(url.getPath())
val extension = FilenameUtils.getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
})
.map(t => Row(t._1, t._2, t._3, t._4))
.map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6))

val schema = new StructType()
.add(StructField("url", StringType, true))
.add(StructField("filename", StringType, true))
.add(StructField("extension", StringType, true))
.add(StructField("mime_type", StringType, true))
.add(StructField("md5", StringType, true))
.add(StructField("bytes", StringType, true))

val sqlContext = SparkSession.builder();
sqlContext.getOrCreate().createDataFrame(records, schema)
}
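
The filename and extension columns are derived from the URL path with Commons IO's FilenameUtils, so query strings are ignored; a quick illustration (the URL is made up):

import java.net.URL
import org.apache.commons.io.FilenameUtils

val url = new URL("https://example.org/files/report.pdf?sequence=1")
FilenameUtils.getName(url.getPath)       // "report.pdf"
FilenameUtils.getExtension(url.getPath)  // "pdf"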

/* Extract audio bytes and audio metadata. */
def extractAudioDetailsDF(): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika(r.getBinaryBytes).startsWith("audio/"))
|| r.getUrl.endsWith("aac")
|| r.getUrl.endsWith("mid")
|| r.getUrl.endsWith("midi")
|| r.getUrl.endsWith("mp3")
|| r.getUrl.endsWith("wav")
|| r.getUrl.endsWith("oga")
|| r.getUrl.endsWith("ogg")
|| r.getUrl.endsWith("weba")
|| r.getUrl.endsWith("ra")
|| r.getUrl.endsWith("rm")
|| r.getUrl.endsWith("3gp")
|| r.getUrl.endsWith("3g2"))
.map(r => {
val bytes = r.getBinaryBytes
val hash = new String(Hex.encodeHex(MessageDigest.getInstance("MD5").digest(bytes)))
val encodedBytes = Base64.getEncoder.encodeToString(bytes)
val url = new URL(r.getUrl)
val filename = FilenameUtils.getName(url.getPath())
val extension = FilenameUtils.getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
})
.map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6))

val schema = new StructType()
.add(StructField("url", StringType, true))
.add(StructField("filename", StringType, true))
.add(StructField("extension", StringType, true))
.add(StructField("mime_type", StringType, true))
.add(StructField("md5", StringType, true))
.add(StructField("bytes", StringType, true))

val sqlContext = SparkSession.builder();
sqlContext.getOrCreate().createDataFrame(records, schema)
}
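
As with the PDF DataFrame, the result can be queried like any other DataFrame; for example, a rough sketch tallying audio records by detected MIME type (path again illustrative):

import org.apache.spark.sql.functions.desc

RecordLoader.loadArchives("/path/to/example.media.warc.gz", sc)
  .extractAudioDetailsDF()
  .groupBy("mime_type")
  .count()
  .orderBy(desc("count"))
  .show()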

/* Extract video bytes and video metadata. */
def extractVideoDetailsDF(): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika(r.getBinaryBytes).startsWith("video/"))
|| r.getUrl.endsWith("flv")
|| r.getUrl.endsWith("mp4")
|| r.getUrl.endsWith("mov")
|| r.getUrl.endsWith("avi")
|| r.getUrl.endsWith("wmv")
|| r.getUrl.endsWith("rv")
|| r.getUrl.endsWith("mpeg")
|| r.getUrl.endsWith("ogv")
|| r.getUrl.endsWith("webm")
|| r.getUrl.endsWith("ts")
|| r.getUrl.endsWith("3gp")
|| r.getUrl.endsWith("3g2"))
.map(r => {
val bytes = r.getBinaryBytes
val hash = new String(Hex.encodeHex(MessageDigest.getInstance("MD5").digest(bytes)))
val encodedBytes = Base64.getEncoder.encodeToString(bytes)
val url = new URL(r.getUrl)
val filename = FilenameUtils.getName(url.getPath())
val extension = FilenameUtils.getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
})
.map(t => Row(t._1, t._2, t._3, t._4, t._5, t._6))

val schema = new StructType()
.add(StructField("url", StringType, true))
.add(StructField("filename", StringType, true))
.add(StructField("extension", StringType, true))
.add(StructField("mime_type", StringType, true))
.add(StructField("md5", StringType, true))
.add(StructField("bytes", StringType, true))

@@ -105,7 +105,7 @@ class ArcTest extends FunSuite with BeforeAndAfter {
test("detect mime type tika") {
val mimeTypeCounts = RecordLoader.loadArchives(arcPath, sc)
.map(r => RemoveHTML(r.getContentString))
.groupBy(content => DetectMimeTypeTika(content))
.groupBy(content => DetectMimeTypeTika(content.getBytes))
.map(f => {
(f._1, f._2.size)
}).collect
@@ -0,0 +1,64 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source toolkit for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.archivesunleashed

import com.google.common.io.Resources
import org.apache.spark.sql.SparkSession
// scalastyle:off underscore.import
import io.archivesunleashed.df._
import org.apache.spark.sql.functions._
// scalastyle:on underscore.import
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class ExtractAudioDetailsTest extends FunSuite with BeforeAndAfter {
private val warcPath = Resources.getResource("warc/example.media.warc.gz").getPath
private val master = "local[4]"
private val appName = "example-df"
private var sc: SparkContext = _

before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
sc = new SparkContext(conf)
}

test("Audio DF extraction") {
val df = RecordLoader.loadArchives(warcPath, sc)
.extractAudioDetailsDF()

val extracted = df.select("url", "filename", "extension", "mime_type", "md5")
.orderBy(desc("md5")).head(1).toList
assert(extracted.size == 1)
assert("https://ruebot.net/files/feniz.mp3" == extracted(0)(0))
assert("feniz.mp3" == extracted(0)(1))
assert("mp3" == extracted(0)(2))
assert("audio/mpeg" == extracted(0)(3))
assert("f7e7ec84b12c294e19af1ba41732c733" == extracted(0)(4))
}

after {
if (sc != null) {
sc.stop()
}
}
}
@@ -42,7 +42,7 @@ class ExtractImageDetailsTest extends FunSuite with BeforeAndAfter {
sc = new SparkContext(conf)
}

test("Fetch image") {
test("Image DF extraction") {
val df = RecordLoader.loadArchives(arcPath, sc)
.extractImageDetailsDF()

@@ -0,0 +1,69 @@
/*
* Archives Unleashed Toolkit (AUT):
* An open-source toolkit for analyzing web archives.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.archivesunleashed

import com.google.common.io.Resources
import org.apache.spark.sql.SparkSession
// scalastyle:off underscore.import
import io.archivesunleashed.df._
import org.apache.spark.sql.functions._
// scalastyle:on underscore.import
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.{BeforeAndAfter, FunSuite}

@RunWith(classOf[JUnitRunner])
class ExtractPDFDetailsTest extends FunSuite with BeforeAndAfter {
private val warcPath = Resources.getResource("warc/example.pdf.warc.gz").getPath
private val master = "local[4]"
private val appName = "example-df"
private var sc: SparkContext = _

before {
val conf = new SparkConf()
.setMaster(master)
.setAppName(appName)
sc = new SparkContext(conf)
}

test("PDF DF extraction") {
val df = RecordLoader.loadArchives(warcPath, sc)
.extractPDFDetailsDF()

val extracted = df.select("url", "filename", "extension", "mime_type", "md5")
.orderBy(desc("md5")).head(2).toList
assert(extracted.size == 2)
assert("https://yorkspace.library.yorku.ca/xmlui/bitstream/handle/10315/36158/cost-analysis.pdf?sequence=1&isAllowed=y" == extracted(0)(0))
assert("cost-analysis.pdf" == extracted(0)(1))
assert("pdf" == extracted(0)(2))
assert("application/pdf" == extracted(0)(3))
assert("aaba59d2287afd40c996488a39bbc0dd" == extracted(0)(4))
assert("https://yorkspace.library.yorku.ca/xmlui/bitstream/handle/10315/36158/JCDL%20-%20Cost%20of%20a%20WARC%20Presentation-4.pdf?sequence=3&isAllowed=y" == extracted(1)(0))
assert("JCDL%20-%20Cost%20of%20a%20WARC%20Presentation-4.pdf" == extracted(1)(1))
assert("pdf" == extracted(1)(2))
assert("application/pdf" == extracted(1)(3))
assert("322cd5239141408c42f7441f15eed9af" == extracted(1)(4))
}

after {
if (sc != null) {
sc.stop()
}
}
}
