package net.abhinavsarkar.ircsearch.lucene

import java.io.File
import java.util.ArrayList
import java.util.Date
import java.util.concurrent.{ Executors, Future, PriorityBlockingQueue, TimeUnit }
import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConversions._
import scala.collection.Seq
import scala.collection.mutable
import scala.math.Ordered

import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.core.KeywordAnalyzer
import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.{ Field, LongField, StringField, TextField }
import org.apache.lucene.index.{ IndexWriter, IndexWriterConfig }
import org.apache.lucene.store.FSDirectory
import org.apache.lucene.util.Version

import com.google.common.util.concurrent.RateLimiter
import com.typesafe.scalalogging.slf4j.Logging

import net.abhinavsarkar.ircsearch.model._

/** Asynchronous Lucene indexer for IRC chat lines.
 *
 *  Incoming [[IndexRequest]]s are flattened into [[Indexer.IndexRecord]]s and
 *  buffered in a priority queue ordered by message timestamp.  A scheduled task
 *  periodically drains the queue, attaches before/after conversational context
 *  to each line, and writes it to a per-(server, channel, botName) Lucene index.
 *  A second scheduled task periodically commits (flushes) all open writers.
 */
object Indexer extends Logging {

  /** A single chat line queued for indexing, tagged with its origin.
   *
   *  Ordered by the chat line's timestamp so the priority queue drains
   *  records in chronological order.  `indexed = true` marks a record that
   *  has already been written and is only re-queued to serve as context for
   *  later records.
   */
  case class IndexRecord(
    server : String, channel : String, botName : String, chatLine : ChatLine,
    indexed : Boolean = false)
    extends Ordered[IndexRecord] {
    def compare(that : IndexRecord) = {
      val diff = this.chatLine.timestamp - that.chatLine.timestamp
      // Manual sign computation: timestamps are Longs, so the difference
      // cannot be returned directly from an Int-valued compare.
      if (diff > 0) 1 else if (diff < 0) -1 else 0
    }
  }

  object IndexRecord {
    /** Expands an [[IndexRequest]] into one [[IndexRecord]] per chat line. */
    def fromIndexRequest(indexRequest : IndexRequest) = {
      val IndexRequest(server, channel, botName, chatLines) = indexRequest
      for {
        chatLine <- chatLines
      } yield new IndexRecord(server, channel, botName, chatLine)
    }
  }

  val LUCENE_VERSION = Version.LUCENE_43

  // Number of chat lines captured on EACH side of a message as context.
  val ContextSize = 2
  // Context lines further than this many seconds from the message are dropped.
  val ContextDurationSecs = 20
  // How often the indexing task drains the queue (lower bound; see start).
  val IndexingDurationSecs = 10
  // How often open index writers are committed to disk.
  val FlushDurationSecs = 60
  // Maximum records accepted into the queue per second.
  val RateLimitPerSec = 1000

  // Chronologically-ordered buffer of records awaiting indexing.
  private val indexQueue = new PriorityBlockingQueue[IndexRecord]
  // One thread for the indexing task, one for the flush task.
  private val scheduler = Executors.newScheduledThreadPool(2)
  // Serializes indexing, flushing and shutdown against each other.
  private val runLock = new ReentrantLock
  private val rateLimiter = RateLimiter.create(RateLimitPerSec)
  private var indexingFuture : Future[_] = null
  private var flushFuture : Future[_] = null

  // Lazily-created IndexWriter per index directory path.  Guarded by
  // `synchronized` in getIndexWriter.
  private val indexers = mutable.Map[String, IndexWriter]()

  /** Closes every open index writer.  Called from stop. */
  private def close {
    for (indexer <- indexers.values)
      indexer.close
    logger.info("Closed Indexer")
  }

  /** Commits pending documents in every open index writer. */
  private def flush {
    for (indexer <- indexers.values)
      indexer.commit
    logger.info("Flushed Indexer")
  }

