Building Data Pipelines: Ingesting Data on AWS

In today's data engineering exploration, let's dive into creating data pipelines on AWS. Our focus will be on the ingestion-to-buffer (Kinesis) phase. The data used for this project comes from the UCI Machine Learning Repository.

Overview

Key Steps

  • Lambda Function Setup

  • API Gateway Configuration

  • Kinesis Data Stream Setup

  • Data Ingestion

  • CloudWatch Monitoring

Lambda Function Setup

To kickstart our data pipeline, we set up a Lambda function. Serving as the heart of our data processing, it receives requests from API Gateway and forwards the records to the Kinesis data stream. A minimal sketch of such a handler follows the steps below.

  1. Lambda Creation:

    • Create a Lambda function tailored for our data processing needs.

  2. IAM Policies:

    • Attach IAM policies to the Lambda function, granting it the permissions it needs, such as writing to the Kinesis data stream and publishing logs to CloudWatch.
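
For illustration, a minimal handler along these lines might look like the sketch below. It assumes a Lambda proxy-style event carrying the record under a "body" field; the stream name and partition key choice are placeholders rather than details from the original setup.

import json

import boto3

# Placeholder stream name -- replace with your Kinesis data stream
STREAM_NAME = "ecommerce-data-stream"

kinesis = boto3.client("kinesis")


def lambda_handler(event, context):
    # With a proxy integration, the POST body arrives as a JSON string under
    # "body"; with a non-proxy integration the event may already be the payload.
    payload = json.loads(event["body"]) if "body" in event else event

    # Forward the record to Kinesis. The partition key controls shard routing;
    # with a single shard the exact choice matters little.
    kinesis.put_record(
        StreamName=STREAM_NAME,
        Data=json.dumps(payload),
        PartitionKey=str(payload.get("CustomerID", "unknown")),
    )

    return {"statusCode": 200, "body": json.dumps({"status": "received"})}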

API Gateway Configuration

Next, we configure API Gateway, which acts as the front door for incoming data. This involves building a new REST API, creating resources and methods for data exchange, and wiring them to our Lambda function so that requests flow straight into the pipeline. A scripted version of this setup is sketched after the list below.

  1. REST API Creation:

    • Build a new REST API within API Gateway to facilitate data exchanges.
  2. Resource & Method Configuration:

    • Create resources and methods, focusing on POST (data ingestion) for now.
  3. Lambda Integration:

    • Configure connections to the Lambda function, establishing a seamless integration for data processing.
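
The same configuration can be scripted with boto3, as sketched below. The API name, resource path, region, and Lambda ARN are placeholders for illustration, and granting API Gateway permission to invoke the Lambda function is omitted for brevity.

import boto3

apigw = boto3.client("apigateway", region_name="us-east-1")  # assumed region

# Placeholder ARN -- substitute your own Lambda function's ARN
LAMBDA_ARN = "arn:aws:lambda:us-east-1:123456789012:function:ingest-handler"

# 1. Create the REST API and locate its root ("/") resource
api = apigw.create_rest_api(name="ecommerce-ingest-api")
resources = apigw.get_resources(restApiId=api["id"])["items"]
root_id = next(r["id"] for r in resources if r["path"] == "/")

# 2. Add a resource and a POST method for data ingestion
resource = apigw.create_resource(restApiId=api["id"], parentId=root_id, pathPart="ingest")
apigw.put_method(restApiId=api["id"], resourceId=resource["id"],
                 httpMethod="POST", authorizationType="NONE")

# 3. Wire the POST method to the Lambda function (proxy integration)
apigw.put_integration(
    restApiId=api["id"], resourceId=resource["id"], httpMethod="POST",
    type="AWS_PROXY", integrationHttpMethod="POST",
    uri=("arn:aws:apigateway:us-east-1:lambda:path/2015-03-31/functions/"
         f"{LAMBDA_ARN}/invocations"),
)

# 4. Deploy the API to a stage to obtain an invokable endpoint
apigw.create_deployment(restApiId=api["id"], stageName="dev")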

Kinesis Data Stream Setup

Data buffering is crucial for a streamlined data pipeline, so in this step we set up a Kinesis data stream sized to handle the incoming e-commerce data. A basic Kinesis data stream is created with a single open shard and a data retention period of one day; a scripted equivalent follows the step below.

  1. Stream Setup:
  • Establish a Kinesis data stream, and optimize it for efficient buffering of our data.
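
Scripted with boto3, the same setup is only a few lines; the stream name and region below are placeholders consistent with the earlier sketches.

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed region

# A single shard is enough for this workload, and the default retention
# period is already 24 hours (one day), so no further configuration is needed.
kinesis.create_stream(StreamName="ecommerce-data-stream", ShardCount=1)

# Stream creation is asynchronous; wait until it is ACTIVE before writing.
kinesis.get_waiter("stream_exists").wait(StreamName="ecommerce-data-stream")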

Data Ingestion

With the foundational elements in place, we move on to data ingestion. Using a small Python script that issues POST requests, we start pushing data into the pipeline. Once a request reaches API Gateway, the Lambda function is triggered and the processed record is forwarded to the Kinesis data stream.

  1. Script Execution:
  • Execute the Python script below to send data into the pipeline.
import pandas as pd
import requests

# API Gateway endpoint (the invoke URL of the deployed stage)
URL = ""

# Read the dataset as a DataFrame (df) from the CSV file
df = pd.read_csv('data_path', sep=',')

# Convert each row to JSON and send it to the API
for i in df.index:
    try:
        # Convert the row to a JSON string
        export = df.loc[i].to_json()

        # Send it to the API
        response = requests.post(URL, data=export)
        print(response)
    except Exception:
        # Log the row that failed to send
        print(df.loc[i])

CloudWatch Monitoring

To maintain the health and performance of our data pipeline, we turn to CloudWatch for monitoring. CloudWatch provides real-time insights into the execution of our Lambda function and the status of our Kinesis data stream. Monitoring allows us to identify potential bottlenecks, troubleshoot issues, and optimize the overall efficiency of our data processing.

  1. Monitoring Insights:
  • Monitor data ingestion in CloudWatch, validating the integrity and efficiency of our data processing. A sample record logged by the Lambda function is shown below, followed by a small metric-checking sketch.

  • {'body-json': {'InvoiceNo': 536365, 'StockCode': '85123A', 'Description': 'WHITE HANGING HEART T-LIGHT HOLDER', 'Quantity': 6, 'InvoiceDate': '12/1/2010 8:26', 'UnitPrice': 2.55, 'CustomerID': 17850, 'Country': 'United Kingdom'}}
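
Beyond browsing the console, the same check can be scripted: the sketch below pulls the stream's IncomingRecords metric via boto3, using the placeholder stream name and region from the earlier sketches.

import datetime

import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # assumed region

# Sum the records the stream received over the last hour, in 5-minute buckets
now = datetime.datetime.utcnow()
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kinesis",
    MetricName="IncomingRecords",
    Dimensions=[{"Name": "StreamName", "Value": "ecommerce-data-stream"}],
    StartTime=now - datetime.timedelta(hours=1),
    EndTime=now,
    Period=300,
    Statistics=["Sum"],
)

for point in sorted(stats["Datapoints"], key=lambda p: p["Timestamp"]):
    print(point["Timestamp"], point["Sum"])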

This guide covers the first steps towards building a data pipeline on AWS.

Stay tuned for more updates! 🛍️💡