1

In this post, let's build a project to use AWS Lambda functions to load data from YouTube, and store it in an S3 bucket.

This project can be found on GitHub as well.

To make the project more interesting, let's define the problem we're trying to solve using YouTube data.

What are the key factors that influence video engagement (likes, comments, and shares) and audience retention (watch time and drop-off rates) on YouTube, and how can creators optimize their content to maximize these metrics?

The overall architecture of the solution we'll implement is outlined below. In this post, we'll work on the first part of extracting raw data in to the S3 bucket.

1-1

It's worthwhile to have some idea about why we've chosen AWS Lambda and S3 among the many services available on the AWS platform. Being serverless, Lambda functions are easy to build and deploy. Therefore, it's more suitable for a relatively shorter script to run as the one for getting and storing data. It also falls within the 15 minute maximum timeout value for Lambda functions.

We're using an S3 bucket to store the raw data as it supports open data formats, and is scalable, durable, and cost-effective.

1. Getting started with YouTube API

The first step is to start using the YouTube API to get the data we are looking for using Python.

YouTube has a helpful guide on getting started with the API.

Create a new project in Google Developers Console. Name the project, I've called it as youtube-etl.

aws-yt-2

Create credentials for the project. Because we're not using private data, we're choosing to create an API key.

aws-yt-3

Head to API Console to enable the YouTube API.

aws-yt-4

Enable the YouTube Data API v3 for the project.

aws-yt-5

It should now appear in the list of enabled APIs.

6

Back in the Credentials page, restrict the API key we created to only use the YouTube API.

7

YouTube API has a daily limit of 10000 quota units. You can see this limit on Enabled APIs and Services > Youtube Data API -> Quotas and system limits.

aws-yt-8

Data Model for the dashboard

The API Reference has the quota used for each request.

aws-yt-9

2. Using the YouTube API

Before using the API, there're a few terms to get to know in order to use the API effectively.

When an API request is sent, a list of items are returned. These items are referred to as resources. Each resource has groups of properties which are known as parts. See the list of parts available for video resource here. Each part contains a set of properties known as fields.

When building your API request, you'll need to specify the part and field parameters, so that the response can be filtered. This helps to reduce latency with the request, and saves you from processing many properties which may not be used in your project.

Let's try using curl to send a simple request to get the video categories. The YouTube API has a videoCategories:List endpoint.

You can use the API Explorer in the same page to help build your curl request. Click to view it in full screen mode.

aws-yt-10

Use snippet for the part parameter and US as the region code. The API has different region codes for different countries, and we'll have a look based on US in this example.

aws-yt-11

Run the curl command with your API Key in a terminal. You can remove the line for the access token as we aren't using any.

curl -X GET 'https://youtube.googleapis.com/youtube/v3/videoCategories?part=snippet&regionCode=US&key=YOUR_API_KEY' --header 'Accept: application/json' 

The response will contain a list of video categories in list format.

12

3. Using the YouTube API to get data for the project

As we now have some idea about using the API, let's try to build the request for the data we want for this project.

The project aims to track the daily growth rate and audience engagement of popular travel videos, while identifying trending content. To achieve this, we'll focus on gathering popular travel videos published within the last 24 hours. To limit the scope, we'll focus on videos based on traveling in Japan.

The YouTube API has a search endpoint which allows us to get a list of videos. It's worth noting that one search request will cost 100 quota units.

curl -X GET \
'https://www.googleapis.com/youtube/v3/search?part=snippet&q=japan+travel&type=video&maxResults=50&publishedAfter=2024-12-11T00:00:00Z&publishedBefore=2024-12-12T00:00:00Z&order=viewCount&topic_id=/m/07bxq&videoDuration=medium&key=YOUR_API_KEY'

Let's have a look at the parameters in the request. The part parameter is set to return the snippet which will contain the video id. The q parameter allows querying for travel videos. As we want the videos published in the last 24 hours, the publishedAfter is set to the day before, and the publishedBefore to the next day, to limit the response to 24 hours. As we're looking for the most popular videos, we're ordering the result by viewCount. The topic Id helps to filter results to travel topic. Topic Ids are defined here. The videoDuration parameter has 4 possible values - short (<4 minutes), medium (between 4 and 20 minutes), long (>20 minutes) and any (default value). We'll execute two requests with medium and long values to exclude shorts content on YouTube.

The response contains a list of videos with their ids and other metadata.

13

4. Use Python to build the API request

Now, let's use Python to build the above request.

I've created a simple Python project in PyCharm, but you can use Jupyter Notebook as well.

