Showing 94 additions and 152 deletions.
@@ -1,27 +1,86 @@
package io.anserini.spark

import java.util.stream.{Collectors, IntStream}
import java.util.{ArrayList, HashMap, Iterator}

import io.anserini.hadoop.HdfsReadOnlyDirectory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.lucene.document.Document
import org.apache.lucene.index.DirectoryReader
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.FlatMapFunction
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

import scala.collection.JavaConverters._
import scala.reflect.ClassTag

class IndexLoader(sc: SparkContext, path: String) {
  val conf = new Configuration()
  val reader = DirectoryReader.open(new HdfsReadOnlyDirectory(conf, new Path(path)))
object IndexLoader {

  def docids: RDD[Int] = {
    val arr = (0 to numDocs() - 1).toArray
    sc.parallelize(arr)
  def docs[T: ClassTag](rdd: JavaRDD[Integer], path: String, extractor: Document => T): JavaRDD[T] = {
    rdd.mapPartitions(new FlatMapFunction[Iterator[Integer], T] {
      override def call(iter: Iterator[Integer]): Iterator[T] = {
        val reader = DirectoryReader.open(new HdfsReadOnlyDirectory(new Configuration(), new Path(path)))
        val list = new ArrayList[T]()
        while (iter.hasNext) {
          list.add(extractor(reader.document(iter.next)))
        }
        list.iterator()
      }
    })
  }

  def numDocs() = {
    reader.numDocs()
  // Used in PySpark... need to return results as a HashMap since a Lucene's Document can't be serialized
  def docs2map(rdd: JavaRDD[Integer], path: String): JavaRDD[HashMap[String, String]] = docs(rdd, path, doc => {
    val map = new HashMap[String, String]()
    for (field <- doc.getFields.asScala) {
      map.put(field.name(), field.stringValue())
    }
    map
  });

}

/**
 * Load document IDs from a Lucene index
 */
class IndexLoader(sc: JavaSparkContext, path: String) {

  /**
   * Default Hadoop Configuration
   */
  val config = new Configuration()

  /**
   * Lucene IndexReader
   */
  val reader = DirectoryReader.open(new HdfsReadOnlyDirectory(config, new Path(path)))

  /**
   * Get the document IDs
   *
   * @return an RDD of document IDs
   */
  def docids(): JavaRDD[Integer] = {
    docids(numDocs() - 1)
  }

  def docidsN(n: Int): RDD[Int] = {
    val arr = (0 to n - 1).toArray
    sc.parallelize(arr)
  /**
   * Get the document IDs up to a certain num
   *
   * @param num the limit of IDs
   * @return an RDD of document IDs
   */
  def docids(num: Integer): JavaRDD[Integer] = {
    sc.parallelize(IntStream.rangeClosed(0, num).boxed().collect(Collectors.toList()))
  }
}

  /**
   * Get the number of documents in the index.
   *
   * @return the number of documents in the index
   */
  def numDocs(): Integer = {
    reader.numDocs()
  }

}
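The net effect of this first file is that IndexLoader moves from a Scala-only surface (SparkContext, RDD[Int]) to a Java-friendly companion object plus class (JavaSparkContext, JavaRDD[Integer]), so the same entry points can be driven from Scala, Java, and PySpark. Below is a minimal spark-shell sketch of how the new API might be used; the HDFS index path and the "raw" stored-field name are assumptions for illustration, not part of this commit:

  import org.apache.spark.api.java.JavaSparkContext
  import io.anserini.spark.IndexLoader

  // Assumed location of a Lucene index on HDFS.
  val indexPath = "hdfs:///indexes/lucene-index.example"

  // spark-shell provides sc: SparkContext; the reworked constructor expects a JavaSparkContext.
  val jsc = new JavaSparkContext(sc)
  val loader = new IndexLoader(jsc, indexPath)

  println(loader.numDocs())        // total documents in the index
  val docids = loader.docids()     // JavaRDD[Integer] covering ids 0 .. numDocs - 1

  // Extract one stored field per document ("raw" is an assumed field name).
  val raw = IndexLoader.docs(docids, indexPath, doc => doc.get("raw"))

  // PySpark-friendly variant: copy every stored field into a HashMap,
  // since a Lucene Document itself is not serializable.
  val fieldMaps = IndexLoader.docs2map(docids, indexPath)

Each partition reopens the index through HdfsReadOnlyDirectory on the executor, which is what lets the extractor run without shipping an IndexReader around.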
@@ -1,30 +1,29 @@
package io.anserini

import io.anserini.hadoop.HdfsReadOnlyDirectory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.lucene.document.Document
import org.apache.lucene.index.DirectoryReader
import org.apache.spark.SparkContext
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD

import scala.reflect.ClassTag

package object spark {

  implicit class DocidRDD[T: ClassTag](rdd: RDD[Int]) extends java.io.Serializable {
    def getDocs(path: String, extractor: Document => T): RDD[T] = {
      rdd.mapPartitions(iter => {
        val reader = DirectoryReader.open(new HdfsReadOnlyDirectory(new Configuration(), new Path(path)))
        iter.map(doc => extractor(reader.document(doc)))
      })
    }

    def getDocs(path: String): RDD[String] = {
      rdd.mapPartitions(iter => {
        val reader = DirectoryReader.open(new HdfsReadOnlyDirectory(new Configuration(), new Path(path)))
        iter.map(doc => reader.document(doc).toString)
      })
    }
  // Convert JavaRDD to RDD
  implicit def java2scala[T: ClassTag](rdd: JavaRDD[T]): RDD[T] = JavaRDD.toRDD(rdd)

  // Convert RDD to JavaRDD
  implicit def scala2java[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)

  // Convert JavaSparkContext to SparkContext
  implicit def java2scala(sc: JavaSparkContext): SparkContext = JavaSparkContext.toSparkContext(sc)

  // Convert SparkContext to JavaSparkContext
  implicit def scala2java(sc: SparkContext): JavaSparkContext = JavaSparkContext.fromSparkContext(sc)

  // Add docs(...) methods to JavaRDD[Integer] class.
  implicit class DocRDD[T: ClassTag](rdd: JavaRDD[Integer]) extends java.io.Serializable {
    def docs(path: String, extractor: Document => T): JavaRDD[T] = IndexLoader.docs(rdd, path, extractor)
  }

}
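The package object now acts as glue for that API: the implicit JavaRDD/RDD and JavaSparkContext/SparkContext conversions plus the DocRDD enrichment keep Scala call sites terse. Continuing the hypothetical spark-shell session from the sketch above:

  import io.anserini.spark._          // brings the implicit conversions and DocRDD into scope
  import org.apache.spark.rdd.RDD

  // scala2java converts the shell's SparkContext implicitly, so wrapping it by hand is optional.
  val loader2 = new IndexLoader(sc, indexPath)

  // DocRDD adds docs(...) directly to JavaRDD[Integer].
  val contents = loader2.docids().docs(indexPath, doc => doc.get("raw"))

  // java2scala applies wherever a Scala RDD is expected, so ordinary RDD operations follow.
  val asScala: RDD[String] = contents
  println(asScala.filter(_ != null).count())

Keeping the Document => T extractor as the user-supplied hook means only the extracted values (here plain Strings), not Lucene Documents, ever need to be serialized by Spark.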