From 613e9161725801d108d35fda017a4a8d922474f0 Mon Sep 17 00:00:00 2001 From: Abhinav Sarkar Date: Sat, 18 May 2013 18:02:52 +0530 Subject: [PATCH] Added support for pagination, constraining search by timestamp range. Performance improvement in searching --- .../net/abhinavsarkar/ircsearch/Server.scala | 41 ++++--- .../ircsearch/lucene/Indexer.scala | 4 +- .../ircsearch/lucene/Searcher.scala | 106 +++++++++++++----- 3 files changed, 104 insertions(+), 47 deletions(-) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala index 953f75c..588ed11 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala @@ -187,22 +187,31 @@ class IndexHandler(indexer: Indexer) extends HttpRequestHandler { object SearchHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { - val method = request.getMethod() - val searchRequest = if (HttpMethod.POST.equals(method)) { - val content = request.getContent().toString(Charset.forName("UTF-8")) - Serialization.read[SearchRequest](content) - } else if (HttpMethod.GET.equals(method)) { - val params = new QueryStringDecoder(request.getUri).getParameters - val server = params("server")(0) - val channel = params("channel")(0) - val botName = params("botName")(0) - val query = params("query")(0) - new SearchRequest(server, channel, botName, query) - } else { - throw new UnsupportedOperationException("HTTP method " + method + " is not supported") - } + future { + val method = request.getMethod() + val searchRequest = if (HttpMethod.POST.equals(method)) { + val content = request.getContent().toString(Charset.forName("UTF-8")) + Serialization.read[SearchRequest](content) + } else if (HttpMethod.GET.equals(method)) { + val params = new QueryStringDecoder(request.getUri).getParameters.toMap + val server = 
params("server")(0) + val channel = params("channel")(0) + val botName = params("botName")(0) + val query = params("query")(0) + val page = params.get("page").collect({ case l => l.get(0) }) + val pageSize = params.get("pageSize").collect({ case l => l.get(0) }) + var sr = new SearchRequest(server, channel, botName, query) + if (page.isDefined) + sr = sr.copy(page = page.get.toInt) + if (pageSize.isDefined) + sr = sr.copy(pageSize = pageSize.get.toInt) + sr + } else { + throw new UnsupportedOperationException("HTTP method " + method + " is not supported") + } - val searchResult = Searcher.search(searchRequest) - logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) + val searchResult = Searcher.search(searchRequest) + logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) + } onFailure { case e : Exception => logger.error("Error", e) } } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index 5aeaeb3..5189661 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -32,12 +32,12 @@ class Indexer extends Logging { import Indexer._ - private val indexQueue = new LinkedBlockingQueue[IndexRequest] + private val indexQueue = new LinkedBlockingQueue[IndexRequest](10000) private val scheduler = Executors.newSingleThreadScheduledExecutor private val runLock = new ReentrantLock private var runFuture : Future[_] = null - def index(indexRequest : IndexRequest) = indexQueue.offer(indexRequest) + def index(indexRequest : IndexRequest) = indexQueue.put(indexRequest) def start { logger.info("Starting indexer") diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index e55c171..e665634 100644 --- 
a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -1,19 +1,25 @@ package net.abhinavsarkar.ircsearch.lucene import java.io.File +import java.text.ParseException +import java.text.SimpleDateFormat import scala.collection.JavaConversions._ import scala.collection.mutable +import scala.collection.mutable.Buffer import org.apache.lucene.analysis.Analyzer -import org.apache.lucene.index.IndexReader +import org.apache.lucene.queries.ChainedFilter import org.apache.lucene.queryparser.classic.QueryParser import org.apache.lucene.search.BooleanClause import org.apache.lucene.search.BooleanQuery +import org.apache.lucene.search.Filter import org.apache.lucene.search.FilteredQuery -import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.search.NumericRangeFilter import org.apache.lucene.search.Query import org.apache.lucene.search.QueryWrapperFilter +import org.apache.lucene.search.SearcherFactory +import org.apache.lucene.search.SearcherManager import org.apache.lucene.search.Sort import org.apache.lucene.search.SortField import org.apache.lucene.search.TermQuery @@ -25,41 +31,75 @@ import net.abhinavsarkar.ircsearch.model._ object Searcher extends Logging { - private def mkIndexSearcher(dirPath : String) : IndexSearcher = { - val indexDir = new File(dirPath) - assert(indexDir.exists && indexDir.isDirectory) + val MaxHits = 1000 - new IndexSearcher(IndexReader.open(FSDirectory.open(indexDir))) + val readers = mutable.Map[String, SearcherManager]() + + private def mkIndexSearcher(dirPath : String) : SearcherManager = { + synchronized { + if (!(readers contains dirPath)) { + val indexDir = new File(dirPath) + assert(indexDir.exists && indexDir.isDirectory) + + val dir = FSDirectory.open(indexDir) + readers += (dirPath -> new SearcherManager(dir, new SearcherFactory)) + + } + } + + readers(dirPath) } private def mkQueryParser(analyzer : Analyzer) = new 
QueryParser(Indexer.LUCENE_VERSION, ChatLine.MSG, analyzer) - private def filterifyQuery(query : Query, mustFields : Set[String]) : Query = + private def filterifyQuery(query : Query) : Query = query match { case boolQuery: BooleanQuery => { val newQuery = new BooleanQuery - val filterQuery = new BooleanQuery + val filters = Buffer[Filter]() for (clause <- boolQuery.getClauses) { val subQuery = clause.getQuery if (subQuery.isInstanceOf[TermQuery]) { val termQuery = subQuery.asInstanceOf[TermQuery] val field = termQuery.getTerm.field - if (mustFields contains field) { - clause.setOccur(BooleanClause.Occur.MUST) - filterQuery.add(clause) - } else { - newQuery.add(clause) + val sdf = new SimpleDateFormat("yyMMdd") + field match { + case ChatLine.USER => { + val filterQuery = new BooleanQuery + clause.setOccur(BooleanClause.Occur.MUST) + filterQuery.add(clause) + filters += new QueryWrapperFilter(filterQuery) + } + case "before" => { + try { + val ts = sdf.parse(termQuery.getTerm.text).getTime + filters += NumericRangeFilter.newLongRange( + ChatLine.TS, 0, ts, true, true) + } catch { + case e : ParseException => {} + } + } + case "after" => { + try { + val ts = sdf.parse(termQuery.getTerm.text).getTime + filters += NumericRangeFilter.newLongRange( + ChatLine.TS, ts, java.lang.Long.MAX_VALUE, true, true) + } catch { + case e : ParseException => {} + } + } + case _ => newQuery.add(clause) } } else { newQuery.add(clause) } } - if (filterQuery.clauses.isEmpty) + if (filters.isEmpty) newQuery else - new FilteredQuery(newQuery, new QueryWrapperFilter(filterQuery)) + new FilteredQuery(newQuery, new ChainedFilter(filters.toArray, ChainedFilter.AND)) } case _ => query } @@ -72,9 +112,9 @@ object Searcher extends Logging { val analyzer = Indexer.mkAnalyzer try { val queryParser = mkQueryParser(analyzer) - val query = filterifyQuery(queryParser.parse(searchRequest.query), Set(ChatLine.USER)) + val query = filterifyQuery(queryParser.parse(searchRequest.query)) logger.debug("Query: 
{}", query) - val (totalResults, results) = doSearch(indexDir, query, searchRequest.pageSize) + val (totalResults, results) = doSearch(indexDir, query, searchRequest.page, searchRequest.pageSize) val searchResults = SearchResult.fromSearchRequest(searchRequest) .copy(totalResults = totalResults, chatLines = results.map(_._1)) logger.debug("Search results: {}", searchResults) @@ -84,21 +124,29 @@ object Searcher extends Logging { } } - private def doSearch(indexDir : String, query : Query, maxHits : Int) + private def doSearch(indexDir : String, query : Query, page : Int, pageSize : Int) : (Int, List[(ChatLine, Float)]) = { - val indexSearcher = mkIndexSearcher(indexDir) - val topDocs = indexSearcher.search(query, maxHits, - new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true))) - val docs = topDocs.scoreDocs.map { sd => - val score = sd.score - val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { - (map, field) => map += (field.name -> field.stringValue) - } + val searcherMgr = mkIndexSearcher(indexDir) + searcherMgr.maybeRefresh + val indexSearcher = searcherMgr.acquire() + try { + val topDocs = indexSearcher.search(query, MaxHits.min((page + 1) * pageSize), + new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true))) + val docs = topDocs.scoreDocs + .drop(page * pageSize) + .map { sd => + val score = sd.score + val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { + (map, field) => map += (field.name -> field.stringValue) + } - val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG)) - (chatLine, score) + val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG)) + (chatLine, score) + } + (topDocs.totalHits, docs.toList) + } finally { + searcherMgr.release(indexSearcher) } - (topDocs.totalHits, docs.toList) } } \ No newline at end of file