An Interest In:
Web News this Week
- April 20, 2024
- April 19, 2024
- April 18, 2024
- April 17, 2024
- April 16, 2024
- April 15, 2024
- April 14, 2024
Mock a SQS queue with moto
Below is simple example code to mock the aws sqs api. This can be useful for development so you dont have to actually manage a real SQS queue / connectivity / IAM permissions until you're ready.
app.py
A simple function to write a message to a preexisting queue
QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')def write_message(data): sqs = boto3.client('sqs', region_name = 'us-east-1') r = sqs.send_message( MessageBody = json.dumps(data), QueueUrl = QUEUE_URL )
conftest.py
Pytest fixtures to mock up the aws sqs API. aws_credentials()
also ensures that your pytest functions will not actually write to aws resources.
REGION='us-east-'@pytest.fixture(scope='function')def aws_credentials(): """Mocked AWS Credentials for moto.""" os.environ['AWS_ACCESS_KEY_ID'] = 'testing' os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing' os.environ['AWS_SECURITY_TOKEN'] = 'testing' os.environ['AWS_SESSION_TOKEN'] = 'testing'@pytest.fixture(scope='function')def sqs_client(aws_credentials): # setup with mock_sqs(): yield boto3.client('sqs', region_name=REGION) # teardown
test_sqs.py
An example test function. Create a queue using the mock client from conftest.py (notice sqs_client
parameter matches the conftest function name sqs_client
), invoke your python module function app.write_message()
. Validate the returned message matches what you sent
def test_write_message(sqs_client): queue = sqs_client.create_queue(QueueName='test-msg-sender') queue_url = queue['QueueUrl'] # override function global URL variable app.QUEUE_URL = queue_url expected_msg = str({'msg':f'this is a test'}) app.write_message(expected_msg) sqs_messages = sqs_client.receive_message(QueueUrl=queue_url) assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
Extra
In case you wanted to see my file structure:
README.md app.py requirements.txt requirements_dev.txt tests __init__.py conftest.py unit __init__.py test_sqs.py
Thank you
kudos to Arunkumar Muralidharan who got me started
Original Link: https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd
Dev To
An online community for sharing and discovering great ideas, having debates, and making friendsMore About this Source Visit Dev To