14

Log in / Sign up to AWS Cloud Console. If you're creating a new account, you'll be able to use the free tier services.

15

In the AWS Console, search of Lambda service and click on Create a Function. Add a function name and set the Runtime option to use Python. In addition to the function, an IAM role will be created with permission to write logs to the Amazon CloudWatch.

16

When you create the function, you'll see the code editor window where you can put in your code for the Lambda function. In the Runtime Settings section has the handler function defined as lambda_function.lambda_handler, which will be executed when the function is run.

aws-yt-17

Let's try running the sample function defined. Head to the Test tab and click on Test button to execute the function. The function will execute and create CloudWatch logs. Expand to view the output and logs.

aws-yt-18

Let's head back to the IDE/Jupyter notebook and write the Python code to call the API.

import os
import json
from googleapiclient.discovery import build


def lambda_handler(event, context):
    # get the API key from Lambda environment variables
    youtube_api_key = os.environ['YOUTUBE_API_KEY']

    # define api variables
    api_name = 'youtube'
    api_version = 'v3'

    # initialise the API client
    youtube = build(api_name, api_version, developerKey=youtube_api_key)

    # execute search request for 50 videos with the highest view count
    search_response = youtube.search().list(
        part='id',
        q='japan + travel',
        type='video',
        maxResults=50,
        publishedAfter='2024-12-11T00:00:00Z',
        publishedBefore='2024-12-12T00:00:00Z',
        order='viewCount',
        topicId='/m/07bxq',
        videoDuration='medium'
    ).execute()

    # get video ids
    video_ids = [item['id']['videoId'] for item in search_response.get('items', [])]

    # get data for each video
    video_data_response = youtube.videos().list(
        part='snippet,content_details,statistics',
        id=','.join(video_ids)
    ).execute()

    # extract video data
    videos = []
    for item in video_data_response.get('items', []):
        # get() function allows a fallback value in case the element is not found in the video_data_response
        snippet = item.get('snippet', {})
        content_details = item.get('contentDetails', {})
        statistics = item.get('statistics', {})

        video = {
            "id": item.get('id', ''),
            "title": snippet.get('title', ''),
            "description": snippet.get('description', ''),
            "publishedAt": snippet.get('publishedAt'),
            "channelId": snippet.get('channelId', ''),
            "channelTitle": snippet.get('channelTitle', ''),
            "videoCategoryId": snippet.get('categoryId', 0),
            "tags": snippet.get('tags', []),
            "videoDuration": content_details.get('duration', ''),
            "videoDefinition": content_details.get('definition', ''),
            "initialViewCount": statistics.get('viewCount', '0'),
            "initialLikeCount": statistics.get('likeCount', '0'),
            "initialFavoriteCount": statistics.get('favoriteCount', '0'),
            "initialCommentCount": statistics.get('commentCount', '0'),
            "collectionDate": search_date
        }
        videos.append(video)

    # return list
    return {
        'statusCode': 200,
        'body': json.dumps(videos)
    }

Replace the code in Lambda function with the above.

Head to 'Environment Variables' in the 'Configuration' section and add a new variable to store the API key.

aws-yt-19

Since AWS Lambda doesn't include googleapicleint by default, we need to run the below command in local python environment to package it into a folder, zip and upload it in the Lambda environment.

We'll use Lambda Layers to package this library separately from our code. Using layers allow the package to be used across multiple Lambda functions as well. Note that all packages in a Lambda layer need to reside inside a python folder.

mkdir -p layer/python
pip install google-api-python-client -t layer/python
cd layer
zip -r googleapiclient_layer.zip python

Go towards the bottom of the page and add a new Layer. Select 'Create a new layer'.

aws-yt-20

Add a layer name such as googleapiclient, choose the Python runtime, and upload the zip file created in the above steps.

Back in the Lambda function page, go towards the end and select 'Add a layer' again. Select the Layer we created to attach it to the function.

aws-yt-21

Now let's create a test event so that the function can be executed for testing. Head to 'Test' tab and create a test event with an empty json input.

22

Back in the 'Code' tab, deploy the code, and run the test.

23

The output will be displayed in the window.

aws-yt-24

5. Store the raw data in S3

Head to S3 in the AWS Console and create a new S3 bucket to store data.

25

