March 10, 2021 02:39 pm GMT

Mock a SQS queue with moto

Below is a simple code example that mocks the AWS SQS API. This can be useful during development, so you don't have to manage a real SQS queue, connectivity, or IAM permissions until you're ready.

app.py

A simple function to write a message to a preexisting queue

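    import json
    import os

    import boto3

    # Read once at import time; the test overrides this module-level value.
    QUEUE_URL = os.environ.get('QUEUE_URL', '<default value>')


    def write_message(data):
        """Serialize data as JSON and send it to the queue at QUEUE_URL."""
        sqs = boto3.client('sqs', region_name='us-east-1')
        r = sqs.send_message(
            MessageBody=json.dumps(data),
            QueueUrl=QUEUE_URL
        )
        return r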
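One thing to keep in mind about the function above: `QUEUE_URL` is resolved once, when the module is imported, so changing the environment variable afterwards has no effect on it. The sketch below is an alternative arrangement, not the code from this post. It looks the URL up at call time and takes the client as an argument, so a stand-in object can be passed where no AWS library is available. `write_message_to`, `FakeSqsClient`, and the example URL are hypothetical names introduced only for illustration.

```python
import json
import os


def write_message_to(client, data, queue_url=None):
    """Send `data` as JSON via `client` (hypothetical variant of write_message)."""
    # Resolve the URL at call time so a test can set QUEUE_URL late.
    url = queue_url or os.environ.get("QUEUE_URL", "<default value>")
    return client.send_message(MessageBody=json.dumps(data), QueueUrl=url)


class FakeSqsClient:
    """Stand-in that records calls; it only mimics send_message keyword arguments."""

    def __init__(self):
        self.sent = []

    def send_message(self, **kwargs):
        self.sent.append(kwargs)
        return {"MessageId": "fake-id"}  # made-up response, not real SQS output


fake = FakeSqsClient()
response = write_message_to(
    fake, {"msg": "hello"}, queue_url="https://example.invalid/queue"
)
```

In real use you would presumably pass `boto3.client('sqs', region_name='us-east-1')` instead of the fake; the moto fixtures in this post work with either arrangement.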

conftest.py

Pytest fixtures that mock the AWS SQS API. aws_credentials() also sets dummy credentials, so that your test functions will not actually write to real AWS resources.

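    import os

    import boto3
    import pytest
    from moto import mock_sqs  # moto < 5; moto 5+ replaces this with mock_aws

    REGION = 'us-east-1'


    @pytest.fixture(scope='function')
    def aws_credentials():
        """Mocked AWS Credentials for moto."""
        os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
        os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
        os.environ['AWS_SECURITY_TOKEN'] = 'testing'
        os.environ['AWS_SESSION_TOKEN'] = 'testing'


    @pytest.fixture(scope='function')
    def sqs_client(aws_credentials):
        # setup: boto3 SQS calls made inside this block hit moto's in-memory mock
        with mock_sqs():
            yield boto3.client('sqs', region_name=REGION)
        # teardown: leaving the with-block discards the mocked queues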
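The aws_credentials() fixture writes straight into `os.environ`, so the dummy values stay in place for the rest of the test process. If that matters for your setup, here is a minimal sketch of one way to restore the environment afterwards, using only the standard library. `fake_aws_env` and `FAKE_AWS_ENV` are hypothetical names, not part of moto or of the original post.

```python
import os
from contextlib import contextmanager
from unittest.mock import patch

# Same dummy values the aws_credentials() fixture sets.
FAKE_AWS_ENV = {
    "AWS_ACCESS_KEY_ID": "testing",
    "AWS_SECRET_ACCESS_KEY": "testing",
    "AWS_SECURITY_TOKEN": "testing",
    "AWS_SESSION_TOKEN": "testing",
}


@contextmanager
def fake_aws_env():
    """Temporarily set dummy AWS credentials, restoring os.environ on exit."""
    with patch.dict(os.environ, FAKE_AWS_ENV):
        yield


before = os.environ.get("AWS_ACCESS_KEY_ID")
with fake_aws_env():
    inside = os.environ["AWS_ACCESS_KEY_ID"]  # dummy value while the block is active
after = os.environ.get("AWS_ACCESS_KEY_ID")  # whatever was set before, if anything
```

Inside a fixture, the body could be `with fake_aws_env(): yield`. Pytest's built-in `monkeypatch.setenv` is another option that undoes its changes when the test finishes.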

test_sqs.py

An example test function. It creates a queue using the mock client from conftest.py (notice that the sqs_client parameter matches the name of the sqs_client fixture in conftest.py), invokes your Python module's function app.write_message(), and validates that the message read back from the queue matches what you sent.

    import json

    import app


    def test_write_message(sqs_client):
        queue = sqs_client.create_queue(QueueName='test-msg-sender')
        queue_url = queue['QueueUrl']
        # override function global URL variable
        app.QUEUE_URL = queue_url
        expected_msg = {'msg': 'this is a test'}
        app.write_message(expected_msg)
        sqs_messages = sqs_client.receive_message(QueueUrl=queue_url)
        # write_message() JSON-encodes the payload, so decode before comparing
        assert json.loads(sqs_messages['Messages'][0]['Body']) == expected_msg
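Indexing `['Messages'][0]` in the test above assumes a message actually came back. As far as I know, boto3 leaves the `Messages` key out of the response entirely when nothing is received, which would surface as a `KeyError` instead of a clear assertion failure. A small helper like the one below can make that case easier to read. `message_bodies` is a hypothetical name, and the dicts are hand-written stand-ins containing only the fields the test uses, not real SQS responses.

```python
import json


def message_bodies(response):
    """Decode the JSON body of every message in a receive_message-style response."""
    # .get() with a default avoids a KeyError when no messages came back.
    return [json.loads(m["Body"]) for m in response.get("Messages", [])]


# Hand-written dicts shaped like the parts of the response used in the test.
empty = {}
one = {"Messages": [{"Body": json.dumps({"msg": "this is a test"})}]}

decoded_empty = message_bodies(empty)
decoded_one = message_bodies(one)
```

With this helper, the final assertion in the test could become `assert message_bodies(sqs_messages) == [expected_msg]`.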

Extra

In case you wanted to see my file structure:

    .
    ├── README.md
    ├── app.py
    ├── requirements.txt
    ├── requirements_dev.txt
    └── tests
        ├── __init__.py
        ├── conftest.py
        └── unit
            ├── __init__.py
            └── test_sqs.py

Thank you

Kudos to Arunkumar Muralidharan, who got me started.


Original Link: https://dev.to/drewmullen/mock-a-sqs-queue-with-moto-4ppd
