From 72ac95037496d0e4a7e1584a5cbbab0605f17146 Mon Sep 17 00:00:00 2001 From: Abhinav Sarkar Date: Tue, 21 May 2013 23:31:40 +0530 Subject: [PATCH] Some refactoring / Scalafication --- .../ircsearch/HttpRequestHandler.scala | 20 ++++--- .../net/abhinavsarkar/ircsearch/Server.scala | 54 ++++++++++--------- .../ircsearch/lucene/Indexer.scala | 13 ++--- .../ircsearch/lucene/Searcher.scala | 38 ++++++------- .../net/abhinavsarkar/ircsearch/model.scala | 2 + 5 files changed, 64 insertions(+), 63 deletions(-) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala index cefc3c5..a70a8d0 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala @@ -26,20 +26,24 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest response } - protected def sendSuccess(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = { - val response = new DefaultHttpResponse(HTTP_1_1, OK) + private def sendReponse(ctx: ChannelHandlerContext, request: HttpRequest, + response: HttpResponse, body: String): HttpResponse = { response.setContent(Unpooled.copiedBuffer(body.getBytes)) response.setHeader(CONTENT_TYPE, "application/json") writeResponse(ctx, request, response) response } - protected def sendError(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = { - val response = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR) - response.setContent(Unpooled.copiedBuffer(body.getBytes)) - response.setHeader(CONTENT_TYPE, "text/plain") - writeResponse(ctx, request, response) - response + protected def sendSuccess(ctx : ChannelHandlerContext, request : HttpRequest, body : String) = { + sendReponse(ctx, request, new DefaultHttpResponse(HTTP_1_1, OK), body) + } + + protected def sendServerError(ctx : ChannelHandlerContext, request : HttpRequest, body : String) = { + sendReponse(ctx, request, new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR), body) + } + + protected def sendClientError(ctx : ChannelHandlerContext, request : HttpRequest, body : String) = { + sendReponse(ctx, request, new DefaultHttpResponse(HTTP_1_1, BAD_REQUEST), body) } protected def writeResponse( diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala index a738a27..0842d2f 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala @@ -70,7 +70,7 @@ object Server extends App with Logging { } } - def stopServer(server : ServerBootstrap) { + private def stopServer(server : ServerBootstrap) { logger.info("Stopping server") server.shutdown logger.info("Stopped server") @@ -79,7 +79,7 @@ object Server extends App with Logging { } @Sharable -object UnifiedHandler extends ChannelInboundByteHandlerAdapter { +private object UnifiedHandler extends ChannelInboundByteHandlerAdapter { val httpRequestRouter = new HttpRequestRouter { val Echo = "^/echo$".r @@ -132,7 +132,7 @@ object UnifiedHandler extends ChannelInboundByteHandlerAdapter { } -class TcpIndexHandler extends ChannelInboundMessageHandlerAdapter[String] { +private class TcpIndexHandler extends ChannelInboundMessageHandlerAdapter[String] { var server: String = null var channel : String = null var botName : String = null @@ -148,14 +148,14 @@ class TcpIndexHandler extends ChannelInboundMessageHandlerAdapter[String] { botName = values(2) inited = true } else { - Indexer.index(new IndexRequest(server, channel, botName, + Indexer.index(IndexRequest(server, channel, botName, List(ChatLine(values(0), values(1).toLong, values(2))))) } } } @Sharable -object EchoHandler extends HttpRequestHandler { +private object EchoHandler extends HttpRequestHandler { override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { val content = request.getContent().toString(Charset.forName("UTF-8")) logRequest(ctx, request, sendSuccess(ctx, request, content)) @@ -163,7 +163,7 @@ object EchoHandler extends HttpRequestHandler { } @Sharable -class IndexHandler extends HttpRequestHandler { +private class IndexHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { future { @@ -176,7 +176,7 @@ class IndexHandler extends HttpRequestHandler { } @Sharable -object SearchHandler extends HttpRequestHandler { +private object SearchHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { val f = future { @@ -186,15 +186,12 @@ object SearchHandler extends HttpRequestHandler { Serialization.read[SearchRequest](content) } else if (HttpMethod.GET.equals(method)) { val params = new QueryStringDecoder(request.getUri).getParameters.toMap - val server = params("server")(0) - val channel = params("channel")(0) - val botName = params("botName")(0) - val query = params("query")(0) - val page = params.get("page").collect({ case l => l.get(0) }) - val pageSize = params.get("pageSize").collect({ case l => l.get(0) }) - val details = params.get("details").collect({ case l => l.get(0) }) + val List(server, channel, botName, query) = + List("server", "channel", "botName", "query").map(params(_).get(0)) + val List(page, pageSize, details) = + List("page", "pageSize", "details").map(params.get(_).map({ case l => l.get(0) })) - var sr = new SearchRequest(server, channel, botName, query) + var sr = SearchRequest(server, channel, botName, query) if (page.isDefined) sr = sr.copy(page = page.get.toInt) if (pageSize.isDefined) @@ -210,16 +207,23 @@ object SearchHandler extends HttpRequestHandler { } f onSuccess { case (searchRequest, searchResult) => - logRequest(ctx, request, - sendSuccess(ctx, request, - if (searchRequest.details) - Serialization.write(searchResult) - else - Serialization.write(searchResult.toSimpleSearchResult))) + logRequest(ctx, request, sendSuccess(ctx, request, + Serialization.write( + if (searchRequest.details) searchResult else searchResult.toSimpleSearchResult))) + } + f onFailure { + case e => { + logger.error("Error", e) + val body = Serialization.write(SearchError(e.getMessage)) + e match { + case e : NoSuchElementException => { + logRequest(ctx, request, sendClientError(ctx, request, body)) + } + case _ => { + logRequest(ctx, request, sendServerError(ctx, request, body)) + } + } + } } - f onFailure { case e : Exception => { - logger.error("Error", e) - logRequest(ctx, request, sendError(ctx, request, e.getMessage)) - }} } } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index 36b8d50..7b7f0da 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -41,9 +41,7 @@ object Indexer extends Logging { object IndexRecord { def fromIndexRequest(indexRequest : IndexRequest) = { val IndexRequest(server, channel, botName, chatLines) = indexRequest - for { - chatLine <- chatLines - } yield new IndexRecord(server, channel, botName, chatLine) + chatLines.map(IndexRecord(server, channel, botName, _)) } } @@ -64,14 +62,12 @@ object Indexer extends Logging { private val indexers = mutable.Map[String, IndexWriter]() private def close { - for (indexer <- indexers.values) - indexer.close + indexers.values.foreach(_.close) logger.info("Closed Indexer") } private def flush { - for (indexer <- indexers.values) - indexer.commit + indexers.values.foreach(_.commit) logger.info("Flushed Indexer") } @@ -145,7 +141,7 @@ object Indexer extends Logging { def start { logger.info("Starting indexer") - indexingFuture = schedule(0, IndexingDurationSecs.max(ContextDurationSecs), TimeUnit.SECONDS) { + indexingFuture = schedule(0, IndexingDurationSecs, TimeUnit.SECONDS) { if (!indexQueue.isEmpty) { val indexRecs = new ArrayList[IndexRecord] indexQueue drainTo indexRecs @@ -172,6 +168,7 @@ object Indexer extends Logging { } } } + flushFuture = schedule(0, FlushDurationSecs, TimeUnit.SECONDS) { doInLock(flush) } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index 5d82ced..c16ddd0 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -28,8 +28,7 @@ object Searcher extends Logging { private val searcherMgrs = mutable.Map[String, SearcherManager]() def close { - for (searcherMgr <- searcherMgrs.values) - searcherMgr.close + searcherMgrs.values.foreach(_.close) logger.info("Closed Searcher") } @@ -41,7 +40,6 @@ object Searcher extends Logging { val dir = FSDirectory.open(indexDir) searcherMgrs += (dirPath -> new SearcherManager(dir, new SearcherFactory)) - } } @@ -71,23 +69,18 @@ object Searcher extends Logging { filterQuery.add(clause) filters += new QueryWrapperFilter(filterQuery) } - case "before" => { - try { - val ts = sdf.parse(termQuery.getTerm.text).getTime - filters += NumericRangeFilter.newLongRange( - ChatLine.TS, 0, ts, true, true) - } catch { - case e : ParseException => {} - } + case "before" => try { + val ts = sdf.parse(termQuery.getTerm.text).getTime + filters += NumericRangeFilter.newLongRange(ChatLine.TS, 0, ts, true, true) + } catch { + case e : ParseException => {} } - case "after" => { - try { - val ts = sdf.parse(termQuery.getTerm.text).getTime - filters += NumericRangeFilter.newLongRange( - ChatLine.TS, ts, java.lang.Long.MAX_VALUE, true, true) - } catch { - case e : ParseException => {} - } + case "after" => try { + val ts = sdf.parse(termQuery.getTerm.text).getTime + filters += NumericRangeFilter.newLongRange( + ChatLine.TS, ts, java.lang.Long.MAX_VALUE, true, true) + } catch { + case e : ParseException => {} } case _ => newQuery.add(clause) } @@ -108,7 +101,8 @@ object Searcher extends Logging { logger.debug("Searching : [{} {} {}] {}", searchRequest.server, searchRequest.channel, searchRequest.botName, searchRequest.query) - val indexDir = Indexer.getIndexDir(searchRequest.server, searchRequest.channel, searchRequest.botName) + val indexDir = + Indexer.getIndexDir(searchRequest.server, searchRequest.channel, searchRequest.botName) val analyzer = Indexer.mkAnalyzer try { val queryParser = mkQueryParser(analyzer) @@ -147,10 +141,10 @@ object Searcher extends Logging { val LineRe = "(\\d+) (.*?): (.*)".r val List(ctxBefore, ctxAfter) = List(contextBefore, contextAfter).map { _.split('\n').filterNot(_.isEmpty).map { - case LineRe(timestamp, user, message) => new ChatLine(user, timestamp.toLong, message) + case LineRe(timestamp, user, message) => ChatLine(user, timestamp.toLong, message) }} - val chatLine = new ChatLine(user, timestamp.toLong, message, ctxBefore.toList, ctxAfter.toList) + val chatLine = ChatLine(user, timestamp.toLong, message, ctxBefore.toList, ctxAfter.toList) (chatLine, score) } (topDocs.totalHits, docs.toList) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala index 026d115..a24a61b 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala @@ -43,3 +43,5 @@ object SearchResult { case class SimpleSearchResult( server : String, channel : String, botName : String, query: String, page : Int, pageSize : Int, totalResults : Int, lines : List[List[List[String]]]) + +case class SearchError(error : String) \ No newline at end of file