In the Python code in the Lambda function, replace the json output with the below. We're using partitions to store the data in S3 to make data reads more efficient. In this instance, we're using the collection_date to partition the data into separate folders. S3 will create folders as collection_date=date so that each file can be easily queried.

    # add new imports
    import os
    from io import BytesIO
    import pyarrow as pa
    import pyarrow.parquet as pq
    import boto3
    from datetime import datetime
    from zoneinfo import ZoneInfo
    from googleapiclient.discovery import build

    # ------------------------------------------

    # get column names 
    columns = videos[0].keys()

    # convert list of dicts to list of lists
    videos_list = {key:[item[key] for item in videos] for key in columns}

    # convert list to pyarrow table
    videos_tb = pa.table(videos_list)

    # init s3 client
    s3 = boto3.client('s3')

    # use collection_date to partition
    bucket_name = '<S3 Bucket name>'
    parquet_file_key = f'raw/videos/collection_date={search_date}/{search_date}.parquet'

    # write to parquet file in memory
    parquet_buffer = BytesIO()
    pq.write_table(videos_tb, parquet_buffer)

    # upload to s3
    s3.put_object(Bucket=bucket_name, Key=parquet_file_key, Body=parquet_buffer.getvalue())
    print(f'Parquet file uploaded to s3://{bucket_name}/{parquet_file_key}')

Before running the function, we need to add the pyarrow package into Lambda as well. We could try the previous method we used for googleapiclient, but for pyarrow this will result in errors because some files are dependent on the OS (Mac, Windows or Linux) it is built on. For the pyarrow package to be usable on Lambda, it should be built on a Linux machine. Alternatively, you could make use of a public pre-built layer resource available here. Make sure to pick the correct Python version and AWS region and obtain the arn value for the resource.

In the Lambda function, select 'Add a Layer' and use the arn to attach the layer. I've also updated the Python version of the Lambda function to match the Python version I picked for pyarrow.

26

Now, if you deploy changes and test the function, it will throw an error saying that the function has timed out after 3 seconds. Since writing parquet files need more time, let's update the timeout value to 1 minute.

26-2

When you execute the function now, it'll throw an error mentioning that permissions are missing.

An error occurred (AccessDenied) when calling the PutObject operation

To resolve this error, we need to enable the IAM role associated with Lambda function to access the S3 bucket.

Go to the Configuration tab and select Permissions. Click on the role name to open it in IAM console.

27

Select to edit the existing policy.

28

Select outside of statement to get the option Add new statement on the left side.

29

Then select s3 as the service to add, and s3:GetObject and s3:PutObject as the allowed actions. In the resource selection, select the S3 bucket created earlier. This should add a snippet similar to below.

        {
			"Effect": "Allow",
			"Action": [
				"s3:PutObject",
				"s3:GetObject"
			],
			"Resource": [
				"arn:aws:s3:::<bucket_name>/*"
			]
		}

Rerun the Lambda function. If you followed all the steps, the parquet file should be created in the S3 bucket.

30

Congratulations on reaching this point! You've done a great job!!

6. Enable the job to run daily using Amazon EventBridge

Before setting up the daily schedule for the Lambda function, we need to update the publishedBefore and publishedAfter dates to change dynamically in the first API call.

Assume we want to run the daily job at 12.30am AEDT (UTC +11), which is 1.30pm UTC. We can get the videos published from 1.30pm on previous day to 1.30pm the next day.

    from datetime import datetime, timedelta

    # set datetime for search
    search_date = (datetime.now(ZoneInfo('Australia/Melbourne'))-timedelta(1)).strftime('%Y-%m-%d')
    previous_date = (datetime.now(ZoneInfo('Australia/Melbourne'))-timedelta(2)).strftime('%Y-%m-%d')
    utc_time = 'T13:30:00Z'
    search_date_utc = search_date + utc_time
    previous_date_utc = previous_date + utc_time

Once this is updated in the API request, we can set up the daily trigger to execute the function using Amazon EventBridge.

Head to EventBridge console and create a rule. Click on 'Continue to create rule'.

31

Set the cron expression to trigger the Lambda function. I've set it to 1.30pm UTC.

32

Set the Lambda function as the target.

33

Head back to the Lambda function to verify that the trigger has been added.

34

  1. Read data from S3 to get data from API

Let's create a second Lambda function as getYoutubeStats. In the Lambda function, let's get all the data files stored in S3:

import boto3

def lambda_handler(event, context):

    # init s3 client
    s3 = boto3.client('s3')

    bucket_name = '<bucket_name>' 
    directory_prefix = 'raw/videos/'

    # get all files in raw/videos/
    all_files_response = s3.list_objects_v2(Bucket=bucket_name, Prefix=directory_prefix)
    print(all_files_response)

