|
|
|
@ -20,6 +20,7 @@ import org.apache.lucene.document.{ Field, LongField, StringField, TextField } |
|
|
|
|
import org.apache.lucene.index.{ IndexWriter, IndexWriterConfig } |
|
|
|
|
import org.apache.lucene.store.FSDirectory |
|
|
|
|
import org.apache.lucene.util.Version |
|
|
|
|
import org.streum.configrity.Configuration |
|
|
|
|
|
|
|
|
|
import com.google.common.util.concurrent.RateLimiter |
|
|
|
|
import com.typesafe.scalalogging.slf4j.Logging |
|
|
|
@ -45,12 +46,14 @@ object Indexer extends Logging { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
val LUCENE_VERSION = Version.LUCENE_43 |
|
|
|
|
val ContextSize = 2 |
|
|
|
|
val ContextDurationSecs = 20 |
|
|
|
|
val IndexingDurationSecs = 10 |
|
|
|
|
val FlushDurationSecs = 60 |
|
|
|
|
val RateLimitPerSec = 1000 |
|
|
|
|
private val config = Configuration.loadResource("/irc-search.conf").detach("indexing") |
|
|
|
|
|
|
|
|
|
val LuceneVersion = Version.LUCENE_43 |
|
|
|
|
private val ContextSize = config[Int]("context.size") |
|
|
|
|
private val ContextDurationSecs = config[Int]("context.durationSecs") |
|
|
|
|
private val RunIntervalSecs = config[Int]("runIntervalSecs") |
|
|
|
|
private val FlushIntervalSecs = config[Int]("flushIntervalSecs") |
|
|
|
|
private val RateLimitPerSec = config[Int]("rateLimitPerSec") |
|
|
|
|
|
|
|
|
|
private val indexQueue = new PriorityBlockingQueue[IndexRecord] |
|
|
|
|
private val scheduler = Executors.newScheduledThreadPool(2) |
|
|
|
@ -72,12 +75,12 @@ object Indexer extends Logging { |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
def mkAnalyzer : Analyzer = { |
|
|
|
|
val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION) |
|
|
|
|
val defAnalyzer = new StandardAnalyzer(LuceneVersion) |
|
|
|
|
val fieldAnalyzers = Map( |
|
|
|
|
ChatLine.USER -> new KeywordAnalyzer, |
|
|
|
|
ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION), |
|
|
|
|
ChatLine.CTXB -> new EnglishAnalyzer(LUCENE_VERSION), |
|
|
|
|
ChatLine.CTXA -> new EnglishAnalyzer(LUCENE_VERSION)) |
|
|
|
|
ChatLine.MSG -> new EnglishAnalyzer(LuceneVersion), |
|
|
|
|
ChatLine.CTXB -> new EnglishAnalyzer(LuceneVersion), |
|
|
|
|
ChatLine.CTXA -> new EnglishAnalyzer(LuceneVersion)) |
|
|
|
|
|
|
|
|
|
new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers) |
|
|
|
|
} |
|
|
|
@ -90,7 +93,7 @@ object Indexer extends Logging { |
|
|
|
|
assert(indexDir.isDirectory) |
|
|
|
|
} |
|
|
|
|
val indexer = new IndexWriter(FSDirectory.open(indexDir), |
|
|
|
|
new IndexWriterConfig(LUCENE_VERSION, mkAnalyzer)) |
|
|
|
|
new IndexWriterConfig(LuceneVersion, mkAnalyzer)) |
|
|
|
|
indexers += (dirPath -> indexer) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
@ -141,7 +144,7 @@ object Indexer extends Logging { |
|
|
|
|
|
|
|
|
|
def start { |
|
|
|
|
logger.info("Starting indexer") |
|
|
|
|
indexingFuture = schedule(0, IndexingDurationSecs, TimeUnit.SECONDS) { |
|
|
|
|
indexingFuture = schedule(0, RunIntervalSecs, TimeUnit.SECONDS) { |
|
|
|
|
if (!indexQueue.isEmpty) { |
|
|
|
|
val indexRecs = new ArrayList[IndexRecord] |
|
|
|
|
indexQueue drainTo indexRecs |
|
|
|
@ -169,7 +172,7 @@ object Indexer extends Logging { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
flushFuture = schedule(0, FlushDurationSecs, TimeUnit.SECONDS) { |
|
|
|
|
flushFuture = schedule(0, FlushIntervalSecs, TimeUnit.SECONDS) { |
|
|
|
|
doInLock(flush) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|