Moves client channels to TChan
This commit is contained in:
parent
4941ab9bd5
commit
130b556c56
@ -25,7 +25,8 @@ library
|
|||||||
build-depends: base >= 4.7 && < 5,
|
build-depends: base >= 4.7 && < 5,
|
||||||
network >= 2.6 && < 2.7,
|
network >= 2.6 && < 2.7,
|
||||||
containers >= 0.5 && < 0.6,
|
containers >= 0.5 && < 0.6,
|
||||||
time >= 1.4 && < 1.6
|
time >= 1.4 && < 1.6,
|
||||||
|
stm >= 2.4 && < 2.5
|
||||||
default-language: Haskell2010
|
default-language: Haskell2010
|
||||||
|
|
||||||
executable link-exe
|
executable link-exe
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
module Link.Client where
|
module Link.Client where
|
||||||
|
|
||||||
import Control.Concurrent hiding (forkFinally)
|
import Control.Concurrent.STM (STM, writeTChan, readTChan, atomically, orElse)
|
||||||
import Control.Exception hiding (handle)
|
import Control.Concurrent hiding (forkFinally)
|
||||||
import Control.Monad (void)
|
import Control.Exception hiding (handle)
|
||||||
import Data.Time (getCurrentTime, diffUTCTime)
|
import Control.Monad (void, forever)
|
||||||
import System.IO (hGetLine)
|
import Data.Time (getCurrentTime, diffUTCTime)
|
||||||
import System.Timeout (timeout)
|
import System.IO (hGetLine)
|
||||||
import Text.Printf (printf)
|
import System.Timeout (timeout)
|
||||||
|
import Text.Printf (printf)
|
||||||
|
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
|
|
||||||
@ -19,38 +20,34 @@ forkFinally action fun =
|
|||||||
mask $ \restore ->
|
mask $ \restore ->
|
||||||
forkIO (do r <- try (restore action); fun r)
|
forkIO (do r <- try (restore action); fun r)
|
||||||
|
|
||||||
race :: IO a -> IO b -> IO (Either a b)
|
sendMessage :: Client -> Message -> STM ()
|
||||||
race ioa iob = do
|
sendMessage Client {..} = writeTChan clientChan
|
||||||
m <- newEmptyMVar
|
|
||||||
bracket (forkFinally (fmap Left ioa) (putMVar m)) killThread $ \_ ->
|
|
||||||
bracket (forkFinally (fmap Right iob) (putMVar m)) killThread $ \_ -> do
|
|
||||||
r <- readMVar m
|
|
||||||
case r of
|
|
||||||
Left e -> throwIO e
|
|
||||||
Right a -> return a
|
|
||||||
|
|
||||||
sendMessage :: Client -> Message -> IO ()
|
sendMessageIO :: Client -> Message -> IO ()
|
||||||
sendMessage Client {..} = writeChan clientChan
|
sendMessageIO client = atomically . sendMessage client
|
||||||
|
|
||||||
sendResponse :: Client -> Message -> IO ()
|
sendResponse :: Client -> Message -> IO ()
|
||||||
sendResponse Client {..} = printToHandle clientHandle . formatMessage
|
sendResponse Client {..} = printToHandle clientHandle . formatMessage
|
||||||
|
|
||||||
runClient :: Server -> Client -> IO ()
|
runClient :: Server -> Client -> IO ()
|
||||||
runClient Server {..} client@Client {..} = do
|
runClient Server {..} client@Client {..} = do
|
||||||
clientAlive <- newMVar True
|
clientAlive <- newMVar True
|
||||||
pingThread <- forkIO $ ping clientAlive
|
pingThread <- forkIO $ ping clientAlive `finally` killClient clientAlive
|
||||||
run clientAlive `finally` killThread pingThread
|
commandThread <- forkIO $ readCommands `finally` killClient clientAlive
|
||||||
|
run clientAlive `finally` (killThread pingThread >> killThread commandThread)
|
||||||
where
|
where
|
||||||
pingDelay = 120
|
pingDelay = 120
|
||||||
pingDelayMicros = pingDelay * 1000 * 1000
|
pingDelayMicros = pingDelay * 1000 * 1000
|
||||||
|
|
||||||
|
killClient clientAlive = void $ swapMVar clientAlive False
|
||||||
|
|
||||||
ping clientAlive = do
|
ping clientAlive = do
|
||||||
sendMessage client Ping
|
sendMessageIO client Ping
|
||||||
threadDelay pingDelayMicros
|
threadDelay pingDelayMicros
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
pongTime <- readMVar clientPongTime
|
pongTime <- readMVar clientPongTime
|
||||||
if diffUTCTime now pongTime > fromIntegral pingDelay
|
if diffUTCTime now pongTime > fromIntegral pingDelay
|
||||||
then void $ swapMVar clientAlive False
|
then killClient clientAlive
|
||||||
else ping clientAlive
|
else ping clientAlive
|
||||||
|
|
||||||
run clientAlive = do
|
run clientAlive = do
|
||||||
@ -58,32 +55,29 @@ runClient Server {..} client@Client {..} = do
|
|||||||
if not alive
|
if not alive
|
||||||
then printf "Closing connection: %s\n" (userName clientUser)
|
then printf "Closing connection: %s\n" (userName clientUser)
|
||||||
else do
|
else do
|
||||||
r <- try . timeout pingDelayMicros $ race readCommand readMessage
|
r <- try . timeout pingDelayMicros . atomically . readTChan $ clientChan
|
||||||
case r of
|
case r of
|
||||||
Left (e :: SomeException) -> printf "Exception: %s\n" (show e)
|
Left (e :: SomeException) -> printf "Exception: %s\n" (show e)
|
||||||
Right g -> case g of
|
Right g -> do
|
||||||
Nothing -> run clientAlive
|
case g of
|
||||||
Just cm -> do
|
Nothing -> return ()
|
||||||
case cm of
|
Just message -> handleMessage message clientAlive
|
||||||
Left mcommand -> case mcommand of
|
run clientAlive
|
||||||
Nothing -> printf "Could not parse command\n"
|
|
||||||
Just command -> handleCommand command clientAlive
|
|
||||||
Right message -> sendResponse client message
|
|
||||||
run clientAlive
|
|
||||||
|
|
||||||
readCommand = do
|
readCommands = forever $ do
|
||||||
command <- hGetLine clientHandle
|
command <- hGetLine clientHandle
|
||||||
printf "<%s>: %s\n" (userName clientUser) command
|
printf "<%s>: %s\n" (userName clientUser) command
|
||||||
return $ parseCommand command
|
case parseCommand command of
|
||||||
|
Nothing -> printf "Could not parse command: %s\n" command
|
||||||
|
Just c -> sendMessageIO client c
|
||||||
|
|
||||||
readMessage = readChan clientChan
|
handleMessage (Msg user msg) _ =
|
||||||
|
|
||||||
handleCommand (Msg user msg) _ =
|
|
||||||
withMVar serverUsers $ \clientMap ->
|
withMVar serverUsers $ \clientMap ->
|
||||||
case Map.lookup user clientMap of
|
case Map.lookup user clientMap of
|
||||||
Nothing -> sendResponse client $ NoSuchUser (userName user)
|
Nothing -> sendResponse client $ NoSuchUser (userName user)
|
||||||
Just client' -> sendMessage client' $ Msg clientUser msg
|
Just client' -> sendMessageIO client' $ MsgReply clientUser msg
|
||||||
handleCommand Pong _ = do
|
handleMessage Pong _ = do
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
void $ swapMVar clientPongTime now
|
void $ swapMVar clientPongTime now
|
||||||
handleCommand Quit clientAlive = void $ swapMVar clientAlive False
|
handleMessage Quit clientAlive = killClient clientAlive
|
||||||
|
handleMessage message _ = sendResponse client message
|
||||||
|
@ -12,8 +12,8 @@ parseCommand command = case words command of
|
|||||||
_ -> Nothing
|
_ -> Nothing
|
||||||
|
|
||||||
formatMessage :: Message -> String
|
formatMessage :: Message -> String
|
||||||
formatMessage (Msg user msg) = printf "MSG %s %s" (userName user) msg
|
formatMessage (MsgReply user msg) = printf "MSG %s %s" (userName user) msg
|
||||||
formatMessage (NameInUse name) = printf "NAMEINUSE %s" name
|
formatMessage (NameInUse name) = printf "NAMEINUSE %s" name
|
||||||
formatMessage (Connected name) = printf "CONNECTED %s" name
|
formatMessage (Connected name) = printf "CONNECTED %s" name
|
||||||
formatMessage Ping = "PING"
|
formatMessage Ping = "PING"
|
||||||
formatMessage (NoSuchUser name) = printf "NOSUCHUSER %s" name
|
formatMessage (NoSuchUser name) = printf "NOSUCHUSER %s" name
|
||||||
|
@ -1,13 +1,14 @@
|
|||||||
module Link.Server where
|
module Link.Server where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Exception hiding (handle)
|
import Control.Concurrent.STM (newTChanIO)
|
||||||
import Control.Monad (forever)
|
import Control.Exception hiding (handle)
|
||||||
import Data.Time (getCurrentTime)
|
import Control.Monad (forever)
|
||||||
import Network (withSocketsDo, listenOn, accept, PortID(..))
|
import Data.Time (getCurrentTime)
|
||||||
import System.IO (hClose, hSetNewlineMode, hSetBuffering, BufferMode(..),
|
import Network (withSocketsDo, listenOn, accept, PortID(..))
|
||||||
universalNewlineMode, hGetLine, Handle, stdout)
|
import System.IO (hClose, hSetNewlineMode, hSetBuffering, BufferMode(..),
|
||||||
import Text.Printf (printf)
|
universalNewlineMode, hGetLine, Handle, stdout)
|
||||||
|
import Text.Printf (printf)
|
||||||
|
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
|
|
||||||
@ -19,8 +20,10 @@ import Link.Util
|
|||||||
runServer :: Int -> IO ()
|
runServer :: Int -> IO ()
|
||||||
runServer port = withSocketsDo $ do
|
runServer port = withSocketsDo $ do
|
||||||
hSetBuffering stdout LineBuffering
|
hSetBuffering stdout LineBuffering
|
||||||
|
|
||||||
serverUsers <- newMVar Map.empty
|
serverUsers <- newMVar Map.empty
|
||||||
let server = Server serverUsers
|
let server = Server serverUsers
|
||||||
|
|
||||||
sock <- listenOn . PortNumber . fromIntegral $ port
|
sock <- listenOn . PortNumber . fromIntegral $ port
|
||||||
printf "Listening on port %d\n" port
|
printf "Listening on port %d\n" port
|
||||||
forever $ do
|
forever $ do
|
||||||
@ -55,7 +58,7 @@ checkAddClient Server {..} user@User {..} handle =
|
|||||||
if Map.member user clientMap
|
if Map.member user clientMap
|
||||||
then return (clientMap, Nothing)
|
then return (clientMap, Nothing)
|
||||||
else do
|
else do
|
||||||
clientChan <- newChan
|
clientChan <- newTChanIO
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
clientPongTime <- newMVar now
|
clientPongTime <- newMVar now
|
||||||
let client = Client user handle clientChan clientPongTime
|
let client = Client user handle clientChan clientPongTime
|
||||||
|
@ -1,12 +1,15 @@
|
|||||||
module Link.Types where
|
module Link.Types where
|
||||||
|
|
||||||
import Control.Concurrent (MVar, Chan)
|
import Control.Concurrent (MVar)
|
||||||
import Data.Time (UTCTime)
|
import Control.Concurrent.STM (TVar, TChan)
|
||||||
import System.IO (Handle)
|
import Data.Time (UTCTime)
|
||||||
|
import System.IO (Handle)
|
||||||
|
|
||||||
import qualified Data.Map as Map
|
import qualified Data.Map as Map
|
||||||
|
import qualified Data.Set as Set
|
||||||
|
|
||||||
type UserName = String
|
type UserName = String
|
||||||
|
type RoomName = String
|
||||||
|
|
||||||
data User = User { userName :: !UserName }
|
data User = User { userName :: !UserName }
|
||||||
deriving (Show, Eq, Ord)
|
deriving (Show, Eq, Ord)
|
||||||
@ -14,7 +17,7 @@ data User = User { userName :: !UserName }
|
|||||||
data Client = Client {
|
data Client = Client {
|
||||||
clientUser :: !User
|
clientUser :: !User
|
||||||
, clientHandle :: !Handle
|
, clientHandle :: !Handle
|
||||||
, clientChan :: !(Chan Message)
|
, clientChan :: !(TChan Message)
|
||||||
, clientPongTime :: MVar UTCTime
|
, clientPongTime :: MVar UTCTime
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,8 +28,9 @@ data Server = Server {
|
|||||||
data Message = NameInUse UserName
|
data Message = NameInUse UserName
|
||||||
| Connected UserName
|
| Connected UserName
|
||||||
| Ping
|
| Ping
|
||||||
|
| MsgReply User String
|
||||||
|
| NoSuchUser UserName
|
||||||
| Pong
|
| Pong
|
||||||
| Msg User String
|
| Msg User String
|
||||||
| NoSuchUser UserName
|
|
||||||
| Quit
|
| Quit
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
Loading…
Reference in New Issue
Block a user