@@ -106,7 +106,7 @@ package object archivesunleashed {
val schema = new StructType ()
.add(StructField (" crawl_date" , StringType , true ))
.add(StructField (" url" , StringType , true ))
.add(StructField (" mime_type " , StringType , true ))
.add(StructField (" mime_type_web_server " , StringType , true ))
.add(StructField (" content" , StringType , true ))
val sqlContext = SparkSession .builder()
@@ -154,13 +154,15 @@ package object archivesunleashed {
.keepImages()
.map(r => {
val image = ExtractImageDetails (r.getUrl, r.getMimeType, r.getBinaryBytes)
(r.getUrl, r.getMimeType, image.width, image.height, image.hash, image.body)
(r.getUrl, r.getMimeType, DetectMimeTypeTika (r.getBinaryBytes),
image.width, image.height, image.hash, image.body)
})
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6))
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6, t._7 ))
val schema = new StructType ()
.add(StructField (" url" , StringType , true ))
.add(StructField (" mime_type" , StringType , true ))
.add(StructField (" mime_type_web_server" , StringType , true ))
.add(StructField (" mime_type_tika" , StringType , true ))
.add(StructField (" width" , IntegerType , true ))
.add(StructField (" height" , IntegerType , true ))
.add(StructField (" md5" , StringType , true ))
@@ -173,23 +175,28 @@ package object archivesunleashed {
/* Extract PDF bytes and PDF metadata. */
def extractPDFDetailsDF (): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika (r.getBinaryBytes) == " application/pdf" ))
.map(r =>
(r, (DetectMimeTypeTika (r.getBinaryBytes)))
)
.filter(r => r._2 == " application/pdf" )
.map(r => {
val bytes = r.getBinaryBytes
val bytes = r._1. getBinaryBytes
val hash = new String (Hex .encodeHex(MessageDigest .getInstance(" MD5" ).digest(bytes)))
val encodedBytes = Base64 .getEncoder.encodeToString(bytes)
val url = new URL (r.getUrl)
val url = new URL (r._1. getUrl)
val filename = FilenameUtils .getName(url.getPath())
val extension = FilenameUtils .getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
(r._1.getUrl, filename, extension, r._1.getMimeType,
DetectMimeTypeTika (r._1.getBinaryBytes), hash, encodedBytes)
})
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6))
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6, t._7 ))
val schema = new StructType ()
.add(StructField (" url" , StringType , true ))
.add(StructField (" filename" , StringType , true ))
.add(StructField (" extension" , StringType , true ))
.add(StructField (" mime_type" , StringType , true ))
.add(StructField (" mime_type_web_server" , StringType , true ))
.add(StructField (" mime_type_tika" , StringType , true ))
.add(StructField (" md5" , StringType , true ))
.add(StructField (" bytes" , StringType , true ))
@@ -200,35 +207,40 @@ package object archivesunleashed {
/* Extract audio bytes and audio metadata. */
def extractAudioDetailsDF (): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika (r.getBinaryBytes).startsWith(" audio/" ))
|| r.getUrl.endsWith(" aac" )
|| r.getUrl.endsWith(" mid" )
|| r.getUrl.endsWith(" midi" )
|| r.getUrl.endsWith(" mp3" )
|| r.getUrl.endsWith(" wav" )
|| r.getUrl.endsWith(" oga" )
|| r.getUrl.endsWith(" ogg" )
|| r.getUrl.endsWith(" weba" )
|| r.getUrl.endsWith(" ra" )
|| r.getUrl.endsWith(" rm" )
|| r.getUrl.endsWith(" 3gp" )
|| r.getUrl.endsWith(" 3g2" ))
.map(r =>
(r, (DetectMimeTypeTika (r.getBinaryBytes)))
)
.filter(r => r._2.startsWith(" audio/" )
|| r._1.getUrl.endsWith(" aac" )
|| r._1.getUrl.endsWith(" mid" )
|| r._1.getUrl.endsWith(" midi" )
|| r._1.getUrl.endsWith(" mp3" )
|| r._1.getUrl.endsWith(" wav" )
|| r._1.getUrl.endsWith(" oga" )
|| r._1.getUrl.endsWith(" ogg" )
|| r._1.getUrl.endsWith(" weba" )
|| r._1.getUrl.endsWith(" ra" )
|| r._1.getUrl.endsWith(" rm" )
|| r._1.getUrl.endsWith(" 3gp" )
|| r._1.getUrl.endsWith(" 3g2" ))
.map(r => {
val bytes = r.getBinaryBytes
val bytes = r._1. getBinaryBytes
val hash = new String (Hex .encodeHex(MessageDigest .getInstance(" MD5" ).digest(bytes)))
val encodedBytes = Base64 .getEncoder.encodeToString(bytes)
val url = new URL (r.getUrl)
val url = new URL (r._1. getUrl)
val filename = FilenameUtils .getName(url.getPath())
val extension = FilenameUtils .getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
(r._1.getUrl, filename, extension, r._1.getMimeType,
DetectMimeTypeTika (r._1.getBinaryBytes), hash, encodedBytes)
})
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6))
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6, t._7 ))
val schema = new StructType ()
.add(StructField (" url" , StringType , true ))
.add(StructField (" filename" , StringType , true ))
.add(StructField (" extension" , StringType , true ))
.add(StructField (" mime_type" , StringType , true ))
.add(StructField (" mime_type_web_server" , StringType , true ))
.add(StructField (" mime_type_tika" , StringType , true ))
.add(StructField (" md5" , StringType , true ))
.add(StructField (" bytes" , StringType , true ))
@@ -239,35 +251,40 @@ package object archivesunleashed {
/* Extract video bytes and video metadata. */
def extractVideoDetailsDF (): DataFrame = {
val records = rdd
.filter(r => (DetectMimeTypeTika (r.getBinaryBytes).startsWith(" video/" ))
|| r.getUrl.endsWith(" flv" )
|| r.getUrl.endsWith(" mp4" )
|| r.getUrl.endsWith(" mov" )
|| r.getUrl.endsWith(" avi" )
|| r.getUrl.endsWith(" wmv" )
|| r.getUrl.endsWith(" rv" )
|| r.getUrl.endsWith(" mpeg" )
|| r.getUrl.endsWith(" ogv" )
|| r.getUrl.endsWith(" webm" )
|| r.getUrl.endsWith(" ts" )
|| r.getUrl.endsWith(" 3gp" )
|| r.getUrl.endsWith(" 3g2" ))
.map(r =>
(r, (DetectMimeTypeTika (r.getBinaryBytes)))
)
.filter(r => r._2.startsWith(" video/" )
|| r._1.getUrl.endsWith(" flv" )
|| r._1.getUrl.endsWith(" mp4" )
|| r._1.getUrl.endsWith(" mov" )
|| r._1.getUrl.endsWith(" avi" )
|| r._1.getUrl.endsWith(" wmv" )
|| r._1.getUrl.endsWith(" rv" )
|| r._1.getUrl.endsWith(" mpeg" )
|| r._1.getUrl.endsWith(" ogv" )
|| r._1.getUrl.endsWith(" webm" )
|| r._1.getUrl.endsWith(" ts" )
|| r._1.getUrl.endsWith(" 3gp" )
|| r._1.getUrl.endsWith(" 3g2" ))
.map(r => {
val bytes = r.getBinaryBytes
val bytes = r._1. getBinaryBytes
val hash = new String (Hex .encodeHex(MessageDigest .getInstance(" MD5" ).digest(bytes)))
val encodedBytes = Base64 .getEncoder.encodeToString(bytes)
val url = new URL (r.getUrl)
val url = new URL (r._1. getUrl)
val filename = FilenameUtils .getName(url.getPath())
val extension = FilenameUtils .getExtension(url.getPath())
(r.getUrl, filename, extension, r.getMimeType, hash, encodedBytes)
(r._1.getUrl, filename, extension, r._1.getMimeType,
DetectMimeTypeTika (r._1.getBinaryBytes), hash, encodedBytes)
})
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6))
.map(t => Row (t._1, t._2, t._3, t._4, t._5, t._6, t._7 ))
val schema = new StructType ()
.add(StructField (" url" , StringType , true ))
.add(StructField (" filename" , StringType , true ))
.add(StructField (" extension" , StringType , true ))
.add(StructField (" mime_type" , StringType , true ))
.add(StructField (" mime_type_web_server" , StringType , true ))
.add(StructField (" mime_type_tika" , StringType , true ))
.add(StructField (" md5" , StringType , true ))
.add(StructField (" bytes" , StringType , true ))
0 comments on commit
01d12b4