Publishing lots and lots of messages to SNS
At work, we use Amazon SNS as the trigger for a lot of our data pipelines. You send a message to an SNS topic, and it gets picked up by an SQS queue, an ECS task, or a Lambda function – and they in turn send new messages to other SNS topics, and the pipeline continues.
Sending a single message to SNS is pretty simple, but sending thousands or millions of messages is a bit harder. I find myself needing to do this on a semi-regular basis, so I wrote a tool to help me out. I create a text file with one message per line, then I pass it to my bulk_sns_publish tool:
bulk_sns_publish.py \
--topic-arn arn:aws:sns:eu-west-1:1234567890:my-new-topic \
messages_to_send.txt
It reads each line of the file, and sends it to the specified topic. I routinely use this to publish hundreds of thousands of notifications, and it works very reliably and quickly – when I run it from my home desktop, it can send a quarter-million messages in less than five minutes.
This approach also makes it easy for me to spot check the messages before they’re sent. Typically that text file is generated by another script, and it’s useful to check the messages are in the right format before I send them. I can also manipulate the file very easily, for example truncating it so I send a couple of messages as a test before sending the whole batch.
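That truncation step is easy with standard shell tools – for example (test_messages.txt is a hypothetical filename, and the input file matches the earlier example):

```shell
# Copy the first five messages into a smaller file to send as a test
head -n 5 messages_to_send.txt > test_messages.txt
```

Then I can point the tool at the smaller file, check the messages arrive as expected, and only then send the full batch.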
The tool relies on a couple of tricks:
- The SNS PublishBatch API, which allows you to send up to ten messages in a single API call. I combine this with my chunked_iterable snippet to divide the file into batches of ten, and send them to this API:
import uuid

def get_batch_entries(path):
    """
    Given a file which contains one message per line, create a list of
    entries that can be passed as `PublishBatchRequestEntries` in the
    `SNS.Client.publish_batch` method.
    """
    for batch in chunked_iterable(open(path), size=10):
        yield [
            {"Id": str(uuid.uuid4()), "Message": line.strip()}
            for line in batch
        ]

for batch_entries in get_batch_entries(path):
    sns_client.publish_batch(
        TopicArn=topic_arn,
        PublishBatchRequestEntries=batch_entries
    )
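If you don't have the chunked_iterable snippet to hand, a minimal version can be written with itertools.islice (this is a sketch, not necessarily the exact snippet linked above):

```python
import itertools

def chunked_iterable(iterable, *, size):
    """Split an iterable into fixed-size chunks; the last may be shorter."""
    it = iter(iterable)
    while chunk := tuple(itertools.islice(it, size)):
        yield chunk
```
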
- My snippet for running concurrent tasks in Python. Sending messages to SNS is a heavily I/O-bound task, which makes it a perfect fit for this handler.
for _ in concurrently(
    handler=lambda batch_entries: sns_client.publish_batch(
        TopicArn=topic_arn,
        PublishBatchRequestEntries=batch_entries
    ),
    inputs=get_batch_entries(path),
):
    pass
This makes the tool significantly faster – I can have multiple PublishBatch requests in-flight at once, and while I’m waiting for one to respond I can be preparing and sending the next one.
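If you'd rather not pull in the linked snippet, here's a minimal sketch of what a concurrently helper could look like, built on concurrent.futures with a bounded number of in-flight tasks (the max_concurrency parameter is my naming, not necessarily the original's):

```python
import concurrent.futures
import itertools

def concurrently(handler, inputs, *, max_concurrency=10):
    """
    Run ``handler`` over ``inputs``, with up to ``max_concurrency`` tasks
    in flight at once, yielding (input, result) pairs as tasks complete.
    """
    inputs = iter(inputs)
    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Submit an initial batch, then top up the pool as each task
        # finishes, so we never materialise the whole input at once.
        futures = {
            executor.submit(handler, i): i
            for i in itertools.islice(inputs, max_concurrency)
        }
        while futures:
            done, _ = concurrent.futures.wait(
                futures, return_when=concurrent.futures.FIRST_COMPLETED
            )
            for fut in done:
                yield futures.pop(fut), fut.result()
            for i in itertools.islice(inputs, len(done)):
                futures[executor.submit(handler, i)] = i
```

Because the work is I/O-bound (waiting on HTTP responses from SNS), threads are a good fit here – the GIL isn't a bottleneck while a thread is blocked on the network.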
Those are the two interesting bits of the script; the rest is mostly command-line argument parsing. If you want to read the whole thing or use it for yourself, you can download it here:
This is slightly different to the version that I run, which has a couple of extra bits to select the appropriate AWS credentials for the accounts I use – but it’s basically the same script.