diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala index 588ed11..d28e684 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala @@ -55,21 +55,24 @@ object Server extends App with Logging { }}) .localAddress(new InetSocketAddress(port)) + val cleanup = { () => + stopServer(server) + Indexer.stop + Searcher.close + } + Runtime.getRuntime.addShutdownHook( new Thread("ShutdownHook") { - override def run { - stopServer(server) - UnifiedHandler.stop - } + override def run = cleanup() }) try { + Indexer.start server.bind().sync.channel.closeFuture.sync } catch { case e : Exception => { logger.error("Exception while running server. Stopping server", e) - stopServer(server) - UnifiedHandler.stop + cleanup() } } } @@ -85,21 +88,17 @@ object Server extends App with Logging { @Sharable object UnifiedHandler extends ChannelInboundByteHandlerAdapter { - lazy val indexer = { val indexer = new Indexer; indexer.start; indexer } - val httpRequestRouter = new HttpRequestRouter { val Echo = "^/echo$".r val Index = "^/index$".r val Search = "^/search.*".r def route = { case Echo() => EchoHandler - case Index() => new IndexHandler(indexer) + case Index() => new IndexHandler case Search() => SearchHandler } } - def stop = indexer.stop - override def inboundBufferUpdated(ctx : ChannelHandlerContext, in: ByteBuf) { if (in.readableBytes() < 5) { return; @@ -119,7 +118,7 @@ object UnifiedHandler extends ChannelInboundByteHandlerAdapter { ctx.pipeline .addLast("framedecoder", new DelimiterBasedFrameDecoder(1048576, Delimiters.lineDelimiter() : _*)) .addLast("decoder", new StringDecoder(Charset.forName("UTF-8"))) - .addLast("csvhandler", new TcpIndexHandler(indexer)) + .addLast("csvhandler", new TcpIndexHandler) .remove(this) } ctx.nextInboundByteBuffer.writeBytes(in) @@ -140,7 +139,7 @@ object UnifiedHandler extends ChannelInboundByteHandlerAdapter { } -class TcpIndexHandler(indexer: Indexer) extends ChannelInboundMessageHandlerAdapter[String] { +class TcpIndexHandler extends ChannelInboundMessageHandlerAdapter[String] { var server: String = null var channel : String = null var botName : String = null @@ -156,7 +155,7 @@ class TcpIndexHandler(indexer: Indexer) extends ChannelInboundMessageHandlerAdap botName = values(2) inited = true } else { - indexer.index(new IndexRequest(server, channel, botName, + Indexer.index(new IndexRequest(server, channel, botName, List(ChatLine(values(0), values(1).toLong, values(2))))) } } @@ -171,13 +170,13 @@ object EchoHandler extends HttpRequestHandler { } @Sharable -class IndexHandler(indexer: Indexer) extends HttpRequestHandler { +class IndexHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { future { val content = request.getContent().toString(Charset.forName("UTF-8")) val indexRequest = Serialization.read[IndexRequest](content) - indexer.index(indexRequest) + Indexer.index(indexRequest) } logRequest(ctx, request, sendDefaultResponse(ctx, request)) } @@ -187,7 +186,7 @@ class IndexHandler(indexer: Indexer) extends HttpRequestHandler { object SearchHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { - future { + val f = future { val method = request.getMethod() val searchRequest = if (HttpMethod.POST.equals(method)) { val content = request.getContent().toString(Charset.forName("UTF-8")) @@ -210,8 +209,12 @@ object SearchHandler extends HttpRequestHandler { throw new UnsupportedOperationException("HTTP method " + method + " is not supported") } - val searchResult = Searcher.search(searchRequest) - logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) - } onFailure { case e : Exception => logger.error("Error", e) } + Searcher.search(searchRequest) + } + f onSuccess { + case searchResult => + logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) + } + f onFailure { case e : Exception => logger.error("Error", e) } } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index 5189661..5b5d62b 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -7,9 +7,8 @@ import java.util.concurrent.Future import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock - import scala.collection.JavaConversions._ - +import scala.collection.mutable import org.apache.lucene.analysis.Analyzer import org.apache.lucene.analysis.core.KeywordAnalyzer import org.apache.lucene.analysis.en.EnglishAnalyzer @@ -23,89 +22,35 @@ import org.apache.lucene.index.IndexWriter import org.apache.lucene.index.IndexWriterConfig import org.apache.lucene.store.FSDirectory import org.apache.lucene.util.Version - import com.typesafe.scalalogging.slf4j.Logging - import net.abhinavsarkar.ircsearch.model._ +import java.util.concurrent.BlockingDeque +import java.util.concurrent.BlockingQueue -class Indexer extends Logging { - - import Indexer._ - - private val indexQueue = new LinkedBlockingQueue[IndexRequest](10000) - private val scheduler = Executors.newSingleThreadScheduledExecutor - private val runLock = new ReentrantLock - private var runFuture : Future[_] = null - - def index(indexRequest : IndexRequest) = indexQueue.put(indexRequest) - - def start { - logger.info("Starting indexer") - runFuture = scheduler.scheduleWithFixedDelay( - new Runnable { - def run { - try { - runLock.lock - if (indexQueue.isEmpty) - return - - val indexReqs = new ArrayList[IndexRequest] - indexQueue.drainTo(indexReqs) - doIndex(indexReqs.toList) - } catch { - case e : Throwable => logger.error("Exception while running indexer", e) - } finally { - runLock.unlock - } - }}, - 0, 1, TimeUnit.SECONDS) - } - - def stop { - try { - runLock.lock - if (runFuture != null) { - runFuture.cancel(false) - runFuture = null - } - logger.info("Stopped indexer") - } finally { - runLock.unlock - } - } - - private def doIndex(indexReqs: List[IndexRequest]) { - val indexRequests = indexReqs.groupBy { r => - (r.server, r.channel, r.botName) - } - - for (((server, channel, botName), indexRequestBatch) <- indexRequests) { - val indexDir = getIndexDir(server, channel, botName) - val analyzer = mkAnalyzer - val indexWriter = mkIndexWriter(indexDir, analyzer) - try { - for (indexRequest <- indexRequestBatch; - chatLine <- indexRequest.chatLines) { - val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES) - val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES) - val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES) - indexWriter.addDocument(List(tsField, userField, msgField), analyzer) - logger.debug("Indexed : [{} {} {}] [{}] {}: {}", - server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) - } - } finally { - indexWriter.close - analyzer.close - } - } - } - -} - -object Indexer { +object Indexer extends Logging { val LUCENE_VERSION = Version.LUCENE_43 + private val indexReqQueue = new LinkedBlockingQueue[IndexRequest](10000) + private val scheduler = Executors.newScheduledThreadPool(2) + private val runLock = new ReentrantLock + private var indexingFuture : Future[_] = null + private var flushFuture : Future[_] = null + + private val indexers = mutable.Map[String, IndexWriter]() + + private def close { + for (indexer <- indexers.values) + indexer.close + logger.info("Closed Indexer") + } + + private def flush { + for (indexer <- indexers.values) + indexer.commit + logger.info("Flushed Indexer") + } + def mkAnalyzer : Analyzer = { val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION) val fieldAnalyzers = Map( @@ -115,15 +60,90 @@ object Indexer { new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers) } - private def mkIndexWriter(dirPath : String, analyzer : Analyzer) : IndexWriter = { - val indexDir = new File(dirPath) - if (indexDir.exists) { - assert(indexDir.isDirectory) + private def getIndexWriter(dirPath : String) : IndexWriter = { + synchronized { + if (!(indexers contains dirPath)) { + val indexDir = new File(dirPath) + if (indexDir.exists) { + assert(indexDir.isDirectory) + } + val indexer = new IndexWriter(FSDirectory.open(indexDir), + new IndexWriterConfig(LUCENE_VERSION, mkAnalyzer)) + indexers += (dirPath -> indexer) + } } - new IndexWriter(FSDirectory.open(indexDir), new IndexWriterConfig(LUCENE_VERSION, analyzer)) + + indexers(dirPath) } def getIndexDir(server : String, channel : String, botName : String) : String = s"index-$server-$channel-$botName" + def index(indexRequest : IndexRequest) = indexReqQueue.put(indexRequest) + + private def doInLock(f : => Unit) { + try { + runLock.lock + f + } finally { + runLock.unlock + } + } + + implicit private def funcToRunnable(f : => Unit) : Runnable = new Runnable { + def run { + try { f } + catch { + case e : Throwable => logger.error("Exception while running", e) + } + }} + + def indexReqStream : Stream[IndexRequest] = Stream.cons(indexReqQueue.take, indexReqStream) + + def start { + logger.info("Starting indexer") + indexingFuture = scheduler.submit { + for (indexReq <- indexReqStream) + doInLock { + doIndex(List(indexReq)) + } + } + flushFuture = scheduler.scheduleWithFixedDelay(doInLock(flush), 0, 10, TimeUnit.SECONDS) + } + + def stop { + doInLock { + if (indexingFuture != null) { + indexingFuture.cancel(false) + indexingFuture = null + } + if (flushFuture != null) { + flushFuture.cancel(false) + flushFuture = null + } + close + logger.info("Stopped indexer") + } + } + + private def doIndex(indexReqs: List[IndexRequest]) { + val indexRequests = indexReqs.groupBy { r => + (r.server, r.channel, r.botName) + } + + for (((server, channel, botName), indexRequestBatch) <- indexRequests) { + val indexDir = getIndexDir(server, channel, botName) + val indexWriter = getIndexWriter(indexDir) + for (indexRequest <- indexRequestBatch; + chatLine <- indexRequest.chatLines) { + val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES) + val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES) + val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES) + indexWriter.addDocument(List(tsField, userField, msgField), indexWriter.getAnalyzer) + logger.debug("Indexed : [{} {} {}] [{}] {}: {}", + server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) + } + } + } + } \ No newline at end of file diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index e665634..38710c1 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -33,21 +33,27 @@ object Searcher extends Logging { val MaxHits = 1000 - val readers = mutable.Map[String, SearcherManager]() + private val searcherMgrs = mutable.Map[String, SearcherManager]() - private def mkIndexSearcher(dirPath : String) : SearcherManager = { + def close { + for (searcherMgr <- searcherMgrs.values) + searcherMgr.close + logger.info("Closed Searcher") + } + + private def getSearcherMgr(dirPath : String) : SearcherManager = { synchronized { - if (!(readers contains dirPath)) { + if (!(searcherMgrs contains dirPath)) { val indexDir = new File(dirPath) assert(indexDir.exists && indexDir.isDirectory) val dir = FSDirectory.open(indexDir) - readers += (dirPath -> new SearcherManager(dir, new SearcherFactory)) + searcherMgrs += (dirPath -> new SearcherManager(dir, new SearcherFactory)) } } - readers(dirPath) + searcherMgrs(dirPath) } private def mkQueryParser(analyzer : Analyzer) = @@ -126,7 +132,7 @@ object Searcher extends Logging { private def doSearch(indexDir : String, query : Query, page : Int, pageSize : Int) : (Int, List[(ChatLine, Float)]) = { - val searcherMgr = mkIndexSearcher(indexDir) + val searcherMgr = getSearcherMgr(indexDir) searcherMgr.maybeRefresh val indexSearcher = searcherMgr.acquire() try {