The Process
- acquire raw data from the various data sources and pump it into our data lake;
- transform the data from the data lake into the data warehouse; and
- select and configure a BI (business intelligence) application to consume data from both the data lake and the data warehouse.
Data Acquisition (into Data Lake)
As a first phase, we brought in our operational and analytics data, which are stored in MariaDB and Google Analytics respectively. Different data sources require different acquisition strategies, particularly for the syncing process, which is why we chose Python, a common language with a robust set of standard libraries, and custom-built our SQL scripts.
MariaDB
For the initial load phase we used pgLoader, a well-known open source database migration tool that simplifies the process of migrating to PostgreSQL. Here's how we implemented pgLoader.
- To ease maintenance, we used the LOAD file definition so we could define the parameters and casting rules once and reuse them when necessary.
- Casting rules were used to convert zero dates to null values. Zero dates are not accepted in PostgreSQL but are valid in MariaDB.
- We adjusted the parallelism level by tuning the workers and concurrency values. We found that 8 workers and a concurrency of 1 worked best for us.
Here’s a snippet of our pgLoader LOAD file:
LOAD DATABASE
     FROM mysql://user:password@localhost/sourceDB
     INTO pgsql://user:password@localhost/targetDB
WITH workers = 8, concurrency = 1,
     rows per range = 50000,
     multiple readers per thread
SET MySQL PARAMETERS
     net_read_timeout = '1200',
     net_write_timeout = '1200'
CAST type datetime with extra on update current timestamp
          to "timestamp with time zone" drop extra drop not null using zero-dates-to-null,
     type date to date drop not null using zero-dates-to-null;
Google Analytics
Building the extraction from Google Analytics requires a different set of processes and an understanding of the Google APIs. We used Python to call Google's Core Reporting API, and the response gets inserted directly into our Data Lake DB. The following are the libraries we used to make this happen.
- Psycopg2 - PostgreSQL database adapter for Python (the connection setup is sketched after this list)
- GoogleAPIclient - client library that provides access to Google APIs
- Oauth2client - OAuth 2.0 client library for authentication and authorization with Google APIs
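The extraction snippet in the next block assumes a live database cursor. A minimal sketch of that connection setup with Psycopg2 might look like the following; the host, database name and credentials here are placeholders, not our actual configuration.

import psycopg2

# Connect to the Data Lake database (connection details are placeholders)
conn = psycopg2.connect(
    host='localhost',
    dbname='datalake',
    user='etl_user',
    password='password')
cursor = conn.cursor()

# ... run the extraction and INSERTs shown below ...

# Commit and clean up once the inserts are done
conn.commit()
cursor.close()
conn.close()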
# Imports needed by the snippet (psycopg2 connection/cursor set up as sketched above)
from datetime import datetime
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client.service_account import ServiceAccountCredentials

# Authentication for the Core Reporting API
credentials = ServiceAccountCredentials.from_json_keyfile_name(
    'credentials.json',
    ['https://www.googleapis.com/auth/analytics.readonly'])
http_auth = credentials.authorize(Http())
service = build('analytics', 'v3', http=http_auth)

# Run query function and insert into DB
# (in the full script this block sits inside a paging loop over i; see the sketch below)
traffic_results = get_api_traffic_query(service, i).execute()
if traffic_results.get('rows', []):
    for row in traffic_results.get('rows'):
        cursor.execute(
            """INSERT INTO DB.Table
               (country, yearmonth, day, users, new_users, sessions, bounces,
                pageviews, pageviews_per_session, avg_page_load_time)
               VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
            [row[0], row[1], row[2], row[3], row[4],
             row[5], row[6], row[7], row[8], row[9]])
else:
    print('No Rows Found')
    break

# Query function
def get_api_traffic_query(service, i):
    return service.data().ga().get(
        ids='ga_ID',
        start_date=datetime.strftime(datetime.now(), '%Y-%m-%d'),
        end_date=datetime.strftime(datetime.now(), '%Y-%m-%d'),
        metrics='ga:users,ga:newUsers,ga:sessions,ga:bounces,ga:pageviews,'
                'ga:pageViewsPerSession,ga:avgPageLoadTime',
        dimensions='ga:browser,ga:countryIsoCode,ga:hostname,ga:yearMonth,'
                   'ga:day,ga:hour,ga:minute',
        start_index=str(i + 1),
        max_results='10000')

Google Analytics Query Explorer is a handy tool which we also used to find the metrics, dimensions and filters that we needed.
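The break and the i index in the snippet above only make sense inside a paging loop: the Core Reporting API returns at most 10,000 rows per request, so start_index has to be advanced until no more rows come back. As a rough sketch (not our exact production code), that loop can be structured like this:

# Page through the Core Reporting API results, 10,000 rows at a time
i = 0
while True:
    traffic_results = get_api_traffic_query(service, i).execute()
    rows = traffic_results.get('rows', [])
    if not rows:
        print('No Rows Found')
        break
    for row in rows:
        # ... same INSERT as in the snippet above ...
        pass
    i += len(rows)  # advance start_index for the next page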
Transformation of Data Lake to Data Warehouse
There are many types of data warehouse schemas, such as star, snowflake and fact constellation. We chose a star schema to minimize the number of joins and to keep the design as simple as possible. Fact and dimension tables were created based on reporting needs, for example a fact table for orders and dimensions for date, time and country. As with any other database design, consistent naming conventions ease understanding and recognition. We used the following table naming conventions (a DDL sketch follows the list).
- dim_ - Dimension table
- fact_ - Fact table
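As a rough illustration of these conventions (the table and column names below are made up for the example, not our actual schema), the fact and dimension DDL might look like this when run over the same Psycopg2 connection:

# Illustrative star schema DDL; names and columns are examples only
cursor.execute("""
    CREATE TABLE dim_date (
        date_key     integer PRIMARY KEY,
        full_date    date NOT NULL,
        year_month   char(6) NOT NULL,
        day_of_week  smallint NOT NULL
    )""")
cursor.execute("""
    CREATE TABLE dim_country (
        country_key  serial PRIMARY KEY,
        iso_code     char(2) NOT NULL,
        name         text NOT NULL
    )""")
cursor.execute("""
    CREATE TABLE fact_orders (
        order_key    bigserial PRIMARY KEY,
        date_key     integer REFERENCES dim_date (date_key),
        country_key  integer REFERENCES dim_country (country_key),
        order_count  integer NOT NULL,
        sales        numeric(12,2) NOT NULL
    )""")
conn.commit()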
Various data transformations were also applied to our data. For example:
- Sales data aggregation
- Product data transposition
- Deriving fields calculated for direct reporting consumption (e.g. Sales including GST = Sales * (1 + Tax Rate)); see the sketch after this list
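To make the derived-field case concrete, here is a sketch of how such a calculation can be applied while loading the fact table; it assumes a hypothetical lake_orders staging table, and the column names are again illustrative rather than our actual model.

# Populate the fact table from the data lake, deriving sales including GST
# (lake_orders, tax_rate and the other names are illustrative placeholders)
cursor.execute("""
    INSERT INTO fact_orders (date_key, country_key, order_count, sales)
    SELECT o.date_key,
           o.country_key,
           count(*)                         AS order_count,
           sum(o.sales * (1 + o.tax_rate))  AS sales  -- sales including GST
    FROM lake_orders o
    GROUP BY o.date_key, o.country_key""")
conn.commit()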
Wrapping Up
We now have a running system that extracts and processes operational data, writes it to our data lake, denormalizes and transforms it, and finally persists it to our data warehouse, where our BI application connects for reporting needs. These are some key takeaways from our journey in building this infrastructure:
- Spend time designing the system. We knew what we wanted, and we designed a system which is cost-efficient and simple.
- Document your metadata and visualize your data model with ER diagrams so you can identify flaws and design mistakes and make the necessary corrections.
- Knowing the right tools to use will save you time. Python’s extensive libraries eased development, and pgLoader does the heavy lifting of data migration.
- Locate your data sources and plan the data acquisition and transformation; for example, to extract Google Analytics data from the API you need to authenticate using the OAuth 2.0 protocol.
- Identify the data analysis needs so you can plan the design of the data warehouse schema (whether it's star or snowflake) and know what data you need to bring over.