diff --git a/pom.xml b/pom.xml
index 6896d59..6abe35a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -73,6 +73,11 @@
lucene-queryparser
${lucene.version}
+
+ net.sf.opencsv
+ opencsv
+ 2.3
+
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
index 96c937b..243ef77 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
@@ -1,21 +1,19 @@
package net.abhinavsarkar.ircsearch
-import io.netty.channel.ChannelInboundMessageHandlerAdapter
-import io.netty.handler.codec.http.HttpRequest
import com.typesafe.scalalogging.slf4j.Logging
-import io.netty.channel.ChannelHandlerContext
-import io.netty.handler.codec.http.HttpResponse
-import io.netty.channel.ChannelFuture
-import io.netty.handler.codec.http.HttpHeaders.isKeepAlive
-import io.netty.handler.codec.http.HttpHeaders.Names._
-import io.netty.handler.codec.http.HttpHeaders
-import io.netty.channel.ChannelFutureListener
-import io.netty.handler.codec.http.DefaultHttpResponse
-import io.netty.handler.codec.http.HttpResponseStatus._
-import io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import io.netty.handler.codec.http.HttpVersion
-import io.netty.handler.codec.http.HttpResponseStatus
+
import io.netty.buffer.Unpooled
+import io.netty.channel.ChannelFutureListener
+import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.ChannelInboundMessageHandlerAdapter
+import io.netty.handler.codec.http.DefaultHttpResponse
+import io.netty.handler.codec.http.HttpHeaders
+import io.netty.handler.codec.http.HttpHeaders.Names._
+import io.netty.handler.codec.http.HttpHeaders.isKeepAlive
+import io.netty.handler.codec.http.HttpRequest
+import io.netty.handler.codec.http.HttpResponse
+import io.netty.handler.codec.http.HttpResponseStatus._
+import io.netty.handler.codec.http.HttpVersion.HTTP_1_1
trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest] with Logging {
@@ -34,7 +32,7 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest
}
protected def sendSuccess(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = {
- val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
+ val response = new DefaultHttpResponse(HTTP_1_1, OK)
response.setContent(Unpooled.copiedBuffer(body.getBytes))
response.setHeader(CONTENT_TYPE, "application/json")
writeResponse(ctx, request, response)
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala
index fc5efc7..2184270 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestRouter.scala
@@ -1,7 +1,6 @@
package net.abhinavsarkar.ircsearch
import io.netty.channel.ChannelHandler.Sharable
-import java.util.regex.Pattern
import io.netty.channel.ChannelHandlerContext
import io.netty.handler.codec.http.HttpRequest
@@ -15,12 +14,12 @@ abstract class HttpRequestRouter extends HttpRequestHandler {
val uri = request.getUri
if (route.isDefinedAt(uri)) {
val routeHandler = route.apply(uri)
- ctx.pipeline.addLast("handler", routeHandler)
+ ctx.pipeline.addLast("httphandler", routeHandler)
try {
ctx.nextInboundMessageBuffer.add(request)
ctx.fireInboundBufferUpdated
} finally {
- ctx.pipeline.remove("handler")
+ ctx.pipeline.remove("httphandler")
}
} else {
logRequest(ctx, request, sendNotFound(ctx, request))
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
index 81432fb..953f75c 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
@@ -3,27 +3,36 @@ package net.abhinavsarkar.ircsearch
import java.net.InetSocketAddress
import java.nio.charset.Charset
+import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits._
import scala.concurrent.future
import com.typesafe.scalalogging.slf4j.Logging
+import au.com.bytecode.opencsv.CSVParser
import io.netty.bootstrap.ServerBootstrap
+import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext
+import io.netty.channel.ChannelInboundByteHandlerAdapter
+import io.netty.channel.ChannelInboundMessageHandlerAdapter
import io.netty.channel.ChannelInitializer
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
+import io.netty.handler.codec.DelimiterBasedFrameDecoder
+import io.netty.handler.codec.Delimiters
import io.netty.handler.codec.http.HttpChunkAggregator
import io.netty.handler.codec.http.HttpContentCompressor
+import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpRequest
import io.netty.handler.codec.http.HttpRequestDecoder
import io.netty.handler.codec.http.HttpResponseEncoder
+import io.netty.handler.codec.http.QueryStringDecoder
+import io.netty.handler.codec.string.StringDecoder
import net.abhinavsarkar.ircsearch.lucene.Indexer
import net.abhinavsarkar.ircsearch.lucene.Searcher
-import net.abhinavsarkar.ircsearch.model.IndexRequest
-import net.abhinavsarkar.ircsearch.model.SearchRequest
+import net.abhinavsarkar.ircsearch.model._
import net.liftweb.json.DefaultFormats
import net.liftweb.json.Serialization
@@ -36,28 +45,13 @@ object Server extends App with Logging {
val port = args(0).toInt
logger.info("Starting server at port {}", port: Integer)
- val httpRequestRouter = new HttpRequestRouter {
- val Echo = "^/echo$".r
- val Index = "^/index$".r
- val Search = "^/search$".r
- def route = {
- case Echo() => EchoHandler
- case Index() => IndexHandler
- case Search() => SearchHandler
- }
- }
-
val server = (new ServerBootstrap)
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(1))
.channel(classOf[NioServerSocketChannel])
.childHandler(new ChannelInitializer[SocketChannel] {
def initChannel(ch: SocketChannel) {
val p = ch.pipeline
- .addLast("decoder", new HttpRequestDecoder)
- .addLast("aggregator", new HttpChunkAggregator(1048576))
- .addLast("encoder", new HttpResponseEncoder)
- .addLast("compressor", new HttpContentCompressor)
- .addLast("router", httpRequestRouter)
+ .addLast("unihandler", UnifiedHandler)
}})
.localAddress(new InetSocketAddress(port))
@@ -65,7 +59,7 @@ object Server extends App with Logging {
new Thread("ShutdownHook") {
override def run {
stopServer(server)
- IndexHandler.stop
+ UnifiedHandler.stop
}
})
@@ -75,7 +69,7 @@ object Server extends App with Logging {
case e : Exception => {
logger.error("Exception while running server. Stopping server", e)
stopServer(server)
- IndexHandler.stop
+ UnifiedHandler.stop
}
}
}
@@ -88,6 +82,86 @@ object Server extends App with Logging {
}
+@Sharable
+object UnifiedHandler extends ChannelInboundByteHandlerAdapter {
+
+ lazy val indexer = { val indexer = new Indexer; indexer.start; indexer }
+
+ val httpRequestRouter = new HttpRequestRouter {
+ val Echo = "^/echo$".r
+ val Index = "^/index$".r
+ val Search = "^/search.*".r
+ def route = {
+ case Echo() => EchoHandler
+ case Index() => new IndexHandler(indexer)
+ case Search() => SearchHandler
+ }
+ }
+
+ def stop = indexer.stop
+
+ override def inboundBufferUpdated(ctx : ChannelHandlerContext, in: ByteBuf) {
+ if (in.readableBytes() < 5) {
+ return;
+ }
+
+ val magic1 = in.getUnsignedByte(in.readerIndex())
+ val magic2 = in.getUnsignedByte(in.readerIndex() + 1)
+ if (isHttp(magic1, magic2)) {
+ ctx.pipeline
+ .addLast("decoder", new HttpRequestDecoder)
+ .addLast("aggregator", new HttpChunkAggregator(1048576))
+ .addLast("encoder", new HttpResponseEncoder)
+ .addLast("compressor", new HttpContentCompressor)
+ .addLast("router", httpRequestRouter)
+ .remove(this)
+ } else {
+ ctx.pipeline
+ .addLast("framedecoder", new DelimiterBasedFrameDecoder(1048576, Delimiters.lineDelimiter() : _*))
+ .addLast("decoder", new StringDecoder(Charset.forName("UTF-8")))
+ .addLast("csvhandler", new TcpIndexHandler(indexer))
+ .remove(this)
+ }
+ ctx.nextInboundByteBuffer.writeBytes(in)
+ ctx.fireInboundBufferUpdated
+ }
+
+ private def isHttp(magic1: Int, magic2: Int) = {
+ magic1 == 'G' && magic2 == 'E' || // GET
+ magic1 == 'P' && magic2 == 'O' || // POST
+ magic1 == 'P' && magic2 == 'U' || // PUT
+ magic1 == 'H' && magic2 == 'E' || // HEAD
+ magic1 == 'O' && magic2 == 'P' || // OPTIONS
+ magic1 == 'P' && magic2 == 'A' || // PATCH
+ magic1 == 'D' && magic2 == 'E' || // DELETE
+ magic1 == 'T' && magic2 == 'R' || // TRACE
+ magic1 == 'C' && magic2 == 'O' // CONNECT
+ }
+
+}
+
+class TcpIndexHandler(indexer: Indexer) extends ChannelInboundMessageHandlerAdapter[String] {
+ var server: String = null
+ var channel : String = null
+ var botName : String = null
+ var inited = false
+ val parser = new CSVParser
+
+ override def messageReceived(ctx: ChannelHandlerContext, content : String) {
+ val values = parser.parseLine(content)
+ if (!inited) {
+ assert(values.length == 3, "Server, channel and botName should be provided first")
+ server = values(0)
+ channel = values(1)
+ botName = values(2)
+ inited = true
+ } else {
+ indexer.index(new IndexRequest(server, channel, botName,
+ List(ChatLine(values(0), values(1).toLong, values(2)))))
+ }
+ }
+}
+
@Sharable
object EchoHandler extends HttpRequestHandler {
override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) {
@@ -97,9 +171,8 @@ object EchoHandler extends HttpRequestHandler {
}
@Sharable
-object IndexHandler extends HttpRequestHandler {
+class IndexHandler(indexer: Indexer) extends HttpRequestHandler {
implicit val formats = DefaultFormats
- lazy val indexer = { val indexer = new Indexer; indexer.start; indexer }
override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) {
future {
val content = request.getContent().toString(Charset.forName("UTF-8"))
@@ -108,15 +181,27 @@ object IndexHandler extends HttpRequestHandler {
}
logRequest(ctx, request, sendDefaultResponse(ctx, request))
}
- def stop = indexer.stop
}
@Sharable
object SearchHandler extends HttpRequestHandler {
implicit val formats = DefaultFormats
override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) {
- val content = request.getContent().toString(Charset.forName("UTF-8"))
- val searchRequest = Serialization.read[SearchRequest](content)
+ val method = request.getMethod()
+ val searchRequest = if (HttpMethod.POST.equals(method)) {
+ val content = request.getContent().toString(Charset.forName("UTF-8"))
+ Serialization.read[SearchRequest](content)
+ } else if (HttpMethod.GET.equals(method)) {
+ val params = new QueryStringDecoder(request.getUri).getParameters
+ val server = params("server")(0)
+ val channel = params("channel")(0)
+ val botName = params("botName")(0)
+ val query = params("query")(0)
+ new SearchRequest(server, channel, botName, query)
+ } else {
+ throw new UnsupportedOperationException("HTTP method " + method + " is not supported")
+ }
+
val searchResult = Searcher.search(searchRequest)
logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult)))
}
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala
index d99055c..5aeaeb3 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala
@@ -16,18 +16,17 @@ import org.apache.lucene.analysis.en.EnglishAnalyzer
import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper
import org.apache.lucene.analysis.standard.StandardAnalyzer
import org.apache.lucene.document.Field
-import org.apache.lucene.document.FieldType
-import org.apache.lucene.document.FieldType.NumericType
-import org.apache.lucene.index.IndexReader
+import org.apache.lucene.document.LongField
+import org.apache.lucene.document.StringField
+import org.apache.lucene.document.TextField
import org.apache.lucene.index.IndexWriter
import org.apache.lucene.index.IndexWriterConfig
-import org.apache.lucene.search.IndexSearcher
import org.apache.lucene.store.FSDirectory
import org.apache.lucene.util.Version
import com.typesafe.scalalogging.slf4j.Logging
-import net.abhinavsarkar.ircsearch.model.IndexRequest
+import net.abhinavsarkar.ircsearch.model._
class Indexer extends Logging {
@@ -47,7 +46,9 @@ class Indexer extends Logging {
def run {
try {
runLock.lock
- logger.debug("Running indexer")
+ if (indexQueue.isEmpty)
+ return
+
val indexReqs = new ArrayList[IndexRequest]
indexQueue.drainTo(indexReqs)
doIndex(indexReqs.toList)
@@ -57,7 +58,7 @@ class Indexer extends Logging {
runLock.unlock
}
}},
- 0, 10, TimeUnit.SECONDS)
+ 0, 1, TimeUnit.SECONDS)
}
def stop {
@@ -85,9 +86,9 @@ class Indexer extends Logging {
try {
for (indexRequest <- indexRequestBatch;
chatLine <- indexRequest.chatLines) {
- val tsField = mkField("timestamp", chatLine.timestamp.toString, false)
- val userField = mkField("user", chatLine.user, true)
- val msgField = mkField("message", chatLine.message)
+ val tsField = new LongField(ChatLine.TS, chatLine.timestamp, Field.Store.YES)
+ val userField = new StringField(ChatLine.USER, chatLine.user, Field.Store.YES)
+ val msgField = new TextField(ChatLine.MSG, chatLine.message, Field.Store.YES)
indexWriter.addDocument(List(tsField, userField, msgField), analyzer)
logger.debug("Indexed : [{} {} {}] [{}] {}: {}",
server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message)
@@ -108,8 +109,8 @@ object Indexer {
def mkAnalyzer : Analyzer = {
val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION)
val fieldAnalyzers = Map(
- "user" -> new KeywordAnalyzer,
- "message" -> new EnglishAnalyzer(LUCENE_VERSION))
+ ChatLine.USER -> new KeywordAnalyzer,
+ ChatLine.MSG -> new EnglishAnalyzer(LUCENE_VERSION))
new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers)
}
@@ -125,14 +126,4 @@ object Indexer {
def getIndexDir(server : String, channel : String, botName : String) : String =
s"index-$server-$channel-$botName"
- private def mkField(name : String, value : String,
- tokenized : Boolean = true, numericType : Option[NumericType] = None) : Field = {
- val fieldType = new FieldType
- fieldType.setStored(true)
- fieldType.setIndexed(true)
- fieldType.setTokenized(tokenized)
- numericType.foreach { fieldType.setNumericType }
- new Field(name, value, fieldType)
- }
-
}
\ No newline at end of file
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala
index f4779b6..e55c171 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala
@@ -1,29 +1,27 @@
package net.abhinavsarkar.ircsearch.lucene
-import com.typesafe.scalalogging.slf4j.Logging
-import org.apache.lucene.search.IndexSearcher
import java.io.File
-import org.apache.lucene.index.IndexReader
-import org.apache.lucene.store.FSDirectory
-import org.apache.lucene.analysis.Analyzer
-import org.apache.lucene.queryparser.classic.QueryParser
-import org.apache.lucene.search.Query
-import scala.collection.immutable.Set
-import org.apache.lucene.search.BooleanQuery
-import org.apache.lucene.search.TermQuery
-import org.apache.lucene.search.BooleanClause
-import org.apache.lucene.search.QueryWrapperFilter
-import org.apache.lucene.search.Filter
-import net.abhinavsarkar.ircsearch.model.SearchRequest
-import net.abhinavsarkar.ircsearch.model.SearchResult
-import org.apache.lucene.search.Sort
-import org.apache.lucene.search.SortField
+
import scala.collection.JavaConversions._
import scala.collection.mutable
-import net.abhinavsarkar.ircsearch.model.ChatLine
-import net.abhinavsarkar.ircsearch.model.ChatLine
-import net.abhinavsarkar.ircsearch.model.SearchResult
-import net.abhinavsarkar.ircsearch.model.SearchResult
+
+import org.apache.lucene.analysis.Analyzer
+import org.apache.lucene.index.IndexReader
+import org.apache.lucene.queryparser.classic.QueryParser
+import org.apache.lucene.search.BooleanClause
+import org.apache.lucene.search.BooleanQuery
+import org.apache.lucene.search.FilteredQuery
+import org.apache.lucene.search.IndexSearcher
+import org.apache.lucene.search.Query
+import org.apache.lucene.search.QueryWrapperFilter
+import org.apache.lucene.search.Sort
+import org.apache.lucene.search.SortField
+import org.apache.lucene.search.TermQuery
+import org.apache.lucene.store.FSDirectory
+
+import com.typesafe.scalalogging.slf4j.Logging
+
+import net.abhinavsarkar.ircsearch.model._
object Searcher extends Logging {
@@ -35,9 +33,9 @@ object Searcher extends Logging {
}
private def mkQueryParser(analyzer : Analyzer) =
- new QueryParser(Indexer.LUCENE_VERSION, "message", analyzer)
+ new QueryParser(Indexer.LUCENE_VERSION, ChatLine.MSG, analyzer)
- private def filterifyQuery(query : Query, mustFields : Set[String]) : (Query, Option[Filter]) =
+ private def filterifyQuery(query : Query, mustFields : Set[String]) : Query =
query match {
case boolQuery: BooleanQuery => {
val newQuery = new BooleanQuery
@@ -58,9 +56,12 @@ object Searcher extends Logging {
}
}
- (newQuery, if (filterQuery.clauses.isEmpty) None else Some(new QueryWrapperFilter(filterQuery)))
+ if (filterQuery.clauses.isEmpty)
+ newQuery
+ else
+ new FilteredQuery(newQuery, new QueryWrapperFilter(filterQuery))
}
- case _ => (query, None)
+ case _ => query
}
def search(searchRequest : SearchRequest) : SearchResult = {
@@ -71,9 +72,9 @@ object Searcher extends Logging {
val analyzer = Indexer.mkAnalyzer
try {
val queryParser = mkQueryParser(analyzer)
- val (query, filter) = filterifyQuery(queryParser.parse(searchRequest.query), Set("user"))
- logger.debug("Query: {}, Filter: {}", query, filter)
- val (totalResults, results) = doSearch(indexDir, query, filter, searchRequest.pageSize)
+ val query = filterifyQuery(queryParser.parse(searchRequest.query), Set(ChatLine.USER))
+ logger.debug("Query: {}", query)
+ val (totalResults, results) = doSearch(indexDir, query, searchRequest.pageSize)
val searchResults = SearchResult.fromSearchRequest(searchRequest)
.copy(totalResults = totalResults, chatLines = results.map(_._1))
logger.debug("Search results: {}", searchResults)
@@ -83,18 +84,18 @@ object Searcher extends Logging {
}
}
- private def doSearch(indexDir : String, query : Query, filter : Option[Filter], maxHits : Int)
+ private def doSearch(indexDir : String, query : Query, maxHits : Int)
: (Int, List[(ChatLine, Float)]) = {
val indexSearcher = mkIndexSearcher(indexDir)
- val topDocs = indexSearcher.search(query, filter.orNull, maxHits,
- new Sort(SortField.FIELD_SCORE, new SortField("timestamp", SortField.Type.LONG, true)))
+ val topDocs = indexSearcher.search(query, maxHits,
+ new Sort(SortField.FIELD_SCORE, new SortField(ChatLine.TS, SortField.Type.LONG, true)))
val docs = topDocs.scoreDocs.map { sd =>
val score = sd.score
val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) {
(map, field) => map += (field.name -> field.stringValue)
}
- val chatLine = new ChatLine(doc("user"), doc("timestamp").toLong, doc("message"))
+ val chatLine = new ChatLine(doc(ChatLine.USER), doc(ChatLine.TS).toLong, doc(ChatLine.MSG))
(chatLine, score)
}
(topDocs.totalHits, docs.toList)
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala
index b18052a..44f741b 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala
@@ -1,6 +1,12 @@
package net.abhinavsarkar.ircsearch.model
+object ChatLine {
+ val USER = "user"
+ val TS = "ts"
+ val MSG = "msg"
+}
+
case class ChatLine(user : String, timestamp : Long, message : String)
case class IndexRequest(