Permalink
Browse files

Buffer Documents for ConcurrentUpdateSolrClient (#576)

(Re-)add object pooling for SolrClients... it helped throughput quite a bit for `CloudSolrClient` even
though it's thread-safe. Also, buffer docs for the `ConcurrentUpdateSolrClient` as it's queue is for 
update requests (i.e., it buffers `add(SolrInputDocument)` calls until the queue size is reached then 
drains the queue concurrently).
  • Loading branch information...
r-clancy authored and lintool committed Feb 13, 2019
1 parent 51bcb2a commit ca58b56d3b17be48eb5a593cc192748348d367ee
Showing with 91 additions and 33 deletions.
  1. +5 −0 pom.xml
  2. +86 −33 src/main/java/io/anserini/index/IndexCollection.java
@@ -304,5 +304,10 @@
<artifactId>trec-car-tools-java</artifactId>
<version>13</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
</project>
@@ -25,6 +25,12 @@
import io.anserini.index.generator.LuceneDocumentGenerator;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.ObjectPool;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.analysis.CharArraySet;
@@ -144,10 +150,23 @@
@Option(name = "-solr.zkChroot", usage = "the ZooKeeper chroot if using SolrCloud")
public String solrZkChroot = "/";

// Note: This is used for ConcurrentSolrClient, where each processing thread has its own instance
// If the architecture is changed to share one ConcurrentSolrClient among all threads, the default should likely be increased
@Option(name = "-solr.client.threads", metaVar = "[Number]", required = false, usage = "Number of Threads for each SolrClient")
public int solrClientThreads = 16;
/**
* The number of {@link SolrClient#add(Collection) add} calls to buffer before draining pending update queue.
*/
@Option(name = "-solr.clientQueueSize", usage = "the number of update requests to buffer before draining")
public int solrClientQueueSize = 16;

/**
* The number threads used to drain the ConcurrentUpdateSolrClient queue
*/
@Option(name = "-solr.clientThreads", metaVar = "[NUMBER]", usage = "the number of threads used to drain the ConcurrentUpdateSolrClient queue")
public int solrClientThreads = 1;

/**
* The number of SolrClients to keep in the object pool.
*/
@Option(name = "-solr.poolSize", metaVar = "[NUMBER]", usage = "the number of clients to keep in the pool")
public int solrPoolSize = 16;

@Option(name = "-shard.count", usage = "the number of shards for the index")
public int shardCount = -1;
@@ -338,21 +357,16 @@ public void run() {
}
}

// With CloudSolrClient, we need to buffer ourselves...
if (args.solrCloud) {
buffer.add(solrDocument);
if (buffer.size() == args.solrBatch) {
flush();
}
} else {
solrClient.add(args.solrIndex, solrDocument, args.solrCommitWithin * 1000); // ... and ConcurrentUpdateSolrClient does it for us
buffer.add(solrDocument);
if (buffer.size() == args.solrBatch) {
flush();
}

cnt++;
}

// If we're running in cloud mode and have docs in the buffer, flush them.
if (args.solrCloud && !buffer.isEmpty()) {
// If we have docs in the buffer, flush them.
if (!buffer.isEmpty()) {
flush();
}

@@ -371,11 +385,21 @@ public void run() {

private void flush() {
if (!buffer.isEmpty()) {
SolrClient solrClient = null;
try {
solrClient = solrPool.borrowObject();
solrClient.add(args.solrIndex, buffer, args.solrCommitWithin * 1000);
buffer.clear();
} catch (Exception e) {
LOG.error("Error flushing documents to Solr", e);
} finally {
if (solrClient != null) {
try {
solrPool.returnObject(solrClient);
} catch (Exception e) {
LOG.error("Error returning SolrClient to pool", e);
}
}
}
}
}
@@ -390,7 +414,7 @@ private void flush() {
private final DocumentCollection collection;
private final Counters counters;
private Path indexPath;
private SolrClient solrClient;
private ObjectPool<SolrClient> solrPool;

public IndexCollection(IndexCollection.Args args) throws Exception {
this.args = args;
@@ -415,7 +439,9 @@ public IndexCollection(IndexCollection.Args args) throws Exception {
LOG.info("Solr commitWithin: " + args.solrCommitWithin);
LOG.info("Solr index: " + args.solrIndex);
LOG.info("Solr URL: " + args.solrUrl);
LOG.info("SolrClient Threads: " + args.solrClientThreads);
LOG.info("SolrClient thread count: " + args.solrClientThreads);
LOG.info("SolrClient queue size: " + args.solrClientQueueSize);
LOG.info("SolrClient pool size: " + args.solrPoolSize);
}

if (args.index == null && !args.solr) {
@@ -448,30 +474,54 @@ public IndexCollection(IndexCollection.Args args) throws Exception {
}

if (args.solr) {
GenericObjectPoolConfig<SolrClient> config = new GenericObjectPoolConfig<>();
config.setMaxTotal(args.solrPoolSize);
config.setMinIdle(args.solrPoolSize); // To guard against premature discarding of solrClients
this.solrPool = new GenericObjectPool(new SolrClientFactory(), config);
}

this.counters = new Counters();
}

private class SolrClientFactory extends BasePooledObjectFactory<SolrClient> {

@Override
public SolrClient create() {

if (args.solrCloud) {
List<String> urls = Splitter.on(',').splitToList(args.solrUrl);
this.solrClient = new CloudSolrClient.Builder(urls, Optional.of(args.solrZkChroot)).build();
} else {
ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(args.solrUrl)
.withQueueSize(args.solrBatch)
.withThreadCount(args.solrClientThreads);
this.solrClient = new ExceptionHandlingSolrClient(builder);
return new CloudSolrClient.Builder(urls, Optional.of(args.solrZkChroot)).build();
}

ConcurrentUpdateSolrClient.Builder builder = new ConcurrentUpdateSolrClient.Builder(args.solrUrl)
.withQueueSize(args.solrClientQueueSize)
.withThreadCount(args.solrClientThreads);

return new ExceptionHandlingSolrClient(builder);
}

@Override
public PooledObject<SolrClient> wrap(SolrClient solrClient) {
return new DefaultPooledObject(solrClient);
}

@Override
public void destroyObject(PooledObject<SolrClient> pooled) throws Exception {
pooled.getObject().close();
}

this.counters = new Counters();
}

private class ExceptionHandlingSolrClient extends ConcurrentUpdateSolrClient {
ExceptionHandlingSolrClient(ConcurrentUpdateSolrClient.Builder builder) {
super(builder);
}
ExceptionHandlingSolrClient(ConcurrentUpdateSolrClient.Builder builder) {
super(builder);
}

@Override
public void handleError(Throwable ex) {
LOG.warn("Solr: Exception delivering documents", ex);
}
@Override
public void handleError(Throwable ex) {
LOG.warn("Solr: Exception delivering documents", ex);
}
}

public void run() throws IOException {
final long start = System.nanoTime();
@@ -542,8 +592,11 @@ public void run() throws IOException {
// Do a final commit
if (args.solr) {
try {
solrClient.commit(args.solrIndex);
solrClient.close();
SolrClient client = solrPool.borrowObject();
client.commit(args.solrIndex);
// Needed for orderly shutdown so the SolrClient executor does not delay main thread exit
solrPool.returnObject(client);
solrPool.close();
} catch (Exception e) {
LOG.error("Exception during final Solr commit: ", e);
}

0 comments on commit ca58b56

Please sign in to comment.