Building Data Pipelines Part 2: Stream to S3 Storage and DynamoDB Pipeline

Query transaction data in DynamoDB


In this phase, we will focus on sending data from Kinesis to storage (S3 and DynamoDB) and querying transaction data in DynamoDB using an API.

  • Stream to S3 raw storage

  • Configure IAM for S3

  • Create a Lambda function to insert data into S3

  • Test the pipeline

  • Set up DynamoDB

  • Set up IAM for the DynamoDB stream

  • Create the DynamoDB Lambda

  • Query data in DynamoDB

Overview

Our pipeline continuously pulls raw CSV data into the system, stores the transformed data in S3 and DynamoDB, and connects this data to business analytics tools such as Tableau and Jupyter Notebook. Additionally, customers can use an API to query transaction data in DynamoDB by its primary key (InvoiceNo & StockCode).

Setting Up the S3 Bucket

  1. Kinesis Data Stream and Lambda Integration: Set up the Kinesis data stream as an insert trigger, configure an IAM role with S3 write access, and create a Lambda function that writes incoming messages to the S3 bucket as files (a sketch follows this list).

  2. Lambda Function for Data Cleaning and Transformation (Optional): Add a Lambda function to clean and transform the raw data, converting it to Parquet and parsing the InvoiceDate timestamp.

  3. Testing the Pipeline: Use the Lambda console's test feature to configure a test event; a successful invocation writes the test data to S3, confirming that the function works.
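
As a minimal sketch of steps 1–2, the function below assumes a hypothetical bucket name (retail-raw-data), the usual online-retail CSV column order, and that pandas/pyarrow are available through a Lambda layer for the optional Parquet conversion:

    import base64
    import io
    from datetime import datetime, timezone

    import boto3
    import pandas as pd  # assumed to be available through a Lambda layer

    s3 = boto3.client("s3")
    BUCKET = "retail-raw-data"  # hypothetical bucket name

    # Assumed column order of the incoming CSV records
    COLUMNS = ["InvoiceNo", "StockCode", "Description", "Quantity",
               "InvoiceDate", "UnitPrice", "CustomerID", "Country"]

    def lambda_handler(event, context):
        # Kinesis delivers each record base64-encoded under event["Records"]
        rows = []
        for record in event["Records"]:
            payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
            # Naive CSV split; assumes no embedded commas in the fields
            rows.append(payload.strip().split(","))

        # Optional transform (step 2): build a DataFrame, parse the InvoiceDate
        # timestamp, and store the whole batch as a single Parquet object
        df = pd.DataFrame(rows, columns=COLUMNS)
        df["InvoiceDate"] = pd.to_datetime(df["InvoiceDate"], errors="coerce")

        buffer = io.BytesIO()
        df.to_parquet(buffer, index=False)

        key = f"transactions/{datetime.now(timezone.utc):%Y/%m/%d/%H%M%S}.parquet"
        s3.put_object(Bucket=BUCKET, Key=key, Body=buffer.getvalue())
        return {"records_written": len(df)}

For step 3, the Kinesis template in the Lambda console's test-event dialog can be used; its data field simply needs to carry a base64-encoded CSV line.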

Stream to DynamoDB Pipeline

  1. Kinesis Trigger and Lambda for DynamoDB: Configure Kinesis inserts to trigger a second Lambda function, which preprocesses each record and writes it to the customer and invoice tables.

  2. DynamoDB Table Creation:

    • Customer Table:

      • Keys: "Partition key - CustomerID (String)" and "Sort Key - InvoiceNo (String)"

      • Columns: Country, Description, InvoiceDate, Quantity, Stockcode, UnitPrice

    • Invoice Table:

      • Keys: "Partition key - InvoiceNo (String)" and "Sort Key - StockCode (String)"

      • Columns: Country, Description, InvoiceDate, Quantity, UnitPrice

  3. Lambda Function Configuration: Create a Lambda function that processes incoming Kinesis records and updates both the 'Customers' and 'Invoices' DynamoDB tables (a sketch follows this list). Assign the necessary IAM roles for Kinesis and DynamoDB access.

  4. AWS Kinesis Stream Trigger Configuration:

    • Open the AWS Lambda service in the Management Console.

    • Select the desired Lambda function.

    • In the designer panel, click "Add trigger" and choose "Kinesis."

    • Configure the trigger settings, including batch size and starting position.

    • Enable the trigger.
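
A minimal sketch of the DynamoDB-writer Lambda described in steps 1–3, assuming the same CSV column order as above and tables named Customers and Invoices:

    import base64
    import boto3

    dynamodb = boto3.resource("dynamodb")
    customers = dynamodb.Table("Customers")  # partition key CustomerID, sort key InvoiceNo
    invoices = dynamodb.Table("Invoices")    # partition key InvoiceNo, sort key StockCode

    def lambda_handler(event, context):
        for record in event["Records"]:
            line = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
            # Assumes one well-formed CSV transaction line per Kinesis record
            (invoice_no, stock_code, description, quantity,
             invoice_date, unit_price, customer_id, country) = line.strip().split(",")

            item = {
                "StockCode": stock_code,
                "Description": description,
                "Quantity": quantity,
                "InvoiceDate": invoice_date,
                "UnitPrice": unit_price,
                "Country": country,
            }

            # Customer-centric view of the transaction
            customers.put_item(Item={**item, "CustomerID": customer_id,
                                     "InvoiceNo": invoice_no})

            # Invoice-centric view used by the query API
            invoices.put_item(Item={**item, "InvoiceNo": invoice_no})

Per step 3, the function's execution role needs read access to the Kinesis stream and dynamodb:PutItem permission on both tables.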

Check the Data: Send a few records through the pipeline and confirm they land correctly in both S3 and DynamoDB.
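
One way to spot-check both sinks from a local Python session, reusing the bucket and table names assumed in the sketches above:

    import boto3

    s3 = boto3.client("s3")
    dynamodb = boto3.resource("dynamodb")

    # List the Parquet objects written by the S3 pipeline
    listing = s3.list_objects_v2(Bucket="retail-raw-data", Prefix="transactions/")
    for obj in listing.get("Contents", [])[:5]:
        print(obj["Key"], obj["Size"])

    # Read back a handful of items from the invoice table
    print(dynamodb.Table("Invoices").scan(Limit=5)["Items"])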

Connecting to Business Analytics Tools

Connect the data stored in S3 and DynamoDB to business analytics tools such as Tableau and Jupyter Notebook for further analysis and visualization.
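
For example, the Parquet output in S3 can be pulled straight into a pandas DataFrame from a Jupyter Notebook (credentials, bucket, and key below are placeholders):

    import io

    import boto3
    import pandas as pd
    from botocore.exceptions import NoCredentialsError

    AWS_KEY = 'YOUR_ACCESS_KEY'
    AWS_SECRET = 'YOUR_SECRET_KEY'
    BUCKET_NAME = 'your-bucket-name'
    FILE_NAME = 'your-file-name'

    s3 = boto3.client('s3', aws_access_key_id=AWS_KEY, aws_secret_access_key=AWS_SECRET)

    try:
        # Download the object and read it as Parquet
        obj = s3.get_object(Bucket=BUCKET_NAME, Key=FILE_NAME)
        df = pd.read_parquet(io.BytesIO(obj['Body'].read()))
    except NoCredentialsError:
        print("No credentials provided")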

Enabling Customer Queries via API

In this phase, we add a GET method that invokes the initial Lambda function, allowing customers to access and query transaction data in DynamoDB.
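
A minimal sketch of what that query handler might look like, assuming an API Gateway Lambda proxy integration and the Invoices table described earlier:

    import json

    import boto3

    dynamodb = boto3.resource("dynamodb")
    invoices = dynamodb.Table("Invoices")  # partition key InvoiceNo, sort key StockCode

    def lambda_handler(event, context):
        # With a proxy integration, GET query parameters arrive here
        params = event.get("queryStringParameters") or {}
        invoice_no = params.get("InvoiceNo")
        stock_code = params.get("StockCode")
        if not invoice_no or not stock_code:
            return {"statusCode": 400,
                    "body": json.dumps({"error": "InvoiceNo and StockCode are required"})}

        # A full primary key (partition + sort key) identifies exactly one item
        result = invoices.get_item(Key={"InvoiceNo": invoice_no, "StockCode": stock_code})
        item = result.get("Item")
        if item is None:
            return {"statusCode": 404, "body": json.dumps({"error": "Transaction not found"})}
        return {"statusCode": 200, "body": json.dumps(item, default=str)}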

Creating a Request to Retrieve Transaction Data in DynamoDB

  1. Check the invoke URL for the GET method in the API Gateway Stages section.
  2. Using Postman:

    • Go to the "Create a request" menu.

    • Enter the invoke URL and add the key-value pairs (InvoiceNo & StockCode) as query parameters.

    • Send the request to get the response from DynamoDB; the same call can be made in code, as shown below.
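
The same request can also be made outside Postman, for example with the requests library; the invoke URL and key values below are placeholders:

    import requests

    # Placeholder invoke URL copied from the API Gateway stage
    API_URL = "https://abc123.execute-api.us-east-1.amazonaws.com/prod/transactions"

    # Example primary-key values for the lookup
    response = requests.get(API_URL, params={"InvoiceNo": "536365", "StockCode": "85123A"})
    print(response.status_code)
    print(response.json())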

The implemented data pipelines carry out the ETL process, making it possible to extract insights from the transaction data, while the connections to BI tools support efficient visualization. Further updates, including screenshots, are in progress.