Serverless Data Engineering in Azure - Part 2


05 Oct 2018 - Ed


In the first part of this blog series we introduced Azure Data Factory and walked through the process of copying data from an external cloud source. This post will cover how we can extend this pipeline to track a slowly changing dimension - again relying only on serverless technologies.

Serverless Transform in Azure

The Big Data space is full of competing technologies and it can be hard to understand the differences between them and where each is most appropriate.

Both AWS and Azure have their own managed wrappers around Spark and other Hadoop-related products (AWS Glue and Azure HDInsight). These reduce the operational complexity, but we’re still bound to often cumbersome Spark code, and performance is questionable when data sets are smaller than tens of gigabytes.

Recently, products that integrate familiar SQL-style operations with the scale and flexibility normally associated with Hadoop have started to emerge. Microsoft’s Azure Data Lake Analytics and its U-SQL language fit into this category, and they’re supported by Data Factory to boot!

U-SQL is an extremely powerful language that can be extended with custom .NET code. This allows us to build things not possible in traditional SQL databases, and we still get all the distributed elasticity for free! This blog post only touches on the power available here.

Slowly Changing Dimensions

A slowly changing dimension is a very common abstraction in data warehousing: it allows us to track changes to an entity’s properties over time. Each entity is identified by some id column(s), and the effective start and end timestamps are recorded for each set of associated property values. This results in a schema that looks something like:

sid GUID,
effective_timestamp DATETIME,
expiration_timestamp DATETIME,
current_row BOOLEAN,
<id columns>
<property columns>
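
For example, a user (id 123) whose location changes once might end up being tracked by two rows like these (all values are purely illustrative):

sid        effective_timestamp   expiration_timestamp   current_row   id    location
<guid-1>   2018-09-01T00:00:00   2018-09-20T00:00:00    false         123   London
<guid-2>   2018-09-20T00:00:00   NULL                   true          123   Leeds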

The following example U-SQL script is based on the data we collected in the first part of this series.

U-SQL Walkthrough

U-SQL allows us to extract data from a variety of different formats, including the Parquet file we created in the first blog post. Again, the schema is applied to the data source on read. This is handy because most schema changes won’t affect our process here.

@INCOMING =
   EXTRACT
        id long?,
        login string,
        node_id string,
        avatar_url string,
        gravatar_id string,
        url string,
        html_url string,
        followers_url string,
        following_url string,
        gists_url string,
        starred_url string,
        subscriptions_url string,
        organizations_url string,
        repos_url string,
        events_url string,
        received_events_url string,
        type string,
        site_admin Boolean?,
        name string,
        company string,
        blog string,
        location string,
        email string,
        hireable string,
        bio string,
        public_repos long?,
        public_gists long?,
        followers long?,
        following long?,
        created_at string,
        updated_at string,
        extract_year long?,
        extract_month long?,
        extract_day long?,
        extract_hour long?,
        extract_minute long?,
        extract_second long?,
        extract_timestamp long?
   FROM @InputFile
   USING Extractors.Parquet();

Here we extract the data at the path contained in @InputFile, which we pass in from Data Factory using the @CONCAT('/staging/', pipeline().RunId, '.parquet') incantation introduced in the first post. The resulting rowset is stored in the @INCOMING variable, which acts like a table in ANSI SQL.
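
On the U-SQL side, @InputFile needs to be declared so that Data Factory can supply it. A minimal sketch of one way to do this (the default path below is just a placeholder, not taken from the pipeline):

// Overridable script parameter - the Data Factory activity supplies the real path at run time
DECLARE EXTERNAL @InputFile string = "/staging/manual-test.parquet";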

// Helper that hashes a string with the supplied algorithm and concatenates the hash bytes into a string
DECLARE @get_hash Func<string,System.Security.Cryptography.HashAlgorithm,string> =
    (raw_value, hasher) => String.Concat(hasher.ComputeHash(Encoding.UTF8.GetBytes(raw_value)));
// MD5 algorithm instance used by the helper below
DECLARE @md5 System.Security.Cryptography.MD5 = System.Security.Cryptography.MD5.Create();
// Convenience wrapper - the MD5 hash of a string
DECLARE @get_md5 Func<string,string> =
    (raw_value) => @get_hash(raw_value, @md5);

Next we use U-SQL’s custom .NET code feature to add a couple of helpers for creating MD5 hashes.
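
The @SCD step further down also calls a @uuid() helper to generate a surrogate key for each row; its declaration isn’t shown in these snippets. A minimal version, assuming the same inline C# style as the hash helpers, could look like this:

// Assumed helper - returns a fresh GUID to use as the surrogate key (sid)
DECLARE @uuid Func<Guid> = () => Guid.NewGuid();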

@HASHED =
    SELECT DISTINCT
        @get_md5(id.ToString()) AS key_hash,
        @get_md5(
        login + "|" +
            node_id + "|" +
            avatar_url + "|" +
            gravatar_id + "|" +
            url + "|" +
            html_url + "|" +
            followers_url + "|" +
            following_url + "|" +
            gists_url + "|" +
            starred_url + "|" +
            subscriptions_url + "|" +
            organizations_url + "|" +
            repos_url + "|" +
            events_url + "|" +
            received_events_url + "|" +
            type + "|" +
            site_admin + "|" +
            name + "|" +
            company + "|" +
            blog + "|" +
            location + "|" +
            email + "|" +
            hireable + "|" +
            bio + "|" +
            public_repos.ToString() + "|" +
            public_gists.ToString() + "|" +
            followers.ToString() + "|" +
            following.ToString() + "|" +
            created_at + "|" +
            updated_at
        ) AS column_hash,
        extract_timestamp AS effective_timestamp,
        id,
        login,
        node_id,
        avatar_url,
        gravatar_id,
        url,
        html_url,
        followers_url,
        following_url,
        gists_url,
        starred_url,
        subscriptions_url,
        organizations_url,
        repos_url,
        events_url,
        received_events_url,
        type,
        site_admin,
        name,
        company,
        blog,
        location,
        email,
        hireable,
        bio,
        public_repos,
        public_gists,
        followers,
        following,
        created_at,
        updated_at
    FROM @INCOMING;

We create one hash of the id column(s) and another of the remaining property columns. This allows us to compare two short hash strings rather than the entire set of columns each time, which will be handy in the next step.

@NO_DUPS =
    SELECT DISTINCT
        key_hash,
        column_hash,
        MIN(effective_timestamp) OVER(PARTITION BY key_hash, column_hash  ORDER BY effective_timestamp ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) AS effective_timestamp,
        id,
        login,
        node_id,
        avatar_url,
        gravatar_id,
        url,
        html_url,
        followers_url,
        following_url,
        gists_url,
        starred_url,
        subscriptions_url,
        organizations_url,
        repos_url,
        events_url,
        received_events_url,
        type,
        site_admin,
        name,
        company,
        blog,
        location,
        email,
        hireable,
        bio,
        public_repos,
        public_gists,
        followers,
        following,
        created_at,
        updated_at
    FROM @HASHED;

Since it is possible that we collected the same record several times without any changes between runs, we can end up with several rows for a single time period that should be represented by one row covering the whole period. To discard these repeated records we use a window function to find the first timestamp at which a particular record appears, using the hashes we calculated above and taking care not to include rows from the future. For example, if an unchanged user record was extracted on three consecutive runs, all three rows receive the earliest effective timestamp and then collapse into one under the DISTINCT.

@SCD =
    SELECT
        @uuid() AS sid,
        key_hash,
        column_hash,
        effective_timestamp,
        LEAD(effective_timestamp, 1) OVER(PARTITION BY key_hash  ORDER BY effective_timestamp) AS expiration_timestamp,
        (Equals(LEAD(effective_timestamp, 1) OVER(PARTITION BY key_hash ORDER BY effective_timestamp), null)) ? true : false AS current_row,
        id,
        login,
        node_id,
        avatar_url,
        gravatar_id,
        url,
        html_url,
        followers_url,
        following_url,
        gists_url,
        starred_url,
        subscriptions_url,
        organizations_url,
        repos_url,
        events_url,
        received_events_url,
        type,
        site_admin,
        name,
        company,
        blog,
        location,
        email,
        hireable,
        bio,
        public_repos,
        public_gists,
        followers,
        following,
        created_at,
        updated_at
    FROM @NO_DUPS;

We then use a similar window function to find the next effective timestamp for a particular id (ignoring the property values); this represents the time at which the row expires. The same value is also used to set the boolean current_row column, as the window function returns NULL when there is no next record.

OUTPUT @SCD
TO @OutputFile
USING Outputters.Csv(outputHeader:true);

Finally we write out the result set to a CSV file for easy consumption. The @OutputFile variable is again supplied by dynamic content in Azure Data Factory: @CONCAT('/datawarehouse/', pipeline().RunId, '.csv').
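
As with @InputFile, the script needs a matching parameter declaration for @OutputFile; a minimal sketch (again with a placeholder default) might be:

// Overridable script parameter - the Data Factory activity supplies the real path at run time
DECLARE EXTERNAL @OutputFile string = "/datawarehouse/manual-test.csv";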

Conclusions

U-SQL is a pleasure to work with: it’s extremely performant and enables techniques not possible with existing SQL databases. The power of its extensibility within a distributed framework could be a game changer.

We’ve shown that it’s easy to implement traditional data warehousing techniques using serverless resources in Azure. This really opens the door for new low-volume use cases that do not warrant the cost of a traditional data warehouse.