BigQuery and Apache Airflow: The Fundamental Tools for Data Warehousing
In any business environment, data plays a major role in driving important decisions. Many companies have realized that their need for data keeps growing. Data in any form is valuable, not only for measuring the success or failure of past actions but also for predicting the future and the next steps that need to be taken.
Choosing the right architecture to maintain that data can be tough. The challenges differ from company to company, from budget constraints to the sheer amount of data involved. One popular solution is the Data Warehouse.
Data Warehouse
Basically, a Data Warehouse works as a central repository where information arrives from one or more data sources. As we already know, data can come in different forms: Kafka messages, a Relational Database Management System (RDBMS), or even spreadsheets.
By integrating data from different sources into one place, we preserve a Single Source of Truth (SSOT), so everyone in the organization refers to the same data when making business decisions. We also need to improve data quality by cleaning up the raw data before inserting it into the data warehouse.
The general procedure to build a data warehouse is ETL (Extract, Transform, and Load).
ETL is an automated process that takes raw data, extracts the information required for analysis, transforms it into a format that serves business needs, and loads it into a data warehouse.
Two tools commonly used to build this process are Google BigQuery and Apache Airflow.
1. Google BigQuery
BigQuery is a data warehouse solution provided by Google for transforming and loading data. It is commonly known as a highly scalable, fast, and cost-effective cloud data warehouse, capable of handling large datasets and querying millions of rows efficiently.
The raw data coming from the various sources needs to be extracted first and stored in Google Cloud Storage (GCS). From there, it can be imported into BigQuery, specifically into a staging dataset. In this dataset, data in various formats (e.g. CSV) is converted into tables, so it can be queried easily in BigQuery.
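As a minimal sketch of this load step (the bucket path, project, and table names below are made up, and it assumes the google-cloud-bigquery Python client), a CSV file in GCS can be imported into a staging table like this:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical staging table and GCS path; adjust to your own project.
load_job = client.load_table_from_uri(
    'gs://my-bucket/exports/productdb.csv',
    'my-project.staging.productdb',
    job_config=bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,     # skip the CSV header row
        autodetect=True,         # let BigQuery infer the schema
        write_disposition='WRITE_TRUNCATE',
    ),
)
load_job.result()  # wait for the load to finish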
After data extraction, the next step in building the data warehouse is defining the data model based on business objectives and requirements. Several data models are commonly used, including the Star Schema, the Snowflake Schema, and Data Vault. Designing the data model is a crucial step, since it determines the structure of the dataset and which tables we want to create in our data warehouse.
For example, if we implement a Star Schema in our data warehouse, BigQuery will contain several dimension tables and fact tables.
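To make that concrete, here is a small sketch of what two star-schema tables might look like when created with the BigQuery Python client (the project, dataset, column names, and the dim_customers reference are illustrative assumptions, not part of the original example):

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical dimension table describing products.
dim_products = bigquery.Table(
    'my-project.warehouse.dim_products',
    schema=[
        bigquery.SchemaField('product_id', 'STRING'),
        bigquery.SchemaField('product_name', 'STRING'),
        bigquery.SchemaField('category', 'STRING'),
    ],
)

# Hypothetical fact table; each row references the dimension tables by key.
fact_sales = bigquery.Table(
    'my-project.warehouse.fact_sales',
    schema=[
        bigquery.SchemaField('product_id', 'STRING'),    # key into dim_products
        bigquery.SchemaField('customer_id', 'STRING'),   # key into dim_customers
        bigquery.SchemaField('quantity', 'INTEGER'),
        bigquery.SchemaField('total_amount', 'NUMERIC'),
        bigquery.SchemaField('sold_at', 'TIMESTAMP'),
    ],
)

for table in (dim_products, fact_sales):
    client.create_table(table, exists_ok=True)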
After designing the data model, we can start creating those tables by joining the tables in the staging dataset with queries. In this step, we can also cleanse the data by removing duplicates, eliminating unwanted characters, or converting timestamps to a consistent format.
Creating a table in BigQuery can be done simply by running a query and saving the result as a new table.
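As a rough sketch of this step (again assuming the google-cloud-bigquery client, with made-up project and dataset names), a cleansing query over the staging table can be written straight into a new warehouse table:

from google.cloud import bigquery

client = bigquery.Client()

# Hypothetical destination table for the cleansed result.
job_config = bigquery.QueryJobConfig(
    destination='my-project.warehouse.dim_products',
    write_disposition='WRITE_TRUNCATE',   # overwrite the table on every run
)

query = """
    SELECT DISTINCT                       -- drop duplicated rows
        product_id,
        TRIM(product_name) AS product_name,
        TIMESTAMP_MILLIS(created_time) AS created_timestamp
    FROM
        `my-project.staging.productdb`
"""

client.query(query, job_config=job_config).result()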
However, the data inside these tables is not updated automatically unless we set up a scheduler to refresh it. We therefore need a pipeline that lets us schedule and maintain the workflows that build the tables in the data warehouse. Airflow can help us with this.
2. Apache Airflow
Airflow is a platform to programmatically author, schedule and monitor workflows.
The rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
As mentioned before, Airflow not only helps us schedule tasks but also lets us monitor the workflow that transforms the tables. Why is monitoring the workflow important?
In a data warehouse, we handle a large amount of data, so there may be numerous tables to take care of. Tables often depend on other tables, and a failure in transforming one table has a domino effect on the tables downstream. That is why a visualized and well-organized pipeline is necessary.
To configure the tasks and define the dependencies between them, we write a Python script that defines a Directed Acyclic Graph (DAG). Airflow provides many kinds of operators, including the BigQueryOperator, which lets us supply a query in SQL syntax. The transformation task runs that query and loads the result into a BigQuery table.
Below is an example of a DAG script that transforms a table in BigQuery and stores the result in another table (in this case, the table is named dim_products).
import os
from datetime import timedelta

import airflow
from airflow.models import DAG
from airflow.operators.sensors import TimeDeltaSensor
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True
}

DAG_NAME = 'dim_products'

# The DAG runs every day at 17:00.
dag = DAG(
    dag_id=DAG_NAME,
    schedule_interval='00 17 * * *',
    start_date=airflow.utils.dates.days_ago(2),
    default_args=default_args
)

# Wait 30 minutes after the scheduled time before starting the transformation.
delay_sensor = TimeDeltaSensor(
    task_id='delay_sensor',
    delta=timedelta(minutes=30),
    priority_weight=1000,
    retries=50,
    dag=dag
)

# Run the query against the staging table and write the result
# into the dim_products table, replacing its previous contents.
transform_dim_products = BigQueryOperator(
    task_id='transform_dim_products',
    sql="""
        SELECT
            *,
            TIMESTAMP_MILLIS(p.created_time) AS created_timestamp
        FROM
            `{project}.{dataset}.productdb` p
        WHERE
            p.code NOT LIKE '%TES%'
    """.format(
        project=os.environ['GOOGLE_CLOUD_PROJECT_ID'],
        dataset=os.environ['DATASET']),
    destination_dataset_table='{project}.{dataset}.dim_products'.format(
        project=os.environ['GOOGLE_CLOUD_PROJECT_ID'],
        dataset=os.environ['DATASET']),
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id=os.environ['BIGQUERY_CONN_ID'],
    allow_large_results=True,
    use_legacy_sql=False,
    dag=dag
)
When creating the DAG, we can also arrange the order of task execution based on the dependencies between the tables we want to transform. For example, the dim_products transformation runs daily at 5 PM, but only after the delay_sensor task has finished, because we declare the dependency at the bottom of the script like this:
delay_sensor >> transform_dim_products
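If the warehouse grows to include more tables, the same pattern extends naturally. The snippet below is a hypothetical continuation of the script above: transform_dim_customers and transform_fact_sales are placeholder tasks that do not exist in the original DAG, used only to illustrate a fan-in dependency.

from airflow.operators.dummy_operator import DummyOperator

# Hypothetical placeholder tasks standing in for additional transformations.
transform_dim_customers = DummyOperator(task_id='transform_dim_customers', dag=dag)
transform_fact_sales = DummyOperator(task_id='transform_fact_sales', dag=dag)

# fact_sales starts only after both dimension tables have been rebuilt.
delay_sensor >> transform_dim_customers
[transform_dim_products, transform_dim_customers] >> transform_fact_sales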
For each DAG, we can define when it should run automatically. The DAG argument schedule_interval accepts either a cron expression or a preset such as hourly, daily, weekly, monthly, or yearly.
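As a rough sketch (the DAG ids and start dates below are made up for illustration), the same kind of DAG could be scheduled with a preset or with a cron expression:

import airflow
from airflow.models import DAG

# Hypothetical DAGs showing two ways to express schedule_interval.
hourly_dag = DAG(
    dag_id='hourly_example',
    schedule_interval='@hourly',          # Airflow preset: run at the start of every hour
    start_date=airflow.utils.dates.days_ago(1),
)

weekly_dag = DAG(
    dag_id='weekly_example',
    schedule_interval='0 6 * * MON',      # cron expression: every Monday at 06:00
    start_date=airflow.utils.dates.days_ago(7),
)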
Conclusion
As the data warehouse becomes a common solution for the needs of data-driven companies, choosing the right architecture and tools is essential. BigQuery and Apache Airflow can be the answer, easing the ETL procedure involved in building our Data Warehouse.