From 2d50b9bb73fa55f6af09b94ebf8c2cf5ea868e20 Mon Sep 17 00:00:00 2001 From: Abhinav Sarkar Date: Sun, 19 May 2013 23:37:00 +0530 Subject: [PATCH] Added support for indexing and searching context of a chat line --- .../ircsearch/HttpRequestHandler.scala | 8 ++ .../net/abhinavsarkar/ircsearch/Server.scala | 5 +- .../ircsearch/lucene/Indexer.scala | 126 ++++++++++++++---- .../ircsearch/lucene/Searcher.scala | 30 +++-- .../net/abhinavsarkar/ircsearch/model.scala | 6 +- 5 files changed, 138 insertions(+), 37 deletions(-) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala index 243ef77..e5b24c4 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala @@ -39,6 +39,14 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest response } + protected def sendError(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = { + val response = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR) + response.setContent(Unpooled.copiedBuffer(body.getBytes)) + response.setHeader(CONTENT_TYPE, "text/plain") + writeResponse(ctx, request, response) + response + } + protected def writeResponse( ctx : ChannelHandlerContext, request : HttpRequest, response : HttpResponse) { response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes()) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala index d28e684..e63a359 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala @@ -215,6 +215,9 @@ object SearchHandler extends HttpRequestHandler { case searchResult => logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) } - f onFailure { case e : Exception => logger.error("Error", e) } + f onFailure { case e : Exception => { + logger.error("Error", e) + logRequest(ctx, request, sendError(ctx, request, e.getMessage)) + }} } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index 5b5d62b..73d32be 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -4,11 +4,15 @@ import java.io.File import java.util.ArrayList import java.util.concurrent.Executors import java.util.concurrent.Future -import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock + import scala.collection.JavaConversions._ +import scala.collection.Seq import scala.collection.mutable +import scala.math.Ordered + import org.apache.lucene.analysis.Analyzer import org.apache.lucene.analysis.core.KeywordAnalyzer import org.apache.lucene.analysis.en.EnglishAnalyzer @@ -22,16 +26,40 @@ import org.apache.lucene.index.IndexWriter import org.apache.lucene.index.IndexWriterConfig import org.apache.lucene.store.FSDirectory import org.apache.lucene.util.Version + import com.typesafe.scalalogging.slf4j.Logging + import net.abhinavsarkar.ircsearch.model._ -import java.util.concurrent.BlockingDeque -import java.util.concurrent.BlockingQueue object Indexer extends Logging { - val LUCENE_VERSION = Version.LUCENE_43 + case class IndexRecord( + server : String, channel : String, botName : String, chatLine : ChatLine, + indexed : Boolean = false) + extends Ordered[IndexRecord] { + def compare(that : IndexRecord) = { + val diff = this.chatLine.timestamp - that.chatLine.timestamp + if (diff > 0) 1 else if (diff < 0) -1 else 0 + } + } - private val indexReqQueue = new LinkedBlockingQueue[IndexRequest](10000) + object IndexRecord { + + def fromIndexRequest(indexRequest : IndexRequest) = { + val IndexRequest(server, channel, botName, chatLines) = indexRequest + for { + chatLine <- chatLines + } yield new IndexRecord(server, channel, botName, chatLine) + } + + } + + val LUCENE_VERSION = Version.LUCENE_43 + val ContextSize = 2 + val ContextDurationSecs = 20 + val IndexingDurationSecs = 10 + + private val indexQueue = new PriorityBlockingQueue[IndexRecord](10000) private val scheduler = Executors.newScheduledThreadPool(2) private val runLock = new ReentrantLock private var indexingFuture : Future[_] = null @@ -55,7 +83,9 @@ object Indexer extends Logging { val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION) val fieldAnalyzers = Map( ChatLine.USER -> new KeywordAnalyzer, - ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION)) + ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION), + ChatLine.CTXB -> new EnglishAnalyzer(LUCENE_VERSION), + ChatLine.CTXA -> new EnglishAnalyzer(LUCENE_VERSION)) new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers) } @@ -79,7 +109,8 @@ object Indexer extends Logging { def getIndexDir(server : String, channel : String, botName : String) : String = s"index-$server-$channel-$botName" - def index(indexRequest : IndexRequest) = indexReqQueue.put(indexRequest) + def index(indexRequest : IndexRequest) = + IndexRecord.fromIndexRequest(indexRequest).foreach(indexQueue.put) private def doInLock(f : => Unit) { try { @@ -98,17 +129,59 @@ object Indexer extends Logging { } }} - def indexReqStream : Stream[IndexRequest] = Stream.cons(indexReqQueue.take, indexReqStream) + def schedule(initialDelay : Int, delay : Int, unit : TimeUnit)(f : => Unit) = { + scheduler.scheduleWithFixedDelay(f, initialDelay, delay, unit) + } + + def fillContext(rec: IndexRecord, recs: Seq[IndexRecord], idx : Int) = { + rec.copy(chatLine = + rec.chatLine.copy( + contextBefore = recs.slice(idx - ContextSize, idx).map(_.chatLine) + .filter(_.timestamp >= rec.chatLine.timestamp - ContextDurationSecs * 1000) + .toList, + contextAfter = recs.slice(idx + 1, 2 * ContextSize + 1).map(_.chatLine) + .filter(_.timestamp <= rec.chatLine.timestamp + ContextDurationSecs * 1000) + .toList)) + } def start { logger.info("Starting indexer") - indexingFuture = scheduler.submit { - for (indexReq <- indexReqStream) - doInLock { - doIndex(List(indexReq)) + indexingFuture = schedule(0, IndexingDurationSecs.max(ContextDurationSecs), TimeUnit.SECONDS) { + if (!indexQueue.isEmpty) { + val indexRecs = new ArrayList[IndexRecord] + indexQueue drainTo indexRecs + val indexRecsMap = indexRecs groupBy { r => (r.server, r.channel, r.botName) } + + val windowSize = 2 * ContextSize + 1 + for (indexRecBatch <- indexRecsMap.values) { + for (recs <- indexRecBatch.sliding(windowSize)) { + if (recs.size == windowSize) { + doInLock { + doIndex(fillContext(recs(ContextSize), recs, ContextSize)) + } + } else if (recs.size < ContextSize + 1) { + recs.foreach(indexQueue.offer) + } else { + recs.zipWithIndex.drop(ContextSize).foreach { r => + doInLock { + doIndex(fillContext(r._1, recs, r._2)) + } + } + } + } + + if (indexRecBatch.size > windowSize) { + indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length) + .zipWithIndex + .map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 } + .foreach(indexQueue.put) + } } + } + } + flushFuture = schedule(0, 10, TimeUnit.SECONDS) { + doInLock(flush) } - flushFuture = scheduler.scheduleWithFixedDelay(doInLock(flush), 0, 10, TimeUnit.SECONDS) } def stop { @@ -126,23 +199,22 @@ object Indexer extends Logging { } } - private def doIndex(indexReqs: List[IndexRequest]) { - val indexRequests = indexReqs.groupBy { r => - (r.server, r.channel, r.botName) - } + def ctxToStr(ctx : List[ChatLine]) = + ctx.map { line => s"${line.timestamp} ${line.user}: ${line.message}" } mkString "\n" - for (((server, channel, botName), indexRequestBatch) <- indexRequests) { + private def doIndex(indexRecord: IndexRecord) { + val IndexRecord(server, channel, botName, chatLine, indexed) = indexRecord + if (!indexed) { val indexDir = getIndexDir(server, channel, botName) val indexWriter = getIndexWriter(indexDir) - for (indexRequest <- indexRequestBatch; - chatLine <- indexRequest.chatLines) { - val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES) - val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES) - val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES) - indexWriter.addDocument(List(tsField, userField, msgField), indexWriter.getAnalyzer) - logger.debug("Indexed : [{} {} {}] [{}] {}: {}", - server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) - } + val ts = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES) + val user = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES) + val msg = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES) + val ctxBfr = new TextField(ChatLine.CTXB, ctxToStr(chatLine.contextBefore), Field.Store.YES) + val ctxAft = new TextField(ChatLine.CTXA, ctxToStr(chatLine.contextAfter), Field.Store.YES) + indexWriter.addDocument(List(ts, user, msg, ctxBfr, ctxAft), indexWriter.getAnalyzer) + logger.debug("Indexed : [{} {} {}] [{}] {}: {}", + server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index 38710c1..9e96a35 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -5,12 +5,13 @@ import java.text.ParseException import java.text.SimpleDateFormat import scala.collection.JavaConversions._ +import scala.collection.immutable.Map import scala.collection.mutable import scala.collection.mutable.Buffer import org.apache.lucene.analysis.Analyzer import org.apache.lucene.queries.ChainedFilter -import org.apache.lucene.queryparser.classic.QueryParser +import org.apache.lucene.queryparser.classic.MultiFieldQueryParser import org.apache.lucene.search.BooleanClause import org.apache.lucene.search.BooleanQuery import org.apache.lucene.search.Filter @@ -32,6 +33,7 @@ import net.abhinavsarkar.ircsearch.model._ object Searcher extends Logging { val MaxHits = 1000 + val MessageFieldBoost = java.lang.Float.valueOf(2.0f) private val searcherMgrs = mutable.Map[String, SearcherManager]() @@ -57,7 +59,9 @@ object Searcher extends Logging { } private def mkQueryParser(analyzer : Analyzer) = - new QueryParser(Indexer.LUCENE_VERSION, ChatLine.MSG, analyzer) + new MultiFieldQueryParser(Indexer.LUCENE_VERSION, + List(ChatLine.MSG, ChatLine.CTXB, ChatLine.CTXA).toArray, analyzer, + Map(ChatLine.MSG -> MessageFieldBoost)) private def filterifyQuery(query : Query) : Query = query match { @@ -130,6 +134,8 @@ object Searcher extends Logging { } } + private val DocFields = List(ChatLine.USER, ChatLine.TS, ChatLine.MSG, ChatLine.CTXB, ChatLine.CTXA) + private def doSearch(indexDir : String, query : Query, page : Int, pageSize : Int) : (Int, List[(ChatLine, Float)]) = { val searcherMgr = getSearcherMgr(indexDir) @@ -139,14 +145,22 @@ object Searcher extends Logging { val topDocs = indexSearcher.search(query, MaxHits.min((page + 1) * pageSize), new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true))) val docs = topDocs.scoreDocs - .drop(page * pageSize) - .map { sd => - val score = sd.score - val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { - (map, field) => map += (field.name -> field.stringValue) + .drop(page * pageSize) + .map { sd => + val score = sd.score + val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { + (map, field) => map += (field.name -> field.stringValue) } - val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG)) + val List(user, timestamp, message, contextBefore, contextAfter) = DocFields.map(doc) + + val LineRe = "(\\d+) (.*?): (.*)".r + val List(ctxBefore, ctxAfter) = List(contextBefore, contextAfter).map { + _.split('\n').filterNot(_.isEmpty).map { + case LineRe(timestamp, user, message) => new ChatLine(user, timestamp.toLong, message) + }} + + val chatLine = new ChatLine(user, timestamp.toLong, message, ctxBefore.toList, ctxAfter.toList) (chatLine, score) } (topDocs.totalHits, docs.toList) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala index 44f741b..b529083 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala @@ -5,9 +5,13 @@ object ChatLine { val USER = "user" val TS = "ts" val MSG = "msg" + val CTXB = "ctxb" + val CTXA = "ctxa" } -case class ChatLine(user : String, timestamp : Long, message : String) +case class ChatLine(user : String, timestamp : Long, message : String, + contextBefore : List[ChatLine] = List(), + contextAfter : List[ChatLine] = List()) case class IndexRequest( server : String, channel : String, botName : String, chatLines : List[ChatLine])