From 52966fbd8c046396ff937f77cd1e29a0258f1939 Mon Sep 17 00:00:00 2001 From: Abhinav Sarkar Date: Sat, 18 May 2013 06:42:15 +0530 Subject: [PATCH] Added tcp server to read from a csv stream. Added GET support for search endpoint. Some cleanup --- pom.xml | 5 + .../ircsearch/HttpRequestHandler.scala | 28 ++-- .../ircsearch/HttpRequestRouter.scala | 5 +- .../net/abhinavsarkar/ircsearch/Server.scala | 135 ++++++++++++++---- .../ircsearch/lucene/Indexer.scala | 35 ++--- .../ircsearch/lucene/Searcher.scala | 65 ++++----- .../net/abhinavsarkar/ircsearch/model.scala | 6 + 7 files changed, 182 insertions(+), 97 deletions(-) diff --git a/pom.xml b/pom.xml index 6896d59..6abe35a 100644 --- a/pom.xml +++ b/pom.xml @@ -73,6 +73,11 @@ lucene-queryparser ${lucene.version} + + net.sf.opencsv + opencsv + 2.3 + diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala index 96c937b..243ef77 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala @@ -1,21 +1,19 @@ package net.abhinavsarkar.ircsearch -import io.netty.channel.ChannelInboundMessageHandlerAdapter -import io.netty.handler.codec.http.HttpRequest import com.typesafe.scalalogging.slf4j.Logging -import io.netty.channel.ChannelHandlerContext -import io.netty.handler.codec.http.HttpResponse -import io.netty.channel.ChannelFuture -import io.netty.handler.codec.http.HttpHeaders.isKeepAlive -import io.netty.handler.codec.http.HttpHeaders.Names._ -import io.netty.handler.codec.http.HttpHeaders -import io.netty.channel.ChannelFutureListener -import io.netty.handler.codec.http.DefaultHttpResponse -import io.netty.handler.codec.http.HttpResponseStatus._ -import io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import io.netty.handler.codec.http.HttpVersion -import io.netty.handler.codec.http.HttpResponseStatus + import io.netty.buffer.Unpooled +import io.netty.channel.ChannelFutureListener +import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundMessageHandlerAdapter +import io.netty.handler.codec.http.DefaultHttpResponse +import io.netty.handler.codec.http.HttpHeaders +import io.netty.handler.codec.http.HttpHeaders.Names._ +import io.netty.handler.codec.http.HttpHeaders.isKeepAlive +import io.netty.handler.codec.http.HttpRequest +import io.netty.handler.codec.http.HttpResponse +import io.netty.handler.codec.http.HttpResponseStatus._ +import io.netty.handler.codec.http.HttpVersion.HTTP_1_1 trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest] with Logging { @@ -34,7 +32,7 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest } protected def sendSuccess(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = { - val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK) + val response = new DefaultHttpResponse(HTTP_1_1, OK) response.setContent(Unpooled.copiedBuffer(body.getBytes)) response.setHeader(CONTENT_TYPE, "application/json") writeResponse(ctx, request, response) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala index fc5efc7..2184270 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala @@ -1,7 +1,6 @@ package net.abhinavsarkar.ircsearch import io.netty.channel.ChannelHandler.Sharable -import java.util.regex.Pattern import io.netty.channel.ChannelHandlerContext import io.netty.handler.codec.http.HttpRequest @@ -15,12 +14,12 @@ abstract class HttpRequestRouter extends HttpRequestHandler { val uri = request.getUri if (route.isDefinedAt(uri)) { val routeHandler = route.apply(uri) - ctx.pipeline.addLast("handler", routeHandler) + ctx.pipeline.addLast("httphandler", routeHandler) try { ctx.nextInboundMessageBuffer.add(request) ctx.fireInboundBufferUpdated } finally { - ctx.pipeline.remove("handler") + ctx.pipeline.remove("httphandler") } } else { logRequest(ctx, request, sendNotFound(ctx, request)) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala index 81432fb..953f75c 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala @@ -3,27 +3,36 @@ package net.abhinavsarkar.ircsearch import java.net.InetSocketAddress import java.nio.charset.Charset +import scala.collection.JavaConversions._ import scala.concurrent.ExecutionContext.Implicits._ import scala.concurrent.future import com.typesafe.scalalogging.slf4j.Logging +import au.com.bytecode.opencsv.CSVParser import io.netty.bootstrap.ServerBootstrap +import io.netty.buffer.ByteBuf import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext +import io.netty.channel.ChannelInboundByteHandlerAdapter +import io.netty.channel.ChannelInboundMessageHandlerAdapter import io.netty.channel.ChannelInitializer import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioEventLoopGroup import io.netty.channel.socket.nio.NioServerSocketChannel +import io.netty.handler.codec.DelimiterBasedFrameDecoder +import io.netty.handler.codec.Delimiters import io.netty.handler.codec.http.HttpChunkAggregator import io.netty.handler.codec.http.HttpContentCompressor +import io.netty.handler.codec.http.HttpMethod import io.netty.handler.codec.http.HttpRequest import io.netty.handler.codec.http.HttpRequestDecoder import io.netty.handler.codec.http.HttpResponseEncoder +import io.netty.handler.codec.http.QueryStringDecoder +import io.netty.handler.codec.string.StringDecoder import net.abhinavsarkar.ircsearch.lucene.Indexer import net.abhinavsarkar.ircsearch.lucene.Searcher -import net.abhinavsarkar.ircsearch.model.IndexRequest -import net.abhinavsarkar.ircsearch.model.SearchRequest +import net.abhinavsarkar.ircsearch.model._ import net.liftweb.json.DefaultFormats import net.liftweb.json.Serialization @@ -36,28 +45,13 @@ object Server extends App with Logging { val port = args(0).toInt logger.info("Starting server at port {}", port: Integer) - val httpRequestRouter = new HttpRequestRouter { - val Echo = "^/echo$".r - val Index = "^/index$".r - val Search = "^/search$".r - def route = { - case Echo() => EchoHandler - case Index() => IndexHandler - case Search() => SearchHandler - } - } - val server = (new ServerBootstrap) .group(new NioEventLoopGroup(1), new NioEventLoopGroup(1)) .channel(classOf[NioServerSocketChannel]) .childHandler(new ChannelInitializer[SocketChannel] { def initChannel(ch: SocketChannel) { val p = ch.pipeline - .addLast("decoder", new HttpRequestDecoder) - .addLast("aggregator", new HttpChunkAggregator(1048576)) - .addLast("encoder", new HttpResponseEncoder) - .addLast("compressor", new HttpContentCompressor) - .addLast("router", httpRequestRouter) + .addLast("unihandler", UnifiedHandler) }}) .localAddress(new InetSocketAddress(port)) @@ -65,7 +59,7 @@ object Server extends App with Logging { new Thread("ShutdownHook") { override def run { stopServer(server) - IndexHandler.stop + UnifiedHandler.stop } }) @@ -75,7 +69,7 @@ object Server extends App with Logging { case e : Exception => { logger.error("Exception while running server. Stopping server", e) stopServer(server) - IndexHandler.stop + UnifiedHandler.stop } } } @@ -88,6 +82,86 @@ object Server extends App with Logging { } +@Sharable +object UnifiedHandler extends ChannelInboundByteHandlerAdapter { + + lazy val indexer = { val indexer = new Indexer; indexer.start; indexer } + + val httpRequestRouter = new HttpRequestRouter { + val Echo = "^/echo$".r + val Index = "^/index$".r + val Search = "^/search.*".r + def route = { + case Echo() => EchoHandler + case Index() => new IndexHandler(indexer) + case Search() => SearchHandler + } + } + + def stop = indexer.stop + + override def inboundBufferUpdated(ctx : ChannelHandlerContext, in: ByteBuf) { + if (in.readableBytes() < 5) { + return; + } + + val magic1 = in.getUnsignedByte(in.readerIndex()) + val magic2 = in.getUnsignedByte(in.readerIndex() + 1) + if (isHttp(magic1, magic2)) { + ctx.pipeline + .addLast("decoder", new HttpRequestDecoder) + .addLast("aggregator", new HttpChunkAggregator(1048576)) + .addLast("encoder", new HttpResponseEncoder) + .addLast("compressor", new HttpContentCompressor) + .addLast("router", httpRequestRouter) + .remove(this) + } else { + ctx.pipeline + .addLast("framedecoder", new DelimiterBasedFrameDecoder(1048576, Delimiters.lineDelimiter() : _*)) + .addLast("decoder", new StringDecoder(Charset.forName("UTF-8"))) + .addLast("csvhandler", new TcpIndexHandler(indexer)) + .remove(this) + } + ctx.nextInboundByteBuffer.writeBytes(in) + ctx.fireInboundBufferUpdated + } + + private def isHttp(magic1: Int, magic2: Int) = { + magic1 == 'G' && magic2 == 'E' || // GET + magic1 == 'P' && magic2 == 'O' || // POST + magic1 == 'P' && magic2 == 'U' || // PUT + magic1 == 'H' && magic2 == 'E' || // HEAD + magic1 == 'O' && magic2 == 'P' || // OPTIONS + magic1 == 'P' && magic2 == 'A' || // PATCH + magic1 == 'D' && magic2 == 'E' || // DELETE + magic1 == 'T' && magic2 == 'R' || // TRACE + magic1 == 'C' && magic2 == 'O' // CONNECT + } + +} + +class TcpIndexHandler(indexer: Indexer) extends ChannelInboundMessageHandlerAdapter[String] { + var server: String = null + var channel : String = null + var botName : String = null + var inited = false + val parser = new CSVParser + + override def messageReceived(ctx: ChannelHandlerContext, content : String) { + val values = parser.parseLine(content) + if (!inited) { + assert(values.length == 3, "Server, channel and botName should be provided first") + server = values(0) + channel = values(1) + botName = values(2) + inited = true + } else { + indexer.index(new IndexRequest(server, channel, botName, + List(ChatLine(values(0), values(1).toLong, values(2))))) + } + } +} + @Sharable object EchoHandler extends HttpRequestHandler { override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { @@ -97,9 +171,8 @@ object EchoHandler extends HttpRequestHandler { } @Sharable -object IndexHandler extends HttpRequestHandler { +class IndexHandler(indexer: Indexer) extends HttpRequestHandler { implicit val formats = DefaultFormats - lazy val indexer = { val indexer = new Indexer; indexer.start; indexer } override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { future { val content = request.getContent().toString(Charset.forName("UTF-8")) @@ -108,15 +181,27 @@ object IndexHandler extends HttpRequestHandler { } logRequest(ctx, request, sendDefaultResponse(ctx, request)) } - def stop = indexer.stop } @Sharable object SearchHandler extends HttpRequestHandler { implicit val formats = DefaultFormats override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { - val content = request.getContent().toString(Charset.forName("UTF-8")) - val searchRequest = Serialization.read[SearchRequest](content) + val method = request.getMethod() + val searchRequest = if (HttpMethod.POST.equals(method)) { + val content = request.getContent().toString(Charset.forName("UTF-8")) + Serialization.read[SearchRequest](content) + } else if (HttpMethod.GET.equals(method)) { + val params = new QueryStringDecoder(request.getUri).getParameters + val server = params("server")(0) + val channel = params("channel")(0) + val botName = params("botName")(0) + val query = params("query")(0) + new SearchRequest(server, channel, botName, query) + } else { + throw new UnsupportedOperationException("HTTP method " + method + " is not supported") + } + val searchResult = Searcher.search(searchRequest) logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) } diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala index d99055c..5aeaeb3 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -16,18 +16,17 @@ import org.apache.lucene.analysis.en.EnglishAnalyzer import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper import org.apache.lucene.analysis.standard.StandardAnalyzer import org.apache.lucene.document.Field -import org.apache.lucene.document.FieldType -import org.apache.lucene.document.FieldType.NumericType -import org.apache.lucene.index.IndexReader +import org.apache.lucene.document.LongField +import org.apache.lucene.document.StringField +import org.apache.lucene.document.TextField import org.apache.lucene.index.IndexWriter import org.apache.lucene.index.IndexWriterConfig -import org.apache.lucene.search.IndexSearcher import org.apache.lucene.store.FSDirectory import org.apache.lucene.util.Version import com.typesafe.scalalogging.slf4j.Logging -import net.abhinavsarkar.ircsearch.model.IndexRequest +import net.abhinavsarkar.ircsearch.model._ class Indexer extends Logging { @@ -47,7 +46,9 @@ class Indexer extends Logging { def run { try { runLock.lock - logger.debug("Running indexer") + if (indexQueue.isEmpty) + return + val indexReqs = new ArrayList[IndexRequest] indexQueue.drainTo(indexReqs) doIndex(indexReqs.toList) @@ -57,7 +58,7 @@ class Indexer extends Logging { runLock.unlock } }}, - 0, 10, TimeUnit.SECONDS) + 0, 1, TimeUnit.SECONDS) } def stop { @@ -85,9 +86,9 @@ class Indexer extends Logging { try { for (indexRequest <- indexRequestBatch; chatLine <- indexRequest.chatLines) { - val tsField = mkField("timestamp", chatLine.timestamp.toString, false) - val userField = mkField("user", chatLine.user, true) - val msgField = mkField("message", chatLine.message) + val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES) + val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES) + val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES) indexWriter.addDocument(List(tsField, userField, msgField), analyzer) logger.debug("Indexed : [{} {} {}] [{}] {}: {}", server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) @@ -108,8 +109,8 @@ object Indexer { def mkAnalyzer : Analyzer = { val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION) val fieldAnalyzers = Map( - "user" -> new KeywordAnalyzer, - "message" -> new EnglishAnalyzer(LUCENE_VERSION)) + ChatLine.USER -> new KeywordAnalyzer, + ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION)) new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers) } @@ -125,14 +126,4 @@ object Indexer { def getIndexDir(server : String, channel : String, botName : String) : String = s"index-$server-$channel-$botName" - private def mkField(name : String, value : String, - tokenized : Boolean = true, numericType : Option[NumericType] = None) : Field = { - val fieldType = new FieldType - fieldType.setStored(true) - fieldType.setIndexed(true) - fieldType.setTokenized(tokenized) - numericType.foreach { fieldType.setNumericType } - new Field(name, value, fieldType) - } - } \ No newline at end of file diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala index f4779b6..e55c171 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -1,29 +1,27 @@ package net.abhinavsarkar.ircsearch.lucene -import com.typesafe.scalalogging.slf4j.Logging -import org.apache.lucene.search.IndexSearcher import java.io.File -import org.apache.lucene.index.IndexReader -import org.apache.lucene.store.FSDirectory -import org.apache.lucene.analysis.Analyzer -import org.apache.lucene.queryparser.classic.QueryParser -import org.apache.lucene.search.Query -import scala.collection.immutable.Set -import org.apache.lucene.search.BooleanQuery -import org.apache.lucene.search.TermQuery -import org.apache.lucene.search.BooleanClause -import org.apache.lucene.search.QueryWrapperFilter -import org.apache.lucene.search.Filter -import net.abhinavsarkar.ircsearch.model.SearchRequest -import net.abhinavsarkar.ircsearch.model.SearchResult -import org.apache.lucene.search.Sort -import org.apache.lucene.search.SortField + import scala.collection.JavaConversions._ import scala.collection.mutable -import net.abhinavsarkar.ircsearch.model.ChatLine -import net.abhinavsarkar.ircsearch.model.ChatLine -import net.abhinavsarkar.ircsearch.model.SearchResult -import net.abhinavsarkar.ircsearch.model.SearchResult + +import org.apache.lucene.analysis.Analyzer +import org.apache.lucene.index.IndexReader +import org.apache.lucene.queryparser.classic.QueryParser +import org.apache.lucene.search.BooleanClause +import org.apache.lucene.search.BooleanQuery +import org.apache.lucene.search.FilteredQuery +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.search.Query +import org.apache.lucene.search.QueryWrapperFilter +import org.apache.lucene.search.Sort +import org.apache.lucene.search.SortField +import org.apache.lucene.search.TermQuery +import org.apache.lucene.store.FSDirectory + +import com.typesafe.scalalogging.slf4j.Logging + +import net.abhinavsarkar.ircsearch.model._ object Searcher extends Logging { @@ -35,9 +33,9 @@ object Searcher extends Logging { } private def mkQueryParser(analyzer : Analyzer) = - new QueryParser(Indexer.LUCENE_VERSION, "message", analyzer) + new QueryParser(Indexer.LUCENE_VERSION, ChatLine.MSG, analyzer) - private def filterifyQuery(query : Query, mustFields : Set[String]) : (Query, Option[Filter]) = + private def filterifyQuery(query : Query, mustFields : Set[String]) : Query = query match { case boolQuery: BooleanQuery => { val newQuery = new BooleanQuery @@ -58,9 +56,12 @@ object Searcher extends Logging { } } - (newQuery, if (filterQuery.clauses.isEmpty) None else Some(new QueryWrapperFilter(filterQuery))) + if (filterQuery.clauses.isEmpty) + newQuery + else + new FilteredQuery(newQuery, new QueryWrapperFilter(filterQuery)) } - case _ => (query, None) + case _ => query } def search(searchRequest : SearchRequest) : SearchResult = { @@ -71,9 +72,9 @@ object Searcher extends Logging { val analyzer = Indexer.mkAnalyzer try { val queryParser = mkQueryParser(analyzer) - val (query, filter) = filterifyQuery(queryParser.parse(searchRequest.query), Set("user")) - logger.debug("Query: {}, Filter: {}", query, filter) - val (totalResults, results) = doSearch(indexDir, query, filter, searchRequest.pageSize) + val query = filterifyQuery(queryParser.parse(searchRequest.query), Set(ChatLine.USER)) + logger.debug("Query: {}", query) + val (totalResults, results) = doSearch(indexDir, query, searchRequest.pageSize) val searchResults = SearchResult.fromSearchRequest(searchRequest) .copy(totalResults = totalResults, chatLines = results.map(_._1)) logger.debug("Search results: {}", searchResults) @@ -83,18 +84,18 @@ object Searcher extends Logging { } } - private def doSearch(indexDir : String, query : Query, filter : Option[Filter], maxHits : Int) + private def doSearch(indexDir : String, query : Query, maxHits : Int) : (Int, List[(ChatLine, Float)]) = { val indexSearcher = mkIndexSearcher(indexDir) - val topDocs = indexSearcher.search(query, filter.orNull, maxHits, - new Sort(SortField.FIELD_SCORE, new SortField("timestamp", SortField.Type.LONG, true))) + val topDocs = indexSearcher.search(query, maxHits, + new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true))) val docs = topDocs.scoreDocs.map { sd => val score = sd.score val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { (map, field) => map += (field.name -> field.stringValue) } - val chatLine = new ChatLine(doc("user"), doc("timestamp").toLong, doc("message")) + val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG)) (chatLine, score) } (topDocs.totalHits, docs.toList) diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala index b18052a..44f741b 100644 --- a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala +++ b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala @@ -1,6 +1,12 @@ package net.abhinavsarkar.ircsearch.model +object ChatLine { + val USER = "user" + val TS = "ts" + val MSG = "msg" +} + case class ChatLine(user : String, timestamp : Long, message : String) case class IndexRequest(