diff --git a/pom.xml b/pom.xml index 6abe35a..479ef7d 100644 --- a/pom.xml +++ b/pom.xml @@ -11,6 +11,7 @@ 1.6 UTF-8 2.10.0 + 2.10 4.3.0 ${project.build.directory}/dependency @@ -39,7 +40,7 @@ com.typesafe - scalalogging-slf4j_2.10 + scalalogging-slf4j_${scala.majorversion} 1.0.1 @@ -50,7 +51,7 @@ net.liftweb - lift-json_2.10 + lift-json_${scala.majorversion} 2.5-RC5 @@ -78,6 +79,11 @@ opencsv 2.3 + + com.google.guava + guava + 14.0.1 + diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index d956a74..f11948d 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -20,6 +20,7 @@ import org.apache.lucene.index.{ IndexWriter, IndexWriterConfig } import org.apache.lucene.store.FSDirectory import org.apache.lucene.util.Version +import com.google.common.util.concurrent.RateLimiter import com.typesafe.scalalogging.slf4j.Logging import net.abhinavsarkar.ircsearch.model._ @@ -49,10 +50,13 @@ object Indexer extends Logging { val ContextSize = 2 val ContextDurationSecs = 20 val IndexingDurationSecs = 10 + val FlushDurationSecs = 60 + val RateLimitPerSec = 1000 - private val indexQueue = new PriorityBlockingQueue[IndexRecord](10000) + private val indexQueue = new PriorityBlockingQueue[IndexRecord] private val scheduler = Executors.newScheduledThreadPool(2) private val runLock = new ReentrantLock + private val rateLimiter = RateLimiter.create(RateLimitPerSec) private var indexingFuture : Future[_] = null private var flushFuture : Future[_] = null @@ -101,7 +105,10 @@ object Indexer extends Logging { s"index-$server-$channel-$botName" def index(indexRequest : IndexRequest) = - IndexRecord.fromIndexRequest(indexRequest).foreach(indexQueue.put) + IndexRecord.fromIndexRequest(indexRequest).foreach { rec => + rateLimiter.acquire + indexQueue put rec + } private def doInLock(f : => Unit) { try { @@ -161,7 +168,7 @@ object Indexer extends Logging { } } - if (indexRecBatch.size > windowSize) { + if (indexRecBatch.size >= windowSize) { indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length) .zipWithIndex .map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 } @@ -170,7 +177,7 @@ object Indexer extends Logging { } } } - flushFuture = schedule(0, 10, TimeUnit.SECONDS) { + flushFuture = schedule(0, FlushDurationSecs, TimeUnit.SECONDS) { doInLock(flush) } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index f70aa6e..5d82ced 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -11,9 +11,9 @@ import scala.collection.mutable.Buffer import org.apache.lucene.analysis.Analyzer import org.apache.lucene.queries.ChainedFilter import org.apache.lucene.queryparser.classic.MultiFieldQueryParser -import org.apache.lucene.search.{ BooleanClause, BooleanQuery, Filter, FilteredQuery, +import org.apache.lucene.search.{ BooleanClause, BooleanQuery, Filter, FilteredQuery, NumericRangeFilter, Query, QueryWrapperFilter, SearcherFactory, - SearcherManager, Sort, SortField, TermQuery } + SearcherManager, Sort, SortField, TermQuery } import org.apache.lucene.store.FSDirectory import com.typesafe.scalalogging.slf4j.Logging