To run the process, update the IAM role related with this new Lambda function with the permissions to access S3.

        {
			"Effect": "Allow",
			"Action": [
				"s3:GetObject",
				"s3:PutObject"
			],
			"Resource": [
				"arn:aws:s3:::<bucket_name>/*"
			]
		},
		{
			"Effect": "Allow",
			"Action": [
				"s3:ListBucket"
			],
			"Resource": [
				"arn:aws:s3:::<bucket_name>"
			]
		}

When the function is run, the response will have a Contents field, which contains the file names stored in S3. We can use this to get the data in the file.

The completed function is as below:

import os
from io import BytesIO
import pyarrow as pa
import pyarrow.parquet as pq
import boto3
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
from googleapiclient.discovery import build


def lambda_handler(event, context):
    # get the API key from Lambda environment variables
    youtube_api_key = os.environ['YOUTUBE_API_KEY']

    # define api variables
    api_name = 'youtube'
    api_version = 'v3'

    # initialise the API client
    youtube = build(api_name, api_version, developerKey=youtube_api_key)

    # init s3 client
    s3 = boto3.client('s3')

    # set today's datetime for search
    search_date = (datetime.now(ZoneInfo('Australia/Melbourne')) - timedelta(1)).strftime('%Y-%m-%d')
    # get last 6 days for which data should be retrieved
    today = datetime.now(ZoneInfo('Australia/Melbourne'))
    search_date_list = [(today-timedelta(i)).strftime('%Y-%m-%d') for i in range(2,8)]

    # get the oldest date for which video data should be retrieved
    # convert string to datetime
    oldest_collection_date = datetime.strptime((today - timedelta(7)).strftime('%Y-%m-%d'), '%Y-%m-%d')

    # get data for last 7 days
    bucket_name = '<bucket_name>'
    videos = []

    for day in search_date_list:
        directory_prefix = 'raw/videos'
        partition_prefix = f'{directory_prefix}/collection_date={day}'

        # check if partition exists in S3
        file_response = s3.list_objects_v2(Bucket=bucket_name, Prefix=partition_prefix)

        # break loop if latest partition is not available - means previous partitions aren't available as well
        if 'Contents' not in file_response:
            break
        else:
            # file path
            file_key = file_response.get('Contents')[0].get('Key')
            # file date in datetime format
            file_date = datetime.strptime(day, '%Y-%m-%d') 
            search_date_dt = datetime.strptime(search_date, '%Y-%m-%d')
            # get object
            file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
            file_table = pq.read_table(BytesIO(file_obj['Body'].read()))
            video_ids = file_table.column('id').to_pylist()

            # get data for each video
            video_data_response = youtube.videos().list(
                part='statistics',
                id=','.join(video_ids)
            ).execute()

            for item in video_data_response.get('items', []):
                statistics = item.get('statistics', {})

                video = {
                    "id": item.get('id', ''),
                    "initialCollectionDate": file_date,
                    "collectionDate": search_date_dt,
                    "collectionCount": (file_date - oldest_collection_date).days + 2,
                    "viewCount": statistics.get('viewCount', '0'),
                    "likeCount": statistics.get('likeCount', '0'),
                    "favoriteCount": statistics.get('favoriteCount', '0'),
                    "commentCount": statistics.get('commentCount', '0')
                }
                videos.append(video)

    # save data in s3 if data is available
    if len(videos) > 0:
        # get column names
        columns = videos[0].keys()

        # convert list of dicts to list of lists
        videos_list = {key: [item[key] for item in videos] for key in columns}

        # convert list to pyarrow table
        videos_tb = pa.table(videos_list)

        # write to parquet file in memory
        parquet_buffer = BytesIO()
        pq.write_table(videos_tb, parquet_buffer)

        # upload to s3
        parquet_file_key = f'raw/video_stats/collection_date={search_date}/{search_date}.parquet'
        s3.put_object(Bucket=bucket_name, Key=parquet_file_key, Body=parquet_buffer.getvalue())
        print(f'Parquet file uploaded to s3://{bucket_name}/{parquet_file_key}')
    else:
        print(f'No data available before {search_date_list[0]}')

Before executing the function, make sure you have added environment variables, and attached the required layers similar to what we did for 'getYoutubeData' function.

Finally, add the new function to be triggered with the same EventBridge rule.

35

Now we have setup the data extract Lambda functions to get data using the YouTube API!

Congratulations on reaching this far!

Read the next steps of the project - using AWS Glue to transform data and store in Redshift - here in Part 2.