SSIS: Script Component Asynchronous Transformation
Tuesday, March 20, 2012
by jsalvo
The SSIS script component can be configured to use synchronous or asynchronous outputs. If the script component is configured for synchronous outputs, then each input row is processed as it passes through the component. Asynchronous outputs can be configured to wait until multiple input rows have been received before processing.
I recently used an asynchronous transform to group rows based on two column values (ShipToNumber and NoteDateTime) and then concatenated the values from a third column (Note) and output a single row to the downstream data flow.
After configuring the data source to extract the desired data, the first step is to add a Sort component to sort the rows based on the two columns we wish to group by (in this example I also needed to sort on the sequence column so the Note column values are concatenated in the correct order).
The Sort component is configured as follows:
In this example, I am sorting by three input columns in ascending order. The remaining columns are configured to pass through the component. Performing this sort functionality is necessary so the rows are sent in the desired order to the Script Component.
Now we will add a Script Component to the data flow. The Script Component is represented by the following icon in the toolbox.
When you drag the script component onto the data flow surface, you will be prompted to select one of the following options:
In this case you will want to select the ‘Transformation’ option if it is not already selected and click ‘OK’.
Double click on the Script Component to configure its properties.
Setting the Script Language
In this example, I am using C# as the script language. You can change the script language by setting the ScriptLanguage property on the Script page of the editor as shown below.
Configuring Input Columns
The Script Component has one configured input by default named Input0. In this example, that is sufficient for our needs. We will now need to add columns to this input. All columns we want to flow downstream to subsequent components should be included in the input. In this example we are including all available columns as Input Columns, except for the Sequence column. All input columns are set to ‘Read Only’, since their values should not change.
Configuring the Output Columns
Once we have selected the appropriate Input Columns, we now will configure the Inputs and Outputs. Select the ‘Inputs and Outputs’ tab on the left-hand side.
First, we need to configure Output 0 to be an asynchronous output. Click on ‘Output 0’ in the ‘Inputs and outputs:’ hierarchy. Set the ‘SynchronousInputID’ property to ‘None’.
Now we can add the Output columns. To add a column, you first need to select ‘Output 0’ in the ‘Inputs and outputs:’ hierarchy. Then click the ‘Add Column’ button. You will be prompted to give the column a name and then select the correct data type in the right hand panel. In this example, the output columns are the same as the input columns with the exception of the ‘Note’ column which is re-named to ‘CombinedNote’. CombinedNote is a concatenation of the Note column of the input rows.
Modifying the Script Code
Now we are ready to modify the code in the script. First, click the Script tab in the left-hand side of the Editor window. Next, click the ‘Edit Script’ button.
In public class ScriptMain : UserComponent, we first need to create some private member variables that represent the columns in our output. We also need a Boolean variable that is used as a flag to check for the first row.
private string CombinedNote;
private int ShipToNumber;
private DateTime NoteDateTime;
private DateTime Timestamp;
private string User;
private bool firstRow = true;
We must then update the Input0_ProcessInputRow(Input0Buffer Row) method. In this example, we are ‘grouping’ the input rows by the columns ShipToNumber and NoteDateTime. The rows are already sorted by the Sort component in ascending order based on the ShipToNumber and NoteDateTime.
If we are processing the first row we first need to store the values of these columns into the class variables. For all subsequent rows, we check to see if the value of the current row’s ShipToNumber and NoteDateTime matches what we have stored in the class variables. If they are the same, then we concatenate the value from the Note column to the ConcatenatedNote variable. We continue to process rows, concatenating the value of the Note column to ConcatenatedNote for each row, until we come across a row with a new ShipToNumber or NoteDateTime.
When we find a row with new values, we add a row to the OutputBuffer0 and then assign the values of the OutputBuffer0 rows to the values stored in our class variables. We then update the class variables with the values from the current row.
The code is shown below.
public override void Input0_ProcessInputRow(Input0Buffer Row)
{
if ((Row.ShipToNumber == ShipToNumber && Row.NoteDateTime == NoteDateTime) || firstRow)
{
CombinedNote += (Row.Note_IsNull ? "" : Row.Note);
ShipToNumber = (int)Row.ShipToNumber;
NoteDateTime = Row.NoteDateTime;
Timestamp = Row.Timestamp;
User = Row.User;
firstRow = false;
}
else
{
Output0Buffer.AddRow();
Output0Buffer.NoteDateTime = NoteDateTime;
Output0Buffer.Timestamp = Timestamp;
Output0Buffer.ShipToNumber = ShipToNumber;
Output0Buffer.User = User;
Output0Buffer.CombinedNote.AddBlobData(
System.Text.UnicodeEncoding.Unicode.GetBytes(CombinedNote ?? ""));
ShipToNumber = (int)Row.ShipToNumber ;
NoteDateTime = Row.NoteDateTime;
Timestamp = Row.Timestamp;
CombinedNote = (Row.Note_IsNull ? "" : Row.Note);
User = Row.User;
}
}
Just as we needed special processing for the first row, we also need some special processing for the last row. This involves overriding the FinishOutputs method. We first must add a row to our OutputBuffer0 and then assign the values of the OutputBuffer0 output columns to the values in our class variables as shown below.
public override void FinishOutputs()
{
Output0Buffer.AddRow();
Output0Buffer.NoteDateTime = NoteDateTime;
Output0Buffer.Timestamp = Timestamp;
Output0Buffer.ShipToNumber = ShipToNumber;
Output0Buffer.User = User;
Output0Buffer.CombinedNote.AddBlobData(
System.Text.UnicodeEncoding.Unicode.GetBytes(CombinedNote ?? ""));
base.FinishOutputs();
}