Added support for indexing and searching context of a chat line

This commit is contained in:
Abhinav Sarkar 2013-05-19 23:37:00 +05:30
parent 3822bd6017
commit 2d50b9bb73
5 changed files with 138 additions and 37 deletions

View File

@ -39,6 +39,14 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest
protected def sendError(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = {
val response = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR)
response.setHeader(CONTENT_TYPE, "text/plain")
writeResponse(ctx, request, response)
protected def writeResponse(
ctx : ChannelHandlerContext, request : HttpRequest, response : HttpResponse) {
response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes())

View File

@ -215,6 +215,9 @@ object SearchHandler extends HttpRequestHandler {
case searchResult =>
logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult)))
f onFailure { case e : Exception => logger.error("Error", e) }
f onFailure { case e : Exception => {
logger.error("Error", e)
logRequest(ctx, request, sendError(ctx, request, e.getMessage))

View File

@ -4,11 +4,15 @@ import
import java.util.ArrayList
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.PriorityBlockingQueue
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConversions._
import scala.collection.Seq
import scala.collection.mutable
import scala.math.Ordered
import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.analysis.core.KeywordAnalyzer
import org.apache.lucene.analysis.en.EnglishAnalyzer
@ -22,16 +26,40 @@ import org.apache.lucene.index.IndexWriter
import org.apache.lucene.index.IndexWriterConfig
import org.apache.lucene.util.Version
import com.typesafe.scalalogging.slf4j.Logging
import net.abhinavsarkar.ircsearch.model._
import java.util.concurrent.BlockingDeque
import java.util.concurrent.BlockingQueue
object Indexer extends Logging {
case class IndexRecord(
server : String, channel : String, botName : String, chatLine : ChatLine,
indexed : Boolean = false)
extends Ordered[IndexRecord] {
def compare(that : IndexRecord) = {
val diff = this.chatLine.timestamp - that.chatLine.timestamp
if (diff > 0) 1 else if (diff < 0) -1 else 0
private val indexReqQueue = new LinkedBlockingQueue[IndexRequest](10000)
object IndexRecord {
def fromIndexRequest(indexRequest : IndexRequest) = {
val IndexRequest(server, channel, botName, chatLines) = indexRequest
for {
chatLine <- chatLines
} yield new IndexRecord(server, channel, botName, chatLine)
val ContextSize = 2
val ContextDurationSecs = 20
val IndexingDurationSecs = 10
private val indexQueue = new PriorityBlockingQueue[IndexRecord](10000)
private val scheduler = Executors.newScheduledThreadPool(2)
private val runLock = new ReentrantLock
private var indexingFuture : Future[_] = null
@ -55,7 +83,9 @@ object Indexer extends Logging {
val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION)
val fieldAnalyzers = Map(
ChatLine.USER -> new KeywordAnalyzer,
ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION))
ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION),
ChatLine.CTXB -> new EnglishAnalyzer(LUCENE_VERSION),
ChatLine.CTXA -> new EnglishAnalyzer(LUCENE_VERSION))
new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers)
@ -79,7 +109,8 @@ object Indexer extends Logging {
def getIndexDir(server : String, channel : String, botName : String) : String =
def index(indexRequest : IndexRequest) = indexReqQueue.put(indexRequest)
def index(indexRequest : IndexRequest) =
private def doInLock(f : => Unit) {
try {
@ -98,17 +129,59 @@ object Indexer extends Logging {
def indexReqStream : Stream[IndexRequest] = Stream.cons(indexReqQueue.take, indexReqStream)
def schedule(initialDelay : Int, delay : Int, unit : TimeUnit)(f : => Unit) = {
scheduler.scheduleWithFixedDelay(f, initialDelay, delay, unit)
def fillContext(rec: IndexRecord, recs: Seq[IndexRecord], idx : Int) = {
rec.copy(chatLine =
contextBefore = recs.slice(idx - ContextSize, idx).map(_.chatLine)
.filter(_.timestamp >= rec.chatLine.timestamp - ContextDurationSecs * 1000)
contextAfter = recs.slice(idx + 1, 2 * ContextSize + 1).map(_.chatLine)
.filter(_.timestamp <= rec.chatLine.timestamp + ContextDurationSecs * 1000)
def start {"Starting indexer")
indexingFuture = scheduler.submit {
for (indexReq <- indexReqStream)
doInLock {
indexingFuture = schedule(0, IndexingDurationSecs.max(ContextDurationSecs), TimeUnit.SECONDS) {
if (!indexQueue.isEmpty) {
val indexRecs = new ArrayList[IndexRecord]
indexQueue drainTo indexRecs
val indexRecsMap = indexRecs groupBy { r => (r.server,, r.botName) }
val windowSize = 2 * ContextSize + 1
for (indexRecBatch <- indexRecsMap.values) {
for (recs <- indexRecBatch.sliding(windowSize)) {
if (recs.size == windowSize) {
doInLock {
doIndex(fillContext(recs(ContextSize), recs, ContextSize))
} else if (recs.size < ContextSize + 1) {
} else {
recs.zipWithIndex.drop(ContextSize).foreach { r =>
doInLock {
doIndex(fillContext(r._1, recs, r._2))
if (indexRecBatch.size > windowSize) {
indexRecBatch.slice(indexRecBatch.length - 2 * ContextSize, indexRecBatch.length)
.map { r => if (r._2 < ContextSize) r._1.copy(indexed = true) else r._1 }
flushFuture = schedule(0, 10, TimeUnit.SECONDS) {
flushFuture = scheduler.scheduleWithFixedDelay(doInLock(flush), 0, 10, TimeUnit.SECONDS)
def stop {
@ -126,23 +199,22 @@ object Indexer extends Logging {
private def doIndex(indexReqs: List[IndexRequest]) {
val indexRequests = indexReqs.groupBy { r =>
(r.server,, r.botName)
def ctxToStr(ctx : List[ChatLine]) = { line => s"${line.timestamp} ${line.user}: ${line.message}" } mkString "\n"
for (((server, channel, botName), indexRequestBatch) <- indexRequests) {
private def doIndex(indexRecord: IndexRecord) {
val IndexRecord(server, channel, botName, chatLine, indexed) = indexRecord
if (!indexed) {
val indexDir = getIndexDir(server, channel, botName)
val indexWriter = getIndexWriter(indexDir)
for (indexRequest <- indexRequestBatch;
chatLine <- indexRequest.chatLines) {
val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES)
val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES)
val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES)
indexWriter.addDocument(List(tsField, userField, msgField), indexWriter.getAnalyzer)
logger.debug("Indexed : [{} {} {}] [{}] {}: {}",
server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message)
val ts = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES)
val user = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES)
val msg = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES)
val ctxBfr = new TextField(ChatLine.CTXB, ctxToStr(chatLine.contextBefore), Field.Store.YES)
val ctxAft = new TextField(ChatLine.CTXA, ctxToStr(chatLine.contextAfter), Field.Store.YES)
indexWriter.addDocument(List(ts, user, msg, ctxBfr, ctxAft), indexWriter.getAnalyzer)
logger.debug("Indexed : [{} {} {}] [{}] {}: {}",
server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message)

View File

@ -5,12 +5,13 @@ import java.text.ParseException
import java.text.SimpleDateFormat
import scala.collection.JavaConversions._
import scala.collection.immutable.Map
import scala.collection.mutable
import scala.collection.mutable.Buffer
import org.apache.lucene.analysis.Analyzer
import org.apache.lucene.queries.ChainedFilter
import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.queryparser.classic.MultiFieldQueryParser
@ -32,6 +33,7 @@ import net.abhinavsarkar.ircsearch.model._
object Searcher extends Logging {
val MaxHits = 1000
val MessageFieldBoost = java.lang.Float.valueOf(2.0f)
private val searcherMgrs = mutable.Map[String, SearcherManager]()
@ -57,7 +59,9 @@ object Searcher extends Logging {
private def mkQueryParser(analyzer : Analyzer) =
new QueryParser(Indexer.LUCENE_VERSION, ChatLine.MSG, analyzer)
new MultiFieldQueryParser(Indexer.LUCENE_VERSION,
List(ChatLine.MSG, ChatLine.CTXB, ChatLine.CTXA).toArray, analyzer,
Map(ChatLine.MSG -> MessageFieldBoost))
private def filterifyQuery(query : Query) : Query =
query match {
@ -130,6 +134,8 @@ object Searcher extends Logging {
private val DocFields = List(ChatLine.USER, ChatLine.TS, ChatLine.MSG, ChatLine.CTXB, ChatLine.CTXA)
private def doSearch(indexDir : String, query : Query, page : Int, pageSize : Int)
: (Int, List[(ChatLine, Float)]) = {
val searcherMgr = getSearcherMgr(indexDir)
@ -139,14 +145,22 @@ object Searcher extends Logging {
val topDocs =, MaxHits.min((page + 1) * pageSize),
new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true)))
val docs = topDocs.scoreDocs
.drop(page * pageSize)
.map { sd =>
val score = sd.score
val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) {
(map, field) => map += ( -> field.stringValue)
.drop(page * pageSize)
.map { sd =>
val score = sd.score
val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) {
(map, field) => map += ( -> field.stringValue)
val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG))
val List(user, timestamp, message, contextBefore, contextAfter) =
val LineRe = "(\\d+) (.*?): (.*)".r
val List(ctxBefore, ctxAfter) = List(contextBefore, contextAfter).map {
_.split('\n').filterNot(_.isEmpty).map {
case LineRe(timestamp, user, message) => new ChatLine(user, timestamp.toLong, message)
val chatLine = new ChatLine(user, timestamp.toLong, message, ctxBefore.toList, ctxAfter.toList)
(chatLine, score)
(topDocs.totalHits, docs.toList)

View File

@ -5,9 +5,13 @@ object ChatLine {
val USER = "user"
val TS = "ts"
val MSG = "msg"
val CTXB = "ctxb"
val CTXA = "ctxa"
case class ChatLine(user : String, timestamp : Long, message : String)
case class ChatLine(user : String, timestamp : Long, message : String,
contextBefore : List[ChatLine] = List(),
contextAfter : List[ChatLine] = List())
case class IndexRequest(
server : String, channel : String, botName : String, chatLines : List[ChatLine])