clj_twitter_feelings/src/clj_twitter_feelings/core.clj

140 lines
5.3 KiB
Clojure

(ns clj-twitter-feelings.core
(:import [java.io File BufferedReader]
[clojure.lang PersistentQueue]
[org.apache.http HttpException]
[org.apache.http.auth AuthScope UsernamePasswordCredentials]
[org.apache.http.client.methods HttpGet]
[org.apache.http.client HttpClient]
[org.apache.http.impl.client DefaultHttpClient]
[org.apache.http.params BasicHttpParams HttpParams])
(:use [clojure.java.io :only (reader resource as-file)]
[clojure.contrib
[core :only (-?>)]
[string :only (split lower-case)]
[duck-streams :only (read-lines)]
[json :only (read-json)]]))
(defmulti trace (fn [_ f _] f))
(defmethod trace :l [msg _ arg] (do (println msg ":" arg) arg))
(defmethod trace :f [arg _ msg] (do (println msg ":" arg) arg))
;names of adjective files
(def adjective-files ["negative" "neutral" "positive"])
(defn adjectives
"Loads the adjective files and returns a map of adjective to its type."
[]
(->> adjective-files
(map #(str "clj_twitter_feelings/adjectives/" % ".txt"))
(map resource)
(reduce
(fn [acc ^java.net.URL url]
(let [adjective-type
(-> url .toString (.split "/") last (.split "\\.") first)]
(reduce
(fn [acc word] (assoc! acc word adjective-type))
acc
(read-lines url))))
(transient {}))
(persistent!)))
(defn safe-divide
"If denominator is 0 returns 0 else returns numerator divided by denominator."
[n d] (if (zero? d) 0 (float (/ n d))))
(def split-pattern (re-pattern "[\\p{Z}\\p{C}\\p{P}]+"))
(defn tokenize-line
"Tokenizes the line on split-pattern and returns a lazy seq of non-empty
tokens."
[line]
(->> line (split split-pattern) (filter (complement empty?))))
(defprotocol Processor
(process [this tweet]))
(defn twitter-stream-client
"Creates an HttpClient for stream.twitter.com using the credentials provided."
[username password]
(doto (DefaultHttpClient.)
(.. getCredentialsProvider
(setCredentials
(AuthScope. "stream.twitter.com" 80)
(UsernamePasswordCredentials. username password)))))
(defn tweet-stream
"Creates a lazy stream of live tweets."
[^HttpClient client method & params]
(let [read-line (fn this [^BufferedReader rdr]
(lazy-seq
(if-let [line (.readLine rdr)]
(cons line (this rdr))
(do (.close rdr)
(.. client getConnectionManager shutdown)))))
baseurl "http://stream.twitter.com/1/statuses/"
url (str baseurl method ".json")
http-params
(reduce (fn [^HttpParams hp [k v]] (.setParameter hp (name k) v))
(BasicHttpParams.) (partition 2 params))
request (doto (HttpGet. url) (.setParams http-params))
response (.execute client request)
status-code (.. response getStatusLine getStatusCode)]
(if (= status-code 200)
(if-let [rdr (-?> response .getEntity .getContent reader)]
(map #(read-json % true) (read-line rdr)))
(throw (HttpException.
(str "Invalid Status code: " status-code))))))
(defn process-tweet-stream
"Processes the tweet stream with the provided processors."
[stream processors]
(doseq [tweet stream]
(future (doseq [p processors] (process p tweet)))))
(def status-seen (atom nil))
(defn status-processor []
(reify Processor
(process [this tweet]
(reset! status-seen
(str (-> tweet :user :screen_name) ": " (:text tweet))))))
(def adjective-type-count (atom {}))
(def adjective-seen (atom nil))
(def *tweet-window-size* 25)
(defn adjective-processor [adjective-map]
(let [state-window (atom (PersistentQueue/EMPTY))]
(reify Processor
(process [this tweet]
(let [adj-typs ;list of types of adjective in the tweet
(->> tweet :text
tokenize-line
(map lower-case)
(map #(vector % (adjective-map %)))
(filter #(-> % second nil? not))
;(map #(do (println %) %))
(map #(do (reset! adjective-seen (first %)) %))
(map second))]
(when-not (empty? adj-typs)
(let [current-state
(reduce #(assoc %1 %2 (inc (get %1 %2 0))) {} adj-typs)]
(swap! adjective-type-count
(fn [state]
;If the count of states in the state-window is less than
;*tweet-window-size*, push the current-state in the
;state-window and return merge result: (state + current-state).
;Else, pop the oldest state from the state-window, push the
;current-state in the state-window and return merge result:
;(state + current-state - popped out oldest state).
(if (<= (count @state-window) *tweet-window-size*)
(do (swap! state-window conj current-state)
(merge-with + state current-state))
(let [old-state (peek @state-window)]
(swap! state-window #(conj (pop %) current-state))
(merge-with #(max 0 (- %1 %2))
(merge-with + state current-state)
old-state))))))))))))