New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature request: ArchiveRecord.archiveFile #164

Closed
dportabella opened this Issue Jan 22, 2018 · 4 comments

Comments

Projects
None yet
2 participants
@dportabella
Contributor

dportabella commented Jan 22, 2018

I am querying CommonCrawl archive, which is divided into hundreds of warc.gz files. I use RecordLoader.loadArchives to read all the warc files at once. Sometimes the log contains an Exception when processing a page, and I'd need to find out from which of the individual warc.gz files comes from (so that I can re-run the program in that file only).

Would it be possible for ArchiveRecord class to have also a field with the input archive name? (with that, I could catch exceptions and show not only the url but also the input archive file).

@ianmilligan1

This comment has been minimized.

Member

ianmilligan1 commented Jan 22, 2018

Thanks as always (we're getting a bit of a back log of issues as we're stretched in many different directions, so don't take lack of work as lack of interest!).

Just so I am clear, the idea would be that you'd have a command like:

  .map(r => (r.getArchiveFile, r.getCrawlDate, r.getDomain, r.getUrl, RemoveHTML(r.getContentString)))

Where getArchiveFile would then produce the WARC file string?

@dportabella

This comment has been minimized.

Contributor

dportabella commented Jan 22, 2018

Hi, i am not sure I understand your question. here it is an example:

$ find /CC-MAIN-2016-36
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00286-ip-10-153-172-175.ec2.internal.warc.gz
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00140-ip-10-153-172-175.ec2.internal.warc.gz
...
/CC-MAIN-2016-36/segments/1471982292675.36/warc/CC-MAIN-20160823195812-00089-ip-10-153-172-175.ec2.internal.warc.gz
/CC-MAIN-2016-36/segments/1471982292675.36/warc/CC-MAIN-20160823195812-00190-ip-10-153-172-175.ec2.internal.warc.gz
...

RecordLoader.loadArchives("/CC-MAIN-2016-36", sc)
.foreach(r => println(r.getArchiveFile + "\t" + r.getUrl)


/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00286-ip-10-153-172-175.ec2.internal.warc.gz   http://example.com/p1.html
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00286-ip-10-153-172-175.ec2.internal.warc.gz   http://example.com/p2.html
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00286-ip-10-153-172-175.ec2.internal.warc.gz   http://example.com/p3.html
...
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00140-ip-10-153-172-175.ec2.internal.warc.gz   http://example2.com/p1.html
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00140-ip-10-153-172-175.ec2.internal.warc.gz   http://example2.com/p2.html
/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00140-ip-10-153-172-175.ec2.internal.warc.gz   http://example2.com/p3.html
...
/CC-MAIN-2016-36/segments/1471982292675.36/warc/CC-MAIN-20160823195812-00089-ip-10-153-172-175.ec2.internal.warc.gz   http://example3.com/p1.html


@ianmilligan1

This comment has been minimized.

Member

ianmilligan1 commented Jan 22, 2018

OK thanks for this. We are quite swamped right now but if you have a cycle we always enthusiastically look for pull requests too. 😄

@dportabella

This comment has been minimized.

Contributor

dportabella commented Jan 22, 2018

I posted the feature request here, but I am not sure that it's useful for other people.
meanwhile, I have the following custom solution for my use case:

def readProcessAndSaveWithSameInputPaths[T](in: String, outDir: String, readAndProcessAndSaveFn: (SparkContext, String, String) => Unit, sc: SparkContext) {
  val files: Seq[String] = getWarcFiles(in)

  files.foreach { individualFileIn =>
    val outFile = Paths.get(outDir).resolve(Paths.get(in).relativize(Paths.get(individualFileIn)))
    Files.createDirectories(outFile.getParent)
    readAndProcessAndSaveFn(sc, individualFileIn, outFile.toString)
  }
}

def getWarcFiles(dir: String): Seq[String] =
  FileUtils.fileStream(new File(dir))
    .filter(f => f.getName.endsWith(".warc") || f.getName.endsWith(".warc.gz"))
    .map(_.getPath)


val sc = SparkUtils.initSpark(appName)

def readAndProcessAndSaveFn(sc: SparkContext, in: String, out: String) {
  RecordLoader.loadArchives(in, sc, keepValidPages = false)
    .map(_.getUrl)
    .saveAsTextFile(out)
}

readProcessAndSaveWithSameInputPaths(in, out = "/tmp", readAndProcessAndSaveFn, sc)

for the previous example, this would create the files:

/tmp/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00286-ip-10-153-172-175.ec2.internal.warc.gz/part-00000
/tmp/CC-MAIN-2016-36/segments/1471982931818.60/warc/CC-MAIN-20160823200851-00140-ip-10-153-172-175.ec2.internal.warc.gz/part-00000
...
/tmp/CC-MAIN-2016-36/segments/1471982292675.36/warc/CC-MAIN-20160823195812-00089-ip-10-153-172-175.ec2.internal.warc.gz/part-00000
/tmp/CC-MAIN-2016-36/segments/1471982292675.36/warc/CC-MAIN-20160823195812-00190-ip-10-153-172-175.ec2.internal.warc.gz/part-00000
...

which is actually even better for my needs.
one major problem with that is that it does not parallelise: it uses only one node and thread.
Replacing files.foreach by sc.parallelize(files).foreach complains that it cannot serialise that function (because it uses sc). Any idea of how to solve this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment