Adds channel implementation user STM.
Adds join/leave/tell support for channels.
This commit is contained in:
parent
478a91d93f
commit
affa18de55
@ -1,9 +1,9 @@
|
|||||||
module Link.Client where
|
module Link.Client where
|
||||||
|
|
||||||
import Control.Concurrent.STM (STM, writeTChan, readTChan, atomically, orElse)
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent hiding (forkFinally)
|
import Control.Concurrent hiding (forkFinally)
|
||||||
import Control.Exception hiding (handle)
|
import Control.Exception hiding (handle)
|
||||||
import Control.Monad (void, forever)
|
import Control.Monad (void, forever, when, unless, forM_)
|
||||||
import Data.Time (getCurrentTime, diffUTCTime)
|
import Data.Time (getCurrentTime, diffUTCTime)
|
||||||
import System.IO (hGetLine)
|
import System.IO (hGetLine)
|
||||||
import System.Timeout (timeout)
|
import System.Timeout (timeout)
|
||||||
@ -27,6 +27,9 @@ sendMessage Client {..} = writeTChan clientChan
|
|||||||
sendMessageIO :: Client -> Message -> IO ()
|
sendMessageIO :: Client -> Message -> IO ()
|
||||||
sendMessageIO client = atomically . sendMessage client
|
sendMessageIO client = atomically . sendMessage client
|
||||||
|
|
||||||
|
tellMessage :: Channel -> Message -> STM ()
|
||||||
|
tellMessage Channel {..} = writeTChan channelChan
|
||||||
|
|
||||||
sendResponse :: Client -> Message -> IO ()
|
sendResponse :: Client -> Message -> IO ()
|
||||||
sendResponse Client {..} = printToHandle clientHandle . formatMessage
|
sendResponse Client {..} = printToHandle clientHandle . formatMessage
|
||||||
|
|
||||||
@ -35,7 +38,12 @@ runClient Server {..} client@Client {..} = do
|
|||||||
clientAlive <- newMVar True
|
clientAlive <- newMVar True
|
||||||
pingThread <- forkIO $ ping clientAlive `finally` killClient clientAlive
|
pingThread <- forkIO $ ping clientAlive `finally` killClient clientAlive
|
||||||
commandThread <- forkIO $ readCommands `finally` killClient clientAlive
|
commandThread <- forkIO $ readCommands `finally` killClient clientAlive
|
||||||
run clientAlive `finally` (killThread pingThread >> killThread commandThread)
|
run clientAlive `finally` do
|
||||||
|
killThread pingThread
|
||||||
|
killThread commandThread
|
||||||
|
clientChannelMap <- readTVarIO clientChannelChans
|
||||||
|
forM_ (Map.keys clientChannelMap) $ \channelName ->
|
||||||
|
handleMessage (Leave channelName) clientAlive
|
||||||
where
|
where
|
||||||
pingDelay = 120
|
pingDelay = 120
|
||||||
pingDelayMicros = pingDelay * 1000 * 1000
|
pingDelayMicros = pingDelay * 1000 * 1000
|
||||||
@ -56,7 +64,9 @@ runClient Server {..} client@Client {..} = do
|
|||||||
if not alive
|
if not alive
|
||||||
then printf "Closing connection: %s\n" (userName clientUser)
|
then printf "Closing connection: %s\n" (userName clientUser)
|
||||||
else do
|
else do
|
||||||
r <- try . timeout pingDelayMicros . atomically . readTChan $ clientChan
|
r <- try . timeout pingDelayMicros . atomically $ do
|
||||||
|
clientChannelMap <- readTVar clientChannelChans
|
||||||
|
foldr (orElse . readTChan) retry $ clientChan : Map.elems clientChannelMap
|
||||||
case r of
|
case r of
|
||||||
Left (e :: SomeException) -> printf "Exception: %s\n" (show e)
|
Left (e :: SomeException) -> printf "Exception: %s\n" (show e)
|
||||||
Right g -> do
|
Right g -> do
|
||||||
@ -77,27 +87,52 @@ runClient Server {..} client@Client {..} = do
|
|||||||
case Map.lookup user clientMap of
|
case Map.lookup user clientMap of
|
||||||
Nothing -> sendResponse client $ NoSuchUser (userName user)
|
Nothing -> sendResponse client $ NoSuchUser (userName user)
|
||||||
Just client' -> sendMessageIO client' $ MsgReply clientUser msg
|
Just client' -> sendMessageIO client' $ MsgReply clientUser msg
|
||||||
|
|
||||||
handleMessage Pong _ = do
|
handleMessage Pong _ = do
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
void $ swapMVar clientPongTime now
|
void $ swapMVar clientPongTime now
|
||||||
|
|
||||||
handleMessage Quit clientAlive = killClient clientAlive
|
handleMessage Quit clientAlive = killClient clientAlive
|
||||||
handleMessage (Join channelName) _ = do
|
|
||||||
modifyMVar_ serverChannels $ \channelMap -> do
|
handleMessage (Join channelName) _ = atomically $ do
|
||||||
case Map.lookup channelName channelMap of
|
clientChannelMap <- readTVar clientChannelChans
|
||||||
Just (Channel {channelUsers}) -> do
|
unless (Map.member channelName clientChannelMap) $ do
|
||||||
modifyMVar_ channelUsers $ return . Set.insert clientUser
|
channelMap <- readTVar serverChannels
|
||||||
return channelMap
|
channel@Channel {channelChan} <- case Map.lookup channelName channelMap of
|
||||||
Nothing -> do
|
Just (channel@Channel {channelUsers}) -> do
|
||||||
|
modifyTVar' channelUsers $ Set.insert clientUser
|
||||||
|
return channel
|
||||||
|
Nothing -> do
|
||||||
channel <- newChannel channelName $ Set.singleton clientUser
|
channel <- newChannel channelName $ Set.singleton clientUser
|
||||||
return $ Map.insert channelName channel channelMap
|
modifyTVar' serverChannels $ Map.insert channelName channel
|
||||||
handleMessage (Leave channelName) _ = do
|
return channel
|
||||||
modifyMVar_ serverChannels $ \channelMap -> do
|
clientChannelChan <- dupTChan channelChan
|
||||||
case Map.lookup channelName channelMap of
|
modifyTVar' clientChannelChans $ Map.insert channelName clientChannelChan
|
||||||
Just (Channel {channelUsers}) -> do
|
tellMessage channel $ Joined channelName clientUser
|
||||||
modifyMVar_ channelUsers $ return . Set.delete clientUser
|
|
||||||
users <- readMVar channelUsers
|
handleMessage (Leave channelName) _ = atomically $ do
|
||||||
return $ if Set.null users
|
channelMap <- readTVar serverChannels
|
||||||
then Map.delete channelName channelMap
|
case Map.lookup channelName channelMap of
|
||||||
else channelMap
|
Just (channel@Channel {channelUsers}) -> do
|
||||||
Nothing -> return channelMap
|
modifyTVar' channelUsers $ Set.delete clientUser
|
||||||
|
users <- readTVar channelUsers
|
||||||
|
when (Set.null users) $
|
||||||
|
modifyTVar' serverChannels $ Map.delete channelName
|
||||||
|
modifyTVar' clientChannelChans $ Map.delete channelName
|
||||||
|
tellMessage channel $ Leaved channelName clientUser
|
||||||
|
Nothing -> return ()
|
||||||
|
|
||||||
|
handleMessage (Names channelName) _ = atomically $ do
|
||||||
|
channelMap <- readTVar serverChannels
|
||||||
|
users <- case Map.lookup channelName channelMap of
|
||||||
|
Just (Channel {channelUsers}) -> readTVar channelUsers
|
||||||
|
Nothing -> return Set.empty
|
||||||
|
sendMessage client $ NamesReply channelName users
|
||||||
|
|
||||||
|
handleMessage (Tell channelName msg) _ = atomically $ do
|
||||||
|
channelMap <- readTVar serverChannels
|
||||||
|
case Map.lookup channelName channelMap of
|
||||||
|
Just channel -> tellMessage channel $ TellReply channelName clientUser msg
|
||||||
|
Nothing -> return ()
|
||||||
|
|
||||||
handleMessage message _ = sendResponse client message
|
handleMessage message _ = sendResponse client message
|
||||||
|
@ -2,22 +2,30 @@ module Link.Protocol where
|
|||||||
|
|
||||||
import Text.Printf (printf)
|
import Text.Printf (printf)
|
||||||
|
|
||||||
|
import qualified Data.Set as Set
|
||||||
|
|
||||||
import Link.Types
|
import Link.Types
|
||||||
|
|
||||||
parseCommand :: String -> Maybe Message
|
parseCommand :: String -> Maybe Message
|
||||||
parseCommand command = case words command of
|
parseCommand command = case words command of
|
||||||
["PONG"] -> Just Pong
|
["PONG"] -> Just Pong
|
||||||
"MSG" : userName : msg -> Just $ Msg (User userName) (unwords msg)
|
"MSG" : userName : msg -> Just $ Msg (User userName) (unwords msg)
|
||||||
["QUIT"] -> Just $ Quit
|
"TELL" : channelName : msg -> Just $ Tell channelName (unwords msg)
|
||||||
"JOIN" : channelName -> Just $ Join (unwords channelName)
|
["QUIT"] -> Just Quit
|
||||||
"LEAVE" : channelName -> Just $ Leave (unwords channelName)
|
"JOIN" : channelName -> Just $ Join (unwords channelName)
|
||||||
_ -> Nothing
|
"LEAVE" : channelName -> Just $ Leave (unwords channelName)
|
||||||
|
"NAMES" : channelName -> Just $ Names (unwords channelName)
|
||||||
|
_ -> Nothing
|
||||||
|
|
||||||
formatMessage :: Message -> String
|
formatMessage :: Message -> String
|
||||||
formatMessage (MsgReply user msg) = printf "MSG %s %s" (userName user) msg
|
formatMessage (MsgReply user msg) = printf "MSG %s %s" (userName user) msg
|
||||||
|
formatMessage (TellReply channelName user msg) =
|
||||||
|
printf "TELL %s %s %s" channelName (userName user) msg
|
||||||
formatMessage (NameInUse name) = printf "NAMEINUSE %s" name
|
formatMessage (NameInUse name) = printf "NAMEINUSE %s" name
|
||||||
formatMessage (Connected name) = printf "CONNECTED %s" name
|
formatMessage (Connected name) = printf "CONNECTED %s" name
|
||||||
formatMessage Ping = "PING"
|
formatMessage Ping = "PING"
|
||||||
formatMessage (NoSuchUser name) = printf "NOSUCHUSER %s" name
|
formatMessage (NoSuchUser name) = printf "NOSUCHUSER %s" name
|
||||||
formatMessage (Joined channelName user) = printf "JOINED %s %s" channelName (userName user)
|
formatMessage (Joined channelName user) = printf "JOINED %s %s" channelName (userName user)
|
||||||
formatMessage (Leaved channelName user) = printf "LEFT %s %s" channelName (userName user)
|
formatMessage (Leaved channelName user) = printf "LEFT %s %s" channelName (userName user)
|
||||||
|
formatMessage (NamesReply channelName users) =
|
||||||
|
printf "NAMES %s %s" channelName . unwords . map userName . Set.toList $ users
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
module Link.Types where
|
module Link.Types where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.STM (TVar, TChan, newTVarIO, newTChanIO, newBroadcastTChanIO)
|
import Control.Concurrent.STM (STM, TVar, TChan, newTVar, newTVarIO, newTChanIO, newBroadcastTChan)
|
||||||
import Data.Time (UTCTime, getCurrentTime)
|
import Data.Time (UTCTime, getCurrentTime)
|
||||||
import System.IO (Handle)
|
import System.IO (Handle)
|
||||||
|
|
||||||
@ -15,52 +15,58 @@ data User = User { userName :: !UserName }
|
|||||||
deriving (Show, Eq, Ord)
|
deriving (Show, Eq, Ord)
|
||||||
|
|
||||||
data Client = Client {
|
data Client = Client {
|
||||||
clientUser :: !User
|
clientUser :: !User
|
||||||
, clientHandle :: !Handle
|
, clientHandle :: !Handle
|
||||||
, clientChan :: TChan Message
|
, clientChan :: TChan Message
|
||||||
, clientPongTime :: MVar UTCTime
|
, clientPongTime :: MVar UTCTime
|
||||||
|
, clientChannelChans :: TVar (Map.Map ChannelName (TChan Message))
|
||||||
}
|
}
|
||||||
|
|
||||||
newClient :: User -> Handle -> IO Client
|
newClient :: User -> Handle -> IO Client
|
||||||
newClient user handle = do
|
newClient user handle = do
|
||||||
clientChan <- newTChanIO
|
clientChan <- newTChanIO
|
||||||
now <- getCurrentTime
|
now <- getCurrentTime
|
||||||
clientPongTime <- newMVar now
|
clientPongTime <- newMVar now
|
||||||
return $ Client user handle clientChan clientPongTime
|
clientChannelChans <- newTVarIO Map.empty
|
||||||
|
return $ Client user handle clientChan clientPongTime clientChannelChans
|
||||||
|
|
||||||
data Channel = Channel {
|
data Channel = Channel {
|
||||||
channelName :: !ChannelName
|
channelName :: !ChannelName
|
||||||
, channelUsers :: MVar (Set.Set User)
|
, channelUsers :: TVar (Set.Set User)
|
||||||
, channelChan :: TChan Message
|
, channelChan :: TChan Message
|
||||||
}
|
}
|
||||||
|
|
||||||
newChannel :: ChannelName -> Set.Set User -> IO Channel
|
newChannel :: ChannelName -> Set.Set User -> STM Channel
|
||||||
newChannel channelName users = do
|
newChannel channelName users = do
|
||||||
channelUsers <- newMVar users
|
channelUsers <- newTVar users
|
||||||
channelChan <- newBroadcastTChanIO
|
channelChan <- newBroadcastTChan
|
||||||
return $ Channel channelName channelUsers channelChan
|
return $ Channel channelName channelUsers channelChan
|
||||||
|
|
||||||
data Server = Server {
|
data Server = Server {
|
||||||
serverUsers :: MVar (Map.Map User Client)
|
serverUsers :: MVar (Map.Map User Client)
|
||||||
, serverChannels :: MVar (Map.Map ChannelName Channel)
|
, serverChannels :: TVar (Map.Map ChannelName Channel)
|
||||||
}
|
}
|
||||||
|
|
||||||
newServer :: IO Server
|
newServer :: IO Server
|
||||||
newServer = do
|
newServer = do
|
||||||
serverUsers <- newMVar Map.empty
|
serverUsers <- newMVar Map.empty
|
||||||
serverChannels <- newMVar Map.empty
|
serverChannels <- newTVarIO Map.empty
|
||||||
return $ Server serverUsers serverChannels
|
return $ Server serverUsers serverChannels
|
||||||
|
|
||||||
data Message = NameInUse UserName
|
data Message = NameInUse UserName
|
||||||
| Connected UserName
|
| Connected UserName
|
||||||
| Ping
|
| Ping
|
||||||
| MsgReply User String
|
| MsgReply User String
|
||||||
|
| TellReply ChannelName User String
|
||||||
| NoSuchUser UserName
|
| NoSuchUser UserName
|
||||||
| Joined ChannelName User
|
| Joined ChannelName User
|
||||||
| Leaved ChannelName User
|
| Leaved ChannelName User
|
||||||
|
| NamesReply ChannelName (Set.Set User)
|
||||||
| Pong
|
| Pong
|
||||||
| Msg User String
|
| Msg User String
|
||||||
|
| Tell ChannelName String
|
||||||
| Join ChannelName
|
| Join ChannelName
|
||||||
| Leave ChannelName
|
| Leave ChannelName
|
||||||
|
| Names ChannelName
|
||||||
| Quit
|
| Quit
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
Loading…
Reference in New Issue
Block a user