I wanted to look at connecting two disparate systems for a recent project. The goal was to be able to enter information into one system and have information processed by another system. The systems have no direct authentication trusts between them but they are both running on Amazon Web Services EC2 platform. This was a perfect use for the decoupling nature of the Amazon Simple Queue Service and I wanted to come up with a proof of concept, which is outlined below.
Before getting into any details, I want to make clear that this is not a best practice use case of SQS. For most uses of SQS there is a need to keep track of the messages being processed in some kind of permanent state such as a database. With a persistent data store containing the processed messages, the queue workers can more effectively process messages if messages are delivered one or more times. That being said lets go over this proof of concept.
Assuming AWS keys with correct permissions are configured and the AWSPowerShell module is loaded, the below command will create a new SQS queue with PowerShell. The command returns the created queue url which will be stored in a variable $NewSQSQueueUrl for future use.
1 |
$NewSQSQueueUrl = New-SQSQueue -QueueName ExampleBlogPostQueue -Region us-east-1 |
A quick peek at the SQS console to ensure the queue was created.
This next bit creates an array of strings which will serve as some example information to share between the systems. For this proof of concept I am sending example PowerShell parameters into the SQS queue.
1 |
$exampleparams = @("All The Things","Another Example Parameter") |
I have written the POC functions which are also uploaded to my GitHub PowerShell repo that get dot sourced. These functions put the information (example parameters) into the new SQS queue as message attributes of the newly created SQS message.
1 2 3 4 |
. "$env:userprofile\Documents\PowerShell\WIP\blog\Start-SQSQueueProcessing.ps1" . "$env:userprofile\Documents\PowerShell\WIP\blog\Send-PowerShellParameterToSQS.ps1" Send-PowerShellParameterToSQS -Parameters $exampleparams -SQSQueueUrl $NewSQSQueueUrl -RequestType Enable |
After running these functions the message ids are returned to the PowerShell host indicating the messages have been inserted into the SQS queue successfully.
Below is the function that was dot sourced that did the uploading. You could customize this to fit your use case with some help from the AWS Send-SQSMessage cmdlet documentation.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
function Send-PowerShellParameterToSQS { [CmdletBinding()] [Alias()] [OutputType([int])] Param ( [Parameter(Mandatory=$true, ValueFromPipelineByPropertyName=$true, Position=0)] [String[]]$Parameters, [Parameter(Mandatory=$true, ValueFromPipelineByPropertyName=$true, Position=1)] [String]$SQSQueueUrl, [Parameter(Mandatory=$true, ValueFromPipelineByPropertyName=$true, Position=2)] [ValidateSet("Enable","Disable")] [String]$RequestType ) Process { try { ForEach ($Parameter in $Parameters){ $messageParameter = New-Object Amazon.SQS.Model.MessageAttributeValue $messageParameter.DataType = "String" $messageParameter.StringValue = $Parameter $messageAttributes = New-Object System.Collections.Hashtable $messageAttributes.Add($RequestType, $messageParameter) Send-SQSMessage -QueueUrl $SQSQueueUrl -MessageAttributes $messageAttributes -MessageBody "Request generated by $($env:Username) at $(Get-Date -Format u)" } } catch { Write-Error $_ } } } |
With messages being put into the queue, I need a function to pull down the messages and process them on the queue worker system (aka the SQS message receiver). My goal is to take different actions on the queue worker system based on the message attributes of the SQS messages pulled out of the queue. That function looks something like this.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
function Start-SQSQueueProccessing { [CmdletBinding()] [Alias()] [OutputType([int])] Param ( [Parameter(Mandatory=$true, ValueFromPipelineByPropertyName=$true, Position=0)] [String]$SQSQueueUrl ) Process { try { Write-Verbose "$(Get-Date -Format u) Polling for $SQSQueueUrl messages" $recmsg = Receive-SQSMessage -QueueUrl $SQSQueueUrl -MessageCount 10 -MessageAttributeName All if ($recmsg) { Write-Verbose "$(Get-Date -Format u) ... Messages found in queue...starting to process them" $recmsg | ForEach-Object { Write-Verbose "$(Get-Date -Format u) ... Processing message $($_.MessageId) that was $($_.Body)" $_.MessageAttributes | ForEach-Object { ###This is where you can take action depending on the Key value of the message attribute### if ($_.Keys -eq "Enable") { Write-Output "We could do a Enable-Noun powershell cmdlet with the -parameter $($_.Enable.StringValue) here" } elseif ($_.Keys -eq "Disable") { Write-Output "We could do a Disable-Noun powershell cmdlet with the -parameter $($_.Disable.StringValue) here" } else { Write-Warning "Thats odd, the key of the message attribute was not what was expected so no action was taken" } } Write-Verbose "$(Get-Date -Format u) Done processing $($_.MessageId) that was $($_.Body) attempting message deletion" Remove-SQSMessage -QueueUrl $SQSQueueUrl -ReceiptHandle $_.ReceiptHandle -Force } }else{ Write-Verbose "$(Get-Date -Format u) ... No messages were pulled from the queue...nothing to do right now"} } catch { Write-Error $_ } } } |
This function isn’t actually doing anything interesting with the messages other than generating some output to the PowerShell streams but this is a proof of concept after all :).
Considerations when using SQS
As SQS is designed to decouple distributed systems, SQS does not assume every message pulled from the queue has been processed successfully. Messages that are pulled from the queue are hidden from the queue until the message visibility timeout period has passed. It is up to the queue workers to delete the messages from the queue after the message has been processed. This is why at the end of the function above, messages are deleted from the queue with the Remove-SQSMessage cmdlet.
After working with SQS a bit, I noticed that the behavior surrounding the delivery of messages sitting in the queue is a little unintuitive. For example, say there are 8 messages in a queue and I request for up to 10 messages to be received with Receive-SQSMessage. A logical assumption would be that all 8 messages are returned but that is rarely the case. After working with some messages in queues it becomes quite apparent that SQS will return a random number of messages. Additionally without using FIFO (First-In First-Out) queues, the messages will often be delivered out of order.
Another bit of a gotcha I ran into at first was that by default, Recieve-SQSMessage will not return any message attributes from SQS. The resulting Amazon.SQS.Model.Message object that was returned had blank MessageAttributeValues until I specified “-MessageAttributeName All” parameter.
Hopefully the above considerations will shed some light on the way the function is written. I wrote it so that it could be run repeatedly from a parent polling script and that it could handle one more more message objects being returned from each poll of SQS.
Back to the functions
Finally, we get to the polling function portions of the script which could run on scheduled intervals via task scheduler. This function first checks a queue for the existence of messages using the Get-SQSQueueAttribute cmdlet. If messages are found in the queue, it will invoke the Start-SQSQueueProcessing function referenced above to handle the messages. I make use PowerShell transcription to keep a log for now. If this ever moves out of proof of concept, logging could be improved quite a bit to make it cleaner.
This is the POC Queue polling function.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
Function Start-SQSQueuePolling{ Param ( [Parameter(Mandatory=$true, ValueFromPipelineByPropertyName=$true, Position=0)] [String]$QueueUrl ) $numofmsg = Get-SQSQueueAttribute -QueueUrl $QueueUrl -AttributeName ApproximateNumberOfMessages if ($numofmsg.ApproximateNumberOfMessages -gt 0){ Write-Output "Number of messages in the queue to process is approximately $($numofmsg.ApproximateNumberOfMessages)" While ($numofmsg.ApproximateNumberOfMessages -gt 0) { Start-SQSQueueProccessing -SQSQueueUrl $QueueUrl -Verbose Start-Sleep -Seconds 5 $numofmsg = Get-SQSQueueAttribute -QueueUrl $QueueUrl -AttributeName ApproximateNumberOfMessages Write-Output "Number of messages is $($numofmsg.ApproximateNumberOfMessages)" } }else{ Write-Output "No messages found in the queue to process" } Write-Output "Done processing SQS Messages in the Queue" } |
How I envision the script being called ala cron or task scheduler for regular execution would be something like…
1 2 3 4 |
Import-Module AWSPowerShell . "$env:userprofile\Documents\PowerShell\WIP\blog\Start-SQSQueuePolling.ps1" Start-Transcript -Path "$env:userprofile\Documents\PowerShell\WIP\blog\OurSQSPollingLog.txt" -NoClobber -Append Start-SQSQueuePolling -QueueUrl https://sqs.us-east-1.amazonaws.com/1234567890/ExampleBlogPostQueue |
What does it look like when ran you may be wondering?
The PowerShell transcript output captures the same information. As you may have noticed, the body of the generated SQS messages contains information on who created the SQS message and when it was created which could help for audit trails.
Thanks for following along and happy scripting!