Data Factory - Dynamic mappings in Data Flow

Azure Data Factory and Data Flows make transforming data from one format to another simple with their code-free approach. However, that doesn't mean we want to rework an entire data flow every time a mapping from one value to another changes.

For example, if my source data contained a field for favourite creature with the value An for Ants and Ca for Cats, I could do the transformation using an iif expression in a derived column task. But if in a few weeks' time I needed to add Ba for Bats, editing the whole Data Flow seems like a lot of work, not to mention that nested iif statements are going to become ugly and confusing.

One option would be to have the list of conversions as another source in the flow and do a join, but then this means having that data stored somewhere like blob storage.

Instead, my solution is to pass the data in as a parameter to the data flow. Data Factory doesn't have an array parameter, but we can put a comma-separated list in as a string, e.g. An=Ants, Ba=Bats, Ca=Cats.

Then in our derived column's expression we can do this:

iif(instr($ParameterString, toString(SourceValue) + "=")==0,"No Mapping",
  substring(
          substring($ParameterString, instr($ParameterString, toString(SourceValue) + "=")),
          length(toString(SourceValue))+2,
          iif(instr(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")+length(toString(SourceValue))+2),",") > 0, 
              instr(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")+length(toString(SourceValue))+2),","), 
              length(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")))
              )
          )
  )

There's quite a lot going on here, so let's break it down.

iif(instr($ParameterString, toString(SourceValue) + "=")==0,"No Mapping",

First we use an iif to check whether our Source Value (the value from our dataset), followed by an equals sign, exists within the Parameter String. If it doesn't, we set the value to "No Mapping".

substring(

If the value does exist then we need to grab just the part we want. So we need a substring that gets everything after the source value's equals sign, up to the next comma.

A reminder: the parameters for substring are substring(<string to subset>: string,<from 1-based index>: integral, [<number of characters>: integral])

substring($ParameterString, instr($ParameterString, toString(SourceValue) + "=")),

Our first parameter needs to be the string to subset. That's going to come from our ParameterString, but we're going to do another substring on it to ignore everything before the entry we want to match.

So if our ParameterString was set to:

An=Ants, Ba=Bats, Ca=Cats

and our Source Value was Ba we would now have:

Ba=Bats, Ca=Cats

length(toString(SourceValue))+2,

Next is the start index, which will be the length of our source value + 2 (one for the equals sign, and one because the index is 1-based). If we ended the substring now we would get:

Bats, Ca=Cats

iif(instr(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")+length(toString(SourceValue))+2),",") > 0, 
              instr(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")+length(toString(SourceValue))+2),","), 
              length(substring($ParameterString,instr($ParameterString, toString(SourceValue) + "=")))
              )
          )

The final parameter for the substring is a bit more complex, but it's similar to what we have just done.

Two scenarios need to be catered for:

  1. If there are more items left in the list then there will be a comma separating them. If this is the case then we need to get the position of the first comma in what we have left.
  2. If there isn't anything else in the list then there will be no comma. In this instance we need to get the length of what remains.

With that, our substring will now return Bats.
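To check the logic outside Data Factory, here is a minimal Python sketch of the same expression. The helpers instr and substring are hypothetical stand-ins that only imitate the 1-based behaviour described above, and map_value is a name made up for this example; none of this is Data Factory code.

```python
from typing import Optional


def instr(text: str, find: str) -> int:
    """Return the 1-based position of find in text, or 0 if it is absent."""
    return text.find(find) + 1


def substring(text: str, start: int, length: Optional[int] = None) -> str:
    """Return length characters from the 1-based start, or the rest if omitted."""
    begin = start - 1
    return text[begin:] if length is None else text[begin:begin + length]


def map_value(parameter_string: str, source_value: str) -> str:
    """Mirror the derived column expression step by step."""
    key_pos = instr(parameter_string, source_value + "=")
    if key_pos == 0:
        return "No Mapping"
    # Everything from the matching key onwards, e.g. "Ba=Bats, Ca=Cats".
    from_key = substring(parameter_string, key_pos)
    # Text starting one character into the mapped value, e.g. "ats, Ca=Cats".
    # Starting one character late makes the comma position equal the value length.
    after_start = substring(parameter_string, key_pos + len(source_value) + 2)
    comma_pos = instr(after_start, ",")
    count = comma_pos if comma_pos > 0 else len(from_key)
    return substring(from_key, len(source_value) + 2, count)


mappings = "An=Ants, Ba=Bats, Ca=Cats"
print(map_value(mappings, "Ba"))  # Bats
print(map_value(mappings, "Ca"))  # Cats
print(map_value(mappings, "Zz"))  # No Mapping
```

One thing the sketch makes visible: because the lookup is a plain text search for the source value plus "=", a source value of a would also find "a=" inside "Ba=" and come back as Bats. If your keys can overlap like that, it may be worth choosing keys or separators that cannot collide.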

Data Factory: How to upsert a record in SQL

When importing data to a database we want to do one of three things: insert the record if it doesn't already exist, update the record if it does, or potentially delete the record.

For the first two, if you're writing a stored procedure this can often lead to a bit of SQL that looks something like this:

IF EXISTS (SELECT 1 FROM DestinationTable WHERE Foo = @keyValue)
BEGIN
    UPDATE DestinationTable
    SET Baa = @otherValue
    WHERE Foo = @keyValue
END
ELSE
BEGIN
    INSERT INTO DestinationTable (Foo, Baa)
    VALUES (@keyValue, @otherValue)
END

Essentially an IF statement to see if the record exists based on some matching criteria.
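The same check-then-write pattern can be tried out with a small Python sketch. It uses an in-memory SQLite database purely for illustration (not SQL Server), and the function name upsert and the sample values are made up for this example; the table and column names are the ones from the SQL above.

```python
import sqlite3


def upsert(conn: sqlite3.Connection, key_value: str, other_value: str) -> None:
    """Update the row whose Foo matches key_value, or insert it if none exists."""
    exists = conn.execute(
        "SELECT 1 FROM DestinationTable WHERE Foo = ?", (key_value,)
    ).fetchone()
    if exists:
        conn.execute(
            "UPDATE DestinationTable SET Baa = ? WHERE Foo = ?",
            (other_value, key_value),
        )
    else:
        conn.execute(
            "INSERT INTO DestinationTable (Foo, Baa) VALUES (?, ?)",
            (key_value, other_value),
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE DestinationTable (Foo TEXT, Baa TEXT)")
upsert(conn, "k1", "first")   # no matching row yet, so this inserts
upsert(conn, "k1", "second")  # the row now exists, so this updates
rows = conn.execute("SELECT Foo, Baa FROM DestinationTable").fetchall()
print(rows)
```

Calling the function twice with the same key leaves a single row holding the latest value, which is the behaviour we are after.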

Data Factory - Mapping Data Flows

With a mapping data flow, data is inserted into a SQL DB using a Sink. The Sink lets you specify a dataset (which will specify the table to write to), along with mapping options to map the stream data to the destination fields. However, the decision on whether a row is an Insert/Update/Delete must already be specified!

Let's use an example of some data containing a person's First Name, Last Name and Age. Here's the table in my DB:

And here's a CSV I have to import:

FirstName,LastName,Age
John,Doe,10
Jane,Doe,25
James,Doe,50

As you can see, in my import data Jane's age has changed, there's a new entry for James, and Janet doesn't exist (but I do want to keep her in the DB). There are also no IDs in my source data, as the ID is an identity created by SQL.

If I look at the Data preview on my source in the Data Flow, I can see the 3 rows from my CSV, but notice there is also a little green plus symbol next to each one.

This means that they are currently being treated as Inserts, which is true for one of them but not for the others. If we were to connect this to the sink it would result in 3 new records being added to the DB, rather than two being updated.

To change the Insert to an Update you need an Alter Row step. This allows us to define rules to state what should be an insert and what should be an update.

However, knowing whether something should be an insert or an update requires knowledge of what is in the DB. That would mean a second source, followed by a join on First Name/Last Name, and then conditions based on which rows have an ID from the DB or not. This all seems a bit needlessly complicated, and it is.

Upsert

When using a SQL sink there is a fourth option for the kind of method that should be used, and that is an Upsert. An upsert will result in a SQL merge being used. SQL merges take a set of source data, compare it to the data already in the table based on some matching keys, and then decide to either update existing records or insert new ones based on the result.

On the sink's Settings tab, untick Allow insert and tick Allow upsert. When you tick Allow upsert, a Key columns property will appear, which is where you specify which columns should be used as a key. For me this is FirstName and LastName.

If you don't already have an Alter Row step it will warn you that this is missing.

Even though we are only doing what equates to a SQL merge, you still need to alter the rows to say they should be an upsert rather than an insert.

As we are upserting everything, our condition can just be set to return true (for example with the expression true()) rather than analysing any row data.

And there we have it, all rows will be treated as an upsert. If we look at the Data preview we can now see the upsert icon on each row.

And if we look at the table after running the pipeline, we can see that Jane's age has been updated, James has been added, and John and Janet stayed the same.
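To make the matching behaviour concrete, here is a rough Python sketch of an upsert keyed on FirstName and LastName, using the CSV from this post. It is only a model of the outcome, not what Data Factory runs. The Id values and the starting ages for Jane and Janet are invented for the example (John's starting age is taken as 10 because his row doesn't change), and upsert_rows is a made-up name.

```python
import csv
import io

# Existing table keyed by (FirstName, LastName).
# Ids and the ages for Jane and Janet are invented for this sketch.
table = {
    ("John", "Doe"): {"Id": 1, "Age": 10},
    ("Jane", "Doe"): {"Id": 2, "Age": 20},
    ("Janet", "Doe"): {"Id": 3, "Age": 30},
}

CSV_TEXT = """FirstName,LastName,Age
John,Doe,10
Jane,Doe,25
James,Doe,50
"""


def upsert_rows(table, rows):
    """Update rows whose key already exists; insert the rest with a new Id."""
    next_id = max((r["Id"] for r in table.values()), default=0) + 1
    for row in rows:
        key = (row["FirstName"], row["LastName"])
        age = int(row["Age"])
        if key in table:
            table[key]["Age"] = age  # matched on the key columns: update
        else:
            table[key] = {"Id": next_id, "Age": age}  # no match: insert
            next_id += 1
    return table


upsert_rows(table, csv.DictReader(io.StringIO(CSV_TEXT)))
for key, values in table.items():
    print(key, values)
```

Rows that are in the table but not in the CSV (Janet here) are never visited, which is why she is left alone rather than deleted.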

Data Factory: Importing multiple files with transformations

Let's assume you have a folder containing a bunch of files that you need to import somewhere, e.g. a database or another file store, and in the process of doing that you also need to transform the data in some way.

One option would be to use a pipeline activity like Get Metadata to get your list of files, a ForEach to loop through them and a Mapping Data Flow within the for each to process each file.

This all sounds quite reasonable, but there's a catch. Each time we use a Data Flow activity, that activity will spin up an Azure Databricks environment to run the Data Flow. So if you have 100 files to import, then that's 100 Databricks environments that will get created.

An alternative is to do everything within one Data Flow activity, resulting in just one Databricks environment being created.

One Data Flow

In your dataset configuration specify a filepath to a folder rather than an individual file (you probably actually had it this way for the Get Metadata activity).

In your data flow source object, pick your dataset. In the source options you can specify a wildcard path to filter what's in the folder, or leave it blank to load every file.

Now when the source is run it will load data from all files.

One major difference to note is that, rather than iteratively going through each file, we're now loading them all in one go, which may change how you think about things.

If you need to know which file a particular row came from, the source options have a field where you can specify the name of a column for the file name to be added to.

Your data now includes data from every file, and the filename it came from.
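As a mental model of what the single source is doing, here is a small Python sketch that reads every file in a folder matching a wildcard into one set of rows and records the file each row came from. The function load_folder, the column name SourceFile and the sample files are all made up for illustration; this is not how Data Factory itself is implemented.

```python
import csv
import tempfile
from pathlib import Path


def load_folder(folder, pattern="*.csv", filename_column="SourceFile"):
    """Read every file in folder matching pattern into one list of rows."""
    rows = []
    for path in sorted(Path(folder).glob(pattern)):
        with path.open(newline="") as handle:
            for row in csv.DictReader(handle):
                row[filename_column] = path.name  # remember the source file
                rows.append(row)
    return rows


# Build a throwaway folder with two CSV files and one file the wildcard skips.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "a.csv").write_text("FirstName,Age\nJohn,10\n")
    Path(folder, "b.csv").write_text("FirstName,Age\nJane,25\nJames,50\n")
    Path(folder, "notes.txt").write_text("ignored by the wildcard\n")
    all_rows = load_folder(folder)

for row in all_rows:
    print(row)
```

Everything after the load works on the combined rows, so any per-file logic has to be driven by the file name column rather than by a loop.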