Hi Pedro, we implemented this pattern using Ions subscribed directly to an AWS SQS queue. Each time there is a new message, the Ion gets executed and the message is processed.
Ions can be exposed as Lambdas, so you can run them either periodically or triggered by AWS-supported events (SQS, S3, etc.)
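For illustration, an SQS-triggered Ion lambda entry point can look roughly like this (a sketch only: the namespace, handler name, and process-message! are made up, clojure.data.json is assumed for parsing, and the fn would be registered under :lambdas in ion-config.edn):

(ns example.lambdas
  (:require [clojure.data.json :as json]))

(defn process-message!
  "Placeholder for your own message-handling logic."
  [body]
  (println "Processing message:" body))

;; Ion lambda entry points receive a single map with :input (the raw event
;; as a JSON string) and :context. For an SQS trigger the event contains a
;; Records vector; each record's body is the message payload.
(defn handle-sqs-event
  [{:keys [input]}]
  (let [event (json/read-str input :key-fn keyword)]
    (doseq [record (:Records event)]
      (process-message! (:body record)))
    "handled"))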
The only issue with this pattern is that the SQS trigger provided by AWS is not configurable in terms of concurrency, so depending on your requirements it may or may not be adequate. Check this article for more information:
We have had trouble with it, so we are probably moving this to a Clojure process running on a machine that listens to SQS and processes the messages directly, without dealing with Lambda.
Hi Eduardo, it’s an MQTT broker and not managed by AWS so I can’t connect it directly to an Ion - I think. But it’s cool to know about SQS, so thanks for the link!
I was thinking about deploying this program on an EC2 instance or maybe Beanstalk, having it subscribe to the topic and publish to an SQS queue (or something else) that my Ion would then process.
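For what it's worth, I imagine the bridge as a small Clojure process along these lines (just a sketch, assuming the Eclipse Paho MQTT client and Cognitect's aws-api as dependencies; the broker URL, topic, and queue URL are placeholders):

(ns example.bridge
  (:require [cognitect.aws.client.api :as aws])
  (:import (org.eclipse.paho.client.mqttv3 IMqttMessageListener MqttClient MqttConnectOptions)))

(def sqs (aws/client {:api :sqs}))

(defn forward-to-sqs!
  "Publishes an MQTT payload to the SQS queue that the Ion would consume."
  [queue-url payload]
  (aws/invoke sqs {:op      :SendMessage
                   :request {:QueueUrl    queue-url
                             :MessageBody payload}}))

(defn start-bridge!
  "Connects to the MQTT broker and forwards every message on `topic` to SQS."
  [broker-url topic queue-url]
  (let [client (MqttClient. broker-url (MqttClient/generateClientId))]
    (.connect client (doto (MqttConnectOptions.)
                       (.setAutomaticReconnect true)
                       (.setCleanSession true)))
    (.subscribe client topic
                (reify IMqttMessageListener
                  (messageArrived [_ _topic message]
                    (forward-to-sqs! queue-url (String. (.getPayload message) "UTF-8")))))
    client))

(comment
  (start-bridge! "tcp://broker.example.com:1883"
                 "sensors/#"
                 "https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXX/your-queue-name"))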
What issues did you have with this setup?
Maybe it’s simpler without Ion in this case? I might be being silly forcing myself to use Ion for this case.
We use SQS, but not through Lambdas. We start a background thread when the Ion is deployed, and it long polls for messages. There was a discussion here on this thread about message listeners on Ions.
I’m getting started with Datomic Ions, and I’m trying to understand Marshall’s response:
You can certainly have a background thread that runs on your Datomic node(s) and handles async processing.
Is this ‘background thread that runs on your Datomic node(s)’ a background thread that was started inside an Ion?
And your approach to starting the background thread:
Let’s say I have a Lambda ion that starts the background thread after a deployment. I can trigger that Lambda function via an EventBridge rule that watches the Datomic Ion deployments on CodeDeploy.
Is this how you’re doing it? Would you mind giving more details about how you did it?
Hi @pedrorgirardi, I can try to explain in more detail. You can start a background thread using future and poll your queue within that thread. See below for a simplified version of the function we use to start the background thread and poll for messages from SQS:
(defn listen-messages! [status queue-url]
  (future
    (try
      (loop []
        (reset! status :listening)
        (let [messages (receive-messages queue-url)] ; Long polling SQS for new messages
          (doseq [message messages]
            (handle-message! message)))
        (when (= :listening @status)
          (recur)))
      ;; InterruptedException happens if the future is cancelled with future-cancel.
      (catch InterruptedException _
        (reset! status :stopped)
        nil)
      ;; Catch and handle any other unknown exceptions.
      (catch Exception e
        (handle-consumer-exception! "Unknown exception in consumer listener." e)))))

(def queue-status (atom :initial))

(listen-messages! queue-status "https://sqs.eu-west-1.amazonaws.com/XXXXXXXXXXXX/your-queue-name")
The function above is called right away, at the top level of the namespace, so the background thread starts as soon as the Datomic Ion deployment loads that namespace.
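In case it helps, a receive-messages along these lines can be written with Cognitect's aws-api (a simplified sketch; in practice you would also delete each message after it is handled successfully, shown here as a separate delete-message!):

(ns example.consumer
  (:require [cognitect.aws.client.api :as aws]))

(def sqs (aws/client {:api :sqs}))

(defn receive-messages
  "Long polls SQS for up to 20 seconds and returns a (possibly empty)
  vector of messages, each with :Body and :ReceiptHandle keys."
  [queue-url]
  (:Messages (aws/invoke sqs {:op      :ReceiveMessage
                              :request {:QueueUrl            queue-url
                                        :MaxNumberOfMessages 10
                                        :WaitTimeSeconds     20}})))

(defn delete-message!
  "Removes a handled message from the queue; handle-message! should call
  this after processing succeeds so the message is not redelivered."
  [queue-url {:keys [ReceiptHandle]}]
  (aws/invoke sqs {:op      :DeleteMessage
                   :request {:QueueUrl      queue-url
                             :ReceiptHandle ReceiptHandle}}))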
At the beginning we wanted to control the start/stop logic of this background thread, to have more control over it. That’s why I had an EventBridge rule watching for successful Datomic Ion deployments on CodeDeploy. However, I noticed that the triggered Lambda only executes on one of the many auto-scaled instances, and as a result it only starts the background thread on that one instance instead of all of them. Achieving what we wanted would have required more complex logic, so we skipped it for now and decided to start the threads right away at load time. We added some monitoring logic that notifies us if the background thread dies for any reason, which has not happened yet.
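I have not included the monitoring code here, but a simple watchdog over the same status atom and the consumer future might look something like this (a sketch; notify! is a placeholder for whatever alerting you use, and it assumes you keep the future returned by listen-messages!):

(ns example.watchdog
  (:import (java.util.concurrent Executors TimeUnit)))

(defn notify!
  "Placeholder: send an alert to Slack, CloudWatch, email, etc."
  [msg]
  (println "ALERT:" msg))

(defn start-watchdog!
  "Checks once a minute that the consumer future is still running and the
  status atom still reports :listening, and alerts otherwise."
  [consumer-future status]
  (let [executor (Executors/newSingleThreadScheduledExecutor)]
    (.scheduleAtFixedRate executor
                          (fn []
                            (when (or (future-done? consumer-future)
                                      (not= :listening @status))
                              (notify! "SQS consumer is no longer listening.")))
                          1 1 TimeUnit/MINUTES)
    executor))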