Added rate limiter for indexing

master
Abhinav Sarkar 2013-05-21 14:11:04 +05:30
parent c8dcc52766
commit f577cd5fbd
3 changed files with 21 additions and 8 deletions

10
pom.xml
View File

@@ -11,6 +11,7 @@
<maven.compiler.target>1.6</maven.compiler.target> <maven.compiler.target>1.6</maven.compiler.target>
<encoding>UTF-8</encoding> <encoding>UTF-8</encoding>
<scala.version>2.10.0</scala.version> <scala.version>2.10.0</scala.version>
<scala.majorversion>2.10</scala.majorversion>
<lucene.version>4.3.0</lucene.version> <lucene.version>4.3.0</lucene.version>
<project.dependencyDir>${project.build.directory}/dependency</project.dependencyDir> <project.dependencyDir>${project.build.directory}/dependency</project.dependencyDir>
</properties> </properties>
@@ -39,7 +40,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.typesafe</groupId> <groupId>com.typesafe</groupId>
<artifactId>scalalogging-slf4j_2.10</artifactId> <artifactId>scalalogging-slf4j_${scala.majorversion}</artifactId>
<version>1.0.1</version> <version>1.0.1</version>
</dependency> </dependency>
<dependency> <dependency>
@@ -50,7 +51,7 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>net.liftweb</groupId> <groupId>net.liftweb</groupId>
<artifactId>lift-json_2.10</artifactId> <artifactId>lift-json_${scala.majorversion}</artifactId>
<version>2.5-RC5</version> <version>2.5-RC5</version>
</dependency> </dependency>
<dependency> <dependency>
@@ -78,6 +79,11 @@
<artifactId>opencsv</artifactId> <artifactId>opencsv</artifactId>
<version>2.3</version> <version>2.3</version>
</dependency> </dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>
</dependencies> </dependencies>
<build> <build>

View File

@@ -20,6 +20,7 @@ import org.apache.lucene.index.{ IndexWriter, IndexWriterConfig }
import org.apache.lucene.store.FSDirectory import org.apache.lucene.store.FSDirectory
import org.apache.lucene.util.Version import org.apache.lucene.util.Version
import com.google.common.util.concurrent.RateLimiter
import com.typesafe.scalalogging.slf4j.Logging import com.typesafe.scalalogging.slf4j.Logging
import net.abhinavsarkar.ircsearch.model._ import net.abhinavsarkar.ircsearch.model._
@@ -49,10 +50,13 @@ object Indexer extends Logging {
val ContextSize = 2 val ContextSize = 2
val ContextDurationSecs = 20 val ContextDurationSecs = 20
val IndexingDurationSecs = 10 val IndexingDurationSecs = 10
val FlushDurationSecs = 60
val RateLimitPerSec = 1000
private val indexQueue = new PriorityBlockingQueue[IndexRecord](10000) private val indexQueue = new PriorityBlockingQueue[IndexRecord]
private val scheduler = Executors.newScheduledThreadPool(2) private val scheduler = Executors.newScheduledThreadPool(2)
private val runLock = new ReentrantLock private val runLock = new ReentrantLock
private val rateLimiter = RateLimiter.create(RateLimitPerSec)
private var indexingFuture : Future[_] = null private var indexingFuture : Future[_] = null
private var flushFuture : Future[_] = null private var flushFuture : Future[_] = null
@@ -101,7 +105,10 @@ object Indexer extends Logging {
s"index-$server-$channel-$botName" s"index-$server-$channel-$botName"
def index(indexRequest : IndexRequest) = def index(indexRequest : IndexRequest) =
IndexRecord.fromIndexRequest(indexRequest).foreach(indexQueue.put) IndexRecord.fromIndexRequest(indexRequest).foreach { rec =>
rateLimiter.acquire
indexQueue put rec
}
private def doInLock(f : => Unit) { private def doInLock(f : => Unit) {
try { try {
@@ -161,7 +168,7 @@ object Indexer extends Logging {
} }
} }
if (indexRecBatch.size > windowSize) { if (indexRecBatch.size >= windowSize) {
indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length) indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length)
.zipWithIndex .zipWithIndex
.map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 } .map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 }
@@ -170,7 +177,7 @@ object Indexer extends Logging {
} }
} }
} }
flushFuture = schedule(0, 10, TimeUnit.SECONDS) { flushFuture = schedule(0, FlushDurationSecs, TimeUnit.SECONDS) {
doInLock(flush) doInLock(flush)
} }
} }

View File

@@ -11,9 +11,9 @@ import scala.collection.mutable.Buffer
import org.apache.lucene.analysis.Analyzer import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.queries.ChainedFilter import org.apache.lucene.queries.ChainedFilter
import org.apache.lucene.queryparser.classic.MultiFieldQueryParser import org.apache.lucene.queryparser.classic.MultiFieldQueryParser
import org.apache.lucene.search.{ BooleanClause, BooleanQuery, Filter, FilteredQuery, import org.apache.lucene.search.{ BooleanClause, BooleanQuery, Filter, FilteredQuery,
NumericRangeFilter, Query, QueryWrapperFilter, SearcherFactory, NumericRangeFilter, Query, QueryWrapperFilter, SearcherFactory,
SearcherManager, Sort, SortField, TermQuery } SearcherManager, Sort, SortField, TermQuery }
import org.apache.lucene.store.FSDirectory import org.apache.lucene.store.FSDirectory
import com.typesafe.scalalogging.slf4j.Logging import com.typesafe.scalalogging.slf4j.Logging