diff --git a/pom.xml b/pom.xml
index 00824b3..6896d59 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,8 @@
     1.6
     1.6
     UTF-8
-    <scala.version>2.10.1</scala.version>
+    <scala.version>2.10.0</scala.version>
+    <lucene.version>4.3.0</lucene.version>
     ${project.build.directory}/dependency
@@ -25,6 +26,11 @@
       <artifactId>scala-reflect</artifactId>
       <version>${scala.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-compiler</artifactId>
+      <version>${scala.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
@@ -42,8 +48,31 @@
       <version>1.0.0</version>
       <scope>runtime</scope>
     </dependency>
-
-
+    <dependency>
+      <groupId>net.liftweb</groupId>
+      <artifactId>lift-json_2.10</artifactId>
+      <version>2.5-RC5</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>jline</artifactId>
+      <version>2.11.0-M2</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-core</artifactId>
+      <version>${lucene.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-analyzers-common</artifactId>
+      <version>${lucene.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.lucene</groupId>
+      <artifactId>lucene-queryparser</artifactId>
+      <version>${lucene.version}</version>
+    </dependency>
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
index ebecaba..96c937b 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/HttpRequestHandler.scala
@@ -36,6 +36,7 @@ trait HttpRequestHandler extends ChannelInboundMessageHandlerAdapter[HttpRequest
   protected def sendSuccess(ctx : ChannelHandlerContext, request : HttpRequest, body : String) : HttpResponse = {
     val response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)
     response.setContent(Unpooled.copiedBuffer(body.getBytes))
+    response.setHeader(CONTENT_TYPE, "application/json")
     writeResponse(ctx, request, response)
     response
   }
diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
index 2853a85..81432fb 100644
--- a/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
+++ b/src/main/scala/net/abhinavsarkar/ircsearch/Server.scala
@@ -1,7 +1,13 @@
 package net.abhinavsarkar.ircsearch
 
 import java.net.InetSocketAddress
+import java.nio.charset.Charset
+
+import scala.concurrent.ExecutionContext.Implicits._
+import scala.concurrent.future
+
 import com.typesafe.scalalogging.slf4j.Logging
+
 import io.netty.bootstrap.ServerBootstrap
 import io.netty.channel.ChannelHandler.Sharable
 import io.netty.channel.ChannelHandlerContext
@@ -14,11 +20,12 @@
 import io.netty.handler.codec.http.HttpContentCompressor
 import io.netty.handler.codec.http.HttpRequest
 import io.netty.handler.codec.http.HttpRequestDecoder
 import io.netty.handler.codec.http.HttpResponseEncoder
-import io.netty.handler.codec.http.DefaultHttpResponse
-import io.netty.handler.codec.http.HttpVersion
-import io.netty.handler.codec.http.HttpResponseStatus
-import io.netty.buffer.Unpooled
-import java.nio.charset.Charset
+import net.abhinavsarkar.ircsearch.lucene.Indexer
+import net.abhinavsarkar.ircsearch.lucene.Searcher
+import net.abhinavsarkar.ircsearch.model.IndexRequest
+import net.abhinavsarkar.ircsearch.model.SearchRequest
+import net.liftweb.json.DefaultFormats
+import net.liftweb.json.Serialization
 
 object Server extends App with Logging {
@@ -31,13 +38,12 @@
   val httpRequestRouter = new HttpRequestRouter {
     val Echo = "^/echo$".r
+    val Index = "^/index$".r
+    val Search = "^/search$".r
     def route = {
-      case Echo() => new HttpRequestHandler {
-        override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) {
-          val content = request.getContent().toString(Charset.forName("UTF-8"))
-          logRequest(ctx, request, sendSuccess(ctx, request, content))
-        }
-      }
+      case Echo() => EchoHandler
+      case Index() => IndexHandler
+      case Search() => SearchHandler
     }
   }
@@ -58,7 +64,8 @@
     Runtime.getRuntime.addShutdownHook(
Thread("ShutdownHook") { override def run { - stopServer(server); + stopServer(server) + IndexHandler.stop } }) @@ -67,7 +74,8 @@ object Server extends App with Logging { } catch { case e : Exception => { logger.error("Exception while running server. Stopping server", e) - stopServer(server); + stopServer(server) + IndexHandler.stop } } } @@ -78,4 +86,38 @@ object Server extends App with Logging { logger.info("Stopped server") } -} \ No newline at end of file +} + +@Sharable +object EchoHandler extends HttpRequestHandler { + override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { + val content = request.getContent().toString(Charset.forName("UTF-8")) + logRequest(ctx, request, sendSuccess(ctx, request, content)) + } +} + +@Sharable +object IndexHandler extends HttpRequestHandler { + implicit val formats = DefaultFormats + lazy val indexer = { val indexer = new Indexer; indexer.start; indexer } + override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { + future { + val content = request.getContent().toString(Charset.forName("UTF-8")) + val indexRequest = Serialization.read[IndexRequest](content) + indexer.index(indexRequest) + } + logRequest(ctx, request, sendDefaultResponse(ctx, request)) + } + def stop = indexer.stop +} + +@Sharable +object SearchHandler extends HttpRequestHandler { + implicit val formats = DefaultFormats + override def messageReceived(ctx: ChannelHandlerContext, request: HttpRequest) { + val content = request.getContent().toString(Charset.forName("UTF-8")) + val searchRequest = Serialization.read[SearchRequest](content) + val searchResult = Searcher.search(searchRequest) + logRequest(ctx, request, sendSuccess(ctx, request, Serialization.write(searchResult))) + } +} diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala new file mode 100644 index 0000000..d99055c --- /dev/null +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Indexer.scala @@ -0,0 +1,138 @@ +package net.abhinavsarkar.ircsearch.lucene + +import java.io.File +import java.util.ArrayList +import java.util.concurrent.Executors +import java.util.concurrent.Future +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit +import java.util.concurrent.locks.ReentrantLock + +import scala.collection.JavaConversions._ + +import org.apache.lucene.analysis.Analyzer +import org.apache.lucene.analysis.core.KeywordAnalyzer +import org.apache.lucene.analysis.en.EnglishAnalyzer +import org.apache.lucene.analysis.miscellaneous.PerFieldAnalyzerWrapper +import org.apache.lucene.analysis.standard.StandardAnalyzer +import org.apache.lucene.document.Field +import org.apache.lucene.document.FieldType +import org.apache.lucene.document.FieldType.NumericType +import org.apache.lucene.index.IndexReader +import org.apache.lucene.index.IndexWriter +import org.apache.lucene.index.IndexWriterConfig +import org.apache.lucene.search.IndexSearcher +import org.apache.lucene.store.FSDirectory +import org.apache.lucene.util.Version + +import com.typesafe.scalalogging.slf4j.Logging + +import net.abhinavsarkar.ircsearch.model.IndexRequest + +class Indexer extends Logging { + + import Indexer._ + + private val indexQueue = new LinkedBlockingQueue[IndexRequest] + private val scheduler = Executors.newSingleThreadScheduledExecutor + private val runLock = new ReentrantLock + private var runFuture : Future[_] = null + + def index(indexRequest : IndexRequest) = 
indexQueue.offer(indexRequest) + + def start { + logger.info("Starting indexer") + runFuture = scheduler.scheduleWithFixedDelay( + new Runnable { + def run { + try { + runLock.lock + logger.debug("Running indexer") + val indexReqs = new ArrayList[IndexRequest] + indexQueue.drainTo(indexReqs) + doIndex(indexReqs.toList) + } catch { + case e : Throwable => logger.error("Exception while running indexer", e) + } finally { + runLock.unlock + } + }}, + 0, 10, TimeUnit.SECONDS) + } + + def stop { + try { + runLock.lock + if (runFuture != null) { + runFuture.cancel(false) + runFuture = null + } + logger.info("Stopped indexer") + } finally { + runLock.unlock + } + } + + private def doIndex(indexReqs: List[IndexRequest]) { + val indexRequests = indexReqs.groupBy { r => + (r.server, r.channel, r.botName) + } + + for (((server, channel, botName), indexRequestBatch) <- indexRequests) { + val indexDir = getIndexDir(server, channel, botName) + val analyzer = mkAnalyzer + val indexWriter = mkIndexWriter(indexDir, analyzer) + try { + for (indexRequest <- indexRequestBatch; + chatLine <- indexRequest.chatLines) { + val tsField = mkField("timestamp", chatLine.timestamp.toString, false) + val userField = mkField("user", chatLine.user, true) + val msgField = mkField("message", chatLine.message) + indexWriter.addDocument(List(tsField, userField, msgField), analyzer) + logger.debug("Indexed : [{} {} {}] [{}] {}: {}", + server, channel, botName, chatLine.timestamp.toString, chatLine.user, chatLine.message) + } + } finally { + indexWriter.close + analyzer.close + } + } + } + +} + +object Indexer { + + val LUCENE_VERSION = Version.LUCENE_43 + + def mkAnalyzer : Analyzer = { + val defAnalyzer = new StandardAnalyzer(LUCENE_VERSION) + val fieldAnalyzers = Map( + "user" -> new KeywordAnalyzer, + "message" -> new EnglishAnalyzer(LUCENE_VERSION)) + + new PerFieldAnalyzerWrapper(defAnalyzer, fieldAnalyzers) + } + + private def mkIndexWriter(dirPath : String, analyzer : Analyzer) : IndexWriter = { + val indexDir = new File(dirPath) + if (indexDir.exists) { + assert(indexDir.isDirectory) + } + new IndexWriter(FSDirectory.open(indexDir), new IndexWriterConfig(LUCENE_VERSION, analyzer)) + } + + def getIndexDir(server : String, channel : String, botName : String) : String = + s"index-$server-$channel-$botName" + + private def mkField(name : String, value : String, + tokenized : Boolean = true, numericType : Option[NumericType] = None) : Field = { + val fieldType = new FieldType + fieldType.setStored(true) + fieldType.setIndexed(true) + fieldType.setTokenized(tokenized) + numericType.foreach { fieldType.setNumericType } + new Field(name, value, fieldType) + } + +} \ No newline at end of file diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala new file mode 100644 index 0000000..f4779b6 --- /dev/null +++ b/src/main/scala/net/abhinavsarkar/ircsearch/lucene/Searcher.scala @@ -0,0 +1,103 @@ +package net.abhinavsarkar.ircsearch.lucene + +import com.typesafe.scalalogging.slf4j.Logging +import org.apache.lucene.search.IndexSearcher +import java.io.File +import org.apache.lucene.index.IndexReader +import org.apache.lucene.store.FSDirectory +import org.apache.lucene.analysis.Analyzer +import org.apache.lucene.queryparser.classic.QueryParser +import org.apache.lucene.search.Query +import scala.collection.immutable.Set +import org.apache.lucene.search.BooleanQuery +import org.apache.lucene.search.TermQuery +import 
org.apache.lucene.search.BooleanClause +import org.apache.lucene.search.QueryWrapperFilter +import org.apache.lucene.search.Filter +import net.abhinavsarkar.ircsearch.model.SearchRequest +import net.abhinavsarkar.ircsearch.model.SearchResult +import org.apache.lucene.search.Sort +import org.apache.lucene.search.SortField +import scala.collection.JavaConversions._ +import scala.collection.mutable +import net.abhinavsarkar.ircsearch.model.ChatLine +import net.abhinavsarkar.ircsearch.model.ChatLine +import net.abhinavsarkar.ircsearch.model.SearchResult +import net.abhinavsarkar.ircsearch.model.SearchResult + +object Searcher extends Logging { + + private def mkIndexSearcher(dirPath : String) : IndexSearcher = { + val indexDir = new File(dirPath) + assert(indexDir.exists && indexDir.isDirectory) + + new IndexSearcher(IndexReader.open(FSDirectory.open(indexDir))) + } + + private def mkQueryParser(analyzer : Analyzer) = + new QueryParser(Indexer.LUCENE_VERSION, "message", analyzer) + + private def filterifyQuery(query : Query, mustFields : Set[String]) : (Query, Option[Filter]) = + query match { + case boolQuery: BooleanQuery => { + val newQuery = new BooleanQuery + val filterQuery = new BooleanQuery + for (clause <- boolQuery.getClauses) { + val subQuery = clause.getQuery + if (subQuery.isInstanceOf[TermQuery]) { + val termQuery = subQuery.asInstanceOf[TermQuery] + val field = termQuery.getTerm.field + if (mustFields contains field) { + clause.setOccur(BooleanClause.Occur.MUST) + filterQuery.add(clause) + } else { + newQuery.add(clause) + } + } else { + newQuery.add(clause) + } + } + + (newQuery, if (filterQuery.clauses.isEmpty) None else Some(new QueryWrapperFilter(filterQuery))) + } + case _ => (query, None) + } + + def search(searchRequest : SearchRequest) : SearchResult = { + logger.debug("Searching : [{} {} {}] {}", + searchRequest.server, searchRequest.channel, searchRequest.botName, searchRequest.query) + + val indexDir = Indexer.getIndexDir(searchRequest.server, searchRequest.channel, searchRequest.botName) + val analyzer = Indexer.mkAnalyzer + try { + val queryParser = mkQueryParser(analyzer) + val (query, filter) = filterifyQuery(queryParser.parse(searchRequest.query), Set("user")) + logger.debug("Query: {}, Filter: {}", query, filter) + val (totalResults, results) = doSearch(indexDir, query, filter, searchRequest.pageSize) + val searchResults = SearchResult.fromSearchRequest(searchRequest) + .copy(totalResults = totalResults, chatLines = results.map(_._1)) + logger.debug("Search results: {}", searchResults) + searchResults + } finally { + analyzer.close + } + } + + private def doSearch(indexDir : String, query : Query, filter : Option[Filter], maxHits : Int) + : (Int, List[(ChatLine, Float)]) = { + val indexSearcher = mkIndexSearcher(indexDir) + val topDocs = indexSearcher.search(query, filter.orNull, maxHits, + new Sort(SortField.FIELD_SCORE, new SortField("timestamp", SortField.Type.LONG, true))) + val docs = topDocs.scoreDocs.map { sd => + val score = sd.score + val doc = indexSearcher.doc(sd.doc).getFields.foldLeft(mutable.Map[String, String]()) { + (map, field) => map += (field.name -> field.stringValue) + } + + val chatLine = new ChatLine(doc("user"), doc("timestamp").toLong, doc("message")) + (chatLine, score) + } + (topDocs.totalHits, docs.toList) + } + +} \ No newline at end of file diff --git a/src/main/scala/net/abhinavsarkar/ircsearch/model.scala b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala new file mode 100644 index 0000000..b18052a --- /dev/null +++ 
b/src/main/scala/net/abhinavsarkar/ircsearch/model.scala @@ -0,0 +1,23 @@ +package net.abhinavsarkar.ircsearch.model + + +case class ChatLine(user : String, timestamp : Long, message : String) + +case class IndexRequest( + server : String, channel : String, botName : String, chatLines : List[ChatLine]) + +case class SearchRequest( + server : String, channel : String, botName : String, query: String, + page : Int = 0, pageSize : Int = 10) + +case class SearchResult( + server : String, channel : String, botName : String, query: String, + page : Int, pageSize : Int, totalResults : Int, chatLines : List[ChatLine]) + +object SearchResult { + def fromSearchRequest(searchRequest : SearchRequest) = searchRequest match { + case SearchRequest(server, channel, botName, query, page, pageSize) => + new SearchResult(server, channel, botName, query, page, pageSize, 0, List()) + } +} +
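
For reference, here is a minimal client sketch (not part of the commit) showing how the new /index and /search endpoints wired up in Server.scala could be exercised. The payload shapes follow the IndexRequest and SearchRequest case classes in model.scala and the lift-json serialization used by the handlers; the host and port (localhost:8080), the object and helper names (EndpointSketch, post), and the server/channel/botName values are illustrative assumptions, not taken from the commit.

import java.net.{HttpURLConnection, URL}

import net.liftweb.json.{DefaultFormats, Serialization}

import net.abhinavsarkar.ircsearch.model.{ChatLine, IndexRequest, SearchRequest}

object EndpointSketch extends App {
  implicit val formats = DefaultFormats

  // POST a JSON body to an endpoint of the (assumed) locally running server
  // and return the response body as a string.
  def post(url : String, body : String) : String = {
    val conn = new URL(url).openConnection.asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    out.write(body.getBytes("UTF-8"))
    out.close()
    scala.io.Source.fromInputStream(conn.getInputStream).mkString
  }

  // Queue one chat line for indexing; IndexHandler hands it to the Indexer,
  // which drains its queue on a fixed 10-second schedule.
  val indexReq = IndexRequest("irc.example.net", "#scala", "logbot",
    List(ChatLine("alice", System.currentTimeMillis, "hello, lucene")))
  post("http://localhost:8080/index", Serialization.write(indexReq))

  // Search the same index; "user:alice" is pulled into a filter by
  // Searcher.filterifyQuery, the rest is parsed against the "message" field.
  val searchReq = SearchRequest("irc.example.net", "#scala", "logbot", "user:alice hello")
  println(post("http://localhost:8080/search", Serialization.write(searchReq)))
}

Because IndexHandler only enqueues requests and the Indexer flushes its queue every ten seconds, a search issued immediately after an index call may not yet see the new lines.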