commit
f3b929ea60
@ -0,0 +1,8 @@ |
||||
/bower_components/ |
||||
/node_modules/ |
||||
/.pulp-cache/ |
||||
/output/ |
||||
/generated-docs/ |
||||
/.psc* |
||||
/.purs* |
||||
/.psa* |
@ -0,0 +1,20 @@ |
||||
{ |
||||
"name": "purescript-amqp", |
||||
"ignore": [ |
||||
"**/.*", |
||||
"node_modules", |
||||
"bower_components", |
||||
"output" |
||||
], |
||||
"dependencies": { |
||||
"purescript-prelude": "^3.1.0", |
||||
"purescript-console": "^3.0.0", |
||||
"purescript-aff": "^3.1.0", |
||||
"purescript-node-buffer": "^3.0.1", |
||||
"purescript-node-process": "^5.0.0", |
||||
"purescript-foreign-generic": "^4.2.0" |
||||
}, |
||||
"devDependencies": { |
||||
"purescript-psci-support": "^3.0.0" |
||||
} |
||||
} |
@ -0,0 +1,68 @@ |
||||
{ |
||||
"name": "purescript-amqp", |
||||
"version": "1.0.0", |
||||
"lockfileVersion": 1, |
||||
"requires": true, |
||||
"dependencies": { |
||||
"amqplib": { |
||||
"version": "0.5.1", |
||||
"resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.5.1.tgz", |
||||
"integrity": "sha1-fMz+ur5WwumE6noiQ/fO/m+/xs8=", |
||||
"requires": { |
||||
"bitsyntax": "0.0.4", |
||||
"bluebird": "3.5.1", |
||||
"buffer-more-ints": "0.0.2", |
||||
"readable-stream": "1.1.14" |
||||
} |
||||
}, |
||||
"bitsyntax": { |
||||
"version": "0.0.4", |
||||
"resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.0.4.tgz", |
||||
"integrity": "sha1-6xDMb4K4xJDj6FaY8H6D1G4MuoI=", |
||||
"requires": { |
||||
"buffer-more-ints": "0.0.2" |
||||
} |
||||
}, |
||||
"bluebird": { |
||||
"version": "3.5.1", |
||||
"resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.5.1.tgz", |
||||
"integrity": "sha512-MKiLiV+I1AA596t9w1sQJ8jkiSr5+ZKi0WKrYGUn6d1Fx+Ij4tIj+m2WMQSGczs5jZVxV339chE8iwk6F64wjA==" |
||||
}, |
||||
"buffer-more-ints": { |
||||
"version": "0.0.2", |
||||
"resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz", |
||||
"integrity": "sha1-JrOIXRD6E9t/wBquOquHAZngEkw=" |
||||
}, |
||||
"core-util-is": { |
||||
"version": "1.0.2", |
||||
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", |
||||
"integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" |
||||
}, |
||||
"inherits": { |
||||
"version": "2.0.3", |
||||
"resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz", |
||||
"integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=" |
||||
}, |
||||
"isarray": { |
||||
"version": "0.0.1", |
||||
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", |
||||
"integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" |
||||
}, |
||||
"readable-stream": { |
||||
"version": "1.1.14", |
||||
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", |
||||
"integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", |
||||
"requires": { |
||||
"core-util-is": "1.0.2", |
||||
"inherits": "2.0.3", |
||||
"isarray": "0.0.1", |
||||
"string_decoder": "0.10.31" |
||||
} |
||||
}, |
||||
"string_decoder": { |
||||
"version": "0.10.31", |
||||
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", |
||||
"integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,18 @@ |
||||
{ |
||||
"name": "purescript-amqp", |
||||
"version": "1.0.0", |
||||
"description": "", |
||||
"main": "index.js", |
||||
"directories": { |
||||
"test": "test" |
||||
}, |
||||
"scripts": { |
||||
"test": "echo \"Error: no test specified\" && exit 1" |
||||
}, |
||||
"keywords": [], |
||||
"author": "", |
||||
"license": "ISC", |
||||
"dependencies": { |
||||
"amqplib": "^0.5.1" |
||||
} |
||||
} |
@ -0,0 +1,236 @@ |
||||
module Node.AMQP |
||||
( module MoreExports |
||||
, module Node.AMQP.Types |
||||
, module Node.AMQP |
||||
) where |
||||
|
||||
import Node.AMQP.FFI |
||||
import Node.AMQP.Types |
||||
import Node.AMQP.Types.Internal |
||||
import Prelude |
||||
|
||||
import Control.Monad.Aff (makeAff) |
||||
import Control.Monad.Eff.Class (liftEff) |
||||
import Control.Monad.Eff.Exception (Error, message) |
||||
import Control.Monad.Error.Class (withResource) |
||||
import Data.Foreign (Foreign) |
||||
import Data.Foreign.Class (encode) |
||||
import Data.Function.Uncurried (runFn1, runFn2, runFn3, runFn4, runFn5, runFn6, runFn7) |
||||
import Data.Maybe (Maybe) |
||||
import Data.Nullable (toMaybe) |
||||
import Data.StrMap (StrMap) |
||||
import Data.String (Pattern(..), contains) |
||||
import Node.AMQP.FFI (EffAMQP, AffAMQP) as MoreExports |
||||
import Node.Buffer (Buffer) |
||||
|
||||
-- | Connects to an AMQP server given an AMQP URL and [connection options]. Returns the connection in |
||||
-- | `Aff` monad. See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#connect) for details. |
||||
connect :: forall eff. String -> ConnectOptions -> AffAMQP eff Connection |
||||
connect url = makeAff <<< runFn3 _connect <<< connectUrl url |
||||
|
||||
-- | Closes the given AMQP connection. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_close) for details. |
||||
close :: forall eff. Connection -> AffAMQP eff Unit |
||||
close conn = makeAff $ \onError onSuccess -> flip (runFn3 _close conn) onSuccess \err -> do |
||||
if contains (Pattern "Connection closed") (message err) |
||||
then onSuccess unit |
||||
else onError err |
||||
|
||||
-- | Creates an open channel and returns it. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_createChannel) for details. |
||||
createChannel :: forall eff. Connection -> AffAMQP eff Channel |
||||
createChannel = makeAff <<< runFn3 _createChannel |
||||
|
||||
-- | Closes the given channel. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_close) for details. |
||||
closeChannel :: forall eff. Channel -> AffAMQP eff Unit |
||||
closeChannel = makeAff <<< runFn3 _closeChannel |
||||
|
||||
-- | Asserts a queue into existence with the given name and options. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for details. |
||||
assertQueue :: forall eff. Channel -> QueueName -> QueueOptions -> AffAMQP eff AssertQueueOK |
||||
assertQueue channel queue = |
||||
makeAff <<< runFn5 _assertQueue channel queue <<< encodeQueueOptions |
||||
|
||||
-- | Checks that a queue exists with the given queue name. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_checkQueue) for details. |
||||
checkQueue :: forall eff. Channel -> QueueName -> AffAMQP eff AssertQueueOK |
||||
checkQueue channel = makeAff <<< runFn4 _checkQueue channel |
||||
|
||||
-- | Deletes the queue by the given name. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteQueue) for details. |
||||
deleteQueue :: forall eff. Channel -> QueueName -> DeleteQueueOptions -> AffAMQP eff DeleteQueueOK |
||||
deleteQueue channel queue = |
||||
makeAff <<< runFn5 _deleteQueue channel queue <<< encodeDeleteQueueOptions |
||||
|
||||
-- | Purges the messages from the queue by the given name. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_purgeQueue) for details. |
||||
purgeQueue :: forall eff. Channel -> QueueName -> AffAMQP eff PurgeQueueOK |
||||
purgeQueue channel = makeAff <<< runFn4 _purgeQueue channel |
||||
|
||||
-- | Asserts a routing path from an exchange to a queue: the given exchange will relay |
||||
-- | messages to the given queue, according to the type of the exchange and the given routing key. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue) details. |
||||
bindQueue :: forall eff. Channel -> QueueName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit |
||||
bindQueue channel queue exchange routingKey = |
||||
makeAff <<< runFn7 _bindQueue channel queue exchange routingKey <<< encode |
||||
|
||||
-- | Removes the routing path between the given queue and the given exchange with the given routing key and arguments. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_unbindQueue) for details. |
||||
unbindQueue :: forall eff. Channel -> QueueName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit |
||||
unbindQueue channel queue exchange routingKey = |
||||
makeAff <<< runFn7 _unbindQueue channel queue exchange routingKey <<< encode |
||||
|
||||
-- | Asserts an exchange into existence with the given exchange name, type and options. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange) for details. |
||||
assertExchange :: forall eff. Channel -> ExchangeName -> ExchangeType -> ExchangeOptions -> AffAMQP eff Unit |
||||
assertExchange channel exchange exchangeType = |
||||
makeAff <<< runFn6 _assertExchange channel exchange (show exchangeType) <<< encodeExchangeOptions |
||||
|
||||
-- | Checks that the exchange exists by the given name. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_checkExchange) for details. |
||||
checkExchange :: forall eff. Channel -> ExchangeName -> AffAMQP eff Unit |
||||
checkExchange channel = makeAff <<< runFn4 _checkExchange channel |
||||
|
||||
-- | Deletes the exchange by the given name. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteExchange) for details. |
||||
deleteExchange :: forall eff. Channel -> ExchangeName -> DeleteExchangeOptions -> AffAMQP eff Unit |
||||
deleteExchange channel exchange = |
||||
makeAff <<< runFn5 _deleteExchange channel exchange <<< encodeDeleteExchangeOptions |
||||
|
||||
-- | Binds an exchange to another exchange. The exchange named by `destExchange` will receive messages |
||||
-- | from the exchange named by `sourceExchange`, according to the type of the source and the given |
||||
-- | routing key. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_bindExchange) for details. |
||||
bindExchange :: forall eff. Channel -> ExchangeName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit |
||||
bindExchange channel destExchange sourceExchange routingKey = |
||||
makeAff <<< runFn7 _bindExchange channel destExchange sourceExchange routingKey <<< encode |
||||
|
||||
-- | Removes a binding from an exchange to another exchange. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_unbindExchange) for details. |
||||
unbindExchange :: forall eff. Channel -> ExchangeName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit |
||||
unbindExchange channel destExchange sourceExchange routingKey = |
||||
makeAff <<< runFn7 _unbindExchange channel destExchange sourceExchange routingKey <<< encode |
||||
|
||||
-- | Publish a single message to the given exchange with the given routing key, and the given publish |
||||
-- | options. The message content is given as a `Buffer`. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish) for details. |
||||
publish :: forall eff. Channel -> ExchangeName -> RoutingKey -> Buffer -> PublishOptions -> AffAMQP eff Unit |
||||
publish channel exchange routingKey buffer = |
||||
makeAff <<< const <<< runFn6 _publish channel exchange routingKey buffer <<< encodePublishOptions |
||||
|
||||
-- | Sends a single message with the content given as a `Buffer` to the given queue, bypassing routing. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_sendToQueue) for details. |
||||
sendToQueue :: forall eff. Channel -> QueueName -> Buffer -> PublishOptions -> AffAMQP eff Unit |
||||
sendToQueue channel queue buffer = |
||||
makeAff <<< const <<< runFn5 _sendToQueue channel queue buffer <<< encodePublishOptions |
||||
|
||||
-- | Sets up a consumer for the given queue and consume options, with a callback to be invoked with each message. |
||||
-- | The callback receives `Nothing` if the consumer is cancelled by the broker. Returns the consumer tag. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) for details. |
||||
consume :: forall eff. Channel -> QueueName -> ConsumeOptions -> (Maybe Message -> EffAMQP eff Unit) -> AffAMQP eff ConsumeOK |
||||
consume channel queue options onMessage = |
||||
makeAff $ runFn6 _consume channel queue (toMaybe >>> map toMessage >>> onMessage) (encodeConsumeOptions options) |
||||
|
||||
-- | Cancels the consumer identified by the given consumer tag. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel) for details. |
||||
cancel :: forall eff. Channel -> String -> AffAMQP eff Unit |
||||
cancel channel = makeAff <<< runFn4 _cancel channel |
||||
|
||||
-- | Gets a message from the given queue. If there are no messages in the queue, returns `Nothing`. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_get) for details. |
||||
get :: forall eff. Channel -> QueueName -> GetOptions -> AffAMQP eff (Maybe Message) |
||||
get channel queue opts = makeAff \onError onSuccess -> |
||||
runFn5 _get channel queue (encodeGetOptions opts) onError (onSuccess <<< map toMessage <<< toMaybe) |
||||
|
||||
-- | Acknowledges a message given its delivery tag. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ack) for details. |
||||
ack :: forall eff. Channel -> String -> EffAMQP eff Unit |
||||
ack channel deliveryTag = runFn3 _ack channel deliveryTag false |
||||
|
||||
-- | Acknowledges all messages up to the message with the given delivery tag. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ack) for details. |
||||
ackAllUpTo :: forall eff. Channel -> String -> EffAMQP eff Unit |
||||
ackAllUpTo channel deliveryTag = runFn3 _ack channel deliveryTag true |
||||
|
||||
-- | Acknowledges all outstanding messages on the channel. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ackAll) for details. |
||||
ackAll :: forall eff. Channel -> EffAMQP eff Unit |
||||
ackAll = runFn1 _ackAll |
||||
|
||||
-- | Rejects a message given its delivery tag. If the boolean param is true, the server requeues the |
||||
-- | message, else it drops it. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack) for details. |
||||
nack :: forall eff. Channel -> String -> Boolean -> EffAMQP eff Unit |
||||
nack channel deliveryTag = runFn4 _nack channel deliveryTag false |
||||
|
||||
-- | Rejects all messages up to the message with the given delivery tag. If the boolean param is true, |
||||
-- | the server requeues the messages, else it drops them. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack) for details. |
||||
nackAllUpTo :: forall eff. Channel -> String -> Boolean -> EffAMQP eff Unit |
||||
nackAllUpTo channel deliveryTag = runFn4 _nack channel deliveryTag true |
||||
|
||||
-- | Rejects all outstanding messages on the channel. If the boolean param is true, |
||||
-- | the server requeues the messages, else it drops them. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nackAll) for details. |
||||
nackAll :: forall eff. Channel -> Boolean -> EffAMQP eff Unit |
||||
nackAll = runFn2 _nackAll |
||||
|
||||
-- | Sets the prefetch count for this channel. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch) for details. |
||||
prefetch :: forall eff. Channel -> Int -> AffAMQP eff Unit |
||||
prefetch channel = liftEff <<< runFn2 _prefetch channel |
||||
|
||||
-- | Requeues unacknowledged messages on this channel. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_recover) for details. |
||||
recover :: forall eff. Channel -> AffAMQP eff Unit |
||||
recover = makeAff <<< runFn3 _recover |
||||
|
||||
-- | Registers an event handler to the connection which is triggered when the connection closes. |
||||
-- | If the connection closes because of an error, the handler is called with `Just error`, else |
||||
-- | with `Nothing`. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_events) for details. |
||||
onConnectionClose :: forall eff. Connection -> (Maybe Error -> EffAMQP eff Unit) -> EffAMQP eff Unit |
||||
onConnectionClose conn onClose = runFn2 _onConnectionClose conn (onClose <<< toMaybe) |
||||
|
||||
-- | Registers an event handler to the connection which is triggered when the connection errors out. |
||||
-- | The handler is called with the error. |
||||
onConnectionError :: forall eff. Connection -> (Error -> EffAMQP eff Unit) -> EffAMQP eff Unit |
||||
onConnectionError = runFn2 _onConnectionError |
||||
|
||||
-- | Registers an event handler to the connection which is triggered when the RabbitMQ server |
||||
-- | decides to block the connection. The handler is called with the reason for blocking. |
||||
onConnectionBlocked :: forall eff. Connection -> (String -> EffAMQP eff Unit) -> EffAMQP eff Unit |
||||
onConnectionBlocked = runFn2 _onConnectionBlocked |
||||
|
||||
-- | Registers an event handler to the connection which is triggered when the RabbitMQ server |
||||
-- | decides to unblock the connection. The handler is called with no arguments. |
||||
onConnectionUnblocked :: forall eff. Connection -> EffAMQP eff Unit -> EffAMQP eff Unit |
||||
onConnectionUnblocked = runFn2 _onConnectionUnblocked |
||||
|
||||
-- | Registers an event handler to the channel which is triggered when the channel closes. |
||||
-- | The handler is called with no arguments. |
||||
-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_events) for details. |
||||
onChannelClose :: forall eff. Channel -> EffAMQP eff Unit -> EffAMQP eff Unit |
||||
onChannelClose = runFn2 _onChannelClose |
||||
|
||||
-- | Registers an event handler to the channel which is triggered when the channel errors out. |
||||
-- | The handler is called with the error. |
||||
onChannelError :: forall eff. Channel -> (Error -> EffAMQP eff Unit) -> EffAMQP eff Unit |
||||
onChannelError = runFn2 _onChannelError |
||||
|
||||
-- | Registers an event handler to the channel which is triggered when a message published with |
||||
-- | the mandatory flag cannot be routed and is returned to the sending channel. The handler is |
||||
-- | called with the returned message. |
||||
onChannelReturn :: forall eff. Channel -> (Message -> EffAMQP eff Unit) -> EffAMQP eff Unit |
||||
onChannelReturn channel onReturn = runFn2 _onChannelReturn channel (toMessage >>> onReturn) |
||||
|
||||
-- | Registers an event handler to the channel which is triggered when the channel's write buffer |
||||
-- | is emptied. The handler is called with no arguments. |
||||
onChannelDrain :: forall eff. Channel -> EffAMQP eff Unit -> EffAMQP eff Unit |
||||
onChannelDrain = runFn2 _onChannelDrain |
||||
|
||||
-- | A convenience function for creating a channel, doing some action with it, and then automatically closing |
||||
-- | it, even in case of errors. |
||||
withChannel :: forall eff a. Connection -> (Channel -> AffAMQP eff a) -> AffAMQP eff a |
||||
withChannel conn = withResource (createChannel conn) closeChannel |
@ -0,0 +1,221 @@ |
||||
var amqp = require('amqplib/callback_api'); |
||||
|
||||
var callback = function(onError, onSuccess) { |
||||
return function(err, o) { |
||||
if (err) { |
||||
onError(err)(); |
||||
return; |
||||
} |
||||
onSuccess(o)(); |
||||
}; |
||||
}; |
||||
|
||||
var callbackOnlyError = function(onError, onSuccess) { |
||||
return function(err) { |
||||
if (err) { |
||||
onError(err)(); |
||||
return; |
||||
} |
||||
onSuccess()(); |
||||
}; |
||||
}; |
||||
|
||||
exports._connect = function(url, onError,onSuccess) { |
||||
return function() { |
||||
amqp.connect(url, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._close = function(conn, onError, onSuccess) { |
||||
return function() { |
||||
conn.close(callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._createChannel = function(conn, onError, onSuccess) { |
||||
return function() { |
||||
conn.createChannel(callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._closeChannel = function(channel, onError, onSuccess) { |
||||
return function() { |
||||
channel.close(callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._assertQueue = function(channel, queue, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.assertQueue(queue, options, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._checkQueue = function(channel, queue, onError, onSuccess) { |
||||
return function() { |
||||
channel.checkQueue(queue, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._deleteQueue = function(channel, queue, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.deleteQueue(queue, options, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._purgeQueue = function(channel, queue, onError, onSuccess) { |
||||
return function() { |
||||
channel.purgeQueue(queue, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._bindQueue = function(channel, queue, exchange, pattern, args, onError, onSuccess) { |
||||
return function() { |
||||
channel.bindQueue(queue, exchange, pattern, args, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._unbindQueue = function(channel, queue, exchange, pattern, args, onError, onSuccess) { |
||||
return function() { |
||||
channel.unbindQueue(queue, exchange, pattern, args, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._assertExchange = function(channel, exchange, type, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.assertExchange(exchange, type, options, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._checkExchange = function(channel, exchange, onError, onSuccess) { |
||||
return function() { |
||||
channel.checkExchange(exchange, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._deleteExchange = function(channel, exchange, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.deleteExchange(exchange, options, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._bindExchange = function(channel, destExchange, sourceExchange, pattern, args, onError, onSuccess) { |
||||
return function() { |
||||
channel.bindExchange(destExchange, sourceExchange, pattern, args, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._unbindExchange = function(channel, destExchange, sourceExchange, pattern, args, onError, onSuccess) { |
||||
return function() { |
||||
channel.unbindExchange(destExchange, sourceExchange, pattern, args, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._publish = function(channel, exchange, routingKey, content, options, onSuccess) { |
||||
return function() { |
||||
if (!channel.publish(exchange, routingKey, content, options)) { |
||||
channel.once('drain', function() { |
||||
onSuccess()(); |
||||
}); |
||||
} else { |
||||
onSuccess()(); |
||||
} |
||||
} |
||||
}; |
||||
|
||||
exports._sendToQueue = function(channel, queue, content, options, onSuccess) { |
||||
return function() { |
||||
if (!channel.sendToQueue(queue, content, options)) { |
||||
channel.once('drain', function() { |
||||
onSuccess()(); |
||||
}); |
||||
} else { |
||||
onSuccess()(); |
||||
} |
||||
} |
||||
}; |
||||
|
||||
exports._consume = function(channel, queue, onMessage, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.consume(queue, function(msg) { onMessage(msg)(); }, options, callback(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._cancel = function(channel, consumerTag, onError, onSuccess) { |
||||
return function() { |
||||
channel.cancel(consumerTag, callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
exports._get = function(channel, queue, options, onError, onSuccess) { |
||||
return function() { |
||||
channel.get(queue, options, callback(onError, function(msgOrFalse) { |
||||
return function() { |
||||
if (msgOrFalse === false) { |
||||
onSuccess(null)(); |
||||
} else { |
||||
onSuccess(msgOrFalse)(); |
||||
} |
||||
}; |
||||
})) |
||||
}; |
||||
}; |
||||
|
||||
exports._ack = function(channel, deliveryTag, allUpTo) { |
||||
return function() { |
||||
channel.ack({fields: {deliveryTag: deliveryTag}}, allUpTo); |
||||
}; |
||||
}; |
||||
|
||||
exports._nack = function(channel, deliveryTag, allUpTo, requeue) { |
||||
return function() { |
||||
channel.nack({fields: {deliveryTag: deliveryTag}}, allUpTo, requeue); |
||||
}; |
||||
}; |
||||
|
||||
exports._ackAll = function(channel) { |
||||
return function() { |
||||
channel.ackAll(); |
||||
}; |
||||
}; |
||||
|
||||
exports._nackAll = function(channel, requeue) { |
||||
return function() { |
||||
channel.nackAll(requeue); |
||||
}; |
||||
}; |
||||
|
||||
exports._prefetch = function(channel, count) { |
||||
return function() { |
||||
channel.prefetch(count); |
||||
}; |
||||
}; |
||||
|
||||
exports._recover = function(channel, onError, onSuccess) { |
||||
return function() { |
||||
channel.recover(callbackOnlyError(onError, onSuccess)); |
||||
}; |
||||
}; |
||||
|
||||
var eventHandler = function(event) { |
||||
return function(model, onEvent) { |
||||
return function() { |
||||
model.on(event, function(arg) { |
||||
if (arg === undefined) { |
||||
onEvent(); |
||||
} else { |
||||
onEvent(arg)(); |
||||
} |
||||
}); |
||||
}; |
||||
}; |
||||
}; |
||||
|
||||
exports._onConnectionClose = eventHandler('close'); |
||||
exports._onConnectionError = eventHandler('error'); |
||||
exports._onConnectionBlocked = eventHandler('blocked'); |
||||
exports._onConnectionUnblocked = eventHandler('unblocked'); |
||||
|
||||
exports._onChannelClose = eventHandler('close'); |
||||
exports._onChannelError = eventHandler('error'); |
||||
exports._onChannelReturn = eventHandler('return'); |
||||
exports._onChannelDrain = eventHandler('drain'); |
@ -0,0 +1,121 @@ |
||||
module Node.AMQP.FFI where |
||||
|
||||
import Node.AMQP.Types |
||||
import Node.AMQP.Types.Internal |
||||
import Prelude |
||||
|
||||
import Control.Monad.Aff (Aff) |
||||
import Control.Monad.Eff (Eff, kind Effect) |
||||
import Control.Monad.Eff.Exception (Error) |
||||
import Data.Foreign (Foreign) |
||||
import Data.Function.Uncurried (Fn2, Fn3, Fn4, Fn5, Fn6, Fn7, Fn1) |
||||
import Data.Nullable (Nullable) |
||||
import Node.Buffer (Buffer) |
||||
|
||||
type EffAMQP e a = Eff (amqp :: AMQP | e) a |
||||
type AffAMQP e a = Aff (amqp :: AMQP | e) a |
||||
|
||||
type ECb e a = Error -> EffAMQP e a |
||||
type SCb r e a = r -> EffAMQP e a |
||||
|
||||
foreign import _connect :: forall e a. |
||||
Fn3 String (ECb e a) (SCb Connection e a) (EffAMQP e a) |
||||
|
||||
foreign import _close :: forall e a. |
||||
Fn3 Connection (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _createChannel :: forall e a. |
||||
Fn3 Connection (ECb e a) (SCb Channel e a) (EffAMQP e a) |
||||
|
||||
foreign import _closeChannel :: forall e a. |
||||
Fn3 Channel (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _assertQueue :: forall e a. |
||||
Fn5 Channel QueueName Foreign (ECb e a) (SCb AssertQueueOK e a) (EffAMQP e a) |
||||
|
||||
foreign import _checkQueue :: forall e a. |
||||
Fn4 Channel QueueName (ECb e a) (SCb AssertQueueOK e a) (EffAMQP e a) |
||||
|
||||
foreign import _deleteQueue :: forall e a. |
||||
Fn5 Channel QueueName Foreign (ECb e a) (SCb DeleteQueueOK e a) (EffAMQP e a) |
||||
|
||||
foreign import _purgeQueue :: forall e a. |
||||
Fn4 Channel QueueName (ECb e a) (SCb PurgeQueueOK e a) (EffAMQP e a) |
||||
|
||||
foreign import _bindQueue :: forall e a. |
||||
Fn7 Channel QueueName ExchangeName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _unbindQueue :: forall e a. |
||||
Fn7 Channel QueueName ExchangeName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _assertExchange :: forall e a. |
||||
Fn6 Channel ExchangeName String Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _checkExchange :: forall e a. |
||||
Fn4 Channel ExchangeName (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _deleteExchange :: forall e a. |
||||
Fn5 Channel ExchangeName Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _bindExchange :: forall e a. |
||||
Fn7 Channel QueueName QueueName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _unbindExchange :: forall e a. |
||||
Fn7 Channel QueueName QueueName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _publish :: forall e a. |
||||
Fn6 Channel ExchangeName RoutingKey Buffer Foreign (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _sendToQueue :: forall e a. |
||||
Fn5 Channel QueueName Buffer Foreign (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _consume :: forall e a. |
||||
Fn6 Channel QueueName (SCb (Nullable Message') e Unit) Foreign (ECb e a) (SCb ConsumeOK e a) (EffAMQP e a) |
||||
|
||||
foreign import _cancel :: forall e a. |
||||
Fn4 Channel String (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _get :: forall e a. |
||||
Fn5 Channel QueueName Foreign (ECb e a) (SCb (Nullable Message') e a) (EffAMQP e a) |
||||
|
||||
foreign import _ack :: forall e a. |
||||
Fn3 Channel String Boolean (EffAMQP e a) |
||||
|
||||
foreign import _nack :: forall e a. |
||||
Fn4 Channel String Boolean Boolean (EffAMQP e a) |
||||
|
||||
foreign import _ackAll :: forall e a. |
||||
Fn1 Channel (EffAMQP e a) |
||||
|
||||
foreign import _nackAll :: forall e a. |
||||
Fn2 Channel Boolean (EffAMQP e a) |
||||
|
||||
foreign import _prefetch :: forall e a. |
||||
Fn2 Channel Int (EffAMQP e a) |
||||
|
||||
foreign import _recover :: forall e a. |
||||
Fn3 Channel (ECb e a) (SCb Unit e a) (EffAMQP e a) |
||||
|
||||
foreign import _onConnectionClose :: forall e a. |
||||
Fn2 Connection (SCb (Nullable Error) e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onConnectionError :: forall e a. |
||||
Fn2 Connection (SCb Error e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onConnectionBlocked :: forall e a. |
||||
Fn2 Connection (SCb String e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onConnectionUnblocked :: forall e a. |
||||
Fn2 Connection (EffAMQP e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onChannelClose :: forall e a. |
||||
Fn2 Channel (EffAMQP e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onChannelError :: forall e a. |
||||
Fn2 Channel (SCb Error e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onChannelReturn :: forall e a. |
||||
Fn2 Channel (SCb Message' e Unit) (EffAMQP e a) |
||||
|
||||
foreign import _onChannelDrain :: forall e a. |
||||
Fn2 Channel (EffAMQP e Unit) (EffAMQP e a) |
@ -0,0 +1,111 @@ |
||||
module Node.AMQP.Main where |
||||
|
||||
import Node.AMQP |
||||
import Prelude |
||||
|
||||
import Control.Monad.Aff (delay, runAff) |
||||
import Control.Monad.Eff (Eff, kind Effect) |
||||
import Control.Monad.Eff.Class (liftEff) |
||||
import Control.Monad.Eff.Console (CONSOLE, log, logShow) |
||||
import Control.Monad.Eff.Exception (EXCEPTION) |
||||
import Data.Maybe (Maybe(..)) |
||||
import Data.StrMap as StrMap |
||||
import Data.Time.Duration (Milliseconds(..)) |
||||
import Node.Buffer (BUFFER, fromString, toString) |
||||
import Node.Encoding (Encoding(..)) |
||||
import Node.Process (PROCESS) |
||||
|
||||
main :: forall eff. Eff ( console :: CONSOLE |
||||
, amqp :: AMQP |
||||
, exception :: EXCEPTION |
||||
, process :: PROCESS |
||||
, buffer :: BUFFER |
||||
| eff ) Unit |
||||
main = do |
||||
log "Starting" |
||||
content <- fromString "test123" UTF8 |
||||
void $ runAff (\e -> log "ERRORAFF" *> logShow e) pure do |
||||
conn <- connect "amqp://localhost" defaultConnectOptions |
||||
liftEff $ log "Connected" |
||||
let q = "queue111" |
||||
x = "exc2222" |
||||
|
||||
liftEff $ do |
||||
onConnectionClose conn $ case _ of |
||||
Just err -> log $ "E: Conn closed: " <> show err |
||||
Nothing -> log "E: Conn closed" |
||||
onConnectionError conn \err -> log $ "E: Conn errored: " <> show err |
||||
onConnectionBlocked conn \reason -> log $ "E: Conn blocked: " <> reason |
||||
onConnectionUnblocked conn (log "E: Conn unblocked") |
||||
|
||||
withChannel conn \channel -> do |
||||
liftEff $ log "Channel created" |
||||
prefetch channel 10 |
||||
|
||||
liftEff $ do |
||||
onChannelClose channel (log "E: Channel close") |
||||
onChannelError channel \err -> log $ "E: Channel errored: " <> show err |
||||
onChannelReturn channel \msg -> |
||||
toString UTF8 msg.content >>= \m -> log $ "E: Channel message returned: " <> m |
||||
onChannelDrain channel (log "E: Channel drained") |
||||
|
||||
ok <- assertQueue channel q defaultQueueOptions |
||||
liftEff $ log $ "Queue created: " <> ok.queue |
||||
|
||||
consumeChannel <- createChannel conn |
||||
liftEff $ onChannelClose consumeChannel (log "E: Consume Channel close") |
||||
|
||||
recover consumeChannel |
||||
|
||||
ok <- consume consumeChannel q defaultConsumeOptions $ case _ of |
||||
Nothing -> do |
||||
log "Consumer closed" |
||||
void $ runAff logShow logShow $ closeChannel consumeChannel |
||||
Just msg -> do |
||||
toString UTF8 msg.content >>= \m -> log $ "Received: " <> m |
||||
ack consumeChannel msg.fields.deliveryTag |
||||
logShow msg.properties.persistent |
||||
|
||||
let consumerTag = ok.consumerTag |
||||
liftEff $ log $ "Consumer tag: " <> ok.consumerTag |
||||
|
||||
ok <- purgeQueue channel q |
||||
liftEff $ log $ "Queue purged: " <> show ok.messageCount |
||||
|
||||
assertExchange channel x Fanout defaultExchangeOptions |
||||
liftEff $ log $ "Exchange created" |
||||
|
||||
bindQueue channel q x "*" StrMap.empty |
||||
liftEff $ log $ "Queue bound" |
||||
|
||||
publish channel x "" content $ defaultPublishOptions { persistent = Just false } |
||||
liftEff $ log $ "Message published to exchange" |
||||
|
||||
delay (Milliseconds 500.0) |
||||
|
||||
liftEff $ ackAll consumeChannel |
||||
cancel consumeChannel consumerTag |
||||
closeChannel consumeChannel |
||||
|
||||
sendToQueue channel q content $ defaultPublishOptions { persistent = Just true } |
||||
liftEff $ log $ "Message published to queue" |
||||
|
||||
delay (Milliseconds 500.0) |
||||
|
||||
get channel q defaultGetOptions >>= case _ of |
||||
Nothing -> liftEff $ log "No message received" |
||||
Just msg -> liftEff do |
||||
toString UTF8 msg.content >>= \m -> log $ "Get Received: " <> m |
||||
nackAllUpTo channel msg.fields.deliveryTag true |
||||
|
||||
unbindQueue channel q x "*" StrMap.empty |
||||
liftEff $ log $ "Queue unbound" |
||||
|
||||
deleteExchange channel x defaultDeleteExchangeOptions |
||||
liftEff $ log $ "Exchange deleted" |
||||
|
||||
ok <- deleteQueue channel q defaultDeleteQueueOptions |
||||
liftEff $ log $ "Queue deleted: " <> show ok.messageCount |
||||
|
||||
close conn |
||||
liftEff $ log $ "Connection closed" |
@ -0,0 +1,301 @@ |
||||
module Node.AMQP.Types where |
||||
|
||||
import Prelude |
||||
|
||||
import Control.Monad.Eff (kind Effect) |
||||
import Control.Plus (empty) |
||||
import Data.DateTime.Instant (Instant) |
||||
import Data.Foreign (Foreign) |
||||
import Data.Generic.Rep (class Generic) |
||||
import Data.Generic.Rep.Eq (genericEq) |
||||
import Data.Generic.Rep.Show (genericShow) |
||||
import Data.Maybe (Maybe(..)) |
||||
import Data.StrMap as StrMap |
||||
import Data.String (toLower) |
||||
import Data.Time.Duration (Milliseconds, Seconds(..)) |
||||
import Node.Buffer (Buffer) |
||||
|
||||
-- | The effect for computations which talk to an AMQP server. |
||||
foreign import data AMQP :: Effect |
||||
|
||||
-- | An AMQP connection. |
||||
foreign import data Connection :: Type |
||||
|
||||
-- | An AMQP Channel. |
||||
foreign import data Channel :: Type |
||||
|
||||
-- | An AMQP queue name. |
||||
type QueueName = String |
||||
|
||||
-- | An AMQP exchange name. |
||||
type ExchangeName = String |
||||
|
||||
-- | An AMQP routing key. |
||||
type RoutingKey = String |
||||
|
||||
-- | Options used to connect to the AMQP server. See |
||||
-- | <http://www.squaremobius.net/amqp.node/channel_api.html#connect> for details. |
||||
type ConnectOptions = { frameMax :: Int |
||||
, channelMax :: Int |
||||
, heartbeat :: Seconds |
||||
} |
||||
|
||||
-- | Default connection options. |
||||
defaultConnectOptions :: ConnectOptions |
||||
defaultConnectOptions = { frameMax : 4096 |
||||
, channelMax: 0 |
||||
, heartbeat : Seconds 5.0 |
||||
} |
||||
|
||||
-- | Reply from the server for asserting a queue. Contains the |
||||
-- | name of queue asserted, the count of the messages in the queue, and the count of the consumers of |
||||
-- | the queue. |
||||
type AssertQueueOK = { queue :: String, messageCount :: Int, consumerCount :: Int } |
||||
|
||||
-- | Reply from the server for deleting a queue. Contains the count of the mssages in the queue. |
||||
type DeleteQueueOK = { messageCount :: Int } |
||||
|
||||
-- | Reply from the server for purging a queue. Contains the count of the mssages purged from the queue. |
||||
type PurgeQueueOK = { messageCount :: Int } |
||||
|
||||
-- | Types of AMQP exchanges. See <https://www.rabbitmq.com/tutorials/amqp-concepts.html> for details. |
||||
data ExchangeType = Direct |
||||
| Topic |
||||
| Headers |
||||
| Fanout |
||||
|
||||
derive instance genericExchangeType :: Generic ExchangeType _ |
||||
|
||||
instance showExchangeType :: Show ExchangeType where |
||||
show = genericShow >>> toLower |
||||
|
||||
instance eqExchangeType :: Eq ExchangeType where |
||||
eq = genericEq |
||||
|
||||
-- | Options used to create a queue: |
||||
-- | - `exclusive`: if true, the queue is used by only one connection and is deleted when that connection closes. |
||||
-- | - `durable`: if true, the queue survives broker restarts. |
||||
-- | - `autoDelete`: if true, the queue is deleted when there is no consumers for it. |
||||
-- | - `messageTTL`: if specified, expires messages arriving in the queue after n milliseconds. |
||||
-- | - `expires`: if specified, the queue is destroyed after n milliseconds of disuse, where "use" |
||||
-- | means having consumers, being asserted or checked, or being polled. |
||||
-- | - `deadLetterExchange`: if specified, an exchange to which messages discarded from the queue are resent to. |
||||
-- | - `deadLetterRoutingKey`: if specified, set as the routing key for the discarded messages. |
||||
-- | - `maxLength`: if specified, the maximum number of messages the queue holds. |
||||
-- | - `maxPriority`: if specified, makes the queue a [priority queue](http://www.rabbitmq.com/priority.html). |
||||
-- | - `arguments`: additional arguments. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue> for details. |
||||
type QueueOptions = { exclusive :: Maybe Boolean |
||||
, durable :: Maybe Boolean |
||||
, autoDelete :: Maybe Boolean |
||||
, arguments :: StrMap.StrMap Foreign |
||||
, messageTTL :: Maybe Milliseconds |
||||
, expires :: Maybe Milliseconds |
||||
, deadLetterExchange :: Maybe String |
||||
, deadLetterRoutingKey :: Maybe RoutingKey |
||||
, maxLength :: Maybe Int |
||||
, maxPriority :: Maybe Int |
||||
} |
||||
|
||||
-- | Default options to create a queue. Every field is set to `Nothing` or `empty`. |
||||
defaultQueueOptions :: QueueOptions |
||||
defaultQueueOptions = { exclusive : Nothing |
||||
, durable : Nothing |
||||
, autoDelete : Nothing |
||||
, arguments : StrMap.empty |
||||
, messageTTL : Nothing |
||||
, expires : Nothing |
||||
, deadLetterExchange : Nothing |
||||
, deadLetterRoutingKey: Nothing |
||||
, maxLength : Nothing |
||||
, maxPriority : Nothing |
||||
} |
||||
|
||||
-- | Options used to delete a queue: |
||||
-- | - `ifUnused`: if true and the queue has consumers, it is not deleted and the channel is closed. |
||||
-- | - `ifEmpty`: if true and the queue contains messages, the queue is not deleted and the channel is closed. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteQueue> for details. |
||||
type DeleteQueueOptions = { ifUnused :: Maybe Boolean, ifEmpty :: Maybe Boolean } |
||||
|
||||
-- | Default options to delete a queue. Every field is set to `Nothing`. |
||||
defaultDeleteQueueOptions :: DeleteQueueOptions |
||||
defaultDeleteQueueOptions = { ifUnused: empty, ifEmpty: empty } |
||||
|
||||
-- | Options to create an exchange: |
||||
-- | - `durable`: if true, the exchange survives broker restarts. |
||||
-- | - `internal`: if true, messages cannot be published directly to the exchange. |
||||
-- | - `autoDelete`: if true, the exchange is destroyed once it has no bindings. |
||||
-- | - `alternateExchange`: if specified, an exchange to send messages to if this exchange can't route them to any queues. |
||||
-- | - `arguments`: additional arguments. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange> for details. |
||||
type ExchangeOptions = { durable :: Maybe Boolean |
||||
, internal :: Maybe Boolean |
||||
, autoDelete :: Maybe Boolean |
||||
, alternateExchange :: Maybe String |
||||
, arguments :: StrMap.StrMap Foreign |
||||
} |
||||
|
||||
-- | Default options to create an exchange. Every field is set to `Nothing` or `empty`. |
||||
defaultExchangeOptions :: ExchangeOptions |
||||
defaultExchangeOptions = { durable : Nothing |
||||
, internal : Nothing |
||||
, autoDelete : Nothing |
||||
, alternateExchange: Nothing |
||||
, arguments : StrMap.empty |
||||
} |
||||
|
||||
-- | Options to delete an exchange: |
||||
-- | - `ifUnused`: if true and the exchange has bindings, it is not deleted and the channel is closed. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteExchange> for details. |
||||
type DeleteExchangeOptions = { ifUnused :: Maybe Boolean } |
||||
|
||||
-- | Default options to delete an exchange. Every field is set to `Nothing`. |
||||
defaultDeleteExchangeOptions :: DeleteExchangeOptions |
||||
defaultDeleteExchangeOptions = { ifUnused: Nothing } |
||||
|
||||
-- | Options to publish a message: |
||||
-- | - `expiration`: if specified, the message is discarded from a queue once it has been there |
||||
-- | longer than the given number of milliseconds. |
||||
-- | - `userId`: if specified, RabbitMQ compares it to the username supplied when opening the connection, |
||||
-- | and rejects the messages for which it does not match. |
||||
-- | - `cc`: an array of routing keys; messages are routed to these routing keys in addition to the |
||||
-- | given routing key. |
||||
-- | - `bcc`: like `cc`, except that the value is not sent in the message headers to consumers. |
||||
-- | - `priority`: if specified, a priority for the message. |
||||
-- | - `persistent`: if true, the message survives broker restarts provided it is in a queue that |
||||
-- | also survives restarts. |
||||
-- | - `mandatory`: if true, the message is returned if it is not routed to a queue. |
||||
-- | - `contentType`: a MIME type for the message content. |
||||
-- | - `contentEncoding`: a MIME encoding for the message content. |
||||
-- | - `headers`: application specific headers to be carried along with the message content. |
||||
-- | - `correlationId`: usually used to match replies to requests, or similar. |
||||
-- | - `replyTo`: often used to name a queue to which the receiving application must send replies, in an RPC scenario. |
||||
-- | - `messageId`: arbitrary application-specific identifier for the message. |
||||
-- | - `timestamp`: a timestamp for the message. |
||||
-- | - `type`: an arbitrary application-specific type for the message. |
||||
-- | - `appId`: an arbitrary identifier for the originating application. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish> for details. |
||||
type PublishOptions = { expiration :: Maybe Milliseconds |
||||
, userId :: Maybe String |
||||
, cc :: Array RoutingKey |
||||
, bcc :: Array RoutingKey |
||||
, priority :: Maybe Int |
||||
, persistent :: Maybe Boolean |
||||
, mandatory :: Maybe Boolean |
||||
, contentType :: Maybe String |
||||
, contentEncoding :: Maybe String |
||||
, headers :: StrMap.StrMap Foreign |
||||
, correlationId :: Maybe String |
||||
, replyTo :: Maybe QueueName |
||||
, messageId :: Maybe String |
||||
, timestamp :: Maybe Instant |
||||
, type :: Maybe String |
||||
, appId :: Maybe String |
||||
} |
||||
|
||||
-- | Default options to publish a message. Every field is set to `Nothing`. |
||||
defaultPublishOptions :: PublishOptions |
||||
defaultPublishOptions = { expiration : Nothing |
||||
, userId : Nothing |
||||
, cc : [] |
||||
, bcc : [] |
||||
, priority : Nothing |
||||
, persistent : Nothing |
||||
, mandatory : Nothing |
||||
, contentType : Nothing |
||||
, contentEncoding: Nothing |
||||
, headers : StrMap.empty |
||||
, correlationId : Nothing |
||||
, replyTo : Nothing |
||||
, messageId : Nothing |
||||
, timestamp : Nothing |
||||
, type : Nothing |
||||
, appId : Nothing |
||||
} |
||||
|
||||
-- | Fields of a message received by a consumer: |
||||
-- | - `deliveryTag`: a serial number for the message. |
||||
-- | - `consumerTag`: an identifier for the consumer for which the message is destined. |
||||
-- | - `exchange`: the exchange to which the message was published. |
||||
-- | - `routingKey`: the routing key with which the message was published. |
||||
-- | - `redelivered`: if true, indicates that this message has been delivered before and has been |
||||
-- | handed back to the server. |
||||
type MessageFields = { deliveryTag :: String |
||||
, consumerTag :: String |
||||
, exchange :: ExchangeName |
||||
, routingKey :: RoutingKey |
||||
, redelivered :: Boolean |
||||
} |
||||
|
||||
-- | Properties of a message received by a consumer. A subset of the `PublishOptions` fields. |
||||
type MessageProperties = { expiration :: Maybe Milliseconds |
||||
, userId :: Maybe String |
||||
, priority :: Maybe Int |
||||
, persistent :: Maybe Boolean |
||||
, contentType :: Maybe String |
||||
, contentEncoding :: Maybe String |
||||
, headers :: StrMap.StrMap Foreign |
||||
, correlationId :: Maybe String |
||||
, replyTo :: Maybe QueueName |
||||
, messageId :: Maybe String |
||||
, timestamp :: Maybe Instant |
||||
, type :: Maybe String |
||||
, appId :: Maybe String |
||||
} |
||||
|
||||
-- | A message received by a consumer: |
||||
-- | - `content`: the content of the message a buffer. |
||||
-- | - `fields`: the message fields. |
||||
-- | - `properties`: the message properties. |
||||
type Message = { content :: Buffer |
||||
, fields :: MessageFields |
||||
, properties :: MessageProperties |
||||
} |
||||
|
||||
-- | Options used to consume message from a queue: |
||||
-- | - `consumerTag`: an identifier for this consumer, must not be already in use on the channel. |
||||
-- | - `noLocal`: if true, the broker does not deliver messages to the consumer if they were also |
||||
-- | published on this connection. RabbitMQ doesn't implement it, and will ignore it. |
||||
-- | - `noAck`: if true, the broker does expect an acknowledgement of the messages delivered to |
||||
-- | this consumer. It dequeues messages as soon as they've been sent down the wire. |
||||
-- | - `exclusive`: if true, the broker does not let anyone else consume from this queue. |
||||
-- | - `priority`: gives a priority to the consumer. Higher priority consumers get messages in |
||||
-- | preference to lower priority consumers. |
||||
-- | - `arguments`: additional arguments. |
||||
-- | |
||||
-- | See <http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume> for details. |
||||
type ConsumeOptions = { consumerTag :: Maybe String |
||||
, noLocal :: Maybe Boolean |
||||
, noAck :: Maybe Boolean |
||||
, exclusive :: Maybe Boolean |
||||
, priority :: Maybe Int |
||||
, arguments :: StrMap.StrMap Foreign |
||||
} |
||||
|
||||
-- | Default options to consume messages from a queue. Every field is set to `Nothing` or `empty`. |
||||
defaultConsumeOptions :: ConsumeOptions |
||||
defaultConsumeOptions = { consumerTag: Nothing |
||||
, noLocal : Nothing |
||||
, noAck : Nothing |
||||
, exclusive : Nothing |
||||
, priority : Nothing |
||||
, arguments : StrMap.empty |
||||
} |
||||
|
||||
-- | Reply from the server for setting up a consumer. Contains the consumer tag which is the unique |
||||
-- | identifier for this consumer. |
||||
type ConsumeOK = { consumerTag :: String } |
||||
|
||||
-- | Options used to get a message from a queue: |
||||
-- | - `noAck`: if true, the broker does expect an acknowledgement of the messages delivered to |
||||
-- | this consumer. It dequeues messages as soon as they've been sent down the wire. |
||||
type GetOptions = { noAck :: Maybe Boolean } |
||||
|
||||
-- | Default options to get a message from a queue. Every field is set to `Nothing`. |
||||
defaultGetOptions :: GetOptions |
||||
defaultGetOptions = { noAck: Nothing } |
@ -0,0 +1,243 @@ |
||||
module Node.AMQP.Types.Internal where |
||||
|
||||
import Node.AMQP.Types |
||||
import Prelude |
||||
|
||||
import Control.Monad.Eff (kind Effect) |
||||
import Data.DateTime.Instant (instant, unInstant) |
||||
import Data.Foreign (Foreign) |
||||
import Data.Foreign.Class (class Encode, encode) |
||||
import Data.Foreign.Generic (defaultOptions, genericEncode) |
||||
import Data.Foreign.NullOrUndefined (NullOrUndefined(..)) |
||||
import Data.Generic.Rep (class Generic) |
||||
import Data.Int (floor, hexadecimal, toStringAs) |
||||
import Data.Newtype (unwrap) |
||||
import Data.Nullable (Nullable, toMaybe) |
||||
import Data.StrMap as StrMap |
||||
import Data.Time.Duration (Milliseconds(..), Seconds(..)) |
||||
import Node.Buffer (Buffer) |
||||
|
||||
connectUrl :: String -> ConnectOptions -> String |
||||
connectUrl url { frameMax, channelMax, heartbeat : (Seconds heartbeat) } = |
||||
url <> "?frame_max=0x" <> toStringAs hexadecimal frameMax |
||||
<> "&channel_max=" <> show channelMax |
||||
<> "&heartbeat=" <> show (floor heartbeat) |
||||
|
||||
encodeQueueOptions :: QueueOptions -> Foreign |
||||
encodeQueueOptions = fromQueueOptions >>> encode |
||||
|
||||
encodeDeleteQueueOptions :: DeleteQueueOptions -> Foreign |
||||
encodeDeleteQueueOptions = fromDeleteQueueOptions >>> encode |
||||
|
||||
encodeExchangeOptions :: ExchangeOptions -> Foreign |
||||
encodeExchangeOptions = fromExchangeOptions >>> encode |
||||
|
||||
encodeDeleteExchangeOptions :: DeleteExchangeOptions -> Foreign |
||||
encodeDeleteExchangeOptions = fromDeleteExchangeOptions >>> encode |
||||
|
||||
encodePublishOptions :: PublishOptions -> Foreign |
||||
encodePublishOptions = fromPublishOptions >>> encode |
||||
|
||||
encodeConsumeOptions :: ConsumeOptions -> Foreign |
||||
encodeConsumeOptions = fromConsumeOptions >>> encode |
||||
|
||||
encodeGetOptions :: GetOptions -> Foreign |
||||
encodeGetOptions = fromGetOptions >>> encode |
||||
|
||||
newtype QueueOptions' = QueueOptions' |
||||
{ exclusive :: NullOrUndefined Boolean |
||||
, durable :: NullOrUndefined Boolean |
||||
, autoDelete :: NullOrUndefined Boolean |
||||
, arguments :: StrMap.StrMap Foreign |
||||
, messageTtl :: NullOrUndefined Number |
||||
, expires :: NullOrUndefined Number |
||||
, deadLetterExchange :: NullOrUndefined String |
||||
, deadLetterRoutingKey :: NullOrUndefined String |
||||
, maxLength :: NullOrUndefined Int |
||||
, maxPriority :: NullOrUndefined Int |
||||
} |
||||
|
||||
derive instance genericQueueOptions' :: Generic QueueOptions' _ |
||||
|
||||
instance encodeQueueOptions' :: Encode QueueOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromQueueOptions :: QueueOptions -> QueueOptions' |
||||
fromQueueOptions opts = QueueOptions' |
||||
{ exclusive : NullOrUndefined opts.exclusive |
||||
, durable : NullOrUndefined opts.durable |
||||
, autoDelete : NullOrUndefined opts.autoDelete |
||||
, arguments : opts.arguments |
||||
, messageTtl : NullOrUndefined (unwrap <$> opts.messageTTL) |
||||
, expires : NullOrUndefined (unwrap <$> opts.expires) |
||||
, deadLetterExchange : NullOrUndefined opts.deadLetterExchange |
||||
, deadLetterRoutingKey: NullOrUndefined opts.deadLetterRoutingKey |
||||
, maxLength : NullOrUndefined opts.maxLength |
||||
, maxPriority : NullOrUndefined opts.maxPriority |
||||
} |
||||
|
||||
newtype DeleteQueueOptions' = DeleteQueueOptions' |
||||
{ ifUnused :: NullOrUndefined Boolean |
||||
, ifEmpty :: NullOrUndefined Boolean } |
||||
|
||||
derive instance genericDeleteQueueOptions' :: Generic DeleteQueueOptions' _ |
||||
|
||||
instance encodeDeleteQueueOptions' :: Encode DeleteQueueOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromDeleteQueueOptions :: DeleteQueueOptions -> DeleteQueueOptions' |
||||
fromDeleteQueueOptions opts = DeleteQueueOptions' |
||||
{ ifUnused: NullOrUndefined opts.ifUnused |
||||
, ifEmpty : NullOrUndefined opts.ifEmpty |
||||
} |
||||
|
||||
newtype ExchangeOptions' = ExchangeOptions' |
||||
{ durable :: NullOrUndefined Boolean |
||||
, internal :: NullOrUndefined Boolean |
||||
, autoDelete :: NullOrUndefined Boolean |
||||
, alternateExchange :: NullOrUndefined String |
||||
, arguments :: StrMap.StrMap Foreign |
||||
} |
||||
|
||||
derive instance genericExchangeOptions' :: Generic ExchangeOptions' _ |
||||
|
||||
instance encodeExchangeOptions' :: Encode ExchangeOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromExchangeOptions :: ExchangeOptions -> ExchangeOptions' |
||||
fromExchangeOptions opts = ExchangeOptions' |
||||
{ durable : NullOrUndefined opts.durable |
||||
, internal : NullOrUndefined opts.internal |
||||
, autoDelete : NullOrUndefined opts.autoDelete |
||||
, alternateExchange: NullOrUndefined opts.alternateExchange |
||||
, arguments : opts.arguments |
||||
} |
||||
|
||||
newtype DeleteExchangeOptions' = DeleteExchangeOptions' { ifUnused :: NullOrUndefined Boolean } |
||||
|
||||
derive instance genericDeleteExchangeOptions' :: Generic DeleteExchangeOptions' _ |
||||
|
||||
instance encodeDeleteExchangeOptions' :: Encode DeleteExchangeOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromDeleteExchangeOptions :: DeleteExchangeOptions -> DeleteExchangeOptions' |
||||
fromDeleteExchangeOptions opts = DeleteExchangeOptions' { ifUnused: NullOrUndefined opts.ifUnused } |
||||
|
||||
newtype PublishOptions' = PublishOptions' |
||||
{ expiration :: NullOrUndefined String |
||||
, userId :: NullOrUndefined String |
||||
, "CC" :: Array RoutingKey |
||||
, "BCC" :: Array RoutingKey |
||||
, priority :: NullOrUndefined Int |
||||
, persistent :: NullOrUndefined Boolean |
||||
, mandatory :: NullOrUndefined Boolean |
||||
, contentType :: NullOrUndefined String |
||||
, contentEncoding :: NullOrUndefined String |
||||
, headers :: StrMap.StrMap Foreign |
||||
, correlationId :: NullOrUndefined String |
||||
, replyTo :: NullOrUndefined QueueName |
||||
, messageId :: NullOrUndefined String |
||||
, timestamp :: NullOrUndefined Number |
||||
, type :: NullOrUndefined String |
||||
, appId :: NullOrUndefined String |
||||
} |
||||
|
||||
derive instance genericPublishOptions' :: Generic PublishOptions' _ |
||||
|
||||
instance encodePublishOptions' :: Encode PublishOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromPublishOptions :: PublishOptions -> PublishOptions' |
||||
fromPublishOptions opts = PublishOptions' |
||||
{ expiration : NullOrUndefined (show <$> opts.expiration) |
||||
, userId : NullOrUndefined opts.userId |
||||
, "CC" : opts.cc |
||||
, "BCC" : opts.bcc |
||||
, priority : NullOrUndefined opts.priority |
||||
, persistent : NullOrUndefined opts.persistent |
||||
, mandatory : NullOrUndefined opts.mandatory |
||||
, contentType : NullOrUndefined opts.contentType |
||||
, contentEncoding: NullOrUndefined opts.contentEncoding |
||||
, headers : opts.headers |
||||
, correlationId : NullOrUndefined opts.correlationId |
||||
, replyTo : NullOrUndefined opts.replyTo |
||||
, messageId : NullOrUndefined opts.messageId |
||||
, timestamp : NullOrUndefined ((unwrap <<< unInstant) <$> opts.timestamp) |
||||
, type : NullOrUndefined opts.type |
||||
, appId : NullOrUndefined opts.appId |
||||
} |
||||
|
||||
newtype MessageProperties' = MessageProperties' |
||||
{ expiration :: Nullable Number |
||||
, userId :: Nullable String |
||||
, priority :: Nullable Int |
||||
, deliveryMode :: Nullable Int |
||||
, contentType :: Nullable String |
||||
, contentEncoding :: Nullable String |
||||
, headers :: StrMap.StrMap Foreign |
||||
, correlationId :: Nullable String |
||||
, replyTo :: Nullable QueueName |
||||
, messageId :: Nullable String |
||||
, timestamp :: Nullable Number |
||||
, type :: Nullable String |
||||
, appId :: Nullable String |
||||
} |
||||
|
||||
newtype Message' = Message' |
||||
{ content :: Buffer |
||||
, fields :: MessageFields |
||||
, properties :: MessageProperties' |
||||
} |
||||
|
||||
toMessage :: Message' -> Message |
||||
toMessage (Message' { content, fields, properties: (MessageProperties' props) }) = |
||||
{ content, fields, properties: msgProperties } |
||||
where |
||||
msgProperties = { expiration : Milliseconds <$> toMaybe props.expiration |
||||
, userId : toMaybe props.userId |
||||
, priority : toMaybe props.priority |
||||
, persistent : (_ == 2) <$> toMaybe props.deliveryMode |
||||
, contentType : toMaybe props.contentType |
||||
, contentEncoding : toMaybe props.contentEncoding |
||||
, headers : props.headers |
||||
, correlationId : toMaybe props.correlationId |
||||
, replyTo : toMaybe props.replyTo |
||||
, messageId : toMaybe props.messageId |
||||
, timestamp : join $ instant <$> (Milliseconds <$> toMaybe props.timestamp) |
||||
, type : toMaybe props.type |
||||
, appId : toMaybe props.appId |
||||
} |
||||
|
||||
newtype ConsumeOptions' = ConsumeOptions' |
||||
{ consumerTag :: NullOrUndefined String |
||||
, noLocal :: NullOrUndefined Boolean |
||||
, noAck :: NullOrUndefined Boolean |
||||
, exclusive :: NullOrUndefined Boolean |
||||
, priority :: NullOrUndefined Int |
||||
, arguments :: StrMap.StrMap Foreign |
||||
} |
||||
|
||||
derive instance genericConsumeOptions' :: Generic ConsumeOptions' _ |
||||
|
||||
instance encodeConsumeOptions' :: Encode ConsumeOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromConsumeOptions :: ConsumeOptions -> ConsumeOptions' |
||||
fromConsumeOptions opts = ConsumeOptions' |
||||
{ consumerTag: NullOrUndefined opts.consumerTag |
||||
, noLocal : NullOrUndefined opts.noLocal |
||||
, noAck : NullOrUndefined opts.noAck |
||||
, exclusive : NullOrUndefined opts.exclusive |
||||
, priority : NullOrUndefined opts.priority |
||||
, arguments : opts.arguments |
||||
} |
||||
|
||||
newtype GetOptions' = GetOptions' { noAck :: NullOrUndefined Boolean } |
||||
|
||||
derive instance genericGetOptions' :: Generic GetOptions' _ |
||||
|
||||
instance encodeGetOptions' :: Encode GetOptions' where |
||||
encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } |
||||
|
||||
fromGetOptions :: GetOptions -> GetOptions' |
||||
fromGetOptions opts = GetOptions' { noAck: NullOrUndefined opts.noAck } |
@ -0,0 +1,9 @@ |
||||
module Test.Main where |
||||
|
||||
import Prelude |
||||
import Control.Monad.Eff (Eff) |
||||
import Control.Monad.Eff.Console (CONSOLE, log) |
||||
|
||||
main :: forall e. Eff (console :: CONSOLE | e) Unit |
||||
main = do |
||||
log "You should add some tests." |
Loading…
Reference in new issue