  /** Builds the analyzer used for indexing: English analysis for message and
   *  context fields, keyword (verbatim) analysis for the user field, and
   *  standard analysis for everything else.
   */
  def mkAnalyzer : Analyzer = {
    val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION)
    val fieldAnalyzers = Map(
      ChatLine.USER -> new KeywordAnalyzer,
      ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION),
      ChatLine.CTXB -> new EnglishAnalyzer(LUCENE_VERSION),
      ChatLine.CTXA -> new EnglishAnalyzer(LUCENE_VERSION))

    new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers)
  }

  /** Returns the (cached) IndexWriter for the given directory, creating the
   *  writer — and implicitly the directory — on first use.  Thread-safe.
   */
  private def getIndexWriter(dirPath : String) : IndexWriter = {
    synchronized {
      if (!(indexers contains dirPath)) {
        val indexDir = new File(dirPath)
        if (indexDir.exists) {
          // A pre-existing non-directory path is a programming error.
          assert(indexDir.isDirectory)
        }
        val indexer = new IndexWriter(FSDirectory.open(indexDir),
          new IndexWriterConfig(LUCENE_VERSION, mkAnalyzer))
        indexers += (dirPath -> indexer)
      }
    }

    indexers(dirPath)
  }

  /** Relative path of the index directory for one (server, channel, bot). */
  def getIndexDir(server : String, channel : String, botName : String) : String =
    s"index-$server-$channel-$botName"

  /** Enqueues every chat line of the request for asynchronous indexing,
   *  throttled to RateLimitPerSec records per second.
   */
  def index(indexRequest : IndexRequest) =
    IndexRecord.fromIndexRequest(indexRequest).foreach { rec =>
      rateLimiter.acquire
      indexQueue put rec
    }

  /** Runs `f` while holding the global run lock. */
  private def doInLock(f : => Unit) {
    try {
      runLock.lock
      f
    } finally {
      runLock.unlock
    }
  }

  /** Lets by-name blocks be handed to the scheduler; any escaping exception
   *  is logged instead of silently killing the scheduled task.
   */
  implicit private def funcToRunnable(f : => Unit) : Runnable = new Runnable {
    def run {
      try { f }
      catch {
        // NOTE(review): deliberately broad — a scheduled task that throws
        // would otherwise be cancelled by the executor.
        case e : Throwable => logger.error("Exception while running", e)
      }
    }
  }

  /** Schedules `f` with a fixed delay between the end of one run and the
   *  start of the next.
   */
  private def schedule(initialDelay : Int, delay : Int, unit : TimeUnit)(f : => Unit) = {
    scheduler.scheduleWithFixedDelay(f, initialDelay, delay, unit)
  }

  /** Returns `rec` with its chat line's contextBefore/contextAfter filled
   *  from the surrounding records in `recs` (`idx` is `rec`'s position).
   *  Context lines more than ContextDurationSecs away from the message are
   *  filtered out.
   */
  private def fillContext(rec: IndexRecord, recs: Seq[IndexRecord], idx : Int) = {
    rec.copy(chatLine =
      rec.chatLine.copy(
        contextBefore = recs.slice(idx - ContextSize, idx).map(_.chatLine)
          .filter(_.timestamp >= rec.chatLine.timestamp - ContextDurationSecs * 1000)
          .toList,
        // NOTE(review): the end bound is the absolute window size, not
        // idx-relative; callers always pass idx == ContextSize, for which the
        // two coincide.
        contextAfter = recs.slice(idx + 1, 2 * ContextSize + 1).map(_.chatLine)
          .filter(_.timestamp <= rec.chatLine.timestamp + ContextDurationSecs * 1000)
          .toList))
  }

  /** Starts the periodic indexing and flush tasks.
   *
   *  The indexing task drains the queue, groups records by
   *  (server, channel, botName) and slides a window of 2*ContextSize+1 over
   *  each chronological batch, indexing the centre record of every full
   *  window with its context attached.  Records that could not yet be given
   *  full context (an undersized trailing window, or the tail of a batch)
   *  are pushed back onto the queue for the next run; the ones among them
   *  that were already written are re-queued with `indexed = true` so they
   *  are only used as context, never written twice.
   */
  def start {
    logger.info("Starting indexer")
    indexingFuture = schedule(0, IndexingDurationSecs.max(ContextDurationSecs), TimeUnit.SECONDS) {
      if (!indexQueue.isEmpty) {
        val indexRecs = new ArrayList[IndexRecord]
        indexQueue drainTo indexRecs
        val indexRecsMap = indexRecs groupBy { r => (r.server, r.channel, r.botName) }

        val windowSize = 2 * ContextSize + 1
        for (indexRecBatch <- indexRecsMap.values) {
          for (recs <- indexRecBatch.sliding(windowSize)) {
            if (recs.size == windowSize) {
              doInLock {
                // The centre record gets the full window as its context.
                doIndex(fillContext(recs(ContextSize), recs, ContextSize))
              }
            } else {
              // Batch smaller than one window: retry once more lines arrive.
              recs.foreach(indexQueue.put)
            }
          }

          if (indexRecBatch.size >= windowSize) {
            // Re-queue the last 2*ContextSize records: the first ContextSize
            // of them are already indexed (context-only), the rest still
            // await their trailing context.
            indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length)
              .zipWithIndex
              .map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 }
              .foreach(indexQueue.put)
          }
        }
      }
    }
    flushFuture = schedule(0, FlushDurationSecs, TimeUnit.SECONDS) {
      doInLock(flush)
    }
  }

  /** Cancels the scheduled tasks (letting in-flight runs finish) and closes
   *  all index writers.  Safe to call when not started.
   */
  def stop {
    doInLock {
      if (indexingFuture != null) {
        indexingFuture.cancel(false)
        indexingFuture = null
      }
      if (flushFuture != null) {
        flushFuture.cancel(false)
        flushFuture = null
      }
      close
      logger.info("Stopped indexer")
    }
  }

  /** Renders context lines into a single indexable text field, one line per
   *  chat message.
   */
  private def ctxToStr(ctx : List[ChatLine]) =
    ctx.map { line => s"${line.timestamp} ${line.user}: ${line.message}" } mkString "\n"

  /** Writes one record to its per-channel index, unless it is marked as
   *  already indexed (context-only re-queue).
   */
  private def doIndex(indexRecord: IndexRecord) {
    val IndexRecord(server, channel, botName, chatLine, indexed) = indexRecord
    if (!indexed) {
      val indexDir = getIndexDir(server, channel, botName)
      val indexWriter = getIndexWriter(indexDir)
      val ts = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES)
      val user = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES)
      val msg = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES)
      val ctxBfr = new TextField(ChatLine.CTXB, ctxToStr(chatLine.contextBefore), Field.Store.YES)
      val ctxAft = new TextField(ChatLine.CTXA, ctxToStr(chatLine.contextAfter), Field.Store.YES)
      indexWriter.addDocument(List(ts, user, msg, ctxBfr, ctxAft), indexWriter.getAnalyzer)
      logger.debug("Indexed : [{} {} {}] [{}] {}: {}",
        server, channel, botName, new Date(chatLine.timestamp), chatLine.user, chatLine.message)
    }
  }

}
|