From f3b929ea6034589a2680649b6d0da4648c43337e Mon Sep 17 00:00:00 2001 From: Abhinav Sarkar Date: Mon, 23 Oct 2017 16:18:42 +0530 Subject: [PATCH] Initial commit --- .gitignore | 8 + bower.json | 20 ++ package-lock.json | 68 +++++++ package.json | 18 ++ src/Node/AMQP.purs | 236 +++++++++++++++++++++++ src/Node/AMQP/FFI.js | 221 ++++++++++++++++++++++ src/Node/AMQP/FFI.purs | 121 ++++++++++++ src/Node/AMQP/Main.purs | 111 +++++++++++ src/Node/AMQP/Types.purs | 301 ++++++++++++++++++++++++++++++ src/Node/AMQP/Types/Internal.purs | 243 ++++++++++++++++++++++++ test/Main.purs | 9 + 11 files changed, 1356 insertions(+) create mode 100644 .gitignore create mode 100644 bower.json create mode 100644 package-lock.json create mode 100644 package.json create mode 100644 src/Node/AMQP.purs create mode 100644 src/Node/AMQP/FFI.js create mode 100644 src/Node/AMQP/FFI.purs create mode 100644 src/Node/AMQP/Main.purs create mode 100644 src/Node/AMQP/Types.purs create mode 100644 src/Node/AMQP/Types/Internal.purs create mode 100644 test/Main.purs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9623fa5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,8 @@ +/bower_components/ +/node_modules/ +/.pulp-cache/ +/output/ +/generated-docs/ +/.psc* +/.purs* +/.psa* diff --git a/bower.json b/bower.json new file mode 100644 index 0000000..a2025a9 --- /dev/null +++ b/bower.json @@ -0,0 +1,20 @@ +{ + "name": "purescript-amqp", + "ignore": [ + "**/.*", + "node_modules", + "bower_components", + "output" + ], + "dependencies": { + "purescript-prelude": "^3.1.0", + "purescript-console": "^3.0.0", + "purescript-aff": "^3.1.0", + "purescript-node-buffer": "^3.0.1", + "purescript-node-process": "^5.0.0", + "purescript-foreign-generic": "^4.2.0" + }, + "devDependencies": { + "purescript-psci-support": "^3.0.0" + } +} diff --git a/package-lock.json b/package-lock.json new file mode 100644 index 0000000..b9e4467 --- /dev/null +++ b/package-lock.json @@ -0,0 +1,68 @@ +{ + "name": "purescript-amqp", + "version": "1.0.0", + "lockfileVersion": 1, + "requires": true, + "dependencies": { + "amqplib": { + "version": "0.5.1", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.5.1.tgz", + "integrity": "sha1-fMz+ur5WwumE6noiQ/fO/m+/xs8=", + "requires": { + "bitsyntax": "0.0.4", + "bluebird": "3.5.1", + "buffer-more-ints": "0.0.2", + "readable-stream": "1.1.14" + } + }, + "bitsyntax": { + "version": "0.0.4", + "resolved": "https://registry.npmjs.org/bitsyntax/-/bitsyntax-0.0.4.tgz", + "integrity": "sha1-6xDMb4K4xJDj6FaY8H6D1G4MuoI=", + "requires": { + "buffer-more-ints": "0.0.2" + } + }, + "bluebird": { + "version": "3.5.1", + "resolved": "https://registry.npmjs.org/bluebird/-/bluebird-3.5.1.tgz", + "integrity": "sha512-MKiLiV+I1AA596t9w1sQJ8jkiSr5+ZKi0WKrYGUn6d1Fx+Ij4tIj+m2WMQSGczs5jZVxV339chE8iwk6F64wjA==" + }, + "buffer-more-ints": { + "version": "0.0.2", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-0.0.2.tgz", + "integrity": "sha1-JrOIXRD6E9t/wBquOquHAZngEkw=" + }, + "core-util-is": { + "version": "1.0.2", + "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.2.tgz", + "integrity": "sha1-tf1UIgqivFq1eqtxQMlAdUUDwac=" + }, + "inherits": { + "version": "2.0.3", + "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.3.tgz", + "integrity": "sha1-Yzwsg+PaQqUC9SRmAiSA9CCCYd4=" + }, + "isarray": { + "version": "0.0.1", + "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", + "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=" + }, + "readable-stream": { + "version": "1.1.14", + "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.1.14.tgz", + "integrity": "sha1-fPTFTvZI44EwhMY23SB54WbAgdk=", + "requires": { + "core-util-is": "1.0.2", + "inherits": "2.0.3", + "isarray": "0.0.1", + "string_decoder": "0.10.31" + } + }, + "string_decoder": { + "version": "0.10.31", + "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", + "integrity": "sha1-YuIDvEF2bGwoyfyEMB2rHFMQ+pQ=" + } + } +} diff --git a/package.json b/package.json new file mode 100644 index 0000000..76875a7 --- /dev/null +++ b/package.json @@ -0,0 +1,18 @@ +{ + "name": "purescript-amqp", + "version": "1.0.0", + "description": "", + "main": "index.js", + "directories": { + "test": "test" + }, + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "amqplib": "^0.5.1" + } +} diff --git a/src/Node/AMQP.purs b/src/Node/AMQP.purs new file mode 100644 index 0000000..38220bc --- /dev/null +++ b/src/Node/AMQP.purs @@ -0,0 +1,236 @@ +module Node.AMQP + ( module MoreExports + , module Node.AMQP.Types + , module Node.AMQP + ) where + +import Node.AMQP.FFI +import Node.AMQP.Types +import Node.AMQP.Types.Internal +import Prelude + +import Control.Monad.Aff (makeAff) +import Control.Monad.Eff.Class (liftEff) +import Control.Monad.Eff.Exception (Error, message) +import Control.Monad.Error.Class (withResource) +import Data.Foreign (Foreign) +import Data.Foreign.Class (encode) +import Data.Function.Uncurried (runFn1, runFn2, runFn3, runFn4, runFn5, runFn6, runFn7) +import Data.Maybe (Maybe) +import Data.Nullable (toMaybe) +import Data.StrMap (StrMap) +import Data.String (Pattern(..), contains) +import Node.AMQP.FFI (EffAMQP, AffAMQP) as MoreExports +import Node.Buffer (Buffer) + +-- | Connects to an AMQP server given an AMQP URL and [connection options]. Returns the connection in +-- | `Aff` monad. See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#connect) for details. +connect :: forall eff. String -> ConnectOptions -> AffAMQP eff Connection +connect url = makeAff <<< runFn3 _connect <<< connectUrl url + +-- | Closes the given AMQP connection. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_close) for details. +close :: forall eff. Connection -> AffAMQP eff Unit +close conn = makeAff $ \onError onSuccess -> flip (runFn3 _close conn) onSuccess \err -> do + if contains (Pattern "Connection closed") (message err) + then onSuccess unit + else onError err + +-- | Creates an open channel and returns it. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_createChannel) for details. +createChannel :: forall eff. Connection -> AffAMQP eff Channel +createChannel = makeAff <<< runFn3 _createChannel + +-- | Closes the given channel. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_close) for details. +closeChannel :: forall eff. Channel -> AffAMQP eff Unit +closeChannel = makeAff <<< runFn3 _closeChannel + +-- | Asserts a queue into existence with the given name and options. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for details. +assertQueue :: forall eff. Channel -> QueueName -> QueueOptions -> AffAMQP eff AssertQueueOK +assertQueue channel queue = + makeAff <<< runFn5 _assertQueue channel queue <<< encodeQueueOptions + +-- | Checks that a queue exists with the given queue name. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_checkQueue) for details. +checkQueue :: forall eff. Channel -> QueueName -> AffAMQP eff AssertQueueOK +checkQueue channel = makeAff <<< runFn4 _checkQueue channel + +-- | Deletes the queue by the given name. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteQueue) for details. +deleteQueue :: forall eff. Channel -> QueueName -> DeleteQueueOptions -> AffAMQP eff DeleteQueueOK +deleteQueue channel queue = + makeAff <<< runFn5 _deleteQueue channel queue <<< encodeDeleteQueueOptions + +-- | Purges the messages from the queue by the given name. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_purgeQueue) for details. +purgeQueue :: forall eff. Channel -> QueueName -> AffAMQP eff PurgeQueueOK +purgeQueue channel = makeAff <<< runFn4 _purgeQueue channel + +-- | Asserts a routing path from an exchange to a queue: the given exchange will relay +-- | messages to the given queue, according to the type of the exchange and the given routing key. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_bindQueue) details. +bindQueue :: forall eff. Channel -> QueueName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit +bindQueue channel queue exchange routingKey = + makeAff <<< runFn7 _bindQueue channel queue exchange routingKey <<< encode + +-- | Removes the routing path between the given queue and the given exchange with the given routing key and arguments. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_unbindQueue) for details. +unbindQueue :: forall eff. Channel -> QueueName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit +unbindQueue channel queue exchange routingKey = + makeAff <<< runFn7 _unbindQueue channel queue exchange routingKey <<< encode + +-- | Asserts an exchange into existence with the given exchange name, type and options. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertExchange) for details. +assertExchange :: forall eff. Channel -> ExchangeName -> ExchangeType -> ExchangeOptions -> AffAMQP eff Unit +assertExchange channel exchange exchangeType = + makeAff <<< runFn6 _assertExchange channel exchange (show exchangeType) <<< encodeExchangeOptions + +-- | Checks that the exchange exists by the given name. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_checkExchange) for details. +checkExchange :: forall eff. Channel -> ExchangeName -> AffAMQP eff Unit +checkExchange channel = makeAff <<< runFn4 _checkExchange channel + +-- | Deletes the exchange by the given name. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_deleteExchange) for details. +deleteExchange :: forall eff. Channel -> ExchangeName -> DeleteExchangeOptions -> AffAMQP eff Unit +deleteExchange channel exchange = + makeAff <<< runFn5 _deleteExchange channel exchange <<< encodeDeleteExchangeOptions + +-- | Binds an exchange to another exchange. The exchange named by `destExchange` will receive messages +-- | from the exchange named by `sourceExchange`, according to the type of the source and the given +-- | routing key. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_bindExchange) for details. +bindExchange :: forall eff. Channel -> ExchangeName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit +bindExchange channel destExchange sourceExchange routingKey = + makeAff <<< runFn7 _bindExchange channel destExchange sourceExchange routingKey <<< encode + +-- | Removes a binding from an exchange to another exchange. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_unbindExchange) for details. +unbindExchange :: forall eff. Channel -> ExchangeName -> ExchangeName -> RoutingKey -> StrMap Foreign -> AffAMQP eff Unit +unbindExchange channel destExchange sourceExchange routingKey = + makeAff <<< runFn7 _unbindExchange channel destExchange sourceExchange routingKey <<< encode + +-- | Publish a single message to the given exchange with the given routing key, and the given publish +-- | options. The message content is given as a `Buffer`. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish) for details. +publish :: forall eff. Channel -> ExchangeName -> RoutingKey -> Buffer -> PublishOptions -> AffAMQP eff Unit +publish channel exchange routingKey buffer = + makeAff <<< const <<< runFn6 _publish channel exchange routingKey buffer <<< encodePublishOptions + +-- | Sends a single message with the content given as a `Buffer` to the given queue, bypassing routing. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_sendToQueue) for details. +sendToQueue :: forall eff. Channel -> QueueName -> Buffer -> PublishOptions -> AffAMQP eff Unit +sendToQueue channel queue buffer = + makeAff <<< const <<< runFn5 _sendToQueue channel queue buffer <<< encodePublishOptions + +-- | Sets up a consumer for the given queue and consume options, with a callback to be invoked with each message. +-- | The callback receives `Nothing` if the consumer is cancelled by the broker. Returns the consumer tag. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume) for details. +consume :: forall eff. Channel -> QueueName -> ConsumeOptions -> (Maybe Message -> EffAMQP eff Unit) -> AffAMQP eff ConsumeOK +consume channel queue options onMessage = + makeAff $ runFn6 _consume channel queue (toMaybe >>> map toMessage >>> onMessage) (encodeConsumeOptions options) + +-- | Cancels the consumer identified by the given consumer tag. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_cancel) for details. +cancel :: forall eff. Channel -> String -> AffAMQP eff Unit +cancel channel = makeAff <<< runFn4 _cancel channel + +-- | Gets a message from the given queue. If there are no messages in the queue, returns `Nothing`. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_get) for details. +get :: forall eff. Channel -> QueueName -> GetOptions -> AffAMQP eff (Maybe Message) +get channel queue opts = makeAff \onError onSuccess -> + runFn5 _get channel queue (encodeGetOptions opts) onError (onSuccess <<< map toMessage <<< toMaybe) + +-- | Acknowledges a message given its delivery tag. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ack) for details. +ack :: forall eff. Channel -> String -> EffAMQP eff Unit +ack channel deliveryTag = runFn3 _ack channel deliveryTag false + +-- | Acknowledges all messages up to the message with the given delivery tag. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ack) for details. +ackAllUpTo :: forall eff. Channel -> String -> EffAMQP eff Unit +ackAllUpTo channel deliveryTag = runFn3 _ack channel deliveryTag true + +-- | Acknowledges all outstanding messages on the channel. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_ackAll) for details. +ackAll :: forall eff. Channel -> EffAMQP eff Unit +ackAll = runFn1 _ackAll + +-- | Rejects a message given its delivery tag. If the boolean param is true, the server requeues the +-- | message, else it drops it. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack) for details. +nack :: forall eff. Channel -> String -> Boolean -> EffAMQP eff Unit +nack channel deliveryTag = runFn4 _nack channel deliveryTag false + +-- | Rejects all messages up to the message with the given delivery tag. If the boolean param is true, +-- | the server requeues the messages, else it drops them. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nack) for details. +nackAllUpTo :: forall eff. Channel -> String -> Boolean -> EffAMQP eff Unit +nackAllUpTo channel deliveryTag = runFn4 _nack channel deliveryTag true + +-- | Rejects all outstanding messages on the channel. If the boolean param is true, +-- | the server requeues the messages, else it drops them. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_nackAll) for details. +nackAll :: forall eff. Channel -> Boolean -> EffAMQP eff Unit +nackAll = runFn2 _nackAll + +-- | Sets the prefetch count for this channel. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch) for details. +prefetch :: forall eff. Channel -> Int -> AffAMQP eff Unit +prefetch channel = liftEff <<< runFn2 _prefetch channel + +-- | Requeues unacknowledged messages on this channel. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_recover) for details. +recover :: forall eff. Channel -> AffAMQP eff Unit +recover = makeAff <<< runFn3 _recover + +-- | Registers an event handler to the connection which is triggered when the connection closes. +-- | If the connection closes because of an error, the handler is called with `Just error`, else +-- | with `Nothing`. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#model_events) for details. +onConnectionClose :: forall eff. Connection -> (Maybe Error -> EffAMQP eff Unit) -> EffAMQP eff Unit +onConnectionClose conn onClose = runFn2 _onConnectionClose conn (onClose <<< toMaybe) + +-- | Registers an event handler to the connection which is triggered when the connection errors out. +-- | The handler is called with the error. +onConnectionError :: forall eff. Connection -> (Error -> EffAMQP eff Unit) -> EffAMQP eff Unit +onConnectionError = runFn2 _onConnectionError + +-- | Registers an event handler to the connection which is triggered when the RabbitMQ server +-- | decides to block the connection. The handler is called with the reason for blocking. +onConnectionBlocked :: forall eff. Connection -> (String -> EffAMQP eff Unit) -> EffAMQP eff Unit +onConnectionBlocked = runFn2 _onConnectionBlocked + +-- | Registers an event handler to the connection which is triggered when the RabbitMQ server +-- | decides to unblock the connection. The handler is called with no arguments. +onConnectionUnblocked :: forall eff. Connection -> EffAMQP eff Unit -> EffAMQP eff Unit +onConnectionUnblocked = runFn2 _onConnectionUnblocked + +-- | Registers an event handler to the channel which is triggered when the channel closes. +-- | The handler is called with no arguments. +-- | See [amqplib docs](http://www.squaremobius.net/amqp.node/channel_api.html#channel_events) for details. +onChannelClose :: forall eff. Channel -> EffAMQP eff Unit -> EffAMQP eff Unit +onChannelClose = runFn2 _onChannelClose + +-- | Registers an event handler to the channel which is triggered when the channel errors out. +-- | The handler is called with the error. +onChannelError :: forall eff. Channel -> (Error -> EffAMQP eff Unit) -> EffAMQP eff Unit +onChannelError = runFn2 _onChannelError + +-- | Registers an event handler to the channel which is triggered when a message published with +-- | the mandatory flag cannot be routed and is returned to the sending channel. The handler is +-- | called with the returned message. +onChannelReturn :: forall eff. Channel -> (Message -> EffAMQP eff Unit) -> EffAMQP eff Unit +onChannelReturn channel onReturn = runFn2 _onChannelReturn channel (toMessage >>> onReturn) + +-- | Registers an event handler to the channel which is triggered when the channel's write buffer +-- | is emptied. The handler is called with no arguments. +onChannelDrain :: forall eff. Channel -> EffAMQP eff Unit -> EffAMQP eff Unit +onChannelDrain = runFn2 _onChannelDrain + +-- | A convenience function for creating a channel, doing some action with it, and then automatically closing +-- | it, even in case of errors. +withChannel :: forall eff a. Connection -> (Channel -> AffAMQP eff a) -> AffAMQP eff a +withChannel conn = withResource (createChannel conn) closeChannel diff --git a/src/Node/AMQP/FFI.js b/src/Node/AMQP/FFI.js new file mode 100644 index 0000000..fe3c94a --- /dev/null +++ b/src/Node/AMQP/FFI.js @@ -0,0 +1,221 @@ +var amqp = require('amqplib/callback_api'); + +var callback = function(onError, onSuccess) { + return function(err, o) { + if (err) { + onError(err)(); + return; + } + onSuccess(o)(); + }; +}; + +var callbackOnlyError = function(onError, onSuccess) { + return function(err) { + if (err) { + onError(err)(); + return; + } + onSuccess()(); + }; +}; + +exports._connect = function(url, onError,onSuccess) { + return function() { + amqp.connect(url, callback(onError, onSuccess)); + }; +}; + +exports._close = function(conn, onError, onSuccess) { + return function() { + conn.close(callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._createChannel = function(conn, onError, onSuccess) { + return function() { + conn.createChannel(callback(onError, onSuccess)); + }; +}; + +exports._closeChannel = function(channel, onError, onSuccess) { + return function() { + channel.close(callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._assertQueue = function(channel, queue, options, onError, onSuccess) { + return function() { + channel.assertQueue(queue, options, callback(onError, onSuccess)); + }; +}; + +exports._checkQueue = function(channel, queue, onError, onSuccess) { + return function() { + channel.checkQueue(queue, callback(onError, onSuccess)); + }; +}; + +exports._deleteQueue = function(channel, queue, options, onError, onSuccess) { + return function() { + channel.deleteQueue(queue, options, callback(onError, onSuccess)); + }; +}; + +exports._purgeQueue = function(channel, queue, onError, onSuccess) { + return function() { + channel.purgeQueue(queue, callback(onError, onSuccess)); + }; +}; + +exports._bindQueue = function(channel, queue, exchange, pattern, args, onError, onSuccess) { + return function() { + channel.bindQueue(queue, exchange, pattern, args, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._unbindQueue = function(channel, queue, exchange, pattern, args, onError, onSuccess) { + return function() { + channel.unbindQueue(queue, exchange, pattern, args, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._assertExchange = function(channel, exchange, type, options, onError, onSuccess) { + return function() { + channel.assertExchange(exchange, type, options, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._checkExchange = function(channel, exchange, onError, onSuccess) { + return function() { + channel.checkExchange(exchange, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._deleteExchange = function(channel, exchange, options, onError, onSuccess) { + return function() { + channel.deleteExchange(exchange, options, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._bindExchange = function(channel, destExchange, sourceExchange, pattern, args, onError, onSuccess) { + return function() { + channel.bindExchange(destExchange, sourceExchange, pattern, args, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._unbindExchange = function(channel, destExchange, sourceExchange, pattern, args, onError, onSuccess) { + return function() { + channel.unbindExchange(destExchange, sourceExchange, pattern, args, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._publish = function(channel, exchange, routingKey, content, options, onSuccess) { + return function() { + if (!channel.publish(exchange, routingKey, content, options)) { + channel.once('drain', function() { + onSuccess()(); + }); + } else { + onSuccess()(); + } + } +}; + +exports._sendToQueue = function(channel, queue, content, options, onSuccess) { + return function() { + if (!channel.sendToQueue(queue, content, options)) { + channel.once('drain', function() { + onSuccess()(); + }); + } else { + onSuccess()(); + } + } +}; + +exports._consume = function(channel, queue, onMessage, options, onError, onSuccess) { + return function() { + channel.consume(queue, function(msg) { onMessage(msg)(); }, options, callback(onError, onSuccess)); + }; +}; + +exports._cancel = function(channel, consumerTag, onError, onSuccess) { + return function() { + channel.cancel(consumerTag, callbackOnlyError(onError, onSuccess)); + }; +}; + +exports._get = function(channel, queue, options, onError, onSuccess) { + return function() { + channel.get(queue, options, callback(onError, function(msgOrFalse) { + return function() { + if (msgOrFalse === false) { + onSuccess(null)(); + } else { + onSuccess(msgOrFalse)(); + } + }; + })) + }; +}; + +exports._ack = function(channel, deliveryTag, allUpTo) { + return function() { + channel.ack({fields: {deliveryTag: deliveryTag}}, allUpTo); + }; +}; + +exports._nack = function(channel, deliveryTag, allUpTo, requeue) { + return function() { + channel.nack({fields: {deliveryTag: deliveryTag}}, allUpTo, requeue); + }; +}; + +exports._ackAll = function(channel) { + return function() { + channel.ackAll(); + }; +}; + +exports._nackAll = function(channel, requeue) { + return function() { + channel.nackAll(requeue); + }; +}; + +exports._prefetch = function(channel, count) { + return function() { + channel.prefetch(count); + }; +}; + +exports._recover = function(channel, onError, onSuccess) { + return function() { + channel.recover(callbackOnlyError(onError, onSuccess)); + }; +}; + +var eventHandler = function(event) { + return function(model, onEvent) { + return function() { + model.on(event, function(arg) { + if (arg === undefined) { + onEvent(); + } else { + onEvent(arg)(); + } + }); + }; + }; +}; + +exports._onConnectionClose = eventHandler('close'); +exports._onConnectionError = eventHandler('error'); +exports._onConnectionBlocked = eventHandler('blocked'); +exports._onConnectionUnblocked = eventHandler('unblocked'); + +exports._onChannelClose = eventHandler('close'); +exports._onChannelError = eventHandler('error'); +exports._onChannelReturn = eventHandler('return'); +exports._onChannelDrain = eventHandler('drain'); diff --git a/src/Node/AMQP/FFI.purs b/src/Node/AMQP/FFI.purs new file mode 100644 index 0000000..a68f7f4 --- /dev/null +++ b/src/Node/AMQP/FFI.purs @@ -0,0 +1,121 @@ +module Node.AMQP.FFI where + +import Node.AMQP.Types +import Node.AMQP.Types.Internal +import Prelude + +import Control.Monad.Aff (Aff) +import Control.Monad.Eff (Eff, kind Effect) +import Control.Monad.Eff.Exception (Error) +import Data.Foreign (Foreign) +import Data.Function.Uncurried (Fn2, Fn3, Fn4, Fn5, Fn6, Fn7, Fn1) +import Data.Nullable (Nullable) +import Node.Buffer (Buffer) + +type EffAMQP e a = Eff (amqp :: AMQP | e) a +type AffAMQP e a = Aff (amqp :: AMQP | e) a + +type ECb e a = Error -> EffAMQP e a +type SCb r e a = r -> EffAMQP e a + +foreign import _connect :: forall e a. + Fn3 String (ECb e a) (SCb Connection e a) (EffAMQP e a) + +foreign import _close :: forall e a. + Fn3 Connection (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _createChannel :: forall e a. + Fn3 Connection (ECb e a) (SCb Channel e a) (EffAMQP e a) + +foreign import _closeChannel :: forall e a. + Fn3 Channel (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _assertQueue :: forall e a. + Fn5 Channel QueueName Foreign (ECb e a) (SCb AssertQueueOK e a) (EffAMQP e a) + +foreign import _checkQueue :: forall e a. + Fn4 Channel QueueName (ECb e a) (SCb AssertQueueOK e a) (EffAMQP e a) + +foreign import _deleteQueue :: forall e a. + Fn5 Channel QueueName Foreign (ECb e a) (SCb DeleteQueueOK e a) (EffAMQP e a) + +foreign import _purgeQueue :: forall e a. + Fn4 Channel QueueName (ECb e a) (SCb PurgeQueueOK e a) (EffAMQP e a) + +foreign import _bindQueue :: forall e a. + Fn7 Channel QueueName ExchangeName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _unbindQueue :: forall e a. + Fn7 Channel QueueName ExchangeName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _assertExchange :: forall e a. + Fn6 Channel ExchangeName String Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _checkExchange :: forall e a. + Fn4 Channel ExchangeName (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _deleteExchange :: forall e a. + Fn5 Channel ExchangeName Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _bindExchange :: forall e a. + Fn7 Channel QueueName QueueName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _unbindExchange :: forall e a. + Fn7 Channel QueueName QueueName RoutingKey Foreign (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _publish :: forall e a. + Fn6 Channel ExchangeName RoutingKey Buffer Foreign (SCb Unit e a) (EffAMQP e a) + +foreign import _sendToQueue :: forall e a. + Fn5 Channel QueueName Buffer Foreign (SCb Unit e a) (EffAMQP e a) + +foreign import _consume :: forall e a. + Fn6 Channel QueueName (SCb (Nullable Message') e Unit) Foreign (ECb e a) (SCb ConsumeOK e a) (EffAMQP e a) + +foreign import _cancel :: forall e a. + Fn4 Channel String (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _get :: forall e a. + Fn5 Channel QueueName Foreign (ECb e a) (SCb (Nullable Message') e a) (EffAMQP e a) + +foreign import _ack :: forall e a. + Fn3 Channel String Boolean (EffAMQP e a) + +foreign import _nack :: forall e a. + Fn4 Channel String Boolean Boolean (EffAMQP e a) + +foreign import _ackAll :: forall e a. + Fn1 Channel (EffAMQP e a) + +foreign import _nackAll :: forall e a. + Fn2 Channel Boolean (EffAMQP e a) + +foreign import _prefetch :: forall e a. + Fn2 Channel Int (EffAMQP e a) + +foreign import _recover :: forall e a. + Fn3 Channel (ECb e a) (SCb Unit e a) (EffAMQP e a) + +foreign import _onConnectionClose :: forall e a. + Fn2 Connection (SCb (Nullable Error) e Unit) (EffAMQP e a) + +foreign import _onConnectionError :: forall e a. + Fn2 Connection (SCb Error e Unit) (EffAMQP e a) + +foreign import _onConnectionBlocked :: forall e a. + Fn2 Connection (SCb String e Unit) (EffAMQP e a) + +foreign import _onConnectionUnblocked :: forall e a. + Fn2 Connection (EffAMQP e Unit) (EffAMQP e a) + +foreign import _onChannelClose :: forall e a. + Fn2 Channel (EffAMQP e Unit) (EffAMQP e a) + +foreign import _onChannelError :: forall e a. + Fn2 Channel (SCb Error e Unit) (EffAMQP e a) + +foreign import _onChannelReturn :: forall e a. + Fn2 Channel (SCb Message' e Unit) (EffAMQP e a) + +foreign import _onChannelDrain :: forall e a. + Fn2 Channel (EffAMQP e Unit) (EffAMQP e a) diff --git a/src/Node/AMQP/Main.purs b/src/Node/AMQP/Main.purs new file mode 100644 index 0000000..687147b --- /dev/null +++ b/src/Node/AMQP/Main.purs @@ -0,0 +1,111 @@ +module Node.AMQP.Main where + +import Node.AMQP +import Prelude + +import Control.Monad.Aff (delay, runAff) +import Control.Monad.Eff (Eff, kind Effect) +import Control.Monad.Eff.Class (liftEff) +import Control.Monad.Eff.Console (CONSOLE, log, logShow) +import Control.Monad.Eff.Exception (EXCEPTION) +import Data.Maybe (Maybe(..)) +import Data.StrMap as StrMap +import Data.Time.Duration (Milliseconds(..)) +import Node.Buffer (BUFFER, fromString, toString) +import Node.Encoding (Encoding(..)) +import Node.Process (PROCESS) + +main :: forall eff. Eff ( console :: CONSOLE + , amqp :: AMQP + , exception :: EXCEPTION + , process :: PROCESS + , buffer :: BUFFER + | eff ) Unit +main = do + log "Starting" + content <- fromString "test123" UTF8 + void $ runAff (\e -> log "ERRORAFF" *> logShow e) pure do + conn <- connect "amqp://localhost" defaultConnectOptions + liftEff $ log "Connected" + let q = "queue111" + x = "exc2222" + + liftEff $ do + onConnectionClose conn $ case _ of + Just err -> log $ "E: Conn closed: " <> show err + Nothing -> log "E: Conn closed" + onConnectionError conn \err -> log $ "E: Conn errored: " <> show err + onConnectionBlocked conn \reason -> log $ "E: Conn blocked: " <> reason + onConnectionUnblocked conn (log "E: Conn unblocked") + + withChannel conn \channel -> do + liftEff $ log "Channel created" + prefetch channel 10 + + liftEff $ do + onChannelClose channel (log "E: Channel close") + onChannelError channel \err -> log $ "E: Channel errored: " <> show err + onChannelReturn channel \msg -> + toString UTF8 msg.content >>= \m -> log $ "E: Channel message returned: " <> m + onChannelDrain channel (log "E: Channel drained") + + ok <- assertQueue channel q defaultQueueOptions + liftEff $ log $ "Queue created: " <> ok.queue + + consumeChannel <- createChannel conn + liftEff $ onChannelClose consumeChannel (log "E: Consume Channel close") + + recover consumeChannel + + ok <- consume consumeChannel q defaultConsumeOptions $ case _ of + Nothing -> do + log "Consumer closed" + void $ runAff logShow logShow $ closeChannel consumeChannel + Just msg -> do + toString UTF8 msg.content >>= \m -> log $ "Received: " <> m + ack consumeChannel msg.fields.deliveryTag + logShow msg.properties.persistent + + let consumerTag = ok.consumerTag + liftEff $ log $ "Consumer tag: " <> ok.consumerTag + + ok <- purgeQueue channel q + liftEff $ log $ "Queue purged: " <> show ok.messageCount + + assertExchange channel x Fanout defaultExchangeOptions + liftEff $ log $ "Exchange created" + + bindQueue channel q x "*" StrMap.empty + liftEff $ log $ "Queue bound" + + publish channel x "" content $ defaultPublishOptions { persistent = Just false } + liftEff $ log $ "Message published to exchange" + + delay (Milliseconds 500.0) + + liftEff $ ackAll consumeChannel + cancel consumeChannel consumerTag + closeChannel consumeChannel + + sendToQueue channel q content $ defaultPublishOptions { persistent = Just true } + liftEff $ log $ "Message published to queue" + + delay (Milliseconds 500.0) + + get channel q defaultGetOptions >>= case _ of + Nothing -> liftEff $ log "No message received" + Just msg -> liftEff do + toString UTF8 msg.content >>= \m -> log $ "Get Received: " <> m + nackAllUpTo channel msg.fields.deliveryTag true + + unbindQueue channel q x "*" StrMap.empty + liftEff $ log $ "Queue unbound" + + deleteExchange channel x defaultDeleteExchangeOptions + liftEff $ log $ "Exchange deleted" + + ok <- deleteQueue channel q defaultDeleteQueueOptions + liftEff $ log $ "Queue deleted: " <> show ok.messageCount + + close conn + liftEff $ log $ "Connection closed" diff --git a/src/Node/AMQP/Types.purs b/src/Node/AMQP/Types.purs new file mode 100644 index 0000000..e3f7f98 --- /dev/null +++ b/src/Node/AMQP/Types.purs @@ -0,0 +1,301 @@ +module Node.AMQP.Types where + +import Prelude + +import Control.Monad.Eff (kind Effect) +import Control.Plus (empty) +import Data.DateTime.Instant (Instant) +import Data.Foreign (Foreign) +import Data.Generic.Rep (class Generic) +import Data.Generic.Rep.Eq (genericEq) +import Data.Generic.Rep.Show (genericShow) +import Data.Maybe (Maybe(..)) +import Data.StrMap as StrMap +import Data.String (toLower) +import Data.Time.Duration (Milliseconds, Seconds(..)) +import Node.Buffer (Buffer) + +-- | The effect for computations which talk to an AMQP server. +foreign import data AMQP :: Effect + +-- | An AMQP connection. +foreign import data Connection :: Type + +-- | An AMQP Channel. +foreign import data Channel :: Type + +-- | An AMQP queue name. +type QueueName = String + +-- | An AMQP exchange name. +type ExchangeName = String + +-- | An AMQP routing key. +type RoutingKey = String + +-- | Options used to connect to the AMQP server. See +-- | for details. +type ConnectOptions = { frameMax :: Int + , channelMax :: Int + , heartbeat :: Seconds + } + +-- | Default connection options. +defaultConnectOptions :: ConnectOptions +defaultConnectOptions = { frameMax : 4096 + , channelMax: 0 + , heartbeat : Seconds 5.0 + } + +-- | Reply from the server for asserting a queue. Contains the +-- | name of queue asserted, the count of the messages in the queue, and the count of the consumers of +-- | the queue. +type AssertQueueOK = { queue :: String, messageCount :: Int, consumerCount :: Int } + +-- | Reply from the server for deleting a queue. Contains the count of the mssages in the queue. +type DeleteQueueOK = { messageCount :: Int } + +-- | Reply from the server for purging a queue. Contains the count of the mssages purged from the queue. +type PurgeQueueOK = { messageCount :: Int } + +-- | Types of AMQP exchanges. See for details. +data ExchangeType = Direct + | Topic + | Headers + | Fanout + +derive instance genericExchangeType :: Generic ExchangeType _ + +instance showExchangeType :: Show ExchangeType where + show = genericShow >>> toLower + +instance eqExchangeType :: Eq ExchangeType where + eq = genericEq + +-- | Options used to create a queue: +-- | - `exclusive`: if true, the queue is used by only one connection and is deleted when that connection closes. +-- | - `durable`: if true, the queue survives broker restarts. +-- | - `autoDelete`: if true, the queue is deleted when there is no consumers for it. +-- | - `messageTTL`: if specified, expires messages arriving in the queue after n milliseconds. +-- | - `expires`: if specified, the queue is destroyed after n milliseconds of disuse, where "use" +-- | means having consumers, being asserted or checked, or being polled. +-- | - `deadLetterExchange`: if specified, an exchange to which messages discarded from the queue are resent to. +-- | - `deadLetterRoutingKey`: if specified, set as the routing key for the discarded messages. +-- | - `maxLength`: if specified, the maximum number of messages the queue holds. +-- | - `maxPriority`: if specified, makes the queue a [priority queue](http://www.rabbitmq.com/priority.html). +-- | - `arguments`: additional arguments. +-- | +-- | See for details. +type QueueOptions = { exclusive :: Maybe Boolean + , durable :: Maybe Boolean + , autoDelete :: Maybe Boolean + , arguments :: StrMap.StrMap Foreign + , messageTTL :: Maybe Milliseconds + , expires :: Maybe Milliseconds + , deadLetterExchange :: Maybe String + , deadLetterRoutingKey :: Maybe RoutingKey + , maxLength :: Maybe Int + , maxPriority :: Maybe Int + } + +-- | Default options to create a queue. Every field is set to `Nothing` or `empty`. +defaultQueueOptions :: QueueOptions +defaultQueueOptions = { exclusive : Nothing + , durable : Nothing + , autoDelete : Nothing + , arguments : StrMap.empty + , messageTTL : Nothing + , expires : Nothing + , deadLetterExchange : Nothing + , deadLetterRoutingKey: Nothing + , maxLength : Nothing + , maxPriority : Nothing + } + +-- | Options used to delete a queue: +-- | - `ifUnused`: if true and the queue has consumers, it is not deleted and the channel is closed. +-- | - `ifEmpty`: if true and the queue contains messages, the queue is not deleted and the channel is closed. +-- | +-- | See for details. +type DeleteQueueOptions = { ifUnused :: Maybe Boolean, ifEmpty :: Maybe Boolean } + +-- | Default options to delete a queue. Every field is set to `Nothing`. +defaultDeleteQueueOptions :: DeleteQueueOptions +defaultDeleteQueueOptions = { ifUnused: empty, ifEmpty: empty } + +-- | Options to create an exchange: +-- | - `durable`: if true, the exchange survives broker restarts. +-- | - `internal`: if true, messages cannot be published directly to the exchange. +-- | - `autoDelete`: if true, the exchange is destroyed once it has no bindings. +-- | - `alternateExchange`: if specified, an exchange to send messages to if this exchange can't route them to any queues. +-- | - `arguments`: additional arguments. +-- | +-- | See for details. +type ExchangeOptions = { durable :: Maybe Boolean + , internal :: Maybe Boolean + , autoDelete :: Maybe Boolean + , alternateExchange :: Maybe String + , arguments :: StrMap.StrMap Foreign + } + +-- | Default options to create an exchange. Every field is set to `Nothing` or `empty`. +defaultExchangeOptions :: ExchangeOptions +defaultExchangeOptions = { durable : Nothing + , internal : Nothing + , autoDelete : Nothing + , alternateExchange: Nothing + , arguments : StrMap.empty + } + +-- | Options to delete an exchange: +-- | - `ifUnused`: if true and the exchange has bindings, it is not deleted and the channel is closed. +-- | +-- | See for details. +type DeleteExchangeOptions = { ifUnused :: Maybe Boolean } + +-- | Default options to delete an exchange. Every field is set to `Nothing`. +defaultDeleteExchangeOptions :: DeleteExchangeOptions +defaultDeleteExchangeOptions = { ifUnused: Nothing } + +-- | Options to publish a message: +-- | - `expiration`: if specified, the message is discarded from a queue once it has been there +-- | longer than the given number of milliseconds. +-- | - `userId`: if specified, RabbitMQ compares it to the username supplied when opening the connection, +-- | and rejects the messages for which it does not match. +-- | - `cc`: an array of routing keys; messages are routed to these routing keys in addition to the +-- | given routing key. +-- | - `bcc`: like `cc`, except that the value is not sent in the message headers to consumers. +-- | - `priority`: if specified, a priority for the message. +-- | - `persistent`: if true, the message survives broker restarts provided it is in a queue that +-- | also survives restarts. +-- | - `mandatory`: if true, the message is returned if it is not routed to a queue. +-- | - `contentType`: a MIME type for the message content. +-- | - `contentEncoding`: a MIME encoding for the message content. +-- | - `headers`: application specific headers to be carried along with the message content. +-- | - `correlationId`: usually used to match replies to requests, or similar. +-- | - `replyTo`: often used to name a queue to which the receiving application must send replies, in an RPC scenario. +-- | - `messageId`: arbitrary application-specific identifier for the message. +-- | - `timestamp`: a timestamp for the message. +-- | - `type`: an arbitrary application-specific type for the message. +-- | - `appId`: an arbitrary identifier for the originating application. +-- | +-- | See for details. +type PublishOptions = { expiration :: Maybe Milliseconds + , userId :: Maybe String + , cc :: Array RoutingKey + , bcc :: Array RoutingKey + , priority :: Maybe Int + , persistent :: Maybe Boolean + , mandatory :: Maybe Boolean + , contentType :: Maybe String + , contentEncoding :: Maybe String + , headers :: StrMap.StrMap Foreign + , correlationId :: Maybe String + , replyTo :: Maybe QueueName + , messageId :: Maybe String + , timestamp :: Maybe Instant + , type :: Maybe String + , appId :: Maybe String + } + +-- | Default options to publish a message. Every field is set to `Nothing`. +defaultPublishOptions :: PublishOptions +defaultPublishOptions = { expiration : Nothing + , userId : Nothing + , cc : [] + , bcc : [] + , priority : Nothing + , persistent : Nothing + , mandatory : Nothing + , contentType : Nothing + , contentEncoding: Nothing + , headers : StrMap.empty + , correlationId : Nothing + , replyTo : Nothing + , messageId : Nothing + , timestamp : Nothing + , type : Nothing + , appId : Nothing + } + +-- | Fields of a message received by a consumer: +-- | - `deliveryTag`: a serial number for the message. +-- | - `consumerTag`: an identifier for the consumer for which the message is destined. +-- | - `exchange`: the exchange to which the message was published. +-- | - `routingKey`: the routing key with which the message was published. +-- | - `redelivered`: if true, indicates that this message has been delivered before and has been +-- | handed back to the server. +type MessageFields = { deliveryTag :: String + , consumerTag :: String + , exchange :: ExchangeName + , routingKey :: RoutingKey + , redelivered :: Boolean + } + +-- | Properties of a message received by a consumer. A subset of the `PublishOptions` fields. +type MessageProperties = { expiration :: Maybe Milliseconds + , userId :: Maybe String + , priority :: Maybe Int + , persistent :: Maybe Boolean + , contentType :: Maybe String + , contentEncoding :: Maybe String + , headers :: StrMap.StrMap Foreign + , correlationId :: Maybe String + , replyTo :: Maybe QueueName + , messageId :: Maybe String + , timestamp :: Maybe Instant + , type :: Maybe String + , appId :: Maybe String + } + +-- | A message received by a consumer: +-- | - `content`: the content of the message a buffer. +-- | - `fields`: the message fields. +-- | - `properties`: the message properties. +type Message = { content :: Buffer + , fields :: MessageFields + , properties :: MessageProperties + } + +-- | Options used to consume message from a queue: +-- | - `consumerTag`: an identifier for this consumer, must not be already in use on the channel. +-- | - `noLocal`: if true, the broker does not deliver messages to the consumer if they were also +-- | published on this connection. RabbitMQ doesn't implement it, and will ignore it. +-- | - `noAck`: if true, the broker does expect an acknowledgement of the messages delivered to +-- | this consumer. It dequeues messages as soon as they've been sent down the wire. +-- | - `exclusive`: if true, the broker does not let anyone else consume from this queue. +-- | - `priority`: gives a priority to the consumer. Higher priority consumers get messages in +-- | preference to lower priority consumers. +-- | - `arguments`: additional arguments. +-- | +-- | See for details. +type ConsumeOptions = { consumerTag :: Maybe String + , noLocal :: Maybe Boolean + , noAck :: Maybe Boolean + , exclusive :: Maybe Boolean + , priority :: Maybe Int + , arguments :: StrMap.StrMap Foreign + } + +-- | Default options to consume messages from a queue. Every field is set to `Nothing` or `empty`. +defaultConsumeOptions :: ConsumeOptions +defaultConsumeOptions = { consumerTag: Nothing + , noLocal : Nothing + , noAck : Nothing + , exclusive : Nothing + , priority : Nothing + , arguments : StrMap.empty + } + +-- | Reply from the server for setting up a consumer. Contains the consumer tag which is the unique +-- | identifier for this consumer. +type ConsumeOK = { consumerTag :: String } + +-- | Options used to get a message from a queue: +-- | - `noAck`: if true, the broker does expect an acknowledgement of the messages delivered to +-- | this consumer. It dequeues messages as soon as they've been sent down the wire. +type GetOptions = { noAck :: Maybe Boolean } + +-- | Default options to get a message from a queue. Every field is set to `Nothing`. +defaultGetOptions :: GetOptions +defaultGetOptions = { noAck: Nothing } diff --git a/src/Node/AMQP/Types/Internal.purs b/src/Node/AMQP/Types/Internal.purs new file mode 100644 index 0000000..8647c68 --- /dev/null +++ b/src/Node/AMQP/Types/Internal.purs @@ -0,0 +1,243 @@ +module Node.AMQP.Types.Internal where + +import Node.AMQP.Types +import Prelude + +import Control.Monad.Eff (kind Effect) +import Data.DateTime.Instant (instant, unInstant) +import Data.Foreign (Foreign) +import Data.Foreign.Class (class Encode, encode) +import Data.Foreign.Generic (defaultOptions, genericEncode) +import Data.Foreign.NullOrUndefined (NullOrUndefined(..)) +import Data.Generic.Rep (class Generic) +import Data.Int (floor, hexadecimal, toStringAs) +import Data.Newtype (unwrap) +import Data.Nullable (Nullable, toMaybe) +import Data.StrMap as StrMap +import Data.Time.Duration (Milliseconds(..), Seconds(..)) +import Node.Buffer (Buffer) + +connectUrl :: String -> ConnectOptions -> String +connectUrl url { frameMax, channelMax, heartbeat : (Seconds heartbeat) } = + url <> "?frame_max=0x" <> toStringAs hexadecimal frameMax + <> "&channel_max=" <> show channelMax + <> "&heartbeat=" <> show (floor heartbeat) + +encodeQueueOptions :: QueueOptions -> Foreign +encodeQueueOptions = fromQueueOptions >>> encode + +encodeDeleteQueueOptions :: DeleteQueueOptions -> Foreign +encodeDeleteQueueOptions = fromDeleteQueueOptions >>> encode + +encodeExchangeOptions :: ExchangeOptions -> Foreign +encodeExchangeOptions = fromExchangeOptions >>> encode + +encodeDeleteExchangeOptions :: DeleteExchangeOptions -> Foreign +encodeDeleteExchangeOptions = fromDeleteExchangeOptions >>> encode + +encodePublishOptions :: PublishOptions -> Foreign +encodePublishOptions = fromPublishOptions >>> encode + +encodeConsumeOptions :: ConsumeOptions -> Foreign +encodeConsumeOptions = fromConsumeOptions >>> encode + +encodeGetOptions :: GetOptions -> Foreign +encodeGetOptions = fromGetOptions >>> encode + +newtype QueueOptions' = QueueOptions' + { exclusive :: NullOrUndefined Boolean + , durable :: NullOrUndefined Boolean + , autoDelete :: NullOrUndefined Boolean + , arguments :: StrMap.StrMap Foreign + , messageTtl :: NullOrUndefined Number + , expires :: NullOrUndefined Number + , deadLetterExchange :: NullOrUndefined String + , deadLetterRoutingKey :: NullOrUndefined String + , maxLength :: NullOrUndefined Int + , maxPriority :: NullOrUndefined Int + } + +derive instance genericQueueOptions' :: Generic QueueOptions' _ + +instance encodeQueueOptions' :: Encode QueueOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromQueueOptions :: QueueOptions -> QueueOptions' +fromQueueOptions opts = QueueOptions' + { exclusive : NullOrUndefined opts.exclusive + , durable : NullOrUndefined opts.durable + , autoDelete : NullOrUndefined opts.autoDelete + , arguments : opts.arguments + , messageTtl : NullOrUndefined (unwrap <$> opts.messageTTL) + , expires : NullOrUndefined (unwrap <$> opts.expires) + , deadLetterExchange : NullOrUndefined opts.deadLetterExchange + , deadLetterRoutingKey: NullOrUndefined opts.deadLetterRoutingKey + , maxLength : NullOrUndefined opts.maxLength + , maxPriority : NullOrUndefined opts.maxPriority + } + +newtype DeleteQueueOptions' = DeleteQueueOptions' + { ifUnused :: NullOrUndefined Boolean + , ifEmpty :: NullOrUndefined Boolean } + +derive instance genericDeleteQueueOptions' :: Generic DeleteQueueOptions' _ + +instance encodeDeleteQueueOptions' :: Encode DeleteQueueOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromDeleteQueueOptions :: DeleteQueueOptions -> DeleteQueueOptions' +fromDeleteQueueOptions opts = DeleteQueueOptions' + { ifUnused: NullOrUndefined opts.ifUnused + , ifEmpty : NullOrUndefined opts.ifEmpty + } + +newtype ExchangeOptions' = ExchangeOptions' + { durable :: NullOrUndefined Boolean + , internal :: NullOrUndefined Boolean + , autoDelete :: NullOrUndefined Boolean + , alternateExchange :: NullOrUndefined String + , arguments :: StrMap.StrMap Foreign + } + +derive instance genericExchangeOptions' :: Generic ExchangeOptions' _ + +instance encodeExchangeOptions' :: Encode ExchangeOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromExchangeOptions :: ExchangeOptions -> ExchangeOptions' +fromExchangeOptions opts = ExchangeOptions' + { durable : NullOrUndefined opts.durable + , internal : NullOrUndefined opts.internal + , autoDelete : NullOrUndefined opts.autoDelete + , alternateExchange: NullOrUndefined opts.alternateExchange + , arguments : opts.arguments + } + +newtype DeleteExchangeOptions' = DeleteExchangeOptions' { ifUnused :: NullOrUndefined Boolean } + +derive instance genericDeleteExchangeOptions' :: Generic DeleteExchangeOptions' _ + +instance encodeDeleteExchangeOptions' :: Encode DeleteExchangeOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromDeleteExchangeOptions :: DeleteExchangeOptions -> DeleteExchangeOptions' +fromDeleteExchangeOptions opts = DeleteExchangeOptions' { ifUnused: NullOrUndefined opts.ifUnused } + +newtype PublishOptions' = PublishOptions' + { expiration :: NullOrUndefined String + , userId :: NullOrUndefined String + , "CC" :: Array RoutingKey + , "BCC" :: Array RoutingKey + , priority :: NullOrUndefined Int + , persistent :: NullOrUndefined Boolean + , mandatory :: NullOrUndefined Boolean + , contentType :: NullOrUndefined String + , contentEncoding :: NullOrUndefined String + , headers :: StrMap.StrMap Foreign + , correlationId :: NullOrUndefined String + , replyTo :: NullOrUndefined QueueName + , messageId :: NullOrUndefined String + , timestamp :: NullOrUndefined Number + , type :: NullOrUndefined String + , appId :: NullOrUndefined String + } + +derive instance genericPublishOptions' :: Generic PublishOptions' _ + +instance encodePublishOptions' :: Encode PublishOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromPublishOptions :: PublishOptions -> PublishOptions' +fromPublishOptions opts = PublishOptions' + { expiration : NullOrUndefined (show <$> opts.expiration) + , userId : NullOrUndefined opts.userId + , "CC" : opts.cc + , "BCC" : opts.bcc + , priority : NullOrUndefined opts.priority + , persistent : NullOrUndefined opts.persistent + , mandatory : NullOrUndefined opts.mandatory + , contentType : NullOrUndefined opts.contentType + , contentEncoding: NullOrUndefined opts.contentEncoding + , headers : opts.headers + , correlationId : NullOrUndefined opts.correlationId + , replyTo : NullOrUndefined opts.replyTo + , messageId : NullOrUndefined opts.messageId + , timestamp : NullOrUndefined ((unwrap <<< unInstant) <$> opts.timestamp) + , type : NullOrUndefined opts.type + , appId : NullOrUndefined opts.appId + } + +newtype MessageProperties' = MessageProperties' + { expiration :: Nullable Number + , userId :: Nullable String + , priority :: Nullable Int + , deliveryMode :: Nullable Int + , contentType :: Nullable String + , contentEncoding :: Nullable String + , headers :: StrMap.StrMap Foreign + , correlationId :: Nullable String + , replyTo :: Nullable QueueName + , messageId :: Nullable String + , timestamp :: Nullable Number + , type :: Nullable String + , appId :: Nullable String + } + +newtype Message' = Message' + { content :: Buffer + , fields :: MessageFields + , properties :: MessageProperties' + } + +toMessage :: Message' -> Message +toMessage (Message' { content, fields, properties: (MessageProperties' props) }) = + { content, fields, properties: msgProperties } + where + msgProperties = { expiration : Milliseconds <$> toMaybe props.expiration + , userId : toMaybe props.userId + , priority : toMaybe props.priority + , persistent : (_ == 2) <$> toMaybe props.deliveryMode + , contentType : toMaybe props.contentType + , contentEncoding : toMaybe props.contentEncoding + , headers : props.headers + , correlationId : toMaybe props.correlationId + , replyTo : toMaybe props.replyTo + , messageId : toMaybe props.messageId + , timestamp : join $ instant <$> (Milliseconds <$> toMaybe props.timestamp) + , type : toMaybe props.type + , appId : toMaybe props.appId + } + +newtype ConsumeOptions' = ConsumeOptions' + { consumerTag :: NullOrUndefined String + , noLocal :: NullOrUndefined Boolean + , noAck :: NullOrUndefined Boolean + , exclusive :: NullOrUndefined Boolean + , priority :: NullOrUndefined Int + , arguments :: StrMap.StrMap Foreign + } + +derive instance genericConsumeOptions' :: Generic ConsumeOptions' _ + +instance encodeConsumeOptions' :: Encode ConsumeOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromConsumeOptions :: ConsumeOptions -> ConsumeOptions' +fromConsumeOptions opts = ConsumeOptions' + { consumerTag: NullOrUndefined opts.consumerTag + , noLocal : NullOrUndefined opts.noLocal + , noAck : NullOrUndefined opts.noAck + , exclusive : NullOrUndefined opts.exclusive + , priority : NullOrUndefined opts.priority + , arguments : opts.arguments + } + +newtype GetOptions' = GetOptions' { noAck :: NullOrUndefined Boolean } + +derive instance genericGetOptions' :: Generic GetOptions' _ + +instance encodeGetOptions' :: Encode GetOptions' where + encode = genericEncode $ defaultOptions { unwrapSingleConstructors = true } + +fromGetOptions :: GetOptions -> GetOptions' +fromGetOptions opts = GetOptions' { noAck: NullOrUndefined opts.noAck } diff --git a/test/Main.purs b/test/Main.purs new file mode 100644 index 0000000..845d0f4 --- /dev/null +++ b/test/Main.purs @@ -0,0 +1,9 @@ +module Test.Main where + +import Prelude +import Control.Monad.Eff (Eff) +import Control.Monad.Eff.Console (CONSOLE, log) + +main :: forall e. Eff (console :: CONSOLE | e) Unit +main = do + log "You should add some tests."