Data Lake and Data Warehouse, the Foundations of Advanced Analytics

by Loh Wei Ken


Data at Photobook

    The terms "data lake" and "data warehouse" are well known in the IT industry, but are they universally understood? These terms define two forms of data storage that serve as a centralized repository for your important data, and are typically used as the sources from which data is transformed into actionable intelligence. As with any other growing organization, Photobook’s data volume has increased tremendously over the years. Making data-driven decisions is paramount and we obviously need a system to efficiently gather and process our data for our partners in various departments to have better insights into the business.

    The Process

      Before any analysis could be done, we needed to think about how we would gather and process the data. Basically, we needed to:
      1. acquire raw data from various data sources, and pump them into our data lake;
      2. transform the data from the data lake into the data warehouse; and
      3. select and configure a BI (business intelligence) application to consume data from both the data lake and the data warehouse.
      The data lake must also support the ingestion of vast amounts of data from multiple sources. For this reason we decided to use PostgreSQL, a database server known for its performance, extensibility, strict compliance with the SQL standard and its cost (free).

      Data Acquisition (into Data Lake)

      As a first phase, we brought in our operational and analytics data, which are stored in MariaDB and Google Analytics respectively. Different data sources require different acquisition strategies, particularly for the syncing process, which is why we chose to use Python, a common language that provides a robust set of standard libraries, and to custom-build our SQL scripts.

      MariaDB
      For the initial load phase we used pgLoader, a well-known open source database migration tool which simplifies the process of migrating to PostgreSQL. Here's how we implemented pgLoader:
      • To ease maintenance we used a LOAD command file, so we could define the parameters and casting rules once and reuse them when necessary.
      • Casting rules were used to convert zero dates to null values. Zero dates are not accepted in PostgreSQL but are valid in MariaDB.
      • We adjusted the parallelism level by tuning the workers and concurrency values. We found that 8 workers and single concurrency worked best for us.
      To provide continuous replication, we created a custom SQL script to delta-sync the data using PostgreSQL's upsert (INSERT ... ON CONFLICT) syntax. Schema changes are replicated as well using the information_schema views, which contain the table and column definitions.
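      To illustrate, here is a minimal sketch of the kind of upsert statement the sync script runs; the schema, table and column names here are placeholders rather than our actual ones.

      INSERT INTO datalake.orders (order_id, status, updated_at)
      SELECT order_id, status, updated_at
      FROM staging.orders_delta              -- rows changed since the last sync
      ON CONFLICT (order_id)                 -- primary key of the target table
      DO UPDATE SET
        status     = EXCLUDED.status,
        updated_at = EXCLUDED.updated_at;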

      Here’s a snippet of our pgLoader LOAD file:

      LOAD DATABASE
        FROM mysql://user:password@localhost/sourceDB
        INTO pgsql://user:password@localhost/targetDB
      
        WITH workers = 8, concurrency = 1, rows per range = 50000,
          multiple readers per thread
      
        SET MySQL PARAMETERS
          net_read_timeout  = '1200',
          net_write_timeout = '1200'
      
        CAST
          type datetime
            with extra on update current timestamp
              to "timestamp with time zone"
              drop extra drop not null
              using zero-dates-to-null,
          type date
            to date
            drop not null
            using zero-dates-to-null;
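      The LOAD file is then passed to the pgloader command-line tool as its command file (for example, pgloader photobook.load, where the file name is just illustrative).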
      

      Google Analytics
      Building the extraction from Google Analytics required a different set of processes and an understanding of the Google APIs. We used Python to call Google's Core Reporting API, and the response is inserted directly into our data lake database. The following are the libraries we used to make this happen.
      • psycopg2 - PostgreSQL database adapter for Python
      • google-api-python-client - client library that provides access to the Google APIs
      • oauth2client - OAuth 2.0 client library for authentication and authorization with the Google APIs
      One key point to note is that the maximum number of records that can be retrieved per API call is 10,000, which means we need to iterate the call to get all the records; this is almost always the case if you are pulling historical data. To iterate, we used the start_index attribute. This is a snippet of our code to retrieve the Web Traffic data:
      #Required libraries (see the list above)
      from datetime import datetime

      import psycopg2
      from googleapiclient.discovery import build
      from httplib2 import Http
      from oauth2client.service_account import ServiceAccountCredentials

      #Authentication for the Core Reporting API
      credentials = ServiceAccountCredentials.from_json_keyfile_name(
          'credentials.json', ['https://www.googleapis.com/auth/analytics.readonly'])
      http_auth = credentials.authorize(Http())
      service = build('analytics', 'v3', http=http_auth)

      #Query function: one page of up to 10,000 rows, starting at row i + 1.
      #The dimensions are kept in the same order as the columns of the INSERT below.
      def get_api_traffic_query(service, i):
        return service.data().ga().get(
          ids='ga_ID',
          start_date=datetime.strftime(datetime.now(), '%Y-%m-%d'),
          end_date=datetime.strftime(datetime.now(), '%Y-%m-%d'),
          metrics='ga:users,ga:newUsers,ga:sessions,ga:bounces,ga:pageviews,ga:pageViewsPerSession,ga:avgPageLoadTime',
          dimensions='ga:countryIsoCode,ga:yearMonth,ga:day',
          start_index=str(i + 1),
          max_results='10000')

      #Connect to the data lake (connection details are placeholders)
      conn = psycopg2.connect('dbname=datalake user=user password=password')
      cursor = conn.cursor()

      #Run query function and insert into DB, paging 10,000 rows at a time
      i = 0
      while True:
        traffic_results = get_api_traffic_query(service, i).execute()

        if traffic_results.get('rows', []):
          for row in traffic_results.get('rows'):
            cursor.execute("""INSERT INTO DB.Table (country, yearmonth, day, users, new_users, sessions, bounces, pageviews, pageviews_per_session, avg_page_load_time)
                              VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
                           [row[0], row[1], row[2], row[3], row[4], row[5], row[6], row[7], row[8], row[9]])
        else:
          print('No Rows Found')
          break

        i += 10000

      conn.commit()
      
      The Google Analytics Query Explorer is a handy tool which we also used to find the metrics, dimensions and filters that we needed.

      Transformation of Data Lake to Data Warehouse

      There are many types of schemas for a data warehouse, such as star, snowflake and fact constellation. We chose the star schema to minimize the number of joins and to keep the design as simple as possible. Fact and dimension tables were created based on reporting needs, for example a fact table for orders and dimensions for date, time and country. As with any other database design, consistent naming conventions ease understanding and recognition. We utilized the following table naming conventions.
      • dim_ - Dimension Table
      • fact_ - Fact Table
      We also used a convention for foreign keys, using the format <fieldName>_key; a simplified sketch of the resulting tables is shown below.
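      To make this concrete, here is a minimal, hypothetical sketch of a dimension and a fact table under these conventions; the actual tables carry many more columns.

      CREATE TABLE dim_date (
        date_key   integer PRIMARY KEY,   -- surrogate key, e.g. 20190131
        full_date  date NOT NULL,
        year       smallint NOT NULL,
        month      smallint NOT NULL,
        day        smallint NOT NULL
      );

      CREATE TABLE fact_orders (
        order_id     bigint PRIMARY KEY,
        date_key     integer REFERENCES dim_date (date_key),
        country_key  integer,             -- references dim_country (not shown)
        quantity     integer,
        sales        numeric(12,2)
      );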

      Various data transformations were also applied to our data, for example:
      • Sales data aggregation
      • Product data transposition
      • Deriving calculated fields for direct reporting consumption (e.g. Sales including GST = Sales * (1 + Tax Rate)); a sketch of this follows the list
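      As an illustration, here is a hypothetical sketch of the kind of aggregation and derived-field query used when loading a fact table; the table names, columns and tax rate are placeholders rather than our actual schema.

      INSERT INTO fact_sales_daily (date_key, country_key, order_count, sales, sales_incl_gst)
      SELECT
        to_char(o.order_date, 'YYYYMMDD')::int AS date_key,
        o.country_key,
        count(*)                    AS order_count,
        sum(o.sales)                AS sales,
        sum(o.sales) * (1 + 0.06)   AS sales_incl_gst   -- 0.06 is a placeholder tax rate
      FROM orders o
      GROUP BY 1, 2;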
      The transformed data are then exported to the data warehouse using PostgreSQL's COPY command.
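      As a rough sketch (the file path and table names are placeholders), the export and load look something like this, with the first statement run against the data lake and the second against the data warehouse:

      -- On the data lake: dump the transformed data to a flat file
      COPY (SELECT * FROM staging.fact_orders) TO '/tmp/fact_orders.csv' WITH (FORMAT csv);

      -- On the data warehouse: load the file into the target table
      COPY fact_orders FROM '/tmp/fact_orders.csv' WITH (FORMAT csv);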

      Wrapping Up

      We now have a running system that extracts and processes operational data, writes it to our data lake, denormalizes and transforms it, and finally persists it to our data warehouse, where our BI application connects for reporting needs. These are some key takeaways from our journey in building this infrastructure:
      • Spend time designing the system. We knew what we wanted, and we designed a system that is cost-efficient and simple.
      • Document your metadata and visualize your data model with ER diagrams so you have a chance to identify flaws and design mistakes and make the necessary corrections.
      • Knowing the right tools to use will save you time. Python's extensive libraries eased development, and pgLoader does the heavy lifting of data migration.
      • Locate your data sources and plan the data acquisition and transformation; for example, to extract Google Analytics data from the API you need to authenticate using the OAuth 2.0 protocol.
      • Identify the data analysis needs so you can plan the design of the data warehouse schema (whether it's star or snowflake) and know what data you need to bring over.
      We hope this article helps you on your way to building your data analytics infrastructure.